From 4f849d3442aec474712511f29c5af3e5b80a6406 Mon Sep 17 00:00:00 2001 From: dciangot Date: Sun, 4 Dec 2016 20:37:50 +0100 Subject: [PATCH] acquired documents query fix + small fixes on reporter worker --- src/python/AsyncStageOut/ReporterWorker.py | 22 ++++++++++----------- src/python/AsyncStageOut/TransferDaemon.py | 1 + src/python/AsyncStageOut/TransferWorker.py | 23 +++++++++++++--------- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/python/AsyncStageOut/ReporterWorker.py b/src/python/AsyncStageOut/ReporterWorker.py index 4018c34..a7b2f2a 100644 --- a/src/python/AsyncStageOut/ReporterWorker.py +++ b/src/python/AsyncStageOut/ReporterWorker.py @@ -232,8 +232,6 @@ def __call__(self): self.logger.debug('Marking failed %s %s' %(failed_lfns, failure_reason)) updated_failed_lfns = self.mark_failed(failed_lfns, failure_reason) - if len(updated_failed_lfns) != len(failed_lfns): - remove_failed = False if 'Done' or 'FINISHED' in json_data['transferStatus']: # Sort good files @@ -248,13 +246,9 @@ def __call__(self): except: self.logger.exception('Either no files to mark or failed to update state') - if len(updated_good_lfns) != len(good_lfns): - remove_good = False - - if remove_good and remove_failed: - # Remove the json file - self.logger.debug('Removing %s' % input_file) - os.unlink( input_file ) + # Remove the json file + self.logger.debug('Removing %s' % input_file) + os.unlink( input_file ) else: self.logger.info('Empty file %s' % input_file) @@ -346,7 +340,8 @@ def mark_good(self, files): self.logger.debug("Marked good %s" % good_ids) except Exception: self.logger.exception('Error updating document') - + return {} + self.logger.info("Transferred file %s updated, removing now source file" %docId) try: docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers','fileusertransfers'), @@ -355,7 +350,8 @@ def mark_good(self, files): except Exception: msg = "Error getting file from source" self.logger.exception(msg) - raise + return {} + if document["source"] not in self.site_tfc_map: self.logger.debug("site not found... gathering info from phedex") self.site_tfc_map[document["source"]] = self.get_tfc_rules(document["source"]) @@ -461,13 +457,15 @@ def mark_failed(self, files=[], failures_reasons=[], force_fail=False): fatal_error = self.determine_fatal_error(failures_reasons[files.index(lfn)]) if fatal_error: data['list_of_transfer_state'] = 'FAILED' + data['list_of_failure_reason'] = failures_reasons[files.index(lfn)] data['list_of_retry_value'] = 0 self.logger.debug("update: %s" % data) result = self.oracleDB.post(self.config.oracleFileTrans, data=encodeRequest(data)) - updated_lfn.append(lfn) + if not data['list_of_transfer_state'] == 'RETRY': + updated_lfn.append(lfn) self.logger.debug("Marked failed %s" % lfn) except Exception as ex: self.logger.error("Error updating document status: %s" %ex) diff --git a/src/python/AsyncStageOut/TransferDaemon.py b/src/python/AsyncStageOut/TransferDaemon.py index 6cc12e2..f113987 100644 --- a/src/python/AsyncStageOut/TransferDaemon.py +++ b/src/python/AsyncStageOut/TransferDaemon.py @@ -162,6 +162,7 @@ def algorithm(self, parameters=None): u[i] = '' self.logger.debug('current_running %s' % current_running) + self.logger.debug('BBBBBB: %s %s %s' % (u, current_running, (u not in current_running))) if u not in current_running: self.logger.debug('processing %s' % u) current_running.append(u) diff --git a/src/python/AsyncStageOut/TransferWorker.py b/src/python/AsyncStageOut/TransferWorker.py index 766d35f..089aa4a 100644 --- a/src/python/AsyncStageOut/TransferWorker.py +++ b/src/python/AsyncStageOut/TransferWorker.py @@ -213,18 +213,17 @@ def source_destinations_by_user(self): result = [] self.logger.debug('Request: ' + str(fileDoc)) - try: results = self.oracleDB.get(self.config.oracleFileTrans, data=encodeRequest(fileDoc)) result = oracleOutputMapping(results) - self.logger.debug('toBeTransferred: ',result) res = [[x['source'], x['destination']] for x in result] res.sort() res = list(k for k, _ in itertools.groupby(res)) except Exception as ex: self.logger.error("Failed to get acquired transfers \ from oracleDB: %s" %ex) + return [], {} return res, result else: query = {'group': True, @@ -250,7 +249,9 @@ def files_for_transfer(self): jobs_report = {} self.logger.info('%s has %s links to transfer on: %s' % (self.user, len(source_dests), str(source_dests))) try: + count = 0 for (source, destination) in source_dests: + count += 1 self.logger.info('dest1: %s source: %s' % (docs[0]['destination'],source)) if self.config.isOracle: if self.group == '': @@ -285,8 +286,8 @@ def map_active(inputdoc): return outDict active_files = [map_active(x) for x in active_docs] #active_files = active_files[:1000] - self.logger.debug('%s has %s files to transfer \ - from %s to %s' % (self.user, + self.logger.debug('%s - %s has %s files to transfer \ + from %s to %s' % (count, self.user, len(active_files), source, destination)) @@ -307,6 +308,7 @@ def map_active(inputdoc): pfn_list = [] dash_report = [] + # take these active files and make a copyjob entry def tfc_map(item): self.logger.debug('Preparing PFNs...') @@ -523,7 +525,7 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report): self.logger.debug(msg) submission_error = True failure_reasons.append(msg) - self.logger.debug("List files in job %s" % files_) + #self.logger.debug("List files in job %s" % files_) file_buf.close() for file_in_job in files_res: if 'file_id' in file_in_job: @@ -612,15 +614,17 @@ def mark_acquired(self, files=[]): fileDoc = dict() fileDoc['asoworker'] = self.config.asoworker fileDoc['subresource'] = 'updateTransfers' - fileDoc['list_of_ids'] = toUpdate - fileDoc['list_of_transfer_state'] = ["SUBMITTED" for x in toUpdate] + fileDoc['list_of_ids'] = files[0]['key'][5] + fileDoc['list_of_transfer_state'] = "SUBMITTED" + + self.logger.debug("Marking acquired %s" % (fileDoc)) result = self.oracleDB.post(self.config.oracleFileTrans, data=encodeRequest(fileDoc)) + self.logger.debug("Marked acquired %s of %s" % (fileDoc, result)) except Exception as ex: self.logger.error("Error during status update: %s" %ex) - self.logger.debug("Marked acquired %s of %s" % (docId, lfn)) # TODO: no need of mark good right? the postjob should updated the status in case of direct stageout I think return lfn_in_transfer, dash_rep else: @@ -726,7 +730,8 @@ def mark_failed(self, files=[], force_fail=False, submission_error=False): docId = getHashLfn(temp_lfn) self.logger.debug("Marking failed %s" % docId) try: - docbyId = self.oracleDB.get(self.config.oracleFileTrans, + docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers', + 'fileusertransfers'), data=encodeRequest({'subresource': 'getById', 'id': docId})) except Exception as ex: self.logger.error("Error updating failed docs: %s" %ex)