Skip to content

Commit

Permalink
Updated docstrings, added other messages, changed order of args
Browse files Browse the repository at this point in the history
  • Loading branch information
dwest77a committed Feb 15, 2024
1 parent d921625 commit 0d07d33
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 47 deletions.
5 changes: 3 additions & 2 deletions assess.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ def find_codes(phase: str, workdir: str, groupID: str, check: str, ignore=[]):
redo_pcodes : list (str-like)
List of project codes to re-run for this phase.
complete : list (str-like)
List of project codes considered to be complete for the whole pipeline
"""
List of project codes considered to be complete for the whole pipeline.
"""
checkdir = f'{workdir}/in_progress/{groupID}/'
proj_codes = os.listdir(checkdir)

Expand Down
38 changes: 21 additions & 17 deletions group_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import argparse

from pipeline.logs import init_logger
from pipeline.logs import init_logger, BypassSwitch

def get_group_len(workdir, group, repeat_id=1):
"""Implement parallel reads from single 'group' file"""
Expand Down Expand Up @@ -132,29 +132,33 @@ def main(args):
parser.add_argument('phase', type=str, help='Phase of the pipeline to initiate')
parser.add_argument('groupID',type=str, help='Group identifier code')

parser.add_argument('-s',dest='source', help='Path to directory containing master scripts (this one)')
parser.add_argument('-e',dest='venvpath', help='Path to virtual (e)nvironment (excludes /bin/activate)')
# Group-run specific
parser.add_argument('-S','--source', dest='source', help='Path to directory containing master scripts (this one)')
parser.add_argument('-e','--environ',dest='venvpath', help='Path to virtual (e)nvironment (excludes /bin/activate)')
parser.add_argument('-i', '--input', dest='input', help='input file (for init phase)')

# Action-based - standard flags
parser.add_argument('-f','--forceful',dest='forceful',action='store_true', help='Force overwrite of steps if previously done')
parser.add_argument('-v','--verbose', dest='verbose', action='count', default=0, help='Print helpful statements while running')
parser.add_argument('-d','--dryrun', dest='dryrun', action='store_true', help='Perform dry-run (i.e no new files/dirs created)' )
parser.add_argument('-Q','--quality', dest='quality', action='store_true', help='Quality assured checks - thorough run')
parser.add_argument('-b','--bypass-errs', dest='bypass', default='FDSC', help=BypassSwitch().help())

# Environment variables
parser.add_argument('-w','--workdir', dest='workdir', help='Working directory for pipeline')
parser.add_argument('-g','--groupdir', dest='groupdir', help='Group directory for pipeline')
parser.add_argument('-p','--proj_dir', dest='proj_dir', help='Project directory for pipeline')
parser.add_argument('-n','--new_version', dest='new_version', help='If present, create a new version')
parser.add_argument('-m','--mode', dest='mode', default=None, help='Print or record information (log or std)')
parser.add_argument('-M','--memory', dest='memory', default=None, help='Memory allocation for this job (i.e "2G" for 2GB)')
parser.add_argument('-t','--time-allowed',dest='time_allowed', default=None, help='Time limit for this job')
parser.add_argument('-b','--bypass-errs', dest='bypass', default='FDSC', help='Bypass all error messages - skip failed jobs')

parser.add_argument('-i', '--input', dest='input', help='input file (for init phase)')

parser.add_argument('-S','--subset', dest='subset', default=1, type=int, help='Size of subset within group')
# Single-job within group
parser.add_argument('-G','--groupID', dest='groupID', default=None, help='Group identifier label')
parser.add_argument('-t','--time-allowed',dest='time_allowed', help='Time limit for this job')
parser.add_argument('-M','--memory', dest='memory', default='2G', help='Memory allocation for this job (i.e "2G" for 2GB)')
parser.add_argument('-s','--subset', dest='subset', default=1, type=int, help='Size of subset within group')
parser.add_argument('-r','--repeat_id', dest='repeat_id', default='1', help='Repeat id (1 if first time running, <phase>_<repeat> otherwise)')

parser.add_argument('-f',dest='forceful', action='store_true', help='Force overwrite of steps if previously done')

