From 1b5f6906f94bcc6590195e1e672f7bac35e9283f Mon Sep 17 00:00:00 2001
From: Peter Kraus
Date: Fri, 29 Mar 2024 22:29:58 +0100
Subject: [PATCH] Nuke most of parsers

---
 src/yadg/parsers/basiccsv/__init__.py         |  49 ---
 src/yadg/parsers/basiccsv/main.py             | 230 -----
 src/yadg/parsers/chromdata/__init__.py        |  93 -----
 src/yadg/parsers/chromdata/empalccsv.py       | 225 -----
 src/yadg/parsers/chromdata/empalcxlsx.py      | 220 ----
 src/yadg/parsers/chromdata/fusioncsv.py       | 143 -------
 src/yadg/parsers/chromdata/fusionjson.py      | 154 -------
 src/yadg/parsers/chromdata/fusionzip.py       |  68 ----
 src/yadg/parsers/chromtrace/__init__.py       | 116 ------
 src/yadg/parsers/chromtrace/agilentch.py      | 161 --------
 src/yadg/parsers/chromtrace/agilentcsv.py     | 180 ---------
 src/yadg/parsers/chromtrace/agilentdx.py      |  87 ----
 src/yadg/parsers/chromtrace/ezchromasc.py     | 169 --------
 src/yadg/parsers/chromtrace/fusionjson.py     | 111 ------
 src/yadg/parsers/chromtrace/fusionzip.py      |  82 ----
 src/yadg/parsers/dummy/__init__.py            |  98 -----
 src/yadg/parsers/electrochem/__init__.py      |  88 ----
 src/yadg/parsers/electrochem/eclabmpr.py      |   2 +-
 src/yadg/parsers/electrochem/eclabmpt.py      |   2 +-
 src/yadg/parsers/electrochem/tomatojson.py    | 142 -------
 src/yadg/parsers/flowdata/__init__.py         |  39 --
 src/yadg/parsers/flowdata/drycal.py           | 228 -----------
 src/yadg/parsers/flowdata/main.py             |  66 ---
 src/yadg/parsers/masstrace/__init__.py        |  90 -----
 src/yadg/parsers/masstrace/quadstarsac.py     | 265 -------------
 src/yadg/parsers/meascsv/__init__.py          | 115 ------
 src/yadg/parsers/qftrace/__init__.py          |  71 ----
 src/yadg/parsers/qftrace/labviewcsv.py        | 130 ------
 src/yadg/parsers/xpstrace/__init__.py         |  70 ----
 src/yadg/parsers/xpstrace/phispe.py           | 375 ------------------
 src/yadg/parsers/xrdtrace/__init__.py         |  75 ----
 src/yadg/parsers/xrdtrace/common.py           |  62 ---
 src/yadg/parsers/xrdtrace/panalyticalcsv.py   | 170 --------
 src/yadg/parsers/xrdtrace/panalyticalxrdml.py | 277 -------------
 src/yadg/parsers/xrdtrace/panalyticalxy.py    |  87 ----
 35 files changed, 2 insertions(+), 4538 deletions(-)
 delete mode 100644 src/yadg/parsers/basiccsv/__init__.py
 delete mode 100644 src/yadg/parsers/basiccsv/main.py
 delete mode 100644 src/yadg/parsers/chromdata/__init__.py
 delete mode 100644 src/yadg/parsers/chromdata/empalccsv.py
 delete mode 100644 src/yadg/parsers/chromdata/empalcxlsx.py
 delete mode 100644 src/yadg/parsers/chromdata/fusioncsv.py
 delete mode 100644 src/yadg/parsers/chromdata/fusionjson.py
 delete mode 100644 src/yadg/parsers/chromdata/fusionzip.py
 delete mode 100644 src/yadg/parsers/chromtrace/__init__.py
 delete mode 100644 src/yadg/parsers/chromtrace/agilentch.py
 delete mode 100644 src/yadg/parsers/chromtrace/agilentcsv.py
 delete mode 100644 src/yadg/parsers/chromtrace/agilentdx.py
 delete mode 100644 src/yadg/parsers/chromtrace/ezchromasc.py
 delete mode 100644 src/yadg/parsers/chromtrace/fusionjson.py
 delete mode 100644 src/yadg/parsers/chromtrace/fusionzip.py
 delete mode 100644 src/yadg/parsers/dummy/__init__.py
 delete mode 100644 src/yadg/parsers/electrochem/tomatojson.py
 delete mode 100644 src/yadg/parsers/flowdata/__init__.py
 delete mode 100644 src/yadg/parsers/flowdata/drycal.py
 delete mode 100644 src/yadg/parsers/flowdata/main.py
 delete mode 100644 src/yadg/parsers/masstrace/__init__.py
 delete mode 100644 src/yadg/parsers/masstrace/quadstarsac.py
 delete mode 100644 src/yadg/parsers/meascsv/__init__.py
 delete mode 100644 src/yadg/parsers/qftrace/__init__.py
 delete mode 100644 src/yadg/parsers/qftrace/labviewcsv.py
 delete mode 100644 src/yadg/parsers/xpstrace/__init__.py
 delete mode 100644 src/yadg/parsers/xpstrace/phispe.py
 delete mode 100644 src/yadg/parsers/xrdtrace/__init__.py
 delete mode 100644 src/yadg/parsers/xrdtrace/common.py
 delete mode 100644 src/yadg/parsers/xrdtrace/panalyticalcsv.py
 delete mode 100644 src/yadg/parsers/xrdtrace/panalyticalxrdml.py
 delete mode 100644 src/yadg/parsers/xrdtrace/panalyticalxy.py

diff --git a/src/yadg/parsers/basiccsv/__init__.py b/src/yadg/parsers/basiccsv/__init__.py
deleted file mode 100644
index 1cd8ba85..00000000
--- a/src/yadg/parsers/basiccsv/__init__.py
+++ /dev/null
@@ -1,49 +0,0 @@
-"""
-Handles the reading and processing of any tabular files, as long as the first line
-contains the column headers. By default, the second should contain the units. The
-columns of the table must be separated using a separator such as ``,``, ``;``,
-or ``\\t``.
-
-.. warning::
-
-    Since ``yadg-5.0``, the parser handles sparse tables (i.e. tables with missing
-    data) by back-filling empty cells with ``np.NaNs``.
-
-.. note::
-
-    :mod:`~yadg.parsers.basiccsv` attempts to deduce the timestamp from the column
-    headers, using :func:`yadg.dgutils.dateutils.infer_timestamp_from`. Alternatively,
-    the column(s) containing the timestamp data and their format can be provided using
-    parameters.
-
-Usage
-`````
-Available since ``yadg-4.0``. The parser supports the following parameters:
-
-.. _yadg.parsers.basiccsv.model:
-
-.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.BasicCSV
-
-Schema
-``````
-The primary functionality of :mod:`~yadg.parsers.basiccsv` is to load the tabular
-data, and determine the Unix timestamp. The headers of the tabular data are taken
-`verbatim` from the file, and appear as ``data_vars`` of the :class:`xarray.Dataset`.
-The single ``coord`` for the ``data_vars`` is the deduced Unix timestamp, ``uts``.
-
-.. code-block:: yaml
-
-  xr.Dataset:
-    coords:
-      uts:            !!float     # Unix timestamp
-    data_vars:
-      {{ headers }}:  (uts)       # Populated from file headers
-
-Module Functions
-````````````````
-
-"""
-
-from .main import process
-
-__all__ = ["process"]
diff --git a/src/yadg/parsers/basiccsv/main.py b/src/yadg/parsers/basiccsv/main.py
deleted file mode 100644
index 3187b267..00000000
--- a/src/yadg/parsers/basiccsv/main.py
+++ /dev/null
@@ -1,230 +0,0 @@
-import logging
-from uncertainties.core import str_to_number_with_uncert as tuple_fromstr
-from typing import Callable, Any
-from pydantic import BaseModel
-import locale as lc
-from ... import dgutils
-
-import numpy as np
-import xarray as xr
-
-logger = logging.getLogger(__name__)
-
-
-def process_row(
-    headers: list,
-    items: list,
-    datefunc: Callable,
-    datecolumns: list,
-) -> tuple[dict, dict]:
-    """
-    A function that processes a row of a table.
-
-    This is the main worker function of :mod:`~yadg.parsers.basiccsv`, but is often
-    re-used by any other parser that needs to process tabular data.
-
-    Parameters
-    ----------
-    headers
-        A list of headers of the table.
-
-    items
-        A list of values corresponding to the headers. Must be the same length as headers.
-
-    units
-        A dict for looking up the units corresponding to a certain header.
-
-    datefunc
-        A function that will generate ``uts`` given a list of values.
-
-    datecolumns
-        Column indices that need to be passed to ``datefunc`` to generate uts.
-
-    Returns
-    -------
-    vals, devs
-        A tuple of result dictionaries, with the first element containing the values
-        and the second element containing the deviations of the values.
- - """ - assert len(headers) == len(items), ( - f"process_row: Length mismatch between provided headers: " - f"{headers} and provided items: {items}." - ) - - vals = {} - devs = {} - columns = [column.strip() for column in items] - - # Process raw data, assign sigma and units - vals["uts"] = datefunc(*[columns[i] for i in datecolumns]) - for ci, header in enumerate(headers): - if ci in datecolumns: - continue - elif columns[ci] == "": - continue - try: - val, dev = tuple_fromstr(lc.delocalize(columns[ci])) - vals[header] = val - devs[header] = dev - except ValueError: - vals[header] = columns[ci] - - return vals, devs - - -def append_dicts( - vals: dict[str, Any], - devs: dict[str, Any], - data: dict[str, list[Any]], - meta: dict[str, list[Any]], - fn: str = None, - li: int = 0, -) -> None: - if "_fn" in meta and fn is not None: - meta["_fn"].append(str(fn)) - for k, v in vals.items(): - if k not in data: - data[k] = [None if isinstance(v, str) else np.nan] * li - data[k].append(v) - for k, v in devs.items(): - if k not in meta: - meta[k] = [np.nan] * li - meta[k].append(v) - - for k in set(data) - set(vals): - data[k].append(np.nan) - for k in set(meta) - set(devs): - if k != "_fn": - meta[k].append(np.nan) - - -def dicts_to_dataset( - data: dict[str, list[Any]], - meta: dict[str, list[Any]], - units: dict[str, str] = dict(), - fulldate: bool = True, -) -> xr.Dataset: - darrs = {} - for k, v in data.items(): - attrs = {} - u = units.get(k, None) - if u is not None: - attrs["units"] = u - if k == "uts": - continue - darrs[k] = xr.DataArray(data=v, dims=["uts"], attrs=attrs) - if k in meta and darrs[k].dtype.kind in {"i", "u", "f", "c", "m", "M"}: - err = f"{k}_std_err" - darrs[k].attrs["ancillary_variables"] = err - attrs["standard_name"] = f"{k} standard error" - darrs[err] = xr.DataArray(data=meta[k], dims=["uts"], attrs=attrs) - if "uts" in data: - coords = dict(uts=data.pop("uts")) - else: - coords = dict() - if fulldate: - attrs = dict() - else: - attrs = dict(fulldate=False) - return xr.Dataset(data_vars=darrs, coords=coords, attrs=attrs) - - -def process( - *, - fn: str, - encoding: str, - locale: str, - timezone: str, - parameters: BaseModel, - **kwargs: dict, -) -> xr.Dataset: - """ - A basic csv parser. - - This parser processes a csv file. The header of the csv file consists of one or two - lines, with the column headers in the first line and the units in the second. The - parser also attempts to parse column names to produce a timestamp, and save all other - columns as floats or strings. - - Parameters - ---------- - fn - File to process - - encoding - Encoding of ``fn``, by default "utf-8". - - timezone - A string description of the timezone. Default is "localtime". - - parameters - Parameters for :class:`~dgbowl_schemas.yadg.dataschema_5_0.step.BasicCSV`. - - Returns - ------- - :class:`xarray.Dataset` - No metadata is returned by the :mod:`~yadg.parsers.basiccsv` parser. The full - date might not be returned, eg. when only time is specified in columns. - - """ - - if hasattr(parameters, "strip"): - strip = parameters.strip - else: - strip = None - - # Load file, extract headers and get timestamping function - with open(fn, "r", encoding=encoding) as infile: - # This decode/encode is done to account for some csv files that have a BOM - # at the beginning of each line. 
- lines = [i.encode().decode(encoding) for i in infile.readlines()] - assert len(lines) >= 2 - headers = [h.strip().strip(strip) for h in lines[0].split(parameters.sep)] - - for hi, header in enumerate(headers): - if "/" in header: - logger.warning("Replacing '/' for '_' in header '%s'.", header) - headers[hi] = header.replace("/", "_") - - datecolumns, datefunc, fulldate = dgutils.infer_timestamp_from( - headers=headers, spec=parameters.timestamp, timezone=timezone - ) - - # Populate units - units = parameters.units - if units is None: - units = {} - _units = [c.strip().strip(strip) for c in lines[1].split(parameters.sep)] - for header in headers: - units[header] = _units.pop(0) - si = 2 - else: - for header in headers: - if header not in units: - logger.warning( - "Using implicit dimensionless unit ' ' for '%s'.", header - ) - units[header] = " " - elif units[header] == "": - units[header] = " " - si = 1 - - units = dgutils.sanitize_units(units) - - # Process rows - old_loc = lc.getlocale(category=lc.LC_NUMERIC) - lc.setlocale(lc.LC_NUMERIC, locale=locale) - data_vals = {} - meta_vals = {"_fn": []} - for li, line in enumerate(lines[si:]): - vals, devs = process_row( - headers, - [i.strip().strip(strip) for i in line.split(parameters.sep)], - datefunc, - datecolumns, - ) - append_dicts(vals, devs, data_vals, meta_vals, fn, li) - lc.setlocale(category=lc.LC_NUMERIC, locale=old_loc) - - return dicts_to_dataset(data_vals, meta_vals, units, fulldate) diff --git a/src/yadg/parsers/chromdata/__init__.py b/src/yadg/parsers/chromdata/__init__.py deleted file mode 100644 index 19f85f3a..00000000 --- a/src/yadg/parsers/chromdata/__init__.py +++ /dev/null @@ -1,93 +0,0 @@ -""" -Handles the reading of post-processed chromatography data, i.e. files containing peak -areas, concentrations, or mole fractions. - -.. note:: - - To parse trace data as present in raw chromatograms, use the - :mod:`~yadg.parsers.chromtrace` parser. - -Usage -````` -Available since ``yadg-4.2``. The parser supports the following parameters: - -.. _yadg.parsers.chromdata.model: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.ChromData - -.. _yadg.parsers.chromdata.formats: - -Formats -``````` -The ``filetypes`` currently supported by the parser are: - - - Inficon Fusion JSON format (``fusion.json``): - see :mod:`~yadg.parsers.chromdata.fusionjson` - - Inficon Fusion zip archive (``fusion.zip``): - see :mod:`~yadg.parsers.chromdata.fusionzip` - - Inficon Fusion csv export (``fusion.csv``): - see :mod:`~yadg.parsers.chromdata.fusioncsv` - - Empa's Agilent LC csv export (``empalc.csv``): - see :mod:`~yadg.parsers.chromdata.empalccsv` - - Empa's Agilent LC excel export (``empalc.xlsx``): - see :mod:`~yadg.parsers.chromdata.empalcxlsx` - -Schema -`````` -Each file is processed into a single :class:`xarray.Dataset`, containing the following -``coords`` and ``data_vars`` (if present in the file): - -.. code-block:: yaml - - xr.Dataset: - coords: - uts: !!float # Unix timestamp - species: !!str # Species names - data_vars: - height: (uts, species) # Peak height maximum - area: (uts, species) # Integrated peak area - retention time: (uts, species) # Peak retention time - concentration: (uts, species) # Species concentration (mol/l) - xout: (uts, species) # Species mole fraction (-) - -Module Functions -```````````````` - -""" - -import xarray as xr - -from . 
import ( - fusionjson, - fusionzip, - fusioncsv, - empalccsv, - empalcxlsx, -) - - -def process(*, filetype: str, **kwargs: dict) -> xr.Dataset: - """ - Unified chromatographic data parser. Forwards ``kwargs`` to the worker functions - based on the supplied ``filetype``. - - Parameters - ---------- - filetype - Discriminator used to select the appropriate worker function. - - Returns - ------- - :class:`xarray.Dataset` - - """ - if filetype == "fusion.json": - return fusionjson.process(**kwargs) - elif filetype == "fusion.zip": - return fusionzip.process(**kwargs) - elif filetype == "fusion.csv": - return fusioncsv.process(**kwargs) - elif filetype == "empalc.csv": - return empalccsv.process(**kwargs) - elif filetype == "empalc.xlsx": - return empalcxlsx.process(**kwargs) diff --git a/src/yadg/parsers/chromdata/empalccsv.py b/src/yadg/parsers/chromdata/empalccsv.py deleted file mode 100644 index 2a587931..00000000 --- a/src/yadg/parsers/chromdata/empalccsv.py +++ /dev/null @@ -1,225 +0,0 @@ -""" -**empalccsv**: Processing Empa's online LC exported data (csv) --------------------------------------------------------------- - -This is a structured format produced by the export from Agilent's Online LC device -at Empa. It contains three sections: - - - metadata section, - - table containing sampling information, - - table containing analysed chromatography data. - -.. codeauthor:: Peter Kraus -""" - -import logging -import datetime -from uncertainties.core import str_to_number_with_uncert as tuple_fromstr -import xarray as xr -import numpy as np - -logger = logging.getLogger(__name__) - - -def process(*, fn: str, encoding: str, **kwargs: dict) -> xr.Dataset: - """ - Custom Agilent Online LC csv export format. - - Multiple chromatograms per file, with multiple detectors. - - Parameters - ---------- - fn - Filename to process. - - encoding - Encoding used to open the file. - - Returns - ------- - :class:`xarray.Dataset` - - """ - - with open(fn, "r", encoding=encoding, errors="ignore") as infile: - lines = infile.readlines() - - metadata = {} - while len(lines) > 0: - line = lines.pop(0) - if len(lines) == 0: - raise RuntimeError( - f"Last line of file '{fn}' read during metadata section." 
- ) - elif line.strip() == "": - break - elif line.strip().startswith("Sequence name"): - metadata["sequence"] = line.split(":,")[1] - elif line.strip().startswith("Description"): - metadata["description"] = line.split(":,")[1] - elif line.strip().startswith("Acquired by"): - metadata["username"] = line.split(":,")[1] - elif line.strip().startswith("Data path"): - metadata["datafile"] = line.split(":,")[1] - elif line.strip().startswith("Report version"): - metadata["version"] = int(line.split(":,")[1]) - - if metadata.get("version", None) is None: - raise RuntimeError(f"Report version in file '{fn}' was not specified.") - - samples = {} - while len(lines) > 0: - line = lines.pop(0) - if len(lines) == 0: - raise RuntimeError(f"Last line of file '{fn}' read during samples section.") - elif line.strip() == "": - break - elif "Line#" in line: - headers = [i.strip() for i in line.split(",")] - else: - data = [i.strip() for i in line.split(",")] - sample = { - "location": data[headers.index("Location")], - "injection date": data[headers.index("Injection Date")], - "acquisition": { - "method": data[headers.index("Acq Method Name")], - "version": data[headers.index("Acq Method Version")], - }, - "integration": { - "method": data[headers.index("Injection DA Method Name")], - "version": data[headers.index("Injection DA Method Version")], - }, - "offset": data[headers.index("Time offset")], - } - if sample["offset"] != "": - sn = data[headers.index("Sample Name")] - samples[sn] = sample - - svals = samples.values() - if len(svals) == 0: - raise RuntimeError( - f"No complete sample data found in file '{fn}'. " - "Have you added time offsets?" - ) - r = next(iter(svals)) - # check that acquisition and integration methods are consistent throughout file: - if any([s["acquisition"]["method"] != r["acquisition"]["method"] for s in svals]): - logger.warning("Acquisition method is inconsistent in file '%s'.", fn) - if any([s["integration"]["method"] != r["integration"]["method"] for s in svals]): - logger.warning("Integration method is inconsistent in file '%s'.", fn) - - metadata["method"] = r["acquisition"]["method"] - - species = set() - while len(lines) > 0: - line = lines.pop(0) - if len(lines) == 0: - break - elif line.strip() == "": - break - elif "Line#" in line: - headers = [i.strip() for i in line.split(",")] - else: - data = [i.strip() for i in line.split(",")] - sn = data[headers.index("Sample Name")] - cn = data[headers.index("Compound")] - species.add(cn) - - h = data[headers.index("Peak Height")] - if h != "": - if "height" not in samples[sn]: - samples[sn]["height"] = {} - samples[sn]["height"][cn] = tuple_fromstr(h) - - A = data[headers.index("Area")] - if A != "": - if "area" not in samples[sn]: - samples[sn]["area"] = {} - samples[sn]["area"][cn] = tuple_fromstr(A) - - if metadata["version"] == 2: - c = data[headers.index("Concentration")] - else: - logger.warning( - "Report version '%d' in file '%s' not understood.", - metadata["version"], - fn, - ) - c = data[headers.index("Concentration")] - if c != "": - if "concentration" not in samples[sn]: - samples[sn]["concentration"] = {} - samples[sn]["concentration"][cn] = tuple_fromstr(c) - - rt = data[headers.index("RT [min]")] - if rt != "": - if "retention time" not in samples[sn]: - samples[sn]["retention time"] = {} - samples[sn]["retention time"][cn] = tuple_fromstr(rt) - units = { - "height": None, - "area": None, - "concentration": "mmol/l", - "retention time": "min", - } - species = sorted(species) - data = [] - for k, v in 
samples.items(): - # Remove unnecessary parameters - del v["acquisition"] - del v["integration"] - v["sampleid"] = k - # Process offset to uts - offset = v.pop("offset") - t = None - for fmt in {"%H:%M:%S"}: - try: - t = datetime.datetime.strptime(offset, fmt) - except ValueError: - continue - if t is None: - try: - td = datetime.timedelta(minutes=float(offset)) - except ValueError: - raise RuntimeError( - f"It was not possible to parse offset '{offset}' present in file " - f"'{fn}' using known formats." - ) - else: - td = datetime.timedelta(hours=t.hour, minutes=t.minute, seconds=t.second) - point = {"uts": td.total_seconds()} - vals = {} - devs = {} - for kk in {"height", "area", "concentration", "retention time"}: - val = v.get(kk, {}) - vals[kk], devs[kk] = zip(*[val.get(cn, (np.nan, np.nan)) for cn in species]) - point["vals"] = vals - point["devs"] = devs - data.append(point) - - data_vars = {} - for kk in {"height", "area", "concentration", "retention time"}: - data_vars[kk] = ( - ["uts", "species"], - [i["vals"][kk] for i in data], - {"anciliary_variables": f"{kk}_std_err"}, - ) - data_vars[f"{kk}_std_err"] = ( - ["uts", "species"], - [i["devs"][kk] for i in data], - {"standard_name": f"{kk} standard_error"}, - ) - if units[kk] is not None: - data_vars[kk][2]["units"] = units[kk] - data_vars[f"{kk}_std_err"][2]["units"] = units[kk] - - ds = xr.Dataset( - data_vars=data_vars, - coords={ - "species": (["species"], species), - "uts": (["uts"], [i["uts"] for i in data]), - }, - attrs=metadata, - ) - - return ds diff --git a/src/yadg/parsers/chromdata/empalcxlsx.py b/src/yadg/parsers/chromdata/empalcxlsx.py deleted file mode 100644 index fdd80855..00000000 --- a/src/yadg/parsers/chromdata/empalcxlsx.py +++ /dev/null @@ -1,220 +0,0 @@ -""" -**empalcxlsx**: Processing Empa's online LC exported data (xlsx) ----------------------------------------------------------------- - -This is a structured format produced by the export from Agilent's Online LC device -at Empa. It contains three sections: - - - metadata section, - - table containing sampling information, - - table containing analysed chromatography data. - - -.. codeauthor:: Peter Kraus -""" - -import logging -import datetime -import openpyxl -from uncertainties.core import str_to_number_with_uncert as tuple_fromstr -import xarray as xr -import numpy as np - -logger = logging.getLogger(__name__) - - -def process(*, fn: str, **kwargs: dict) -> xr.Dataset: - """ - Fusion xlsx export format. - - Multiple chromatograms per file, with multiple detectors. - - Parameters - ---------- - fn - Filename to process. - - Returns - ------- - :class:`xarray.Dataset` - - """ - try: - wb = openpyxl.load_workbook( - filename=fn, - read_only=True, - ) - except TypeError: - raise RuntimeError( - f"Could not read the file '{fn}' using openpyxl. Try to open and save the " - f"file in Excel." 
- ) - - ws = wb["Page 1"] - metadata = {} - for row in ws.rows: - val = row[1].value if len(row) > 1 else "" - if row[0].value.startswith("Sequence name"): - metadata["sequence"] = val - elif row[0].value.startswith("Description"): - metadata["description"] = val - elif row[0].value.startswith("Acquired by"): - metadata["username"] = val - elif row[0].value.startswith("Data path"): - metadata["datafile"] = val - elif row[0].value.startswith("Report version"): - metadata["version"] = int(val) - - if metadata.get("version", None) is None: - raise RuntimeError(f"Report version in file '{fn}' was not specified.") - - ws = wb["Page 2"] - samples = {} - for row in ws.rows: - if "Line#" in row[0].value: - headers = [i.value.replace("\n", "").replace(" ", "") for i in row] - else: - data = [str(i.value) if i.value is not None else None for i in row] - sample = { - "location": data[headers.index("Location")], - "injection date": data[headers.index("InjectionDate")], - "acquisition": { - "method": data[headers.index("AcqMethodName")], - "version": data[headers.index("AcqMethodVersion")], - }, - "integration": { - "method": data[headers.index("InjectionDAMethodName")], - "version": data[headers.index("InjectionDAMethodVersion")], - }, - "offset": data[headers.index("Timeoffset")], - } - if sample["offset"] is not None: - sn = data[headers.index("SampleName")] - sn = sn.replace(" ", "").replace("\n", "") - samples[sn] = sample - - svals = samples.values() - if len(svals) == 0: - raise RuntimeError( - f"No complete sample data found in file '{fn}'. " - "Have you added time offsets?" - ) - r = next(iter(svals)) - # check that acquisition and integration methods are consistent throughout file: - if any([s["acquisition"]["method"] != r["acquisition"]["method"] for s in svals]): - logger.warning("Acquisition method is inconsistent in file '%s'.", fn) - if any([s["integration"]["method"] != r["integration"]["method"] for s in svals]): - logger.warning("Integration method is inconsistent in file '%s'.", fn) - - metadata["method"] = r["acquisition"]["method"].replace("\n", "").replace(" ", "") - - species = set() - ws = wb["Page 3"] - for row in ws.rows: - if "Line#" in str(row[0].value): - headers = [i.value.replace("\n", "").replace(" ", "") for i in row] - else: - data = [str(i.value) if i.value is not None else None for i in row] - sn = data[headers.index("SampleName")].replace("\n", "").replace(" ", "") - cn = data[headers.index("Compound")] - species.add(cn) - - h = data[headers.index("PeakHeight")] - if h is not None: - if "height" not in samples[sn]: - samples[sn]["height"] = {} - samples[sn]["height"][cn] = tuple_fromstr(h) - - A = data[headers.index("Area")] - if A is not None: - if "area" not in samples[sn]: - samples[sn]["area"] = {} - samples[sn]["area"][cn] = tuple_fromstr(A) - - if metadata["version"] == 2: - c = data[headers.index("Concentration")] - else: - logger.warning( - "Report version '%d' in file '%s' not understood.", - metadata["version"], - fn, - ) - c = data[headers.index("Concentration")] - if c is not None: - if "concentration" not in samples[sn]: - samples[sn]["concentration"] = {} - samples[sn]["concentration"][cn] = tuple_fromstr(c) - - rt = data[headers.index("RT[min]")] - if rt is not None: - if "retention time" not in samples[sn]: - samples[sn]["retention time"] = {} - samples[sn]["retention time"][cn] = tuple_fromstr(rt) - - units = { - "height": None, - "area": None, - "concentration": "mmol/l", - "retention time": "min", - } - species = sorted(species) - data = [] - for 
k, v in samples.items(): - # Remove unnecessary parameters - del v["acquisition"] - del v["integration"] - v["sampleid"] = k - # Process offset to uts - offset = v.pop("offset") - t = None - for fmt in {"%H:%M:%S"}: - try: - t = datetime.datetime.strptime(offset, fmt) - except ValueError: - continue - if t is None: - try: - td = datetime.timedelta(minutes=float(offset)) - except ValueError: - raise RuntimeError( - f"It was not possible to parse offset '{offset}' present in file " - f"'{fn}' using known formats." - ) - else: - td = datetime.timedelta(hours=t.hour, minutes=t.minute, seconds=t.second) - point = {"uts": td.total_seconds()} - vals = {} - devs = {} - for kk in {"height", "area", "concentration", "retention time"}: - val = v.get(kk, {}) - vals[kk], devs[kk] = zip(*[val.get(cn, (np.nan, np.nan)) for cn in species]) - point["vals"] = vals - point["devs"] = devs - data.append(point) - - data_vars = {} - for kk in {"height", "area", "concentration", "retention time"}: - data_vars[kk] = ( - ["uts", "species"], - [i["vals"][kk] for i in data], - {"anciliary_variables": f"{kk}_std_err"}, - ) - data_vars[f"{kk}_std_err"] = ( - ["uts", "species"], - [i["devs"][kk] for i in data], - {"standard_name": f"{kk} standard_error"}, - ) - if units[kk] is not None: - data_vars[kk][2]["units"] = units[kk] - data_vars[f"{kk}_std_err"][2]["units"] = units[kk] - - ds = xr.Dataset( - data_vars=data_vars, - coords={ - "species": (["species"], species), - "uts": (["uts"], [i["uts"] for i in data]), - }, - attrs=metadata, - ) - - return ds diff --git a/src/yadg/parsers/chromdata/fusioncsv.py b/src/yadg/parsers/chromdata/fusioncsv.py deleted file mode 100644 index 8977e769..00000000 --- a/src/yadg/parsers/chromdata/fusioncsv.py +++ /dev/null @@ -1,143 +0,0 @@ -""" -**fusioncsv**: Processing Inficon Fusion csv export format (csv). ------------------------------------------------------------------- - -This is a tabulated format, including the concentrations, mole fractions, peak -areas, and retention times. The latter is ignored by this parser. - -.. warning:: - - As also mentioned in the ``csv`` files themselves, the use of this filetype - is discouraged, and the ``json`` files (or a zipped archive of them) should - be parsed instead. - -.. codeauthor:: Peter Kraus -""" - -import logging -from yadg.dgutils.dateutils import str_to_uts -from uncertainties.core import str_to_number_with_uncert as tuple_fromstr -import xarray as xr -import numpy as np - -logger = logging.getLogger(__name__) - -data_names = { - "Concentration": "concentration", - "NormalizedConcentration": "xout", - "Area": "area", - "RT(s)": "retention time", -} - -data_units = { - "concentration": "%", - "xout": "%", - "area": None, - "retention time": "s", -} - - -def process(*, fn: str, encoding: str, timezone: str, **kwargs: dict) -> xr.Dataset: - """ - Fusion csv export format. - - Multiple chromatograms per file, with multiple detectors. - - Parameters - ---------- - fn - Filename to process. - - encoding - Encoding used to open the file. - - timezone - Timezone information. This should be ``"localtime"``. 
- - Returns - ------- - :class:`xarray.Dataset` - - """ - - with open(fn, "r", encoding=encoding, errors="ignore") as infile: - lines = infile.readlines() - - data = [] - species = set() - for line in lines[3:]: - if "SampleName" in line: - header = [i.strip() for i in line.split(",")] - sni = header.index("SampleName") - method = header[0] - for ii, i in enumerate(header): - if i == "": - header[ii] = header[ii - 1] - elif "Detectors" in line: - detectors = [i.replace('"', "").strip() for i in line.split(",")] - for ii, i in enumerate(detectors): - if i == "": - detectors[ii] = detectors[ii - 1] - elif "Time" in line: - samples = [i.replace('"', "").strip() for i in line.split(",")] - time = samples[0] - if time == "Time (GMT 120 mins)": - offset = "+02:00" - elif time == "Time (GMT 60 mins)": - offset = "+01:00" - else: - logger.error("offset '%s' not understood", time) - offset = "+00:00" - elif "% RSD" in line: - continue - else: - items = line.split(",") - point = { - "concentration": {}, - "xout": {}, - "area": {}, - "retention time": {}, - "sampleid": items[sni], - "uts": str_to_uts(timestamp=f"{items[0]}{offset}", timezone=timezone), - } - for ii, i in enumerate(items[2:]): - ii += 2 - species.add(samples[ii]) - point[data_names[header[ii]]][samples[ii]] = tuple_fromstr(i) - data.append(point) - - species = sorted(species) - data_vars = {} - for kk in {"concentration", "xout", "area", "retention time"}: - vals = [] - devs = [] - for i in range(len(data)): - ivals, idevs = zip( - *[data[i][kk].get(cn, (np.nan, np.nan)) for cn in species] - ) - vals.append(ivals) - devs.append(idevs) - data_vars[kk] = ( - ["uts", "species"], - vals, - {"anciliary_variables": f"{kk}_std_err"}, - ) - data_vars[f"{kk}_std_err"] = ( - ["uts", "species"], - devs, - {"standard_name": f"{kk} standard_error"}, - ) - if data_units[kk] is not None: - data_vars[kk][2]["units"] = data_units[kk] - data_vars[f"{kk}_std_err"][2]["units"] = data_units[kk] - - ds = xr.Dataset( - data_vars=data_vars, - coords={ - "species": (["species"], species), - "uts": (["uts"], [i["uts"] for i in data]), - }, - attrs=dict(method=method), - ) - - return ds diff --git a/src/yadg/parsers/chromdata/fusionjson.py b/src/yadg/parsers/chromdata/fusionjson.py deleted file mode 100644 index 1ebe3a9c..00000000 --- a/src/yadg/parsers/chromdata/fusionjson.py +++ /dev/null @@ -1,154 +0,0 @@ -""" -**fusionjson**: Processing Inficon Fusion json data format (json). ------------------------------------------------------------------- - -This is a fairly detailed data format, including the traces, the calibration applied, -and also the integrated peak areas and other processed information, which are parsed -by this module. - -.. note :: - - To parse the raw trace data, use the :mod:`~yadg.parsers.chromtrace` module. - -.. warning :: - - The detectors in the json files are not necessarily in a consistent order. To - avoid inconsistent parsing of species which appear in both detectors, the - detector keys are sorted. **Species present in both detectors** will be - **overwritten by the last detector** in alphabetical order. - -Exposed metadata: -````````````````` - -.. code-block:: yaml - - params: - method: !!str - username: None - version: !!str - datafile: !!str - -.. 
codeauthor:: Peter Kraus -""" - -import json -import logging -from ...dgutils.dateutils import str_to_uts -import xarray as xr -import numpy as np - -logger = logging.getLogger(__name__) - - -def process(*, fn: str, encoding: str, timezone: str, **kwargs: dict) -> xr.Dataset: - """ - Fusion json format. - - One chromatogram per file with multiple traces, and pre-analysed results. - Only a subset of the metadata is retained, including the method name, - detector names, and information about assigned peaks. - - Parameters - ---------- - fn - Filename to process. - - encoding - Encoding used to open the file. - - timezone - Timezone information. This should be ``"localtime"``. - - Returns - ------- - :class:`xarray.Dataset` - - """ - - with open(fn, "r", encoding=encoding, errors="ignore") as infile: - jsdata = json.load(infile) - metadata = { - "method": jsdata.get("methodName", "n/a"), - "version": jsdata.get("softwareVersion", {}).get("version", None), - "datafile": jsdata.get("sequence", {}).get("location", None), - } - uts = str_to_uts(timestamp=jsdata["runTimeStamp"], timezone=timezone) - - sampleid = jsdata.get("annotations", {}).get("name", None) - if sampleid is not None: - metadata["sampleid"] = sampleid - - units = { - "height": None, - "area": None, - "concentration": "%", - "xout": "%", - "retention time": "s", - } - - raw = { - "height": {}, - "area": {}, - "concentration": {}, - "xout": {}, - "retention time": {}, - } - - species = set() - - # sort detector keys to ensure alphabetic order for ID matching - for detname in sorted(jsdata["detectors"].keys()): - detdict = jsdata["detectors"][detname] - if "analysis" in detdict: - for peak in detdict["analysis"]["peaks"]: - if "label" not in peak: - continue - else: - species.add(peak["label"]) - if "height" in peak: - raw["height"][peak["label"]] = (float(peak["height"]), 1.0) - if "area" in peak: - raw["area"][peak["label"]] = (float(peak["area"]), 0.01) - if "concentration" in peak: - raw["concentration"][peak["label"]] = ( - float(peak["concentration"]), - float(peak["concentration"]) * 1e-3, - ) - if "normalizedConcentration" in peak: - raw["xout"][peak["label"]] = ( - float(peak["normalizedConcentration"]), - float(peak["normalizedConcentration"]) * 1e-3, - ) - if "top" in peak: - raw["retention time"][peak["label"]] = (float(peak["top"]), 0.01) - else: - logger.warning("'analysis' of chromatogram not present in file '%s'", fn) - - valve = jsdata.get("annotations", {}).get("valcoPosition", None) - if valve is not None: - raw["valve"] = valve - - species = sorted(species) - data_vars = {} - for k, v in units.items(): - vals, devs = zip(*[raw[k].get(s, (np.nan, np.nan)) for s in species]) - data_vars[k] = ( - ["uts", "species"], - [vals], - {"ancillary_variables": f"{k}_std_err"}, - ) - data_vars[f"{k}_std_err"] = ( - ["uts", "species"], - [devs], - {"standard_name": f"{k} stdandard_error"}, - ) - if v is not None: - data_vars[k][2]["units"] = v - data_vars[f"{k}_std_err"][2]["units"] = v - - ds = xr.Dataset( - data_vars=data_vars, - coords={"species": (["species"], species), "uts": (["uts"], [uts])}, - attrs=metadata, - ) - return ds diff --git a/src/yadg/parsers/chromdata/fusionzip.py b/src/yadg/parsers/chromdata/fusionzip.py deleted file mode 100644 index 9755f5f5..00000000 --- a/src/yadg/parsers/chromdata/fusionzip.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -**fusionzip**: Processing Inficon Fusion zipped data format (zip). 
------------------------------------------------------------------- - -This is a wrapper parser which unzips the provided zip file, and then uses -the :mod:`yadg.parsers.chromdata.fusionjson` parser to parse every data -file present in the archive. - -.. codeauthor:: Peter Kraus -""" - -import zipfile -import tempfile -import os -import xarray as xr - -from .fusionjson import process as processjson - - -def process(*, fn: str, encoding: str, timezone: str, **kwargs: dict) -> xr.Dataset: - """ - Fusion zip file format. - - The Fusion GC's can export their json formats as a zip archive of a folder - of jsons. This parser allows for parsing of this zip archive directly, - without the user having to unzip & move the data. - - Parameters - ---------- - fn - Filename to process. - - encoding - Not used as the file is binary. - - timezone - Timezone information. This should be ``"localtime"``. - - Returns - ------- - :class:`xarray.Dataset` - The data from the inidividual json files contained in the zip archive are - concatenated into a single :class:`xarray.Dataset`. This might fail if the metadata - in the json files differs, or if the dimensions are not easily concatenable. - - """ - - zf = zipfile.ZipFile(fn) - with tempfile.TemporaryDirectory() as tempdir: - zf.extractall(tempdir) - ds = None - for ffn in sorted(os.listdir(tempdir)): - ffn = os.path.join(tempdir, ffn) - if ffn.endswith("fusion-data"): - ids = processjson(fn=ffn, encoding=encoding, timezone=timezone) - if ds is None: - ds = ids - else: - try: - ds = xr.concat([ds, ids], dim="uts", combine_attrs="identical") - except xr.MergeError: - raise RuntimeError( - "Merging metadata from the unzipped fusion-json files has failed. " - "This might be caused by trying to parse data obtained using " - "different chromatographic methods. Please check the contents " - "of the unzipped files." - ) - return ds diff --git a/src/yadg/parsers/chromtrace/__init__.py b/src/yadg/parsers/chromtrace/__init__.py deleted file mode 100644 index 1fa4da1b..00000000 --- a/src/yadg/parsers/chromtrace/__init__.py +++ /dev/null @@ -1,116 +0,0 @@ -""" -Handles the parsing of raw traces present in chromatography files, whether the source is -a liquid chromatograph (LC) or a gas chromatograph (GC). The basic function of the -parser is to: - -#. read in the raw data and create timestamped `traces` -#. collect `metadata` such as the method information, sample ID, etc. - -:mod:`~yadg.parsers.chromtrace` loads the chromatographic data from the specified -file, determines the uncertainties of the signal (y-axis), and explicitly -populates the points in the time axis (x-axis), when required. - -Usage -````` -Available since ``yadg-4.0``. The parser supports the following parameters: - -.. _yadg.parsers.chromtrace.model: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.ChromTrace - -.. 
_yadg.parsers.chromtrace.formats: - -Formats -``````` -The ``filetypes`` currently supported by the parser are: - - - EZ-Chrom ASCII export (``ezchrom.asc``): - see :mod:`~yadg.parsers.chromtrace.ezchromasc` - - Agilent Chemstation Chromtab (``agilent.csv``): - see :mod:`~yadg.parsers.chromtrace.agilentcsv` - - Agilent OpenLab binary signal (``agilent.ch``): - see :mod:`~yadg.parsers.chromtrace.agilentch` - - Agilent OpenLab data archive (``agilent.dx``): - see :mod:`~yadg.parsers.chromtrace.agilentdx` - - Inficon Fusion JSON format (``fusion.json``): - see :mod:`~yadg.parsers.chromtrace.fusionjson` - - Inficon Fusion zip archive (``fusion.zip``): - see :mod:`~yadg.parsers.chromtrace.fusionzip` - -.. _yadg.parsers.chromtrace.provides: - -Schema -`````` -The data is returned as a :class:`datatree.DataTree`, containing a :class:`xarray.Dataset` -for each trace / detector name: - -.. code-block:: yaml - - datatree.DataTree: - {{ detector_name }} !!xr.Dataset - coords: - uts: !!float # Timestamp of the chromatogram - elution_time: !!float # The time axis of the chromatogram (s) - data_vars: - signal: (uts, elution_time) # The ordinate axis of the chromatogram - -When multiple chromatograms are parsed, they are concatenated separately per detector -name. An error might occur during this concatenation if the ``elution_time`` axis changes -dimensions or coordinates between different timesteps. - -.. note:: - - To parse processed data in the raw data files, such as integrated peak areas or - concentrations, use the :mod:`~yadg.parsers.chromdata` parser instead. - -Module Functions -```````````````` - -""" - -import logging -import datatree - -from . import ( - ezchromasc, - agilentcsv, - agilentch, - agilentdx, - fusionjson, - fusionzip, -) - -logger = logging.getLogger(__name__) - - -def process( - *, - filetype: str, - **kwargs: dict, -) -> datatree.DataTree: - """ - Unified raw chromatogram parser. Forwards ``kwargs`` to the worker functions - based on the supplied ``filetype``. - - Parameters - ---------- - filetype - Discriminator used to select the appropriate worker function. - - Returns - ------- - :class:`datatree.DataTree` - - """ - if filetype == "ezchrom.asc": - return ezchromasc.process(**kwargs) - elif filetype == "agilent.csv": - return agilentcsv.process(**kwargs) - elif filetype == "agilent.dx": - return agilentdx.process(**kwargs) - elif filetype == "agilent.ch": - return agilentch.process(**kwargs) - elif filetype == "fusion.json": - return fusionjson.process(**kwargs) - elif filetype == "fusion.zip": - return fusionzip.process(**kwargs) diff --git a/src/yadg/parsers/chromtrace/agilentch.py b/src/yadg/parsers/chromtrace/agilentch.py deleted file mode 100644 index a699a2e5..00000000 --- a/src/yadg/parsers/chromtrace/agilentch.py +++ /dev/null @@ -1,161 +0,0 @@ -""" -**agilentch**: Processing Agilent OpenLab binary signal trace files (CH and IT). --------------------------------------------------------------------------------- - -Currently supports version "179" of the files. Version information is defined in -the `magic_values` (parameters & metadata) and `data_dtypes` (data) dictionaries. - -Adapted from `ImportAgilent.m `_ and -`aston `_. - -File Structure of ``.ch`` files -``````````````````````````````` -.. 
code :: - - 0x0000 "version magic" - 0x0108 "data offset" - 0x011a "x-axis minimum (ms)" - 0x011e "x-axis maximum (ms)" - 0x035a "sample ID" - 0x0559 "description" - 0x0758 "username" - 0x0957 "timestamp" - 0x09e5 "instrument name" - 0x09bc "inlet" - 0x0a0e "method" - 0x104c "y-axis unit" - 0x1075 "detector name" - 0x1274 "y-axis intercept" - 0x127c "y-axis slope" - -Data is stored in a consecutive set of ``i4"), # (x-1) * 512 - 0x011A: ("xmin", ">f4"), # / 60000 - 0x011E: ("xmax", ">f4"), # / 60000 - 0x1274: ("intercept", ">f8"), - 0x127C: ("slope", ">f8"), -} - -data_dtypes = {} -data_dtypes["179"] = (8, " DataTree: - """ - Agilent OpenLAB signal trace parser - - One chromatogram per file with a single trace. Binary data format. - - Parameters - ---------- - fn - Filename to process. - - encoding - Not used as the file is binary. - - timezone - Timezone information. This should be ``"localtime"``. - - Returns - ------- - class:`datatree.DataTree` - A :class:`datatree.DataTree` containing one :class:`xarray.Dataset` per detector. As - there is only one detector data in each CH file, this nesting is only for - consistency with other filetypes. - - """ - - with open(fn, "rb") as inf: - ch = inf.read() - - magic = dgutils.read_value(ch, 0, "utf-8") - pars = {} - if magic in magic_values.keys(): - for offset, (tag, dtype) in magic_values[magic].items(): - v = dgutils.read_value(ch, offset, dtype) - pars[tag] = v - pars["end"] = len(ch) - dsize, ddtype = data_dtypes[magic] - pars["start"] = (pars["offset"] - 1) * 512 - nbytes = pars["end"] - pars["start"] - assert nbytes % dsize == 0 - npoints = nbytes // dsize - - metadata = dict() - for k in ["sampleid", "username", "method"]: - metadata[k] = pars[k] - metadata["version"] = str(magic) - - xsn = np.linspace(pars["xmin"] / 1000, pars["xmax"] / 1000, num=npoints) - xss = np.ones(npoints) * xsn[0] - ysn = ( - np.frombuffer( - ch, - offset=pars["start"], - dtype=ddtype, - count=npoints, - ) - * pars["slope"] - ) - yss = np.ones(npoints) * pars["slope"] - - detector, title = pars["tracetitle"].split(",") - - uts = str_to_uts( - timestamp=pars["timestamp"], format="%d-%b-%y, %H:%M:%S", timezone=timezone - ) - - ds = xr.Dataset( - data_vars={ - "signal": ( - ["uts", "elution_time"], - [ysn], - {"units": pars["yunit"], "ancillary_variables": "signal_std_err"}, - ), - "signal_std_err": ( - ["uts", "elution_time"], - [yss], - {"units": pars["yunit"], "standard_name": "signal standard_error"}, - ), - "elution_time_std_err": ( - ["elution_time"], - xss, - {"units": "s", "standard_name": "elution_time standard_error"}, - ), - }, - coords={ - "elution_time": ( - ["elution_time"], - xsn, - {"units": "s", "ancillary_variables": "elution_time_std_err"}, - ), - "uts": (["uts"], [uts]), - }, - attrs={"title": title}, - ) - dt = DataTree.from_dict({detector: ds}) - dt.attrs = metadata - return dt diff --git a/src/yadg/parsers/chromtrace/agilentcsv.py b/src/yadg/parsers/chromtrace/agilentcsv.py deleted file mode 100644 index 62385534..00000000 --- a/src/yadg/parsers/chromtrace/agilentcsv.py +++ /dev/null @@ -1,180 +0,0 @@ -""" -**agilentcsv**: Processing Agilent Chemstation Chromtab tabulated data files (csv). ------------------------------------------------------------------------------------ - -This file format may include multiple timesteps consisting of several traces each in a -single CSV file. 
It contains a header section for each timestep, followed by a detector -name, and a sequence of "X, Y" datapoints, which are stored as ``elution_time`` and -``signal``. - -.. warning :: - - It is not guaranteed that the X-axis of the chromatogram (i.e. ``elution_time``) is - consistent between the timesteps of the same trace. The traces are expanded to the - length of the longest trace, and the shorter traces are padded with ``NaNs``. - -.. warning :: - - Unfortunately, the chromatographic ``method`` is not exposed in this file format. - -.. codeauthor:: Peter Kraus -""" - -import numpy as np -from uncertainties.core import str_to_number_with_uncert as tuple_fromstr -from yadg.dgutils.dateutils import str_to_uts -import xarray as xr -from datatree import DataTree - - -def _process_headers(headers: list, columns: list, timezone: str) -> dict: - res = {} - assert len(headers) == len( - columns - ), "chromtab: The number of headers and columns do not match." - assert "Date Acquired" in headers, "chromtab: Cannot infer date." - res["uts"] = str_to_uts( - timestamp=columns[headers.index("Date Acquired")].strip(), - format="%d %b %Y %H:%M", - timezone=timezone, - ) - fn = "" - if "Path" in headers: - fn += columns[headers.index("Path")] - if "File" in headers: - fn += columns[headers.index("File")] - res["datafile"] = fn - if "Sample" in headers: - res["sampleid"] = columns[headers.index("Sample")] - return res - - -def _to_trace(tx, ty): - tvals, tders = [x for x in zip(*tx)] - yvals, yders = [x for x in zip(*ty)] - trace = { - "tvals": np.array(tvals) * 60, - "tdevs": np.array(tders) * 60, - "yvals": list(yvals), - "ydevs": list(yders), - } - return trace - - -def process(*, fn: str, encoding: str, timezone: str, **kwargs: dict) -> DataTree: - """ - Agilent Chemstation CSV (Chromtab) file parser - - Each file may contain multiple chromatograms per file with multiple traces. Each - chromatogram starts with a header section, and is followed by each trace, which - includes a header line and x,y-data. - - Parameters - ---------- - fn - Filename to process. - - encoding - Encoding used to open the file. - - timezone - Timezone information. This should be ``"localtime"``. - - Returns - ------- - class:`datatree.DataTree` - A :class:`datatree.DataTree` containing one :class:`xarray.Dataset` per detector. As - When multiple timesteps are present in the file, the traces of each detector are - expanded to match the longest trace, and collated along the ``uts``-dimension. 
- """ - - with open(fn, "r", encoding=encoding, errors="ignore") as infile: - lines = infile.readlines() - metadata = {} - uts = [] - tx = [] - ty = [] - detname = None - tstep = dict() - data = [] - traces = set() - maxlen = dict() - for line in lines: - parts = line.strip().split(",") - if len(parts) > 2: - if '"Date Acquired"' in parts: - if tx != [] and ty != [] and detname is not None: - trace = _to_trace(tx, ty) - tstep[detname] = trace - maxlen[detname] = max(maxlen.get(detname, 0), len(trace["tvals"])) - tx = [] - ty = [] - if len(tstep) > 0: - data.append(tstep) - tstep = dict() - headers = [p.replace('"', "") for p in parts] - else: - columns = [p.replace('"', "") for p in parts] - ret = _process_headers(headers, columns, timezone) - uts.append(ret.pop("uts")) - metadata.update(ret) - elif len(parts) == 1: - if tx != [] and ty != [] and detname is not None: - trace = _to_trace(tx, ty) - tstep[detname] = trace - maxlen[detname] = max(maxlen.get(detname, 0), len(trace["tvals"])) - tx = [] - ty = [] - detname = parts[0].replace('"', "").split("\\")[-1] - traces.add(detname) - elif len(parts) == 2: - x, y = [tuple_fromstr(i) for i in parts] - tx.append(x) - ty.append(y) - trace = _to_trace(tx, ty) - tstep[detname] = trace - maxlen[detname] = max(maxlen.get(detname, 0), len(trace["tvals"])) - data.append(tstep) - - traces = sorted(traces) - vals = {} - for tr in traces: - dsets = [] - for ti, ts in enumerate(data): - thislen = len(ts[tr]["tvals"]) - fvals = {} - for k in {"yvals", "ydevs", "tvals", "tdevs"}: - fvals[k] = np.ones(maxlen[tr]) * np.nan - fvals[k][:thislen] = ts[tr][k] - ds = xr.Dataset( - data_vars={ - "signal": ( - ["elution_time"], - fvals["yvals"], - {"ancillary_variables": "signal_std_err"}, - ), - "signal_std_err": ( - ["elution_time"], - fvals["ydevs"], - {"standard_name": "signal standard_error"}, - ), - "elution_time": ( - ["_"], - fvals["tvals"], - {"units": "s", "ancillary_variables": "elution_time_std_err"}, - ), - "elution_time_std_err": ( - ["elution_time"], - fvals["tdevs"], - {"units": "s", "standard_name": "elution_time standard_error"}, - ), - }, - coords={}, - attrs={}, - ) - ds["uts"] = [uts[ti]] - dsets.append(ds) - vals[tr] = xr.concat(dsets, dim="uts") - dt = DataTree.from_dict(vals) - dt.attrs = metadata - return dt diff --git a/src/yadg/parsers/chromtrace/agilentdx.py b/src/yadg/parsers/chromtrace/agilentdx.py deleted file mode 100644 index f38dbc31..00000000 --- a/src/yadg/parsers/chromtrace/agilentdx.py +++ /dev/null @@ -1,87 +0,0 @@ -""" -**agilentch**: Processing Agilent OpenLab data archive files (DX). ------------------------------------------------------------------- - -This is a wrapper parser which unzips the provided DX file, and then uses the -:mod:`yadg.parsers.chromtrace.agilentch` parser to parse every CH file present in -the archive. The IT files in the archive are currently ignored. - -In addition to the metadata exposed by the CH parser, the ``datafile`` entry -is populated with the corresponding name of the CH file. The ``fn`` entry in each -timestep contains the parent DX file. - -.. note:: - - Currently the timesteps from multiple CH files (if present) are appended in the - timesteps array without any further sorting. - -.. codeauthor:: Peter Kraus -""" - -import zipfile -import tempfile -import os -from .agilentch import process as processch -from datatree import DataTree -import xarray as xr - - -def process(*, fn: str, encoding: str, timezone: str, **kwargs: dict) -> DataTree: - """ - Agilent OpenLab DX archive parser. 
- - This is a simple wrapper around the Agilent OpenLab signal trace parser in - :mod:`yadg.parsers.chromtrace.agilentch`. This wrapper first un-zips the DX - file into a temporary directory, and then processess all CH files found - within the archive, concatenating timesteps from multiple files. - - Parameters - ---------- - fn - Filename to process. - - encoding - Not used as the file is binary. - - timezone - Timezone information. This should be ``"localtime"``. - - Returns - ------- - class:`datatree.DataTree` - A :class:`datatree.DataTree` containing one :class:`xarray.Dataset` per detector. If - multiple timesteps are found in the zip archive, the :class:`datatree.DataTrees` - are collated along the ``uts`` dimension. - - """ - - zf = zipfile.ZipFile(fn) - with tempfile.TemporaryDirectory() as tempdir: - zf.extractall(tempdir) - dt = None - for ffn in os.listdir(tempdir): - if ffn.endswith("CH"): - path = os.path.join(tempdir, ffn) - fdt = processch(fn=path, encoding=encoding, timezone=timezone) - if dt is None: - dt = fdt - elif isinstance(dt, DataTree): - for k, v in fdt.items(): - if k in dt: # pylint: disable=E1135 - try: - newv = xr.concat( - [dt[k].ds, v.ds], # pylint: disable=E1136 - dim="uts", - combine_attrs="identical", - ) - except xr.MergeError: - raise RuntimeError( - "Merging metadata from the unzipped agilent-ch files has failed. " - "This is a bug. Please open an issue on GitHub." - ) - else: - newv = v.ds - dt[k] = DataTree(newv) # pylint: disable=E1137 - else: - raise RuntimeError("We should not get here.") - return dt diff --git a/src/yadg/parsers/chromtrace/ezchromasc.py b/src/yadg/parsers/chromtrace/ezchromasc.py deleted file mode 100644 index 72d102d2..00000000 --- a/src/yadg/parsers/chromtrace/ezchromasc.py +++ /dev/null @@ -1,169 +0,0 @@ -""" -**ezchromasc**: Processing EZ-Chrom ASCII export files (dat.asc). ------------------------------------------------------------------ - -This file format includes one timestep with multiple traces in each ASCII file. It -contains a header section, and a sequence of Y datapoints (``signal``) for each detector. -The X-axis (``elution_time``) is assumed to be uniform between traces, and its units have -to be deduced from the header. - -.. codeauthor:: Peter Kraus -""" - -import numpy as np -import logging -from uncertainties.core import str_to_number_with_uncert as tuple_fromstr -from yadg.dgutils.dateutils import str_to_uts -import xarray as xr -from datatree import DataTree - -logger = logging.getLogger(__name__) - - -def process(*, fn: str, encoding: str, timezone: str, **kwargs: dict) -> DataTree: - """ - EZ-Chrome ASCII export file parser. - - One chromatogram per file with multiple traces. A header section is followed by - y-values for each trace. x-values have to be deduced using number of points, - frequency, and x-multiplier. Method name is available, but detector names are not. - They are assigned their numerical index in the file. - - Parameters - ---------- - fn - Filename to process. - - encoding - Encoding used to open the file. - - timezone - Timezone information. This should be ``"localtime"``. - - - Returns - ------- - class:`datatree.DataTree` - A :class:`datatree.DataTree` containing one :class:`xarray.Dataset` per detector. 
- - """ - - with open(fn, "r", encoding=encoding, errors="ignore") as infile: - lines = infile.readlines() - metadata = {} - data = {} - - for line in lines: - for key in ["Version", "Method", "User Name"]: - if line.startswith(key): - k = key.lower().replace(" ", "") - metadata[k] = line.split(f"{key}:")[1].strip() - for key in ["Sample ID"]: # , "Data File"]: - if line.startswith(key): - k = key.lower().replace(" ", "") - metadata[k] = line.split(f"{key}:")[1].strip() - if line.startswith("Acquisition Date and Time:"): - uts = str_to_uts( - timestamp=line.split("Time:")[1].strip(), - format="%m/%d/%Y %I:%M:%S %p", - timezone=timezone, - ) - if line.startswith("Sampling Rate:"): - assert ( - "Hz" in line - ), f"datasc: Incorrect units for rate in file {fn}: {line}" - parts = line.split("\t") - samplerates = [float(each.strip()) for each in parts[1:-1]] - if line.startswith("Total Data Points:"): - assert ( - "Pts." in line - ), f"datasc: Incorrect units for number of points in file {fn}: {line}" - parts = line.split("\t") - npoints = [int(each.strip()) for each in parts[1:-1]] - if line.startswith("X Axis Title:"): - parts = line.split("\t") - xunits = [each.strip() for each in parts[1:]] - if line.startswith("Y Axis Title:"): - parts = line.split("\t") - yunits = [each.strip() for each in parts[1:]] - if "25 V" in yunits: - logger.warning("Implicit conversion of y-axis unit from '25 V' to 'V'.") - yunits = [i.replace("25 V", "V") for i in yunits] - if line.startswith("X Axis Multiplier:"): - parts = line.split("\t") - xmuls = [float(each.strip()) for each in parts[1:]] - if line.startswith("Y Axis Multiplier:"): - parts = line.split("\t") - ymuls = [float(each.strip()) for each in parts[1:]] - if ":" not in line: - si = lines.index(line) - break - assert ( - len(samplerates) - == len(npoints) - == len(xunits) - == len(yunits) - == len(xmuls) - == len(ymuls) - ), f"datasc: Inconsistent number of traces in {fn}." - - data = {} - units = {} - for ti, npts in enumerate(npoints): - assert ( - xunits[ti] == "Minutes" - ), f"datasc: X units label of trace {ti} in {fn} was not understood." 
- dt = 60 - xmul = xmuls[ti] * dt / samplerates[ti] - ymul = ymuls[ti] - xsn = np.arange(npts) * xmul - xss = np.ones(npts) * xmul - ysn, yss = zip(*[tuple_fromstr(li) for li in lines[si : si + npts]]) - si += npts - data[f"{ti}"] = { - "t": (xsn, xss), - "y": (np.array(ysn) * ymul, np.array(yss) * ymul), - } - units[f"{ti}"] = {"t": "s", "y": yunits[ti]} - - traces = sorted(data.keys()) - vals = {} - for ti in traces: - fvals = xr.Dataset( - data_vars={ - "signal": ( - ["uts", "elution_time"], - [data[ti]["y"][0]], - {"units": units[ti]["y"], "ancillary_variables": "signal_std_err"}, - ), - "signal_std_err": ( - ["uts", "elution_time"], - [data[ti]["y"][1]], - {"units": units[ti]["y"], "standard_name": "signal standard_error"}, - ), - "elution_time_std_err": ( - ["elution_time"], - data[ti]["t"][1], - { - "units": units[ti]["t"], - "standard_name": "elution_time standard_error", - }, - ), - }, - coords={ - "elution_time": ( - ["elution_time"], - data[ti]["t"][0], - { - "units": units[ti]["t"], - "ancillary_variables": "elution_time_std_err", - }, - ), - "uts": (["uts"], [uts]), - }, - attrs={}, - ) - vals[ti] = fvals - dt = DataTree.from_dict(vals) - dt.attrs = metadata - return dt diff --git a/src/yadg/parsers/chromtrace/fusionjson.py b/src/yadg/parsers/chromtrace/fusionjson.py deleted file mode 100644 index 2bd98264..00000000 --- a/src/yadg/parsers/chromtrace/fusionjson.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -**fusionjson**: Processing Inficon Fusion json data format (json). ------------------------------------------------------------------- - -This is a fairly detailed data format, including the traces, the calibration applied, -and also the integrated peak areas. If the peak areas are present, this is returned -in the list of timesteps as a ``"peaks"`` entry. - -Exposed metadata: -````````````````` - -.. code-block:: yaml - - method: !!str - sampleid: !!str - version: !!str - datafile: !!str - -.. codeauthor:: Peter Kraus -""" - -import json -import numpy as np -from ...dgutils.dateutils import str_to_uts -import xarray as xr -from datatree import DataTree - - -def process(*, fn: str, encoding: str, timezone: str, **kwargs: dict) -> DataTree: - """ - Fusion json format. - - One chromatogram per file with multiple traces, and integrated peak areas. - - .. warning:: - - To parse the integrated data present in these files, use the - :mod:`~yadg.parsers.chromdata` parser. - - Only a subset of the metadata is retained, including the method name, - detector names, and information about assigned peaks. - - Parameters - ---------- - fn - Filename to process. - - encoding - Encoding used to open the file. - - timezone - Timezone information. This should be ``"localtime"``. - - Returns - ------- - class:`datatree.DataTree` - A :class:`datatree.DataTree` containing one :class:`xarray.Dataset` per detector. 
- - """ - - with open(fn, "r", encoding=encoding, errors="ignore") as infile: - jsdata = json.load(infile) - metadata = { - "method": jsdata.get("methodName", "n/a"), - "sampleid": jsdata.get("annotations", {}).get("name", None), - "version": jsdata.get("softwareVersion", {}).get("version", None), - "datafile": jsdata.get("sequence", {}).get("location", None), - } - uts = str_to_uts(timestamp=jsdata["runTimeStamp"], timezone=timezone) - - # sort detector keys to ensure alphabetic order for ID matching - traces = sorted(jsdata["detectors"].keys()) - vals = {} - for detname in traces: - detdict = jsdata["detectors"][detname] - fvals = xr.Dataset( - data_vars={ - "signal": ( - ["uts", "elution_time"], - [detdict["values"]], - {"ancillary_variables": "signal_std_err"}, - ), - "signal_std_err": ( - ["uts", "elution_time"], - [np.ones(detdict["nValuesExpected"])], - {"standard_name": "signal standard_error"}, - ), - "elution_time_std_err": ( - ["elution_time"], - np.ones(detdict["nValuesExpected"]) / detdict["nValuesPerSecond"], - {"units": "s", "standard_name": "elution_time standard_error"}, - ), - }, - coords={ - "elution_time": ( - ["elution_time"], - np.arange(detdict["nValuesExpected"]) / detdict["nValuesPerSecond"], - {"units": "s", "ancillary_variables": "elution_time_std_err"}, - ), - "uts": (["uts"], [uts]), - }, - attrs={}, - ) - valve = jsdata.get("annotations", {}).get("valcoPosition", None) - if valve is not None: - fvals["valve"] = valve - vals[detname] = fvals - - dt = DataTree.from_dict(vals) - dt.attrs = metadata - return dt diff --git a/src/yadg/parsers/chromtrace/fusionzip.py b/src/yadg/parsers/chromtrace/fusionzip.py deleted file mode 100644 index 5087f227..00000000 --- a/src/yadg/parsers/chromtrace/fusionzip.py +++ /dev/null @@ -1,82 +0,0 @@ -""" -**fusionzip**: Processing Inficon Fusion zipped data format (zip). ------------------------------------------------------------------- - -This is a wrapper parser which unzips the provided zip file, and then uses -the :mod:`yadg.parsers.chromtrace.fusionjson` parser to parse every data -file present in the archive. - -Exposed metadata: -````````````````` - -.. code-block:: yaml - - method: !!str - sampleid: !!str - version: !!str - datafile: !!str - -.. codeauthor:: Peter Kraus -""" - -import zipfile -import tempfile -import os -import xarray as xr -from datatree import DataTree - -from .fusionjson import process as processjson - - -def process(*, fn: str, encoding: str, timezone: str, **kwargs: dict) -> DataTree: - """ - Fusion zip file format. - - The Fusion GC's can export their json formats as a zip archive of a folder - of jsons. This parser allows for parsing of this zip archive directly, - without the user having to unzip & move the data. - - Parameters - ---------- - fn - Filename to process. - - encoding - Not used as the file is binary. - - timezone - Timezone information. This should be ``"localtime"``. - - Returns - ------- - class:`datatree.DataTree` - A :class:`datatree.DataTree` containing one :class:`xarray.Dataset` per detector. If - multiple timesteps are found in the zip archive, the :class:`datatree.DataTrees` - are collated along the ``uts`` dimension. 
- - """ - - zf = zipfile.ZipFile(fn) - with tempfile.TemporaryDirectory() as tempdir: - zf.extractall(tempdir) - dt = None - for ffn in sorted(os.listdir(tempdir)): - path = os.path.join(tempdir, ffn) - if ffn.endswith("fusion-data"): - fdt = processjson(fn=path, encoding=encoding, timezone=timezone) - if dt is None: - dt = fdt - elif isinstance(dt, DataTree): - for k, v in fdt.items(): - if k in dt: # pylint: disable=E1135 - newv = xr.concat( - [dt[k].ds, v.ds], # pylint: disable=E1136 - dim="uts", - combine_attrs="identical", - ) - else: - newv = v.ds - dt[k] = DataTree(newv) # pylint: disable=E1137 - else: - raise RuntimeError("We should not get here.") - return dt diff --git a/src/yadg/parsers/dummy/__init__.py b/src/yadg/parsers/dummy/__init__.py deleted file mode 100644 index d16f5f9d..00000000 --- a/src/yadg/parsers/dummy/__init__.py +++ /dev/null @@ -1,98 +0,0 @@ -""" -This is a dummy parser, used mainly for testing of the :mod:`yadg` and :mod:`tomato` -packages. It provides no real functionality. - -Usage -````` -Available since ``yadg-4.0``. The parser supports the following parameters: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.Dummy - -Formats -``````` -The ``filetypes`` currently supported by the parser are: - - - tomato's JSON file (``tomato.json``) - -Schema -`````` -The output schema is only defined for the ``tomato.json`` filetype. - -.. code-block:: yaml - - xr.Dataset: - coords: - uts: !!float - data_vars: - {{ entries }} (uts) # Elements present in the "data" entry - -The value of every element of ``data`` is assigned a deviation of 0.0. - -Module Functions -```````````````` - -""" - -from pydantic import BaseModel -import json -from ... import dgutils -from ..basiccsv.main import append_dicts, dicts_to_dataset -from datatree import DataTree - - -def process( - *, - fn: str, - filetype: str, - parameters: BaseModel, - **kwargs: dict, -) -> DataTree: - """ - A dummy parser. - - This parser simply returns the current time, the filename provided, and any - ``kwargs`` passed. - - In case the provided ``filetype`` is a ``tomato.json`` file, this is a json - data file from the :mod:`tomato` package, which should contain a :class:`list` - of ``{"value": float, "time": float}`` datapoints in its ``data`` entry. - - Parameters - ---------- - fn - Filename to process - - filetype - Accepts ``tomato.json`` as an optional "dummy instrument" filetype from - :mod:`tomato`. - - parameters - Parameters for :class:`~dgbowl_schemas.yadg.dataschema_5_0.step.Dummy`. 
- - Returns - ------- - :class:`xarray.Dataset` - - """ - if filetype == "tomato.json": - with open(fn, "r") as inf: - jsdata = json.load(inf) - - data_vals = {} - meta_vals = {} - for vi, vals in enumerate(jsdata["data"]): - vals["uts"] = vals.pop("time") - devs = {} - for k, v in vals.items(): - if k not in {"time", "address", "channel"}: - devs[k] = 0.0 - append_dicts(vals, devs, data_vals, meta_vals, fn, vi) - else: - kwargs = {} if parameters is None else parameters.dict() - if "parser" in kwargs: - del kwargs["parser"] - data_vals = {k: [v] for k, v in kwargs.items()} - data_vals["uts"] = [dgutils.now()] - meta_vals = {} - - return dicts_to_dataset(data_vals, meta_vals, fulldate=False) diff --git a/src/yadg/parsers/electrochem/__init__.py b/src/yadg/parsers/electrochem/__init__.py index c08f4e27..e69de29b 100644 --- a/src/yadg/parsers/electrochem/__init__.py +++ b/src/yadg/parsers/electrochem/__init__.py @@ -1,88 +0,0 @@ -""" -This module handles the reading and processing of files containing electrochemical -data, including BioLogic's EC-Lab file formats. The basic function of the parser is to: - -#. Read in the technique data and create timesteps. -#. Collect metadata, such as the measurement settings and the loops - contained in a given file. -#. Collect data describing the technique parameter sequences. - -Usage -````` -Available since ``yadg-4.0``. The parser supports the following parameters: - -.. _yadg.parsers.electrochem.model: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.ElectroChem - -.. _yadg.parsers.electrochem.formats: - -Formats -``````` -The ``filetypes`` currently supported by the parser are: - - - EC-Lab raw data binary file and parameter settings (``eclab.mpr``), - see :mod:`~yadg.parsers.electrochem.eclabmpr` - - EC-Lab human-readable text export of data (``eclab.mpt``), - see :mod:`~yadg.parsers.electrochem.eclabmpt` - - tomato's structured json output (``tomato.json``), - see :mod:`~yadg.parsers.electrochem.tomatojson` - -Schema -`````` -Depending on the filetype, the output :class:`xarray.Dataset` may contain multiple -derived values. However, all filetypes will report at least the following: - -.. code-block:: yaml - - xr.Dataset: - coords: - uts: !!float - data_vars: - Ewe: (uts) # Potential of the working electrode (V) - Ece: (uts) # Potential of the counter electrode (V) - I: (uts) # Applied current (A) - -In some cases, average values (i.e. ```` or ````) may be reported instead -of the instantaneous data. - -.. warning:: - - In previous versions of :mod:`yadg`, the :mod:`~yadg.parsers.electrochem` parser - optionally transposed data from impedance spectroscopy, grouping the datapoints - in each scan into a single "trace". This behaviour has been removed in ``yadg-5.0``. - -Module Functions -```````````````` - -""" - -import xarray as xr -from . import eclabmpr, eclabmpt, tomatojson - - -def process( - *, - filetype: str, - **kwargs: dict, -) -> xr.Dataset: - """ - Unified parser for electrochemistry data. Forwards ``kwargs`` to the worker functions - based on the supplied ``filetype``. - - Parameters - ---------- - filetype - Discriminator used to select the appropriate worker function. 
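All electrochemistry filetypes report at least the ``Ewe``, ``Ece`` and ``I`` variables over the ``uts`` coordinate, as listed in the schema above. A sketch of that minimal output with invented numbers:

.. code-block:: python

    import xarray as xr

    ds = xr.Dataset(
        data_vars={
            "Ewe": (["uts"], [3.70, 3.71]),            # working electrode potential (V)
            "Ece": (["uts"], [0.10, 0.10]),            # counter electrode potential (V)
            "I": (["uts"], [0.05, 0.05]),              # applied current (A)
        },
        coords={"uts": [1711748998.0, 1711748999.0]},
    )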
- - Returns - ------- - :class:`xarray.Dataset` - - """ - if filetype == "eclab.mpr": - return eclabmpr.process(**kwargs) - elif filetype == "eclab.mpt": - return eclabmpt.process(**kwargs) - elif filetype == "tomato.json": - return tomatojson.process(**kwargs) diff --git a/src/yadg/parsers/electrochem/eclabmpr.py b/src/yadg/parsers/electrochem/eclabmpr.py index cd48c4d3..235a8943 100644 --- a/src/yadg/parsers/electrochem/eclabmpr.py +++ b/src/yadg/parsers/electrochem/eclabmpr.py @@ -213,7 +213,7 @@ log_dtypes, extdev_dtypes, ) -from yadg.parsers.basiccsv.main import append_dicts, dicts_to_dataset +from yadg.extractors.custom.basic.csv import append_dicts, dicts_to_dataset logger = logging.getLogger(__name__) diff --git a/src/yadg/parsers/electrochem/eclabmpt.py b/src/yadg/parsers/electrochem/eclabmpt.py index 8fff9879..e57ad801 100644 --- a/src/yadg/parsers/electrochem/eclabmpt.py +++ b/src/yadg/parsers/electrochem/eclabmpt.py @@ -39,7 +39,7 @@ from ...dgutils.dateutils import str_to_uts from .eclabcommon.techniques import get_resolution, technique_params, param_from_key from .eclabcommon.mpt_columns import column_units -from yadg.parsers.basiccsv.main import append_dicts, dicts_to_dataset +from yadg.extractors.custom.basic.csv import append_dicts, dicts_to_dataset logger = logging.getLogger(__name__) diff --git a/src/yadg/parsers/electrochem/tomatojson.py b/src/yadg/parsers/electrochem/tomatojson.py deleted file mode 100644 index e33294e4..00000000 --- a/src/yadg/parsers/electrochem/tomatojson.py +++ /dev/null @@ -1,142 +0,0 @@ -""" -**tomatojson**: Processing of tomato electrochemistry outputs. --------------------------------------------------------------- - -This module parses the electrochemistry ``json`` files generated by tomato. - -.. warning:: - - This parser is brand-new in `yadg-4.1` and the interface is unstable. - -Four sections are expected in each tomato data file: - -- ``technique`` section, describing the current technique, -- ``previous`` section, containing status information of the previous file, -- ``current`` section, containing status information of the current file, -- ``data`` section, containing the timesteps. - -The reason why both ``previous`` and ``current`` are requires is that the device -status is recorded at the time of data polling, which means the values in ``current`` -might be invalid (after the run has finished) or not in sync with the ``data`` (if -a technique change happened). However, ``previous`` may not be present in the first -data file of an experiment. - -To determine the measurement errors, the values from BioLogic manual are used: for -measured voltages (:math:`E_{\\text{we}}` and :math:`E_{\\text{ce}}`) this corresponds -to a constant uncertainty of 0.004% of the applied E-range with a maximum of 75 uV, -while for currents (:math:`I`) this is a constant uncertainty of 0.0015% of the applied -I-range with a maximum of 0.76 uA. - -.. 
codeauthor:: Peter Kraus -""" - -import json -import logging -import xarray as xr - -logger = logging.getLogger(__name__) - -I_ranges = { - "1 A": 1e0, - "100 mA": 1e-1, - "10 mA": 1e-2, - "1 mA": 1e-3, - "100 uA": 1e-4, - "10 uA": 1e-5, - "1 uA": 1e-6, - "100 pA": 1e-7, -} - - -def process(*, fn: str, **kwargs: dict) -> xr.Dataset: - with open(fn, "r") as infile: - jsdata = json.load(infile) - - technique = jsdata["technique"] - previous = jsdata.get("previous", None) - current = jsdata["current"] - - if "uts" in technique: - uts = technique["uts"] - fulldate = True - else: - uts = 0 - fulldate = False - - uts += technique["start_time"] - - if previous is None: - meta = current - elif current["status"] == "STOP": - meta = previous - elif previous["elapsed_time"] > technique["start_time"]: - meta = previous - else: - meta = current - - I_range = I_ranges[meta["I_range"]] - E_range = meta["E_range"]["max"] - meta["E_range"]["min"] - - data_vars = { - "loop number": [], - "technique": [], - "index": [], - "time": [], - "Ewe": [], - "Ewe_std_err": [], - "Ece": [], - "Ece_std_err": [], - "I": [], - "I_std_err": [], - "cycle": [], - } - - for point in jsdata["data"]: - for k, v in point.items(): - if k == "time": - data_vars[k].append(uts + v) - elif k in {"Ewe", "Ece"}: - data_vars[k].append(v) - data_vars[f"{k}_std_err"].append(max(E_range * 0.0015 / 100, 75e-6)) - elif k in {"I"}: - data_vars[k].append(v) - data_vars[f"{k}_std_err"].append(max(I_range * 0.004 / 100, 760e-12)) - elif k in {"cycle"}: - data_vars[k].append(v) - else: - logger.cricital(f"parameter {k}: {v} not understood.") - data_vars["loop number"].append(technique["loop_number"]) - data_vars["technique"].append(technique["name"]) - data_vars["index"].append(technique["index"]) - - data_vars["cycle number"] = data_vars.pop("cycle") - uts = data_vars.pop("time") - - data_vars = {k: v for k, v in data_vars.items() if len(v) > 0} - - for k in data_vars: - if k in {"Ewe", "Ece", "I"}: - data_vars[k] = ( - ["uts"], - data_vars[k], - { - "units": "A" if k == "I" else "V", - "ancillary_variables": f"{k}_std_err", - }, - ) - elif k.endswith("_std_err"): - data_vars[k] = ( - ["uts"], - data_vars[k], - { - "units": "A" if k == "I" else "V", - "standard_name": f"{k.replace('_std_err', '')} standard_error", - }, - ) - else: - data_vars[k] = (["uts"], data_vars[k]) - - ds = xr.Dataset(data_vars, coords=dict(uts=uts)) - if not fulldate: - ds.attrs["fulldate"] = False - return ds diff --git a/src/yadg/parsers/flowdata/__init__.py b/src/yadg/parsers/flowdata/__init__.py deleted file mode 100644 index 4b9e9431..00000000 --- a/src/yadg/parsers/flowdata/__init__.py +++ /dev/null @@ -1,39 +0,0 @@ -""" -Handles the reading and processing of flow controller or flow meter data. - -Usage -````` -Available since ``yadg-4.0``. The parser supports the following parameters: - -.. _yadg.parsers.flowdata.model: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.FlowData - -.. _yadg.parsers.flowdata.formats: - -Formats -``````` -The ``filetypes`` currently supported by the parser are: - - - DryCal log file text output (``drycal.txt``), - see :mod:`~yadg.parsers.flowdata.drycal` - - DryCal log file tabulated output (``drycal.csv``), - see :mod:`~yadg.parsers.flowdata.drycal` - - DryCal log file document file (``drycal.rtf``), - see :mod:`~yadg.parsers.flowdata.drycal` - -.. _yadg.parsers.flowdata.provides: - -Schema -`````` -The parser is used to extract all tabular data in the input file. 
This parser processes -additional calibration information analogously to :mod:`~yadg.parsers.basiccsv`. - -Module Functions -```````````````` - -""" - -from .main import process - -__all__ = ["process"] diff --git a/src/yadg/parsers/flowdata/drycal.py b/src/yadg/parsers/flowdata/drycal.py deleted file mode 100644 index a23f3556..00000000 --- a/src/yadg/parsers/flowdata/drycal.py +++ /dev/null @@ -1,228 +0,0 @@ -""" -**drycal**: File parser for DryCal log files. ---------------------------------------------- - -This module includes functions for parsing converted documents (``rtf``) and -tabulated exports (``txt``, ``csv``). - -The DryCal files only contain the timestamps of the datapoints, not the date. Therefore, -the date has to be supplied either using the ``date`` argument in parameters, or is -parsed from the prefix of the filename. - -.. codeauthor:: Peter Kraus -""" - -from striprtf.striprtf import rtf_to_text -from ..basiccsv.main import process_row, append_dicts, dicts_to_dataset -from ... import dgutils -from pydantic import BaseModel -from typing import Optional -from datatree import DataTree - - -class TimeDate(BaseModel): - class TimestampSpec(BaseModel, extra="forbid"): - index: Optional[int] = None - format: Optional[str] = None - - date: Optional[TimestampSpec] = None - time: Optional[TimestampSpec] = None - - -def rtf( - fn: str, - encoding: str, - timezone: str, -) -> DataTree: - """ - RTF version of the drycal parser. - - This is intended to parse legacy drycal DOC files, which have been converted to RTF - using other means. - - Parameters - ---------- - fn - Filename to parse. - - encoding - Encoding to use for parsing ``fn``. - - calib - A calibration spec. - - Returns - ------- - (timesteps, metadata, None): tuple[list, dict, None] - A standard data - metadata - common data output tuple. - """ - with open(fn, "r", encoding=encoding) as infile: - rtf = infile.read() - lines = rtf_to_text(rtf).split("\n") - for li in range(len(lines)): - if lines[li].startswith("Sample"): - si = li - elif lines[li].startswith("1|"): - di = li - break - # Metadata processing for rtf files is in columns, not rows. - ml = [] - metadata = dict() - for line in lines[:si]: - if line.strip() != "": - items = [i.strip() for i in line.split("|")] - if len(items) > 1: - ml.append(items) - assert len(ml) == 2 and len(ml[0]) == len(ml[1]) - for i in range(len(ml[0])): - if ml[0][i] != "": - metadata[ml[0][i]] = ml[1][i] - - # Process data table - dl = [] - dl.append(" ".join(lines[si:di])) - for line in lines[di:]: - if line.strip() != "": - dl.append(line) - headers, units, data = drycal_table(dl, sep="|") - datecolumns, datefunc, _ = dgutils.infer_timestamp_from( - spec=TimeDate(time={"index": 4, "format": "%I:%M:%S %p"}), timezone=timezone - ) - - # Process rows - data_vals = {} - meta_vals = {"_fn": []} - for pi, point in enumerate(data): - vals, devs = process_row(headers[1:], point[1:], datefunc, datecolumns) - append_dicts(vals, devs, data_vals, meta_vals, fn, pi) - - return dicts_to_dataset(data_vals, meta_vals, units, False) - - -def sep( - fn: str, - sep: str, - encoding: str, - timezone: str, -) -> DataTree: - """ - Generic drycal parser, using ``sep`` as separator string. - - This is intended to parse other export formats from DryCal, such as txt and csv files. - - Parameters - ---------- - fn - Filename to parse. - - date - A unix timestamp float corresponding to the day (or other offset) to be added to - each line in the measurement table. 
- - sep - The separator character used to split lines in ``fn``. - - encoding - Encoding to use for parsing ``fn``. - - calib - A calibration spec. - - Returns - ------- - (timesteps, metadata, None): tuple[list, dict, None] - A standard data - metadata - common data output tuple. - """ - with open(fn, "r", encoding=encoding) as infile: - lines = infile.readlines() - for li in range(len(lines)): - if lines[li].startswith("Sample"): - si = li - elif lines[li].startswith(f"1{sep}"): - di = li - break - # Metadata processing for csv files is standard. - metadata = dict() - for line in lines[:si]: - if line.strip() != "": - items = [i.strip() for i in line.split(sep)] - if len(items) == 2: - metadata[items[0]] = items[1] - - # Process data table - dl = list() - dl.append(" ".join(lines[si:di])) - for line in lines[di:]: - if line.strip() != "": - dl.append(line) - headers, units, data = drycal_table(dl, sep=sep) - - if "AM" in data[0][-1].upper() or "PM" in data[0][-1].upper(): - fmt = "%I:%M:%S %p" - else: - fmt = "%H:%M:%S" - datecolumns, datefunc, _ = dgutils.infer_timestamp_from( - spec=TimeDate(time={"index": 4, "format": fmt}), timezone=timezone - ) - - # Process rows - data_vals = {} - meta_vals = {"_fn": []} - for pi, point in enumerate(data): - vals, devs = process_row(headers[1:], point[1:], datefunc, datecolumns) - append_dicts(vals, devs, data_vals, meta_vals, fn, pi) - - return dicts_to_dataset(data_vals, meta_vals, units, False) - - -def drycal_table(lines: list, sep: str = ",") -> tuple[list, dict, list]: - """ - DryCal table-processing function. - - Given a table with headers and units in the first line, and data in the following - lines, this function returns the headers, units, and data extracted from the table. - The returned values are always of :class:`(str)` type, any post-processing is done - in the calling routine. - - Parameters - ---------- - lines - A list containing the lines to be parsed - - sep - The separator string used to split each line into individual items - - Returns - ------- - (headers, units, data): tuple[list, dict, list] - A tuple of a list of the stripped headers, dictionary of header-unit key-value - pairs, and a list of lists containing the rows of the table. - """ - items = [i.strip() for i in lines[0].split(sep)] - headers = [] - units = {} - data = [] - trim = False - for item in items: - for rs in [". ", " "]: - parts = item.split(rs) - if len(parts) == 2: - break - headers.append(parts[0]) - if len(parts) == 2: - units[parts[0]] = parts[1] - else: - units[parts[0]] = " " - if items[-1] == "": - trim = True - headers = headers[:-1] - for line in lines[1:]: - cols = line.split(sep) - assert len(cols) == len(items) - if trim: - data.append(cols[:-1]) - else: - data.append(cols) - - units = dgutils.sanitize_units(units) - return headers, units, data diff --git a/src/yadg/parsers/flowdata/main.py b/src/yadg/parsers/flowdata/main.py deleted file mode 100644 index a455287c..00000000 --- a/src/yadg/parsers/flowdata/main.py +++ /dev/null @@ -1,66 +0,0 @@ -import logging -import xarray as xr -from . import drycal - -logger = logging.getLogger(__name__) - - -def process( - *, - fn: str, - filetype: str, - encoding: str, - timezone: str, - **kwargs: dict, -) -> xr.Dataset: - """ - Flow meter data processor - - This parser processes flow meter data. - - Parameters - ---------- - fn - File to process - - encoding - Encoding of ``fn``, by default "utf-8". - - timezone - A string description of the timezone. Default is "localtime". 
- - parameters - Parameters for :class:`~dgbowl_schemas.yadg.dataschema_5_0.step.FlowData`. - - Returns - ------- - :class:`xarray.Dataset` - - """ - - if filetype.startswith("drycal"): - - if filetype.endswith(".rtf") or fn.endswith("rtf"): - vals = drycal.rtf(fn, encoding, timezone) - elif filetype.endswith(".csv") or fn.endswith("csv"): - vals = drycal.sep(fn, ",", encoding, timezone) - elif filetype.endswith(".txt") or fn.endswith("txt"): - vals = drycal.sep(fn, "\t", encoding, timezone) - - # check timestamps are increasing: - warn = True - ndays = 0 - utslist = vals.uts.values - for i in range(1, vals.uts.size): - if utslist[i] < utslist[i - 1]: - if warn: - logger.warning("DryCal log crossing day boundary. Adding offset.") - warn = False - uts = utslist[i] + ndays * 86400 - while uts < utslist[i - 1]: - ndays += 1 - uts = utslist[i] + ndays * 86400 - utslist[i] = uts - vals["uts"] = xr.DataArray(data=utslist, dims=["uts"]) - vals.attrs["fulldate"] = False - return vals diff --git a/src/yadg/parsers/masstrace/__init__.py b/src/yadg/parsers/masstrace/__init__.py deleted file mode 100644 index cd4556ce..00000000 --- a/src/yadg/parsers/masstrace/__init__.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Handles the reading and processing of mass spectrometry files. The basic function of the -parser is to: - -#. read in the raw data and create timestamped traces with one :class:`xarray.Dataset` per trace -#. collect `metadata` such as the software version, author, etc. - -Usage -````` -Select :mod:`~yadg.parsers.masstrace` by supplying it to the ``parser`` keyword -in the `dataschema`. The parser supports the following parameters: - -.. _yadg.parsers.masstrace.model: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.MassTrace - -.. _yadg.parsers.masstrace.formats: - -Formats -``````` -The ``filetypes`` currently supported by the parser are: - - - Pfeiffer Quadstar 32-bit scan analog data (``quadstar.sac``), - see :mod:`~yadg.parsers.masstrace.quadstarsac` - -.. _yadg.parsers.masstrace.provides: - -Schema -`````` -The raw data, loaded from the supplied files, is stored using the following format: - -.. code-block:: yaml - - datatree.DataTree: - {{ detector_name }} !!xr.Dataset - coords: - uts: !!float - mass_to_charge: !!float # m/z (amu) - data_vars: - y: (uts, mass_to_charge) # Detected signal (counts) - -The uncertainties in ``mass_to_charge`` are taken as the step-width of -the linearly spaced mass values. - -The uncertainties in of ``y`` are the largest value between: - -#. The quantization error from the ADC, its resolution assumed to be 32 - bit. Dividing F.S.R. by ``2 ** 32`` gives an error in the order of - magnitude of the smallest data value in ``y``. -#. The contribution from neighboring masses. In the operating manual of - the QMS 200 (see 2.8 QMS 200 F & 2.9 QMS 200 M), a maximum - contribution from the neighboring mass of 50 ppm is noted. - -.. note:: - - The data in ``y`` may contain ``NaN`` s. The measured ion - count/current value will occasionally exceed the specified detector - F.S.R. (e.g. 1e-9), and will then flip directly to the maximum value - of a float32. These values are set to ``float("NaN")``. - -Module Functions -```````````````` - -""" - -import datatree -from . import quadstarsac - - -def process( - *, - filetype: str, - **kwargs: dict, -) -> datatree.DataTree: - """ - Unified mass spectrometry data parser.Forwards ``kwargs`` to the worker functions - based on the supplied ``filetype``. 
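The DryCal logs only contain the time of day, so the loop in the flowdata processor above shifts any timestamp that would otherwise decrease by whole days, keeping ``uts`` monotonic. The idea behind that correction, with invented numbers:

.. code-block:: python

    uts = [86390.0, 86395.0, 5.0, 10.0]                # hypothetical times crossing midnight
    ndays, fixed = 0, [uts[0]]
    for t in uts[1:]:
        while t + ndays * 86400 < fixed[-1]:
            ndays += 1                                 # one more day offset per crossing
        fixed.append(t + ndays * 86400)
    # fixed == [86390.0, 86395.0, 86405.0, 86410.0]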
- - Parameters - ---------- - filetype - Discriminator used to select the appropriate worker function. - - Returns - ------- - :class:`datatree.DataTree` - - """ - if filetype == "quadstar.sac": - return quadstarsac.process(**kwargs) diff --git a/src/yadg/parsers/masstrace/quadstarsac.py b/src/yadg/parsers/masstrace/quadstarsac.py deleted file mode 100644 index 774deedd..00000000 --- a/src/yadg/parsers/masstrace/quadstarsac.py +++ /dev/null @@ -1,265 +0,0 @@ -""" -**quadstarsac**: Processing of Quadstar 32-bit scan analog data. ----------------------------------------------------------------- - -The `sac2dat.c code from Dr. Moritz Bubek `_ -was a really useful stepping stone for this Python file parser. - -Pretty much the entire file format has been reverse engineered. There -are still one or two unknown fields. - -File Structure of `.sac` Files -`````````````````````````````` - -.. code-block:: python - - 0x00 "data_index" - 0x02 "software_id" - 0x06 "version_major" - 0x07 "version_minor" - 0x08 "second" - 0x09 "minute" - 0x0a "hour" - 0x0b "day" - 0x0c "month" - 0x0d "year" - 0x0f "author" - 0x64 "n_timesteps" - 0x68 "n_traces" - 0x6a "timestep_length" - ... - # Not sure what sits from 0x6e to 0xc2. - ... - 0xc2 "uts_base_s" - 0xc6 "uts_base_ms" - # Trace header. Read these 9 bytes for every trace (n_traces). - 0xc8 + (n * 0x09) "type" - 0xc9 + (n * 0x09) "info_position" - 0xcd + (n * 0x09) "data_position" - ... - # Trace info. Read these 137 bytes for every trace where type != 0x11. - info_position + 0x00 "data_format" - info_position + 0x02 "y_title" - info_position + 0x0f "y_unit" - info_position + 0x1d "x_title" - info_position + 0x2a "x_unit" - info_position + 0x38 "comment" - info_position + 0x7a "first_mass" - info_position + 0x7e "scan_width" - info_position + 0x80 "values_per_mass" - info_position + 0x81 "zoom_start" - info_position + 0x85 "zoom_end" - ... - # UTS offset. Read these 6 bytes for every timestep (n_timesteps). - 0xc2 + (n * timestep_length) "uts_offset_s" - 0xc6 + (n * timestep_length) "uts_offset_ms" - # Read everything remaining below for every timestep and every trace - # where type != 0x11. - data_position + (n * timestep_length) + 0x00 "n_datapoints" - data_position + (n * timestep_length) + 0x04 "data_range" - # Datapoints. Read these 4 bytes (scan_width * values_per_mass) - # times. - data_position + (n * timestep_length) + 0x06 "datapoints" - ... - -.. codeauthor:: Nicolas Vetsch -""" - -import numpy as np -from datatree import DataTree -import xarray as xr -import yadg.dgutils as dgutils - -# The general header at the top of .sac files. -general_header_dtype = np.dtype( - [ - ("data_index", " int: - """Finds the data position of the first scan containing any data.""" - for header in scan_headers: - if header["type"] != 0x11: - continue - return header["data_position"] - - -def process( - *, - fn: str, - **kwargs: dict, -) -> DataTree: - """Processes a Quadstar 32-bit analog data .sac file. - - Parameters - ---------- - fn - The file containing the trace(s) to parse. - - Returns - ------- - :class:`datatree.DataTree` - A :class:`datatree.DataTree` containing one :class:`xarray.Dataset` per mass trace. - The traces in the Quadstar ``.sac`` files are not named, therefore their index - is used as the :class:`xarray.Dataset` name. 
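The binary layout documented above is read field by field using ``dgutils.read_value``. A rough, stand-alone equivalent for a single little-endian ``uint32`` field at a byte offset, using only :mod:`numpy` (buffer contents and offset are invented):

.. code-block:: python

    import numpy as np

    buf = (42).to_bytes(4, "little") * 4               # dummy 16-byte buffer
    value = np.frombuffer(buf, dtype="<u4", count=1, offset=0x0004)[0]   # == 42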
- - """ - with open(fn, "rb") as sac_file: - sac = sac_file.read() - meta = dgutils.read_value(sac, 0x0000, general_header_dtype) - uts_base_s = dgutils.read_value(sac, 0x00C2, " fsr] = np.nan - # TODO: Determine the correct accuracy from fsr. The 32bit - # ADC is a guess that seems to put the error in the correct - # order of magnitude. - sigma_adc = np.ones(len(yvals)) * fsr / 2**32 - # Determine error based on contributions of neighboring masses. - # The upper limit on contribution from peak at next integer mass is 50ppm. - prev_neighbor = np.roll(yvals, ndm) - prev_neighbor[:ndm] = np.nan - next_neighbor = np.roll(yvals, -ndm) - next_neighbor[-ndm:] = np.nan - sigma_neighbor = np.fmax(prev_neighbor, next_neighbor) * 50e-6 - # Pick the maximum error here - ydevs = np.fmax(sigma_adc, sigma_neighbor) - ds = xr.Dataset( - data_vars={ - "fsr": fsr, - "mass_to_charge_std_err": ( - ["mass_to_charge"], - mdevs, - { - "units": info["x_unit"], - "standard_name": "mass_to_charge standard_error", - }, - ), - "y": ( - ["uts", "mass_to_charge"], - [yvals], - {"units": info["y_unit"], "ancilliary_variables": "y_std_err"}, - ), - "y_std_err": ( - ["uts", "mass_to_charge"], - [ydevs], - {"units": info["y_unit"], "standard_name": "y standard_error"}, - ), - }, - coords={ - "mass_to_charge": ( - ["mass_to_charge"], - mvals, - { - "units": info["x_unit"], - "ancillary_variables": "mass_to_charge_std_err", - }, - ), - "uts": (["uts"], [uts_timestamp]), - }, - attrs=info, - ) - if f"{ti}" not in traces: - traces[f"{ti}"] = ds - else: - try: - traces[f"{ti}"] = xr.concat( - [traces[f"{ti}"], ds], dim="uts", combine_attrs="identical" - ) - except xr.MergeError: - raise RuntimeError( - "Merging metadata from the individual traces has failed. " - "This is a bug. Please open an issue on GitHub." - ) - - ret = DataTree.from_dict(traces) - ret.attrs = meta - return ret diff --git a/src/yadg/parsers/meascsv/__init__.py b/src/yadg/parsers/meascsv/__init__.py deleted file mode 100644 index e2b72426..00000000 --- a/src/yadg/parsers/meascsv/__init__.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -This parser handles the reading and processing of the legacy log files created by -the LabView interface for the MCPT instrument. These files contain information about -the timestamp, temperatures, and inlet / process flows. - -.. admonition:: DEPRECATED in ``yadg-4.0`` - - As of ``yadg-4.0``, this parser is deprecated and should not be used for new data. - Please consider switching to the :mod:`~yadg.parsers.basiccsv` parser. - -Usage -````` -Available since ``yadg-3.0``. Deprecated since ``yadg-4.0``. The parser supports the -following parameters: - -.. _yadg.parsers.meascsv.model: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.MeasCSV - -.. _parsers_meascsv_provides: - -Schema -`````` -The parser is used to extract all of the tabular data in the input file, storing -them in the same format as :mod:`~yadg.parsers.basiccsv`, using the column headers -as keys. - -""" - -import logging -from pydantic import BaseModel -from ..basiccsv.main import process_row, append_dicts, dicts_to_dataset -from ... import dgutils -import xarray as xr - -logger = logging.getLogger(__name__) - - -def process( - *, - fn: str, - encoding: str, - timezone: str, - parameters: BaseModel, - **kwargs: dict, -) -> xr.Dataset: - """ - Legacy MCPT measurement log parser. - - This parser is included to maintain parity with older schemas and datagrams. - It is essentially a wrapper around :func:`yadg.parsers.basiccsv.main.process_row`. - - .. 
admonition:: DEPRECATED in ``yadg-4.0`` - - For new applications, please use the :mod:`~yadg.parsers.basiccsv` parser. - - Parameters - ---------- - fn - File to process - - encoding - Encoding of ``fn``, by default "utf-8". - - timezone - A string description of the timezone. Default is "localtime". - - parameters - Parameters for :class:`~dgbowl_schemas.yadg.dataschema_5_0.step.MeasCSV`. - - Returns - ------- - :class:`xarray.Dataset` - A :class:`xarray.Dataset` containing the timesteps, metadata, and full date tag. No - metadata is returned. The full date is always provided in :mod:`~yadg.parsers.meascsv` - compatible files. - - """ - logger.warning("This parser is deprecated. Please switch to 'basiccsv'.") - - with open(fn, "r", encoding=encoding) as infile: - lines = [i.strip() for i in infile.readlines()] - - headers = [i.strip() for i in lines.pop(0).split(";")] - - for hi, header in enumerate(headers): - if "/" in header: - logger.warning("Replacing '/' for '_' in header '%s'.", header) - headers[hi] = header.replace("/", "_") - - _units = [i.strip() for i in lines.pop(0).split(";")] - units = {} - for h in headers: - units[h] = _units.pop(0) - - units = dgutils.sanitize_units(units) - - datecolumns, datefunc, fulldate = dgutils.infer_timestamp_from( - spec=parameters.timestamp, - timezone=timezone, - ) - - # Process rows - data_vals = {} - meta_vals = {"_fn": []} - for li, line in enumerate(lines): - vals, devs = process_row( - headers, - line.split(";"), - datefunc, - datecolumns, - ) - append_dicts(vals, devs, data_vals, meta_vals, fn, li) - - return dicts_to_dataset(data_vals, meta_vals, units, fulldate) diff --git a/src/yadg/parsers/qftrace/__init__.py b/src/yadg/parsers/qftrace/__init__.py deleted file mode 100644 index 30111e9a..00000000 --- a/src/yadg/parsers/qftrace/__init__.py +++ /dev/null @@ -1,71 +0,0 @@ -""" -The module handles the reading and processing of the network analyzer -traces, containing the reflection coefficient as a function of the sweeped frequency, -:math:`\\Gamma(f)`. - -:mod:`~yadg.parsers.qftrace` loads the reflection trace data, determines the -uncertainties of the signal (y-axis), and explicitly populates the points in -the time axis (x-axis). - -Usage -````` -Available since ``yadg-3.0``. The parser supports the following parameters: - -.. _yadg.parsers.qftrace.model: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.QFTrace - -.. _yadg.parsers.qftrace.formats: - - - LabView output in a tab-separated format (``csv``): - :mod:`~yadg.parsers.qftrace.labviewcsv` - -.. _yadg.parsers.qftrace.provides: - -Schema -`````` -For filetypes containing the reflection trace data, the schema is as follows: - -.. code-block:: yaml - - datatree.DataTree: - S11: - coords: - uts: !!float - freq: !!float # Field frequency (Hz) - data_vars: - Re(G): (uts, freq) # Imaginary part of the reflection coefficient - Im(G) (uts, freq) # Real part of the reflection coefficient - average: (uts) # Number of scans averaged to form a single trace - bandwidth: (uts) # Filter bandwidth (Hz) - -Module Functions -```````````````` - -""" - -from . import labviewcsv -import datatree - - -def process( - *, - filetype: str, - **kwargs: dict, -) -> datatree.DataTree: - """ - VNA reflection trace parser. Forwards ``kwargs`` to the worker functions - based on the supplied ``filetype``. - - Parameters - ---------- - filetype - Discriminator used to select the appropriate worker function. 
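The legacy MCPT logs parsed above are ``;``-separated, with headers on the first line and units on the second, and any ``/`` in a header replaced by ``_``. A short sketch with hypothetical header lines:

.. code-block:: python

    lines = ["T_cal/degC; flow low; flow high", "degC; ml/min; ml/min"]   # hypothetical
    headers = [h.strip().replace("/", "_") for h in lines[0].split(";")]
    units = dict(zip(headers, [u.strip() for u in lines[1].split(";")]))
    # headers == ["T_cal_degC", "flow low", "flow high"]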
- - Returns - ------- - :class:`datatree.DataTree` - - """ - if filetype == "labview.csv": - return labviewcsv.process(**kwargs) diff --git a/src/yadg/parsers/qftrace/labviewcsv.py b/src/yadg/parsers/qftrace/labviewcsv.py deleted file mode 100644 index a3cf1098..00000000 --- a/src/yadg/parsers/qftrace/labviewcsv.py +++ /dev/null @@ -1,130 +0,0 @@ -""" -**labviewcsv**: Processing Agilent LabVIEW CSV files ----------------------------------------------------- - -Used to process files generated using Agilent PNA-L N5320C via its LabVIEW driver. -This file format includes a header, with the values of bandwidth and averaging, -and three tab-separated columns containing the frequency :math:`f`, and the real -and imaginary parts of the complex reflection coefficient :math:`\\Gamma(f)`. - -Timestamps are determined from file name. One trace per file. As the set-up for -which this format was designed always uses the ``S11`` port, the name of the trace -is hard-coded to this value. - -.. codeauthor:: Peter Kraus -""" - -from uncertainties.core import str_to_number_with_uncert as tuple_fromstr -import xarray as xr -import datatree - - -def process( - *, - fn: str, - encoding: str = "utf-8", - **kwargs: dict, -) -> datatree.DataTree: - """ - VNA reflection trace parser for Agilent's LabVIEW driver. - - Parameters - ---------- - fn - File to process - - encoding - Encoding of ``fn``, by default "utf-8". - - Returns - ------- - :class:`datatree.DataTree` - A :class:`datatree.DataTree` containing a single :class:`xarray.Dataset` with the - ``S11`` (reflection) trace. - - """ - - with open(fn, "r", encoding=encoding) as infile: - lines = infile.readlines() - assert ( - len(lines) > 2 - ), f"qftrace: Only {len(lines)-1} points supplied in {fn}; fitting impossible." - - # process header - bw = [10000.0, 1.0] - avg = 15 - if ";" in lines[0]: - items = lines.pop(0).split(";") - for item in items: - if item.startswith("BW"): - bw = tuple_fromstr(item.split("=")[-1].strip()) - if item.startswith("AVG"): - avg = int(item.split("=")[-1].strip()) - fsbw = bw[0] / avg - - # calculate precision of trace - freq = {"vals": [], "devs": []} - real = {"vals": [], "devs": []} - imag = {"vals": [], "devs": []} - for line in lines: - f, re, im = line.strip().split() - fn, fs = tuple_fromstr(f) - fs = max(fs, fsbw) - ren, res = tuple_fromstr(re) - imn, ims = tuple_fromstr(im) - freq["vals"].append(fn) - freq["devs"].append(fs) - real["vals"].append(ren) - real["devs"].append(res) - imag["vals"].append(imn) - imag["devs"].append(ims) - - vals = xr.Dataset( - data_vars={ - "Re(G)": ( - ["freq"], - real["vals"], - {"ancillary_variables": "Re(G)_std_err"}, - ), - "Re(G)_std_err": ( - ["freq"], - real["devs"], - {"standard_name": "Re(G) standard_error"}, - ), - "Im(G)": ( - ["freq"], - imag["vals"], - {"ancillary_variables": "Im(G)_std_err"}, - ), - "Im(G)_std_err": ( - ["freq"], - imag["devs"], - {"standard_name": "Im(G) standard_error"}, - ), - "average": avg, - "bandwidth": ( - [], - bw[0], - {"units": "Hz", "ancillary_variables": "bandwidth_std_err"}, - ), - "bandwidth_std_err": ( - [], - bw[1], - {"units": "Hz", "standard_name": "bandwidth standard_error"}, - ), - }, - coords={ - "freq": ( - ["freq"], - freq["vals"], - {"units": "Hz", "ancillary_variables": "freq_std_err"}, - ), - "freq_std_err": ( - ["freq"], - freq["devs"], - {"units": "Hz", "standard_name": "freq standard_error"}, - ), - }, - ) - - return datatree.DataTree.from_dict(dict(S11=vals)) diff --git a/src/yadg/parsers/xpstrace/__init__.py 
b/src/yadg/parsers/xpstrace/__init__.py deleted file mode 100644 index 93582b0f..00000000 --- a/src/yadg/parsers/xpstrace/__init__.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -This module handles the reading and processing of X-ray photoelectron spectroscopy -data, including determining the uncertainties of the signal (y-axis), and explicitly -populating the points in the energy axis (``E``). - -Usage -````` -Available since ``yadg-4.1``. The parser supports the following parameters: - -.. _yadg.parsers.xpstrace.model: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.XPSTrace - -.. _yadg.parsers.xpstrace.formats: - -Formats -``````` -The ``filetypes`` currently supported by the parser are: - - - ULVAC PHI Multipak XPS traces (``phi.spe``), - see :mod:`~yadg.parsers.xpstrace.phispe` - -.. _yadg.parsers.xpstrace.provides: - -Provides -```````` -The raw data is stored, for each timestep, using the following format: - -.. code-block:: yaml - - datatree.DataTree: - {{ trace_name }} !!xr.Dataset - coords: - uts: !!float - E: !!float # binding energies (eV) - data_vals: - y: (uts, E) # signal - -Module Functions -```````````````` - -""" - -import datatree -from . import phispe - - -def process( - *, - filetype: str, - **kwargs: dict, -) -> datatree.DataTree: - """ - Unified x-ray photoelectron spectroscopy parser. Forwards ``kwargs`` to the worker - functions based on the supplied ``filetype``. - - This parser processes XPS scans in signal(energy) format. - - Parameters - ---------- - filetype - Discriminator used to select the appropriate worker function. - - Returns - ------- - :class:`datatree.DataTree` - - """ - if filetype == "phi.spe": - return phispe.process(**kwargs) diff --git a/src/yadg/parsers/xpstrace/phispe.py b/src/yadg/parsers/xpstrace/phispe.py deleted file mode 100644 index 2730d53c..00000000 --- a/src/yadg/parsers/xpstrace/phispe.py +++ /dev/null @@ -1,375 +0,0 @@ -""" -**phispe**: Processing of ULVAC PHI Multipak XPS traces. --------------------------------------------------------- - -The `IGOR .spe import script by jjweimer `_ -was pretty helpful for writing this parser. - -File Structure of ``.spe`` Files -```````````````````````````````` - -These binary files actually contain an ASCII file header, delimited by -`"SOFH\n"` and `"EOFH\n"`. - -The binding energies corresponding to the datapoints in the later part -of the file can be found from the `"SpectralRegDef"` entries in this -header. Each of these entries look something like: - -.. code-block:: - - 2 2 F1s 9 161 -0.1250 695.0 675.0 695.0 680.0 0.160000 29.35 AREA - -This maps as follows: - -.. code-block:: - - 2 trace_number - 2 trace_number (again?) - F1s name - 9 atomic_number - 161 num_datapoints - -0.1250 step - 695.0 start - 675.0 stop - 695.0 ? - 680.0 ? - 0.160000 dwell_time - 29.35 e_pass - AREA description (?) - -After the file header, the binary part starts with a short data header -(offsets given from start of data header): - -.. code-block:: - - 0x0000 group # Data group number. - 0x0004 num_traces # Number of traces in file - 0x0008 trace_header_size # Combined lengths of all trace headers. - 0x000c data_header_size # Length of this data header. - -After this follow ``num_traces`` trace headers that are each structured -something like this: - -.. code-block:: - - 0x0000 trace_number # Number of the trace. - 0x0004 bool_01 # ??? - 0x0008 bool_02 # ??? - 0x000c trace_number_again # Number of the trace. Again? - 0x0010 bool_03 # ??? - 0x0014 num_datapoints # Number of datapoints in trace. 
- 0x0018 bool_04 # ??? - 0x001c bool_05 # ??? - 0x0020 string_01 # ??? - 0x0024 string_02 # ??? - 0x0028 string_03 # ??? - 0x002c int_02 # ??? - 0x0030 string_04 # ??? - 0x0034 string_05 # ??? - 0x0038 y_unit # The unit of the datapoints. - 0x003c int_05 # ??? - 0x0040 int_06 # ??? - 0x0044 int_07 # ??? - 0x0048 data_dtype # Data type for datapoints (f4 / f8). - 0x004c num_data_bytes # Unsure about this one. - 0x0050 num_datapoints_tot # This one as well. - 0x0054 int_10 # ??? - 0x0058 int_11 # ??? - 0x005c end_of_data # Byte offset of the end-of-data. - -After the trace headers follow the datapoints. After the number of -datapoints there is a single 32bit float with the trace's dwelling time -again. - -Uncertainties -````````````` -The uncertainties of ``"E"`` are taken as the step-width of -the linearly spaced energy values. - -The uncertainties ``"s"`` of ``"y"`` are currently set to a constant -value of ``12.5`` counts per second as all the signals in the files seen so -far only seem to take on values in those steps. - -.. admonition:: TODO - - https://github.com/dgbowl/yadg/issues/13 - - Determining the uncertainty of the counts per second signal in XPS - traces from the phispe parser should be done in a better way. - -.. codeauthor:: Nicolas Vetsch -""" - -import re -import numpy as np -import xarray as xr -import datatree -import yadg.dgutils as dgutils - -data_header_dtype = np.dtype( - [ - ("group", " str: - """Converts CamelCase strings to snake_case. - - From https://stackoverflow.com/a/1176023 - - Parameters - ---------- - s - The CamelCase input string. - - Returns - ------- - str - The snake_case equivalent of s. - - """ - s = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", s) - return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s).lower() - - -def _process_header(spe: list[bytes]) -> dict: - """Processes the file header at the top of `.spe` files. - - Parameters - ---------- - spe - The lines of bytes read from file. - - Returns - ------- - dict - The file header parsed into a dictionary. Some entries (keys) - occur more than once. The corresponding values are joined into - a list. - - """ - header_lines = spe[spe.index(b"SOFH\n") + 1 : spe.index(b"EOFH\n")] - header = {} - for line in header_lines: - key, value = line.split(b":") - key, value = camel_to_snake(key.decode().strip()), value.decode().strip() - if key in header: - header[key] = [header[key]] + [value] - else: - header[key] = value - return header - - -def _process_trace_defs(header: dict) -> list[dict]: - """Processes the trace definition strings given in the file header. - - These strings look something like the following: - `2 2 F1s 9 161 -0.1250 695.0 675.0 695.0 680.0 0.160000 29.35 AREA` - - Parameters - ---------- - header - The file header parsed into a dictionary. The "SpectralRegDef" - entry contains a list of trace definition strings. - - Returns - ------- - list[dict] - A list of trace definition dictionaries describind the kind of - trace and the binding energy ranges. 
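Applied to the example ``SpectralRegDef`` entry shown in the module docstring above, the parsing done by ``_process_trace_defs`` reduces to simple positional splitting:

.. code-block:: python

    fields = "2 2 F1s 9 161 -0.1250 695.0 675.0 695.0 680.0 0.160000 29.35 AREA".split()
    trace_def = {
        "trace_number": int(fields[0]),
        "name": fields[2],                             # "F1s"
        "num_datapoints": int(fields[4]),              # 161
        "step": float(fields[5]),                      # -0.125
        "start": float(fields[6]),                     # 695.0
        "stop": float(fields[7]),                      # 675.0
    }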
- - """ - trace_defs = [] - for trace_def in header.get("spectral_reg_def"): - trace_def = trace_def.split() - trace_defs.append( - { - "trace_number": int(trace_def[0]), - "name": trace_def[2], - "atomic_number": trace_def[3], - "num_datapoints": int(trace_def[4]), - "step": float(trace_def[5]), - "start": float(trace_def[6]), - "stop": float(trace_def[7]), - "dwell_time": trace_def[10], - "e_pass": trace_def[11], - "description": trace_def[12], - } - ) - return trace_defs - - -def _process_traces(spe: list[bytes], trace_defs: list[dict]) -> dict: - """Processes the spectral traces in the file. - - Parameters - ---------- - spe - The lines of bytes read from file. - - trace_defs - The list of trace definitions parsed from the file header. - - Returns - ------- - dict - A dictionary containing the binding energies constructed from - the trace definitions and the corrresponding XPS traces. - - """ - data = b"".join(spe[spe.index(b"EOFH\n") + 1 :]) - data_header = dgutils.read_value(data, 0x0000, data_header_dtype) - assert data_header["num_traces"] == len(trace_defs) - # All trace headers I have seen are 192 (0xc0) bytes long. - assert data_header["trace_header_size"] / trace_header_dtype.itemsize == len( - trace_defs - ) - assert data_header["data_header_size"] == data_header_dtype.itemsize - trace_headers = np.frombuffer( - data, - offset=0x0010, - dtype=trace_header_dtype, - count=len(trace_defs), - ) - traces = {} - for trace_header, trace_def in zip(trace_headers, trace_defs): - assert trace_header["trace_number"] == trace_def["trace_number"] - assert trace_header["num_datapoints"] == trace_def["num_datapoints"] - # Contruct the binding energies from trace_def. - energies, dE = np.linspace( - trace_def["start"], - trace_def["stop"], - trace_def["num_datapoints"], - endpoint=True, - retstep=True, - ) - # Construct data from trace_header - data_dtype = np.dtype(f'{trace_header["data_dtype"].decode()}') - data_offset = trace_header["end_of_data"] - trace_header["num_data_bytes"] - datapoints = np.frombuffer( - data, - offset=data_offset, - dtype=data_dtype, - count=trace_header["num_datapoints"], - ) - dwell_time = dgutils.read_value(data, trace_header["end_of_data"], " datatree.DataTree: - """Processes ULVAC-PHI Multipak XPS data. - - Parameters - ---------- - fn - The file containing the data to parse. - - Returns - ------- - :class:`datatree.DataTree` - Returns a :class:`datatree.DataTree` containing a :class:`xarray.Dataset` for each - XPS trace present in the input file. 
- - """ - with open(fn, "rb") as spe_file: - spe = spe_file.readlines() - header = _process_header(spe) - software_id, version = header.get("software_version").split() - meta = { - "params": { - "software_id": software_id, - "version": version, - "username": header.get("operator"), - }, - "file_header": header, - } - trace_defs = _process_trace_defs(header) - traces = _process_traces(spe, trace_defs) - vals = {} - for v in traces.values(): - fvals = xr.Dataset( - data_vars={ - "y": ( - ["E"], - v["yvals"], - {"units": v["yunit"], "ancillary_variables": "y_std_err"}, - ), - "y_std_err": ( - ["E"], - v["ydevs"], - {"units": v["yunit"], "standard_name": "y standard_error"}, - ), - "E_std_err": ( - ["E"], - v["Edevs"], - {"units": v["Eunit"], "standard_name": "E standard_error"}, - ), - }, - coords={ - "E": ( - ["E"], - v["Evals"], - {"units": v["Eunit"], "ancillary_variables": "E_std_err"}, - ), - }, - ) - vals[v["name"]] = fvals - - dt = datatree.DataTree.from_dict(vals) - dt.attrs = meta - return dt diff --git a/src/yadg/parsers/xrdtrace/__init__.py b/src/yadg/parsers/xrdtrace/__init__.py deleted file mode 100644 index 6b04f801..00000000 --- a/src/yadg/parsers/xrdtrace/__init__.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -This module handles the reading and processing of X-ray diffraction data. It loads X-ray -diffraction data, determines reasonable uncertainties of the signal intensity (y-axis), -and explicitly populates the angle axis (:math:`2\\theta`), if necessary. - -Usage -````` -Available since ``yadg-4.0``. The parser supports the following parameters: - -.. _yadg.parsers.xrdtrace.model: - -.. autopydantic_model:: dgbowl_schemas.yadg.dataschema_5_0.step.XRDTrace - -.. _yadg.parsers.xrdtrace.formats: - -Formats -``````` -The ``filetypes`` currently supported by the parser are: - - - PANalytical ``xrdml`` files (``panalytical.xrdml``), - see :mod:`~yadg.parsers.xrdtrace.panalyticalxrdml` - - PANalytical ``csv`` files (``panalytical.csv``), - see :mod:`~yadg.parsers.xrdtrace.panalyticalcsv` - - PANalytical ``xy`` files (``panalytical.xy``), - see :mod:`~yadg.parsers.xrdtrace.panalyticalxy` - -.. _yadg.parsers.xrdtrace.provides: - -Provides -```````` -The raw data is stored, for each timestep, using the following format: - -.. code-block:: yaml - - xr.Dataset: - coords: - uts: !!float - angle: !!float # Diffraction angle (deg) - data_vals: - intensity: (uts, angle) # Detector intensity (counts) - -""" - -import xarray as xr -from . import panalyticalxrdml, panalyticalcsv, panalyticalxy - - -def process( - *, - filetype: str, - **kwargs: dict, -) -> xr.Dataset: - """ - Unified X-ray diffractogram data parser. Forwards ``kwargs`` to the worker - functions based on the supplied ``filetype``. - - This parser processes XPS scans in signal(energy) format. - - Parameters - ---------- - filetype - Discriminator used to select the appropriate worker function. - - Returns - ------- - :class:`xarray.Dataset` - - - """ - if filetype == "panalytical.xrdml": - return panalyticalxrdml.process(**kwargs) - elif filetype == "panalytical.csv": - return panalyticalcsv.process(**kwargs) - elif filetype == "panalytical.xy": - return panalyticalxy.process(**kwargs) diff --git a/src/yadg/parsers/xrdtrace/common.py b/src/yadg/parsers/xrdtrace/common.py deleted file mode 100644 index 6febd3aa..00000000 --- a/src/yadg/parsers/xrdtrace/common.py +++ /dev/null @@ -1,62 +0,0 @@ -import re - - -def panalytical_comment(line: str) -> dict: - """Processes a comments from the file header into a dictionary. 
- - Parameters - ---------- - line - A line containing the comment. - - Returns - ------- - dict - A dictionary containing the processed comment. - - """ - - if line.startswith("Configuration="): - split = [s.split("=") for s in line.split(", ")] - __, values = list(zip(*split)) - keys = ["configuration", "owner", "creation_date"] - elif line.startswith("Goniometer="): - split = [s.replace("=", ":").split(":") for s in line.split(";")] - __, values = list(zip(*split)) - keys = ["goniometer", "min_step_size_2theta", "min_step_size_omega"] - elif line.startswith("Sample stage="): - __, values = line.split("=") - keys = ["sample_stage"] - elif line.startswith("Diffractometer system="): - __, values = line.split("=") - keys = ["diffractometer_system"] - elif line.startswith("Measurement program="): - split = [s.split("=") for s in line.split(", ")] - __, values = list(zip(*split)) - keys = ["measurement_program", "identifier"] - elif line.startswith("Fine Calibration Offset for 2Theta"): - __, values = line.split(" = ") - keys = ["calib_offset_2theta"] - values = [values] if isinstance(values, str) else values - return dict(zip(keys, values)) - - -def snake_case(s: str) -> str: - """Converts Sentence case. and camelCase strings to snake_case. - - From https://stackoverflow.com/a/1176023 - - Parameters - ---------- - s - The input string to be converted. - - Returns - ------- - str - The corresponding snake_case string. - - """ - s = "".join([s.capitalize() for s in s.replace(".", "").split()]) - s = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", s) - return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s).lower() diff --git a/src/yadg/parsers/xrdtrace/panalyticalcsv.py b/src/yadg/parsers/xrdtrace/panalyticalcsv.py deleted file mode 100644 index a5500635..00000000 --- a/src/yadg/parsers/xrdtrace/panalyticalcsv.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -panalyticalcsv: Processing of PANalytical XRD ``csv`` files ------------------------------------------------------------ - -File Structure -`````````````` - -These files are split into a ``[Measurement conditions]`` and a ``[Scan points]`` -section. The former stores the metadata and the latter all the datapoints. - -Uncertainties -````````````` -The uncertainties of ``"angle"`` are taken from the number of significant figures. - -The uncertainties of ``"intensity"`` are taken from the number of significant figures. - -.. codeauthor:: - Nicolas Vetsch, - Peter Kraus -""" - -from ...dgutils import dateutils -from .common import panalytical_comment, snake_case -from uncertainties.core import str_to_number_with_uncert as tuple_fromstr -import xarray as xr -import numpy as np - -# Converting camelCase xrdml keys to snake_case. - - -def _process_comments(comments: list[str]) -> dict: - ret = {} - for line in comments: - ret.update(panalytical_comment(line)) - return ret - - -def _process_header(header: str) -> dict: - """ - Processes the header section, staring with the ``[Measurement conditions]`` line. - - Parameters - ---------- - header - The header portion as a string. - - Returns - ------- - header: dict - A dictionary containing the processed metadata. - - """ - header_lines = header.split("\n")[1:-1] - header = dict([line.split(",", 1) for line in header_lines]) - # Process comment entries. - comments = [] - for key in list(header.keys()): - if key.startswith("Comment"): - comments.append(header.pop(key).strip('"')) - comments = _process_comments(comments) - # Renaming the keys. 
- for key in list(header.keys()): - header[snake_case(key)] = header.pop(key) - header.update(comments) - return header - - -def _process_data(data: str) -> tuple[list, list]: - """ - Processes the data section, starting with the ``[Scan points]`` line. - - Parameters - ---------- - data - The data portion as a string. - - Returns - ------- - avals, adevs, ivals, idevs - The values and uncertainties in angle and intensity. - - """ - data_lines = data.split("\n")[1:-1] - columns = data_lines[0].split(",") - assert columns == ["Angle", "Intensity"], "Unexpected columns." - datapoints = [line.split(",") for line in data_lines[1:]] - angle, intensity = [list(d) for d in zip(*datapoints)] - avals, adevs = list(zip(*[tuple_fromstr(a) for a in angle])) - ivals, idevs = list(zip(*[tuple_fromstr(i) for i in intensity])) - return avals, adevs, ivals, idevs - - -def process( - *, - fn: str, - encoding: str = "utf-8", - timezone: str = "UTC", - **kwargs: dict, -) -> xr.Dataset: - """ - Processes a PANalytical XRD csv file. All information contained in the header - of the csv file is stored in the metadata. - - Parameters - ---------- - fn - The file containing the trace(s) to parse. - - encoding - Encoding of ``fn``, by default "utf-8". - - timezone - A string description of the timezone. Default is "UTC". - - Returns - ------- - :class:`xarray.Dataset` - Data containing the timesteps and metadata. This filetype contains the full - date specification. - - """ - with open(fn, "r", encoding=encoding) as csv_file: - csv = csv_file.read() - # Split file into its sections. - __, header, data = csv.split("[") - assert header.startswith("Measurement conditions"), "Unexpected section." - assert data.startswith("Scan points"), "Unexpected section." - header = _process_header(header) - # Process the data trace. - angle, _, insty, _ = _process_data(data) - adiff = np.abs(np.diff(angle)) * 0.5 - adiff = np.append(adiff, adiff[-1]) - idevs = np.ones(len(insty)) - # Process the metadata. - uts = dateutils.str_to_uts( - timestamp=header["file_date_and_time"], - format="%d/%B/%Y %H:%M", - timezone=timezone, - ) - header["fulldate"] = True - # Build Datasets - vals = xr.Dataset( - data_vars={ - "intensity": ( - ["uts", "angle"], - np.reshape(insty, (1, -1)), - {"units": "counts", "ancillary_variables": "intensity_std_err"}, - ), - "intensity_std_err": ( - ["uts", "angle"], - np.reshape(idevs, (1, -1)), - {"units": "counts", "standard_name": "intensity standard_error"}, - ), - "angle_std_err": ( - ["uts", "angle"], - np.reshape(adiff, (1, -1)), - {"units": "deg", "standard_name": "angle standard_error"}, - ), - }, - coords={ - "uts": (["uts"], [uts]), - "angle": ( - ["angle"], - list(angle), - {"units": "deg", "ancillary_variables": "angle_std_err"}, - ), - }, - attrs=header, - ) - return vals diff --git a/src/yadg/parsers/xrdtrace/panalyticalxrdml.py b/src/yadg/parsers/xrdtrace/panalyticalxrdml.py deleted file mode 100644 index 5922cc4d..00000000 --- a/src/yadg/parsers/xrdtrace/panalyticalxrdml.py +++ /dev/null @@ -1,277 +0,0 @@ -""" -panalyticalxrdml: Processing of PANalytical XRD ``xml`` files -------------------------------------------------------------- - -File Structure -`````````````` - -These are xml-formatted files, which we here parse using the :mod:`xml.etree` -library into a Python :class:`dict`. - -.. note:: - - The ``angle`` returned from this parser is based on a linear interpolation of - the start and end point of the scan, and is the :math:`2\\theta`. The values - of :math:`\\omega` are discarded. 
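The csv processing above (and the xrdml scan processing further below) derives the angle uncertainty from the local spacing of the angle axis: half the step to the next point, with the last point padded with the preceding value. With invented angles:

.. code-block:: python

    import numpy as np

    angle = np.array([10.00, 10.02, 10.04, 10.08])     # hypothetical 2theta values (deg)
    adiff = np.abs(np.diff(angle)) * 0.5
    adiff = np.append(adiff, adiff[-1])                # pad the final point
    # adiff == [0.01, 0.01, 0.02, 0.02]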
-
-.. warning::
-
-    This parser is fairly new and untested. As a result, the returned metadata
-    contain only a subset of the available metadata in the XML file. If something
-    important is missing, please contact us!
-
-Uncertainties
-`````````````
-The uncertainties of ``"angle"`` are taken as half of the step width of
-the linearly spaced :math:`2\\theta` values.
-
-The uncertainties of ``"intensity"`` are taken from the significant figures of the
-counts, which amounts to a constant 1.0 count for the integer values in all supported files.
-
-.. codeauthor::
-    Nicolas Vetsch,
-    Peter Kraus
-"""
-
-from collections import defaultdict
-from typing import Union
-from xml.etree import ElementTree
-import numpy as np
-import xarray as xr
-
-from uncertainties.core import str_to_number_with_uncert as tuple_fromstr
-from .common import panalytical_comment
-from ...dgutils import dateutils
-
-
-def etree_to_dict(e: ElementTree.Element) -> dict:
-    """Recursively converts an ElementTree.Element into a dictionary.
-
-    Element attributes are stored into `"@"`-prefixed attribute keys.
-    Element text is stored into `"#text"` for all nodes.
-
-    From https://stackoverflow.com/a/10076823.
-
-    Parameters
-    ----------
-    e
-        The ElementTree root Element.
-
-    Returns
-    -------
-    dict
-        ElementTree parsed into a dictionary.
-
-    """
-    d = {e.tag: {} if e.attrib else None}
-    children = list(e)
-    if children:
-        dd = defaultdict(list)
-        for dc in map(etree_to_dict, children):
-            for k, v in dc.items():
-                dd[k].append(v)
-        d = {e.tag: {k: v[0] if len(v) == 1 else v for k, v in dd.items()}}
-    if e.attrib:
-        d[e.tag].update(("@" + k, v) for k, v in e.attrib.items())
-    if e.text:
-        text = e.text.strip()
-        if children or e.attrib:
-            if text:
-                d[e.tag]["#text"] = text
-        else:
-            d[e.tag] = text
-    return d
-
-
-def _process_values(d: Union[dict, str]) -> Union[dict, str]:
-    """
-    Recursively parses dicts in the following format:
-
-    .. code::
-
-        {"key": {"#text": ..., "@unit": ...}, ...}
-
-    into a single string:
-
-    .. code::
-
-        {"key": f"{#text} {@unit}", ...}
-
-    """
-    # TODO:
-    # If the value is neither "#text" nor an attribute, snake_case the key and recurse.
-    if isinstance(d, dict):
-        if "@unit" in d and "#text" in d:
-            return f"{d['#text']} {d['@unit']}"
-        elif "@version" in d and "#text" in d:
-            return f"{d['#text']} {d['@version']}"
-        else:
-            for k, v in d.items():
-                d[k] = _process_values(v)
-    return d
-
-
-def _process_scan(scan: dict) -> dict:
-    """
-    Parses the scan section of the file. Creates the explicit positions based
-    on the number of measured intensities and the start & end position.
-
-    """
-    header = scan.pop("header")
-    dpts = scan.pop("dataPoints")
-    counting_time = _process_values(dpts.pop("commonCountingTime"))
-    ivals, idevs = list(
-        zip(*[tuple_fromstr(c) for c in dpts["intensities"].pop("#text").split()])
-    )
-    iunit = dpts["intensities"].pop("@unit")
-    timestamp = header.pop("startTimeStamp")
-
-    dp = {
-        "intensity": {"vals": ivals, "devs": idevs, "unit": iunit},
-        "timestamp": timestamp,
-        "counting_time": counting_time,
-    }
-
-    positions = _process_values(dpts.pop("positions"))
-    for v in positions:
-        pos = np.linspace(
-            float(v["startPosition"]), float(v["endPosition"]), num=len(ivals)
-        )
-        adiff = np.abs(np.diff(pos)) * 0.5
-        adiff = np.append(adiff, adiff[-1])
-        dp[v["@axis"]] = {
-            "vals": pos,
-            "devs": adiff,
-            "unit": v["@unit"],
-        }
-    return dp
-
-
-def _process_comment(comment: dict) -> dict:
-    """Combines the comment entries into a single dict using panalytical_comment."""
-    entry = comment.pop("entry")
-    ret = {}
-    for line in entry:
-        ret.update(panalytical_comment(line))
-    return ret
-
-
-def _process_measurement(measurement: dict, timezone: str):
-    """
-    A function that processes each section of the XRD XML file.
-    """
-    # Comment.
-    comment = measurement["comment"].pop("entry")
-    for line in comment:
-        if "PHD Lower Level" in line and "PHD Upper Level" in line:
-            __, values = list(zip(*[s.split(" = ") for s in line.split(", ")]))
-            keys = ["phd_lower_level", "phd_upper_level"]
-    measurement["comment"] = dict(zip(keys, values))
-    # Wavelength.
-    wavelength = _process_values(measurement.pop("usedWavelength"))
-    measurement["wavelength"] = wavelength
-    # Incident beam path.
-    incident_beam_path = _process_values(measurement.pop("incidentBeamPath"))
-    measurement["incident_beam_path"] = incident_beam_path
-    # Diffracted beam path.
-    diffracted_beam_path = _process_values(measurement.pop("diffractedBeamPath"))
-    measurement["diffracted_beam_path"] = diffracted_beam_path
-    scan = _process_scan(measurement.pop("scan"))
-    trace = {"angle": scan.pop("2Theta"), "intensity": scan.pop("intensity")}
-    meta = measurement
-    meta["counting_time"] = scan.pop("counting_time")
-    trace["uts"] = dateutils.str_to_uts(
-        timestamp=scan.pop("timestamp"), timezone=timezone
-    )
-    return trace, meta
-
-
-def process(
-    *,
-    fn: str,
-    timezone: str,
-    **kwargs: dict,
-) -> xr.Dataset:
-    """Processes a PANalytical xrdml file.
-
-    Parameters
-    ----------
-    fn
-        The file containing the trace(s) to parse.
-
-    timezone
-        A string description of the timezone.
-
-    Returns
-    -------
-    :class:`xarray.Dataset`
-        Data containing the timesteps and metadata. This filetype contains the full
-        date specification.
-
-    """
-    it = ElementTree.iterparse(fn)
-    # Removing xmlns prefixes from all tags.
-    # From https://stackoverflow.com/a/25920989.
-    for __, e in it:
-        __, xmlns_present, postfix = e.tag.partition("}")
-        if xmlns_present:
-            e.tag = postfix  # Strip away all xmlns prefixes.
-    root = it.root
-    xrd = etree_to_dict(root)
-    # Start processing the xml contents.
-    measurements = xrd["xrdMeasurements"]
-    assert measurements["@status"] == "Completed", "Incomplete measurement."
-    comment = _process_comment(measurements["comment"])
-    # Renaming some entries for consistency.
-    sample = measurements["sample"]
-    sample["prepared_by"] = sample.pop("preparedBy")
-    sample["type"] = sample.pop("@type")
-    # Process measurement data.
-    data, meta = _process_measurement(measurements["xrdMeasurement"], timezone)
-    data["fn"] = fn
-    # Shove unused data into meta
-    meta["sample"] = sample
-    meta["comment"] = comment
-    meta["fulldate"] = True
-    # Build Datasets
-    vals = xr.Dataset(
-        data_vars={
-            "intensity": (
-                ["uts", "angle"],
-                np.reshape(data["intensity"]["vals"], (1, -1)),
-                {
-                    "units": data["intensity"]["unit"],
-                    "ancillary_variables": "intensity_std_err",
-                },
-            ),
-            "intensity_std_err": (
-                ["uts", "angle"],
-                np.reshape(data["intensity"]["devs"], (1, -1)),
-                {
-                    "units": data["intensity"]["unit"],
-                    "standard_name": "intensity standard_error",
-                },
-            ),
-            "angle_std_err": (
-                ["uts", "angle"],
-                np.reshape(data["angle"]["devs"], (1, -1)),
-                {
-                    "units": data["angle"]["unit"],
-                    "standard_name": "angle standard_error",
-                },
-            ),
-        },
-        coords={
-            "uts": (["uts"], [data["uts"]]),
-            "angle": (
-                ["angle"],
-                data["angle"]["vals"],
-                {
-                    "units": data["angle"]["unit"],
-                    "ancillary_variables": "angle_std_err",
-                },
-            ),
-        },
-        attrs=meta,
-    )
-    return vals
diff --git a/src/yadg/parsers/xrdtrace/panalyticalxy.py b/src/yadg/parsers/xrdtrace/panalyticalxy.py
deleted file mode 100644
index f5a0c9c9..00000000
--- a/src/yadg/parsers/xrdtrace/panalyticalxy.py
+++ /dev/null
@@ -1,87 +0,0 @@
-"""
-panalyticalxy: Processing of PANalytical XRD ``xy`` files
----------------------------------------------------------
-
-File Structure
-``````````````
-
-These files basically just contain the ``[Scan points]`` part of
-:mod:`~yadg.parsers.xrdtrace.panalyticalcsv` files. As a consequence, no metadata
-is recorded, and the format does not have an associated timestamp.
-
-Uncertainties
-`````````````
-The uncertainties of ``"angle"`` are taken as half of the step width between points.
-
-The uncertainties of ``"intensity"`` are set to a constant value of 1.0 count.
-
-.. codeauthor::
-    Nicolas Vetsch,
-    Peter Kraus
-"""
-
-from uncertainties.core import str_to_number_with_uncert as tuple_fromstr
-import numpy as np
-import xarray as xr
-
-
-def process(
-    *,
-    fn: str,
-    encoding: str,
-    **kwargs: dict,
-) -> xr.Dataset:
-    """Processes a PANalytical XRD xy file.
-
-    Parameters
-    ----------
-    fn
-        The file containing the trace(s) to parse.
-
-    encoding
-        Encoding of ``fn``.
-
-
-    Returns
-    -------
-    :class:`xarray.Dataset`
-        Dataset containing the angle and intensity traces. A full timestamp is not
-        available in ``.xy`` files.
-
-    """
-    with open(fn, "r", encoding=encoding) as xy_file:
-        xy = xy_file.readlines()
-    datapoints = [li.strip().split() for li in xy]
-    angle, intensity = list(zip(*datapoints))
-    angle, _ = list(zip(*[tuple_fromstr(a) for a in angle]))
-    insty, _ = list(zip(*[tuple_fromstr(i) for i in intensity]))
-    idevs = np.ones(len(insty))
-    adiff = np.abs(np.diff(angle)) * 0.5
-    adiff = np.append(adiff, adiff[-1])
-    vals = xr.Dataset(
-        data_vars={
-            "intensity": (
-                ["angle"],
-                list(insty),
-                {"units": "counts", "ancillary_variables": "intensity_std_err"},
-            ),
-            "intensity_std_err": (
-                ["angle"],
-                idevs,
-                {"units": "counts", "standard_name": "intensity standard_error"},
-            ),
-            "angle_std_err": (
-                ["angle"],
-                adiff,
-                {"units": "deg", "standard_name": "angle standard_error"},
-            ),
-        },
-        coords={
-            "angle": (
-                ["angle"],
-                list(angle),
-                {"units": "deg", "ancillary_variables": "angle_std_err"},
-            ),
-        },
-    )
    return vals
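
The uncertainty convention shared by the removed ``xrdtrace`` parsers can be summarised
in a short sketch. This is an illustrative snippet only: the helper name
``xy_trace_with_uncertainties`` and the two-column whitespace-separated input are
assumptions, not part of the deleted sources.

.. code-block:: python

    import numpy as np
    from uncertainties.core import str_to_number_with_uncert as tuple_fromstr


    def xy_trace_with_uncertainties(lines: list[str]):
        # Parse whitespace-separated "angle intensity" rows. tuple_fromstr is used
        # only for the nominal values here; its significant-figure uncertainties
        # are discarded, as they were in the removed parsers.
        pairs = [line.strip().split() for line in lines]
        astrs, istrs = zip(*pairs)
        avals, _ = zip(*[tuple_fromstr(a) for a in astrs])
        ivals, _ = zip(*[tuple_fromstr(i) for i in istrs])
        # Angle uncertainty: half of the local step width, padded to full length.
        adevs = np.abs(np.diff(avals)) * 0.5
        adevs = np.append(adevs, adevs[-1])
        # Intensity uncertainty: one count per point.
        idevs = np.ones(len(ivals))
        return np.asarray(avals), adevs, np.asarray(ivals), idevs

Called on ``["10.0 120", "10.1 135", "10.2 128"]``, the sketch yields angle
uncertainties of 0.05 deg and intensity uncertainties of 1 count, which is what the
removed parsers stored as ``angle_std_err`` and ``intensity_std_err``.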