From 2b2b89436e2090ee7c3128c184cf906c74877c25 Mon Sep 17 00:00:00 2001 From: Edan Bainglass Date: Mon, 1 Jan 2024 10:31:41 +0000 Subject: [PATCH 1/2] Implement pacifist monitoring --- aiida_aurora/monitors.py | 28 +++- aiida_aurora/utils/analyzers.py | 156 ++++++++++----------- aiida_aurora/utils/cycling_analysis.py | 31 ++-- aiida_aurora/workflows/cycling_sequence.py | 48 +++++-- 4 files changed, 156 insertions(+), 107 deletions(-) diff --git a/aiida_aurora/monitors.py b/aiida_aurora/monitors.py index 9f20a84..360136f 100644 --- a/aiida_aurora/monitors.py +++ b/aiida_aurora/monitors.py @@ -1,7 +1,9 @@ +from __future__ import annotations + from json import load from tempfile import NamedTemporaryFile -from typing import Optional +from aiida.common.log import LOG_LEVEL_REPORT from aiida.orm import CalcJobNode from aiida.transports import Transport @@ -13,7 +15,7 @@ def monitor_capacity_threshold( transport: Transport, settings: dict, filename="snapshot.json", -) -> Optional[str]: +) -> str | None: """Retrieve and inspect snapshot to determine if capacity has fallen below threshold for several consecutive cycles. @@ -48,7 +50,6 @@ def monitor_capacity_threshold( """ analyzer = CapacityAnalyzer(**settings) - analyzer.set_logger(node.logger) try: @@ -72,7 +73,26 @@ def monitor_capacity_threshold( if not snapshot: raise ValueError - return analyzer.analyze(snapshot) + analyzer.analyze(snapshot) + + node.base.extras.set_many({ + "status": analyzer.status, + "snapshot": analyzer.snapshot, + }) + + node.logger.log(LOG_LEVEL_REPORT, analyzer.report) + + if node.base.extras.get("marked_for_death", False): + + node.base.extras.set("flag", "☠️") + + if "snapshot" in node.base.extras: + node.base.extras.delete("snapshot") + + return "Job terminated by monitor per user request" + + if analyzer.flag: + node.base.extras.set("flag", f"🍅{analyzer.flag}") except TypeError: node.logger.error(f"'{filename}' not in dictionary format") diff --git a/aiida_aurora/utils/analyzers.py b/aiida_aurora/utils/analyzers.py index dba8477..2b9ca2a 100644 --- a/aiida_aurora/utils/analyzers.py +++ b/aiida_aurora/utils/analyzers.py @@ -1,34 +1,14 @@ -from itertools import groupby -from logging import LoggerAdapter -from typing import Optional +from __future__ import annotations -from aiida.common.log import AIIDA_LOGGER, LOG_LEVEL_REPORT +import numpy as np from .parsers import get_data_from_raw class Analyzer: - """Base class for all analyzers. + """Base class for all analyzers.""" - Attributes - ========== - `logger` : `Union[AiidaLoggerType, LoggerAdapter]` - The associated logger. - """ - - logger = AIIDA_LOGGER.getChild("monitor") - - def set_logger(self, logger: LoggerAdapter) -> None: - """Set the analyzer logger. - - Parameters - ---------- - `logger` : `LoggerAdapter` - The logger of the analyzed calculation node. - """ - self.logger = logger - - def analyze(self, snapshot: dict) -> Optional[str]: + def analyze(self, snapshot: dict) -> None: """Analyze the experiment snapshot against a condition. Condition is defined in subclass analyzers. @@ -37,12 +17,6 @@ def analyze(self, snapshot: dict) -> Optional[str]: ---------- `snapshot` : `dict` The loaded snapshot dictionary. - - Returns - ------- - `Optional[str]` - A string if a defined condition has been met, - `None` otherwise. """ raise NotImplementedError @@ -67,6 +41,7 @@ def __init__( check_type="discharge_capacity", threshold=0.8, consecutive_cycles=2, + keep_last=10, ) -> None: """`CapacityAnalyzer` constructor. @@ -80,6 +55,8 @@ def __init__( `consecutive_cycles` : `int` The number of required consecutive cycles, `2` by default. + `keep_last` : `int` + The number of cycles to keep in snapshot. Raises ------ @@ -93,8 +70,13 @@ def __init__( self.threshold = threshold self.consecutive = consecutive_cycles self.is_discharge = check_type == "discharge_capacity" + self.keep_last = keep_last + + self.flag = "" + self.status = "" + self.report = "" - def analyze(self, snapshot: dict) -> Optional[str]: + def analyze(self, snapshot: dict) -> None: """Analyze the snapshot. Check if capacity has fallen below threshold for required @@ -104,84 +86,98 @@ def analyze(self, snapshot: dict) -> Optional[str]: ---------- `snapshot` : `dict` The loaded snapshot dictionary. - - Returns - ------- - `Optional[str]` - If condition is met, an exit message, `None` otherwise. """ - self.capacities = self._get_capacities(snapshot) - self.cycles = len(self.capacities) - return None if self.cycles < 1 else self._check_capacity() + self._extract_capacities(snapshot) + self._check_capacity() + self._truncate_snapshot() ########### # PRIVATE # ########### - def _get_capacities(self, snapshot: dict): + def _extract_capacities(self, snapshot: dict) -> None: """Post-process the snapshot to extract capacities. Parameters ---------- `snapshot` : `dict` The loaded snapshot dictionary. - - Returns - ------- - `_type_` - A `numpy` array of capacities (in mAh), or empty list - if failed to process snapshot. """ try: - data = get_data_from_raw(snapshot) - return data['Qd'] if self.is_discharge else data['Qc'] + self.snapshot = get_data_from_raw(snapshot) + self.capacities = self.snapshot["Qd"] \ + if self.is_discharge \ + else self.snapshot["Qc"] except KeyError as err: - self.logger.error(f"missing '{str(err)}' in snapshot") - return [] + self.report = f"missing '{str(err)}' in snapshot" + self.snapshot = {} + self.capacities = [] - def _check_capacity(self) -> Optional[str]: + def _check_capacity(self) -> None: """Check if capacity has fallen below threshold for required - consecutive cycles. + consecutive cycles.""" - Returns - ------- - `Optional[str]` - If condition is met, an exit message, `None` otherwise. - """ + if (n := len(self.capacities)) < 2: + self.report = "need at least two complete cycles" + return - n = self.cycles Qs = self.capacities[0] - Q = self.capacities[-1] + Q = self.capacities[-2] Qt = self.threshold * Qs + C_per = Q / Qs * 100 - message = f"cycle #{n} : {Q = :.2f} mAh ({Q / Qs * 100:.1f}%)" + self.report = f"cycle #{n} : {Q = :.2f} mAh ({C_per:.1f}%)" + self.status = f"(cycle #{n} : C @ {C_per:.1f}%)" if Q < Qt: - message += f" : {(Qt - Q) / Qt * 100:.1f}% below threshold" + self.report += f" - {(Qt - Q) / Qt * 100:.1f}% below threshold" + + below_threshold = np.where(self.capacities < Qt)[0] + 1 + consecutively_below = self._filter_consecutive(below_threshold) - self.logger.log(LOG_LEVEL_REPORT, message) + if len(consecutively_below): - below_threshold = self._count_cycles_below_threshold() - if below_threshold >= self.consecutive: - return f"Capacity below threshold ({Qt:.2f} mAh) " \ - f"for {below_threshold} cycles!" + cycles_str = str(consecutively_below).replace("'", "") + self.report += f" - cycles below threshold: {cycles_str}" - return None + if consecutively_below[-1] == n: + self.flag = "🔴" + else: + self.flag = "🟡" - def _count_cycles_below_threshold(self) -> int: - """Count the number of consecutive cycles below threshold. + def _filter_consecutive(self, cycles: list[int]) -> list[int]: + """Return cycles below threshold for `x` consecutive cycles. + + Parameters + ---------- + `cycles` : `list[int]` + The cycles below threshold. Returns ------- - `int` - The number of consecutive cycles below threshold. + `list[int]` + The cycles below threshold for `x` consecutive cycles. """ - Qt = self.threshold * self.capacities[0] - return next( - ( - len(list(group)) # cycle-count of first below-threshold group - for below, group in groupby(self.capacities < Qt) - if below - ), - 0, - ) + return [ + cycle for i, cycle in enumerate(cycles) + if i >= self.consecutive - 1 and \ + all(cycles[i - j] == cycle - j for j in range(1, self.consecutive)) + ] + + def _truncate_snapshot(self) -> None: + """Truncate the snapshot to user defined size.""" + + truncated = {} + + size = min(self.keep_last, len(self.snapshot["cycle-number"])) + + for key, value in self.snapshot.items(): + + if key in ("time", "I", "Ewe", "Q"): + index = self.snapshot["cycle-index"][-size] + truncated[key] = value[index:] + + elif key in ("cycle-number", "Qc", "Qd", "Ec", "Ed"): + truncated[key] = value[-size:] + + self.snapshot = truncated diff --git a/aiida_aurora/utils/cycling_analysis.py b/aiida_aurora/utils/cycling_analysis.py index 7e47612..2be9a4c 100644 --- a/aiida_aurora/utils/cycling_analysis.py +++ b/aiida_aurora/utils/cycling_analysis.py @@ -2,6 +2,7 @@ import json +import numpy as np from pandas import DataFrame from pandas.io.formats.style import Styler @@ -278,9 +279,6 @@ def process_data(node: CalcJobNode) -> tuple[dict, str, Styler | str]: Post-processed data, warning, and analysis | error message. """ - if node.process_state and "finished" not in node.process_state.value: - return {}, f"Job terminated with message '{node.process_status}'", "" - warning = "" if node.exit_status: @@ -289,16 +287,19 @@ def process_data(node: CalcJobNode) -> tuple[dict, str, Styler | str]: warning += f"{node.exit_message}" if node.exit_message else generic warning += "\n\n" - if "results" in node.outputs: - data = get_data_from_results(node.outputs.results) - elif "raw_data" in node.outputs: - data = get_data_from_file(node.outputs.raw_data) - elif "retrieved" in node.outputs: - data = get_data_from_file(node.outputs.retrieved) - elif "remote_folder" in node.outputs: - data = get_data_from_remote(node.outputs.remote_folder) + if node.exit_status is None: + data = get_data_from_snapshot(node.base.extras.get("snapshot", {})) else: - data = {} + if "results" in node.outputs: + data = get_data_from_results(node.outputs.results) + elif "raw_data" in node.outputs: + data = get_data_from_file(node.outputs.raw_data) + elif "retrieved" in node.outputs: + data = get_data_from_file(node.outputs.retrieved) + elif "remote_folder" in node.outputs: + data = get_data_from_remote(node.outputs.remote_folder) + else: + data = {} return data, warning, add_analysis(data) @@ -346,6 +347,11 @@ def get_data_from_remote(source: RemoteData) -> dict: return {} +def get_data_from_snapshot(snapshot: dict) -> dict: + """docstring""" + return {k: np.array(v) for k, v in snapshot.items()} + + def add_analysis(data: dict) -> Styler | str: """Return analysis details. @@ -381,4 +387,5 @@ def add_analysis(data: dict) -> Styler | str: ]).hide(axis="index") else: + return "ERROR! Failed to find or parse output" diff --git a/aiida_aurora/workflows/cycling_sequence.py b/aiida_aurora/workflows/cycling_sequence.py index d840b8e..fc0229a 100644 --- a/aiida_aurora/workflows/cycling_sequence.py +++ b/aiida_aurora/workflows/cycling_sequence.py @@ -117,14 +117,35 @@ def has_steps_remaining(self): def inspect_cycling_step(self): """Verify that the last cycling step finished successfully.""" - last_subprocess = self.ctx.subprocesses[-1] - if not last_subprocess.is_finished_ok: - pkid = last_subprocess.pk - stat = last_subprocess.exit_status - self.report(f'Cycling substep failed with exit status {stat}') + last_step: orm.CalcJobNode = self.ctx.subprocesses[-1] + + if not last_step.is_finished_ok: + + pk = last_step.pk + status = last_step.exit_status + self.report(f"Process <{pk=}> failed with exit status {status}") + + # not killed by monitor + if "🍅" in last_step.base.extras.get("flag", ""): + + exit_codes = BatteryCyclerExperiment.exit_codes + + if status == exit_codes.WARNING_COMPLETED_CANCELLED.status: + # killed by tomato + last_step.base.extras.set("flag", "🚫") + else: + # catastrophic error + last_step.base.extras.set("flag", "🚨") + return self.exit_codes.ERROR_IN_CYCLING_STEP + last_step.base.extras.set("flag", "✅") + + for extra in ("status", "snapshot"): + if extra in last_step.base.extras: + last_step.base.extras.delete(extra) + def run_cycling_step(self): """Run the next cycling step.""" protocol_name = self.ctx.steps.pop(0) @@ -136,19 +157,24 @@ def run_cycling_step(self): 'control_settings': self.inputs.control_settings[protocol_name], } - has_monitors = protocol_name in self.inputs.monitor_settings + has_monitors = bool(self.inputs.monitor_settings[protocol_name]) if has_monitors: inputs['monitors'] = self.inputs.monitor_settings[protocol_name] - running = self.submit(BatteryCyclerExperiment, **inputs) + running: orm.CalcJobNode = self.submit( + BatteryCyclerExperiment, + **inputs, + ) + sample_name = self.inputs.battery_sample.attributes["metadata"]["name"] running.label = f"{protocol_name} | {sample_name}" - if has_monitors: - running.set_extra('monitored', True) - else: - running.set_extra('monitored', False) + running.base.extras.set_many({ + "monitored": has_monitors, + "flag": "🍅", + "status": "", + }) workflow_group = self.inputs.group_label.value experiment_group = f"{workflow_group}/{protocol_name}" From 80c8a33949e7aceabb32c86e1503f9fa1ee99648 Mon Sep 17 00:00:00 2001 From: Edan Bainglass Date: Mon, 1 Jan 2024 10:40:56 +0000 Subject: [PATCH 2/2] Update nitpick exceptions file --- docs/source/nitpick-exceptions | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/nitpick-exceptions b/docs/source/nitpick-exceptions index 7f6b96c..5021cd5 100644 --- a/docs/source/nitpick-exceptions +++ b/docs/source/nitpick-exceptions @@ -6,6 +6,7 @@ py:class Logger py:class orm.ProcessNode py:class CalcJobNode +py:class Transport py:class aiida_aurora.schemas.battery.Config py:class aiida_aurora.schemas.cycling.Config