parser.add_argument('-v','--verbose',dest='verbose' , action='count', default=0, help='Print helpful statements while running')
parser.add_argument('-d','--dryrun', dest='dryrun', action='store_true', help='Perform dry-run (i.e no new files/dirs created)' )

parser.add_argument('-Q','--quality', dest='quality', action='store_true', help='Quality assured checks - thorough run')
# Specialised
parser.add_argument('-n','--new_version', dest='new_version', help='If present, create a new version')
parser.add_argument('-m','--mode', dest='mode', default=None, help='Print or record information (log or std)')

args = parser.parse_args()

Expand Down
43 changes: 27 additions & 16 deletions pipeline/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,21 @@ def __init__(self, proj_code, groupdir):
self.save()
def save(self):
upload_err(self.proj_code, self.groupdir, self.get_str())

class KerchunkDriverFatalError(KerchunkException):
    """All drivers failed (NetCDF3/Hdf5/Tiff) - run without driver bypass to assess the issue with each driver type."""

    def __init__(self, verbose=0, proj_code=None, groupdir=None):
        """Record the fatal driver failure and delegate logging to the base class.

        :param verbose:   verbosity level; below 1 the class module is masked to
                          'builtins', presumably so tracebacks show a short name
                          (same pattern as the sibling exception classes).
        :param proj_code: project code this error relates to (may be None).
        :param groupdir:  group directory passed to the base class for error upload.
        """
        self.message = "All drivers failed when performing conversion"
        super().__init__(proj_code, groupdir)
        if verbose < 1:
            self.__class__.__module__ = 'builtins'

    def get_str(self):
        # Bug fix: previously returned 'MissingVariableError' (copy-paste from the
        # neighbouring class), which mislabelled this error in assessment reports.
        # Every other KerchunkException subclass returns its own class name here.
        return 'KerchunkDriverFatalError'


class BlacklistProjectCode(KerchunkException):
"""The project code you are trying to run for is on the list of project codes to ignore."""
def __init__(self, verbose=0, proj_code=None, groupdir=None):
"""The project code you are trying to run for is on the list of project codes to ignore."""
self.message = 'Project Code listed in blacklist for bad data - will not be processed.'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -35,8 +46,8 @@ def get_str(self):
return 'BlacklistProjectCode'

class MissingVariableError(KerchunkException):
"""A variable is missing from the environment or set of arguments."""
def __init__(self, type='$', verbose=0, proj_code=None, groupdir=None):
"""A variable is missing from the environment or set of arguments."""
self.message = f'Missing variable: {type}'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -45,8 +56,8 @@ def get_str(self):
return 'MissingVariableError'

class ExpectTimeoutError(KerchunkException):
"""The process is expected to time out given timing estimates."""
def __init__(self, required=0, current='', verbose=0, proj_code=None, groupdir=None):
"""The process is expected to time out given timing estimates."""
self.message = f'Scan requires minimum {required} - current {current}'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -55,8 +66,8 @@ def get_str(self):
return 'ExpectTimeoutError'

class ExpectMemoryError(KerchunkException):
"""The process is expected to run out of memory given size estimates."""
def __init__(self, required='', current='', verbose=0, proj_code=None, groupdir=None):
"""The process is expected to run out of memory given size estimates."""
self.message = f'Scan requires minimum {required} - current {current}'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -65,8 +76,8 @@ def get_str(self):
return 'ExpectTimeoutError'

class ProjectCodeError(KerchunkException):
"""Could not find the correct project code from the list of project codes for this run."""
def __init__(self, verbose=0, proj_code=None, groupdir=None):
"""Could not find the correct project code from the list of project codes for this run."""
self.message = f'Project Code Extraction Failed'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -75,8 +86,8 @@ def get_str(self):
return 'ProjectCodeError'

class FilecapExceededError(KerchunkException):
"""During scanning, could not find suitable files within the set of files specified."""
def __init__(self, nfiles=0, verbose=0, proj_code=None, groupdir=None):
"""During scanning, could not find suitable files within the set of files specified."""
self.message = f'Filecap exceeded: {nfiles} files attempted'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -85,8 +96,8 @@ def get_str(self):
return 'FilecapExceededError'

