diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 4098a07b..e4c01d62 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -1,26 +1,116 @@
-name: Upload BSB Package
+name: Bump version, create release and deploy
 
 on:
-  release:
-    types: [created]
+  push:
+    branches:
+      - main
 
 jobs:
+  bump:
+    runs-on: ubuntu-latest
+    outputs:
+      tag: ${{ steps.semver.outputs.next }}
+      old_tag: ${{ steps.semver.outputs.current }}
+
+    steps:
+      - name: Checkout Code
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+
+      - name: Get Next Version
+        id: semver
+        uses: ietf-tools/semver-action@v1
+        with:
+          token: ${{ github.token }}
+          branch: main
+
+      - name: Set up Python 3.11
+        uses: actions/setup-python@v1
+        with:
+          python-version: 3.11
+
+      - name: Bump version in Python project
+        run: |
+          pip install --upgrade pip bump-my-version
+          oldv="${{ steps.semver.outputs.current }}"
+          newv="${{steps.semver.outputs.next}}"
+          # Bump the version, dropping the leading `v` with `${x:1}`
+          bump-my-version replace --current-version=${oldv:1} --new-version=${newv:1} pyproject.toml
+
+      - name: Commit version change
+        uses: stefanzweifel/git-auto-commit-action@v4
+        with:
+          branch: main
+          commit_message: 'docs: bump version: ${{ steps.semver.outputs.current }} → ${{ steps.semver.outputs.next }}'
+
+      - uses: rickstaa/action-create-tag@v1
+        id: "tag_create"
+        with:
+          tag: ${{ steps.semver.outputs.next }}
+          github_token: ${{ github.token }}
+
+  release:
+    runs-on: ubuntu-latest
+    needs: bump
+
+    steps:
+      - name: Checkout Code
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+
+      - name: Pull commit of version change
+        run: |
+          git pull origin main
+
+      - name: Update CHANGELOG
+        id: changelog
+        uses: requarks/changelog-action@v1
+        with:
+          token: ${{ github.token }}
+          fromTag: ${{ needs.bump.outputs.tag }}
+          toTag: ${{ needs.bump.outputs.old_tag }}
+
+      - name: Create Release
+        uses: ncipollo/release-action@v1.12.0
+        with:
+          allowUpdates: true
+          draft: false
+          makeLatest: true
+          tag: ${{ needs.bump.outputs.tag }}
+          name: ${{ needs.bump.outputs.tag }}
+          body: ${{ steps.changelog.outputs.changes }}
+          token: ${{ github.token }}
+
+      - name: Commit CHANGELOG.md
+        uses: stefanzweifel/git-auto-commit-action@v4
+        with:
+          branch: main
+          commit_message: 'docs: update CHANGELOG.md for ${{ github.ref_name }}'
+          file_pattern: CHANGELOG.md
+
   deploy:
     runs-on: ubuntu-latest
+    needs: release
+
     steps:
