diff --git a/src/python/AsyncStageOut/PublisherWorker.py b/src/python/AsyncStageOut/PublisherWorker.py
index 95cad3c..9447d79 100644
--- a/src/python/AsyncStageOut/PublisherWorker.py
+++ b/src/python/AsyncStageOut/PublisherWorker.py
@@ -14,8 +14,7 @@
 import json
 import time
 import uuid
-import types
-import pprint
+#import pprint
 import urllib
 import tarfile
 import logging
@@ -63,7 +62,7 @@ def __init__(self, user, config):
         try:
             self.userDN = getDNFromUserName(self.user, self.logger)
         except Exception as ex:
-            msg = "Error retrieving the user DN" 
+            msg = "Error retrieving the user DN"
             msg += str(ex)
             msg += str(traceback.format_exc())
             self.logger.error(msg)
@@ -93,7 +92,7 @@ def __init__(self, user, config):
         try:
             self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows']
         except Exception as ex:
-            msg = "Error getting user cache_area" 
+            msg = "Error getting user cache_area"
             msg += str(ex)
             msg += str(traceback.format_exc())
             self.logger.error(msg)
@@ -124,10 +123,6 @@ def __init__(self, user, config):
             # Use the operator's proxy when the user proxy in invalid.
             # This will be moved soon
             self.logger.error('Did not get valid proxy. Setting proxy to ops proxy')
-            info = {'server_key': self.config.opsProxy, 'server_cert': self.config.opsProxy, 'logger': self.logger}
-            self.logger.info("Ops proxy info: %s" % str(info))
-            opsProxy = Proxy({'server_key': self.config.opsProxy, 'server_cert': self.config.opsProxy, 'logger': self.logger})
-            self.userDN = opsProxy.getSubject()
             self.userProxy = self.config.opsProxy
         #self.cache_area = self.config.cache_area
         self.phedexApi = PhEDEx(responseType='json')
@@ -191,7 +186,7 @@ def __call__(self):
             query = {'reduce': False, 'key': user_wf['key']}#'stale': 'ok'}
             try:
                 active_files = self.db.loadView('DBSPublisher', 'publish', query)['rows']
-            except Exception, e:
+            except Exception as e:
                 msg = "A problem occured to retrieve the list of active files for %s: %s" % (self.user, e)
                 self.logger.error(wfnamemsg+msg)
                 msg = "Publications will be retried next time."
@@ -297,7 +292,7 @@ def __call__(self):
             data = {'workflow': workflow}
             header = {"Content-Type ":"application/json"}
             try:
-                response, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
+                _, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
             except Exception as ex:
                 msg = "Error retrieving status from cache."
                 msg += str(ex)
@@ -407,14 +402,14 @@ def mark_good(self, workflow, files=[]):
                 updateUri += "?" + urllib.urlencode(data)
                 self.logger.info(wfnamemsg+"URI: %s" % updateUri)
                 self.db.makeRequest(uri = updateUri, type = "PUT", decode = False)
-            except Exception, ex:
+            except Exception as ex:
                 msg = "Error updating document in Couch."
                 msg += str(ex)
                 msg += str(traceback.format_exc())
                 self.logger.error(wfnamemsg+msg)
         try:
             self.db.commit()
-        except Exception, ex:
+        except Exception as ex:
             msg = "Error committing documents in Couch."
             msg += str(ex)
             msg += str(traceback.format_exc())
