Skip to content
This repository has been archived by the owner on Feb 24, 2022. It is now read-only.

Commit

Permalink
acquired documents query fix + small fixes on reporter worker
Browse files Browse the repository at this point in the history
  • Loading branch information
dciangot committed Dec 4, 2016
1 parent aaf0a2a commit 4f849d3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
22 changes: 10 additions & 12 deletions src/python/AsyncStageOut/ReporterWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ def __call__(self):
self.logger.debug('Marking failed %s %s' %(failed_lfns, failure_reason))
updated_failed_lfns = self.mark_failed(failed_lfns, failure_reason)

if len(updated_failed_lfns) != len(failed_lfns):
remove_failed = False

if 'Done' or 'FINISHED' in json_data['transferStatus']:
# Sort good files
Expand All @@ -248,13 +246,9 @@ def __call__(self):
except:
self.logger.exception('Either no files to mark or failed to update state')

if len(updated_good_lfns) != len(good_lfns):
remove_good = False

if remove_good and remove_failed:
# Remove the json file
self.logger.debug('Removing %s' % input_file)
os.unlink( input_file )
# Remove the json file
self.logger.debug('Removing %s' % input_file)
os.unlink( input_file )

else:
self.logger.info('Empty file %s' % input_file)
Expand Down Expand Up @@ -346,7 +340,8 @@ def mark_good(self, files):
self.logger.debug("Marked good %s" % good_ids)
except Exception:
self.logger.exception('Error updating document')

return {}

self.logger.info("Transferred file %s updated, removing now source file" %docId)
try:
docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers','fileusertransfers'),
Expand All @@ -355,7 +350,8 @@ def mark_good(self, files):
except Exception:
msg = "Error getting file from source"
self.logger.exception(msg)
raise
return {}

if document["source"] not in self.site_tfc_map:
self.logger.debug("site not found... gathering info from phedex")
self.site_tfc_map[document["source"]] = self.get_tfc_rules(document["source"])
Expand Down Expand Up @@ -461,13 +457,15 @@ def mark_failed(self, files=[], failures_reasons=[], force_fail=False):
fatal_error = self.determine_fatal_error(failures_reasons[files.index(lfn)])
if fatal_error:
data['list_of_transfer_state'] = 'FAILED'

data['list_of_failure_reason'] = failures_reasons[files.index(lfn)]
data['list_of_retry_value'] = 0

self.logger.debug("update: %s" % data)
result = self.oracleDB.post(self.config.oracleFileTrans,
data=encodeRequest(data))
updated_lfn.append(lfn)
if not data['list_of_transfer_state'] == 'RETRY':
updated_lfn.append(lfn)
self.logger.debug("Marked failed %s" % lfn)
except Exception as ex:
self.logger.error("Error updating document status: %s" %ex)
Expand Down
1 change: 1 addition & 0 deletions src/python/AsyncStageOut/TransferDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def algorithm(self, parameters=None):
u[i] = ''

self.logger.debug('current_running %s' % current_running)
self.logger.debug('BBBBBB: %s %s %s' % (u, current_running, (u not in current_running)))
if u not in current_running:
self.logger.debug('processing %s' % u)
current_running.append(u)
Expand Down
23 changes: 14 additions & 9 deletions src/python/AsyncStageOut/TransferWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,17 @@ def source_destinations_by_user(self):
result = []

self.logger.debug('Request: ' + str(fileDoc))

try:
results = self.oracleDB.get(self.config.oracleFileTrans,
data=encodeRequest(fileDoc))
result = oracleOutputMapping(results)
self.logger.debug('toBeTransferred: ',result)
res = [[x['source'], x['destination']] for x in result]
res.sort()
res = list(k for k, _ in itertools.groupby(res))
except Exception as ex:
self.logger.error("Failed to get acquired transfers \
from oracleDB: %s" %ex)
return [], {}
return res, result
else:
query = {'group': True,
Expand All @@ -250,7 +249,9 @@ def files_for_transfer(self):
jobs_report = {}
self.logger.info('%s has %s links to transfer on: %s' % (self.user, len(source_dests), str(source_dests)))
try:
count = 0
for (source, destination) in source_dests:
count += 1
self.logger.info('dest1: %s source: %s' % (docs[0]['destination'],source))
if self.config.isOracle:
if self.group == '':
Expand Down Expand Up @@ -285,8 +286,8 @@ def map_active(inputdoc):
return outDict
active_files = [map_active(x) for x in active_docs]
#active_files = active_files[:1000]
self.logger.debug('%s has %s files to transfer \
from %s to %s' % (self.user,
self.logger.debug('%s - %s has %s files to transfer \
from %s to %s' % (count, self.user,
len(active_files),
source,
destination))
Expand All @@ -307,6 +308,7 @@ def map_active(inputdoc):
pfn_list = []
dash_report = []


# take these active files and make a copyjob entry
def tfc_map(item):
self.logger.debug('Preparing PFNs...')
Expand Down Expand Up @@ -523,7 +525,7 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report):
self.logger.debug(msg)
submission_error = True
failure_reasons.append(msg)
self.logger.debug("List files in job %s" % files_)
#self.logger.debug("List files in job %s" % files_)
file_buf.close()
for file_in_job in files_res:
if 'file_id' in file_in_job:
Expand Down Expand Up @@ -612,15 +614,17 @@ def mark_acquired(self, files=[]):
fileDoc = dict()
fileDoc['asoworker'] = self.config.asoworker
fileDoc['subresource'] = 'updateTransfers'
fileDoc['list_of_ids'] = toUpdate
fileDoc['list_of_transfer_state'] = ["SUBMITTED" for x in toUpdate]
fileDoc['list_of_ids'] = files[0]['key'][5]
fileDoc['list_of_transfer_state'] = "SUBMITTED"

self.logger.debug("Marking acquired %s" % (fileDoc))

result = self.oracleDB.post(self.config.oracleFileTrans,
data=encodeRequest(fileDoc))
self.logger.debug("Marked acquired %s of %s" % (fileDoc, result))
except Exception as ex:
self.logger.error("Error during status update: %s" %ex)

self.logger.debug("Marked acquired %s of %s" % (docId, lfn))
# TODO: no need of mark good right? the postjob should updated the status in case of direct stageout I think
return lfn_in_transfer, dash_rep
else:
Expand Down Expand Up @@ -726,7 +730,8 @@ def mark_failed(self, files=[], force_fail=False, submission_error=False):
docId = getHashLfn(temp_lfn)
self.logger.debug("Marking failed %s" % docId)
try:
docbyId = self.oracleDB.get(self.config.oracleFileTrans,
docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers',
'fileusertransfers'),
data=encodeRequest({'subresource': 'getById', 'id': docId}))
except Exception as ex:
self.logger.error("Error updating failed docs: %s" %ex)
Expand Down

0 comments on commit 4f849d3

Please sign in to comment.