Reorganise extractors
PeterKraus committed Mar 26, 2024
1 parent 30569d5 commit 3d7c66b
Showing 31 changed files with 195 additions and 193 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -6,5 +6,5 @@ dist/
 *.egg-info
 *.egg
 build/
-public/
+/public/
 docs/source/apidoc/
169 changes: 169 additions & 0 deletions src/yadg/core.py
@@ -0,0 +1,169 @@
from importlib import metadata
import logging
import importlib
import xarray as xr
import numpy as np
from typing import Callable
from zoneinfo import ZoneInfo
from datatree import DataTree
from xarray import Dataset
from pydantic import BaseModel

from dgbowl_schemas.yadg.dataschema import DataSchema
from yadg import dgutils

datagram_version = metadata.version("yadg")
logger = logging.getLogger(__name__)


def infer_extractor(extractor: str) -> Callable:
    """
    A function that finds an :func:`extract` function of the supplied ``extractor``.
    """
    modnames = [
        f"yadg.extractors.public.{extractor}",
        f"yadg.extractors.custom.{extractor}",
        f"yadg.extractors.{extractor.replace('.', '')}",
    ]
    for modname in modnames:
        try:
            m = importlib.import_module(modname)
            if hasattr(m, "extract"):
                return getattr(m, "extract")
        except ImportError:
            logger.critical(f"could not import module '{modname}'")
    raise RuntimeError(f"could not find an 'extract' function for '{extractor}'")
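
For context, a minimal sketch of the lookup order implemented above, assuming a
hypothetical extractor name "example.filetype" (the resolved module paths below
are illustrative):

extract = infer_extractor("example.filetype")
# Candidate modules are tried in order:
#   yadg.extractors.public.example.filetype
#   yadg.extractors.custom.example.filetype
#   yadg.extractors.examplefiletype   (dots removed from the name)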


def process_schema(dataschema: DataSchema, strict_merge: bool = False) -> DataTree:
    """
    The main processing function of yadg.

    Takes in a :class:`DataSchema` object and returns a single :class:`DataTree`
    created from the :class:`DataSchema`.
    """
    if strict_merge:
        concatmode = "identical"
    else:
        concatmode = "drop_conflicts"

    while hasattr(dataschema, "update"):
        dataschema = dataschema.update()

    root = DataTree()
    root.attrs = {
        "provenance": "yadg process",
        "date": dgutils.now(asstr=True),
        "input_schema": dataschema.model_dump_json(),
        "datagram_version": datagram_version,
    }
    root.attrs.update(dgutils.get_yadg_metadata())

    for si, step in enumerate(dataschema.steps):
        logger.info(f"Processing step {si}.")

        # Backfill default timezone, locale, and encoding.
        if step.extractor.timezone is None:
            step.extractor.timezone = dataschema.step_defaults.timezone
        if step.extractor.locale is None:
            step.extractor.locale = dataschema.step_defaults.locale
        if step.extractor.encoding is None:
            step.extractor.encoding = dataschema.step_defaults.encoding

        sattrs = {"extractor_schema": step.extractor.model_dump_json(exclude_none=True)}
        step.extractor.timezone = ZoneInfo(step.extractor.timezone)

        if step.tag is None:
            step.tag = f"{si}"

        handler = infer_extractor(step.extractor.filetype)
        todofiles = step.input.paths()
        vals = None
        if len(todofiles) == 0:
            logger.warning(f"No files processed by step '{step.tag}'.")
            vals = {}
        for tf in todofiles:
            logger.info(f"Processing file '{tf}'.")
            ret = handler(fn=tf, **vars(step.extractor))
            if isinstance(ret, DataTree):
                tasks = ret.to_dict()
            elif isinstance(ret, Dataset):
                tasks = {"/": ret}
            else:
                raise RuntimeError(f"Extractor returned an unexpected type: {type(ret)}")
            fvals = {}
            for name, dset in tasks.items():
                if name == "/" and len(dset.variables) == 0:
                    # The root datatree node may sometimes carry metadata, even if
                    # there are no variables - we don't add 'uts' to those.
                    fvals[name] = dset
                else:
                    fvals[name] = complete_uts(
                        dset, tf, step.externaldate, step.extractor.timezone
                    )
            vals = merge_dicttrees(vals, fvals, concatmode)

        stepdt = DataTree.from_dict({} if vals is None else vals)
        stepdt.name = step.tag
        stepdt.attrs = sattrs
        stepdt.parent = root
    return root
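
A hedged usage sketch of process_schema (the dataschema file name is
hypothetical, and the save step assumes the h5netcdf engine used elsewhere in
yadg; details may differ):

import json
from dgbowl_schemas.yadg import to_dataschema
from yadg.core import process_schema

with open("schema.json") as inf:
    ds = to_dataschema(**json.load(inf))  # validate into a DataSchema model
tree = process_schema(ds, strict_merge=False)
tree.to_netcdf("datagram.nc", engine="h5netcdf")  # persist the DataTree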


def complete_uts(
    ds: Dataset,
    filename: str,
    externaldate: BaseModel,
    timezone: ZoneInfo,
) -> Dataset:
    """
    A helper function ensuring that the Dataset ``ds`` contains a dimension ``"uts"``,
    and that the timestamps in ``"uts"`` are completed as instructed in the
    ``externaldate`` specification.
    """
    if not hasattr(ds, "uts"):
        ds = ds.expand_dims("uts")
    if len(ds.uts.coords) == 0:
        ds["uts"] = np.zeros(ds.uts.size)
        ds.attrs["fulldate"] = False
    if not ds.attrs.get("fulldate", True) or externaldate is not None:
        ts, fulldate = dgutils.complete_timestamps(
            timesteps=ds.uts.values,
            fn=filename,
            spec=externaldate,
            timezone=timezone,
        )
        ds["uts"] = ts
        if fulldate:
            ds.attrs.pop("fulldate", None)
        else:
            # Booleans cannot be stored in NetCDF files.
            ds.attrs["fulldate"] = int(fulldate)

    return ds
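
An illustrative sketch of the kind of input this helper handles; the Dataset
below is synthetic, and the comments describe the intended behaviour rather
than guaranteed output:

import xarray as xr

ds = xr.Dataset(
    data_vars={"T": ("uts", [298.15, 298.25])},
    coords={"uts": [43200.0, 43260.0]},  # seconds within an unknown day
)
ds.attrs["fulldate"] = False
# complete_uts(ds, "data.csv", externaldate, timezone) would route these
# incomplete timestamps through dgutils.complete_timestamps() to recover
# full Unix timestamps, using the externaldate spec or the file metadata.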


def merge_dicttrees(vals: dict, fvals: dict, mode: str) -> dict:
    """
    A helper function that merges two ``DataTree.to_dict()`` objects by concatenating
    the new values in ``fvals`` to the existing ones in ``vals``.
    """
    if vals is None:
        return fvals
    for k in fvals.keys():
        try:
            vals[k] = xr.concat([vals[k], fvals[k]], dim="uts", combine_attrs=mode)
        except xr.MergeError:
            raise RuntimeError(
                "Merging metadata from multiple files has failed, as some of the "
                "values differ between files. This might be caused by trying to "
                "parse data obtained using different techniques/protocols in a "
                "single step. If you are certain this is what you want, try using "
                "yadg with the '--ignore-merge-errors' option."
            )
    return vals
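
An illustrative round-trip showing the concatenation semantics, using two
synthetic single-node dicts:

import xarray as xr

a = {"/": xr.Dataset({"T": ("uts", [1.0])}, coords={"uts": [0.0]})}
b = {"/": xr.Dataset({"T": ("uts", [2.0])}, coords={"uts": [1.0]})}
merged = merge_dicttrees(a, b, "drop_conflicts")
# merged["/"] now spans both inputs: uts == [0.0, 1.0], T == [1.0, 2.0]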
163 changes: 0 additions & 163 deletions src/yadg/core/__init__.py

This file was deleted.

12 changes: 6 additions & 6 deletions src/yadg/dgutils/utils.py
@@ -106,16 +106,16 @@ def schema_3to4(oldschema: list) -> dict:

 def update_schema(object: Union[list, dict]) -> dict:
     """
-    Yadg's update worker function.
+    The ``yadg update`` worker function.

     This is the main function called when **yadg** is executed as ``yadg update``.
-    The main idea is to allow a simple update pathway from older versions of `schema` and
-    ``datagram`` files to the current latest and greatest.
+    The main purpose is to allow a simple update pathway from older versions of
+    dataschema files to the current latest and greatest.

     Currently supports:

-    - updating ``DataSchema`` version 3.1 to 4.0 using routines in ``yadg``
-    - updating ``DataSchema`` version 4.0 and above to the latest ``DataSchema``
+    - updating dataschema version 3.1 to 4.0 using routines in ``yadg``,
+    - updating dataschema version 4.0 and above to the latest dataschema using the
+      in-built ``.update()`` mechanism.

     Parameters
     ----------
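
For reference, the in-built ``.update()`` mechanism mentioned above boils down
to a short loop, as also used in process_schema (``obj`` stands for any parsed
dataschema model; sketch only):

# Each dataschema model can upgrade itself to the next version, so
# updating to the latest version is a simple loop.
while hasattr(obj, "update"):
    obj = obj.update()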