@@ -438,7 +433,7 @@ def mark_failed(self, workflow, files=[], failure_reason="", force_failure=False
             # Load document to get the retry_count
             try:
                 document = self.db.document(docId)
-            except Exception, ex:
+            except Exception as ex:
                 msg = "Error loading document from Couch."
                 msg += str(ex)
                 msg += str(traceback.format_exc())
@@ -458,14 +453,14 @@ def mark_failed(self, workflow, files=[], failure_reason="", force_failure=False
                 updateUri += "?" + urllib.urlencode(data)
                 self.logger.info(wfnamemsg+"URI: %s" % updateUri)
                 self.db.makeRequest(uri = updateUri, type = "PUT", decode = False)
-            except Exception, ex:
+            except Exception as ex:
                 msg = "Error updating document in Couch."
                 msg += str(ex)
                 msg += str(traceback.format_exc())
                 self.logger.error(wfnamemsg+msg)
         try:
             self.db.commit()
-        except Exception, ex:
+        except Exception as ex:
             msg = "Error committing documents in Couch."
             msg += str(ex)
             msg += str(traceback.format_exc())
@@ -487,7 +482,7 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
         ## so to not retrieve the filemetadata unnecesarily.
         if not False in map(lambda x: len(x) < self.max_files_per_block, lfn_ready.values()) and not self.force_publication:
             msg = "Skipping publication as there are not enough ready files in any of the datasets (and publication was not forced)."
-            self.logger.info(wfnamemsg+msg) 
+            self.logger.info(wfnamemsg+msg)
             return retdict
         ## Get the filemetada for this workflow.
         msg = "Retrieving publication description files."
@@ -496,7 +491,7 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
         for v in lfn_ready.values():
             lfn_ready_list.extend(v)
         try:
-            publDescFiles_list = self.getPublDescFiles(workflow, self.user)
+            publDescFiles_list = self.getPublDescFiles(workflow)
         except (tarfile.ReadError, RuntimeError):
             msg = "Error retrieving publication description files."
             self.logger.error(wfnamemsg+msg)
@@ -557,26 +552,12 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
 
         return retdict
 
-    def getPublDescFiles(self, workflow, userhn):
+    def getPublDescFiles(self, workflow):
         """
         Download and read the files describing what needs to be published
         """
         wfnamemsg = "%s: " % (workflow)
 
-        def decodeAsString(a):
-            """
-            DBS is stupid and doesn't understand that unicode is a string: (if type(obj) == type(''))
-            So best to convert as much of the decoded JSON to str as possible. Some is left over and handled by
-            PoorMansBufferFile
-            """
-            newDict = {}
-            for key, value in a.iteritems():
-                if type(key) == types.UnicodeType:
-                    key = str(key)
-                if type(value) == types.UnicodeType:
-                    value = str(value)
-                newDict.update({key : value})
-            return newDict
         buf = cStringIO.StringIO()
         res = []
         # TODO: input sanitization
@@ -586,8 +567,8 @@ def decodeAsString(a):
         msg = "Retrieving data from %s" % (url)
         self.logger.info(wfnamemsg+msg)
         try:
-            response, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
-        except Exception, ex:
+            _, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
+        except Exception as ex:
             msg = "Error retrieving data."
             msg += str(ex)
             msg += str(traceback.format_exc())
@@ -598,7 +579,7 @@ def decodeAsString(a):
         try:
             buf.close()
             res = json.loads(res_)
-        except Exception, ex:
+        except Exception as ex:
             msg = "Error loading results. Trying next time!"
             msg += str(ex)
             msg += str(traceback.format_exc())
@@ -724,7 +705,7 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas
             numTimes = 10
             msg = "Will monitor their status for up to %d seconds." % (waitTime * numTimes)
             self.logger.info(wfnamemsg+msg)
-            for i in range(numTimes):
+            for _ in range(numTimes):
                 msg = "%d block migrations in progress." % (numMigrationsInProgress)
                 msg += " Will check migrations status in %d seconds." % (waitTime)
                 self.logger.info(wfnamemsg+msg)
@@ -737,7 +718,7 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas
                         status = migrateApi.statusMigration(migration_rqst_id = reqid)
                         state = status[0].get('migration_status')
                         retry = status[0].get('retry_count')
-                    except Exception, ex:
+                    except Exception as ex:
                         msg = "Could not get status for migration id %d:\n%s" % (reqid, ex.msg)
                         self.logger.error(wfnamemsg+msg)
                     else:
@@ -822,7 +803,7 @@ def requestBlockMigration(self, workflow, migrateApi, sourceApi, block):
         data = {'migration_url': sourceURL, 'migration_input': block}
         try:
             result = migrateApi.submitMigration(data)
-        except HTTPError, he:
+        except HTTPError as he:
             if "is already at destination" in he.msg:
                 msg = "Block is already at destination."
                 self.logger.info(wfnamemsg+msg)
@@ -967,7 +948,6 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
 
             appName = 'cmsRun'
             appVer = files[0]["swversion"]
-            appFam = 'output'
             pset_hash = files[0]['publishname'].split("-")[-1]
             gtag = str(files[0]['globaltag'])
             if gtag == "None":
@@ -993,7 +973,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
                 msg += " (%d valid, %d invalid)." % (len(existingFilesValid), len(existingFiles) - len(existingFilesValid))
                 self.logger.info(wfnamemsg+msg)
                 results[dataset]['existingFiles'] = len(existingFiles)
-            except Exception, ex:
+            except Exception as ex:
                 msg = "Error when listing files in DBS: %s" % (str(ex))
                 msg += "\n%s" % (str(traceback.format_exc()))
                 self.logger.error(wfnamemsg+msg)
@@ -1065,11 +1045,11 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
                         parentFiles.add(parentFile)
                         ## Is this parent file already in the destination DBS instance?
                         ## (If yes, then we don't have to migrate this block.)
-                        blocksDict = destReadApi.listBlocks(logical_file_name = parentFile)
+                        blocksDict = destReadApi.listBlocks(logical_file_name=parentFile)
                         if not blocksDict:
                             ## No, this parent file is not in the destination DBS instance.
                             ## Maybe it is in the same DBS instance as the input dataset?
-                            blocksDict = sourceApi.listBlocks(logical_file_name = parentFile)
+                            blocksDict = sourceApi.listBlocks(logical_file_name=parentFile)
                             if blocksDict:
                                 ## Yes, this parent file is in the same DBS instance as the input dataset.
                                 ## Add the corresponding block to the set of blocks from the source DBS
@@ -1078,7 +1058,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
                             else:
                                 ## No, this parent file is not in the same DBS instance as input dataset.
                                 ## Maybe it is in global DBS instance?
-                                blocksDict = globalApi.listBlocks(logical_file_name = parentFile)
+                                blocksDict = globalApi.listBlocks(logical_file_name=parentFile)
                                 if blocksDict:
                                     ## Yes, this parent file is in global DBS instance.
                                     ## Add the corresponding block to the set of blocks from global DBS
@@ -1155,7 +1135,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
                     #self.logger.debug(wfnamemsg+"Block to insert: %s\n" % pprint.pformat(blockDump))
                     destApi.insertBulkBlock(blockDump)
                     block_count += 1
-                except Exception, ex:
+                except Exception as ex:
                     failed[dataset].extend([f['logical_file_name'] for f in files_to_publish])
                     msg = "Error when publishing (%s) " % ", ".join(failed[dataset])
                     msg += str(ex)