Skip to content
This repository has been archived by the owner on Feb 24, 2022. It is now read-only.

Commit

Permalink
fixes for oracle
Browse files Browse the repository at this point in the history
  • Loading branch information
root authored and dciangot committed Dec 2, 2016
1 parent 38d13ab commit aaf0a2a
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 202 deletions.
23 changes: 13 additions & 10 deletions src/python/AsyncStageOut/PublisherDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,29 @@ def __init__(self, config):
server = CouchServer(dburl=self.config.couch_instance,
ckey=self.config.opsProxy,
cert=self.config.opsProxy)
self.db = server.connectDatabase(self.config.files_database)
self.logger.debug('Connected to CouchDB')
# Set up a factory for loading plugins
self.factory = WMFactory(self.config.schedAlgoDir,
namespace=self.config.schedAlgoDir)
self.pool = Pool(processes=self.config.publication_pool_size)

self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
if self.config.isOracle:
self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
else:
self.db = server.connectDatabase(self.config.files_database)

def algorithm(self, parameters=None):
"""
1. Get a list of users with files to publish from the couchdb instance
2. For each user get a suitably sized input for publish
3. Submit the publish to a subprocess
"""
if self.config.isOracle:
users = self.active_users(self.oracleDB)
else:
users = self.active_users(self.db)
if self.config.isOracle:
users = self.active_users(self.oracleDB)
else:
users = self.active_users(self.db)
self.logger.debug('kicking off pool %s' %users)
for u in users:
self.logger.debug('current_running %s' %current_running)
Expand Down Expand Up @@ -131,14 +133,15 @@ def active_users(self, db):
try:
results = db.get(self.config.oracleFileTrans,
data=encodeRequest(fileDoc))
result = oracleOutputMapping(results)
except Exception as ex:
self.logger.error("Failed to acquire publications \
from oracleDB: %s" %ex)

result = oracleOutputMapping(results)
self.logger.debug("%s acquired puclications retrieved" % len(result))
#TODO: join query for publisher (same of submitter)
unique_users = [list(i) for i in set(tuple([x['username'], x['user_group'], x['user_role']]) for x in result
if x['transfer_state']==3)]
if x['transfer_state'] == 3)]
return unique_users
else:
# TODO: Remove stale=ok for now until tested
Expand Down
173 changes: 104 additions & 69 deletions src/python/AsyncStageOut/PublisherWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from AsyncStageOut import getCommonLogFormatter

from RESTInteractions import HTTPRequests
from ServerUtilities import getHashLfn, PUBLICATIONDB_STATUSES, encodeRequest, oracleOutputMapping
from ServerUtilities import getColumn, getHashLfn, PUBLICATIONDB_STATUSES, encodeRequest, oracleOutputMapping

