diff --git a/damnit/backend/extraction_control.py b/damnit/backend/extraction_control.py
index 110ba0e3..9590081a 100644
--- a/damnit/backend/extraction_control.py
+++ b/damnit/backend/extraction_control.py
@@ -12,6 +12,7 @@
 from ctypes import CDLL
 from dataclasses import dataclass
 from pathlib import Path
+from secrets import token_hex
 from threading import Thread
 
 from extra_data.read_machinery import find_proposal
@@ -74,6 +75,16 @@ def proposal_runs(proposal):
     raw_dir = Path(find_proposal(proposal_name)) / "raw"
     return set(int(p.stem[1:]) for p in raw_dir.glob("*"))
 
+def batches(l, n):
+    start = 0
+    while True:
+        end = start + n
+        batch = l[start:end]
+        if not batch:
+            return
+        yield batch
+        start = end
+
 
 @dataclass
 class ExtractionRequest:
@@ -143,17 +154,78 @@ def sbatch_cmd(self, req: ExtractionRequest):
         log.info("Processing output will be written to %s",
                  log_path.relative_to(self.context_dir.absolute()))
 
+        if req.run == -1:
+            job_name = f"p{req.proposal}-damnit"
+        else:
+            # We put the run number first so that it's visible in
+            # squeue's default 11-character column for the JobName.
+            job_name = f"r{req.run}-p{req.proposal}-damnit"
+
         return [
             'sbatch', '--parsable',
             *self._resource_opts(req.cluster),
             '-o', log_path,
             '--open-mode=append',
-            # Note: we put the run number first so that it's visible in
-            # squeue's default 11-character column for the JobName.
-            '--job-name', f"r{req.run}-p{req.proposal}-damnit",
+            '--job-name', job_name,
             '--wrap', shlex.join(req.python_cmd())
         ]
 
+    def submit_multi(self, reqs: list[ExtractionRequest], limit_running=30):
+        """Submit multiple requests using Slurm job arrays.
+        """
+        out = []
+
+        assert len({r.cluster for r in reqs}) <= 1  # Don't mix cluster/non-cluster
+
+        # Array jobs are limited to 1001 in Slurm config (MaxArraySize)
+        for req_group in batches(reqs, 1000):
+            grpid = token_hex(8)  # random unique string
+            scripts_dir = self.context_dir / '.tmp'
+            scripts_dir.mkdir(exist_ok=True)
+            if scripts_dir.stat().st_uid == os.getuid():
+                scripts_dir.chmod(0o777)
+
+            for i, req in enumerate(req_group):
+                script_file = scripts_dir / f'launch-{grpid}-{i}.sh'
+                log_path = process_log_path(req.run, req.proposal, self.context_dir)
+                script_file.write_text(
+                    'rm "$0"\n'  # Script cleans itself up
+                    f'{shlex.join(req.python_cmd())} >>"{log_path}" 2>&1'
+                )
+                script_file.chmod(0o777)
+
+            script_expr = f".tmp/launch-{grpid}-$SLURM_ARRAY_TASK_ID.sh"
+            cmd = self.sbatch_array_cmd(script_expr, req_group, limit_running)
+            if out:
+                # 1 batch at a time, to simplify limiting concurrent jobs
+                prev_job = out[-1][0]
+                cmd.append(f"--dependency=afterany:{prev_job}")
+            res = subprocess.run(
+                cmd, stdout=subprocess.PIPE, text=True, check=True, cwd=self.context_dir,
+            )
+            job_id, _, cluster = res.stdout.partition(';')
+            job_id = job_id.strip()
+            cluster = cluster.strip() or 'maxwell'
+            log.info("Launched Slurm (%s) job array %s (%d runs) to run context file",
+                     cluster, job_id, len(req_group))
+            out.append((job_id, cluster))
+
+        return out
+
+    def sbatch_array_cmd(self, script_expr, reqs, limit_running=30):
+        """Make the sbatch command for an array job"""
+        req = reqs[0]  # This should never be called with an empty list
+        return [
+            'sbatch', '--parsable',
+            *self._resource_opts(req.cluster),
+            # Slurm doesn't know the run number, so we redirect inside the job
+            '-o', '/dev/null',
+            '--open-mode=append',
+            '--job-name', f"p{req.proposal}-damnit",
+            '--array', f"0-{len(reqs)-1}%{limit_running}",
+            '--wrap', f'exec {script_expr}'
+        ]
+
     def execute_in_slurm(self, req: ExtractionRequest):
         """Run an extraction job in srun with live output"""
         log_path = process_log_path(req.run, req.proposal, self.context_dir)
@@ -216,7 +288,7 @@ def _slurm_cluster_opts(self):
         return opts
 
 
-def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=False):
+def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=False, limit_running=30):
     """Called by the 'amore-proto reprocess' subcommand"""
     submitter = ExtractionSubmitter(Path.cwd())
     if proposal is None:
@@ -243,6 +315,7 @@ def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=Fal
         else:
             unavailable_runs.append((proposal, run))
 
+        props_runs.sort()
        print(f"Reprocessing {len(props_runs)} runs already recorded, skipping {len(unavailable_runs)}...")
     else:
         try:
@@ -274,11 +347,11 @@ def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=Fal
     for req in reqs[1:]:
         req.update_vars = False
 
-    for prop, run in props_runs:
-        req = ExtractionRequest(run, prop, RunData.ALL, match=match, mock=mock)
-        if direct:
+    if direct:
+        for req in reqs:
             submitter.execute_direct(req)
-        elif watch:
+    elif watch:
+        for req in reqs:
             submitter.execute_in_slurm(req)
-        else:
-            submitter.submit(req)
+    else:
+        submitter.submit_multi(reqs, limit_running=limit_running)
diff --git a/damnit/cli.py b/damnit/cli.py
index f825814b..627b8711 100644
--- a/damnit/cli.py
+++ b/damnit/cli.py
@@ -109,6 +109,10 @@ def main(argv=None):
         '--direct', action='store_true',
         help="Run processing in subprocesses on this node, instead of via Slurm"
     )
+    reprocess_ap.add_argument(
+        '--concurrent-jobs', type=int, default=30,
+        help="The maximum number of jobs that will run at once (default 30)"
+    )
     reprocess_ap.add_argument(
         'run', nargs='+',
         help="Run number, e.g. 96. Multiple runs can be specified at once, "
@@ -226,7 +230,8 @@ def main(argv=None):
         from .backend.extraction_control import reprocess
 
         reprocess(
-            args.run, args.proposal, args.match, args.mock, args.watch, args.direct
+            args.run, args.proposal, args.match, args.mock, args.watch, args.direct,
+            limit_running=args.concurrent_jobs,
         )
 
     elif args.subcmd == 'read-context':
diff --git a/damnit/gui/main_window.py b/damnit/gui/main_window.py
index 215b6ce5..ce053fe2 100644
--- a/damnit/gui/main_window.py
+++ b/damnit/gui/main_window.py
@@ -928,8 +928,7 @@ def process_runs(self):
 
         try:
             reqs = dlg.extraction_requests()
-            for req in reqs:
-                submitter.submit(req)
+            submitter.submit_multi(reqs)
         except Exception as e:
             log.error("Error launching processing", exc_info=True)
             self.show_status_message(f"Error launching processing: {e}",
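
The batches() helper added in extraction_control.py drives the grouping of
requests into array jobs of at most 1000 tasks. A standalone sketch of its
behaviour, with illustrative input sizes (the 2500-request example below is
not from the patch):

def batches(l, n):
    """Yield successive slices of l containing at most n items each."""
    start = 0
    while True:
        end = start + n
        batch = l[start:end]
        if not batch:  # Ran past the end of the list: stop
            return
        yield batch
        start = end

# 2500 requests under Slurm's MaxArraySize of 1001 become three array
# jobs of 1000, 1000 and 500 tasks respectively.
assert [len(b) for b in batches(list(range(2500)), 1000)] == [1000, 1000, 500]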
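The throttling and chaining logic in submit_multi() is easier to see outside
the diff. The sketch below uses a hypothetical helper name (sketch_array_cmd,
not part of the patch) to show the shape of the sbatch command each batch
produces: the % suffix on --array caps how many array tasks run at once, and
every batch after the first waits on the previous one via
--dependency=afterany, so only one array job's concurrency limit applies at
a time. Resource options and the job name are omitted for brevity, and the
<grpid> placeholder stands in for the random token_hex(8) string.

import shlex

def sketch_array_cmd(n_tasks, limit_running=30, prev_job_id=None):
    """Hedged sketch of the sbatch invocation built by sbatch_array_cmd()."""
    cmd = [
        'sbatch', '--parsable',
        # At most limit_running of the n_tasks array tasks run concurrently
        '--array', f'0-{n_tasks - 1}%{limit_running}',
        # $SLURM_ARRAY_TASK_ID selects this task's generated launch script
        '--wrap', 'exec .tmp/launch-<grpid>-$SLURM_ARRAY_TASK_ID.sh',
    ]
    if prev_job_id is not None:
        # afterany: start once the previous batch finishes, whatever its
        # exit status, so batches run one after another
        cmd.append(f'--dependency=afterany:{prev_job_id}')
    return cmd

print(shlex.join(sketch_array_cmd(1000, prev_job_id='4242')))
# sbatch --parsable --array 0-999%30 --wrap 'exec .tmp/launch-<grpid>-$SLURM_ARRAY_TASK_ID.sh' --dependency=afterany:4242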