Move job status tracking machinery into the backend
takluyver committed Aug 30, 2024
1 parent 8c53ab1 commit 987f966
Showing 2 changed files with 110 additions and 68 deletions.
70 changes: 70 additions & 0 deletions damnit/backend/extraction_control.py
@@ -282,3 +282,73 @@ def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=Fal
submitter.execute_in_slurm(req)
else:
submitter.submit(req)


class ExtractionJobTracker:
    """Track running extraction jobs using their running/finished messages"""
    def __init__(self):
        self.jobs = {}  # keyed by processing_id

    def on_processing_running(self, info):
        proc_id = info['processing_id']
        if info != self.jobs.get(proc_id, None):
            self.jobs[proc_id] = info
            self.on_run_jobs_changed(info['proposal'], info['run'])
            log.debug("Processing running for p%s r%s on %s (%s)",
                      info['proposal'], info['run'], info['hostname'], proc_id)

    def on_processing_finished(self, info):
        proc_id = info['processing_id']
        info = self.jobs.pop(proc_id, None)
        if info is not None:
            self.on_run_jobs_changed(info['proposal'], info['run'])
            log.debug("Processing finished for p%s r%s (%s)",
                      info['proposal'], info['run'], proc_id)

    def on_run_jobs_changed(self, proposal, run):
        pass  # Implement in subclass

    def check_slurm_jobs(self):
        """Check for any Slurm jobs that exited without a 'finished' message"""
        jobs_by_cluster = {}
        for info in self.jobs.values():
            if cluster := info['slurm_cluster']:
                jobs_by_cluster.setdefault(cluster, []).append(info)

        for cluster, infos in jobs_by_cluster.items():
            jids = [i['slurm_job_id'] for i in infos]

            # Passing 1 Job ID can give an 'Invalid job id' error if it has
            # already left the queue. With multiple, we always get a list back.
            if len(jids) == 1:
                jids.append("1")

            cmd = ["squeue", "--clusters", cluster, "--jobs=" + ",".join(jids),
                   "--format=%i %T", "--noheader"]
            self.squeue_check_jobs(cmd, infos)

    # Running the squeue subprocess is separated here so GUI code can override
    # it, to avoid blocking the event loop if squeue is slow for any reason.
    def squeue_check_jobs(self, cmd, jobs_to_check):
        res = subprocess.run(cmd, stdout=subprocess.PIPE, text=True)
        if res.returncode != 0:
            log.warning("Error calling squeue")
            return

        self.process_squeue_output(res.stdout, jobs_to_check)

    def process_squeue_output(self, stdout: str, jobs_to_check):
        """Inspect squeue output to clean up crashed jobs"""
        still_running = set()
        for line in stdout.splitlines():
            job_id, status = line.strip().split()
            if status == 'RUNNING':
                still_running.add(job_id)

        for info in jobs_to_check:
            proc_id = info['processing_id']
            job_id = info['slurm_job_id']
            if (proc_id in self.jobs) and (job_id not in still_running):
                del self.jobs[proc_id]
                self.on_run_jobs_changed(info['proposal'], info['run'])
                log.info("Slurm job %s on %s (%s) crashed or was cancelled",
                         info['slurm_job_id'], info['slurm_cluster'], proc_id)
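The new tracker is meant to be subclassed: on_run_jobs_changed is the only hook to implement, and check_slurm_jobs can be called periodically to catch jobs that died without sending a 'finished' message. Below is a minimal sketch of a non-GUI subclass; the class name, the example message values and the logging setup are illustrative assumptions, not part of this commit.

import logging

from damnit.backend.extraction_control import ExtractionJobTracker

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

class LoggingJobTracker(ExtractionJobTracker):
    """Hypothetical subclass: log whenever the set of jobs for a run changes."""
    def on_run_jobs_changed(self, proposal, run):
        jobs = [i for i in self.jobs.values()
                if i['proposal'] == proposal and i['run'] == run]
        log.info("p%s r%s: %d extraction job(s) in flight", proposal, run, len(jobs))

tracker = LoggingJobTracker()
# A 'running' message carries at least these keys (the values here are made up):
tracker.on_processing_running({
    'processing_id': 'abc123', 'proposal': 1234, 'run': 5,
    'hostname': 'node042', 'slurm_cluster': 'maxwell', 'slurm_job_id': '456789',
})
# check_slurm_jobs() shells out to squeue ('%i %T' lines, e.g. "456789 RUNNING")
# and drops any tracked job that squeue no longer reports as RUNNING.
tracker.check_slurm_jobs()
tracker.on_processing_finished({'processing_id': 'abc123'})
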
108 changes: 40 additions & 68 deletions damnit/gui/table.py
@@ -9,6 +9,7 @@
from PyQt5.QtWidgets import QMessageBox

from ..backend.db import BlobTypes, DamnitDB, ReducedData
from ..backend.extraction_control import ExtractionJobTracker
from ..backend.user_variables import value_types_by_name
from ..util import StatusbarStylesheet, delete_variable, timestamp2str

@@ -295,7 +296,8 @@ def __init__(self, db: DamnitDB, column_settings: dict, parent):
        self.column_index = {c: i for (i, c) in enumerate(self.column_ids)}
        self.run_index = {}  # {(proposal, run): row}
        self.standalone_comment_index = {}
        self.processing_jobs = {}  # UUID -> job info
        self.processing_jobs = QtExtractionJobTracker(self)
        self.processing_jobs.run_jobs_changed.connect(self.update_processing_status)

        self._bold_font = QtGui.QFont()
        self._bold_font.setBold(True)
@@ -324,10 +326,6 @@ def __init__(self, db: DamnitDB, column_settings: dict, parent):

        self._load_from_db()

        self.slurm_check_timer = QtCore.QTimer(self)
        self.slurm_check_timer.timeout.connect(self.check_slurm_jobs)
        self.slurm_check_timer.start(120_000)

    @staticmethod
    def _load_columns(db: DamnitDB, col_settings):
        t0 = time.perf_counter()
@@ -634,24 +632,13 @@ def handle_variable_set(self, var_info: dict):
        self.setHorizontalHeaderItem(col_ix, QtGui.QStandardItem(title))

    def handle_processing_running(self, info):
        processing_id = info['processing_id']
        self.processing_jobs[processing_id] = info
        self.update_processing_status(info['proposal'], info['run'])
        log.debug("Processing running for p%s r%s on %s (%s)",
                  info['proposal'], info['run'], info['hostname'], processing_id)
        self.processing_jobs.on_processing_running(info)

    def handle_processing_finished(self, info):
        processing_id = info['processing_id']
        info = self.processing_jobs.pop(processing_id, None)
        if info is not None:
            self.update_processing_status(info['proposal'], info['run'])
            log.debug("Processing finished for p%s r%s (%s)",
                      info['proposal'], info['run'], processing_id)

    def update_processing_status(self, proposal, run):
        self.processing_jobs.on_processing_finished(info)

    def update_processing_status(self, proposal, run, jobs_for_run):
        """Show/hide the processing indicator for the given run"""
        jobs_for_run = [i for i in self.processing_jobs.values()
                        if i['proposal'] == proposal and i['run'] == run]
        try:
            row_ix = self.find_row(proposal, run)
        except KeyError:
@@ -675,54 +662,6 @@ def update_processing_status(self, proposal, run):
            runnr_item.setData(f"{run}", Qt.ItemDataRole.DisplayRole)
            runnr_item.setToolTip("")

    def check_slurm_jobs(self):
        """Every 2 minutes, check for crashed/cancelled Slurm jobs"""
        jobs_by_cluster = {}
        for info in self.processing_jobs.values():
            if cluster := info['slurm_cluster']:
                jobs_by_cluster.setdefault(cluster, []).append(info)

        for cluster, infos in jobs_by_cluster.items():
            self._check_slurm_jobs_cluster(cluster, infos)

    def _check_slurm_jobs_cluster(self, cluster, infos):
        jids = [i['slurm_job_id'] for i in infos]
        # Passing 1 Job ID can give an 'Invalid job id' error if it has
        # already left the queue. With multiple, we always get a list back.
        if len(jids) == 1:
            jids.append("1")

        args = ["--clusters", cluster, "--jobs=" + ",".join(jids),
                "--format=%i %T", "--noheader"]
        log.info("Squeue check: %r", args)
        proc = QProcess(self)
        proc.setProcessChannelMode(QProcess.ForwardedErrorChannel)

        def done():
            proc.deleteLater()
            if proc.exitStatus() != QProcess.NormalExit or proc.exitCode() != 0:
                log.warning("Error calling squeue")
                return
            stdout = bytes(proc.readAllStandardOutput()).decode()

            still_running = set()
            for line in stdout.splitlines():
                job_id, status = line.strip().split()
                if status == 'RUNNING':
                    still_running.add(job_id)

            for info in infos:
                proc_id = info['processing_id']
                job_id = info['slurm_job_id']
                if (proc_id in self.processing_jobs) and (job_id not in still_running):
                    del self.processing_jobs[proc_id]
                    self.update_processing_status(info['proposal'], info['run'])
                    log.info("Slurm job %s on %s (%s) crashed or was cancelled",
                             info['slurm_job_id'], info['slurm_cluster'], proc_id)

        proc.finished.connect(done)
        proc.start("squeue", args)

    def add_editable_column(self, name):
        if name == "Status":
            return
@@ -891,6 +830,39 @@ def dataframe_for_export(self, column_titles, rows=None, drop_image_cols=False):
        return df


class QtExtractionJobTracker(ExtractionJobTracker, QtCore.QObject):
    run_jobs_changed = QtCore.pyqtSignal(int, int, object)  # prop, run, jobs

    def __init__(self, parent):
        super().__init__()
        QtCore.QObject.__init__(self, parent)

        # Check for crashed Slurm jobs every 2 minutes
        self.slurm_check_timer = QtCore.QTimer(self)
        self.slurm_check_timer.timeout.connect(self.check_slurm_jobs)
        self.slurm_check_timer.start(120_000)

    def squeue_check_jobs(self, cmd, jobs_to_check):
        proc = QProcess(self)
        proc.setProcessChannelMode(QProcess.ForwardedErrorChannel)

        def done():
            proc.deleteLater()
            if proc.exitStatus() != QProcess.NormalExit or proc.exitCode() != 0:
                log.warning("Error calling squeue")
                return
            stdout = bytes(proc.readAllStandardOutput()).decode()
            self.process_squeue_output(stdout, jobs_to_check)

        proc.finished.connect(done)
        proc.start(cmd[0], cmd[1:])

    def on_run_jobs_changed(self, proposal, run):
        jobs = [i for i in self.jobs.values()
                if i['proposal'] == proposal and i['run'] == run]
        self.run_jobs_changed.emit(proposal, run, jobs)
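
The Qt subclass turns the backend hook into a signal that carries the per-run job list, so slots never have to filter self.jobs themselves; the table model above connects it to update_processing_status. A minimal sketch of wiring it up elsewhere follows; the slot name and the standalone setup are illustrative assumptions, not from this commit.

import sys
from PyQt5.QtWidgets import QApplication

app = QApplication(sys.argv)  # a Qt event loop is needed for the timer and signals
tracker = QtExtractionJobTracker(parent=None)

def show_status(proposal, run, jobs):
    # 'jobs' is the list of info dicts still tracked for this (proposal, run)
    print(f"p{proposal} r{run}: {len(jobs)} extraction job(s) in flight")

tracker.run_jobs_changed.connect(show_status)

# Incoming 'running'/'finished' messages are forwarded just as the table model does:
# tracker.on_processing_running(info); tracker.on_processing_finished(info)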


def prettify_notation(value):
    if isinstance(value, float):
        if value % 1 == 0 and abs(value) < 10_000:
