Commit

DOC: fix typos, extend dask-related docs
dask: GPUs, cloudpickle, links, --lifetime
elcorto committed Jan 4, 2025
1 parent 6209ebc commit dea0401
Showing 4 changed files with 35 additions and 10 deletions.
13 changes: 11 additions & 2 deletions doc/source/written/manual.md
@@ -346,7 +346,7 @@ c = ps.plist("c", [1.23])
params = ps.pgrid(a, b, c)
```

Another solution is to define defauts in `func`, like so:
Another solution is to define defaults in `func`, like so:

```py
def func(pset, c_default=1.23):
@@ -1145,6 +1145,11 @@ extension](https://github.com/dask/dask-labextension) that gives you access to
the dask dashboard and more.


#### How to request GPUs

See `examples/batch_dask/run_psweep_jax_gpu.py`.
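
That example contains the full details. As a rough sketch of the general
pattern, one requests GPUs by adding the corresponding SLURM directive to the
generated job script. In the sketch below, the partition name `gpu_queue` and
the tiny `jax` workload are placeholders, not taken from that file; recent
`dask_jobqueue` versions use `job_extra_directives`, older ones `job_extra`:

```py
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

import psweep as ps


def func(pset):
    # Hypothetical GPU workload for illustration; the real example runs jax.
    import jax

    return dict(result=pset["a"] ** 2, devices=str(jax.devices()))


if __name__ == "__main__":
    cluster = SLURMCluster(
        queue="gpu_queue",  # placeholder partition name
        cores=1,
        processes=1,
        memory="10GiB",
        walltime="00:10:00",
        # Ends up as "#SBATCH --gres gpu:1" in the generated job script.
        # Older dask_jobqueue versions call this parameter job_extra.
        job_extra_directives=["--gres gpu:1"],
    )
    cluster.scale(jobs=1)
    client = Client(cluster)
    df = ps.run(func, ps.plist("a", range(10)), dask_client=client)
    print(df)
```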


#### Pros and Cons

```{admonition} Pros
@@ -1178,6 +1183,10 @@ the dask dashboard and more.
for more.
* More software to install: On the HPC machine, you need `psweep`,
`dask.distributed` and `dask_jobqueue`.
* `dask` uses [`cloudpickle`](https://github.com/cloudpipe/cloudpickle) to
serialize data before sending it to workers, which may fail in some cases,
e.g. for data generated by
[`torch.compile`](https://pytorch.org/docs/stable/generated/torch.compile.html);
a quick local check for this is sketched after this list
```
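
One way to catch such serialization problems before work is shipped to the
cluster is to round-trip the objects in question through `cloudpickle`
locally. A minimal sketch (the `check_serializable` helper is made up for
illustration, it is not part of `psweep` or `dask`):

```py
import cloudpickle


def check_serializable(obj, name="object"):
    """Fail early if dask would not be able to ship `obj` to a worker."""
    try:
        cloudpickle.loads(cloudpickle.dumps(obj))
    except Exception as exc:
        raise RuntimeError(f"{name} is not cloudpickle-serializable") from exc


# For instance, check func and each pset before calling
# ps.run(func, params, dask_client=client).
##check_serializable(func, "func")
##for pset in params:
##    check_serializable(pset, "pset")
```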

(s:templates)=
@@ -1659,4 +1668,4 @@ more reproducibility, look into using one of the workflow frameworks above.
[dask]: https://dask.org
[dask_dist]: https://distributed.dask.org
[dask_jq]: https://jobqueue.dask.org
[dask_time_limits]: https://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers
[dask_time_limits]: https://jobqueue.dask.org/en/latest/clusters-advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers
2 changes: 1 addition & 1 deletion examples/batch_dask/dask_control.slurm
@@ -13,7 +13,7 @@
# run_psweep.py. The dask cluster will then run workloads. After all work is
# done, the dask cluster will be torn down and this job will exit.
#
# The reason for using a batch job to run a the dask control process (so python
# The reason for using a batch job to run the dask control process (so python
# run_psweep.py) is that typically on HPC machines, there is a time limit for
# user processes started on the head / login nodes. Therefore the dask control
# process may get killed before the dask workers have finished processing.
11 changes: 7 additions & 4 deletions examples/batch_dask/run_psweep.py
@@ -19,6 +19,8 @@ def func(pset):


if __name__ == "__main__":
# Settings in SLURMCluster() apply to one batch job, started with e.g.
# cluster.scale(jobs=1).
cluster = SLURMCluster(
queue="some_queue,some_other_queue",
##account="some_account",
@@ -36,10 +38,11 @@ def func(pset):

params = ps.plist("a", range(100))

# Start 2 batch jobs, each with 10 dask workers (processes=10) and 10
# cores, so 1 core (1 thread) / worker and 20 workers in total (2 jobs x 10
# workers). Each worker gets 1 GiB of memory (memory="10GiB" for 10
# workers). See examples/batch_dask/slurm_cluster_settings.py
# Start 2 batch jobs (cluster.scale(jobs=2)), each with 10 dask workers
# (processes=10) and 10 cores (cores=10), so 1 core (1 thread) / worker and
# 20 workers in total (2 jobs x 10 workers). Each worker gets 1 GiB of
# memory (memory="10GiB" for 10 workers per batch job). See
# examples/batch_dask/slurm_cluster_settings.py .
cluster.scale(jobs=2)
client = Client(cluster)
df = ps.run(func, params, dask_client=client)
19 changes: 16 additions & 3 deletions examples/batch_dask/slurm_cluster_settings.py
@@ -1,6 +1,6 @@
"""
This script is independent of psweep. It explains dask_jobqueue.SLURMCluster
parameters.
This script is (almost) independent of psweep. It explains
dask_jobqueue.SLURMCluster parameters.
SLURM terms
@@ -82,13 +82,22 @@
mpirun myscript
"""

import time

from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import time


def func(x):
time.sleep(5)

# If using print() logging/debugging, also in combo with ps.run(...,
# capture_logs=), you may need to flush stdout explicitly:
#
# https://github.com/dask/dask-jobqueue/issues/299#issuecomment-609002077
##sys.stdout.flush()
##os.fsync(sys.stdout.fileno())
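## (uncommenting these requires sys and os to be imported)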

return x**2


@@ -189,6 +198,8 @@ def func(x):
# Memory for the whole job. Will be distributed among workers
# (dask_worker --memory-limit).
memory="10GiB",
# should be <= the smallest expected queue time limit, can also help to
# get jobs started more quickly
walltime="00:10:00",
# Each of the settings in this list will end up in the job script, e.g.
# #SBATCH --gres gpu:1
@@ -200,6 +211,8 @@
# ssh -L 2222:localhost:3333 cluster
# localhost$ browser localhost:2222/status
scheduler_options={"dashboard_address": ":3333"},
# gracefully terminate workers, must be <= walltime above
worker_extra_args=["--lifetime", "9m", "--lifetime-stagger", "30s"],
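# here: 9m lifetime plus at most 30s of stagger stays within the 10m walltime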
)

# Job script generated with
