
Include pylint changes. Fix #4459
Hassen Riahi authored and Hassen Riahi committed Nov 26, 2015
1 parent 2e3d412 commit 24a4fb2
Showing 1 changed file with 24 additions and 44 deletions.
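
Most of this diff is mechanical pylint cleanup, in a few recurring patterns: Python 2-only "except Exception, ex:" handlers become "except Exception as ex:"; unused names are deleted (the appFam assignment, the apparently no-longer-called decodeAsString helper and the types import it relied on) or bound to the throwaway name "_" (the unread response value, the unused loop index); and keyword arguments lose the spaces around "=" per PEP 8. Three hunks pair a removed line with an added line of identical visible text; those are whitespace-only fixes (most likely trailing whitespace), which the rendered diff cannot show. One change goes beyond style: the ops-proxy block in __init__ that reset self.userDN is dropped outright.

The exception-handler rewrite matters most, because the comma form is a syntax error on Python 3. A minimal sketch (not code from this repository) contrasting the two spellings:

    # Python 2-only spelling, flagged by pylint and rejected by Python 3;
    # it is also easy to misread as catching a tuple (Exception, ex):
    #
    #     except Exception, ex:
    #         ...
    #
    # Portable spelling, valid on Python 2.6+ and on Python 3:
    try:
        print(1 / 0)
    except Exception as ex:
        print("caught: %s" % ex)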
68 changes: 24 additions & 44 deletions src/python/AsyncStageOut/PublisherWorker.py
@@ -14,8 +14,7 @@
 import json
 import time
 import uuid
-import types
-import pprint
+#import pprint
 import urllib
 import tarfile
 import logging
@@ -63,7 +62,7 @@ def __init__(self, user, config):
 try:
 self.userDN = getDNFromUserName(self.user, self.logger)
 except Exception as ex:
-msg = "Error retrieving the user DN"
+msg = "Error retrieving the user DN"
 msg += str(ex)
 msg += str(traceback.format_exc())
 self.logger.error(msg)
@@ -93,7 +92,7 @@ def __init__(self, user, config):
 try:
 self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows']
 except Exception as ex:
-msg = "Error getting user cache_area"
+msg = "Error getting user cache_area"
 msg += str(ex)
 msg += str(traceback.format_exc())
 self.logger.error(msg)
@@ -124,10 +123,6 @@ def __init__(self, user, config):
 # Use the operator's proxy when the user proxy in invalid.
 # This will be moved soon
 self.logger.error('Did not get valid proxy. Setting proxy to ops proxy')
-info = {'server_key': self.config.opsProxy, 'server_cert': self.config.opsProxy, 'logger': self.logger}
-self.logger.info("Ops proxy info: %s" % str(info))
-opsProxy = Proxy({'server_key': self.config.opsProxy, 'server_cert': self.config.opsProxy, 'logger': self.logger})
-self.userDN = opsProxy.getSubject()
 self.userProxy = self.config.opsProxy
 #self.cache_area = self.config.cache_area
 self.phedexApi = PhEDEx(responseType='json')
@@ -191,7 +186,7 @@ def __call__(self):
 query = {'reduce': False, 'key': user_wf['key']}#'stale': 'ok'}
 try:
 active_files = self.db.loadView('DBSPublisher', 'publish', query)['rows']
-except Exception, e:
+except Exception as e:
 msg = "A problem occured to retrieve the list of active files for %s: %s" % (self.user, e)
 self.logger.error(wfnamemsg+msg)
 msg = "Publications will be retried next time."
@@ -297,7 +292,7 @@ def __call__(self):
 data = {'workflow': workflow}
 header = {"Content-Type ":"application/json"}
 try:
-response, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
+_, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
 except Exception as ex:
 msg = "Error retrieving status from cache."
 msg += str(ex)
@@ -407,14 +402,14 @@ def mark_good(self, workflow, files=[]):
 updateUri += "?" + urllib.urlencode(data)
 self.logger.info(wfnamemsg+"URI: %s" % updateUri)
 self.db.makeRequest(uri = updateUri, type = "PUT", decode = False)
-except Exception, ex:
+except Exception as ex:
 msg = "Error updating document in Couch."
 msg += str(ex)
 msg += str(traceback.format_exc())
 self.logger.error(wfnamemsg+msg)
 try:
 self.db.commit()
-except Exception, ex:
+except Exception as ex:
 msg = "Error committing documents in Couch."
 msg += str(ex)
 msg += str(traceback.format_exc())
@@ -438,7 +433,7 @@ def mark_failed(self, workflow, files=[], failure_reason="", force_failure=False
 # Load document to get the retry_count
 try:
 document = self.db.document(docId)
-except Exception, ex:
+except Exception as ex:
 msg = "Error loading document from Couch."
 msg += str(ex)
 msg += str(traceback.format_exc())
@@ -458,14 +453,14 @@ def mark_failed(self, workflow, files=[], failure_reason="", force_failure=False
 updateUri += "?" + urllib.urlencode(data)
 self.logger.info(wfnamemsg+"URI: %s" % updateUri)
 self.db.makeRequest(uri = updateUri, type = "PUT", decode = False)
-except Exception, ex:
+except Exception as ex:
 msg = "Error updating document in Couch."
 msg += str(ex)
 msg += str(traceback.format_exc())
 self.logger.error(wfnamemsg+msg)
 try:
 self.db.commit()
-except Exception, ex:
+except Exception as ex:
 msg = "Error committing documents in Couch."
 msg += str(ex)
 msg += str(traceback.format_exc())
@@ -487,7 +482,7 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
 ## so to not retrieve the filemetadata unnecesarily.
 if not False in map(lambda x: len(x) < self.max_files_per_block, lfn_ready.values()) and not self.force_publication:
 msg = "Skipping publication as there are not enough ready files in any of the datasets (and publication was not forced)."
-self.logger.info(wfnamemsg+msg)
+self.logger.info(wfnamemsg+msg)
 return retdict
 ## Get the filemetada for this workflow.
 msg = "Retrieving publication description files."
@@ -496,7 +491,7 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
 for v in lfn_ready.values():
 lfn_ready_list.extend(v)
 try:
-publDescFiles_list = self.getPublDescFiles(workflow, self.user)
+publDescFiles_list = self.getPublDescFiles(workflow)
 except (tarfile.ReadError, RuntimeError):
 msg = "Error retrieving publication description files."
 self.logger.error(wfnamemsg+msg)
@@ -557,26 +552,12 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
 return retdict
 
 
-def getPublDescFiles(self, workflow, userhn):
+def getPublDescFiles(self, workflow):
 """
 Download and read the files describing
 what needs to be published
 """
 wfnamemsg = "%s: " % (workflow)
-def decodeAsString(a):
-"""
-DBS is stupid and doesn't understand that unicode is a string: (if type(obj) == type(''))
-So best to convert as much of the decoded JSON to str as possible. Some is left over and handled by
-PoorMansBufferFile
-"""
-newDict = {}
-for key, value in a.iteritems():
-if type(key) == types.UnicodeType:
-key = str(key)
-if type(value) == types.UnicodeType:
-value = str(value)
-newDict.update({key : value})
-return newDict
 buf = cStringIO.StringIO()
 res = []
 # TODO: input sanitization
@@ -586,8 +567,8 @@ def decodeAsString(a):
 msg = "Retrieving data from %s" % (url)
 self.logger.info(wfnamemsg+msg)
 try:
-response, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
-except Exception, ex:
+_, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
+except Exception as ex:
 msg = "Error retrieving data."
 msg += str(ex)
 msg += str(traceback.format_exc())
@@ -598,7 +579,7 @@ def decodeAsString(a):
 try:
 buf.close()
 res = json.loads(res_)
-except Exception, ex:
+except Exception as ex:
 msg = "Error loading results. Trying next time!"
 msg += str(ex)
 msg += str(traceback.format_exc())
@@ -724,7 +705,7 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas
 numTimes = 10
 msg = "Will monitor their status for up to %d seconds." % (waitTime * numTimes)
 self.logger.info(wfnamemsg+msg)
-for i in range(numTimes):
+for _ in range(numTimes):
 msg = "%d block migrations in progress." % (numMigrationsInProgress)
 msg += " Will check migrations status in %d seconds." % (waitTime)
 self.logger.info(wfnamemsg+msg)
@@ -737,7 +718,7 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas
 status = migrateApi.statusMigration(migration_rqst_id = reqid)
 state = status[0].get('migration_status')
 retry = status[0].get('retry_count')
-except Exception, ex:
+except Exception as ex:
 msg = "Could not get status for migration id %d:\n%s" % (reqid, ex.msg)
 self.logger.error(wfnamemsg+msg)
 else:
@@ -822,7 +803,7 @@ def requestBlockMigration(self, workflow, migrateApi, sourceApi, block):
 data = {'migration_url': sourceURL, 'migration_input': block}
 try:
 result = migrateApi.submitMigration(data)
-except HTTPError, he:
+except HTTPError as he:
 if "is already at destination" in he.msg:
 msg = "Block is already at destination."
 self.logger.info(wfnamemsg+msg)
@@ -967,7 +948,6 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
 
 appName = 'cmsRun'
 appVer = files[0]["swversion"]
-appFam = 'output'
 pset_hash = files[0]['publishname'].split("-")[-1]
 gtag = str(files[0]['globaltag'])
 if gtag == "None":
@@ -993,7 +973,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
 msg += " (%d valid, %d invalid)." % (len(existingFilesValid), len(existingFiles) - len(existingFilesValid))
 self.logger.info(wfnamemsg+msg)
 results[dataset]['existingFiles'] = len(existingFiles)
-except Exception, ex:
+except Exception as ex:
 msg = "Error when listing files in DBS: %s" % (str(ex))
 msg += "\n%s" % (str(traceback.format_exc()))
 self.logger.error(wfnamemsg+msg)
@@ -1065,11 +1045,11 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
 parentFiles.add(parentFile)
 ## Is this parent file already in the destination DBS instance?
 ## (If yes, then we don't have to migrate this block.)
-blocksDict = destReadApi.listBlocks(logical_file_name = parentFile)
+blocksDict = destReadApi.listBlocks(logical_file_name=parentFile)
 if not blocksDict:
 ## No, this parent file is not in the destination DBS instance.
 ## Maybe it is in the same DBS instance as the input dataset?
-blocksDict = sourceApi.listBlocks(logical_file_name = parentFile)
+blocksDict = sourceApi.listBlocks(logical_file_name=parentFile)
 if blocksDict:
 ## Yes, this parent file is in the same DBS instance as the input dataset.
 ## Add the corresponding block to the set of blocks from the source DBS
@@ -1078,7 +1058,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
 else:
 ## No, this parent file is not in the same DBS instance as input dataset.
 ## Maybe it is in global DBS instance?
-blocksDict = globalApi.listBlocks(logical_file_name = parentFile)
+blocksDict = globalApi.listBlocks(logical_file_name=parentFile)
 if blocksDict:
 ## Yes, this parent file is in global DBS instance.
 ## Add the corresponding block to the set of blocks from global DBS
@@ -1155,7 +1135,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
 #self.logger.debug(wfnamemsg+"Block to insert: %s\n" % pprint.pformat(blockDump))
 destApi.insertBulkBlock(blockDump)
 block_count += 1
-except Exception, ex:
+except Exception as ex:
 failed[dataset].extend([f['logical_file_name'] for f in files_to_publish])
 msg = "Error when publishing (%s) " % ", ".join(failed[dataset])
 msg += str(ex)
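Two of the smaller patterns above deserve a note. Binding an unneeded value to "_" (as in "_, res_ = self.connection.request(...)" and "for _ in range(numTimes):") is the conventional way to tell pylint, and readers, that the value is deliberately ignored; pylint otherwise reports it as unused-variable (W0612). And writing "logical_file_name=parentFile" without spaces follows PEP 8, which reserves spaced "=" for assignments, not keyword arguments. A short sketch of both conventions, using hypothetical names rather than code from this repository:

    def fetch(url):
        # Hypothetical stand-in for self.connection.request(), which
        # returns a (response, body) pair.
        return ("<response object>", '{"rows": []}')

    _, body = fetch(url="https://example.org/view")  # response is not needed
    for _ in range(3):  # only the iteration count matters, not the index
        print(body)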
