Skip to content

Commit

Permalink
add support for threading and for user-specified MPI dimension. Closes
Browse files Browse the repository at this point in the history
…#35 (#98)

* mpi: outer, threads:2

* add TODO ids

* add TODO label (3D support)

* skipping multi-threading tests if JIT disabled

* add pylint-disable comment

* print number of cores on workers

* log info about supported plotting options

* debug: print num_threads per worker

* debug: omit pylint

* debug: omit pylint

* hardcore threads

* fix issue number

* mpi_dim as scenarion ctor argument; new test for mpi_indices; more readable code around send/recv tags for threading

* pylint fixes

* pylint fixes ++

* pylint disable too-many-args for whole file

* check thread number every time

* increase workflow timeout

* check if the 3rd-order-terms issue indeed is present?

* indeed 3rd-order terms cause problems with mpi_dim=INNER :(

* increase timeout to 120min

* remove 4th worker from the test

* increase timeout to 60min

* updates to README to reflect threading support changes

* removing debug leftover

---------

Co-authored-by: Sylwester Arabas <[email protected]>
Co-authored-by: Sylwester Arabas <[email protected]>
  • Loading branch information
3 people authored Mar 22, 2024
1 parent ec2b26e commit 3f07a87
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 109 deletions.
14 changes: 9 additions & 5 deletions .github/workflows/tests+pypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
mpi: [ 'mpich', 'openmpi', 'intelmpi']
python-version: ["3.10"]
disable-jit: [1, 0]
mpi-np: [1, 2, 3, 4]
mpi-np: [1, 2, 3]
exclude:
# as of time of writing, mpi4py/setup-mpi does not support it
- platform: macos-latest
Expand All @@ -100,7 +100,7 @@ jobs:

fail-fast: false
runs-on: ${{ matrix.platform }}
timeout-minutes: 45
timeout-minutes: 60
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
Expand All @@ -112,13 +112,17 @@ jobs:
- if: matrix.mpi == 'mpich'
run: echo _ch="ch" >> $GITHUB_ENV
- if: startsWith(matrix.platform, 'ubuntu-')
run: sudo apt-get update && sudo apt-get install -y libhdf5-mpi$_ch-dev pkg-config
run: |
sudo apt-get update && sudo apt-get install -y libhdf5-mpi$_ch-dev pkg-config
lscpu
- if: startsWith(matrix.platform, 'ubuntu-') && matrix.mpi == 'mpich'
run: |
echo HDF5_LIBDIR=/usr/lib/x86_64-linux-gnu/hdf5/mpich >> $GITHUB_ENV
echo HDF5_INCLUDEDIR=/usr/include/hdf5/mpich >> $GITHUB_ENV
- if: startsWith(matrix.platform, 'macos-')
run: brew install hdf5-mpi && echo HDF5_DIR=/opt/homebrew >> $GITHUB_ENV
run: |
brew install hdf5-mpi && echo HDF5_DIR=/opt/homebrew >> $GITHUB_ENV
sysctl -a | grep cpu | grep hw
- run: HDF5_MPI="ON" CC=mpicc pip install --no-binary=h5py "git+https://github.com/h5py/h5py@81f6c01#egg=h5py"
- run: pip install -e .[tests]
- run: python -We -c "import PyMPDATA_MPI"
Expand All @@ -137,7 +141,7 @@ jobs:
export COV_ARGS="--cov=./ --cov-report=xml"
pip install pytest-cov
fi
mpiexec $_mpiexec_args -n ${{ matrix.mpi-np }} pytest $COV_ARGS --timeout=600 --timeout_method=thread -s -vv -We tests/local;
NUMBA_NUM_THREADS=3 mpiexec $_mpiexec_args -n ${{ matrix.mpi-np }} pytest $COV_ARGS --timeout=600 --timeout_method=thread -s -vv -We tests/local;
- uses: actions/upload-artifact@v2
with:
name: plots
Expand Down
19 changes: 10 additions & 9 deletions PyMPDATA_MPI/domain_decomposition.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# pylint: disable=missing-module-docstring,missing-function-docstring,missing-class-docstring,invalid-name
""" MPI-aware domain decomposition utilities """

import numpy as np
from PyMPDATA.impl.domain_decomposition import make_subdomain
from PyMPDATA.impl.enumerations import OUTER

MPI_DIM = OUTER

subdomain = make_subdomain(jit_flags={})


def mpi_indices(grid, rank, size):
start, stop = subdomain(grid[MPI_DIM], rank, size)
xi, yi = np.indices((stop - start, grid[MPI_DIM - 1]), dtype=float)
xi += start
return xi, yi
def mpi_indices(*, grid, rank, size, mpi_dim):
"""returns a mapping from rank-local indices to domain-wide indices,
(subdomain-aware equivalent of np.indices)"""
start, stop = subdomain(grid[mpi_dim], rank, size)
indices_arg = list(grid)
indices_arg[mpi_dim] = stop - start
xyi = np.indices(tuple(indices_arg), dtype=float)
xyi[mpi_dim] += start
return xyi
36 changes: 25 additions & 11 deletions PyMPDATA_MPI/impl/boundary_condition_commons.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
# pylint: disable=too-many-arguments
""" boundary_condition common functions """

from functools import lru_cache

import numba
import numba_mpi as mpi
from PyMPDATA.impl.enumerations import INVALID_INDEX
from PyMPDATA.impl.enumerations import INVALID_INDEX, OUTER

IRRELEVANT = 666


@lru_cache()
def make_scalar_boundary_condition(
indexers, jit_flags, dimension_index, dtype, get_peer
*, indexers, jit_flags, dimension_index, dtype, get_peer, mpi_dim
):
"""returns fill_halos() function for scalar boundary conditions.
Provides default logic for scalar buffer filling. Notable arguments:
Expand All @@ -26,20 +27,20 @@ def fill_buf(buf, psi, i_rng, k_rng, sign, _dim):
(i, INVALID_INDEX, k), psi, sign
)

send_recv = _make_send_recv(indexers.set, jit_flags, fill_buf, dtype, get_peer)
send_recv = _make_send_recv(
indexers.set, jit_flags, fill_buf, dtype, get_peer, mpi_dim
)

# pylint: disable=too-many-arguments
@numba.njit(**jit_flags)
def fill_halos(buffer, i_rng, j_rng, k_rng, psi, _, sign):
send_recv(buffer, psi, i_rng, j_rng, k_rng, sign, IRRELEVANT, psi)

return fill_halos


# pylint: disable=too-many-arguments
@lru_cache()
def make_vector_boundary_condition( # pylint: disable=too-many-arguments
indexers, halo, jit_flags, dimension_index, dtype, get_peer
def make_vector_boundary_condition(
indexers, halo, jit_flags, dimension_index, dtype, get_peer, mpi_dim
):
"""returns fill_halos() function for vector boundary conditions.
Provides default logic for vector buffer filling. Notable arguments:
Expand All @@ -63,7 +64,9 @@ def fill_buf(buf, components, i_rng, k_rng, sign, dim):

buf[i - i_rng.start, k - k_rng.start] = value

send_recv = _make_send_recv(indexers.set, jit_flags, fill_buf, dtype, get_peer)
send_recv = _make_send_recv(
indexers.set, jit_flags, fill_buf, dtype, get_peer, mpi_dim
)

@numba.njit(**jit_flags)
def fill_halos_loop_vector(buffer, i_rng, j_rng, k_rng, components, dim, _, sign):
Expand All @@ -74,10 +77,17 @@ def fill_halos_loop_vector(buffer, i_rng, j_rng, k_rng, components, dim, _, sign
return fill_halos_loop_vector


def _make_send_recv(set_value, jit_flags, fill_buf, dtype, get_peer):
def _make_send_recv(set_value, jit_flags, fill_buf, dtype, get_peer, mpi_dim):

@numba.njit(**jit_flags)
def get_buffer_chunk(buffer, i_rng, k_rng, chunk_index):
chunk_size = len(i_rng) * len(k_rng)
if mpi_dim != OUTER:
n_chunks = len(buffer) // (chunk_size * numba.get_num_threads())
chunk_index += numba.get_thread_id() * n_chunks
else:
n_chunks = len(buffer) // (chunk_size * 2)
chunk_index += int(numba.get_thread_id() != 0) * n_chunks
return buffer.view(dtype)[
chunk_index * chunk_size : (chunk_index + 1) * chunk_size
].reshape((len(i_rng), len(k_rng)))
Expand All @@ -97,12 +107,16 @@ def fill_output(output, buffer, i_rng, j_rng, k_rng):

@numba.njit(**jit_flags)
def _send(buf, peer, fill_buf_args):
tag = numba.get_thread_id()
fill_buf(buf, *fill_buf_args)
mpi.send(buf, dest=peer)
mpi.send(buf, dest=peer, tag=tag)

@numba.njit(**jit_flags)
def _recv(buf, peer):
mpi.recv(buf, source=peer)
th_id = numba.get_thread_id()
n_th = numba.get_num_threads()
tag = th_id if mpi_dim != OUTER else {0: n_th - 1, n_th - 1: 0}[th_id]
mpi.recv(buf, source=peer, tag=tag)

@numba.njit(**jit_flags)
def _send_recv(buffer, psi, i_rng, j_rng, k_rng, sign, dim, output):
Expand Down
24 changes: 13 additions & 11 deletions PyMPDATA_MPI/impl/mpi_boundary_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@
class MPIBoundaryCondition:
"""common base class for MPI boundary conditions"""

def __init__(self, base, size):
def __init__(self, base, size, mpi_dim):
self.__mpi_size_one = size == 1
self.worker_pool_size = size
self.base = base
self.mpi_dim = mpi_dim

@staticmethod
def make_get_peer(_, __):
"""returns (lru-cached) numba-compiled callable."""
raise NotImplementedError()

# pylint: disable=too-many-arguments
def make_scalar(self, indexers, halo, dtype, jit_flags, dimension_index):
Expand All @@ -19,14 +25,10 @@ def make_scalar(self, indexers, halo, dtype, jit_flags, dimension_index):
indexers, halo, dtype, jit_flags, dimension_index
)
return make_scalar_boundary_condition(
indexers,
jit_flags,
dimension_index,
dtype,
self.make_get_peer(jit_flags, self.worker_pool_size),
indexers=indexers,
jit_flags=jit_flags,
dimension_index=dimension_index,
dtype=dtype,
get_peer=self.make_get_peer(jit_flags, self.worker_pool_size),
mpi_dim=self.mpi_dim,
)

@staticmethod
def make_get_peer(_, __):
"""returns (lru-cached) numba-compiled callable."""
raise NotImplementedError()
5 changes: 3 additions & 2 deletions PyMPDATA_MPI/mpi_periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ class MPIPeriodic(MPIBoundaryCondition):
`PyMPDATA.scalar_field.ScalarField` and
`PyMPDATA.vector_field.VectorField` __init__ methods"""

def __init__(self, size):
def __init__(self, size, mpi_dim):
# passing size insead of using mpi.size() because lack of support for non-default
# MPI communicators. https://github.com/numba-mpi/numba-mpi/issues/64
assert SIGN_RIGHT == -1
assert SIGN_LEFT == +1

super().__init__(size=size, base=Periodic)
super().__init__(size=size, base=Periodic, mpi_dim=mpi_dim)

# pylint: disable=too-many-arguments
def make_vector(self, indexers, halo, dtype, jit_flags, dimension_index):
Expand All @@ -38,6 +38,7 @@ def make_vector(self, indexers, halo, dtype, jit_flags, dimension_index):
dimension_index,
dtype,
self.make_get_peer(jit_flags, self.worker_pool_size),
self.mpi_dim,
)

@staticmethod
Expand Down
6 changes: 3 additions & 3 deletions PyMPDATA_MPI/mpi_polar.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from PyMPDATA.boundary_conditions import Polar
from PyMPDATA.impl.enumerations import INNER, OUTER

from PyMPDATA_MPI.domain_decomposition import MPI_DIM
from PyMPDATA_MPI.impl import MPIBoundaryCondition


Expand All @@ -16,8 +15,8 @@ class MPIPolar(MPIBoundaryCondition):
`PyMPDATA.scalar_field.ScalarField` and
`PyMPDATA.vector_field.VectorField` __init__ methods"""

def __init__(self, mpi_grid, grid):
self.worker_pool_size = grid[MPI_DIM] // mpi_grid[MPI_DIM]
def __init__(self, mpi_grid, grid, mpi_dim):
self.worker_pool_size = grid[mpi_dim] // mpi_grid[mpi_dim]
self.__mpi_size_one = self.worker_pool_size == 1

if not self.__mpi_size_one:
Expand All @@ -31,6 +30,7 @@ def __init__(self, mpi_grid, grid):
if self.__mpi_size_one
else None
),
mpi_dim=mpi_dim,
)

