Added detection of slurm errors and retrospective error check
dwest77a committed Feb 13, 2024
1 parent 45dcf4c commit c520cc6
Showing 1 changed file with 74 additions and 7 deletions.
assess.py (81 changes: 74 additions & 7 deletions)
@@ -1,6 +1,7 @@
 import os
 import argparse
 import glob
+import json
 
 from pipeline.logs import init_logger, get_attribute
 from pipeline.errors import MissingVariableError
@@ -12,10 +13,11 @@
     'INFO [main]': "No error recorded",
     'MissingKerchunkError': "Missing the Kerchunk file",
     'KerchunkDriverFatalError': "Kerchunking failed for one or more files",
-    'ExpectTimeoutError': "Time remaining estimate exceeded allowed job time (scan)"
+    'ExpectTimeoutError': "Time remaining estimate exceeded allowed job time (scan)",
+    'BlackListProjectCode': "Problematic project code, currently on the blacklist so it is being ignored"
 }
 
-phases = ['scan', 'compute', 'validate']
+phases = ['scan', 'compute', 'validate', 'complete']
 checks = ['/detail-cfg.json','/*kerchunk*','/*.complete']
 
 def format_str(string: str, length: int):
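
The keys in this mapping are error names recovered from the last line of a job's .err log (the text before the first colon, as retro_errors below does with readlines()[-1].split(':')[0]), mapped to human-readable summaries. A minimal sketch of the assumed lookup, with messages standing in for the dictionary above:

    # Hedged sketch: derive an error key from the final line of a .err log
    # and look up its summary; 'messages' stands in for the mapping above.
    def summarise(last_log_line: str, messages: dict) -> str:
        key = last_log_line.split(':')[0]
        return messages.get(key, f'Unrecognised error: {key}')
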
@@ -138,6 +140,9 @@ def extract_keys(filepath: str, logger, savetype=None, examine=None, phase=None,
         else:
             key = log[-1][0]
 
+        if '/var/spool/slurmd' in key:
+            key = 'SlurmError'
+
         logger.debug(f'Identified error type {key}')
         # Count error types
         if key in keys:
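
A note on the new check: when a job fails at the batch-system level, the key recovered from the log is often a filesystem path rather than an exception name, because Slurm runs each job from a copy of the batch script kept under its spool directory (commonly /var/spool/slurmd). Folding every such key into a single SlurmError entry stops the counts fragmenting across per-job paths. A hedged sketch of the normalisation:

    # Hedged sketch of the check added above: any key pointing into Slurm's
    # spool directory is treated as one node-level failure class.
    def normalise_key(key: str) -> str:
        if '/var/spool/slurmd' in key:
            return 'SlurmError'
        return key
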
@@ -204,29 +209,33 @@ def cleanup(cleantype: str, groupdir: str, logger):
             if 'proj_codes_1' not in p:
                 os.system(f'rm {p}')
     elif cleantype == 'errors':
-        os.system(f'rm {groupdir}/errs/*')
+        os.system(f'rm -rf {groupdir}/errs/*')
     elif cleantype == 'outputs':
-        os.system(f'rm {groupdir}/outs/*')
+        os.system(f'rm -rf {groupdir}/outs/*')
     else:
         pass
 
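The switch to rm -rf matters because errs/ now holds one directory per repeat run (each with a proj_codes.txt and numbered .err files, as read by retro_errors below), and plain rm will not remove directories. A hedged, shell-free equivalent of the same cleanup:

    # Hedged alternative to os.system(f'rm -rf {groupdir}/errs/*'): remove
    # each entry under errs/, whether a per-repeat directory or a stray file.
    import glob
    import os
    import shutil

    def clear_errs(groupdir: str):
        for path in glob.glob(f'{groupdir}/errs/*'):
            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
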
 def progress_check(args, logger):
     """Check general progress of the pipeline for a specific group.
     Lists progress up to the provided phase, with options to save all project codes stuck at a specific phase to a repeat_id for later use."""
 
     if args.phase not in phases:
         logger.error(f'Phase not accepted here - {args.phase}')
         return None
     else:
         logger.info(f'Discovering dataset progress within group {args.groupID}')
         redo_pcodes = []
-        for index, phase in enumerate(phases):
+        for index, phase in enumerate(phases[:-1]): # Ignore the 'complete' check; completion is detected as a byproduct
             redo_pcodes, completes = find_codes(phase, args.workdir, args.groupID, checks[index], ignore=redo_pcodes)
             logger.info(f'{phase}: {len(redo_pcodes)} datasets')
             if completes:
                 logger.info(f'complete: {len(completes)} datasets')
             if phase == args.phase:
                 break
+        if args.phase == 'complete':
+            redo_pcodes = completes
 
     # Write pcodes
     if not args.repeat_label:
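
The loop now walks phases[:-1] because 'complete' is never checked directly: each phase's leftover codes are passed as ignore to the next call, and find_codes reports finished codes as completes along the way, so requesting --phase complete simply returns that byproduct. A condensed sketch of the assumed flow (walk_phases is illustrative only; find_codes, phases and checks are the module's own):

    # Condensed sketch of the walk above: each phase filters the codes still
    # stuck from the previous one; completed codes fall out as a byproduct.
    def walk_phases(find_codes, workdir, group_id, target_phase):
        redo_pcodes, completes = [], []
        for check, phase in zip(checks, phases[:-1]):
            redo_pcodes, completes = find_codes(phase, workdir, group_id,
                                                check, ignore=redo_pcodes)
            if phase == target_phase:
                break
        return completes if target_phase == 'complete' else redo_pcodes
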
Expand Down Expand Up @@ -309,11 +318,69 @@ def add_to_blacklist(args, logger):
# Translate blacklist ID into project code if not already a code
# Add project code to the blacklist file

def retro_errors(args, logger):
"""Retrospective analysis of errors for all project codes within this group.
- Saved in ErrorSummary.json file"""

proj_file = f'{args.groupdir}/proj_codes_1.txt'
summ_file = f'{args.groupdir}/ErrorSummary.json'

with open(proj_file) as f:
proj_codes = [r.strip() for r in f.readlines()]

try:
with open(summ_file) as f:
summ_refs = json.load(f)
except FileNotFoundError:
logger.info("No prior summary error file detected")
summ_refs = {}

if args.write:

logger.info(f"Initialising {len(proj_codes)} error entries")
# Initialise summary file
for pcode in proj_codes:
if pcode not in summ_refs:
summ_refs[pcode] = None

errs = {}

logs = glob.glob(f'{args.groupdir}/errs/*')
for l in logs:
logger.info(f'Reading error files for {l.split("/")[-1]}')
pcode_file = f'{l}/proj_codes.txt'
try:
with open(pcode_file) as f:
repeat_pcodes = [r.strip() for r in f.readlines()]
except FileNotFoundError:
logger.info(f'Skipped {l.split("/")[-1]} - old version')
for x, rpc in enumerate(repeat_pcodes):
rpc_err = f'{l}/{x}.err'
try:
with open(rpc_err) as f:
err = f.readlines()[-1].split(':')[0]
except FileNotFoundError:
logger.debug(f'Not able to locate error file {x}')
summ_refs[rpc] = err

for rpc in summ_refs.keys():
err = summ_refs[rpc]
if err not in errs:
errs[err] = 1
else:
errs[err] += 1

print('Retrospective Error Summary:')
print(errs)



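As written, retro_errors only prints the tally; presumably the --write path is also meant to persist the refreshed summary back to ErrorSummary.json. A hedged sketch of that step, assuming the file keeps the flat {project_code: error_key} shape that the json.load above expects:

    # Hedged sketch (assumption: --write should persist the refreshed summary
    # in the same flat {project_code: error_key} form that json.load reads).
    import json

    def save_summary(summ_file: str, summ_refs: dict):
        with open(summ_file, 'w') as f:
            json.dump(summ_refs, f, indent=2)
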
 operations = {
     'progress': progress_check,
     'errors': error_check,
     'outputs': output_check,
-    'blacklist': add_to_blacklist
+    'blacklist': add_to_blacklist,
+    'retro': retro_errors
 }
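
The dispatch table keeps the argparse help text and the implementations in one place. assess_main is collapsed in this view, but presumably routes the positional operation argument through this dict; a minimal sketch of that assumption:

    # Minimal dispatch sketch (assumption: assess_main routes args.operation
    # through the 'operations' table defined above).
    def dispatch(args, logger):
        try:
            operations[args.operation](args, logger)
        except KeyError:
            logger.error(f'Unrecognised operation: {args.operation}')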

@@ -338,7 +405,7 @@ def assess_main(args):
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description='Run a pipeline step for a single dataset')
     parser.add_argument('groupID',type=str, help='Group identifier code')
-    parser.add_argument('operation',type=str, help='Operation to perform - choose from `progress`,`errors`,`outputs`, `blacklist`.')
+    parser.add_argument('operation',type=str, help=f'Operation to perform - choose from {", ".join(operations.keys())}.')
 
     parser.add_argument('-B','--blacklist', dest='blacklist', help='')
     parser.add_argument('-R','--blacklist-reason', dest='reason', help='')
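
With the help text now driven by the table, the new operation is invoked like any other, e.g. python assess.py my_group retro (my_group being a placeholder group identifier, and assuming a --write flag for the initialisation step is defined in the collapsed part of the parser).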
