Skip to content

Commit

Permalink
Use Slurm array jobs to limit concurrent extraction jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
takluyver committed Sep 13, 2024
1 parent c295f8d commit 38bcb05
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 14 deletions.
8 changes: 6 additions & 2 deletions damnit/backend/extract_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,11 @@ def main(argv=None):
# Hide some logging from Kafka to make things more readable
logging.getLogger('kafka').setLevel(logging.WARNING)

print(f"\n----- Processing r{args.run} (p{args.proposal}) -----", file=sys.stderr)
if (run := args.run) == -1:
log.debug("Getting run number from Slurm array task ID")
run = int(os.environ['SLURM_ARRAY_TASK_ID'])

Check warning on line 270 in damnit/backend/extract_data.py

View check run for this annotation

Codecov / codecov/patch

damnit/backend/extract_data.py#L268-L270

Added lines #L268 - L270 were not covered by tests

print(f"\n----- Processing r{run} (p{args.proposal}) -----", file=sys.stderr)

Check warning on line 272 in damnit/backend/extract_data.py

View check run for this annotation

Codecov / codecov/patch

damnit/backend/extract_data.py#L272

Added line #L272 was not covered by tests
log.info(f"run_data={args.run_data}, match={args.match}")
if args.mock:
log.info("Using mock run object for testing")
Expand All @@ -277,7 +281,7 @@ def main(argv=None):
if args.update_vars:
extr.update_db_vars()