class ChunkDataError(KerchunkException):
"""Overflow Error from pandas during decoding of chunk information, most likely caused by bad data retrieval."""
def __init__(self, verbose=0, proj_code=None, groupdir=None):
"""Overflow Error from pandas during decoding of chunk information, most likely caused by bad data retrieval."""
self.message = f'Decoding resulted in overflow - received chunk data contains junk (attempted 3 times)'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -95,8 +106,8 @@ def get_str(self):
return 'ChunkDataError'

class NoValidTimeSlicesError(KerchunkException):
"""Unable to find any time slices to test within the object."""
def __init__(self, message='Kerchunk', verbose=0, proj_code=None, groupdir=None):
"""Unable to find any time slices to test within the object."""
self.message = f'No valid timeslices found for {message}'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -105,8 +116,8 @@ def get_str(self):
return 'NoValidTimeSlicesError'

class VariableMismatchError(KerchunkException):
"""During testing, variables present in the NetCDF file are not present in Kerchunk"""
def __init__(self, missing={}, verbose=0, proj_code=None, groupdir=None):
"""During testing, variables present in the NetCDF file are not present in Kerchunk"""
self.message = f'Missing variables {missing} in Kerchunk file'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -115,8 +126,8 @@ def get_str(self):
return 'VariableMismatchError'

class ShapeMismatchError(KerchunkException):
"""Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using a subset of the Netcdf files."""
def __init__(self, var={}, first={}, second={}, verbose=0, proj_code=None, groupdir=None):
"""Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using a subset of the Netcdf files."""
self.message = f'Kerchunk/NetCDF mismatch for variable {var} with shapes - K {first} vs N {second}'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -125,8 +136,8 @@ def get_str(self):
return 'ShapeMismatchError'

class TrueShapeValidationError(KerchunkException):
"""Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using the complete set of files."""
def __init__(self, message='Kerchunk', verbose=0, proj_code=None, groupdir=None):
"""Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using the complete set of files."""
self.message = f'Kerchunk/NetCDF mismatch with shapes using full dataset - check logs'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -135,8 +146,8 @@ def get_str(self):
return 'TrueShapeValidationError'

class NoOverwriteError(KerchunkException):
"""Output file already exists and the process does not have forceful overwrite (-f) set."""
def __init__(self, verbose=0, proj_code=None, groupdir=None):
"""Output file already exists and the process does not have forceful overwrite (-f) set."""
self.message = 'Output file already exists and forceful overwrite not set.'
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -145,8 +156,8 @@ def get_str(self):
return 'NoOverwriteError'

class MissingKerchunkError(KerchunkException):
"""Kerchunk file not found."""
def __init__(self, message="No suitable kerchunk file found for validation.", verbose=0, proj_code=None, groupdir=None):
"""Kerchunk file not found."""
self.message = message
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -155,8 +166,8 @@ def get_str(self):
return 'MissingKerchunkError'

class ValidationError(KerchunkException):
"""One or more checks within validation have failed - most likely elementwise comparison of data."""
def __init__(self, message="Fatal comparison failure for Kerchunk/NetCDF", verbose=0, proj_code=None, groupdir=None):
"""One or more checks within validation have failed - most likely elementwise comparison of data."""
self.message = message
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -165,8 +176,8 @@ def get_str(self):
return 'ValidationError'

class SoftfailBypassError(KerchunkException):
"""Validation could not be completed because some arrays only contained NaN values which cannot be compared."""
def __init__(self, message="Kerchunk validation failed softly with no bypass - rerun with bypass flag", verbose=0, proj_code=None, groupdir=None):
"""Validation could not be completed because some arrays only contained NaN values which cannot be compared."""
self.message = message
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand All @@ -175,8 +186,8 @@ def get_str(self):
return 'SoftfailBypassError'

class ConcatenationError(KerchunkException):
"""Variables could not be concatenated over time and are not duplicates - no known solution"""
def __init__(self, message="Variables could not be concatenated over time and are not duplicates - no known solution", verbose=0, proj_code=None, groupdir=None):
"""Variables could not be concatenated over time and are not duplicates - no known solution"""
self.message = message
super().__init__(proj_code, groupdir)
if verbose < 1:
Expand Down
12 changes: 12 additions & 0 deletions pipeline/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ def __init__(self, switch='FDSC'):

