Skip to content
This repository has been archived by the owner on Feb 24, 2022. It is now read-only.

Commit

Permalink
Merge pull request #4202 from HassenRiahi/FixCouchFilesCleanerComm
Browse files Browse the repository at this point in the history
Do not remove files if couch does not respond.
  • Loading branch information
HassenRiahi committed May 8, 2014
2 parents 1e425cd + 03114e5 commit 4afe3ff
Showing 1 changed file with 55 additions and 55 deletions.
110 changes: 55 additions & 55 deletions src/python/AsyncStageOut/CleanerDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,66 +65,66 @@ def algorithm(self, parameters = None):
self.logger.debug('Active sites are: %s' % sites)
for site in sites:
self.site_tfc_map[site] = self.get_tfc_rules(site)
query = {}
try:
since = self.config_db.loadView('asynctransfer_config', 'lastFilesCleaningTime', query)['rows'][0]['key']
except IndexError:
self.logger.debug('No records to determine last cleanning time, waiting for next iteration')
return
except KeyError:
self.logger.debug('Could not get results from CouchDB, waiting for next iteration')
return
except Exception, e:
self.logger.exception('A problem occured when contacting couchDB to determine last cleanning time: %s' % e)
return
if sites:
query = {}
try:
since = self.config_db.loadView('asynctransfer_config', 'lastFilesCleaningTime', query)['rows'][0]['key']
except IndexError:
self.logger.debug('No records to determine last cleanning time, waiting for next iteration')
return
except KeyError:
self.logger.debug('Could not get results from CouchDB, waiting for next iteration')
return
except Exception, e:
self.logger.exception('A problem occured when contacting couchDB to determine last cleanning time: %s' % e)
return

end_time = time.time()
query = { 'startkey': since, 'endkey': end_time, 'stale': 'ok'}
try:
all_LFNs = self.db.loadView('AsyncTransfer', 'LFNSiteByLastUpdate', query)['rows']
except Exception, e:
self.logger.exception('A problem occured when contacting couchDB to retrieve LFNs: %s' % e)
return
end_time = time.time()
query = { 'startkey': since, 'endkey': end_time, 'stale': 'ok'}
try:
all_LFNs = self.db.loadView('AsyncTransfer', 'LFNSiteByLastUpdate', query)['rows']
except Exception, e:
self.logger.exception('A problem occured when contacting couchDB to retrieve LFNs: %s' % e)
return

updateUri = "/" + self.config_db.name + "/_design/asynctransfer_config/_update/lastCleaningTime/LAST_CLEANING_TIME"
updateUri += "?last_cleaning_time=%d" % end_time
try:
self.config_db.makeRequest(uri = updateUri, type = "PUT", decode = False)
except Exception, e:
self.logger.exception('A problem occured when contacting couchDB to update last cleanning time: %s' % e)
return
updateUri = "/" + self.config_db.name + "/_design/asynctransfer_config/_update/lastCleaningTime/LAST_CLEANING_TIME"
updateUri += "?last_cleaning_time=%d" % end_time
try:
self.config_db.makeRequest(uri = updateUri, type = "PUT", decode = False)
except Exception, e:
self.logger.exception('A problem occured when contacting couchDB to update last cleanning time: %s' % e)
return

self.logger.info('LFNs to remove: %s' % len(all_LFNs))
self.logger.debug('LFNs to remove: %s' % all_LFNs)
self.logger.info('LFNs to remove: %s' % len(all_LFNs))
self.logger.debug('LFNs to remove: %s' % all_LFNs)

for lfnDetails in all_LFNs:
lfn = lfnDetails['value']['lfn']
location = lfnDetails['value']['location']
self.logger.info("Removing %s from %s" %(lfn, location))
pfn = self.apply_tfc_to_lfn( '%s:%s' %(location, lfn))
if pfn:
logfile = open('%s/%s_%s.lcg-del.log' % ( self.log_dir, location, str(time.time()) ), 'w')
command = 'export X509_USER_PROXY=%s ; source %s ; lcg-del -lv %s' % \
(self.opsProxy, self.uiSetupScript, pfn)
self.logger.debug("Running remove command %s" % command)
self.logger.info("log file: %s" % logfile.name)
proc = subprocess.Popen(
["/bin/bash"], shell=True, cwd=os.environ['PWD'],
stdout=logfile,
stderr=logfile,
stdin=subprocess.PIPE,
)
proc.stdin.write(command)
stdout, stderr = proc.communicate()
rc = proc.returncode
if rc:
self.logger.info("Deletion command failed with output %s and error %s" %(stdout, stderr))
for lfnDetails in all_LFNs:
lfn = lfnDetails['value']['lfn']
location = lfnDetails['value']['location']
self.logger.info("Removing %s from %s" %(lfn, location))
pfn = self.apply_tfc_to_lfn( '%s:%s' %(location, lfn))
if pfn:
logfile = open('%s/%s_%s.lcg-del.log' % ( self.log_dir, location, str(time.time()) ), 'w')
command = 'export X509_USER_PROXY=%s ; source %s ; lcg-del -lv %s' % \
(self.opsProxy, self.uiSetupScript, pfn)
self.logger.debug("Running remove command %s" % command)
self.logger.info("log file: %s" % logfile.name)
proc = subprocess.Popen(
["/bin/bash"], shell=True, cwd=os.environ['PWD'],
stdout=logfile,
stderr=logfile,
stdin=subprocess.PIPE,
)
proc.stdin.write(command)
stdout, stderr = proc.communicate()
rc = proc.returncode
if rc:
self.logger.info("Deletion command failed with output %s and error %s" %(stdout, stderr))
else:
self.logger.info("File Deleted.")
logfile.close()
else:
self.logger.info("File Deleted.")
logfile.close()
else:
self.logger.info("Removing %s from %s failed when retrieving the PFN" %(lfn, location))

self.logger.info("Removing %s from %s failed when retrieving the PFN" %(lfn, location))
return

def apply_tfc_to_lfn(self, file):
Expand Down

0 comments on commit 4afe3ff

Please sign in to comment.