fix tests
Helveg committed Mar 16, 2024
1 parent 9f55460 commit 2dda950
Showing 3 changed files with 159 additions and 131 deletions.
58 changes: 43 additions & 15 deletions docs/dev/services.rst
The ``JobPool`` service allows you to ``submit`` ``Jobs`` and then ``execute`` them.
Most component types have a ``queue`` method that takes a job pool as an argument and
lets them schedule their jobs.

The recommended way to open a job pool is to use the
:meth:`~bsb.core.Scaffold.create_job_pool` context manager:

.. code-block:: python

   network = from_storage("example.hdf5")
   with network.create_job_pool() as pool:
       if pool.is_main():
           # Only the main node needs to schedule the jobs
           for component in network.placement.values():
               component.queue(pool)
       # But everyone needs to partake in the execute call
       pool.execute()

Scheduling
----------

With the :meth:`~bsb.services.pool.JobPool.schedule` method, pools can schedule the jobs
on the main node while the worker nodes concurrently execute them:

.. code-block:: python

   network = from_storage("example.hdf5")
   with network.create_job_pool() as pool:
       if pool.is_main():
           pool.schedule([*network.placement.values()])
       pool.execute()

.. warning::

   Pass in topologically sorted arrays of nodes! Some queueing methods depend on state
   stored during scheduling (specifically the ``_queued_jobs`` attribute). Dependencies
   are only checked between the nodes, not the jobs, by checking for a ``depends_on``
   attribute.
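
For example, a minimal sketch of passing a topological order by scheduling placement
before connectivity (this assumes the network also defines ``connectivity`` nodes, and
that those are the nodes carrying a ``depends_on`` attribute):

.. code-block:: python

   # Placement nodes first, connectivity nodes after: a topological order, since
   # connectivity depends on the placement it connects.
   nodes = [*network.placement.values(), *network.connectivity.values()]
   with network.create_job_pool() as pool:
       if pool.is_main():
           pool.schedule(nodes)
       pool.execute()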

Listeners
---------

On top of opening the job pool, the context manager also registers the appropriate
listeners. Listeners receive updates emitted by the job pool and can respond to changes,
for example by printing them out to display the progress of the job pool:
.. code-block:: python

   import time

   # Assumed import path for the pool enums; adjust to your bsb version.
   from bsb.services.pool import PoolProgressReason, PoolStatus

   def report_time_elapsed(progress):
       global _t
       if progress.reason == PoolProgressReason.POOL_STATUS_CHANGE:
           if progress.status == PoolStatus.SCHEDULING:
               _t = time.time()
           elif progress.status == PoolStatus.CLOSING:
               print(f"Pool execution finished. {time.time() - _t:.0f} seconds elapsed.")

   with network.create_job_pool() as pool:
       pool.add_listener(report_time_elapsed)
       pool.submit(lambda scaffold: time.sleep(2))
       pool.execute()
       # Will print `Pool execution finished. 2 seconds elapsed.`

Listeners can also be context managers, and will enter and exit the same context as the
JobPool.
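
A minimal sketch of such a listener (the ``TimedListener`` class below is illustrative,
not part of the bsb API, and assumes a listener added with ``add_listener`` is entered
and exited alongside the pool's own context):

.. code-block:: python

   import time

   class TimedListener:
       """Hypothetical context-manager listener that times the pool."""

       def __enter__(self):
           # Entered together with the JobPool context.
           self._start = time.time()
           return self

       def __exit__(self, exc_type, exc_value, traceback):
           # Exited together with the JobPool context.
           print(f"Pool closed after {time.time() - self._start:.1f} seconds.")

       def __call__(self, progress):
           # Regular listener callback, called on every progress update.
           print(progress.reason, progress.status)

   with network.create_job_pool() as pool:
       pool.add_listener(TimedListener())
       pool.execute()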