-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmonitors.py
110 lines (82 loc) · 3.11 KB
/
monitors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from __future__ import annotations

from json import JSONDecodeError, load
from tempfile import NamedTemporaryFile

from aiida.common.log import LOG_LEVEL_REPORT
from aiida.orm import CalcJobNode
from aiida.transports import Transport

from .utils.analyzers import CapacityAnalyzer
def monitor_capacity_threshold(
    node: CalcJobNode,
    transport: Transport,
    settings: dict,
    filename="snapshot.json",
) -> str | None:
    """Retrieve and inspect snapshot to determine if capacity has
    fallen below threshold for several consecutive cycles.

    Parameters
    ----------
    `node` : `CalcJobNode`
        The calculation node.
    `transport` : `Transport`
        The associated transport instance.
    `settings` : `dict`
        The monitor settings, passed as keyword arguments to
        `CapacityAnalyzer`.
    `filename` : `str`
        The polled source file, `"snapshot.json"` by default.

    Returns
    -------
    `Optional[str]`
        An exit message if the node carries a truthy `"marked_for_death"`
        extra, `None` otherwise.

    Notes
    -----
    Snapshot problems (not yet produced, fetch/I-O error, empty file,
    invalid JSON, non-dictionary or empty content) and errors raised
    during analysis are logged on `node` and yield `None`; they are not
    raised. Errors raised while building the `CapacityAnalyzer` from
    `settings` are not caught and propagate to the caller.

    Side effects: sets the `"status"` and `"snapshot"` extras after a
    successful analysis, and the `"flag"` extra when the analyzer flags
    the job or the node is marked for termination.
    """
    # Built outside the `try`, so invalid `settings` propagate to the caller.
    analyzer = CapacityAnalyzer(**settings)

    try:
        with transport:
            remote_path = f"{node.get_remote_workdir()}/{filename}"

            if not transport.isfile(remote_path):
                node.logger.info(f"'{filename}' not yet produced; continue")
                return None

            snapshot = _fetch_snapshot(node, transport, remote_path, filename)
            if snapshot is None:
                return None  # the reason has already been logged

            # Kept out of any narrow `except`: analyzer errors are reported
            # with their real type/message by the boundary handler below
            # instead of being mislabelled as snapshot format problems.
            analyzer.analyze(snapshot)

            node.base.extras.set_many({
                "status": analyzer.status,
                "snapshot": analyzer.snapshot,
            })

            node.logger.log(LOG_LEVEL_REPORT, analyzer.report)

            # NOTE(review): this check is only reached after a valid
            # snapshot was read, so a termination request is not honoured
            # while the file is missing or invalid.
            if node.base.extras.get("marked_for_death", False):
                node.base.extras.set("flag", "☠️")
                if "snapshot" in node.base.extras:
                    node.base.extras.delete("snapshot")
                return "Job terminated by monitor per user request"

            if analyzer.flag:
                node.base.extras.set("flag", f"🍅{analyzer.flag}")

            return None

    except Exception as err:
        # Monitor boundary: log (with the exception type, since `str(err)`
        # alone can be empty or cryptic) and return `None` instead of raising.
        node.logger.error(f"{type(err).__name__}: {err}")
        return None


def _fetch_snapshot(
    node: CalcJobNode,
    transport: Transport,
    remote_path: str,
    filename: str,
) -> dict | None:
    """Download `remote_path` and return its JSON content as a dictionary.

    Expected failures (missing file, I/O error, empty file, invalid JSON,
    non-dictionary or empty content) are logged on `node` and reported as
    `None`; any other exception propagates to the caller.
    """
    try:
        # JSON interchange is UTF-8 (RFC 8259); don't depend on the locale.
        with NamedTemporaryFile("w+", encoding="utf-8") as temp_file:
            transport.getfile(remote_path, temp_file.name)
            # `getfile` writes through its own handle; rewind ours before
            # parsing so the read always starts at the beginning.
            temp_file.seek(0)
            snapshot = load(temp_file)
    except FileNotFoundError:
        node.logger.error(f"error fetching '{filename}'")
        return None
    except JSONDecodeError as err:
        # A zero-byte / whitespace-only file is reported as empty; anything
        # else (e.g. a partially written file) as invalid JSON.
        if not err.doc.strip():
            node.logger.error(f"'{filename}' is empty")
        else:
            node.logger.error(f"'{filename}' is not valid JSON: {err}")
        return None
    except OSError as err:
        node.logger.error(str(err))
        return None

    if not isinstance(snapshot, dict):
        node.logger.error(f"'{filename}' not in dictionary format")
        return None

    if not snapshot:
        node.logger.error(f"'{filename}' is empty")
        return None

    return snapshot