diff --git a/assess.py b/assess.py
index ba9e532..f5bdee8 100644
--- a/assess.py
+++ b/assess.py
@@ -1,6 +1,7 @@
 import os
 import argparse
 import glob
+import json

 from pipeline.logs import init_logger, get_attribute
 from pipeline.errors import MissingVariableError
@@ -12,10 +13,11 @@
     'INFO [main]': "No error recorded",
     'MissingKerchunkError': "Missing the Kerchunk file",
     'KerchunkDriverFatalError': "Kerchunking failed for one or more files",
-    'ExpectTimeoutError': "Time remaining estimate exceeded allowed job time (scan)"
+    'ExpectTimeoutError': "Time remaining estimate exceeded allowed job time (scan)",
+    'BlackListProjectCode': "Project code is on the blacklist so is being ignored"
 }

-phases = ['scan', 'compute', 'validate']
+phases = ['scan', 'compute', 'validate', 'complete']
 checks = ['/detail-cfg.json','/*kerchunk*','/*.complete']

 def format_str(string: str, length: int):
@@ -138,6 +140,9 @@ def extract_keys(filepath: str, logger, savetype=None, examine=None, phase=None,
     else:
         key = log[-1][0]

+    if '/var/spool/slurmd' in key:
+        key = 'SlurmError'
+
     logger.debug(f'Identified error type {key}')
     # Count error types
     if key in keys:
@@ -204,9 +209,9 @@ def cleanup(cleantype: str, groupdir: str, logger):
         if 'proj_codes_1' not in p:
             os.system(f'rm {p}')
     elif cleantype == 'errors':
-        os.system(f'rm {groupdir}/errs/*')
+        os.system(f'rm -rf {groupdir}/errs/*')
     elif cleantype == 'outputs':
-        os.system(f'rm {groupdir}/outs/*')
+        os.system(f'rm -rf {groupdir}/outs/*')
     else:
         pass

@@ -214,19 +219,23 @@ def progress_check(args, logger):
     """Check general progress of pipeline for a specific group.

     Lists progress up to the provided phase, with options to save all project codes stuck at a specific phase to a repeat_id for later use."""
+
     if args.phase not in phases:
         logger.error(f'Phase not accepted here - {args.phase}')
         return None
     else:
         logger.info(f'Discovering dataset progress within group {args.groupID}')
         redo_pcodes = []
-        for index, phase in enumerate(phases):
+        for index, phase in enumerate(phases[:-1]): # Ignore the 'complete' check as this happens as a byproduct
             redo_pcodes, completes = find_codes(phase, args.workdir, args.groupID, checks[index], ignore=redo_pcodes)
             logger.info(f'{phase}: {len(redo_pcodes)} datasets')
             if completes:
                 logger.info(f'complete: {len(completes)} datasets')
             if phase == args.phase:
                 break
+        if args.phase == 'complete':
+            redo_pcodes = completes
+
     # Write pcodes
     if not args.repeat_label:
@@ -309,11 +318,69 @@ def add_to_blacklist(args, logger):
     # Translate blacklist ID into project code if not already a code
     # Add project code to the blacklist file

+def retro_errors(args, logger):
+    """Retrospective analysis of errors for all project codes within this group.
+    - Saved in ErrorSummary.json file"""
+
+    proj_file = f'{args.groupdir}/proj_codes_1.txt'
+    summ_file = f'{args.groupdir}/ErrorSummary.json'
+
+    with open(proj_file) as f:
+        proj_codes = [r.strip() for r in f.readlines()]
+
+    try:
+        with open(summ_file) as f:
+            summ_refs = json.load(f)
+    except FileNotFoundError:
+        logger.info("No prior summary error file detected")
+        summ_refs = {}
+
+    if args.write:
+        logger.info(f"Initialising {len(proj_codes)} error entries")
+        # Initialise summary file entries for any new project codes
+        for pcode in proj_codes:
+            if pcode not in summ_refs:
+                summ_refs[pcode] = None
+
+    errs = {}
+
+    logs = glob.glob(f'{args.groupdir}/errs/*')
+    for l in logs:
+        logger.info(f'Reading error files for {l.split("/")[-1]}')
+        pcode_file = f'{l}/proj_codes.txt'
+        try:
+            with open(pcode_file) as f:
+                repeat_pcodes = [r.strip() for r in f.readlines()]
+        except FileNotFoundError:
+            logger.info(f'Skipped {l.split("/")[-1]} - old version')
+            continue # No project codes file for this run - skip it entirely
+        for x, rpc in enumerate(repeat_pcodes):
+            rpc_err = f'{l}/{x}.err'
+            try:
+                with open(rpc_err) as f:
+                    err = f.readlines()[-1].split(':')[0]
+            except FileNotFoundError:
+                logger.debug(f'Not able to locate error file {x}')
+                continue # Skip so a stale error value is not recorded
+            summ_refs[rpc] = err
+
+    # Tally error types across all project codes
+    for rpc in summ_refs.keys():
+        err = summ_refs[rpc]
+        if err not in errs:
+            errs[err] = 1
+        else:
+            errs[err] += 1
+
+    if args.write:
+        # Save the updated per-project summary back to ErrorSummary.json
+        with open(summ_file, 'w') as f:
+            f.write(json.dumps(summ_refs))
+
+    print('Retrospective Error Summary:')
+    print(errs)
+
+
 operations = {
     'progress': progress_check,
     'errors': error_check,
     'outputs': output_check,
-    'blacklist': add_to_blacklist
+    'blacklist': add_to_blacklist,
+    'retro': retro_errors
 }

 def assess_main(args):
@@ -338,7 +405,7 @@
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description='Run a pipeline step for a single dataset')
     parser.add_argument('groupID',type=str, help='Group identifier code')
-    parser.add_argument('operation',type=str, help='Operation to perform - choose from `progress`,`errors`,`outputs`, `blacklist`.')
+    parser.add_argument('operation',type=str, help=f'Operation to perform - choose from {", ".join(operations.keys())}.')

     parser.add_argument('-B','--blacklist', dest='blacklist', help='')
     parser.add_argument('-R','--blacklist-reason', dest='reason', help='')
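
Note on the new `retro` operation: `ErrorSummary.json` ends up as a flat mapping from project code to the last recorded error key, with `None` marking codes that were initialised via `args.write` but have no error file yet, and the final tally loop is a hand-rolled equivalent of `collections.Counter`. A minimal sketch of both behaviours, using purely illustrative project codes and error keys:

    import json
    from collections import Counter

    # Illustrative ErrorSummary.json contents: project code -> last error key,
    # or None where an entry was initialised but no .err file was found.
    summ_refs = {
        'proj_0001': 'MissingKerchunkError',
        'proj_0002': 'ExpectTimeoutError',
        'proj_0003': None,
        'proj_0004': 'MissingKerchunkError',
    }

    # Same counts as the explicit if/else tally in retro_errors
    errs = Counter(summ_refs.values())
    print('Retrospective Error Summary:')
    print(dict(errs))  # {'MissingKerchunkError': 2, 'ExpectTimeoutError': 1, None: 1}

    # json.dumps serialises the None entries as null on write-back
    print(json.dumps(summ_refs, indent=2))

Swapping the loop for `Counter` would be purely stylistic; both produce the same mapping of error key to occurrence count.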