class PublisherWorker:
"""
Expand Down Expand Up @@ -94,13 +94,7 @@ def __init__(self, user, config):
'uisource': self.uiSetupScript
}
# If we're just testing publication, we skip the DB connection.
if os.getenv("TEST_ASO"):
self.db = None
else:
server = CouchServer(dburl=self.config.couch_instance,
ckey=self.config.opsProxy,
cert=self.config.opsProxy)
self.db = server.connectDatabase(self.config.files_database)

if hasattr(self.config, "cache_area"):
try:
getCache = re.compile('https?://([^/]*)/.*')
Expand Down Expand Up @@ -156,6 +150,20 @@ def __init__(self, user, config):
self.logger.error('Did not get valid proxy. Setting proxy to ops proxy')
self.userProxy = self.config.opsProxy
# self.cache_area = self.config.cache_area
if os.getenv("TEST_ASO"):
self.db = None
elif not self.config.isOracle:
server = CouchServer(dburl=self.config.couch_instance,
ckey=self.config.opsProxy,
cert=self.config.opsProxy)
self.db = server.connectDatabase(self.config.files_database)
else:
self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
self.oracleDB_user = HTTPRequests(self.config.oracleDB,
self.userProxy,
self.userProxy)
self.phedexApi = PhEDEx(responseType='json')
self.max_files_per_block = max(1, self.config.max_files_per_block)
self.block_publication_timeout = self.config.block_closure_timeout
Expand All @@ -181,10 +189,6 @@ def __init__(self, user, config):
msg += str(traceback.format_exc())
self.logger.debug(msg)

self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)

def __call__(self):
"""
1- check the nubmer of files in wf to publish if it is < max_files_per_block
Expand Down Expand Up @@ -247,6 +251,7 @@ def __call__(self):
]}
for x in toPub_docs if x['transfer_state']==3 and x['publication_state'] not in [2,3,5]]
# self.logger.debug("active_user_workflows: %s" %active_user_workflows)
workflow = ''
for user_wf in unique_user_workflows:
workflow = str(user_wf['key'][3])
wfnamemsg = "%s: " % (workflow)
Expand Down Expand Up @@ -363,33 +368,36 @@ def __call__(self):
cert=self.userProxy
)# , verbose=True) # for debug
except Exception as ex:
msg = "Error retrieving status from cache. Fall back to user cache area"
msg += str(ex)
msg += str(traceback.format_exc())
self.logger.error(wfnamemsg+msg)
query = {'key': self.user}
try:
self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows']
except Exception as ex:
msg = "Error getting user cache_area"
if self.config.isOracle:
self.logger.exception('Error retrieving status from cache.')
else:
msg = "Error retrieving status from cache. Fall back to user cache area"
msg += str(ex)
msg += str(traceback.format_exc())
self.logger.error(msg)
self.logger.error(wfnamemsg+msg)
query = {'key': self.user}
try:
self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows']
except Exception as ex:
msg = "Error getting user cache_area"
msg += str(ex)
msg += str(traceback.format_exc())
self.logger.error(msg)

self.cache_area = self.user_cache_area[0]['value'][0]+self.user_cache_area[0]['value'][1]+'/filemetadata'
try:
_, res_ = self.connection.request(url,
data,
header,
doseq=True,
ckey=self.userProxy,
cert=self.userProxy
self.cache_area = self.user_cache_area[0]['value'][0]+self.user_cache_area[0]['value'][1]+'/filemetadata'
try:
_, res_ = self.connection.request(url,
data,
header,
doseq=True,
ckey=self.userProxy,
cert=self.userProxy
)#, verbose=True)# for debug
except Exception as ex:
msg = "Error retrieving status from user cache area."
msg += str(ex)
msg += str(traceback.format_exc())
self.logger.error(wfnamemsg+msg)
except Exception as ex:
msg = "Error retrieving status from user cache area."
msg += str(ex)
msg += str(traceback.format_exc())
self.logger.error(wfnamemsg+msg)

msg = "Status retrieved from cache. Loading task status."
self.logger.info(wfnamemsg+msg)
Expand Down Expand Up @@ -423,41 +431,58 @@ def __call__(self):
# Get when was the last time a publication was done for this workflow (this
# should be more or less independent of the output dataset in case there are
# more than one).
query = {'reduce': True, 'key': user_wf['key']}
try:
last_publication_time = self.db.loadView('DBSPublisher', 'last_publication', query)['rows']
except Exception as ex:
msg = "Cannot get last publication time for %s: %s" % (user_wf['key'], ex)
self.logger.error(wfnamemsg+msg)
last_publication_time = None
if not self.config.isOracle:
query = {'reduce': True, 'key': user_wf['key']}
try:
last_publication_time = self.db.loadView('DBSPublisher', 'last_publication', query)['rows']
except Exception as ex:
msg = "Cannot get last publication time for %s: %s" % (user_wf['key'], ex)
self.logger.error(wfnamemsg+msg)
else:
data['workflow'] = workflow
data['subresource'] = 'search'
try:
result = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers','task'),
data=encodeRequest(data))
self.logger.debug("task: %s " % str(result[0]))
self.logger.debug("task: %s " % getColumn(result[0],'tm_last_publication'))
except Exception as ex:
self.logger.error("Error during task doc retrieving: %s" %ex)
if last_publication_time:
date = oracleOutputMapping(result)['last_publication']
seconds = datetime.strptime(date, "%Y-%m-%d %H:%M:%S.%f").timetuple()
last_publication_time = time.mktime(seconds)

