diff --git a/LICENSE.txt b/LICENSE similarity index 100% rename from LICENSE.txt rename to LICENSE diff --git a/padocc/__init__.py b/padocc/__init__.py index c6544ad..21dc9fa 100644 --- a/padocc/__init__.py +++ b/padocc/__init__.py @@ -2,20 +2,7 @@ __contact__ = "daniel.westwood@stfc.ac.uk" __copyright__ = "Copyright 2024 United Kingdom Research and Innovation" -from padocc.phases import ( - ScanOperation, - KerchunkDS, - ZarrDS, - cfa_handler, - ValidateOperation -) +from .core import ProjectOperation +from .groups import GroupOperation -phase_map = { - 'scan': ScanOperation, - 'compute': { - 'kerchunk': KerchunkDS, - 'zarr': ZarrDS, - 'CFA': cfa_handler, - }, - 'validate': ValidateOperation -} \ No newline at end of file +from .phases import phase_map diff --git a/padocc/cli.py b/padocc/cli.py index cf8f91d..4137d41 100644 --- a/padocc/cli.py +++ b/padocc/cli.py @@ -6,9 +6,8 @@ import argparse -from padocc.core.utils import BypassSwitch -from padocc.operations import GroupOperation -from padocc import phase_map +from padocc.core.utils import BypassSwitch, get_attribute +from padocc import GroupOperation, phase_map def get_args(): parser = argparse.ArgumentParser(description='Run a pipeline step for a group of datasets') @@ -26,8 +25,9 @@ def get_args(): # Single-job within group parser.add_argument('-G','--groupID', dest='groupID', default=None, help='Group identifier label') - parser.add_argument('-s','--subset', dest='subset', default=1, type=int, help='Size of subset within group') + parser.add_argument('-s','--subset', dest='subset', default=None, type=int, help='Size of subset within group') parser.add_argument('-r','--repeat_id', dest='repeat_id', default='main', help='Repeat id (main if first time running, _ otherwise)') + parser.add_argument('-p','--proj_code',dest='proj_code',help='Run for a specific project code, within a group or otherwise') # Specialised parser.add_argument('-C','--cloud-format', dest='mode', default='kerchunk', help='Output format required.') @@ -43,6 +43,9 @@ def get_args(): parser.add_argument('--allow-band-increase', dest='band_increase',action='store_true', help='Allow automatic banding increase relative to previous runs.') args = parser.parse_args() + + args.workdir = get_attribute('WORKDIR', args, 'workdir') + return args def main(): @@ -65,7 +68,7 @@ def main(): ) if args.phase == 'init': - group.init_from_file(args.input_file) + group.init_from_file(args.input) return group.run( diff --git a/padocc/core/errors.py b/padocc/core/errors.py index e34e2c4..5595f20 100644 --- a/padocc/core/errors.py +++ b/padocc/core/errors.py @@ -56,7 +56,6 @@ def get_status(tb: list) -> str: else: raise err - class KerchunkException(Exception): def __init__(self, proj_code: Union[str,None], groupdir: Union[str,None]) -> None: self.proj_code = proj_code @@ -65,7 +64,7 @@ def __init__(self, proj_code: Union[str,None], groupdir: Union[str,None]) -> Non msg = getattr(self,'message') super().__init__(msg) -class PartialDriverError(KerchunkException): +class PartialDriverError(KerchunkException): # Keep """All drivers failed (NetCDF3/Hdf5/Tiff) for one or more files within the list""" def __init__( self, @@ -81,39 +80,8 @@ def __init__( def get_str(self): return 'PartialDriverError' -class NaNComparisonError(KerchunkException): - """When comparing NaN values between objects - different values found""" - def __init__( - self, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - self.message = f"NaN values do not match 
between comparison objects" - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'NaNComparisonError' - -class RemoteProtocolError(KerchunkException): - """All drivers failed (NetCDF3/Hdf5/Tiff) for one or more files within the list""" - def __init__( - self, - filenums: Union[int,None] = None, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - - self.message = f"All drivers failed when performing conversion for files {filenums}" - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'PartialDriverError' -class KerchunkDriverFatalError(KerchunkException): +class KerchunkDriverFatalError(KerchunkException): # Keep """All drivers failed (NetCDF3/Hdf5/Tiff) - run without driver bypass to assess the issue with each driver type.""" def __init__( self, @@ -128,55 +96,7 @@ def __init__( def get_str(self): return 'KerchunkDriverFatalError' -class IdenticalVariablesError(KerchunkException): - """All variables found to be suitably identical between files as to not stack or concatenate""" - def __init__( - self, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - self.message = "All variables are identical across files" - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'IdenticalVariablesError' - -class XKShapeToleranceError(KerchunkException): - """Attempted validation using a tolerance for shape mismatch on concat-dims, shape difference exceeds tolerance allowance.""" - def __init__( - self, - tolerance: int = 0, - diff: int = 0, - dim: str = '', - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - self.message = f"Shape difference ({diff}) exceeds allowed tolerance ({tolerance}) for dimension ({dim})" - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'XKShapeToleranceError' - -class BlacklistProjectCode(KerchunkException): - """The project code you are trying to run for is on the list of project codes to ignore.""" - def __init__( - self, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - self.message = 'Project Code listed in blacklist for bad data - will not be processed.' 
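# Illustrative sketch, not part of the patch: every exception retained in this
# module (marked "# Keep") follows the same pattern as PartialDriverError above -
# set self.message, call KerchunkException.__init__ with the project code and
# group directory, and use the `verbose < 1` switch to rebind __module__ to
# 'builtins' so short tracebacks print "SomeError: ..." without the package path.
# A hypothetical new error would therefore look like this (Union is already
# imported at the top of errors.py):

class ExampleFatalError(KerchunkException):
    """Hypothetical error demonstrating the retained padocc exception pattern."""
    def __init__(
        self,
        verbose: int = 0,
        proj_code: Union[str, None] = None,
        groupdir: Union[str, None] = None
    ) -> None:
        # message must exist before KerchunkException.__init__ reads it via getattr
        self.message = 'Example fatal condition encountered'
        super().__init__(proj_code, groupdir)
        if verbose < 1:
            self.__class__.__module__ = 'builtins'

    def get_str(self):
        return 'ExampleFatalError'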
- super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'BlacklistProjectCode' - -class MissingVariableError(KerchunkException): +class MissingVariableError(KerchunkException): # Keep """A variable is missing from the environment or set of arguments.""" def __init__( self, @@ -226,38 +146,7 @@ def __init__( def get_str(self): return 'ExpectTimeoutError' -class ProjectCodeError(KerchunkException): - """Could not find the correct project code from the list of project codes for this run.""" - def __init__( - self, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - self.message = f'Project Code Extraction Failed' - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'ProjectCodeError' - -class FilecapExceededError(KerchunkException): - """During scanning, could not find suitable files within the set of files specified.""" - def __init__( - self, - nfiles: int = 0, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - self.message = f'Filecap exceeded: {nfiles} files attempted' - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'FilecapExceededError' - -class ChunkDataError(KerchunkException): +class ChunkDataError(KerchunkException): # Keep """Overflow Error from pandas during decoding of chunk information, most likely caused by bad data retrieval.""" def __init__( self, @@ -288,64 +177,6 @@ def __init__( def get_str(self): return 'NoValidTimeSlicesError' -class VariableMismatchError(KerchunkException): - """During testing, variables present in the NetCDF file are not present in Kerchunk""" - def __init__( - self, - missing: Union[dict, None] = None, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - missing = missing or {} - - self.message = f'Missing variables {missing} in Kerchunk file' - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'VariableMismatchError' - -class ShapeMismatchError(KerchunkException): - """Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using a subset of the Netcdf files.""" - def __init__( - self, - var: Union[dict,None] = None, - first: Union[dict,None] = None, - second: Union[dict,None] = None, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - - var = var or {} - first = first or {} - second = second or {} - - self.message = f'Kerchunk/NetCDF mismatch for variable {var} with shapes - K {first} vs X {second}' - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'ShapeMismatchError' - -class TrueShapeValidationError(KerchunkException): - """Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using the complete set of files.""" - def __init__( - self, - message: str = 'kerchunk', - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - - self.message = f'{message} mismatch with shapes using full dataset - check logs' - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'TrueShapeValidationError' - class 
NoOverwriteError(KerchunkException): """Output file already exists and the process does not have forceful overwrite (-f) set.""" def __init__( @@ -362,7 +193,7 @@ def __init__( def get_str(self): return 'NoOverwriteError' -class MissingKerchunkError(KerchunkException): +class MissingKerchunkError(KerchunkException): # Keep """Kerchunk file not found.""" def __init__( self, @@ -385,14 +216,14 @@ def __init__( proj_code: Union[str,None] = None, groupdir: Union[str,None] = None ) -> None: - self.message = "Fatal Validation Error" + self.message = "Fatal Validation Error - see data report." super().__init__(proj_code, groupdir) if verbose < 1: self.__class__.__module__ = 'builtins' def get_str(self): return 'ValidationError' -class ComputeError(KerchunkException): +class ComputeError(KerchunkException): # Keep """Compute stage failed - likely due to invalid config/use of the classes""" def __init__( self, @@ -406,22 +237,6 @@ def __init__( self.__class__.__module__ = 'builtins' def get_str(self): return 'ComputeError' - -class SoftfailBypassError(KerchunkException): - """Validation could not be completed because some arrays only contained NaN values which cannot be compared.""" - def __init__( - self, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - - self.message = "Kerchunk validation failed softly with no bypass - rerun with bypass flag" - super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'SoftfailBypassError' class ConcatenationError(KerchunkException): """Variables could not be concatenated over time and are not duplicates - no known solution""" @@ -439,7 +254,7 @@ def __init__( def get_str(self): return 'ConcatenationError' -class ConcatFatalError(KerchunkException): +class ConcatFatalError(KerchunkException): # Keep """Chunk sizes differ between refs - files cannot be concatenated""" def __init__( self, @@ -458,7 +273,7 @@ def __init__( def get_str(self): return 'ConcatFatalError' -class SourceNotFoundError(KerchunkException): +class SourceNotFoundError(KerchunkException): # Keep """Source File could not be located.""" def __init__( self, @@ -492,7 +307,7 @@ def __init__( def get_str(self): return 'ArchiveConnectError' -class KerchunkDecodeError(KerchunkException): +class KerchunkDecodeError(KerchunkException): # Keep """Decoding of Kerchunk file failed - likely a time array issue.""" def __init__( self, @@ -506,20 +321,4 @@ def __init__( if verbose < 1: self.__class__.__module__ = 'builtins' def get_str(self): - return 'KerchunkDecodeError' - -class FullsetRequiredError(KerchunkException): - """This project must be validated using the full set of files.""" - def __init__( - self, - verbose: int = 0, - proj_code: Union[str,None] = None, - groupdir: Union[str,None] = None - ) -> None: - - self.message = f"This project must be validated by opening the full set of files." 
- super().__init__(proj_code, groupdir) - if verbose < 1: - self.__class__.__module__ = 'builtins' - def get_str(self): - return 'FullsetRequiredError' \ No newline at end of file + return 'KerchunkDecodeError' \ No newline at end of file diff --git a/padocc/core/filehandlers.py b/padocc/core/filehandlers.py index 0282232..95fd4ed 100644 --- a/padocc/core/filehandlers.py +++ b/padocc/core/filehandlers.py @@ -10,9 +10,12 @@ from typing import Iterator from typing import Optional, Union import xarray as xr +import fsspec +import re -from padocc.core import LoggedOperation, FalseLogger +from .logs import LoggedOperation, FalseLogger from .utils import format_str +from .errors import KerchunkDecodeError, ChunkDataError class FileIOMixin(LoggedOperation): @@ -51,6 +54,7 @@ def __init__( logid : Optional[str] = None, dryrun : bool = False, forceful : bool = False, + thorough : bool = False, verbose : int = 0 ) -> None: """ @@ -93,7 +97,10 @@ def __init__( label=label, fh=fh, logid=logid, - verbose=verbose) + verbose=verbose, + dryrun=dryrun, + forceful=forceful, + thorough=thorough) @property def filepath(self) -> str: @@ -199,6 +206,12 @@ def append(self, newvalue: str) -> None: self._value.append(newvalue) + def pop(self, oldvalue: str) -> None: + """Remove a value from the internal list""" + self._obtain_value() + + self._value.pop(oldvalue) + def set(self, value: list) -> None: """ Reset the value as a whole for this @@ -496,12 +509,14 @@ def add_kerchunk_history(self, version_no: str) -> None: from datetime import datetime # Get current time - attrs = self.get('refs',None) + attrs = self.get_meta() if attrs is None or not isinstance(attrs,str): raise ValueError( 'Attribute "refs" not present in Kerchunk file' ) + + attrs = attrs['.zattrs'] # Format for different uses now = datetime.now() @@ -519,10 +534,73 @@ def add_kerchunk_history(self, version_no: str) -> None: else: attrs['history'] = 'Kerchunk file created on ' + now.strftime("%D") + '\n' - attrs['kerchunk_revision'] = version_no - attrs['kerchunk_creation_date'] = now.strftime("%d%m%yT%H%M%S") + attrs['padocc_revision'] = version_no + attrs['padocc_creation_date'] = now.strftime("%d%m%yT%H%M%S") - self['refs'] = attrs + self.set_meta(attrs) + + def open_dataset( + self, + fsspec_kwargs: Union[dict,None] = None, + retry: bool = False, + **kwargs) -> xr.Dataset: + """ + Open the kerchunk file as a dataset""" + + default_fsspec = {'target_options':{'compression':None}} + if fsspec_kwargs is not None: + default_fsspec.update(fsspec_kwargs) + + default_zarr = {'consolidated':False, 'decode_times':True} + default_zarr.update(kwargs) + + self.logger.info(f'Attempting to open Kerchunk JSON file') + try: + mapper = fsspec.get_mapper('reference://',fo=self.filepath, **fsspec_kwargs) + except json.JSONDecodeError as err: + self.logger.error(f"Kerchunk file {self.filepath} appears to be empty") + return None + + # Need a safe repeat here + ds = None + attempts = 0 + while attempts < 3 and not ds: + attempts += 1 + try: + ds = xr.open_zarr(mapper, **default_zarr) + except OverflowError: + ds = None + except KeyError as err: + if re.match('.*https.*',str(err)) and not retry: + # RemoteProtocol is not https - retry with correct protocol + self.logger.warning('Found KeyError "https" on opening the Kerchunk file - retrying with local filepaths.') + return self.open_dataset(fsspec_kwargs=fsspec_kwargs, retry=True) + else: + raise err + except Exception as err: + if 'decode' in str(err): + raise KerchunkDecodeError + raise err 
#MissingKerchunkError(message=f'Failed to open kerchunk file {kfile}') + if not ds: + raise ChunkDataError + self.logger.debug('Successfully opened Kerchunk with virtual xarray ds') + return ds + + def get_meta(self): + """ + Obtain the metadata dictionary + """ + return self._value['refs']['.zattrs'] + + def set_meta(self, values: dict): + """ + Reset the metadata dictionary + """ + if 'refs' not in self._value: + raise ValueError( + 'Cannot reset metadata for a file with no existing values.' + ) + self._value['refs']['.zattrs'] = values class GenericStore(LoggedOperation): """ @@ -542,6 +620,7 @@ def __init__( logid : Optional[str] = None, dryrun : bool = False, forceful : bool = False, + thorough : bool = False, verbose : int = 0 ) -> None: @@ -552,8 +631,11 @@ def __init__( self._meta: JSONFileHandler = JSONFileHandler( self.store_path, metadata_name) - self._dryrun: bool = dryrun - self._forceful: bool = forceful + self._set_fh_kwargs( + forceful=forceful, + dryrun=dryrun, + thorough=thorough + ) # All filehandlers are logged operations super().__init__( @@ -562,7 +644,22 @@ def __init__( fh=fh, logid=logid, verbose=verbose) - + + def _update_history( + self, + addition: str, + new_version: str, + ) -> None: + + attrs = self._meta['refs']['.zattrs'] + now = datetime.now() + + attrs['history'].append(addition) + attrs['padocc_revision'] = new_version + attrs['padocc_last_changed'] = now.strftime("%d%m%yT%H%M%S") + + self._meta['refs']['.zattrs'] = attrs + @property def store_path(self) -> str: """Assemble the store path""" @@ -579,9 +676,30 @@ def clear(self) -> None: f'Store "{self._store_name}" in dryrun mode.' ) - def open(self, engine: str = 'zarr', **open_kwargs) -> xr.Dataset: - """Open the store as a dataset (READ_ONLY)""" - return xr.open_dataset(self.store_path, engine=engine,**open_kwargs) + @property + def is_empty(self) -> bool: + """ + Check if the store contains any data + """ + if not os.path.exists(self.store_path): + return True + return len(os.listdir(self.store_path)) == 0 + + def get_meta(self): + """ + Obtain the metadata dictionary + """ + return self._meta['refs']['.zattrs'] + + def set_meta(self, values: dict): + """ + Reset the metadata dictionary + """ + if 'refs' not in self._meta: + raise ValueError( + 'Cannot reset metadata for a file with no existing values.' 
+ ) + self._meta['refs']['.zattrs'] = values def __contains__(self, key: str) -> bool: """ @@ -627,11 +745,11 @@ def __repr__(self) -> str: """Programmatic representation""" return f'' - def open(self, *args, **zarr_kwargs) -> xr.Dataset: + def open_dataset(self, **zarr_kwargs) -> xr.Dataset: """ Open the ZarrStore as an xarray dataset """ - return super().open(engine='zarr',**zarr_kwargs) + return xr.open_dataset(self.store_path, engine='zarr', **zarr_kwargs) class KerchunkStore(GenericStore): """ @@ -657,11 +775,39 @@ def __repr__(self) -> str: """Programmatic representation""" return f'' - def open(self, *args, **parquet_kwargs) -> xr.Dataset: + def open_dataset( + self, + rfs_kwargs: Union[dict,None] = None, + **parquet_kwargs + ) -> xr.Dataset: """ Open the Parquet Store as an xarray dataset """ - raise NotImplementedError + self.logger.debug('Opening Kerchunk Parquet store') + + default_rfs = { + 'remote_protocol':'file', + 'target_protocol':'file', + 'lazy':True + } + if rfs_kwargs is not None: + default_rfs.update(rfs_kwargs) + + default_parquet = { + 'backend_kwargs':{"consolidated": False, "decode_times": False} + } + default_parquet.update(parquet_kwargs) + + from fsspec.implementations.reference import ReferenceFileSystem + fs = ReferenceFileSystem( + self.filepath, + **default_rfs) + + return xr.open_dataset( + fs.get_mapper(), + engine="zarr", + **default_parquet + ) class LogFileHandler(ListFileHandler): """Log File handler for padocc phase logs.""" @@ -727,4 +873,32 @@ def update_status( status = status.replace(',', '.').replace('\n','.') addition = f'{phase},{status},{datetime.now().strftime("%H:%M %D")},{jobid}' self.append(addition) - self.logger.info(f'Updated new status: {phase} - {status}') \ No newline at end of file + self.logger.info(f'Updated new status: {phase} - {status}') + +class CFADataset: + """ + Basic handler for CFA dataset + """ + + def __init__(self, filepath, identifier): + + if 'CFA' not in xr.backends.list_engines(): + raise ImportError( + 'CFA Engine Module not found, see the documentation ' + 'at https://github.com/cedadev/CFAPyX' + ) + + self._filepath = filepath + self._ident = identifier + + def __str__(self) -> str: + """String representation of CFA Dataset""" + return f'' + + def __repr__(self) -> str: + """Programmatic representation of CFA Dataset""" + return self.__str__ + + def open_dataset(self, **kwargs) -> xr.Dataset: + """Open the CFA Dataset [READ-ONLY]""" + return xr.open_dataset(self._filepath, engine='CFA',**kwargs) \ No newline at end of file diff --git a/padocc/core/logs.py b/padocc/core/logs.py index a42ef17..ca9fd61 100644 --- a/padocc/core/logs.py +++ b/padocc/core/logs.py @@ -46,12 +46,18 @@ def __init__( logger : Union[logging.Logger,FalseLogger, None] = None, label : Union[str,None] = None, fh : Union[str,None] = None, - logid : Union[str,None] = None, + logid : Union[str,None] = None, + forceful: bool = None, + dryrun : bool = None, + thorough: bool = None, verbose: int = 0 ) -> None: self._logid = logid self._verbose = verbose + + self._set_fh_kwargs(forceful=forceful, dryrun=dryrun, thorough=thorough) + if hasattr(self, 'logger'): return if logger is None: @@ -63,6 +69,28 @@ def __init__( else: self.logger = logger + def values(self): + print(f' - forceful: {bool(self._forceful)}') + print(f' - thorough: {bool(self._thorough)}') + print(f' - dryrun: {bool(self._dryrun)}') + + @property + def fh_kwargs(self): + return { + 'dryrun': self._dryrun, + 'forceful': self._forceful, + 'thorough': self._thorough, + } + + 
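# Illustrative sketch, not part of the patch: the fh_kwargs property above and the
# setter that follows bundle the filesystem flags (dryrun/forceful/thorough) so a
# LoggedOperation can forward them unchanged to the filehandlers it creates.
# Assuming `op` is any object inheriting LoggedOperation, usage might look like:
#
#     flags = op.fh_kwargs                        # {'dryrun': ..., 'forceful': ..., 'thorough': ...}
#     op.fh_kwargs = {**flags, 'dryrun': True}    # routed through _set_fh_kwargs below
#     handler = JSONFileHandler(op.dir, 'example', **op.fh_kwargs)   # hypothetical call site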
@fh_kwargs.setter + def fh_kwargs(self, value): + self._set_fh_kwargs(**value) + + def _set_fh_kwargs(self, forceful=None, dryrun=None, thorough=None): + self._forceful = forceful + self._dryrun = dryrun + self._thorough = thorough + def reset_file_handler( logger : logging.Logger, verbose : int, diff --git a/padocc/core/mixins.py b/padocc/core/mixins.py deleted file mode 100644 index c7aabf0..0000000 --- a/padocc/core/mixins.py +++ /dev/null @@ -1,284 +0,0 @@ -__author__ = "Daniel Westwood" -__contact__ = "daniel.westwood@stfc.ac.uk" -__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" - -import os -import logging - -from .logs import LoggedOperation, levels -from .utils import BypassSwitch - -class DirectoryMixin(LoggedOperation): - """ - Container class for Operations which require functionality to create - directories (workdir, groupdir, cache etc.) - """ - - def __init__( - self, - workdir : str, - groupID : str = None, - forceful: bool = None, - dryrun : bool = None, - thorough: bool = None, - logger : logging.Logger = None, - bypass : BypassSwitch = None, - label : str = None, - fh : str = None, - logid : str = None, - verbose : int = 0 - ): - - self.workdir = workdir - self.groupID = groupID - - self._thorough = thorough - self._bypass = bypass - - if verbose in levels: - verbose = levels.index(verbose) - - self._set_fh_kwargs(forceful=forceful, dryrun=dryrun) - - super().__init__( - logger, - label=label, - fh=fh, - logid=logid, - verbose=verbose) - - def values(self): - print(f' - forceful: {bool(self._forceful)}') - print(f' - verbose: {bool(self._verbose)}') - print(f' - dryrun: {bool(self._dryrun)}') - - @property - def fh_kwargs(self): - return { - 'dryrun': self._dryrun, - 'forceful': self._forceful, - 'verbose': self._verbose, - } - - @fh_kwargs.setter - def fh_kwargs(self, value): - self._set_fh_kwargs(**value) - - def _set_fh_kwargs(self, forceful=None, dryrun=None, verbose=None): - self._forceful = forceful - self._dryrun = dryrun - self._verbose = verbose - - def _setup_workdir(self): - if not os.path.isdir(self.workdir): - if self._dryrun: - self.logger.debug(f'DRYRUN: Skip making workdir {self.workdir}') - else: - os.makedirs(self.workdir) - - def _setup_groupdir(self): - if self.groupID: - # Create group directory - if not os.path.isdir(self.groupdir): - if self._dryrun: - self.logger.debug(f'DRYRUN: Skip making groupdir {self.groupdir}') - else: - os.makedirs(self.groupdir) - - def _setup_directories(self): - self._setup_workdir() - self._setup_groupdir() - - def _setup_cache(self): - self.cache = f'{self.dir}/cache' - - if not os.path.isdir(self.cache): - os.makedirs(self.cache) - if self._thorough: - os.system(f'rm -rf {self.cache}/*') - - @property - def groupdir(self): - if self.groupID: - return f'{self.workdir}/groups/{self.groupID}' - else: - raise ValueError( - 'Operation has no "groupID" so cannot construct a "groupdir".' - ) - - def setup_slurm_directories(self): - # Make Directories - for dirx in ['sbatch','errs']: - if not os.path.isdir(f'{self.dir}/{dirx}'): - if self._dryrun: - self.logger.debug(f"DRYRUN: Skipped creating {dirx}") - continue - os.makedirs(f'{self.dir}/{dirx}') - -class EvaluationsMixin: - - def set_last_run(self, phase: str, time : str) -> None: - """ - Set the phase and time of the last run for this project. 
- """ - lr = (phase, time) - self.base_cfg['last_run'] = lr - - def get_last_run(self) -> tuple: - """ - Get the tuple-value for this projects last run.""" - return self.base_cfg['last_run'] - - def get_last_status(self) -> str: - """ - Gets the last line of the correct log file - """ - return self.status_log[-1] - - def get_log_contents(self, phase: str) -> str: - """ - Get the contents of the log file as a string - """ - - if phase in self.phase_logs: - return str(self.phase_logs[phase]) - self.logger.warning(f'Phase "{phase}" not recognised - no log file retrieved.') - return '' - - def show_log_contents(self, phase: str, halt : bool = False): - """ - Format the contents of the log file to print. - """ - - logfh = self.get_log_contents(phase=phase) - status = self.status_log[-1].split(',') - self.logger.info(logfh) - - self.logger.info(f'Project Code: {self.proj_code}') - self.logger.info(f'Status: {status}') - - self.logger.info(self._rerun_command()) - - if halt: - paused = input('Type "E" to exit assessment:') - if paused == 'E': - raise KeyboardInterrupt - - def delete_project(self, ask: bool = True): - """ - Delete a project - """ - if self._dryrun: - self.logger.info('Skipped Deleting directory in dryrun mode.') - return - if ask: - inp = input(f'Are you sure you want to delete {self.proj_code}? (Y/N)?') - if inp != 'Y': - self.logger.info(f'Skipped Deleting directory (User entered {inp})') - return - - os.system(f'rm -rf {self.dir}') - self.logger.info(f'All internal files for {self.proj_code} deleted.') - - def _rerun_command(self): - """ - Setup for running this specific component interactively. - """ - return '' - -class PropertiesMixin: - - def _check_override(self, key, mapper) -> str: - if self.base_cfg['override'][key] is not None: - return self.base_cfg['override'][key] - - if self.detail_cfg[mapper] is not None: - self.base_cfg['override'][key] = self.detail_cfg[mapper] - self.base_cfg.close() - return self.base_cfg['override'][key] - - return None - - @property - def outpath(self): - return f'{self.dir}/{self.outproduct}' - - @property - def outproduct(self): - if self.stage == 'complete': - return f'{self.proj_code}.{self.revision}.{self.file_type}' - else: - vn = f'{self.revision}a' - if self._is_trial: - vn = f'trial-{vn}' - return f'{vn}.{self.file_type}' - - @property - def revision(self) -> str: - - if self.cloud_format is None: - raise ValueError( - 'Cloud format not set, revision is unknown' - ) - - if self.file_type is not None: - return ''.join((self.cloud_format[0],self.file_type[0],self.version_no)) - else: - return ''.join((self.cloud_format[0],self.version_no)) - - @property - def version_no(self) -> str: - - return self.base_cfg['version_no'] - - @property - def cloud_format(self) -> str: - return self._check_override('cloud_type','scanned_with') or 'kerchunk' - - @cloud_format.setter - def cloud_format(self, value): - self.base_cfg['override']['cloud_type'] = value - - @property - def file_type(self) -> str: - """ - Return the file type for this project. - """ - - return self._check_override('file_type','type') - - @file_type.setter - def file_type(self, value): - - type_map = { - 'kerchunk': ['json','parq'], - } - - if self.cloud_format in type_map: - if value in type_map[self.cloud_format]: - self.base_cfg['override']['file_type'] = value - else: - raise ValueError( - f'Could not set property "file_type:{value} - accepted ' - f'values for format: {self.cloud_format} are {type_map.get(self.cloud_format,None)}.' 
- ) - else: - raise ValueError( - f'Could not set property "file_type:{value}" - cloud format ' - f'{self.cloud_format} does not accept alternate types.' - ) - - @property - def source_format(self) -> str: - return self.detail_cfg.get(index='driver', default=None) - - def minor_version_increment(self): - """ - Use this function for when properties of the cloud file have been changed.""" - raise NotImplementedError - - def major_version_increment(self): - """ - Use this function for major changes to the cloud file - - e.g. replacement of source file data.""" - raise NotImplementedError diff --git a/padocc/core/mixins/__init__.py b/padocc/core/mixins/__init__.py new file mode 100644 index 0000000..988979f --- /dev/null +++ b/padocc/core/mixins/__init__.py @@ -0,0 +1,4 @@ +from .directory import DirectoryMixin +from .dataset import DatasetHandlerMixin +from .properties import PropertiesMixin +from .status import StatusMixin \ No newline at end of file diff --git a/padocc/core/mixins/dataset.py b/padocc/core/mixins/dataset.py new file mode 100644 index 0000000..8103a12 --- /dev/null +++ b/padocc/core/mixins/dataset.py @@ -0,0 +1,152 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" + +from typing import Union +import xarray as xr +import os + +from ..filehandlers import ( + KerchunkFile, + KerchunkStore, + ZarrStore, + CFADataset, + GenericStore +) + +class DatasetHandlerMixin: + """ + Mixin class for properties relating to opening products. + + This is a behavioural Mixin class and thus should not be + directly accessed. Where possible, encapsulated classes + should contain all relevant parameters for their operation + as per convention, however this is not the case for mixin + classes. The mixin classes here will explicitly state + where they are designed to be used, as an extension of an + existing class. + + Use case: ProjectOperation [ONLY] + """ + + @property + def kfile(self) -> Union[KerchunkFile,None]: + """ + Retrieve the kfile filehandler, create if not present + """ + + if self.cloud_format != 'kerchunk': + return None + + if self.file_type != 'json': + return None + + if self._kfile is None: + self._kfile = KerchunkFile( + self.dir, + self.outproduct + ) + + return self._kfile + + @property + def kstore(self) -> Union[KerchunkStore,None]: + """ + Retrieve the kstore filehandler, create if not present + """ + if self.cloud_format != 'kerchunk': + return None + + if self.file_type == 'json': + return None + + if self._kfile is None: + self._kfile = KerchunkStore( + self.dir, + self.outproduct + ) + + return self._kfile + + @property + def dataset( + self + ) -> Union[KerchunkFile,GenericStore, CFADataset, None]: + + if self.cloud_format is None: + raise ValueError( + f'Dataset for {self.proj_code} does not exist yet.' 
+ ) + + if self.cloud_format == 'kerchunk': + if self.file_type == 'parq': + return self.kstore + else: + return self.kfile + elif self.cloud_format == 'zarr': + return self.zstore + elif self.cloud_format == 'cfa': + return self.cfa_dataset + else: + raise ValueError( + f'Unrecognised cloud format {self.cloud_format}' + ) + + @property + def cfa_dataset(self) -> xr.Dataset: + """ + Retrieve a read-only xarray representation + of a CFA dataset""" + + if not self._cfa_dataset: + self._cfa_dataset = CFADataset( + self.cfa_path, + self.proj_code + ) + + return self._cfa_dataset + + @property + def cfa_path(self) -> str: + return f'{self.dir}/{self.proj_code}.nca' + + @property + def zstore(self) -> Union[ZarrStore, None]: + """ + Retrieve a filehandler for the zarr store""" + + if self.cloud_format != 'zarr': + return None + + if self._zstore is None: + self._zstore = ZarrStore( + self.dir, + self.outproduct + ) + + return self._zstore + + def update_attribute( + self, + attribute, + value, + target: str = 'kfile', + ): + """ + Update an attribute within a + dataset representation's metadata. + """ + + if hasattr(self,target): + meta = getattr(self,target).get_meta() + + meta[attribute] = value + + getattr(self, target).set_meta(meta) + + @property + def dataset_attributes(self): + """ + Fetch a dictionary of the metadata for the dataset + where possible. + """ \ No newline at end of file diff --git a/padocc/core/mixins/directory.py b/padocc/core/mixins/directory.py new file mode 100644 index 0000000..fac0d53 --- /dev/null +++ b/padocc/core/mixins/directory.py @@ -0,0 +1,112 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" + +import os +import logging + +from ..logs import LoggedOperation, levels +from ..utils import BypassSwitch + +class DirectoryMixin(LoggedOperation): + """ + Container class for Operations which require functionality to create + directories (workdir, groupdir, cache etc.) + + This Mixin enables all child classes the ability + to manipulate the filesystem to create new directories + as required, and handles the so-called fh-kwargs, which + relate to forceful overwrites of filesystem objects, + skipping creation or starting from scratch, all relating + to the filesystem. + + This is a behavioural Mixin class and thus should not be + directly accessed. Where possible, encapsulated classes + should contain all relevant parameters for their operation + as per convention, however this is not the case for mixin + classes. The mixin classes here will explicitly state + where they are designed to be used, as an extension of an + existing class. + + Use case: ProjectOperation, GroupOperation + """ + + def __init__( + self, + workdir : str, + groupID : str = None, + forceful: bool = None, + dryrun : bool = None, + thorough: bool = None, + logger : logging.Logger = None, + bypass : BypassSwitch = None, + label : str = None, + fh : str = None, + logid : str = None, + verbose : int = 0 + ): + + self.workdir = workdir + self.groupID = groupID + + self._bypass = bypass + + if verbose in levels: + verbose = levels.index(verbose) + + super().__init__( + logger, + label=label, + fh=fh, + logid=logid, + verbose=verbose, + forceful=forceful, + dryrun=dryrun, + thorough=thorough) + + def _setup_workdir(self): + if self.workdir is None: + raise ValueError( + 'Working directory not defined.' 
+ 'If using the CLI tool, please specify working directory with -w' + ) + + if not os.path.isdir(self.workdir): + if self._dryrun: + self.logger.debug(f'DRYRUN: Skip making workdir {self.workdir}') + else: + os.makedirs(self.workdir) + + def _setup_groupdir(self): + if self.groupID: + # Create group directory + if not os.path.isdir(self.groupdir): + if self._dryrun: + self.logger.debug(f'DRYRUN: Skip making groupdir {self.groupdir}') + else: + os.makedirs(self.groupdir) + + def _setup_directories(self): + """ + Ensure working and group directories are created.""" + self._setup_workdir() + self._setup_groupdir() + + def _setup_cache(self, dir): + """ + Set up the personal cache for this directory object""" + self.cache = f'{dir}/cache' + + if not os.path.isdir(self.cache): + os.makedirs(self.cache) + if self._thorough: + os.system(f'rm -rf {self.cache}/*') + + @property + def groupdir(self): + if self.groupID: + return f'{self.workdir}/groups/{self.groupID}' + else: + raise ValueError( + 'Operation has no "groupID" so cannot construct a "groupdir".' + ) \ No newline at end of file diff --git a/padocc/core/mixins/properties.py b/padocc/core/mixins/properties.py new file mode 100644 index 0000000..46a0cf8 --- /dev/null +++ b/padocc/core/mixins/properties.py @@ -0,0 +1,124 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" + +class PropertiesMixin: + """ + Properties relating to the ProjectOperation class that + are stored separately for convenience and easier debugging. + + This is a behavioural Mixin class and thus should not be + directly accessed. Where possible, encapsulated classes + should contain all relevant parameters for their operation + as per convention, however this is not the case for mixin + classes. The mixin classes here will explicitly state + where they are designed to be used, as an extension of an + existing class. + + Use case: ProjectOperation [ONLY] + """ + + def _check_override(self, key, mapper) -> str: + if self.base_cfg['override'][key] is not None: + return self.base_cfg['override'][key] + + if self.detail_cfg[mapper] is not None: + self.base_cfg['override'][key] = self.detail_cfg[mapper] + self.base_cfg.close() + return self.base_cfg['override'][key] + + return None + + @property + def outpath(self): + return f'{self.dir}/{self.outproduct}' + + @property + def outproduct(self): + if self.stage == 'complete': + return f'{self.proj_code}.{self.revision}' + else: + vn = f'{self.revision}a' + if self._is_trial: + vn = f'trial-{vn}' + return vn + + @property + def revision(self) -> str: + + if self.cloud_format is None: + raise ValueError( + 'Cloud format not set, revision is unknown' + ) + + if self.file_type is not None: + return ''.join((self.cloud_format[0],self.file_type[0],self.version_no)) + else: + return ''.join((self.cloud_format[0],self.version_no)) + + @property + def version_no(self) -> str: + + return self.base_cfg['version_no'] + + @property + def cloud_format(self) -> str: + return self._check_override('cloud_type','scanned_with') or 'kerchunk' + + @cloud_format.setter + def cloud_format(self, value): + self.base_cfg['override']['cloud_type'] = value + + @property + def file_type(self) -> str: + """ + Return the file type for this project. 
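# Illustrative sketch, not part of the patch: the revision string assembled above
# is the first letter of the cloud format, the first letter of the file type
# (when one is set), then the version number. Assuming a kerchunk/json project at
# version '1.1':
#
#     cloud_format, file_type, version_no = 'kerchunk', 'json', '1.1'
#     revision   = ''.join((cloud_format[0], file_type[0], version_no))   # 'kj1.1'
#     outproduct = f'{proj_code}.{revision}'        # once self.stage == 'complete'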
+ """ + + return self._check_override('file_type','type') + + @file_type.setter + def file_type(self, value): + + type_map = { + 'kerchunk': ['json','parq'], + 'zarr':[None], + } + + if self.cloud_format in type_map: + if value in type_map[self.cloud_format]: + self.base_cfg['override']['file_type'] = value + else: + raise ValueError( + f'Could not set property "file_type:{value} - accepted ' + f'values for format: {self.cloud_format} are {type_map.get(self.cloud_format,None)}.' + ) + else: + raise ValueError( + f'Could not set property "file_type:{value}" - cloud format ' + f'{self.cloud_format} does not accept alternate types.' + ) + + @property + def source_format(self) -> str: + return self.detail_cfg.get(index='driver', default=None) + + def minor_version_increment(self): + """ + Use this function for when properties of the cloud file have been changed.""" + + major, minor = self.version_no.split('.') + minor = str(int(minor)+1) + + self.version_no = f'{major}.{minor}' + + def major_version_increment(self): + """ + Use this function for major changes to the cloud file + - e.g. replacement of source file data.""" + raise NotImplementedError + + major, minor = self.version_no.split('.') + major = str(int(major)+1) + + self.version_no = f'{major}.{minor}' diff --git a/padocc/core/mixins/status.py b/padocc/core/mixins/status.py new file mode 100644 index 0000000..2d24be0 --- /dev/null +++ b/padocc/core/mixins/status.py @@ -0,0 +1,73 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" + +class StatusMixin: + + """ + Methods relating to the ProjectOperation class, in terms + of determining the status of previous runs. + + This is a behavioural Mixin class and thus should not be + directly accessed. Where possible, encapsulated classes + should contain all relevant parameters for their operation + as per convention, however this is not the case for mixin + classes. The mixin classes here will explicitly state + where they are designed to be used, as an extension of an + existing class. + + Use case: ProjectOperation [ONLY] + """ + + def set_last_run(self, phase: str, time : str) -> None: + """ + Set the phase and time of the last run for this project. + """ + lr = (phase, time) + self.base_cfg['last_run'] = lr + + def get_last_run(self) -> tuple: + """ + Get the tuple-value for this projects last run.""" + return self.base_cfg['last_run'] + + def get_last_status(self) -> str: + """ + Gets the last line of the correct log file + """ + return self.status_log[-1] + + def get_log_contents(self, phase: str) -> str: + """ + Get the contents of the log file as a string + """ + + if phase in self.phase_logs: + return str(self.phase_logs[phase]) + self.logger.warning(f'Phase "{phase}" not recognised - no log file retrieved.') + return '' + + def show_log_contents(self, phase: str, halt : bool = False): + """ + Format the contents of the log file to print. + """ + + logfh = self.get_log_contents(phase=phase) + status = self.status_log[-1].split(',') + self.logger.info(logfh) + + self.logger.info(f'Project Code: {self.proj_code}') + self.logger.info(f'Status: {status}') + + self.logger.info(self._rerun_command()) + + if halt: + paused = input('Type "E" to exit assessment:') + if paused == 'E': + raise KeyboardInterrupt + + def _rerun_command(self): + """ + Setup for running this specific component interactively. 
+ """ + return f'padocc -G {self.groupID} -p {self.proj_code} -vvv' diff --git a/padocc/core/project.py b/padocc/core/project.py index 8e86de2..ae1a650 100644 --- a/padocc/core/project.py +++ b/padocc/core/project.py @@ -9,22 +9,22 @@ from typing import Union from .errors import error_handler -from .utils import extract_file, BypassSwitch, apply_substitutions, phases, file_configs +from .utils import extract_file, BypassSwitch, apply_substitutions, phases, file_configs, FILE_DEFAULT from .logs import reset_file_handler -from .mixins import DirectoryMixin, EvaluationsMixin, PropertiesMixin +from .mixins import DirectoryMixin, DatasetHandlerMixin, StatusMixin, PropertiesMixin from .filehandlers import ( JSONFileHandler, CSVFileHandler, ListFileHandler, LogFileHandler, - KerchunkFile ) class ProjectOperation( DirectoryMixin, - EvaluationsMixin, + DatasetHandlerMixin, + StatusMixin, PropertiesMixin): """ PADOCC Project Operation class, able to access project files @@ -152,11 +152,13 @@ def __init__( **self.fh_kwargs ) - self.kfile = None - self.kstore = None - self.zstore = None + self._kfile = None + self._kstore = None + self._zstore = None + self._cfa_dataset = None self._is_trial = False + self.stage = None def __str__(self): @@ -217,6 +219,13 @@ def run( if dryrun is not None: self._dryrun = dryrun + if self.cloud_format != mode: + self.logger.info( + f'Switching cloud format to {mode}' + ) + self.cloud_format = mode + self.file_type = FILE_DEFAULT[mode] + try: status = self._run(mode=mode, **kwargs) self.save_files() @@ -236,17 +245,6 @@ def _run(self, **kwargs) -> None: # Default project operation run. self.logger.info("Nothing to run with this setup!") - def create_new_kfile(self, product : str) -> None: - self.kfile = KerchunkFile( - self.dir, - product, - logger=self.logger, - **self.fh_kwargs - ) - - def create_new_kstore(self, product: str) -> None: - raise NotImplementedError - @property def dir(self): if self.groupID: @@ -254,23 +252,32 @@ def dir(self): else: return f'{self.workdir}/in_progress/general/{self.proj_code}' - @property - def cfa_path(self): - return f'{self.dir}/{self.proj_code}.nca' - - def dir_exists(self, checkdir : str = None): - if not checkdir: - checkdir = self.dir - - if os.path.isdir(checkdir): - return True - return False - def file_exists(self, file : str): - """Check if a named file exists (without extension)""" + """ + Check if a named file exists (without extension). + This can be any generic filehandler attached.""" if hasattr(self, file): fhandle = getattr(self, file) return fhandle.file_exists() + + def delete_project(self, ask: bool = True): + """ + Delete a project + """ + if self._dryrun: + self.logger.info('Skipped Deleting directory in dryrun mode.') + return + if ask: + inp = input(f'Are you sure you want to delete {self.proj_code}? 
(Y/N)?') + if inp != 'Y': + self.logger.info(f'Skipped Deleting directory (User entered {inp})') + return + + os.system(f'rm -rf {self.dir}') + self.logger.info(f'All internal files for {self.proj_code} deleted.') + + def migrate(self, newgroupID: str): + pass def update_status( self, @@ -347,8 +354,22 @@ def _setup_config( config['substitutions'] = substitutions self.base_cfg.set(config) + def _dir_exists(self, checkdir : str = None): + """ + Check a directory exists on the filesystem + """ + if not checkdir: + checkdir = self.dir + + if os.path.isdir(checkdir): + return True + return False + def _create_dirs(self, first_time : bool = None): - if not self.dir_exists(): + """ + Create Project directory and other required directories + """ + if not self._dir_exists(): if self._dryrun: self.logger.debug(f'DRYRUN: Skip making project directory for: "{self}"') else: @@ -358,7 +379,7 @@ def _create_dirs(self, first_time : bool = None): self.logger.warning(f'"{self.dir}" already exists.') logdir = f'{self.dir}/phase_logs' - if not self.dir_exists(logdir): + if not self._dir_exists(logdir): if self._dryrun: self.logger.debug(f'DRYRUN: Skip making phase_logs directory for: "{self}"') else: diff --git a/padocc/core/utils.py b/padocc/core/utils.py index 43499fc..44575c5 100644 --- a/padocc/core/utils.py +++ b/padocc/core/utils.py @@ -10,7 +10,7 @@ import numpy as np import re -from padocc.core.errors import ( +from .errors import ( MissingVariableError, MissingKerchunkError, ChunkDataError, @@ -46,8 +46,8 @@ 'identical_vars':'Unknown' }, 'override':{ - 'cloud_type':None, - 'file_type':None + 'cloud_type':'kerchunk', + 'file_type':'json' # Default values }, 'last_run': (None, None), } @@ -67,6 +67,11 @@ 'detail_cfg':DETAIL_CFG } +FILE_DEFAULT = { + 'kerchunk':'json', + 'zarr':None, +} + class BypassSwitch: """Class to represent all bypass switches throughout the pipeline. Requires a switch string which is used to enable/disable specific pipeline @@ -159,7 +164,7 @@ def open_kerchunk(kfile: str, logger, isparq=False, retry=False, attempt=1, **kw logger.debug('Successfully opened Kerchunk with virtual xarray ds') return ds -def get_attribute(env: str, value: str) -> str: +def get_attribute(env: str, args, value: str) -> str: """ Assemble environment variable or take from passed argument. Find value of variable from Environment or ParseArgs object, or reports failure. @@ -172,7 +177,7 @@ def get_attribute(env: str, value: str) -> str: :returns: Value of either environment variable or argparse value. 
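# Illustrative sketch, not part of the patch: with the extra `args` parameter,
# get_attribute lets the CLI fall back from a parsed argument to an environment
# variable, matching the cli.py change above:
#
#     args = get_args()                                     # argparse.Namespace with args.workdir
#     args.workdir = get_attribute('WORKDIR', args, 'workdir')
#
# If args.workdir is None and the WORKDIR environment variable is also unset,
# a MissingVariableError is raised.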
""" - if value is None: + if getattr(args, value) is None: if not os.getenv(env): raise MissingVariableError(vtype=env) else: diff --git a/padocc/operations/__init__.py b/padocc/groups/__init__.py similarity index 100% rename from padocc/operations/__init__.py rename to padocc/groups/__init__.py diff --git a/padocc/operations/group.py b/padocc/groups/group.py similarity index 92% rename from padocc/operations/group.py rename to padocc/groups/group.py index 0712e4e..b98fd11 100644 --- a/padocc/operations/group.py +++ b/padocc/groups/group.py @@ -103,9 +103,9 @@ def __init__( self._setup_directories() self.proj_codes = {} - self.blacklist_codes = CSVFileHandler( + self.faultlist_codes = CSVFileHandler( self.groupdir, - 'blacklist_codes', + 'faultlist_codes', logger=self.logger, dryrun=self._dryrun, forceful=self._forceful, @@ -138,13 +138,6 @@ def __getitem__(self, index: int) -> ProjectOperation: @property def proj_codes_dir(self): return f'{self.groupdir}/proj_codes' - - @property - def new_inputfile(self): - if self.groupID: - return f'{self.workdir}/groups/filelists/{self.groupID}.txt' - else: - raise NotImplementedError def merge(group_A,group_B): """ @@ -152,7 +145,7 @@ def merge(group_A,group_B): 1. Migrate all projects from B to A and reset groupID values. 2. Combine datasets.csv 3. Combine project codes - 4. Combine blacklists. + 4. Combine faultlists. """ new_proj_dir = f'{group_A.workdir}/in_progress/{group_A.groupID}' @@ -175,12 +168,12 @@ def merge(group_A,group_B): group_B.datasets.remove_file() group_A.logger.debug(f'Removed dataset file for {group_B.groupID}') - # Blacklists - group_A.blacklist_codes.set( - group_A.blacklist_codes.get() + group_B.blacklist_codes.get() + # faultlists + group_A.faultlist_codes.set( + group_A.faultlist_codes.get() + group_B.faultlist_codes.get() ) - group_B.blacklist_codes.remove_file() - group_A.logger.debug(f'Removed blacklist file for {group_B.groupID}') + group_B.faultlist_codes.remove_file() + group_A.logger.debug(f'Removed faultlist file for {group_B.groupID}') # Subsets for name, subset in group_B.proj_codes.items(): @@ -203,7 +196,7 @@ def unmerge(group_A, group_B, dataset_list: list): according to the list 1. Migrate projects 2. Set the datasets - 3. Set the blacklists + 3. Set the faultlists 4. 
Project codes (remove group B sections)""" group_A.logger.info( @@ -231,17 +224,17 @@ def unmerge(group_A, group_B, dataset_list: list): group_A.logger.debug(f"Created datasets file for {group_B.groupID}") - # Set blacklist - A_blacklist, B_blacklist = [],[] - for bl in group_A.blacklist_codes: + # Set faultlist + A_faultlist, B_faultlist = [],[] + for bl in group_A.faultlist_codes: if bl in dataset_list: - B_blacklist.append(bl) + B_faultlist.append(bl) else: - A_blacklist.append(bl) + A_faultlist.append(bl) - group_A.blacklist_codes.set(A_blacklist) - group_B.blacklist_codes.set(B_blacklist) - group_A.logger.debug(f"Created blacklist file for {group_B.groupID}") + group_A.faultlist_codes.set(A_faultlist) + group_B.faultlist_codes.set(B_faultlist) + group_A.logger.debug(f"Created faultlist file for {group_B.groupID}") # Combine project subsets group_B.proj_codes['main'].set(dataset_list) @@ -303,7 +296,7 @@ def run( codeset = self.proj_codes[repeat_id].get() if subset is not None: - codeset = self.configure_subset(codeset, subset, proj_code) + codeset = self._configure_subset(codeset, subset, proj_code) elif proj_code in codeset: self.logger.info(f'Project code: {proj_code}') @@ -432,9 +425,10 @@ def _compute_config( self.workdir, groupID=self.groupID, logger=self.logger, + bypass=bypass, **kwargs ) - status = proj_op.run() + status = proj_op.run(mode=mode) proj_op.save_files() return status @@ -475,7 +469,7 @@ def _save_proj_codes(self): self.proj_codes[pc].close() def save_files(self): - self.blacklist_codes.close() + self.faultlist_codes.close() self.datasets.close() self._save_proj_codes() @@ -488,6 +482,25 @@ def _add_proj_codeset(self, name : str, newcodes : list): dryrun=self._dryrun, forceful=self._forceful ) + + def _delete_proj_codeset(self, name: str): + """ + Delete a project codeset + """ + + if name == 'main': + raise ValueError( + 'Operation not permitted - removing the main codeset' + 'cannot be achieved using this function.' + ) + + if name not in self.proj_codes: + self.logger.warning( + f'Subset ID "{name}" could not be deleted - no matching subset.' 
+ ) + + self.proj_codes[name].remove_file() + self.proj_codes.pop(name) def check_writable(self): if not os.access(self.workdir, os.W_OK): @@ -727,6 +740,18 @@ def _setup_groupdir(self): self.logger.debug(f'DRYRUN: Skip making codes-dir for {self.groupID}') else: os.makedirs(codes_dir) + + def _setup_slurm_directories(self): + """ + Currently Unused function to set up + the slurm directories for a group.""" + + for dirx in ['sbatch','errs']: + if not os.path.isdir(f'{self.groupdir}/{dirx}'): + if self._dryrun: + self.logger.debug(f"DRYRUN: Skipped creating {dirx}") + continue + os.makedirs(f'{self.dir}/{dirx}') def _configure_subset(self, main_set, subset_size: int, subset_id: int): # Configure subset controls diff --git a/padocc/operations/mixins.py b/padocc/groups/mixins.py similarity index 55% rename from padocc/operations/mixins.py rename to padocc/groups/mixins.py index 5538968..125d9c0 100644 --- a/padocc/operations/mixins.py +++ b/padocc/groups/mixins.py @@ -7,19 +7,58 @@ import glob import json import binpacking +import datetime + +from typing import Union, Optional +from collections.abc import Callable from padocc.core import ( - FalseLogger, - LoggedOperation + FalseLogger ) -from padocc.core.utils import extract_file, times, apply_substitutions, file_configs +from padocc.core.utils import extract_file, times, apply_substitutions, file_configs, format_str + +from padocc.core import ProjectOperation + +def _deformat_float(item: str) -> str: + """ + Format byte-value with proper units. + """ + units = ['','K','M','G','T','P'] + value, suffix = item.split(' ') -from padocc.core.project import ProjectOperation + ord = units.index(suffix)*1000 + return float(value)*ord + +def _format_float(value: float) -> str: + """ + Format byte-value with proper units. + """ + + if value is not None: + unit_index = 0 + units = ['','K','M','G','T','P'] + while value > 1000: + value = value / 1000 + unit_index += 1 + return f'{value:.2f} {units[unit_index]}B' + else: + return None class InitialisationMixin: """ Mixin container class for initialisation - routines for groups via input files.""" + routines for groups via input files. + + This is a behavioural Mixin class and thus should not be + directly accessed. Where possible, encapsulated classes + should contain all relevant parameters for their operation + as per convention, however this is not the case for mixin + classes. The mixin classes here will explicitly state + where they are designed to be used, as an extension of an + existing class. + + Use case: GroupOperation [ONLY] + """ def init_from_stac(self): pass @@ -188,52 +227,105 @@ def _open_json(file): self.save_files() class ModifiersMixin: + """ + Modifiers to the group in terms of the projects associated, + allows adding and removing projects. + + This is a behavioural Mixin class and thus should not be + directly accessed. Where possible, encapsulated classes + should contain all relevant parameters for their operation + as per convention, however this is not the case for mixin + classes. The mixin classes here will explicitly state + where they are designed to be used, as an extension of an + existing class. + + Use case: GroupOperation [ONLY] + """ - def add_project(self): + def add_project( + self, + config: dict, + ): """ - Add a project to this group + Add a project to this group. 
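# Illustrative sketch, not part of the patch: the ModifiersMixin methods below act
# on a GroupOperation. Only the 'proj_code' key of the config dict is used
# directly here (the remaining keys are whatever _init_project expects), so a
# minimal, hypothetical workflow might be:
#
#     group.add_project({'proj_code': 'example_0001'})        # appended to the 'main' subset
#     group.transfer_project('example_0001', other_group)     # move to another GroupOperation
#     group.remove_project('example_0001')                    # drop references and delete files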
""" - pass + self._init_project(config) + + self.proj_codes['main'].append(config['proj_code']) - def remove_project(self): + def remove_project(self, proj_code: str) -> None: """ Remove a project from this group + Steps required: + 1. Remove the project directory including all internal files. + 2. Remove the project code from all project files. """ - pass + for pset in self.proj_codes.values(): + if proj_code in pset: + pset.remove(proj_code) + + if proj_code in self.datasets: + self.datasets.pop(proj_code) + if proj_code in self.faultlist: + self.faultlist.pop(proj_code) + + proj_op = ProjectOperation( + proj_code, + self.workdir, + groupID=self.groupID, + forceful=self._forceful, + dryrun=self._dryrun + ) + + proj_op.delete_project() + + def transfer_project(self, proj_code: str, receiver_group) -> None: + """ + Transfer an existing project to a new group + """ + + for pset in self.proj_codes.values(): + if proj_code in pset: + pset.remove(proj_code) + + proj_op = ProjectOperation( + proj_code, + self.workdir, + self.groupID + ) + + proj_op.migrate(receiver_group.groupID) -""" -Replacement for assessor tool. Requires the following (public) methods: - - progress (progress_check) - - blacklist - - upgrade (upgrade_version) - - summarise (summary_data) - - display (show_options) - - cleanup (cleanup) - May not need since this is built into the group. - - match ? - - status (status_log) - - allocations (assess_allocation) - - -Private methods suspected: - - _get_rerun_command : To get a specific rerun for a dataset. - - _merge_old_new : Combine sets of project codes. - - _save_project_codes : Depends how the group stuff works if we need this - _ _analyse_data : Connect to project codes and get a summary of each. - - _force_datetime_decode : Decode datetimes. -""" + receiver_group.proj_codes['main'].append(proj_code) class EvaluationsMixin: """ Group Mixin for methods to evaluate the status of a group. + + This is a behavioural Mixin class and thus should not be + directly accessed. Where possible, encapsulated classes + should contain all relevant parameters for their operation + as per convention, however this is not the case for mixin + classes. The mixin classes here will explicitly state + where they are designed to be used, as an extension of an + existing class. 
+ + Use case: GroupOperation [ONLY] """ + def _assess_info(self): print('Assessment methods:') - print(' > group.summary_data() - Get a printout summary of data representations in this group') - print(' > group.remove_projects() - Remove projects fitting some parameters from this group') + print(' > group.summarise_status() - Summarise the status of all group member projects.') + print(' > group.summarise_data() - Get a printout summary of data representations in this group') + print(' > group.repeat_by_status() - Create a new subset group to (re)run an operation, based on the current status') print(' > group.progress_display() - Get a human-readable display of progress within the group.') print(' > group.progress_repr() - Get a dict version of the progress report (for AirFlow)') def get_project(self, proj_code: str): + """ + Get a project operation from this group + """ + return ProjectOperation( proj_code, self.workdir, @@ -241,28 +333,264 @@ def get_project(self, proj_code: str): logger=self.logger, dryrun=True ) - - def summary_data(self): + def repeat_by_status( + self, + status: str, + new_repeat_id: str, + phase: Optional[str] = None, + old_repeat_id: str = 'main' + ) -> None: """ - Summarise data stored across all files + Group projects by their status, to then + create a new repeat ID. """ - pass + faultdict = self._get_fault_dict() + status_dict = self._get_status_dict( + old_repeat_id, + faultdict, + specific_phase=phase, + specific_error=status + ) + + # New codes are in the status_dict + new_codes = status_dict[phase][status] + self._add_proj_codeset( + new_repeat_id, + new_codes + ) + + self._save_proj_codes() - def remove_projects(self): + def remove_by_status( + self, + status: str, + phase: Optional[str] = None, + old_repeat_id: str = 'main' + ) -> None: """ - Delete a set of projects which match some criteria. + Group projects by their status for + removal from the group """ - pass + faultdict = self._get_fault_dict() + status_dict = self._get_status_dict( + old_repeat_id, + faultdict, + specific_phase=phase, + specific_error=status + ) + + for code in status_dict[phase][status]: + self.remove_project(code) + + self.save_files() + + def merge_subsets( + self, + subset_list: list[str], + combined_id: str, + remove_after: False, + ) -> None: + """ + Merge one or more of the subsets previously created + """ + newset = [] + + for subset in subset_list: + if subset not in self.proj_codes: + raise ValueError( + f'Repeat subset "{subset}" not found in existing subsets.' + ) + + newset = newset + self.proj_codes[subset].get() + + self._add_proj_codeset(combined_id, newset) + + if remove_after: + for subset in subset_list: + self._delete_proj_codeset(subset) + + self._save_proj_codes() + + def summarise_data(self, repeat_id: str = 'main', func: Callable = print): + """ + Summarise data stored across all projects, mostly + concatenating results from the detail-cfg files from + all projects. 
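# A hedged usage sketch for the group-management methods defined in this
# module (add_project, transfer_project, remove_project, repeat_by_status,
# merge_subsets). The group IDs, workdir, config keys and status strings are
# assumptions for illustration; only the method names come from the module.

from padocc import GroupOperation

WORKDIR = '/path/to/workdir'                      # hypothetical

source = GroupOperation('group-a', workdir=WORKDIR, label='group_admin')
target = GroupOperation('group-b', workdir=WORKDIR, label='group_admin')

# Register a new project, then hand it over to another group.
source.add_project({'proj_code': 'proj0001'})     # minimal config, assumed
source.transfer_project('proj0001', target)

# Build retry subsets from the current status and merge them into one;
# the phase and status labels here are illustrative.
target.repeat_by_status('MemoryError', 'retry_memory', phase='compute')
target.repeat_by_status('pending', 'retry_pending', phase='compute')
target.merge_subsets(['retry_memory', 'retry_pending'], 'retry_all',
                     remove_after=True)

# Drop a project from the group entirely.
target.remove_project('proj0002')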
+ """ + import numpy as np + + # Cloud Formats and File Types + # Source Data [Avg,Total] + # Cloud Data [Avg,Total] + # File Count [Avg,Total] + + cloud_formats: dict = {} + file_types: dict = {} + + source_data: list = [] + cloud_data: list = [] + file_count: list = [] + + # Chunk Info + ## Chunks per file [Avg,Total] + ## Total Chunks [Avg, Total] + + chunks_per_file: list = [] + total_chunks: list = [] + + for proj_code in self.proj_codes[repeat_id]: + op = ProjectOperation( + proj_code, + self.workdir, + groupID=self.groupID, + **self.fh_kwargs + ) - def progress(self, repeat_id, write=True): - """Give a general overview of progress within the pipeline + if op.cloud_format in cloud_formats: + cloud_formats[op.cloud_format] += 1 + else: + cloud_formats[op.cloud_format] = 1 + + if op.file_type in file_types: + file_types[op.file_type] += 1 + else: + file_types[op.file_type] = 1 + + details = op.detail_cfg.get() + + if 'source_data' in details: + source_data.append( + _deformat_float(details['source_data']) + ) + if 'cloud_data' in details: + cloud_data.append( + _deformat_float(details['cloud_data']) + ) + + file_count.append(int(details['num_files'])) + + chunk_data = details['chunk_info'] + chunks_per_file.append( + float(chunk_data['chunks_per_file']) + ) + total_chunks.append( + int(chunk_data['total_chunks']) + ) + + # Render Outputs + ot = [] + + ot.append(f'Summary Report: {self.groupID}') + ot.append(f'Project Codes: {len(self.proj_codes[repeat_id])}') + ot.append() + ot.append(f'Source Files: {sum(file_count)} [Avg. {np.mean(file_count):.2f} per project]') + ot.append(f'Source Data: {_format_float(sum(source_data))} [Avg. {np.mean(source_data):.2f} per project]') + ot.append(f'Cloud Data: {_format_float(sum(cloud_data))} [Avg. {np.mean(cloud_data):.2f} per project]') + ot.append() + ot.append(f'Cloud Formats: {list(set(cloud_formats))}') + ot.append(f'File Types: {list(set(file_types))}') + ot.append() + ot.append( + f'Chunks per File: {_format_float(sum(chunks_per_file))} [Avg. {np.mean(chunks_per_file):.2f} per project]') + ot.append( + f'Total Chunks: {_format_float(sum(total_chunks))} [Avg. 
{np.mean(total_chunks):.2f} per project]') + + func('\n'.join(ot)) + + def summarise_status( + self, + repeat_id, + specific_phase: Union[str,None] = None, + specific_error: Union[str,None] = None, + long_display: Union[bool,None] = None, + display_upto: int = 5, + halt: bool = False, + write: bool = False, + fn: Callable = print, + ) -> None: + """ + Gives a general overview of progress within the pipeline - How many datasets currently at each stage of the pipeline - Errors within each pipeline phase - Allows for examination of error logs - Allows saving codes matching an error type into a new repeat group """ - blacklist = self.blacklist_codes + + faultdict = self._get_fault_dict() + + status_dict = self._get_status_dict( + repeat_id, + faultdict=faultdict, + specific_phase=specific_phase, + specific_error=specific_error, + halt=halt, + write=write, + ) + + num_codes = len(self.proj_codes[repeat_id]) + ot = [] + ot.append('') + ot.append(f'Group: {self.groupID}') + ot.append(f' Total Codes: {num_codes}') + ot.append() + ot.append('Pipeline Current:') + if long_display is None and longest_err > 30: + longest_err = 30 + + for phase, records in status_dict.items(): + + if isinstance(records, dict): + self._summarise_dict(phase, records, num_codes, status_len=longest_err, numbers=display_upto) + else: + ot.append() + + ot.append() + ot.append('Pipeline Complete:') + ot.append() + + complete = len(status_dict['complete']) + + complete_percent = format_str(f'{complete*100/num_codes:.1f}',4) + ot.append(f' complete : {format_str(complete,5)} [{complete_percent}%]') + + for option, records in faultdict['faultlist'].items(): + self._summarise_dict(option, records, num_codes, status_len=longest_err, numbers=0) + + ot.append() + fn('\n'.join(ot)) + + def _get_fault_dict(self) -> dict: + """ + Assemble the fault list into a dictionary + with all reasons. + """ + extras = {'faultlist': {}} + for code, reason in self.faultlist: + if reason in extras['faultlist']: + extras['faultlist'][reason].append(0) + else: + extras['faultlist'][reason] = [0] + extras['ignore'][code] = True + return extras + + def _get_status_dict( + self, + repeat_id, + faultdict: dict = None, + specific_phase: Union[str,None] = None, + specific_error: Union[str,None] = None, + halt: bool = False, + write: bool = False, + ) -> dict: + + """ + Assemble the status dict, can be used for stopping and + directly assessing specific errors if needed. 
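# A small sketch of the status bookkeeping that summarise_status() and
# _get_status_dict() work with: one bucket per phase mapping an error label
# to the indices of affected projects, plus the completed codes. The sample
# contents below are invented for illustration; only the overall shape
# mirrors the dictionary initialised in this module.

status_dict = {
    'init': {},
    'scan': {},
    'compute': {'MemoryError': [3, 7], 'pending': [12]},
    'validate': {},
    'complete': ['proj0001', 'proj0002'],
}

# One per-phase summary line in the spirit of _summarise_dict():
phase = 'compute'
records = status_dict[phase]
num_codes = 20
num_hits = sum(len(ids) for ids in records.values())
print(f'   {phase:<10}: {num_hits:>5} [{num_hits * 100 / num_codes:.1f}%] '
      f'(Variety: {len(records)})')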
+ """ + + faultdict = faultdict or {} + proj_codes = self.proj_codes[repeat_id] if write: @@ -272,159 +600,130 @@ def progress(self, repeat_id, write=True): ' - Will update status with "JobCancelled" for >24hr pending jobs' ) - done_set = {} - extras = {'blacklist': {}} - complete = 0 - - # Summarising the blacklist reasons - for code, reason in blacklist: - if reason in extras['blacklist']: - extras['blacklist'][reason].append(0) - else: - extras['blacklist'][reason] = [0] - done_set[code] = True + status_dict = {'init':{},'scan': {}, 'compute': {}, 'validate': {},'complete':[]} - phases = {'init':{}, 'scan': {}, 'compute': {}, 'validate': {}} - savecodes = [] - longest_err = 0 for idx, p in enumerate(proj_codes): - - proj_op = ProjectOperation( - self.workdir, - p, - groupID=self.groupID, - logger=self.logger + if p in faultdict['ignore']: + continue + + status_dict = self._assess_status_of_project( + p, idx, + status_dict, + write=write, + specific_phase=specific_phase, + specific_error=specific_error, + halt=halt ) + return status_dict + + def _assess_status_of_project( + self, + proj_code: str, + pid: int, + status_dict: dict, + write: bool = False, + specific_phase: Union[str,None] = None, + specific_error: Union[str,None] = None, + halt: bool = False, + ) -> dict: + """ + Assess the status of a single project + """ - try: - if p not in done_set: - proj_dir = f'{args.workdir}/in_progress/{args.groupID}/{p}' - current = get_log_status(proj_dir) - if not current: - seek_unknown(proj_dir) - if 'unknown' in extras: - extras['unknown']['no data'].append(idx) - else: - extras['unknown'] = {'no data':[idx]} - continue - entry = current.split(',') - if len(entry[1]) > longest_err: - longest_err = len(entry[1]) - - if entry[1] == 'pending' and args.write: - timediff = (datetime.now() - force_datetime_decode(entry[2])).total_seconds() - if timediff > 86400: # 1 Day - fixed for now - entry[1] = 'JobCancelled' - log_status(entry[0], proj_dir, entry[1], FalseLogger()) - - match_phase = (bool(args.phase) and args.phase == entry[0]) - match_error = (bool(args.error) and any([err == entry[1].split(' ')[0] for err in args.error])) - - if bool(args.phase) != (args.phase == entry[0]): - total_match = False - elif bool(args.error) != (any([err == entry[1].split(' ')[0] for err in args.error])): - total_match = False - else: - total_match = match_phase or match_error - - if total_match: - if args.examine: - examine_log(args.workdir, p, entry[0], groupID=args.groupID, repeat_id=args.repeat_id, error=entry[1]) - if args.new_id or args.blacklist: - savecodes.append(p) - - merge_errs = True # Debug - add as argument later? 
- if merge_errs: - err_type = entry[1].split(' ')[0] - else: - err_type = entry[1] - - if entry[0] == 'complete': - complete += 1 - else: - if err_type in phases[entry[0]]: - phases[entry[0]][err_type].append(idx) - else: - phases[entry[0]][err_type] = [idx] - except KeyboardInterrupt as err: - raise err - except Exception as err: - examine_log(args.workdir, p, entry[0], groupID=args.groupID, repeat_id=args.repeat_id, error=entry[1]) - print(f'Issue with analysis of error log: {p}') - num_codes = len(proj_codes) - print() - print(f'Group: {args.groupID}') - print(f' Total Codes: {num_codes}') - - def summary_dict(pdict, num_codes, status_len=5, numbers=0): - """Display summary information for a dictionary structure of the expected format.""" - for entry in pdict.keys(): - pcount = len(list(pdict[entry].keys())) - num_types = sum([len(pdict[entry][pop]) for pop in pdict[entry].keys()]) - if pcount > 0: - print() - fmentry = format_str(entry,10, concat=False) - fmnum_types = format_str(num_types,5, concat=False) - fmcalc = format_str(f'{num_types*100/num_codes:.1f}',4, concat=False) - print(f' {fmentry}: {fmnum_types} [{fmcalc}%] (Variety: {int(pcount)})') - - # Convert from key : len to key : [list] - errkeys = reversed(sorted(pdict[entry], key=lambda x:len(pdict[entry][x]))) - for err in errkeys: - num_errs = len(pdict[entry][err]) - if num_errs < numbers: - print(f' - {format_str(err, status_len+1, concat=True)}: {num_errs} (IDs = {list(pdict[entry][err])})') - else: - print(f' - {format_str(err, status_len+1, concat=True)}: {num_errs}') - if not args.new_id: - print() - print('Pipeline Current:') - if not args.long and longest_err > 30: - longest_err = 30 - summary_dict(phases, num_codes, status_len=longest_err, numbers=int(args.numbers)) - print() - print('Pipeline Complete:') - print() - complete_percent = format_str(f'{complete*100/num_codes:.1f}',4) - print(f' complete : {format_str(complete,5)} [{complete_percent}%]') - summary_dict(extras, num_codes, status_len=longest_err, numbers=0) - print() - - if args.new_id: - logger.debug(f'Preparing to write {len(savecodes)} codes to proj_codes/{args.new_id}.txt') - if args.write: - save_selection(savecodes, groupdir, args.new_id, logger, overwrite=args.overwrite) - else: - print('Skipped writing new codes - Write flag not present') + # Open the specific project + proj_op = ProjectOperation( + self.workdir, + proj_code, + groupID=self.groupID, + logger=self.logger + ) - if args.blacklist: - logger.debug(f'Preparing to add {len(savecodes)} codes to the blacklist') - if args.write: - add_to_blacklist(savecodes, args.groupdir, args.reason, logger) - else: - print('Skipped blacklisting codes - Write flag not present') + current = proj_op.get_last_status() + entry = current.split(',') - def get_operation(self, opt): - """Operation to perform - deprecated""" - if hasattr(self, opt): - try: - getattr(self, opt)() - except TypeError as err: - self.logger.error( - f'Attribute "{opt}" is not callable' - ) - raise err - except KeyboardInterrupt as err: - raise err - except Exception as err: - examine_log(args.workdir, p, entry[0], groupID=args.groupID, repeat_id=args.repeat_id, error=entry[1]) - print(f'Issue with analysis of error log: {p}') + phase = entry[0] + status = entry[1] + time = entry[2] + + if len(status) > longest_err: + longest_err = len(status) + + if status == 'pending' and write: + timediff = (datetime.now() - datetime(time)).total_seconds() + if timediff > 86400: # 1 Day - fixed for now + status = 'JobCancelled' + 
proj_op.update_status(phase, 'JobCancelled') + + match_phase = (specific_phase == phase) + match_error = (specific_error == status) + + if bool(specific_phase) != (match_phase) or bool(specific_error) != (match_error): + total_match = False else: - self.logger.error( - 'Unrecognised operation type for EvaluationOperation.') + total_match = match_phase or match_error -class AllocationsMixin: + if total_match: + proj_op.show_log_contents(specific_phase, halt=halt) + + if status == 'complete': + status_dict['complete'] += 1 + else: + if status in status_dict[phase]: + status_dict[phase][status].append(pid) + else: + status_dict[phase][status] = [pid] + + return status_dict + + def _summarise_dict( + self, + phase: str, + records: dict, + num_codes: int, + status_len: int = 5, + numbers: int = 0 + ) -> list: + """ + Summarise information for a dictionary structure + that contains a set of errors for a phase within the pipeline + """ + ot = [] + pcount = len(list(records.keys())) + num_types = sum([len(records[pop]) for pop in records.keys()]) + if pcount > 0: + + ot.append('') + fmentry = format_str(phase,10, concat=False) + fmnum_types = format_str(num_types,5, concat=False) + fmcalc = format_str(f'{num_types*100/num_codes:.1f}',4, concat=False) + + ot.append(f' {fmentry}: {fmnum_types} [{fmcalc}%] (Variety: {int(pcount)})') + + # Convert from key : len to key : [list] + errkeys = reversed(sorted(records, key=lambda x:len(records[x]))) + for err in errkeys: + num_errs = len(records[err]) + if num_errs < numbers: + ot.append(f' - {format_str(err, status_len+1, concat=True)}: {num_errs} (IDs = {list(records[err])})') + else: + ot.append(f' - {format_str(err, status_len+1, concat=True)}: {num_errs}') + return ot + +class AllocationsMixin: + """ + Enables the use of Allocations for job deployments via Slurm. + + This is a behavioural Mixin class and thus should not be + directly accessed. Where possible, encapsulated classes + should contain all relevant parameters for their operation + as per convention, however this is not the case for mixin + classes. The mixin classes here will explicitly state + where they are designed to be used, as an extension of an + existing class. 
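# A sketch of the per-project status record handled by
# _assess_status_of_project() above: a 'phase,status,timestamp' entry whose
# pending status is rewritten to 'JobCancelled' once it is older than a day.
# The timestamp format is an assumption; only the comma-separated layout and
# the 86400-second cutoff come from this module.

from datetime import datetime

def assess_entry(current: str, now: datetime) -> tuple[str, str]:
    phase, status, stamp = current.split(',')[:3]
    if status == 'pending':
        started = datetime.strptime(stamp.strip(), '%Y-%m-%d %H:%M:%S')
        if (now - started).total_seconds() > 86400:   # 1 day - fixed for now
            status = 'JobCancelled'
    return phase, status

print(assess_entry('compute,pending,2024-01-01 09:00:00',
                   datetime(2024, 1, 3, 9, 0, 0)))    # ('compute', 'JobCancelled')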
+ + Use case: GroupOperation [ONLY] + """ def create_allocations( self, phase, diff --git a/padocc/operations/shepard.py b/padocc/groups/shepard.py similarity index 98% rename from padocc/operations/shepard.py rename to padocc/groups/shepard.py index 0d2123d..aba2ad5 100644 --- a/padocc/operations/shepard.py +++ b/padocc/groups/shepard.py @@ -15,7 +15,8 @@ import json from padocc.core.logs import LoggedOperation -from padocc.operations.group import GroupOperation + +from .group import GroupOperation shepard_template = { 'workdir': '/my/workdir', diff --git a/padocc/phases/__init__.py b/padocc/phases/__init__.py index fee7b8e..95369fb 100644 --- a/padocc/phases/__init__.py +++ b/padocc/phases/__init__.py @@ -7,4 +7,14 @@ from .ingest import IngestOperation from .validate import ValidateOperation -KNOWN_PHASES = ['init', 'scan', 'compute', 'validate'] \ No newline at end of file +KNOWN_PHASES = ['init', 'scan', 'compute', 'validate'] + +phase_map = { + 'scan': ScanOperation, + 'compute': { + 'kerchunk': KerchunkDS, + 'zarr': ZarrDS, + 'CFA': cfa_handler, + }, + 'validate': ValidateOperation +} \ No newline at end of file diff --git a/padocc/phases/compute.py b/padocc/phases/compute.py index 8a3d480..5e019cc 100644 --- a/padocc/phases/compute.py +++ b/padocc/phases/compute.py @@ -10,7 +10,7 @@ import numpy as np import base64 import logging -from typing import Optional +from typing import Optional, Union import rechunker @@ -254,7 +254,6 @@ def __init__( """ self.phase = 'compute' - self._is_trial = is_trial super().__init__( proj_code, @@ -263,6 +262,8 @@ def __init__( thorough=thorough, label=label, **kwargs) + + self._is_trial = is_trial self.logger.debug('Starting variable definitions') @@ -294,7 +295,7 @@ def __init__( if not self.limiter: self.limiter = num_files - self._setup_cache() + self._setup_cache(self.dir) self.temp_zattrs = JSONFileHandler( self.cache, @@ -685,7 +686,9 @@ def _run( self.update_status('compute',status,jobid=self._logid) return status - def create_refs(self, check_dimensions: bool = False) -> None: + def create_refs( + self, + check_refs : bool = False) -> None: """Organise creation and loading of refs - Load existing cached refs - Create new refs @@ -733,11 +736,9 @@ def create_refs(self, check_dimensions: bool = False) -> None: allzattrs.append(ref['refs']['.zattrs']) refs.append(ref) - if not self.quality_required: - self._perform_shape_checks(ref) - - if check_dimensions: - refs = self._perform_dimensions_checks(ref) + if check_refs: + # Perform any and all checks here if required + refs = self._perform_shape_checks(ref) CacheFile.set(ref) CacheFile.close() @@ -847,11 +848,8 @@ def _data_to_parq(self, refs: dict) -> None: from fsspec.implementations.reference import LazyReferenceMapper self.logger.debug('Starting parquet-write process') - self.create_new_kstore(self.outproduct) - if not os.path.isdir(self.outstore): - os.makedirs(self.outstore) - out = LazyReferenceMapper.create(self.record_size, self.outstore, fs = filesystem("file"), **self.pre_kwargs) + out = LazyReferenceMapper.create(self.record_size, str(self.kstore), fs = filesystem("file"), **self.pre_kwargs) out_dict = MultiZarrToZarr( refs, @@ -861,10 +859,10 @@ def _data_to_parq(self, refs: dict) -> None: ).translate() if self.partial: - self.logger.info(f'Skipped writing to parquet store - {self.outstore}') + self.logger.info(f'Skipped writing to parquet store - {self.kstore}') else: out.flush() - self.logger.info(f'Written to parquet store - {self.outstore}') + self.logger.info(f'Written to 
parquet store - {self.kstore}') def _data_to_json(self, refs: dict) -> None: """ @@ -873,7 +871,6 @@ def _data_to_json(self, refs: dict) -> None: from kerchunk.combine import MultiZarrToZarr self.logger.debug('Starting JSON-write process') - self.create_new_kfile(self.outproduct) # Already have default options saved to class variables if len(refs) > 1: @@ -884,7 +881,6 @@ def _data_to_json(self, refs: dict) -> None: self.combine_kwargs['concat_dims'] = [vdim] try: - print(self.combine_kwargs) mzz = MultiZarrToZarr(list(refs), **self.combine_kwargs).translate() except ValueError as err: if 'chunk size mismatch' in str(err): @@ -900,41 +896,41 @@ def _data_to_json(self, refs: dict) -> None: else: self.logger.debug('Found single ref to save') self.kfile.set(refs[0]) - - if not self.partial: - self.logger.info(f'Written to JSON file - {self.outproduct}') - self.kfile.close() - else: - self.logger.info(f'Skipped writing to JSON file - {self.outproduct}') - def _perform_shape_checks(self, ref: dict) -> None: + self.kfile.close() + + def _perform_shape_checks(self, ref: dict) -> dict: """ Check the shape of each variable for inconsistencies which will require a thorough validation process. """ + + if self.source_format not in ['ncf3','hdf5']: + self.logger.warning( + 'Skipped reference checks, source file not compatible.' + ) + + # Identify variables to be checked if self.base_cfg['data_properties']['aggregated_vars'] != 'Unknown': variables = self.base_cfg['data_properties']['aggregated_vars'] checklist = [f'{v}/.zarray' for v in variables] else: checklist = [r for r in ref['refs'].keys() if '.zarray' in r] + # Determine correct values from a single source file + ## - Test opening a netcdf file and extracting the dimensions + ## - Already checked the files are of netcdf type. + + # Perform corrections for key in checklist: zarray = json.loads(ref['refs'][key]) if key not in self.var_shapes: self.var_shapes[key] = zarray['shape'] if self.var_shapes[key] != zarray['shape']: - self.quality_required = True - - def _perform_dimension_checks(self, ref: dict) -> dict: - """ - Perform dimensional corrections, developed in - response to issues with CCI lakes datasets. 
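# A sketch of the reference shape check performed by _perform_shape_checks()
# above: each kerchunk reference set stores one '<var>/.zarray' entry per
# variable, and the declared shapes are compared across files to spot
# inconsistencies. The reference dicts below are toy examples, not real
# kerchunk output.

import json

seen_shapes: dict[str, list] = {}

def check_ref_shapes(ref: dict) -> list:
    """Return the .zarray keys whose shape differs from the first file seen."""
    mismatched = []
    for key, raw in ref['refs'].items():
        if not key.endswith('/.zarray'):
            continue
        shape = json.loads(raw)['shape']
        if seen_shapes.setdefault(key, shape) != shape:
            mismatched.append(key)
    return mismatched

ref_a = {'refs': {'tas/.zarray': json.dumps({'shape': [31, 180, 360]})}}
ref_b = {'refs': {'tas/.zarray': json.dumps({'shape': [28, 180, 360]})}}
print(check_ref_shapes(ref_a), check_ref_shapes(ref_b))   # [] ['tas/.zarray']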
- """ - - raise NotImplementedError( - 'This feature is not implemented in pre-release v1.3a' - ) + self.logger.debug( + f'Reference Correction: {zarray["shape"]} to ' + ) class ZarrDS(ComputeOperation): @@ -943,21 +939,21 @@ def __init__( self, proj_code, workdir, - stage = 'in_progress', + groupID: Union[str,None] = None, + stage : str = 'in_progress', mem_allowed : str = '100MB', preferences = None, **kwargs, ) -> None: - super().__init__(proj_code, workdir, stage, *kwargs) + super().__init__(proj_code, workdir, groupID=groupID, stage=stage, **kwargs) - self.tempstore = ZarrStore(self.dir, "zarrcache.zarr", logger=self.logger, **self.fh_kwargs) + self.tempstore = ZarrStore(self.dir, "zarrcache", logger=self.logger, **self.fh_kwargs) self.preferences = preferences - if self.thorough or self.forceful: - os.system(f'rm -rf {self.tempstore}') + if self._thorough or self._forceful: + self.tempstore.clear() - self.filelist = [] self.mem_allowed = mem_allowed def _run(self, **kwargs) -> str: @@ -968,25 +964,25 @@ def _run(self, **kwargs) -> str: self.update_status('compute',status,jobid=self._logid) return status - def create_store(self): + def create_store(self, file_limit: int = None): + """ + Create the Zarr Store + """ - # Abort process if overwrite method not specified - if not self.carryon: - self.logger.info('Process aborted - no overwrite plan for existing file.') - return None + self.combine_kwargs = self.detail_cfg['kwargs']['combine_kwargs'] # Open all files for this process (depending on limiter) self.logger.debug('Starting timed section for estimation of whole process') t1 = datetime.now() - self.obtain_file_subset() + self.logger.info(f"Retrieved required xarray dataset objects - {(datetime.now()-t1).total_seconds():.2f}s") # Determine concatenation dimensions if self.base_cfg['data_properties']['aggregated_vars'] == 'Unknown': # Determine dimension specs for concatenation. self._determine_dim_specs([ - xr.open_dataset(self.filelist[0]), - xr.open_dataset(self.filelist[1]) + xr.open_dataset(self.allfiles[0]), + xr.open_dataset(self.allfiles[1]) ]) if not self.combine_kwargs['concat_dims']: self.logger.error('No concatenation dimensions - unsupported for zarr conversion') @@ -995,7 +991,16 @@ def create_store(self): # Perform Concatenation self.logger.info(f'Concatenating xarray objects across dimensions ({self.combine_kwargs["concat_dims"]})') - self.combined_ds = xr.open_mfdataset(self.filelist, combine='nested', concat_dim=self.combine_kwargs['concat_dims']) + if file_limit: + fileset = self.allfiles[:file_limit] + else: + fileset = self.allfiles.get() + + self.combined_ds = xr.open_mfdataset( + fileset, + combine='nested', + concat_dim=self.combine_kwargs['concat_dims'], + data_vars='minimal') # Assessment values self.std_vars = list(self.combined_ds.variables) @@ -1011,17 +1016,31 @@ def create_store(self): self.logger.debug(f'CPF: {self.cpf[0]}, VPF: {self.volm[0]}, num_vars: {len(self.std_vars)}') self.concat_time = (datetime.now()-t1).total_seconds()/self.limiter + + for store in [self.tempstore, self.zstore]: + if not store.is_empty: + if self._forceful or self._thorough: + store.clear() + else: + raise ValueError( + 'Unable to rechunk to zarr - store already exists ' + 'and no overwrite plan has been given. 
Use ' + '-f or -Q on the commandline to clear or overwrite' + 'existing store' + ) # Perform Rechunking self.logger.info(f'Starting Rechunking - {(datetime.now()-t1).total_seconds():.2f}s') - if not self.dryrun: + if not self._dryrun: t1 = datetime.now() + rechunker.rechunk( self.combined_ds, concat_dim_rechunk, self.mem_allowed, - self.outstore, - temp_store=self.tempstore).execute() + self.zstore.store_path, + temp_store=self.tempstore.store_path).execute() + self.convert_time = (datetime.now()-t1).total_seconds()/self.limiter self.logger.info(f'Concluded Rechunking - {(datetime.now()-t1).total_seconds():.2f}s') else: diff --git a/padocc/phases/scan.py b/padocc/phases/scan.py index fd590c0..25b7daf 100644 --- a/padocc/phases/scan.py +++ b/padocc/phases/scan.py @@ -10,15 +10,15 @@ from typing import Union -from padocc.core import FalseLogger -from padocc.core.errors import ConcatFatalError from padocc.core import ProjectOperation -from padocc.core.utils import BypassSwitch -from .compute import KerchunkDS, cfa_handler +from padocc.core import FalseLogger +from padocc.core.errors import ConcatFatalError from padocc.core.filehandlers import JSONFileHandler -def _format_float(value: int, logger: logging.Logger = FalseLogger()) -> str: +from .compute import KerchunkDS, cfa_handler + +def _format_float(value: float, logger: logging.Logger = FalseLogger()) -> str: """ Format byte-value with proper units. """ diff --git a/padocc/phases/validate.py b/padocc/phases/validate.py index 38014cb..985e026 100644 --- a/padocc/phases/validate.py +++ b/padocc/phases/validate.py @@ -2,34 +2,26 @@ __contact__ = "daniel.westwood@stfc.ac.uk" __copyright__ = "Copyright 2023 United Kingdom Research and Innovation" -import os import xarray as xr import json from datetime import datetime -import fsspec -from fsspec.implementations.reference import ReferenceNotReachable + import random import numpy as np -import glob -import logging -import math -import re -from functools import reduce -from itertools import groupby + from typing import Union, Optional -from padocc.core.errors import ChunkDataError -from padocc.core import BypassSwitch, FalseLogger -from padocc.core.utils import open_kerchunk +from padocc.core import ProjectOperation +from padocc.core import LoggedOperation +from padocc.core import BypassSwitch + +from padocc.core.errors import ValidationError from padocc.core.filehandlers import JSONFileHandler from padocc.core.utils import format_tuple SUFFIXES = [] SUFFIX_LIST = [] - -from padocc.core import ProjectOperation, LoggedOperation - def mem_to_value(mem) -> float: """ Convert a memory value i.e 2G into a value @@ -865,9 +857,14 @@ def _run( mode: str = 'kerchunk', **kwargs ) -> None: - # Replaces validate timestep + """ + Run hook for project operation run method + """ + + if mode != self.cloud_format and mode is not None: + self.cloud_format = mode - test = self._open_product() + test = self.dataset.open_dataset() sample = self._open_sample() meta_fh = JSONFileHandler(self.dir, 'metadata_report',logger=self.logger, **self.fh_kwargs) @@ -899,6 +896,10 @@ def _run( vd.save_report() self.update_status('validate',vd.pass_fail,jobid=self._logid) + + if vd.pass_fail == 'Fatal': + raise ValidationError + return vd.pass_fail def _open_sample(self): @@ -913,31 +914,7 @@ def _open_cfa(self): """ Open the CFA dataset for this project """ - - return xr.open_dataset(self.cfa_path, engine='CFA', cfa_options=None) - - def _open_product(self): - """ - Configuration to open object wrappers in the appropriate way so 
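# A minimal sketch of the rechunker call pattern used in create_store()
# above, with a plain dask array standing in for the concatenated xarray
# dataset; the store paths, chunk sizes and memory limit are placeholder
# assumptions.

import dask.array as da
import rechunker

source = da.random.random((365, 180, 360), chunks=(1, 180, 360))

plan = rechunker.rechunk(
    source,
    target_chunks=(365, 10, 10),    # from per-timestep slabs to small tiles
    max_mem='100MB',                # mirrors the mem_allowed default
    target_store='rechunked.zarr',
    temp_store='rechunk_tmp.zarr',
)
plan.execute()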
actions - can be applied to all. Any products not usable with Xarray should have - an xarray-wrapper to allow the application of typical methods for comparison. - """ - - if self.cloud_format == 'kerchunk': - - self.create_new_kfile(self.outproduct) - - # Kerchunk opening sequence - return open_kerchunk( - self.kfile.filepath, - self.logger, - isparq = (self.file_type == 'parq'), - retry = True, - attempt = 3 - ) - raise NotImplementedError( - f'Opening sequence not known for {self.cloud_format}' - ) + return self.cfa_dataset.open_dataset() def _get_preslice(self, test, sample, variables): """Match timestamp of xarray object to kerchunk object. diff --git a/padocc/tests/test_compute.py b/padocc/tests/test_compute.py index da3e435..7f1665d 100644 --- a/padocc/tests/test_compute.py +++ b/padocc/tests/test_compute.py @@ -1,4 +1,4 @@ -from padocc.operations import GroupOperation +from padocc import GroupOperation WORKDIR = 'padocc/tests/auto_testdata_dir' diff --git a/padocc/tests/test_init.py b/padocc/tests/test_init.py index 239cf26..d66e8e0 100644 --- a/padocc/tests/test_init.py +++ b/padocc/tests/test_init.py @@ -1,4 +1,4 @@ -from padocc.operations import GroupOperation +from padocc import GroupOperation WORKDIR = 'padocc/tests/auto_testdata_dir' diff --git a/padocc/tests/test_scan.py b/padocc/tests/test_scan.py index 4b9d6c8..d5a5009 100644 --- a/padocc/tests/test_scan.py +++ b/padocc/tests/test_scan.py @@ -1,4 +1,4 @@ -from padocc.operations import GroupOperation +from padocc import GroupOperation from padocc.phases import ScanOperation WORKDIR = 'padocc/tests/auto_testdata_dir' diff --git a/padocc/tests/test_validate.py b/padocc/tests/test_validate.py index 03a1b65..090efb4 100644 --- a/padocc/tests/test_validate.py +++ b/padocc/tests/test_validate.py @@ -1,4 +1,4 @@ -from padocc.operations import GroupOperation +from padocc import GroupOperation from padocc.core.utils import BypassSwitch diff --git a/padocc/tests/test_zarr_comp.py b/padocc/tests/test_zarr_comp.py new file mode 100644 index 0000000..8069398 --- /dev/null +++ b/padocc/tests/test_zarr_comp.py @@ -0,0 +1,22 @@ +from padocc import GroupOperation +from padocc.core.utils import BypassSwitch + +WORKDIR = 'padocc/tests/auto_testdata_dir' + +class TestCompute: + def test_compute_basic(self, workdir=WORKDIR): + groupID = 'padocc-test-suite' + + process = GroupOperation( + groupID, + workdir=workdir, + label='test_compute', + verbose=1) + + results = process.run('compute', mode='zarr', forceful=True, bypass=BypassSwitch('D'), proj_code='1DAgg') + + assert results['Success'] == 1 + +if __name__ == '__main__': + #workdir = '/home/users/dwest77/cedadev/padocc/padocc/tests/auto_testdata_dir' + TestCompute().test_compute_basic()#workdir=workdir) \ No newline at end of file diff --git a/padocc/tests/test_zarr_valid.py b/padocc/tests/test_zarr_valid.py new file mode 100644 index 0000000..66219cc --- /dev/null +++ b/padocc/tests/test_zarr_valid.py @@ -0,0 +1,23 @@ +from padocc import GroupOperation + +from padocc.core.utils import BypassSwitch + +WORKDIR = 'padocc/tests/auto_testdata_dir' + +class TestValidate: + def test_validate(self, workdir=WORKDIR): + groupID = 'padocc-test-suite' + + process = GroupOperation( + groupID, + workdir=workdir, + label='test_validate', + verbose=1) + + results = process.run('validate', mode='zarr', forceful=True, bypass=BypassSwitch('DS'),proj_code='1DAgg') + + print(results) + +if __name__ == '__main__': + #workdir = '/home/users/dwest77/cedadev/padocc/padocc/tests/auto_testdata_dir' + 
TestValidate().test_validate()#workdir=workdir)
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 8174113..a6c7f73 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,11 +1,10 @@
 [tool.poetry]
 name = "padocc"
-version = "1.3"
+version = "1.3.0"
 description = "Pipeline to Aggregate Data for Optimised Cloud Capabilities"
 authors = ["Daniel Westwood "]
-license = "{file='LICENSE'}"
+license = "BSD 3"
 readme = "README.md"
-
 include = [
     { path = "padocc/tests/*" }
 ]