From 2dda9500f4a9c788a73d055bb2fd2fdc35a4ee47 Mon Sep 17 00:00:00 2001 From: Robin De Schepper Date: Sat, 16 Mar 2024 01:05:52 +0100 Subject: [PATCH] fix tests --- docs/dev/services.rst | 58 +++++++--- tests/test_jobs.py | 216 ++++++++++++++++++------------------- tests/test_morphologies.py | 16 +-- 3 files changed, 159 insertions(+), 131 deletions(-) diff --git a/docs/dev/services.rst b/docs/dev/services.rst index 0aa976c73..e6b7b9195 100644 --- a/docs/dev/services.rst +++ b/docs/dev/services.rst @@ -40,18 +40,43 @@ The ``JobPool`` service allows you to ``submit`` ``Jobs`` and then ``execute`` t Most component types have a ``queue`` method that takes a job pool as an argument and lets them schedule their jobs. -The recommended way to open a job pool is to use :meth:`~bsb.core.Scaffold.create_job_pool`: +The recommended way to open a job pool is to use the +:meth:`~bsb.core.Scaffold.create_job_pool` context manager: .. code-block:: python network = from_storage("example.hdf5") - pool = network.create_job_pool() - if pool.is_main(): - # Only the main node needs to schedule the jobs - for component in network.placement.values(): - component.queue(pool) - # But everyone needs to partake in the execute call - pool.execute() + with network.create_job_pool() as pool: + if pool.is_main(): + # Only the main node needs to schedule the jobs + for component in network.placement.values(): + component.queue(pool) + # But everyone needs to partake in the execute call + pool.execute() + +Scheduling +---------- + +Pools can concurrently schedule the jobs on the main node, while executing them on worker +nodes with the :meth:`~bsb.services.pool.JobPool.schedule` method: + +.. code-block:: + + network = from_storage("example.hdf5") + with network.create_job_pool() as pool: + if pool.is_main(): + pool.schedule([*network.placement.values()]) + pool.execute() + +.. warning:: + + Pass in topologically sorted arrays of nodes! 
Some queueing methods depend on state + stored during the scheduling (specifically the ``_queued_jobs`` attribute). Dependencies + are only checked between the nodes, not the jobs, by checking for a ``depends_on`` + attribute. + +Listeners +--------- On top of opening the job pool this also registers the appropriate listeners. Listeners listen to updates emitted by the job pool and can respond to changes, for example by printing @@ -63,13 +88,16 @@ them out to display the progress of the job pool: def report_time_elapsed(progress): global _t if progress.reason == PoolProgressReason.POOL_STATUS_CHANGE: - if progress.status == PoolStatus.STARTING: + if progress.status == PoolStatus.SCHEDULING: _t = time.time() - elif progress.status == PoolStatus.ENDING: + elif progress.status == PoolStatus.CLOSING: print(f"Pool execution finished. {time.time()} seconds elapsed.") - pool = network.create_job_pool() - pool.add_listener(report_time_elapsed) - pool.submit(lambda scaffold: time.sleep(2)) - pool.execute() - # Will print `Pool execution finished. 2 seconds elapsed.` \ No newline at end of file + with network.create_job_pool() as pool: + pool.add_listener(report_time_elapsed) + pool.submit(lambda scaffold: time.sleep(2)) + pool.execute() + # Will print `Pool execution finished. 2 seconds elapsed.` + +Listeners can also be context managers, and will enter and exit the same context as the +JobPool. 
\ No newline at end of file diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 21429636b..cd41608a5 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -114,17 +114,17 @@ def setUp(self): @timeout(1) def test_create_pool(self): - pool = self.network.create_job_pool(quiet=True) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + pool.execute() @timeout(1) def test_single_job(self): """Test the execution of a single lambda function""" - pool = self.network.create_job_pool(quiet=True) - job = pool.queue(lambda scaffold, x, y: x * y, (5, 0.1)) - self.assertEqual(job.status, JobStatus.PENDING) + with self.network.create_job_pool(quiet=True) as pool: + job = pool.queue(lambda scaffold, x, y: x * y, (5, 0.1)) + self.assertEqual(job.status, JobStatus.PENDING) - results = pool.execute(return_results=True) + results = pool.execute(return_results=True) if pool.is_main(): self.assertEqual(0.5, results[job]) self.assertEqual(job.status, JobStatus.SUCCESS) @@ -134,10 +134,10 @@ def test_single_job_fail(self): """ Test if a division by zero error is propagated back """ - pool = self.network.create_job_pool(quiet=True) - job = pool.queue(lambda scaffold, x, y: x / y, (5, 0)) - with self.assertRaises(WorkflowError): - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + job = pool.queue(lambda scaffold, x, y: x / y, (5, 0)) + with self.assertRaises(WorkflowError): + pool.execute() if pool.is_main(): self.assertIn("division by zero", str(job._error)) self.assertEqual(job.status, JobStatus.FAILED) @@ -145,13 +145,13 @@ def test_single_job_fail(self): @timeout(1) def test_multiple_jobs(self): """Test the execution of a set of lambda function""" - pool = self.network.create_job_pool(quiet=True) - job1 = pool.queue(lambda scaffold, x, y: x * y, (5, 0.1)) - job2 = pool.queue(lambda scaffold, x, y: x * y, (6, 0.1)) - job3 = pool.queue(lambda scaffold, x, y: x * y, (7, 0.1)) - job4 = pool.queue(lambda scaffold, x, y: x * y, (8, 
0.1)) + with self.network.create_job_pool(quiet=True) as pool: + job1 = pool.queue(lambda scaffold, x, y: x * y, (5, 0.1)) + job2 = pool.queue(lambda scaffold, x, y: x * y, (6, 0.1)) + job3 = pool.queue(lambda scaffold, x, y: x * y, (7, 0.1)) + job4 = pool.queue(lambda scaffold, x, y: x * y, (8, 0.1)) - results = pool.execute(return_results=True) + results = pool.execute(return_results=True) if pool.is_main(): self.assertAlmostEqual(0.5, results[job1]) @@ -164,11 +164,11 @@ def test_cancel_job(self): """ Cancel a job """ - pool = self.network.create_job_pool(quiet=True) - t = time.time() - job: "Job" = pool.queue(sleep_y, (5, 2)) - job.cancel("Test") - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + t = time.time() + job: "Job" = pool.queue(sleep_y, (5, 2)) + job.cancel("Test") + pool.execute() # Confirm the cancellation error self.assertEqual(JobCancelledError, type(job.error)) self.assertEqual("Test", str(job.error)) @@ -180,9 +180,9 @@ def test_cancel_bygone_job(self): """ Attempt to cancel a job after running. Should yield a 'could not cancel' warning. 
""" - pool = self.network.create_job_pool(quiet=True) - job = pool.queue(sleep_y, (5, 0.5)) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + job = pool.queue(sleep_y, (5, 0.5)) + pool.execute() if pool.is_main(): with self.assertWarns(Warning) as w: job.cancel("Testing") @@ -192,8 +192,8 @@ def test_cancel_bygone_job(self): def test_job_result_before_run(self): """Test result exception before the pool has ran""" - pool = self.network.create_job_pool(quiet=True) - job = pool.queue(sleep_y, (5, 0.5)) + with self.network.create_job_pool(quiet=True) as pool: + job = pool.queue(sleep_y, (5, 0.5)) with self.assertRaisesRegex(JobPoolError, "not available"): job.result @@ -201,9 +201,9 @@ def test_job_result_before_run(self): def test_job_result_after_run(self): """Test result exception after the pool has ran""" - pool = self.network.create_job_pool(quiet=True) - job = pool.queue(sleep_y, (5, 0.5)) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + job = pool.queue(sleep_y, (5, 0.5)) + pool.execute() with self.assertRaisesRegex(JobPoolError, "not available"): job.result @@ -219,11 +219,11 @@ def test_placement_job(self): ), ) - pool = self.network.create_job_pool(quiet=True) - pool.queue_placement( - self.network.placement.test_strat, Chunk((0, 0, 0), (200, 200, 200)) - ) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + pool.queue_placement( + self.network.placement.test_strat, Chunk((0, 0, 0), (200, 200, 200)) + ) + pool.execute() ps = self.network.get_placement_set("dud_cell") self.assertClose([[0, 0, 0]], ps.load_positions()) @@ -240,22 +240,22 @@ def setUp(self): @timeout(3) def test_double_pool(self): """Test whether we can open multiple pools sequentially""" - pool = self.network.create_job_pool(quiet=True) - job = pool.queue(sleep_y, (5, 0.1)) - results = pool.execute(return_results=True) + with self.network.create_job_pool(quiet=True) as pool: + job = pool.queue(sleep_y, (5, 0.1)) + 
results = pool.execute(return_results=True) if pool.is_main(): self.assertEqual(5, results[job]) - pool = self.network.create_job_pool(quiet=True) - job = pool.queue(sleep_y, (4, 0.1)) - results = pool.execute(return_results=True) + with self.network.create_job_pool(quiet=True) as pool: + job = pool.queue(sleep_y, (4, 0.1)) + results = pool.execute(return_results=True) if pool.is_main(): self.assertEqual(4, results[job]) @timeout(3) def test_submitting_closed(self): """Test that you can't submit a job after the pool has executed already""" - pool = self.network.create_job_pool(quiet=True) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + pool.execute() with self.assertRaises(JobPoolError): pool.queue(sleep_y, (4, 0.1)) @@ -273,10 +273,10 @@ def try_cancel(progress: PoolProgress): with self.assertWarnsRegex(Warning, "Could not cancel"): progress.jobs[0].cancel("Test") - pool = self.network.create_job_pool(quiet=True) - pool.add_listener(try_cancel, 1) - pool.queue(sleep_y, (5, 0.1)) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + pool.add_listener(try_cancel, 1) + pool.queue(sleep_y, (5, 0.1)) + pool.execute() @timeout(3) def test_cancel_queued_job(self): @@ -289,11 +289,11 @@ def job_killer(progress: PoolProgress): ): progress.jobs[-1].cancel("Testing") - pool = self.network.create_job_pool(quiet=True) - pool.add_listener(job_killer) - jobs = [pool.queue(sleep_y, (1, 0.001)) for _ in range(200)] - jobs.append(pool.queue(sleep_y, (1, 0.1))) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + pool.add_listener(job_killer) + jobs = [pool.queue(sleep_y, (1, 0.001)) for _ in range(200)] + jobs.append(pool.queue(sleep_y, (1, 0.1))) + pool.execute() if pool.is_main(): self.assertEqual(jobs[-1].status, JobStatus.CANCELLED) @@ -315,12 +315,12 @@ def spy_initial_pool_queue(progress): and not JobStatus.QUEUED == progress.jobs[1].status ) - pool = self.network.create_job_pool(quiet=True) - 
pool.add_listener(spy_initial_pool_queue) - job_without_dep = pool.queue(sleep_y, (4, 0.2)) - job_with_dep = pool.queue(sleep_y, (5, 0.08), deps=[job_without_dep]) + with self.network.create_job_pool(quiet=True) as pool: + pool.add_listener(spy_initial_pool_queue) + job_without_dep = pool.queue(sleep_y, (4, 0.2)) + job_with_dep = pool.queue(sleep_y, (5, 0.08), deps=[job_without_dep]) - results = pool.execute(return_results=True) + results = pool.execute(return_results=True) if pool.is_main(): self.assertTrue(outcome, "A job with unfinished dependencies was scheduled.") @@ -330,15 +330,15 @@ def spy_initial_pool_queue(progress): @timeout(3) def test_dependency_failure(self): """Test that when a dependency fails, the dependents are cancelled""" - pool = self.network.create_job_pool(fail_fast=False, quiet=True) - job = pool.queue(sleep_fail, (4, 0.2)) - job2 = pool.queue(sleep_y, (5, 0.1), deps=[job]) - job3 = pool.queue(sleep_y, (4, 0.1)) + with self.network.create_job_pool(fail_fast=False, quiet=True) as pool: + job = pool.queue(sleep_fail, (4, 0.2)) + job2 = pool.queue(sleep_y, (5, 0.1), deps=[job]) + job3 = pool.queue(sleep_y, (4, 0.1)) - try: - pool.execute() - except WorkflowError: - pass + try: + pool.execute() + except WorkflowError: + pass if not MPI.get_rank(): self.assertEqual(str(job2.error), "Job killed for dependency failure") @@ -348,14 +348,14 @@ def test_dependency_failure(self): def test_fail_fast(self): """Test that when a single job fails, main raises the error and further execution is aborted.""" - pool = self.network.create_job_pool(fail_fast=True, quiet=True) - job = pool.queue(sleep_fail, (4, 0.01)) - job3 = pool.queue(sleep_y, (4, 0.01)) - job4 = pool.queue(sleep_y, (4, 0.01)) - job5 = pool.queue(sleep_y, (4, 0.01)) - - with self.assertRaises(WorkflowError) as workflow_errors: - pool.execute() + with self.network.create_job_pool(fail_fast=True, quiet=True) as pool: + job = pool.queue(sleep_fail, (4, 0.01)) + job3 = pool.queue(sleep_y, (4, 
0.01)) + job4 = pool.queue(sleep_y, (4, 0.01)) + job5 = pool.queue(sleep_y, (4, 0.01)) + + with self.assertRaises(WorkflowError) as workflow_errors: + pool.execute() if pool.is_main(): self.assertIn( ZeroDivisionError, @@ -372,10 +372,10 @@ def spy_lt(progress: PoolProgress): nonlocal i i += 1 - pool = self.network.create_job_pool(quiet=True) - pool.add_listener(spy_lt, 0.1) - pool.queue(sleep_y, (5, 0.35)) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + pool.add_listener(spy_lt, 0.1) + pool.queue(sleep_y, (5, 0.35)) + pool.execute() if pool.is_main(): self.assertEqual(i, 3, "Should have 3 timeout pings") self.assertEqual(0.1, pool._max_wait, "_max_wait not properly set.") @@ -395,12 +395,12 @@ class SerialPStrat(NotParallel, PlacementStrategy): def place(_, chunk, indicators): return 1 - pool = self.network.create_job_pool(quiet=True) - pstrat = self.network.placement.add( - "test", SerialPStrat(strategy="", cell_types=[], partitions=[]) - ) - pstrat.queue(pool, None) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + pstrat = self.network.placement.add( + "test", SerialPStrat(strategy="", cell_types=[], partitions=[]) + ) + pstrat.queue(pool, None) + pool.execute() def test_notparallel_cs_job(self): @config.node @@ -408,17 +408,17 @@ class SerialCStrat(NotParallel, ConnectionStrategy): def connect(_, pre, post): return 1 - pool = self.network.create_job_pool(quiet=True) - cstrat = self.network.connectivity.add( - "test", - SerialCStrat( - strategy="", - presynaptic={"cell_types": []}, - postsynaptic={"cell_types": []}, - ), - ) - cstrat.queue(pool) - pool.execute() + with self.network.create_job_pool(quiet=True) as pool: + cstrat = self.network.connectivity.add( + "test", + SerialCStrat( + strategy="", + presynaptic={"cell_types": []}, + postsynaptic={"cell_types": []}, + ), + ) + cstrat.queue(pool) + pool.execute() class TestSubmissionContext( @@ -450,25 +450,25 @@ def setUp(self): super().setUp() def 
test_ps_node_submission(self): - pool = self.network.create_job_pool() - if pool.is_main(): - self.network.placement.test.queue(pool, [100, 100, 100]) - self.assertEqual(1, len(pool.jobs)) - self.assertEqual("{root}.placement.test", pool.jobs[0].name) + with self.network.create_job_pool() as pool: + if pool.is_main(): + self.network.placement.test.queue(pool, [100, 100, 100]) + self.assertEqual(1, len(pool.jobs)) + self.assertEqual("{root}.placement.test", pool.jobs[0].name) @timeout(3) def test_cs_node_submission(self): self.network.run_placement() - pool = self.network.create_job_pool() - if pool.is_main(): - self.network.connectivity.test.queue(pool) - self.assertEqual(1, len(pool.jobs)) - self.assertEqual("{root}.connectivity.test", pool.jobs[0].name) + with self.network.create_job_pool() as pool: + if pool.is_main(): + self.network.connectivity.test.queue(pool) + self.assertEqual(1, len(pool.jobs)) + self.assertEqual("{root}.connectivity.test", pool.jobs[0].name) @timeout(3) def test_no_node_submission(self): - pool = self.network.create_job_pool() - if pool.is_main(): - job = pool.queue(sleep_y, (4, 0.2), submitter={"number": "One"}) - self.assertIn("function sleep_y", job.name) - self.assertEqual("One", job.context["number"]) + with self.network.create_job_pool() as pool: + if pool.is_main(): + job = pool.queue(sleep_y, (4, 0.2), submitter={"number": "One"}) + self.assertIn("function sleep_y", job.name) + self.assertEqual("One", job.context["number"]) diff --git a/tests/test_morphologies.py b/tests/test_morphologies.py index db97b404c..65a1aede5 100644 --- a/tests/test_morphologies.py +++ b/tests/test_morphologies.py @@ -1235,10 +1235,10 @@ def test_single_source(self): ] ) scaffold = Scaffold(cfg, self.storage) - pool = scaffold.create_job_pool(quiet=True) - cfg.morphologies[0].queue(pool) - self.assertEqual(1, len(pool.jobs)) - pool.execute() + with scaffold.create_job_pool(quiet=True) as pool: + cfg.morphologies[0].queue(pool) + self.assertEqual(1, 
len(pool.jobs)) + pool.execute() m_mio = scaffold.morphologies.load("test_mio") self.assertEqual(12, len(m_mio), "Expected 12 points in morpho") self.assertClose(0, m_mio.points[:, 2], "Rotation step skipped") @@ -1267,10 +1267,10 @@ def test_multi_source_parser_priority(self): ] ) scaffold = Scaffold(cfg, self.storage) - pool = scaffold.create_job_pool(quiet=True) - cfg.morphologies[0].queue(pool) - self.assertEqual(2, len(pool.jobs)) - pool.execute() + with scaffold.create_job_pool(quiet=True) as pool: + cfg.morphologies[0].queue(pool) + self.assertEqual(2, len(pool.jobs)) + pool.execute() m_mio = scaffold.morphologies.load("test_mio") m_bsb = scaffold.morphologies.load("test_bsb") # The MorphIO parser by default skips the boundary between soma and other tags