-    - uses: actions/checkout@v4
-    - name: Set up Python
-      uses: actions/setup-python@v5
-      with:
-        python-version: '3.x'
-    - name: Install dependencies
-      run: |
-        python -m pip install --upgrade pip
-        pip install setuptools wheel twine
-    - name: Build and publish
-      env:
-        TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
-        TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
-      run: |
-        python setup.py sdist bdist_wheel
-        twine upload dist/*
+      - uses: actions/checkout@v4
+
+      - name: Set up Python 3.11
+        uses: actions/setup-python@v5
+        with:
+          python-version: 3.11
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install build twine
+
+      - name: Build and publish
+        env:
+          TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
+          TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
+        run: |
+          python -m build
+          twine upload --verbose --repository pypi dist/*
diff --git a/.github/workflows/validate-pr.yml b/.github/workflows/validate-pr.yml
new file mode 100644
index 00000000..82868560
--- /dev/null
+++ b/.github/workflows/validate-pr.yml
@@ -0,0 +1,14 @@
+name: PR Conventional Commit Validation
+
+on:
+  pull_request:
+    types: [opened, synchronize, reopened, edited]
+
+jobs:
+  validate-pr-title:
+    runs-on: ubuntu-latest
+    steps:
+      - name: PR Conventional Commit Validation
+        uses: ytanikin/PRConventionalCommits@1.2.0
+        with:
+          task_types: '["feat","fix","docs","test","ci","refactor","perf","revert"]'
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 705ff3cd..02848a04 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,3 +1,6 @@
+default_install_hook_types:
+  - pre-commit
+  - commit-msg
 repos:
   - repo: https://github.com/psf/black-pre-commit-mirror
     rev: 24.1.1
@@ -14,3 +17,10 @@ repos:
         name: api-test
         entry: python3 .github/devops/generate_public_api.py
         language: system
+  - repo: https://github.com/compilerla/conventional-pre-commit
+    rev: v3.3.0
+    hooks:
+      - id: conventional-pre-commit
+        name: conventional-commit
+        stages: [ commit-msg ]
+        args: [ ]
diff --git a/CHANGELOG b/CHANGELOG.md
similarity index 99%
rename from CHANGELOG
rename to CHANGELOG.md
index 274082ed..d64556ad 100644
--- a/CHANGELOG
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+# 4.3.0
+* Introduction of a pool caching system
+* Fix run iteration values in core
+* Add FixedOutdegree
+
 # 4.2.0
 * Created geometric shape connection strategies
 * Added support for multiple shapes for each cell type
diff --git a/bsb/__init__.py b/bsb/__init__.py
index e0bfbe19..5ccb7777 100644
--- a/bsb/__init__.py
+++ b/bsb/__init__.py
@@ -7,7 +7,7 @@
 install the `bsb` package instead.
 """
 
-__version__ = "4.2.0"
+__version__ = "4.3.0"
 
 import functools
 import importlib
@@ -221,6 +221,7 @@ def __dir__():
 FileScheme: typing.Type["bsb.storage._files.FileScheme"]
 FileStore: typing.Type["bsb.storage.interfaces.FileStore"]
 FixedIndegree: typing.Type["bsb.connectivity.general.FixedIndegree"]
+FixedOutdegree: typing.Type["bsb.connectivity.general.FixedOutdegree"]
 FixedPositions: typing.Type["bsb.placement.strategy.FixedPositions"]
 FractionFilter: typing.Type["bsb.simulation.targetting.FractionFilter"]
 GatewayError: typing.Type["bsb.exceptions.GatewayError"]
@@ -416,6 +417,7 @@ def __dir__():
 parse_configuration_file: "bsb.config.parse_configuration_file"
 parse_morphology_content: "bsb.morphologies.parsers.parse_morphology_content"
 parse_morphology_file: "bsb.morphologies.parsers.parse_morphology_file"
+pool_cache: "bsb.services.pool_cache"
 read_option: "bsb.options.read_option"
 refs: "bsb.config.refs"
 register_option: "bsb.options.register_option"
diff --git a/bsb/connectivity/general.py b/bsb/connectivity/general.py
index d15ab054..82c8a6bd 100644
--- a/bsb/connectivity/general.py
+++ b/bsb/connectivity/general.py
@@ -42,6 +42,38 @@ def connect(self, pre, post):
         self.connect_cells(from_ps, to_ps, src_locs, dest_locs)
 
 
+def _connect_fixed_degree(self, pre, post, degree, is_in):
+    # Generalized connect function for Fixed in- and out-degree
+    rng = np.random.default_rng()
+    ps_counted = pre.placement if is_in else post.placement
+    ps_fixed = post.placement if is_in else pre.placement
+    high = sum(len(ps) for ps in ps_counted)
+    for ps in ps_fixed:
+        l = len(ps)
+        counted_targets = np.full((l * degree, 3), -1)
+        fixed_targets = np.full((l * degree, 3), -1)
+        ptr = 0
+        for i in range(l):
+            fixed_targets[ptr : ptr + degree, 0] = i
+            counted_targets[ptr : ptr + degree, 0] = rng.choice(
+                high, degree, replace=False
+            )
+            ptr += degree
+        lowmux = 0
+        for ps_o in ps_counted:
+            highmux = lowmux + len(ps_o)
+            demux_idx = (counted_targets[:, 0] >= lowmux) & (
+                counted_targets[:, 0] < highmux
+            )
+            demuxed = counted_targets[demux_idx]
+            demuxed[:, 0] -= lowmux
+            if is_in:
+                self.connect_cells(ps_o, ps, demuxed, fixed_targets[demux_idx])
+            else:
+                self.connect_cells(ps, ps_o, fixed_targets[demux_idx], demuxed)
+            lowmux = highmux
+
+
 @config.node
 class FixedIndegree(InvertedRoI, ConnectionStrategy):
     """
@@ -52,26 +84,20 @@ class FixedIndegree(InvertedRoI, ConnectionStrategy):
     indegree: int = config.attr(type=int, required=True)
 
     def connect(self, pre, post):
-        in_ = self.indegree
-        rng = np.random.default_rng()
-        high = sum(len(ps) for ps in pre.placement)
-        for ps in post.placement:
-            l = len(ps)
-            pre_targets = np.full((l * in_, 3), -1)
-            post_targets = np.full((l * in_, 3), -1)
-            ptr = 0
-            for i in range(l):
-                post_targets[ptr : ptr + in_, 0] = i
-                pre_targets[ptr : ptr + in_, 0] = rng.choice(high, in_, replace=False)
-                ptr += in_
-            lowmux = 0
-            for pre_ps in pre.placement:
-                highmux = lowmux + len(pre_ps)
-                demux_idx = (pre_targets[:, 0] >= lowmux) & (pre_targets[:, 0] < highmux)
-                demuxed = pre_targets[demux_idx]
-                demuxed[:, 0] -= lowmux
-                self.connect_cells(pre_ps, ps, demuxed, post_targets[demux_idx])
-                lowmux = highmux
-
-
-__all__ = ["AllToAll", "Convergence", "FixedIndegree"]
+        _connect_fixed_degree(self, pre, post, self.indegree, True)
+
+
+@config.node
+class FixedOutdegree(ConnectionStrategy):
+    """
+    Connect a group of presynaptic cell types to ``outdegree`` uniformly random
+    postsynaptic cells from all the postsynaptic cell types.
+    """
+
+    outdegree: int = config.attr(type=int, required=True)
+
+    def connect(self, pre, post):
+        _connect_fixed_degree(self, pre, post, self.outdegree, False)
+
+
+__all__ = ["AllToAll", "Convergence", "FixedIndegree", "FixedOutdegree"]
diff --git a/bsb/core.py b/bsb/core.py
index 4abd60bb..dcdcca08 100644
--- a/bsb/core.py
+++ b/bsb/core.py
@@ -131,6 +131,7 @@ def __init__(self, config=None, storage=None, clear=False, comm=None):
         :returns: A network object
         :rtype: :class:`~.core.Scaffold`
         """
+        self._pool_cache: dict[int, typing.Callable[[], None]] = {}
         self._pool_listeners: list[tuple[typing.Callable[[list["Job"]], None], float]] = (
             []
         )
@@ -270,7 +271,7 @@ def run_placement(self, strategies=None, fail_fast=True, pipelines=True):
         if pipelines:
             self.run_pipelines()
         if strategies is None:
-            strategies = [*self.placement.values()]
+            strategies = set(self.placement.values())
         strategies = PlacementStrategy.sort_deps(strategies)
         with self.create_job_pool(fail_fast=fail_fast) as pool:
             if pool.is_main():
@@ -309,7 +310,7 @@ def run_after_placement(self, hooks=None, fail_fast=None, pipelines=True):
         Run after placement hooks.
         """
         if hooks is None:
-            hooks = self.after_placement
+            hooks = set(self.after_placement.values())
         with self.create_job_pool(fail_fast) as pool:
             if pool.is_main():
                 pool.schedule(hooks)
@@ -321,7 +322,7 @@ def run_after_connectivity(self, hooks=None, fail_fast=None, pipelines=True):
         Run after placement hooks.
         """
         if hooks is None:
-            hooks = self.after_placement
+            hooks = set(self.after_connectivity.values())
         with self.create_job_pool(fail_fast) as pool:
             if pool.is_main():
                 pool.schedule(hooks)
@@ -785,6 +786,19 @@ def remove_listener(self, listener):
                 self._pool_listeners.pop(i)
                 break
 
+    def register_pool_cached_item(self, id, cleanup):
+        """
+        Registers a cleanup function for items cached during a parallel workflow.
+        Internal use only.
+
+        :param id: Id of the cached item. Should be unique but identical across MPI
+          nodes
+        :param cleanup: A callable that cleans up the cached item.
+        """
+        if id in self._pool_cache:
+            raise RuntimeError(f"Pool cache item '{id}' already exists.")
+        self._pool_cache[id] = cleanup
+
 
 class ReportListener:
     def __init__(self, scaffold, file):
diff --git a/bsb/services/__init__.py b/bsb/services/__init__.py
index 22ed9130..6317f73d 100644
--- a/bsb/services/__init__.py
+++ b/bsb/services/__init__.py
@@ -17,7 +17,7 @@
 """
 
 from .pool import JobPool as _JobPool  # noqa
-from .pool import WorkflowError
+from .pool import WorkflowError, pool_cache
 
 JobPool = _JobPool
 """
@@ -33,4 +33,4 @@ def register_service(attr, provider):
     globals()[attr] = provider
 
 
-__all__ = ["MPI", "MPILock", "JobPool", "register_service", "WorkflowError"]
+__all__ = ["MPI", "MPILock", "JobPool", "register_service", "WorkflowError", "pool_cache"]
diff --git a/bsb/services/mpi.py b/bsb/services/mpi.py
index 6b042a46..d74924b5 100644
--- a/bsb/services/mpi.py
+++ b/bsb/services/mpi.py
@@ -49,6 +49,25 @@ def allgather(self, obj):
             return self._comm.allgather(obj)
         return [obj]
 
+    def window(self, buffer):
+        if self._comm and self.get_size() > 1:
+            from mpi4py.MPI import INFO_NULL, Win
+
+            return Win.Create(buffer, True, INFO_NULL, self._comm)
+        else:
+
+            class WindowMock:
+                def Get(self, bufspec, rank):
+                    return bufspec[0]
+
+                def Lock(self, rank):
+                    pass
+
+                def Unlock(self, rank):
+                    pass
+
+            return WindowMock()
+
 
 class MPIModule(MockModule):
     """
diff --git a/bsb/services/pool.py b/bsb/services/pool.py
index c48f41ac..0df89854 100644
--- a/bsb/services/pool.py
+++ b/bsb/services/pool.py
@@ -40,15 +40,18 @@ class name, ``_name`` for the job name and ``_c`` for the chunk. These are used
 
 import abc
 import concurrent.futures
+import functools
 import logging
 import pickle
 import tempfile
 import threading
 import typing
 import warnings
+import zlib
 from contextlib import ExitStack
 from enum import Enum, auto
 
+import numpy as np
 from exceptiongroup import ExceptionGroup
 
 from .._util import obj_str_insert
@@ -215,11 +218,26 @@ def enable_serde_logging(self):
 
 
 def dispatcher(pool_id, job_args):
+    """
+    The dispatcher is the function that gets pickled on main, and unpacked "here" on the
+    worker. Through class variables on `JobPool` and the given `pool_id` we can find the
+    pool and scaffold object, and the job function to run.
+
+    Before running a job, the cache is checked for stale cached items to free up.
+    """
     job_type, args, kwargs = job_args
     # Get the static job execution handler from this module
     handler = globals()[job_type].execute
+    # Get the owning scaffold from the JobPool class variables, which act as a registry.
     owner = JobPool.get_owner(pool_id)
-    # Execute it.
+
+    # Check the pool's cache
+    pool = JobPool._pools[pool_id]
+    required_cache_items = pool._read_required_cache_items()
+    # and free any stale cached items
+    free_stale_pool_cache(owner, required_cache_items)
+
+    # Execute the job handler.
     return handler(owner, args, kwargs)
 
 
@@ -268,7 +286,13 @@ class Job(abc.ABC):
     """
 
     def __init__(
-        self, pool, submission_context: SubmissionContext, args, kwargs, deps=None
+        self,
+        pool,
+        submission_context: SubmissionContext,
+        args,
+        kwargs,
+        deps=None,
+        cache_items=None,
     ):
         self.pool_id = pool.id
         self._args = args
@@ -281,6 +305,7 @@ def __init__(
         self._thread: typing.Optional[threading.Thread] = None
         self._res_file = None
         self._error = None
+        self._cache_items: list[int] = [] if cache_items is None else cache_items
 
         for j in self._deps:
             j.on_completion(self._dep_completed)
@@ -453,7 +478,8 @@ class PlacementJob(Job):
     def __init__(self, pool, strategy, chunk, deps=None):
         args = (strategy.name, chunk)
         context = SubmissionContext(strategy, [chunk])
-        super().__init__(pool, context, args, {}, deps=deps)
+        cache_items = get_node_cache_items(strategy)
+        super().__init__(pool, context, args, {}, deps=deps, cache_items=cache_items)
 
     @staticmethod
     def execute(job_owner, args, kwargs):
@@ -475,7 +501,8 @@ def __init__(self, pool, strategy, pre_roi, post_roi, deps=None):
         context = SubmissionContext(
             strategy, chunks=chunklist((*(pre_roi or []), *(post_roi or [])))
         )
-        super().__init__(pool, context, args, {}, deps=deps)
+        cache_items = get_node_cache_items(strategy)
+        super().__init__(pool, context, args, {}, deps=deps, cache_items=cache_items)
 
     @staticmethod
     def execute(job_owner, args, kwargs):
@@ -486,12 +513,19 @@ def execute(job_owner, args, kwargs):
 
 
 class FunctionJob(Job):
-    def __init__(self, pool, f, args, kwargs, deps=None, **context):
+    def __init__(self, pool, f, args, kwargs, deps=None, cache_items=None, **context):
         # Pack the function into the args
         args = (f, args)
         # If no submitter was given, set the function as submitter
         context.setdefault("submitter", f)
-        super().__init__(pool, SubmissionContext(**context), args, kwargs, deps=deps)
+        super().__init__(
+            pool,
+            SubmissionContext(**context),
+            args,
+            kwargs,
+            deps=deps,
+            cache_items=cache_items,
+        )
 
     @staticmethod
     def execute(job_owner, args, kwargs):
@@ -521,6 +555,8 @@ def __init__(self, scaffold, fail_fast=False, workflow: "Workflow" = None):
         self._workers_raise_unhandled = False
         self._fail_fast = fail_fast
         self._workflow = workflow
+        self._cache_buffer = np.zeros(1000, dtype=np.uint64)
+        self._cache_window = MPI.window(self._cache_buffer)
 
     def __enter__(self):
         self._context = ExitStack()
@@ -655,7 +691,7 @@ def scheduling(self):
         return any(not f.done() for f in self._schedulers)
 
     def queue(self, f, args=None, kwargs=None, deps=None, **context):
-        job = FunctionJob(self, f, args or [], kwargs or {}, deps, **context)
+        job = FunctionJob(self, f, args or [], kwargs or {}, deps, [], **context)
         self._put(job)
         return job
 
@@ -716,6 +752,10 @@ def _execute_parallel(self):
                 "Unhandled exceptions during parallel execution.",
                 [JobPoolError("See main node logs for details.")],
             )
+
+            # Free all cached items
+            free_stale_pool_cache(self.owner, set())
+
             return
 
         try:
@@ -726,6 +766,8 @@ def _execute_parallel(self):
                 job._enqueue(self)
             # Add the scheduling futures to the running futures, to await them.
             self._running_futures.extend(self._schedulers)
+            # Start tracking cached items
+            self._update_cache_window()
             # Keep executing as long as any of the schedulers or jobs aren't done yet.
             while self.scheduling or any(
@@ -747,6 +789,11 @@ def _execute_parallel(self):
                 for job in self._job_queue:
                     if job._future in done:
                         job._completed()
+
+                # If a job finished, update the required cache items
+                if len(done):
+                    self._update_cache_window()
+
                 # Remove running futures that are done
                 for future in done:
                     self._running_futures.remove(future)
@@ -797,6 +844,8 @@ def _execute_serial(self):
                 while job.run(timeout=self._max_wait):
                     self.ping()
                 self.notify()
+                # After each job, check if any cache items can be freed.
+                free_stale_pool_cache(self.owner, self.get_required_cache_items())
             self.notify()
         # Raise any unhandled errors
         self.raise_unhandled()
@@ -846,3 +895,90 @@ def raise_unhandled(self):
             f"Your workflow encountered errors.",
             errors,
         )
+
+    def get_required_cache_items(self):
+        """
+        Returns the set of cache item ids required by the jobs still in the queue.
+
+        :return: set of cache item ids
+        :rtype: set[int]
+        """
+        items = set()
+        for job in self._job_queue:
+            if (
+                job.status == JobStatus.QUEUED
+                or job.status == JobStatus.PENDING
+                or job.status == JobStatus.RUNNING
+            ):
+                items.update(job._cache_items)
+        return items
+
+    def _update_cache_window(self):
+        """
+        Checks whether the cache buffer should be updated by looking at the job
+        statuses in the job queue, and updates it if needed. Only call on main.
+        """
+        # Create a new cache window buffer
+        new_buffer = np.zeros(1000, dtype=int)
+        for i, item in enumerate(self.get_required_cache_items()):
+            new_buffer[i] = item
+
+        # If there are actual cache requirement differences, lock the window
+        # and transfer the buffer
+        if np.any(new_buffer != self._cache_buffer):
+            self._cache_window.Lock(0)
+            self._cache_buffer[:] = new_buffer
+            self._cache_window.Unlock(0)
+
+    def _read_required_cache_items(self):
+        """
+        Locks the cache window and reads the still required cache items from rank 0.
+        Only call on workers.
+        """
+        from mpi4py.MPI import UINT64_T
+
+        self._cache_window.Lock(0)
+        self._cache_window.Get([self._cache_buffer, UINT64_T], 0)
+        self._cache_window.Unlock(0)
+        return set(self._cache_buffer)
+
+
+def get_node_cache_items(node):
+    return [
+        attr.get_pool_cache_id(node)
+        for key in dir(node)
+        if hasattr(attr := getattr(node, key), "get_pool_cache_id")
+    ]
+
+
+def free_stale_pool_cache(scaffold, required_cache_items: set[int]):
+    for stale_key in set(scaffold._pool_cache.keys()) - required_cache_items:
+        # Pop stale items and execute the registered cleanup function.
+        scaffold._pool_cache.pop(stale_key)()
+
+
+def pool_cache(caching_function):
+    @functools.cache
+    def decorated(self, *args, **kwargs):
+        self.scaffold.register_pool_cached_item(
+            decorated.get_pool_cache_id(self), cleanup
+        )
+        return caching_function(self, *args, **kwargs)
+
+    def get_pool_cache_id(node):
+        if not hasattr(node, "get_node_name"):
+            raise RuntimeError(
+                "Pool caching can only be used on methods of @node decorated classes."
+            )
+        return _cache_hash(f"{node.get_node_name()}.{caching_function.__name__}")
+
+    def cleanup():
+        decorated.cache_clear()
+
+    decorated.get_pool_cache_id = get_pool_cache_id
+
+    return decorated
+
+
+def _cache_hash(string):
+    return zlib.crc32(string.encode())
diff --git a/docs/connectivity/component.rst b/docs/connectivity/component.rst
index 063ae36d..4e6af5bd 100644
--- a/docs/connectivity/component.rst
+++ b/docs/connectivity/component.rst
@@ -151,6 +151,10 @@ The example connects cells that are near each other, between a :guilabel:`min` a
         if self.max < self.min:
             raise ConfigurationError("Max distance should be larger than min distance.")
 
+.. hint::
+
+   Some connection strategies can be sped up by :ref:`caching`.
+
 And an example configuration using this strategy:
 
 .. code-block:: json
diff --git a/docs/connectivity/connection-strategies.rst b/docs/connectivity/connection-strategies.rst
index 72b3803f..6b5863bf 100644
--- a/docs/connectivity/connection-strategies.rst
+++ b/docs/connectivity/connection-strategies.rst
@@ -2,6 +2,28 @@
 List of strategies
 ##################
 
+:class:`AllToAll <.connectivity.general.AllToAll>`
+==================================================
+
+This strategy connects each presynaptic neuron to all the postsynaptic neurons.
+It therefore creates one connection for each unique pair of neurons.
+
+:class:`FixedIndegree <.connectivity.general.FixedIndegree>`
+============================================================
+
+This strategy connects each postsynaptic neuron to a fixed number of uniformly
+randomly selected presynaptic neurons.
+
+* ``indegree``: Number of presynaptic neurons to connect to each postsynaptic neuron.
+
+:class:`FixedOutdegree <.connectivity.general.FixedOutdegree>`
+==============================================================
+
+This strategy connects each presynaptic neuron to a fixed number of uniformly
+randomly selected postsynaptic neurons.
+
+* ``outdegree``: Number of postsynaptic neurons to connect to each presynaptic neuron.
+
 :class:`VoxelIntersection <.connectivity.detailed.voxel_intersection.VoxelIntersection>`
 ========================================================================================
diff --git a/docs/dev/guidelines.rst b/docs/dev/guidelines.rst
new file mode 100644
index 00000000..708fb317
--- /dev/null
+++ b/docs/dev/guidelines.rst
@@ -0,0 +1,65 @@
+####################
+Developer Guidelines
+####################
+
+This section provides advisory guidelines for developers on the BSB repository, to facilitate
+communication with its maintainers and smooth the process of integrating and reviewing new contributions.
+
+Please read first our `code of conduct `_ to
+understand how to interact with the BSB community |:heart:|
+
+Development process
+-------------------
+
+Raise an issue on Github
+~~~~~~~~~~~~~~~~~~~~~~~~
+If you wish to contribute or raise an issue on the BSB project, you should first check the list of known
+`issues `_ on Github. If you cannot find an issue related to your specific
+contribution, please create a new one. It is important for the BSB maintainers to keep track of potential bugs
+and needed features when scheduling future releases. Additionally, they can provide you with their expertise and
+guide you through the development process.
+
+If you need to create an issue on Github, please provide as much context as possible.
+
+Fork and create a Pull Request
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+If you are not part of the BSB maintainers, you should fork the bsb repository on your own account to modify the code.
+If you introduce new features to the BSB, please provide the associated documentation (docstrings or general documentation)
+and unit tests. We are trying to improve the coverage of both and would greatly appreciate your contribution.
+
+The documentation, tests and code style (black, isort) are checked for each commit on the repository, so please
+install the :doc:`pre-commit hooks ` and run the following checks before pushing to the repository::
+
+    cd bsb-core
+    black .
+    isort .
+
+    # try to build the documentation, warnings will trigger errors
+    cd docs && rm -rf _build && sphinx-build -nW -b html . _build/html && cd ..
+
+    # run the tests
+    python -m unittest discover -s tests
+
+The BSB repository uses Github Actions to run these checks directly on Github. Failing these checks will prevent
+the integration of your contribution. Do not hesitate to ask for help on these |:wink:|
+
+When you believe your changes are ready to be integrated in the main repository, you can create a Pull Request (PR),
+describing in it what your contribution changes and which issue it relates to.
+
+Commit guidelines
+~~~~~~~~~~~~~~~~~
+BSB commits and PR titles should follow the
+`conventional commits guidelines `_. Not only does this help communicate the
+nature of your changes to the other developers, it also enables the automatic
+generation of changelogs and releases.
+
+Releases
+~~~~~~~~
+A new BSB release is published automatically for every push on the ``main`` branch.
+The push will automatically trigger Github Actions that bump the library version, add a git tag, make a Github
+release and update the `CHANGELOG `_.
+This will update the official documentation on ``Readthedocs`` and also deploy the code on
+`PyPI `_ and the `EBRAINS `_ website.
diff --git a/docs/dev/installation.rst b/docs/dev/installation.rst
index 7f216a89..0fed7cdd 100644
--- a/docs/dev/installation.rst
+++ b/docs/dev/installation.rst
@@ -12,13 +12,6 @@ To install::
 
 Test your install with::
 
-    python -m unittest discover -s tests
+    cd tests/
+    python -m unittest discover -s ./
 
-Releases
---------
-
-To release a new version::
-
-    bump-my-version bump pre_n
-    python -m build
-    twine upload dist/* --skip-existing
\ No newline at end of file
diff --git a/docs/dev/services.rst b/docs/dev/services.rst
index cc6a156c..aed5e1a1 100644
--- a/docs/dev/services.rst
+++ b/docs/dev/services.rst
@@ -99,4 +99,34 @@ them out to display the progress of the job pool:
     # Will print `Pool execution finished. 2 seconds elapsed.`
 
 Listeners can also be context managers, and will enter and exit the same context as the
-JobPool.
\ No newline at end of file
+JobPool.
+
+.. _caching:
+
+Caching
+-------
+
+Some jobs may benefit from caching data. The problem with memoization techniques like
+``functools.cache`` in a parallel workflow is that the data risks staying cached for the
+entire workflow, consuming large amounts of memory on every worker long after the job is over.
+
+To prevent this, ``JobPools`` support caching items for as long as any other job owned by
+the scheduler still needs to complete. To use pool managed caching, simply decorate a
+method of a ``@node``-decorated class with the :func:`~bsb.services.pool.pool_cache`
+decorator:
+
+.. code-block:: python
+
+    from bsb import PlacementStrategy, config, pool_cache
+
+    @config.node
+    class MyStrategy(PlacementStrategy):
+        @pool_cache
+        def heavy_calculations(self):
+            return 5 + 5
+
+        def place(self, chunk, indicators):
+            # `heavy_calculations` will be called at most once on each parallel node
+            for i in range(1000):
+                self.heavy_calculations()
\ No newline at end of file
diff --git a/docs/index.rst b/docs/index.rst
index 0b66c39e..ec10e29c 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -136,6 +136,7 @@ human-readable, multi-scale models!
 
    dev/installation
    dev/documentation
+   dev/guidelines
    dev/services
    dev/plugins
    dev/hooks
diff --git a/pyproject.toml b/pyproject.toml
index ae730ab2..49f2f7b6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -69,7 +69,7 @@ parallel = [
 test = [
     "bsb-arbor~=4.0",
     "bsb-hdf5~=4.0",
-    "bsb-test~=4.0",
+    "bsb-test~=4.1",
    "coverage~=7.3",
 ]
 docs = [
@@ -90,7 +90,7 @@ dev = [
     "black~=24.1.1",
     "isort~=5.12",
     "snakeviz~=2.1",
-    "bump-my-version~=0.18"
+    "bump-my-version~=0.24"
 ]
 
 [tool.black]
@@ -100,7 +100,7 @@ line-length = 90
 profile = "black"
 
 [tool.bumpversion]
-current_version = "4.2.0"
+current_version = "4.3.0"
 parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)"
 serialize = ["{major}.{minor}.{patch}"]
 search = "{current_version}"
diff --git a/tests/test_connectivity.py b/tests/test_connectivity.py
index 0705dbb7..5db187af 100644
--- a/tests/test_connectivity.py
+++ b/tests/test_connectivity.py
@@ -693,6 +693,43 @@ def test_multi_indegree(self):
             self.assertTrue(np.all(total == 50), "Not all cells have indegree 50")
 
 
+class TestFixedOutdegree(
+    RandomStorageFixture, NetworkFixture, unittest.TestCase, engine_name="hdf5"
+):
+    def setUp(self) -> None:
+        self.cfg = get_test_config("outdegree")
+        super().setUp()
+
+    def test_outdegree(self):
+        self.network.compile()
+        cs = self.network.get_connectivity_set("outdegree")
+        pre_locs, _ = cs.load_connections().all()
+        ps = self.network.get_placement_set("excitatory")
+        u, c = np.unique(pre_locs[:, 0], return_counts=True)
+        self.assertTrue(
+            np.array_equal(np.arange(len(ps)), np.sort(u)),
+            "Not all pre cells have connections",
+        )
+        self.assertTrue(np.all(c == 50), "Not all cells have outdegree 50")
+
+    def test_multi_outdegree(self):
+        self.network.compile()
+        for pre_name in ("excitatory", "extra"):
+            pre_ps = self.network.get_placement_set(pre_name)
+            total = np.zeros(len(pre_ps), dtype=int)
+            for post_name in ("inhibitory", "extra"):
+                cs = self.network.get_connectivity_set(
+                    f"multi_outdegree_{pre_name}_to_{post_name}"
+                )
+                pre_locs, _ = cs.load_connections().all()
+                ps = self.network.get_placement_set("inhibitory")
+                u, c = np.unique(pre_locs[:, 0], return_counts=True)
+                this = np.zeros(len(pre_ps), dtype=int)
+                this[u] = c
+                total += this
+            self.assertTrue(np.all(total == 50), "Not all cells have outdegree 50")
+
+
 class TestOutputNamingSingle(unittest.TestCase):
     """Test output naming as specified in: https://github.com/dbbs-lab/bsb-core/issues/823"""
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index f5ed64a5..bbb1e470 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -1,7 +1,9 @@
+import os
 import time
 import unittest
 from graphlib import CycleError
 from time import sleep
+from unittest.mock import patch
 
 from bsb_test import (
     NetworkFixture,
@@ -24,6 +26,7 @@
     Partition,
     PlacementStrategy,
     RandomPlacement,
+    Scaffold,
     config,
 )
 from bsb.services.pool import (
@@ -33,6 +36,9 @@
     PoolProgressReason,
     PoolStatus,
     WorkflowError,
+    _cache_hash,
+    get_node_cache_items,
+    pool_cache,
 )
 
 
@@ -497,3 +503,121 @@ def test_no_submitter_submission(self):
             job = pool.queue(sleep_y, (4, 0.2), number=1)
             self.assertIn("function sleep_y", job.name)
             self.assertEqual(1, job.context["number"])
+
+
+def mock_free_cache(scaffold, required_cache_items: set[str]):
+    # Mock function to test job cache system
+
+    for stale_key in set(scaffold._pool_cache.keys()) - required_cache_items:
+        # Save cleaned items in a file for testing
+        with open(f"test_cache_{MPI.get_rank()}.txt", "a") as f:
+            f.write(f"{stale_key}\n")
+        scaffold._pool_cache.pop(stale_key)()
+
+
+class TestPoolCache(RandomStorageFixture, unittest.TestCase, engine_name="hdf5"):
+    def setUp(self):
+        super().setUp()
+
+        @config.node
+        class TestCache(PlacementStrategy):
+            @pool_cache
+            def cache_something(self):
+                return 10
+
+            def place(self, chunk, indicators):
+                self.cache_something()
+
+        self.network = Scaffold(
+            config=Configuration.default(
+                placement=dict(
+                    withcache=TestCache(cell_types=[], partitions=[]),
+                )
+            ),
+            storage=self.storage,
+        )
+        self.network.placement.withcache.cache_something.cache_clear()
+        self.id_cache = _cache_hash("{root}.placement.withcache.cache_something")
+
+    def test_cache_registration(self):
+        """Test that when a cache is hit, it is registered in the scaffold"""
+        self.network.placement.withcache.place(None, None)
+        self.assertEqual(
+            [self.id_cache],
+            [*self.network._pool_cache.keys()],
+        )
+
+    def test_method_detection(self):
+        """Test that we can detect which jobs need which items"""
+        self.assertEqual(
+            [self.id_cache],
+            get_node_cache_items(self.network.placement.withcache),
+        )
+
+    def test_pool_required_cache(self):
+        """Test that the pool knows which cache items are required"""
+        with self.network.create_job_pool() as pool:
+            self.assertEqual(set(), pool.get_required_cache_items())
+            pool.queue_placement(self.network.placement.withcache, [0, 0, 0])
+            self.assertEqual(
+                {self.id_cache},
+                pool.get_required_cache_items(),
+            )
+
+    @patch(
+        "bsb.services.pool.free_stale_pool_cache",
+        lambda scaffold, required_cache_items: mock_free_cache(
+            scaffold, required_cache_items
+        ),
+    )
+    def test_cache_survival(self):
+        """Test that the required cache items survive until the jobs are done."""
+
+        # FIXME: This mechanism is critical for parallel execution, but is hard to test
+        # under that condition. This test will pass with false positive results under
+        # parallel conditions. This mechanism was manually tested when it was written,
+        # and accepts PRs to properly test it in CI.
+
+        @config.node
+        class TestNode(PlacementStrategy):
+            def place(node, chunk, indicators):
+                # Get the other job's cache.
+                cache = node.scaffold.placement.withcache.cache_something.cache_info()
+                # Assert that both times this job is called, the cache has no items in it,
+                # even though the other job was executed and cached in between.
+                # This confirms that the cache is cleared once its dependents are done.
+                self.assertEqual(cache.misses, 0)
+
+        self.network.placement["withoutcache"] = TestNode(cell_types=[], partitions=[])
+        pool = self.network.create_job_pool()
+        with pool:
+            first = pool.queue_placement(self.network.placement.withoutcache, [0, 0, 0])
+            # create 4 jobs with cache to check that the cache is deleted only once.
+            job0 = pool.queue_placement(
+                self.network.placement.withcache, [0, 0, 0], deps=[first]
+            )
+            job1 = pool.queue_placement(
+                self.network.placement.withcache, [0, 0, 1], deps=[first]
+            )
+            job2 = pool.queue_placement(
+                self.network.placement.withcache, [0, 1, 0], deps=[first]
+            )
+            job3 = pool.queue_placement(
+                self.network.placement.withcache, [1, 0, 0], deps=[first]
+            )
+            pool.queue_placement(
+                self.network.placement.withoutcache,
+                [0, 0, 0],
+                deps=[job0, job1, job2, job3],
+            )
+            pool.execute()
+
+        for filename in os.listdir():
+            if filename.startswith(f"test_cache_{MPI.get_rank()}"):
+                with open(filename, "r") as f:
+                    lines = f.readlines()
+                self.assertEqual(
+                    len(lines), 1, "The free function should be called only once."
+                )
+                self.assertEqual(lines[0], f"{self.id_cache}\n")
+                os.remove(filename)