Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Set multiprocessing start method explicitly through context #447

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/cooler/_balance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import warnings
from functools import partial
from operator import add
from typing import Literal
from typing import Any, Literal

import numpy as np

Expand Down Expand Up @@ -79,14 +79,14 @@ def _balance_genomewide(
tol,
max_iters,
rescale_marginals,
use_lock,
lock,
):
scale = 1.0
n_bins = len(bias)

for _ in range(max_iters):
marg = (
split(clr, spans=spans, map=map, use_lock=use_lock)
split(clr, spans=spans, map=map, lock=lock)
.prepare(_init)
.pipe(filters)
.pipe(_timesouterproduct, bias)
Expand Down Expand Up @@ -134,7 +134,7 @@ def _balance_cisonly(
tol,
max_iters,
rescale_marginals,
use_lock,
lock,
):
chroms = clr.chroms()["name"][:]
chrom_ids = np.arange(len(clr.chroms()))
Expand All @@ -153,7 +153,7 @@ def _balance_cisonly(
var = np.nan
for _ in range(max_iters):
marg = (
split(clr, spans=spans, map=map, use_lock=use_lock)
split(clr, spans=spans, map=map, lock=lock)
.prepare(_init)
.pipe(filters)
.pipe(_timesouterproduct, bias)
Expand Down Expand Up @@ -206,7 +206,7 @@ def _balance_transonly(
tol,
max_iters,
rescale_marginals,
use_lock,
lock,
):
scale = 1.0
n_bins = len(bias)
Expand All @@ -221,7 +221,7 @@ def _balance_transonly(

for _ in range(max_iters):
marg = (
split(clr, spans=spans, map=map, use_lock=use_lock)
split(clr, spans=spans, map=map, lock=lock)
.prepare(_init)
.pipe(filters)
.pipe(_zero_cis)
Expand Down Expand Up @@ -276,7 +276,7 @@ def balance_cooler(
max_iters: int = 200,
chunksize: int = 10_000_000,
map: MapFunctor = map,
use_lock: bool = False,
lock: Any | None = None,
store: bool = False,
store_name: str = "weight",
) -> tuple[np.ndarray, dict]:
Expand Down Expand Up @@ -375,7 +375,7 @@ def balance_cooler(
if min_nnz > 0:
filters = [_binarize, *base_filters]
marg_nnz = (
split(clr, spans=spans, map=map, use_lock=use_lock)
split(clr, spans=spans, map=map, lock=lock)
.prepare(_init)
.pipe(filters)
.pipe(_marginalize)
Expand All @@ -385,7 +385,7 @@ def balance_cooler(

filters = base_filters
marg = (
split(clr, spans=spans, map=map, use_lock=use_lock)
split(clr, spans=spans, map=map, lock=lock)
.prepare(_init)
.pipe(filters)
.pipe(_marginalize)
Expand Down Expand Up @@ -424,7 +424,7 @@ def balance_cooler(
tol,
max_iters,
rescale_marginals,
use_lock,
lock,
)
elif trans_only:
bias, scale, var = _balance_transonly(
Expand All @@ -437,7 +437,7 @@ def balance_cooler(
tol,
max_iters,
rescale_marginals,
use_lock,
lock,
)
else:
bias, scale, var = _balance_genomewide(
Expand All @@ -450,7 +450,7 @@ def balance_cooler(
tol,
max_iters,
rescale_marginals,
use_lock,
lock,
)

stats = {
Expand Down
77 changes: 33 additions & 44 deletions src/cooler/_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from typing import Any, Literal

import h5py
import multiprocess as mp
import numpy as np
import pandas as pd

Expand All @@ -17,7 +16,6 @@
from ._version import __format_version_mcool__
from .api import Cooler
from .create import ContactBinner, create
from .parallel import lock
from .util import GenomeSegmentation, parse_cooler_uri

__all__ = ["coarsen_cooler", "merge_coolers", "zoomify_cooler"]
Expand Down Expand Up @@ -517,8 +515,10 @@ def __init__(
agg: dict[str, Any] | None,
batchsize: int,
map: MapFunctor = map,
lock=None,
):
self._map = map
self.lock = lock

self.source_uri = source_uri
self.batchsize = batchsize
Expand Down Expand Up @@ -583,7 +583,7 @@ def _each(group):
.reset_index(drop=True)
)

def _aggregate(self, span: tuple[int, int]) -> pd.DataFrame:
def aggregate(self, span: tuple[int, int]) -> pd.DataFrame:
lo, hi = span

clr = Cooler(self.source_uri)
Expand Down Expand Up @@ -623,25 +623,20 @@ def _aggregate(self, span: tuple[int, int]) -> pd.DataFrame:
.reset_index()
)

def aggregate(self, span: tuple[int, int]) -> pd.DataFrame:
try:
chunk = self._aggregate(span)
except MemoryError as e: # pragma: no cover
raise RuntimeError(str(e)) from e
return chunk

def __iter__(self) -> Iterator[dict[str, np.ndarray]]:
# Distribute batches of `batchsize` pixel spans at once.
# Distribute consecutive pixel spans across the workers in batches.
# In the single-process case, each batch is one pixel span.
# With n processes, each batch is n pixel spans - one span per process.
batchsize = self.batchsize
spans = list(zip(self.edges[:-1], self.edges[1:]))
for i in range(0, len(spans), batchsize):
try:
if batchsize > 1:
lock.acquire()
if batchsize > 1 and self.lock is not None:
self.lock.acquire()
results = self._map(self.aggregate, spans[i : i + batchsize])
finally:
if batchsize > 1:
lock.release()
if batchsize > 1 and self.lock is not None:
self.lock.release()
for df in results:
yield {k: v.values for k, v in df.items()}

Expand All @@ -655,6 +650,7 @@ def coarsen_cooler(
columns: list[str] | None = None,
dtypes: dict[str, Any] | None = None,
agg: dict[str, Any] | None = None,
map: MapFunctor = map,
**kwargs,
) -> None:
"""
Expand Down Expand Up @@ -719,38 +715,29 @@ def coarsen_cooler(
else:
dtypes.setdefault(col, input_dtypes[col])

try:
# Note: fork before opening to prevent inconsistent global HDF5 state
if nproc > 1:
pool = mp.Pool(nproc)
kwargs.setdefault("lock", lock)

iterator = CoolerCoarsener(
base_uri,
factor,
chunksize,
columns=columns,
agg=agg,
batchsize=nproc,
map=pool.map if nproc > 1 else map,
)

new_bins = iterator.new_bins
iterator = CoolerCoarsener(
base_uri,
factor,
chunksize,
columns=columns,
agg=agg,
batchsize=nproc,
map=map,
lock=kwargs.get("lock", None),
)

kwargs.setdefault("append", True)
new_bins = iterator.new_bins

create(
output_uri,
new_bins,
iterator,
dtypes=dtypes,
symmetric_upper=clr.storage_mode == "symmetric-upper",
**kwargs,
)
kwargs.setdefault("append", True)

finally:
if nproc > 1:
pool.close()
create(
output_uri,
new_bins,
iterator,
dtypes=dtypes,
symmetric_upper=clr.storage_mode == "symmetric-upper",
**kwargs,
)


def zoomify_cooler(
Expand All @@ -762,6 +749,7 @@ def zoomify_cooler(
columns: list[str] | None = None,
dtypes: dict[str, Any] | None = None,
agg: dict[str, Any] | None = None,
map: MapFunctor = map,
**kwargs,
) -> None:
"""
Expand Down Expand Up @@ -872,6 +860,7 @@ def zoomify_cooler(
dtypes=dtypes,
agg=agg,
mode="r+",
map=map,
**kwargs,
)

Expand Down
4 changes: 2 additions & 2 deletions src/cooler/cli/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from functools import wraps

import click
import multiprocess as mp
import numpy as np
import pandas as pd

from .. import util
from ..parallel import get_mp_context


class DelimitedTuple(click.types.ParamType):
Expand Down Expand Up @@ -149,7 +149,7 @@ def check_ncpus(arg_value):
if arg_value <= 0:
raise click.BadParameter("n_cpus must be >= 1")
else:
return min(arg_value, mp.cpu_count())
return min(arg_value, get_mp_context().cpu_count())


@contextmanager
Expand Down
6 changes: 3 additions & 3 deletions src/cooler/cli/balance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import h5py
import numpy as np
import pandas as pd
from multiprocess import Pool

from .._balance import balance_cooler
from ..api import Cooler
from ..parallel import get_mp_context
from ..util import bedslice, parse_cooler_uri
from . import cli, get_logger

Expand Down Expand Up @@ -236,7 +236,8 @@ def balance(

try:
if nproc > 1:
pool = Pool(nproc)
ctx = get_mp_context()
pool = ctx.Pool(nproc)
map_ = pool.imap_unordered
else:
map_ = map
Expand All @@ -254,7 +255,6 @@ def balance(
max_iters=max_iters,
ignore_diags=ignore_diags,
rescale_marginals=True,
use_lock=False,
map=map_,
)

Expand Down
8 changes: 5 additions & 3 deletions src/cooler/cli/cload.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pandas as pd
import simplejson as json
from cytoolz import compose
from multiprocess import Pool

from ..create import (
HDF5Aggregator,
Expand All @@ -16,6 +15,7 @@
create_cooler,
sanitize_records,
)
from ..parallel import get_mp_context
from . import cli, get_logger
from ._util import parse_bins, parse_field_param, parse_kv_list_param

Expand Down Expand Up @@ -230,7 +230,8 @@ def tabix(
try:
map_func = map
if nproc > 1:
pool = Pool(nproc)
ctx = get_mp_context()
pool = ctx.Pool(nproc)
logger.info(f"Using {nproc} cores")
map_func = pool.imap

Expand Down Expand Up @@ -331,7 +332,8 @@ def pairix(
try:
map_func = map
if nproc > 1:
pool = Pool(nproc)
ctx = get_mp_context()
pool = ctx.Pool(nproc)
logger.info(f"Using {nproc} cores")
map_func = pool.imap

Expand Down
Loading
Loading