extr.extract_and_ingest(args.proposal, args.run,
extr.extract_and_ingest(args.proposal, run,

Check warning on line 284 in damnit/backend/extract_data.py

View check run for this annotation

Codecov / codecov/patch

damnit/backend/extract_data.py#L284

Added line #L284 was not covered by tests
cluster=args.cluster_job,
run_data=RunData(args.run_data),
match=args.match,
Expand Down
76 changes: 66 additions & 10 deletions damnit/backend/extraction_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import sys
from contextlib import contextmanager
from ctypes import CDLL
from dataclasses import dataclass
from dataclasses import dataclass, replace
from itertools import groupby
from pathlib import Path
from threading import Thread

Expand Down Expand Up @@ -38,6 +39,8 @@ def default_slurm_partition():


def process_log_path(run, proposal, ctx_dir=Path('.'), create=True):
if run == -1:
run = "%a" # Slurm array task ID
p = ctx_dir.absolute() / 'process_logs' / f"r{run}-p{proposal}.out"
if create:
p.parent.mkdir(exist_ok=True)
Expand Down Expand Up @@ -143,17 +146,69 @@ def sbatch_cmd(self, req: ExtractionRequest):
log.info("Processing output will be written to %s",
log_path.relative_to(self.context_dir.absolute()))

if req.run == -1:
job_name = f"p{req.proposal}-damnit"
else:
# We put the run number first so that it's visible in
# squeue's default 11-character column for the JobName.
job_name = f"r{req.run}-p{req.proposal}-damnit"

return [
'sbatch', '--parsable',
*self._resource_opts(req.cluster),
'-o', log_path,
'--open-mode=append',
# Note: we put the run number first so that it's visible in
# squeue's default 11-character column for the JobName.
'--job-name', f"r{req.run}-p{req.proposal}-damnit",
'--job-name', job_name,
'--wrap', shlex.join(req.python_cmd())
]

def submit_multi(self, reqs: list[ExtractionRequest], limit_running=30):
    """Submit multiple requests using Slurm job arrays.

    Requests with only the run number different are grouped together.
    limit_running controls how many can run simultaneously *within* each
    group. Normally most/all runs will be in one group, so this will be
    close to the overall limit.

    Returns a list of (job_id, cluster) pairs, one per array job submitted.
    """
    # Two requests belong to the same array job iff they are identical
    # once the run number is blanked out. run=-1 also tells extract_data
    # to read the real run number from $SLURM_ARRAY_TASK_ID.
    def array_key(request):
        return replace(request, run=-1)

    submitted = []
    for template, members in groupby(reqs, key=array_key):
        run_numbers = [member.run for member in members]
        array_spec = self._abbrev_array_nums(run_numbers) + f'%{limit_running}'
        # sbatch_cmd sees run == -1, so the log path and job name are
        # generic for the whole array.
        sbatch_args = self.sbatch_cmd(template) + ['--array', array_spec]
        proc = subprocess.run(
            sbatch_args, stdout=subprocess.PIPE, text=True, check=True,
            cwd=self.context_dir,
        )
        # --parsable output is "<job_id>;<cluster>" (cluster may be absent)
        job_id, _, cluster = proc.stdout.partition(';')
        job_id, cluster = job_id.strip(), (cluster.strip() or 'maxwell')
        log.info("Launched Slurm (%s) job %s with array %s (%d runs) to run context file",
                 cluster, job_id, array_spec, len(run_numbers))
        submitted.append((job_id, cluster))

    return submitted

@staticmethod
def _abbrev_array_nums(nums: list[int]) -> str:
range_starts, range_ends = [nums[0]], []
current_range_end = nums[0]
for r in nums[1:]:
if r > current_range_end + 1:
range_ends.append(current_range_end)
range_starts.append(r)
current_range_end = r

Check warning on line 200 in damnit/backend/extraction_control.py

View check run for this annotation

Codecov / codecov/patch

damnit/backend/extraction_control.py#L197-L200

Added lines #L197 - L200 were not covered by tests
range_ends.append(current_range_end)

s_pieces = []
for start, end in zip(range_starts, range_ends):
if start == end:
s_pieces.append(str(start))
else:
s_pieces.append(f"{start}-{end}")

Check warning on line 208 in damnit/backend/extraction_control.py

View check run for this annotation

Codecov / codecov/patch

damnit/backend/extraction_control.py#L208

Added line #L208 was not covered by tests

return ",".join(s_pieces)

def execute_in_slurm(self, req: ExtractionRequest):
"""Run an extraction job in srun with live output"""
log_path = process_log_path(req.run, req.proposal, self.context_dir)
Expand Down Expand Up @@ -243,6 +298,7 @@ def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=Fal
else:
unavailable_runs.append((proposal, run))

props_runs.sort()
print(f"Reprocessing {len(props_runs)} runs already recorded, skipping {len(unavailable_runs)}...")
else:
try:
Expand Down Expand Up @@ -274,11 +330,11 @@ def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=Fal
for req in reqs[1:]:
req.update_vars = False

for prop, run in props_runs:
req = ExtractionRequest(run, prop, RunData.ALL, match=match, mock=mock)
if direct:
if direct:
for req in reqs:

Check warning on line 334 in damnit/backend/extraction_control.py

View check run for this annotation

Codecov / codecov/patch

damnit/backend/extraction_control.py#L334

Added line #L334 was not covered by tests
submitter.execute_direct(req)
elif watch:
elif watch:
for req in reqs:

Check warning on line 337 in damnit/backend/extraction_control.py

View check run for this annotation

Codecov / codecov/patch

damnit/backend/extraction_control.py#L337

Added line #L337 was not covered by tests
submitter.execute_in_slurm(req)
else:
submitter.submit(req)
else:
submitter.submit_multi(reqs)
3 changes: 1 addition & 2 deletions damnit/gui/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,8 +929,7 @@ def process_runs(self):

try:
reqs = dlg.extraction_requests()
for req in reqs:
submitter.submit(req)
submitter.submit_multi(reqs)

Check warning on line 932 in damnit/gui/main_window.py

View check run for this annotation

Codecov / codecov/patch

damnit/gui/main_window.py#L932

Added line #L932 was not covered by tests
except Exception as e:
log.error("Error launching processing", exc_info=True)
self.show_status_message(f"Error launching processing: {e}",
Expand Down

0 comments on commit 38bcb05

Please sign in to comment.