diff --git a/assess.py b/assess.py index 7e837d5..42b664c 100644 --- a/assess.py +++ b/assess.py @@ -49,8 +49,9 @@ def find_codes(phase: str, workdir: str, groupID: str, check: str, ignore=[]): redo_pcodes : list (str-like) List of project codes to re-run for this phase. complete : list (str-like) - List of project codes considered to be complete for the whole pipeline - """ + List of project codes considered to be complete for the whole pipeline. + + """ checkdir = f'{workdir}/in_progress/{groupID}/' proj_codes = os.listdir(checkdir) diff --git a/group_run.py b/group_run.py index 35dc74f..be747fa 100644 --- a/group_run.py +++ b/group_run.py @@ -3,7 +3,7 @@ import os import argparse -from pipeline.logs import init_logger +from pipeline.logs import init_logger, BypassSwitch def get_group_len(workdir, group, repeat_id=1): """Implement parallel reads from single 'group' file""" @@ -132,29 +132,33 @@ def main(args): parser.add_argument('phase', type=str, help='Phase of the pipeline to initiate') parser.add_argument('groupID',type=str, help='Group identifier code') - parser.add_argument('-s',dest='source', help='Path to directory containing master scripts (this one)') - parser.add_argument('-e',dest='venvpath', help='Path to virtual (e)nvironment (excludes /bin/activate)') + # Group-run specific + parser.add_argument('-S','--source', dest='source', help='Path to directory containing master scripts (this one)') + parser.add_argument('-e','--environ',dest='venvpath', help='Path to virtual (e)nvironment (excludes /bin/activate)') + parser.add_argument('-i', '--input', dest='input', help='input file (for init phase)') + + # Action-based - standard flags + parser.add_argument('-f','--forceful',dest='forceful',action='store_true', help='Force overwrite of steps if previously done') + parser.add_argument('-v','--verbose', dest='verbose', action='count', default=0, help='Print helpful statements while running') + parser.add_argument('-d','--dryrun', dest='dryrun', 
action='store_true', help='Perform dry-run (i.e no new files/dirs created)' ) + parser.add_argument('-Q','--quality', dest='quality', action='store_true', help='Quality assured checks - thorough run') + parser.add_argument('-b','--bypass-errs', dest='bypass', default='FDSC', help=BypassSwitch().help()) + # Environment variables parser.add_argument('-w','--workdir', dest='workdir', help='Working directory for pipeline') parser.add_argument('-g','--groupdir', dest='groupdir', help='Group directory for pipeline') parser.add_argument('-p','--proj_dir', dest='proj_dir', help='Project directory for pipeline') - parser.add_argument('-n','--new_version', dest='new_version', help='If present, create a new version') - parser.add_argument('-m','--mode', dest='mode', default=None, help='Print or record information (log or std)') - parser.add_argument('-M','--memory', dest='memory', default=None, help='Memory allocation for this job (i.e "2G" for 2GB)') - parser.add_argument('-t','--time-allowed',dest='time_allowed', default=None, help='Time limit for this job') - parser.add_argument('-b','--bypass-errs', dest='bypass', default='FDSC', help='Bypass all error messages - skip failed jobs') - - parser.add_argument('-i', '--input', dest='input', help='input file (for init phase)') - parser.add_argument('-S','--subset', dest='subset', default=1, type=int, help='Size of subset within group') + # Single-job within group + parser.add_argument('-G','--groupID', dest='groupID', default=None, help='Group identifier label') + parser.add_argument('-t','--time-allowed',dest='time_allowed', help='Time limit for this job') + parser.add_argument('-M','--memory', dest='memory', default='2G', help='Memory allocation for this job (i.e "2G" for 2GB)') + parser.add_argument('-s','--subset', dest='subset', default=1, type=int, help='Size of subset within group') parser.add_argument('-r','--repeat_id', dest='repeat_id', default='1', help='Repeat id (1 if first time running, _ otherwise)') - 
parser.add_argument('-f',dest='forceful', action='store_true', help='Force overwrite of steps if previously done') - - parser.add_argument('-v','--verbose',dest='verbose' , action='count', default=0, help='Print helpful statements while running') - parser.add_argument('-d','--dryrun', dest='dryrun', action='store_true', help='Perform dry-run (i.e no new files/dirs created)' ) - - parser.add_argument('-Q','--quality', dest='quality', action='store_true', help='Quality assured checks - thorough run') + # Specialised + parser.add_argument('-n','--new_version', dest='new_version', help='If present, create a new version') + parser.add_argument('-m','--mode', dest='mode', default=None, help='Print or record information (log or std)') args = parser.parse_args() diff --git a/pipeline/errors.py b/pipeline/errors.py index 4912428..54299cb 100644 --- a/pipeline/errors.py +++ b/pipeline/errors.py @@ -23,10 +23,21 @@ def __init__(self, proj_code, groupdir): self.save() def save(self): upload_err(self.proj_code, self.groupdir, self.get_str()) + +class KerchunkDriverFatalError(KerchunkException): + """All drivers failed (NetCDF3/Hdf5/Tiff) - run without driver bypass to assess the issue with each driver type.""" + def __init__(self,verbose=0, proj_code=None, groupdir=None): + self.message = "All drivers failed when performing conversion" + super().__init__(proj_code, groupdir) + if verbose < 1: + self.__class__.__module__ = 'builtins' + def get_str(self): + return 'KerchunkDriverFatalError' + class BlacklistProjectCode(KerchunkException): + """The project code you are trying to run for is on the list of project codes to ignore.""" def __init__(self, verbose=0, proj_code=None, groupdir=None): - """The project code you are trying to run for is on the list of project codes to ignore.""" self.message = 'Project Code listed in blacklist for bad data - will not be processed.' 
super().__init__(proj_code, groupdir) if verbose < 1: @@ -35,8 +46,8 @@ def get_str(self): return 'BlacklistProjectCode' class MissingVariableError(KerchunkException): + """A variable is missing from the environment or set of arguments.""" def __init__(self, type='$', verbose=0, proj_code=None, groupdir=None): - """A variable is missing from the environment or set of arguments.""" self.message = f'Missing variable: {type}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -45,8 +56,8 @@ def get_str(self): return 'MissingVariableError' class ExpectTimeoutError(KerchunkException): + """The process is expected to time out given timing estimates.""" def __init__(self, required=0, current='', verbose=0, proj_code=None, groupdir=None): - """The process is expected to time out given timing estimates.""" self.message = f'Scan requires minimum {required} - current {current}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -55,8 +66,8 @@ def get_str(self): return 'ExpectTimeoutError' class ExpectMemoryError(KerchunkException): + """The process is expected to run out of memory given size estimates.""" def __init__(self, required='', current='', verbose=0, proj_code=None, groupdir=None): - """The process is expected to run out of memory given size estimates.""" self.message = f'Scan requires minimum {required} - current {current}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -65,8 +76,8 @@ def get_str(self): return 'ExpectTimeoutError' class ProjectCodeError(KerchunkException): + """Could not find the correct project code from the list of project codes for this run.""" def __init__(self, verbose=0, proj_code=None, groupdir=None): - """Could not find the correct project code from the list of project codes for this run.""" self.message = f'Project Code Extraction Failed' super().__init__(proj_code, groupdir) if verbose < 1: @@ -75,8 +86,8 @@ def get_str(self): return 'ProjectCodeError' class FilecapExceededError(KerchunkException): + """During 
scanning, could not find suitable files within the set of files specified.""" def __init__(self, nfiles=0, verbose=0, proj_code=None, groupdir=None): - """During scanning, could not find suitable files within the set of files specified.""" self.message = f'Filecap exceeded: {nfiles} files attempted' super().__init__(proj_code, groupdir) if verbose < 1: @@ -85,8 +96,8 @@ def get_str(self): return 'FilecapExceededError' class ChunkDataError(KerchunkException): + """Overflow Error from pandas during decoding of chunk information, most likely caused by bad data retrieval.""" def __init__(self, verbose=0, proj_code=None, groupdir=None): - """Overflow Error from pandas during decoding of chunk information, most likely caused by bad data retrieval.""" self.message = f'Decoding resulted in overflow - received chunk data contains junk (attempted 3 times)' super().__init__(proj_code, groupdir) if verbose < 1: @@ -95,8 +106,8 @@ def get_str(self): return 'ChunkDataError' class NoValidTimeSlicesError(KerchunkException): + """Unable to find any time slices to test within the object.""" def __init__(self, message='Kerchunk', verbose=0, proj_code=None, groupdir=None): - """Unable to find any time slices to test within the object.""" self.message = f'No valid timeslices found for {message}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -105,8 +116,8 @@ def get_str(self): return 'NoValidTimeSlicesError' class VariableMismatchError(KerchunkException): + """During testing, variables present in the NetCDF file are not present in Kerchunk""" def __init__(self, missing={}, verbose=0, proj_code=None, groupdir=None): - """During testing, variables present in the NetCDF file are not present in Kerchunk""" self.message = f'Missing variables {missing} in Kerchunk file' super().__init__(proj_code, groupdir) if verbose < 1: @@ -115,8 +126,8 @@ def get_str(self): return 'VariableMismatchError' class ShapeMismatchError(KerchunkException): + """Shapes of ND arrays do not match between 
Kerchunk and Xarray objects - when using a subset of the Netcdf files.""" def __init__(self, var={}, first={}, second={}, verbose=0, proj_code=None, groupdir=None): - """Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using a subset of the Netcdf files.""" self.message = f'Kerchunk/NetCDF mismatch for variable {var} with shapes - K {first} vs N {second}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -125,8 +136,8 @@ def get_str(self): return 'ShapeMismatchError' class TrueShapeValidationError(KerchunkException): + """Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using the complete set of files.""" def __init__(self, message='Kerchunk', verbose=0, proj_code=None, groupdir=None): - """Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using the complete set of files.""" self.message = f'Kerchunk/NetCDF mismatch with shapes using full dataset - check logs' super().__init__(proj_code, groupdir) if verbose < 1: @@ -135,8 +146,8 @@ def get_str(self): return 'TrueShapeValidationError' class NoOverwriteError(KerchunkException): + """Output file already exists and the process does not have forceful overwrite (-f) set.""" def __init__(self, verbose=0, proj_code=None, groupdir=None): - """Output file already exists and the process does not have forceful overwrite (-f) set.""" self.message = 'Output file already exists and forceful overwrite not set.' 
super().__init__(proj_code, groupdir) if verbose < 1: @@ -145,8 +156,8 @@ def get_str(self): return 'NoOverwriteError' class MissingKerchunkError(KerchunkException): + """Kerchunk file not found.""" def __init__(self, message="No suitable kerchunk file found for validation.", verbose=0, proj_code=None, groupdir=None): - """Kerchunk file not found.""" self.message = message super().__init__(proj_code, groupdir) if verbose < 1: @@ -155,8 +166,8 @@ def get_str(self): return 'MissingKerchunkError' class ValidationError(KerchunkException): + """One or more checks within validation have failed - most likely elementwise comparison of data.""" def __init__(self, message="Fatal comparison failure for Kerchunk/NetCDF", verbose=0, proj_code=None, groupdir=None): - """One or more checks within validation have failed - most likely elementwise comparison of data.""" self.message = message super().__init__(proj_code, groupdir) if verbose < 1: @@ -165,8 +176,8 @@ def get_str(self): return 'ValidationError' class SoftfailBypassError(KerchunkException): + """Validation could not be completed because some arrays only contained NaN values which cannot be compared.""" def __init__(self, message="Kerchunk validation failed softly with no bypass - rerun with bypass flag", verbose=0, proj_code=None, groupdir=None): - """Validation could not be completed because some arrays only contained NaN values which cannot be compared.""" self.message = message super().__init__(proj_code, groupdir) if verbose < 1: @@ -175,8 +186,8 @@ def get_str(self): return 'SoftfailBypassError' class ConcatenationError(KerchunkException): + """Variables could not be concatenated over time and are not duplicates - no known solution""" def __init__(self, message="Variables could not be concatenated over time and are not duplicates - no known solution", verbose=0, proj_code=None, groupdir=None): - """Variables could not be concatenated over time and are not duplicates - no known solution""" self.message = message 
super().__init__(proj_code, groupdir) if verbose < 1: diff --git a/pipeline/logs.py b/pipeline/logs.py index 2dff9f2..b51f111 100644 --- a/pipeline/logs.py +++ b/pipeline/logs.py @@ -37,6 +37,18 @@ def __init__(self, switch='FDSC'): def __str__(self): return self.switch + + def help(self): + return str(""" +Bypass switch options: \n + "F" - * Skip individual file scanning errors. + "D" - * Skip driver failures - Pipeline tries different options for NetCDF (default). + - Only need to turn this skip off if all drivers fail (KerchunkDriverFatalError). + "B" - Skip Box compute errors. + "S" - * Skip Soft fails (NaN-only boxes in validation) (default). + "C" - * Skip calculation (data sum) errors (time array typically cannot be summed) (default). + "M" - Skip memory checks (validate/compute aborts if utilisation estimate exceeds cap). +""") class FalseLogger: def __init__(self): diff --git a/single_run.py b/single_run.py index 01eb213..ff9eac8 100644 --- a/single_run.py +++ b/single_run.py @@ -186,26 +186,29 @@ def main(args): parser.add_argument('phase', type=str, help='Phase of the pipeline to initiate') parser.add_argument('proj_code',type=str, help='Project identifier code') + # Action-based - standard flags + parser.add_argument('-f','--forceful',dest='forceful',action='store_true', help='Force overwrite of steps if previously done') + parser.add_argument('-v','--verbose', dest='verbose', action='count', default=0, help='Print helpful statements while running') + parser.add_argument('-d','--dryrun', dest='dryrun', action='store_true', help='Perform dry-run (i.e no new files/dirs created)' ) + parser.add_argument('-Q','--quality', dest='quality', action='store_true', help='Quality assured checks - thorough run') + parser.add_argument('-b','--bypass-errs', dest='bypass', default='FDSC', help=BypassSwitch().help()) + + # Environment variables parser.add_argument('-w','--workdir', dest='workdir', help='Working directory for pipeline') 
parser.add_argument('-g','--groupdir', dest='groupdir', help='Group directory for pipeline') - parser.add_argument('-G','--groupID', dest='groupID', default=None, help='Group identifier label') parser.add_argument('-p','--proj_dir', dest='proj_dir', help='Project directory for pipeline') - parser.add_argument('-n','--new_version', dest='new_version', help='If present, create a new version') - parser.add_argument('-m','--mode', dest='mode', default=None, help='Print or record information (log or std)') + + # Single job within group + parser.add_argument('-G','--groupID', dest='groupID', default=None, help='Group identifier label') parser.add_argument('-t','--time-allowed',dest='time_allowed', help='Time limit for this job') parser.add_argument('-M','--memory', dest='memory', default='2G', help='Memory allocation for this job (i.e "2G" for 2GB)') - parser.add_argument('-b','--bypass-errs', dest='bypass', default='FDSC', help='Bypass all error messages - skip failed jobs') - parser.add_argument('-s','--subset', dest='subset', default=1, type=int, help='Size of subset within group') parser.add_argument('-r','--repeat_id', dest='repeat_id', default='1', help='Repeat id (1 if first time running, _ otherwise)') - parser.add_argument('-f', dest='forceful', action='store_true', help='Force overwrite of steps if previously done') - - parser.add_argument('-v','--verbose', dest='verbose', action='count', default=0, help='Print helpful statements while running') - parser.add_argument('-d','--dryrun', dest='dryrun', action='store_true', help='Perform dry-run (i.e no new files/dirs created)' ) - - parser.add_argument('-Q','--quality', dest='quality', action='store_true', help='Quality assured checks - thorough run') - + # Specialised + parser.add_argument('-n','--new_version', dest='new_version', help='If present, create a new version') + parser.add_argument('-m','--mode', dest='mode', default=None, help='Print or record information (log or std)') + args = parser.parse_args() 
success = main(args)