msg = "Last publication time: %s." % str(last_publication_time)
self.logger.debug(wfnamemsg+msg)
# If this is the first time a publication would be done for this workflow, go
# ahead and publish.
if not last_publication_time:
self.force_publication = True
msg = "There was no previous publication. Will force publication."
self.logger.info(wfnamemsg+msg)
# Otherwise...
else:
msg = "Last publication time: %s." % (last_publication_time)
last = last_publication_time
msg = "Last published block: %s" % (last)
self.logger.debug(wfnamemsg+msg)
# If this is the first time a publication would be done for this workflow, go
# ahead and publish.
if not last_publication_time:
# If the last publication was long time ago (> our block publication timeout),
# go ahead and publish.
time_since_last_publication = now - last
hours = int(time_since_last_publication/60/60)
minutes = int((time_since_last_publication - hours*60*60)/60)
timeout_hours = int(self.block_publication_timeout/60/60)
timeout_minutes = int((self.block_publication_timeout - timeout_hours*60*60)/60)
msg = "Last publication was %sh:%sm ago" % (hours, minutes)
if time_since_last_publication > self.block_publication_timeout:
self.force_publication = True
msg = "There was no previous publication. Will force publication."
self.logger.info(wfnamemsg+msg)
# Otherwise...
msg += " (more than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes)
msg += " Will force publication."
else:
msg = "Last published block: %s" % (last_publication_time[0]['value']['max'])
self.logger.debug(wfnamemsg+msg)
# If the last publication was long time ago (> our block publication timeout),
# go ahead and publish.
time_since_last_publication = now - last_publication_time[0]['value']['max']
hours = int(time_since_last_publication/60/60)
minutes = int((time_since_last_publication - hours*60*60)/60)
timeout_hours = int(self.block_publication_timeout/60/60)
timeout_minutes = int((self.block_publication_timeout - timeout_hours*60*60)/60)
msg = "Last publication was %sh:%sm ago" % (hours, minutes)
if time_since_last_publication > self.block_publication_timeout:
self.force_publication = True
msg += " (more than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes)
msg += " Will force publication."
else:
msg += " (less than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes)
msg += " Not enough to force publication."
self.logger.info(wfnamemsg+msg)
msg += " (less than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes)
msg += " Not enough to force publication."
self.logger.info(wfnamemsg+msg)
# Call the publish method with the lists of ready files to publish for this
# workflow grouped by datasets.
result = self.publish(workflow, input_dataset, input_dbs_url, pnn, lfn_ready)
Expand All @@ -471,9 +496,19 @@ def __call__(self):
force_failure = result[dataset].get('force_failure', False)
self.mark_failed(workflow, failed_files, failure_reason, force_failure)

self.logger.info("Publications for user %s (group: %s, role: %s) completed." % (self.user,
self.group,
self.role))
try:
# TODO: update last publication time db task updatepublicationtime,workflow,
self.logger.debug("Updating last publication type for: %s " % workflow)
data['workflow'] = workflow
data['subresource'] = 'updatepublicationtime'
result = self.oracleDB_user.get(self.config.oracleFileTrans.replace('filetransfers','task'),
data=encodeRequest(data))
self.logger.debug("%s last publication type update: %s " % (workflow,str(result)))
self.logger.info("Publications for user %s (group: %s, role: %s) completed." % (self.user,
self.group,
self.role))
except Exception as ex:
self.logger.error("Error during task doc retrieving: %s" %ex)

def mark_good(self, workflow, files):
"""
Expand Down
Loading

0 comments on commit aaf0a2a

Please sign in to comment.