@staticmethod
Expand Down
16 changes: 5 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ Note that the spherical animations below depict simulations without MPDATA corre
In the cartesian example below (based on a test case from [Arabas et al. 2014](https://doi.org/10.3233/SPR-140379)),
a constant advector field $u$ is used (and $G=1$).
MPI (Message Passing Interface) is used
for handling data transfers and synchronisation in the outer dimension,
while multi-threading (using, e.g., OpenMP via Numba) is used in the inner dimension.
for handling data transfers and synchronisation with the domain decomposition
across MPI workers done in either inner or in the outer dimension (user setting).
Multi-threading (using, e.g., OpenMP via Numba) is used for shared-memory parallelisation
within subdomains with further subdomain split across the inner dimension (PyMPDATA logic).
In this example, two corrective MPDATA iterations are employed.

### 1 worker
Expand All @@ -80,14 +82,6 @@ In this example, two corrective MPDATA iterations are employed.
<img src="https://github.com/open-atmos/PyMPDATA-MPI/releases/download/latest-generated-plots/n_iters.3_rank_2_size_3_c_field_.0.5.0.25.-CartesianScenario-anim.gif" width="32%" />
</p>

### 4 workers
<p align="middle">
<img src="https://github.com/open-atmos/PyMPDATA-MPI/releases/download/latest-generated-plots/n_iters.3_rank_0_size_4_c_field_.0.5.0.25.-CartesianScenario-anim.gif" width="24%" />
<img src="https://github.com/open-atmos/PyMPDATA-MPI/releases/download/latest-generated-plots/n_iters.3_rank_1_size_4_c_field_.0.5.0.25.-CartesianScenario-anim.gif" width="24%" />
<img src="https://github.com/open-atmos/PyMPDATA-MPI/releases/download/latest-generated-plots/n_iters.3_rank_2_size_4_c_field_.0.5.0.25.-CartesianScenario-anim.gif" width="24%" />
<img src="https://github.com/open-atmos/PyMPDATA-MPI/releases/download/latest-generated-plots/n_iters.3_rank_3_size_4_c_field_.0.5.0.25.-CartesianScenario-anim.gif" width="24%" />
</p>

## Package architecture

```mermaid
Expand Down Expand Up @@ -143,7 +137,7 @@ licence: [GPL v3](https://www.gnu.org/licenses/gpl-3.0.html)

- MPI support for PyMPDATA implemented externally (i.e., not incurring any overhead or additional dependencies for PyMPDATA users)
- MPI calls within Numba njitted code (hence not using `mpi4py`, but leveraging `numba-mpi`)
- hybrid threading (internal in PyMPDATA, in the inner dimension) + MPI (outer dimension) parallelisation
- hybrid domain decomposition parallelisation: threading (internal in PyMPDATA, in the inner dimension) + MPI (either inner or outer dimension)
- portability across major OSes (currently Linux & macOS; no Windows support due [challenges in getting HDF5/MPI-IO to work there](https://docs.h5py.org/en/stable/build.html#source-installation-on-windows))
- full test coverage including CI builds asserting on same results with multi-node vs. single-node computations
- Continuous Integration with different OSes and different MPI implementation
Expand Down
20 changes: 15 additions & 5 deletions scenarios/_scenario.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
# pylint: disable=too-few-public-methods
""" Provides base _Scenario base class that every scenario should inherit """

from PyMPDATA import Solver
from PyMPDATA.impl.enumerations import INNER, OUTER


class _Scenario:
class _Scenario: # pylint: disable=too-few-public-methods
"""Base class for every Scenario. Provides logic for advance() function"""

def __init__(self, *, stepper, advectee, advector, g_factor=None):
# pylint: disable=too-many-arguments
def __init__(self, *, mpi_dim, stepper, advectee, advector, g_factor=None):
self.mpi_dim = mpi_dim
self.solver = Solver(
stepper=stepper, advectee=advectee, advector=advector, g_factor=g_factor
)

def advance(self, dataset, output_steps, x_range):
def advance(self, dataset, output_steps, mpi_range):
"""Logic for performing simulation. Returns wall time of one timestep (in clock ticks)"""
steps_done = 0
wall_time = 0
Expand All @@ -21,5 +24,12 @@ def advance(self, dataset, output_steps, x_range):
wall_time_per_timestep = self.solver.advance(n_steps=n_steps)
wall_time += wall_time_per_timestep * n_steps
steps_done += n_steps
dataset[x_range, :, index] = self.solver.advectee.get()
data = self.solver.advectee.get()
dataset[
(
mpi_range if self.mpi_dim == OUTER else slice(None),
mpi_range if self.mpi_dim == INNER else slice(None),
slice(index, index + 1),
)
] = data.reshape((data.shape[0], data.shape[1], 1))
return wall_time
26 changes: 19 additions & 7 deletions scenarios/cartesian.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from matplotlib import pyplot
from PyMPDATA import ScalarField, Stepper, VectorField
from PyMPDATA.boundary_conditions import Periodic
from PyMPDATA.impl.enumerations import INNER, OUTER

from PyMPDATA_MPI.domain_decomposition import mpi_indices
from PyMPDATA_MPI.mpi_periodic import MPIPeriodic
Expand All @@ -24,16 +25,22 @@ def __init__( # pylint: disable=too-many-arguments
rank,
size,
courant_field_multiplier,
mpi_dim,
):
# pylint: disable=too-many-locals, invalid-name
halo = mpdata_options.n_halo

xi, yi = mpi_indices(grid, rank, size)
nx, ny = xi.shape
xyi = mpi_indices(grid=grid, rank=rank, size=size, mpi_dim=mpi_dim)
nx, ny = xyi[mpi_dim].shape

boundary_conditions = (MPIPeriodic(size=size), Periodic())
mpi_periodic = MPIPeriodic(size=size, mpi_dim=mpi_dim)
periodic = Periodic()
boundary_conditions = (
mpi_periodic if mpi_dim == OUTER else periodic,
mpi_periodic if mpi_dim == INNER else periodic,
)
advectee = ScalarField(
data=self.initial_condition(xi, yi, grid),
data=self.initial_condition(*xyi, grid),
halo=mpdata_options.n_halo,
boundary_conditions=boundary_conditions,
)
Expand All @@ -52,11 +59,16 @@ def __init__( # pylint: disable=too-many-arguments
n_threads=n_threads,
left_first=tuple([rank % 2 == 0] * 2),
# TODO #70 (see also https://github.com/open-atmos/PyMPDATA/issues/386)
buffer_size=((ny + 2 * halo) * halo)
buffer_size=(
(ny if mpi_dim == OUTER else nx + 2 * halo) * halo
) # TODO #38 support for 3D domain
* 2 # for temporary send/recv buffer on one side
* 2, # for complex dtype
* 2 # for complex dtype
* (2 if mpi_dim == OUTER else n_threads),
)
super().__init__(
mpi_dim=mpi_dim, stepper=stepper, advectee=advectee, advector=advector
)
super().__init__(stepper=stepper, advectee=advectee, advector=advector)

@staticmethod
def initial_condition(xi, yi, grid):
Expand Down
Loading

0 comments on commit 3f07a87

Please sign in to comment.