def __str__(self):
    """Return the raw bypass switch characters (e.g. 'FDSC') as the string form."""
    switch_chars = self.switch
    return switch_chars

def help(self):
    """Return a human-readable description of each bypass switch character.

    Used as the argparse help text for the '-b/--bypass-errs' flag.
    Switches marked with * appear in the default switch string ('FDSC').
    """
    # The previous str(...) wrapper around the literal was redundant; also fixed
    # the referenced class name: the actual exception is KerchunkDriverFatalError,
    # not 'KerchunkFatalDriverError'.
    return """
Bypass switch options: \n
"F" - * Skip individual file scanning errors.
"D" - * Skip driver failures - Pipeline tries different options for NetCDF (default).
    - Only need to turn this skip off if all drivers fail (KerchunkDriverFatalError).
"B" - Skip Box compute errors.
"S" - * Skip Soft fails (NaN-only boxes in validation) (default).
"C" - * Skip calculation (data sum) errors (time array typically cannot be summed) (default).
"M" - Skip memory checks (validate/compute aborts if utilisation estimate exceeds cap).
"""

class FalseLogger:
def __init__(self):
Expand Down
27 changes: 15 additions & 12 deletions single_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,26 +186,29 @@ def main(args):
parser.add_argument('phase', type=str, help='Phase of the pipeline to initiate')
parser.add_argument('proj_code',type=str, help='Project identifier code')

# Action-based - standard flags
parser.add_argument('-f','--forceful',dest='forceful',action='store_true', help='Force overwrite of steps if previously done')
parser.add_argument('-v','--verbose', dest='verbose', action='count', default=0, help='Print helpful statements while running')
parser.add_argument('-d','--dryrun', dest='dryrun', action='store_true', help='Perform dry-run (i.e no new files/dirs created)' )
parser.add_argument('-Q','--quality', dest='quality', action='store_true', help='Quality assured checks - thorough run')
parser.add_argument('-b','--bypass-errs', dest='bypass', default='FDSC', help=BypassSwitch().help())

# Environment variables
parser.add_argument('-w','--workdir', dest='workdir', help='Working directory for pipeline')
parser.add_argument('-g','--groupdir', dest='groupdir', help='Group directory for pipeline')
parser.add_argument('-G','--groupID', dest='groupID', default=None, help='Group identifier label')
parser.add_argument('-p','--proj_dir', dest='proj_dir', help='Project directory for pipeline')
parser.add_argument('-n','--new_version', dest='new_version', help='If present, create a new version')
parser.add_argument('-m','--mode', dest='mode', default=None, help='Print or record information (log or std)')

# Single job within group
parser.add_argument('-G','--groupID', dest='groupID', default=None, help='Group identifier label')
parser.add_argument('-t','--time-allowed',dest='time_allowed', help='Time limit for this job')
parser.add_argument('-M','--memory', dest='memory', default='2G', help='Memory allocation for this job (i.e "2G" for 2GB)')
parser.add_argument('-b','--bypass-errs', dest='bypass', default='FDSC', help='Bypass all error messages - skip failed jobs')

parser.add_argument('-s','--subset', dest='subset', default=1, type=int, help='Size of subset within group')
parser.add_argument('-r','--repeat_id', dest='repeat_id', default='1', help='Repeat id (1 if first time running, <phase>_<repeat> otherwise)')

parser.add_argument('-f', dest='forceful', action='store_true', help='Force overwrite of steps if previously done')

parser.add_argument('-v','--verbose', dest='verbose', action='count', default=0, help='Print helpful statements while running')
parser.add_argument('-d','--dryrun', dest='dryrun', action='store_true', help='Perform dry-run (i.e no new files/dirs created)' )

parser.add_argument('-Q','--quality', dest='quality', action='store_true', help='Quality assured checks - thorough run')

# Specialised
parser.add_argument('-n','--new_version', dest='new_version', help='If present, create a new version')
parser.add_argument('-m','--mode', dest='mode', default=None, help='Print or record information (log or std)')

args = parser.parse_args()

success = main(args)
Expand Down

0 comments on commit 0d07d33

Please sign in to comment.