From 5b788605a41d8762a8855839c8431e74bd99c012 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Thu, 17 Oct 2019 12:03:38 -0700 Subject: [PATCH 01/35] REL v0.10.0 release From 37f1934b9555da7d64c2104069cf1857c6b80705 Mon Sep 17 00:00:00 2001 From: Ray Douglass <3107146+raydouglass@users.noreply.github.com> Date: Fri, 6 Dec 2019 16:05:13 -0500 Subject: [PATCH 02/35] Add ucx-py dependency to CI (#212) * Add ucx-py dependency to CI --- ci/gpu/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 3c474c4f9..6bf399bfd 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -65,7 +65,7 @@ conda install "cudatoolkit=$CUDA_REL" \ conda install -c conda-forge "pytest" "pytest-asyncio" # Use nightly build of ucx-py for now -conda install -c rapidsai-nightly "ucx-py" +conda install "ucx-py=$MINOR_VERSION" conda list From f2bbfa452f3796e6fd34528b22d505e31c5afbce Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 11 Dec 2019 12:16:44 -0800 Subject: [PATCH 03/35] REL v0.11.0 release From 9b76d886c24a3dc38914418ed98af6a03ef068fe Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Tue, 4 Feb 2020 12:20:51 -0800 Subject: [PATCH 04/35] REL v0.12.0 release From 7f94db59b56acecb90c3383b59ab91a771cda422 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Tue, 31 Mar 2020 11:19:52 -0700 Subject: [PATCH 05/35] REL v0.13.0 release From d059ffccb364fa64ea1e96463ba56a5115c45607 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 3 Jun 2020 10:28:30 -0700 Subject: [PATCH 06/35] REL v0.14.0 release From b234fa5659f6a5ebaa0d4fdc7a1caf5b5d7183e0 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 19 Jun 2020 14:53:40 -0700 Subject: [PATCH 07/35] Only create Security object if TLS files are specified --- dask_cuda/dask_cuda_worker.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dask_cuda/dask_cuda_worker.py b/dask_cuda/dask_cuda_worker.py index 8b303f399..fe0a3a57f 100755 --- a/dask_cuda/dask_cuda_worker.py +++ b/dask_cuda/dask_cuda_worker.py @@ -234,9 +234,12 @@ def main( enable_proctitle_on_current() enable_proctitle_on_children() - sec = Security( - tls_ca_file=tls_ca_file, tls_worker_cert=tls_cert, tls_worker_key=tls_key - ) + if tls_ca_file and tls_cert and tls_worker_key: + sec = Security( + tls_ca_file=tls_ca_file, tls_worker_cert=tls_cert, tls_worker_key=tls_key + ) + else: + sec = None try: nprocs = len(os.environ["CUDA_VISIBLE_DEVICES"].split(",")) @@ -325,7 +328,7 @@ def del_pid_file(): ), preload=(list(preload) or []) + ["dask_cuda.initialize"], preload_argv=(list(preload_argv) or []) + ["--create-cuda-context"], - security=sec, + #security=sec, env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)}, plugins={CPUAffinity(get_cpu_affinity(i)), RMMPool(rmm_pool_size)}, name=name if nprocs == 1 or not name else name + "-" + str(i), From a860a1b1c3934a0eb4b5f3ae413badb29abed301 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 19 Jun 2020 14:59:58 -0700 Subject: [PATCH 08/35] Fix argument tls_key argument name --- dask_cuda/dask_cuda_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_cuda/dask_cuda_worker.py b/dask_cuda/dask_cuda_worker.py index fe0a3a57f..47c393e3f 100755 --- a/dask_cuda/dask_cuda_worker.py +++ b/dask_cuda/dask_cuda_worker.py @@ -234,7 +234,7 @@ def 
main( enable_proctitle_on_current() enable_proctitle_on_children() - if tls_ca_file and tls_cert and tls_worker_key: + if tls_ca_file and tls_cert and tls_key: sec = Security( tls_ca_file=tls_ca_file, tls_worker_cert=tls_cert, tls_worker_key=tls_key ) @@ -328,7 +328,7 @@ def del_pid_file(): ), preload=(list(preload) or []) + ["dask_cuda.initialize"], preload_argv=(list(preload_argv) or []) + ["--create-cuda-context"], - #security=sec, + security=sec, env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)}, plugins={CPUAffinity(get_cpu_affinity(i)), RMMPool(rmm_pool_size)}, name=name if nprocs == 1 or not name else name + "-" + str(i), From 3fc6db464b31240db746bab8387fb7fbf2d6a14c Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Mon, 22 Jun 2020 08:32:58 -0700 Subject: [PATCH 09/35] REL v0.14.1 release From 027595793c4fa3613064a8acf87e9fc302ff5e3b Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 26 Aug 2020 10:59:55 -0700 Subject: [PATCH 10/35] REL v0.15.0 release From d714829c29d9be2581611cf327b28c15abab029e Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 21 Oct 2020 14:14:30 -0700 Subject: [PATCH 11/35] REL v0.16.0 release From 81371d55827f53fe985e415db34e85c32a7c16f6 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Thu, 10 Dec 2020 08:43:02 -0900 Subject: [PATCH 12/35] REL v0.17.0 release From 89b82cf0df229d842792f8493f89e7ca48acfb79 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 24 Feb 2021 09:53:34 -0800 Subject: [PATCH 13/35] REL v0.18.0 release From 1acf55e170797ae41284c56495d293c9feb7f004 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 21 Apr 2021 10:40:39 -0700 Subject: [PATCH 14/35] REL v0.19.0 release From e5bf324562f11fc3a62f30c6e6c739c57371156d Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 9 Jun 2021 09:43:19 -0700 Subject: [PATCH 15/35] REL v21.06.00 release From 1287a1566868b77b2ebeb486037796a6148f8892 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 4 Aug 2021 07:37:25 -0700 Subject: [PATCH 16/35] REL v21.08.00 release From 5311c1a6ac481047b95b4ba3bcda39a1fddeab0a Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 6 Oct 2021 08:35:51 -0700 Subject: [PATCH 17/35] REL v21.10.00 release From e1e49b6f3dce5a0d9a2354350f61a07243be630a Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 8 Dec 2021 17:06:45 +0000 Subject: [PATCH 18/35] REL v21.12.00 release From a666e9bbb183738f7a54c9faa78b50323772434f Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 2 Feb 2022 16:00:30 +0000 Subject: [PATCH 19/35] REL v22.02.00 release --- ci/gpu/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 4041f2a44..4f764cfbb 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -26,7 +26,7 @@ cd "$WORKSPACE" export GIT_DESCRIBE_TAG=`git describe --tags` export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'` export UCX_PATH=$CONDA_PREFIX -export UCXPY_VERSION="0.24.*" +export UCXPY_VERSION=0.24.* # Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x, # will possibly be enabled by default starting on 1.17) From 
451b3b3a5fb5aa2a194d8acbb966ccaaa020fff1 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 6 Apr 2022 14:44:41 +0000 Subject: [PATCH 20/35] REL v22.04.00 release --- ci/gpu/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 4f764cfbb..1894a78c9 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -26,7 +26,7 @@ cd "$WORKSPACE" export GIT_DESCRIBE_TAG=`git describe --tags` export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'` export UCX_PATH=$CONDA_PREFIX -export UCXPY_VERSION=0.24.* +export UCXPY_VERSION=0.25.* # Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x, # will possibly be enabled by default starting on 1.17) From 29929663a4cd9fbd2b3c00eb666e777cc81669b1 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Tue, 7 Jun 2022 15:45:42 +0000 Subject: [PATCH 21/35] REL v22.06.00 release From 9860cad03146349e051ec7974af45379328a4b33 Mon Sep 17 00:00:00 2001 From: Raymond Douglass Date: Wed, 17 Aug 2022 10:21:23 -0400 Subject: [PATCH 22/35] update changelog --- CHANGELOG.md | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e244dbd71..8b0cb14f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,41 @@ -# dask-cuda 22.08.00 (Date TBD) +# dask-cuda 22.08.00 (17 Aug 2022) -Please see https://github.com/rapidsai/dask-cuda/releases/tag/v22.08.00a for the latest changes to this development branch. +## 🚨 Breaking Changes + +- Fix useless property ([#944](https://github.com/rapidsai/dask-cuda/pull/944)) [@wence-](https://github.com/wence-) + +## 🐛 Bug Fixes + +- Fix `distributed` error related to `loop_in_thread` ([#963](https://github.com/rapidsai/dask-cuda/pull/963)) [@galipremsagar](https://github.com/galipremsagar) +- Add `__rmatmul__` to `ProxyObject` ([#960](https://github.com/rapidsai/dask-cuda/pull/960)) [@jakirkham](https://github.com/jakirkham) +- Always use versioneer command classes in setup.py ([#948](https://github.com/rapidsai/dask-cuda/pull/948)) [@wence-](https://github.com/wence-) +- Do not dispatch removed `cudf.Frame._index` object ([#947](https://github.com/rapidsai/dask-cuda/pull/947)) [@pentschev](https://github.com/pentschev) +- Fix useless property ([#944](https://github.com/rapidsai/dask-cuda/pull/944)) [@wence-](https://github.com/wence-) +- LocalCUDACluster's memory limit: `None` means no limit ([#943](https://github.com/rapidsai/dask-cuda/pull/943)) [@madsbk](https://github.com/madsbk) +- ProxyManager: support `memory_limit=None` ([#941](https://github.com/rapidsai/dask-cuda/pull/941)) [@madsbk](https://github.com/madsbk) +- Remove deprecated `loop` kwarg to `Nanny` in `CUDAWorker` ([#934](https://github.com/rapidsai/dask-cuda/pull/934)) [@pentschev](https://github.com/pentschev) +- Import `cleanup` fixture in `test_dask_cuda_worker.py` ([#924](https://github.com/rapidsai/dask-cuda/pull/924)) [@pentschev](https://github.com/pentschev) + +## 📖 Documentation + +- Switch docs to use common `js` & `css` code ([#967](https://github.com/rapidsai/dask-cuda/pull/967)) [@galipremsagar](https://github.com/galipremsagar) +- Switch `language` from `None` to `"en"` in docs build ([#939](https://github.com/rapidsai/dask-cuda/pull/939)) [@galipremsagar](https://github.com/galipremsagar) + +## 🚀 New Features + +- Add communications bandwidth to benchmarks 
([#938](https://github.com/rapidsai/dask-cuda/pull/938)) [@pentschev](https://github.com/pentschev) + +## 🛠️ Improvements + +- Pin `dask` & `distributed` for release ([#965](https://github.com/rapidsai/dask-cuda/pull/965)) [@galipremsagar](https://github.com/galipremsagar) +- Test memory_limit=None for CUDAWorker ([#946](https://github.com/rapidsai/dask-cuda/pull/946)) [@wence-](https://github.com/wence-) +- benchmarks: Record total number of workers in dataframe ([#945](https://github.com/rapidsai/dask-cuda/pull/945)) [@wence-](https://github.com/wence-) +- Benchmark refactoring: tidy data and multi-node capability via `--scheduler-file` ([#940](https://github.com/rapidsai/dask-cuda/pull/940)) [@wence-](https://github.com/wence-) +- Add util functions to simplify printing benchmarks results ([#937](https://github.com/rapidsai/dask-cuda/pull/937)) [@pentschev](https://github.com/pentschev) +- Add --multiprocessing-method option to benchmarks ([#933](https://github.com/rapidsai/dask-cuda/pull/933)) [@wence-](https://github.com/wence-) +- Remove click pinning ([#932](https://github.com/rapidsai/dask-cuda/pull/932)) [@charlesbluca](https://github.com/charlesbluca) +- Remove compiler variables ([#929](https://github.com/rapidsai/dask-cuda/pull/929)) [@ajschmidt8](https://github.com/ajschmidt8) +- Unpin `dask` & `distributed` for development ([#927](https://github.com/rapidsai/dask-cuda/pull/927)) [@galipremsagar](https://github.com/galipremsagar) # dask-cuda 22.06.00 (7 Jun 2022) From 9a61ce50b466d1fe6c5799190d10527c8d5d5254 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 17 Aug 2022 14:37:16 +0000 Subject: [PATCH 23/35] REL v22.08.00 release From 62a1ee85ccfa67ef99ef8a6b4a28824148f845b5 Mon Sep 17 00:00:00 2001 From: Raymond Douglass Date: Wed, 12 Oct 2022 10:17:27 -0400 Subject: [PATCH 24/35] update changelog --- CHANGELOG.md | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 725c0f569..8a9d2b7a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,26 @@ -# dask-cuda 22.10.00 (Date TBD) +# dask-cuda 22.10.00 (12 Oct 2022) -Please see https://github.com/rapidsai/dask-cuda/releases/tag/v22.10.00a for the latest changes to this development branch. 
+## 🐛 Bug Fixes + +- Revert "Update rearrange_by_column patch for explicit comms" ([#1001](https://github.com/rapidsai/dask-cuda/pull/1001)) [@rjzamora](https://github.com/rjzamora) +- Address CI failures caused by upstream distributed and cupy changes ([#993](https://github.com/rapidsai/dask-cuda/pull/993)) [@rjzamora](https://github.com/rjzamora) +- DeviceSerialized.__reduce_ex__: convert frame to numpy arrays ([#977](https://github.com/rapidsai/dask-cuda/pull/977)) [@madsbk](https://github.com/madsbk) + +## 📖 Documentation + +- Remove line-break that's breaking link ([#982](https://github.com/rapidsai/dask-cuda/pull/982)) [@ntabris](https://github.com/ntabris) +- Dask-cuda best practices ([#976](https://github.com/rapidsai/dask-cuda/pull/976)) [@quasiben](https://github.com/quasiben) + +## 🚀 New Features + +- Add Groupby benchmark ([#979](https://github.com/rapidsai/dask-cuda/pull/979)) [@rjzamora](https://github.com/rjzamora) + +## 🛠️ Improvements + +- Pin `dask` and `distributed` for release ([#1003](https://github.com/rapidsai/dask-cuda/pull/1003)) [@galipremsagar](https://github.com/galipremsagar) +- Update rearrange_by_column patch for explicit comms ([#992](https://github.com/rapidsai/dask-cuda/pull/992)) [@rjzamora](https://github.com/rjzamora) +- benchmarks: Add option to suppress output of point to point data ([#985](https://github.com/rapidsai/dask-cuda/pull/985)) [@wence-](https://github.com/wence-) +- Unpin `dask` and `distributed` for development ([#971](https://github.com/rapidsai/dask-cuda/pull/971)) [@galipremsagar](https://github.com/galipremsagar) # dask-cuda 22.08.00 (Date TBD) From 382e519751fbe6fe22ce15aaeee45041744c39b1 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Wed, 12 Oct 2022 14:53:10 +0000 Subject: [PATCH 25/35] REL v22.10.00 release From dc4758effb7f34e56ce0b55074244dcbec7772f6 Mon Sep 17 00:00:00 2001 From: gpuCI <38199262+GPUtester@users.noreply.github.com> Date: Thu, 8 Dec 2022 15:14:50 +0000 Subject: [PATCH 26/35] REL v22.12.00 release From 748bccdf3b4c9772eb3f7c94344a3278ff3a0f82 Mon Sep 17 00:00:00 2001 From: Raymond Douglass Date: Thu, 9 Feb 2023 10:41:48 -0500 Subject: [PATCH 27/35] REL v23.02.00 release From 2c50668e8d06d829e8e543b66b9daec70bf58e3b Mon Sep 17 00:00:00 2001 From: Raymond Douglass Date: Wed, 22 Feb 2023 13:08:34 -0500 Subject: [PATCH 28/35] REL v23.02.01 release From d4d6a0204c2bd89730dc56398ad661b01b42623b Mon Sep 17 00:00:00 2001 From: Raymond Douglass Date: Wed, 12 Apr 2023 09:25:01 -0400 Subject: [PATCH 29/35] REL v23.04.00 release From fd3ab2d07a6233a7fb7aefd6e2bec916f72ad576 Mon Sep 17 00:00:00 2001 From: Ray Douglass Date: Wed, 7 Jun 2023 10:53:12 -0400 Subject: [PATCH 30/35] REL v23.06.00 release From efbd6cac9942caea4e74f56c0a0154891ef8f000 Mon Sep 17 00:00:00 2001 From: Ray Douglass Date: Wed, 9 Aug 2023 12:39:31 -0400 Subject: [PATCH 31/35] REL v23.08.00 release From 5ca317cf8629d8ad1f051a9053df2d90c4118f50 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 28 Aug 2023 12:12:26 -0700 Subject: [PATCH 32/35] add shuffle_to_parquet --- dask_cuda/explicit_comms/dataframe/shuffle.py | 138 +++++++++++++++++- 1 file changed, 137 insertions(+), 1 deletion(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 0ca1c48ee..e23ba4c67 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -1,8 +1,10 @@ from __future__ import annotations import asyncio +import contextlib 
import functools import inspect +import threading from collections import defaultdict from math import ceil from operator import getitem @@ -16,7 +18,7 @@ from dask.base import tokenize from dask.dataframe.core import DataFrame, Series, _concat as dd_concat, new_dd_object from dask.dataframe.shuffle import group_split_dispatch, hash_object_dispatch -from distributed import wait +from distributed import get_worker, wait from distributed.protocol import nested_deserialize, to_serialize from distributed.worker import Worker @@ -28,6 +30,38 @@ Proxify = Callable[[T], T] +_WORKER_CACHE = {} +_WORKER_CACHE_LOCK = threading.RLock() + + +@contextlib.contextmanager +def get_worker_cache(name): + with _WORKER_CACHE_LOCK: + yield _get_worker_cache(name) + + +def _get_worker_cache(name): + """Utility to get the `name` element of the cache + dictionary for the current worker. If executed + by anything other than a distributed Dask worker, + we will use the global `_WORKER_CACHE` variable. + """ + try: + worker = get_worker() + except ValueError: + # There is no dask.distributed worker. + # Assume client/worker are same process + global _WORKER_CACHE # pylint: disable=global-variable-not-assigned + if name not in _WORKER_CACHE: + _WORKER_CACHE[name] = {} + return _WORKER_CACHE[name] + if not hasattr(worker, "worker_cache"): + worker.worker_cache = {} + if name not in worker.worker_cache: + worker.worker_cache[name] = {} + return worker.worker_cache[name] + + def get_proxify(worker: Worker) -> Proxify: """Get function to proxify objects""" from dask_cuda.proxify_host_file import ProxifyHostFile @@ -328,6 +362,8 @@ async def shuffle_task( ignore_index: bool, num_rounds: int, batchsize: int, + parquet_dir: str | None, + final_task: bool, ) -> List[DataFrame]: """Explicit-comms shuffle task @@ -386,6 +422,34 @@ async def shuffle_task( out_part_id_to_dataframe_list, ) + if parquet_dir: + import cudf + + out_part_ids = list(out_part_id_to_dataframe_list.keys()) + for out_part_id in out_part_ids: + writers = _get_worker_cache("writers") + try: + writer = writers[out_part_id] + except KeyError: + fn = f"{parquet_dir}/part.{out_part_id}.parquet" + writer = cudf.io.parquet.ParquetWriter(fn, index=False) + writers[out_part_id] = writer + + dfs = out_part_id_to_dataframe_list.pop(out_part_id) + for df in dfs: + writer.write_table(df) + del dfs + await asyncio.sleep(0) + + if parquet_dir: + out_part_ids = list(writers.keys()) + if final_task: + for out_part_id in out_part_ids: + writers.pop(out_part_id).close() + await asyncio.sleep(0) + del writers + return out_part_ids + # Finally, we concatenate the output dataframes into the final output partitions ret = [] while out_part_id_to_dataframe_list: @@ -519,6 +583,7 @@ def shuffle( ignore_index, num_rounds, batchsize, + True, ) wait(list(shuffle_result.values())) @@ -545,6 +610,77 @@ def shuffle( return ret +def shuffle_to_parquet( + full_df: DataFrame, + column_names: List[str], + parquet_dir: str, + npartitions: Optional[int] = None, + ignore_index: bool = False, + batchsize: int = 4, +) -> DataFrame: + + import dask_cudf + + c = comms.default_comms() + + # The ranks of the output workers + ranks = list(range(len(c.worker_addresses))) + + # By default, we preserve number of partitions + if npartitions is None: + npartitions = full_df.npartitions + + # Find the output partition IDs for each worker + div = npartitions // len(ranks) + rank_to_out_part_ids: Dict[int, Set[int]] = {} # rank -> set of partition id + for i, rank in enumerate(ranks): + 
rank_to_out_part_ids[rank] = set(range(div * i, div * (i + 1))) + for rank, i in zip(ranks, range(div * len(ranks), npartitions)): + rank_to_out_part_ids[rank].add(i) + + parts_per_batch = len(ranks) * batchsize + num_rounds = ceil(full_df.npartitions / parts_per_batch) + for stage in range(num_rounds): + offset = parts_per_batch * stage + df = full_df.partitions[offset : offset + parts_per_batch] + + # Step (a): + df = df.persist() # Make sure optimizations are apply on the existing graph + wait([df]) # Make sure all keys has been materialized on workers + name = ( + "explicit-comms-shuffle-" + f"{tokenize(df, column_names, npartitions, ignore_index)}" + ) + + # Stage all keys of `df` on the workers and cancel them, which makes it possible + # for the shuffle to free memory as the partitions of `df` are consumed. + # See CommsContext.stage_keys() for a description of staging. + rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__()) + max_num_inkeys = max(len(k) for k in rank_to_inkeys.values()) + c.client.cancel(df) + + # Run a shuffle task on each worker + shuffle_result = {} + for rank in ranks: + shuffle_result[rank] = c.submit( + c.worker_addresses[rank], + shuffle_task, + name, + rank_to_inkeys, + rank_to_out_part_ids, + column_names, + npartitions, + ignore_index, + 1, + max_num_inkeys, + parquet_dir, + stage == (num_rounds - 1), + ) + wait(list(shuffle_result.values())) + + return dask_cudf.read_parquet(parquet_dir, blocksize=None) + + def _use_explicit_comms() -> bool: """Is explicit-comms and available?""" if dask.config.get("explicit-comms", False): From 2df6f913645971f1c2233d617a6d6e5318a69465 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 28 Aug 2023 13:07:58 -0700 Subject: [PATCH 33/35] add pre_shuffle callback --- dask_cuda/explicit_comms/dataframe/shuffle.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index e23ba4c67..14ac010ae 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -617,6 +617,7 @@ def shuffle_to_parquet( npartitions: Optional[int] = None, ignore_index: bool = False, batchsize: int = 4, + pre_shuffle: Optional[int] = None, ) -> DataFrame: import dask_cudf @@ -644,7 +645,10 @@ def shuffle_to_parquet( offset = parts_per_batch * stage df = full_df.partitions[offset : offset + parts_per_batch] - # Step (a): + # Execute pre-shuffle function on each batch + if callable(pre_shuffle): + df = pre_shuffle(df) + df = df.persist() # Make sure optimizations are apply on the existing graph wait([df]) # Make sure all keys has been materialized on workers name = ( @@ -678,7 +682,7 @@ def shuffle_to_parquet( ) wait(list(shuffle_result.values())) - return dask_cudf.read_parquet(parquet_dir, blocksize=None) + return dask_cudf.read_parquet(parquet_dir, split_row_groups=False) def _use_explicit_comms() -> bool: From 22697f639d5c40486f9b03d038d54006e8edbda5 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 28 Aug 2023 20:40:45 -0700 Subject: [PATCH 34/35] add more options (experimental) --- dask_cuda/explicit_comms/dataframe/shuffle.py | 64 +++++++++++++------ 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 14ac010ae..f1ae95f7b 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -5,6 +5,7 @@ import functools 
import inspect import threading +import uuid from collections import defaultdict from math import ceil from operator import getitem @@ -407,6 +408,8 @@ async def shuffle_task( assert stage.keys() == rank_to_inkeys[myrank] no_comm_postprocess = get_no_comm_postprocess(stage, num_rounds, batchsize, proxify) + fns = [] + append_files = True # Whether to keep files open between batches out_part_id_to_dataframe_list: Dict[int, List[DataFrame]] = defaultdict(list) for _ in range(num_rounds): partitions = create_partitions( @@ -427,28 +430,44 @@ async def shuffle_task( out_part_ids = list(out_part_id_to_dataframe_list.keys()) for out_part_id in out_part_ids: - writers = _get_worker_cache("writers") - try: - writer = writers[out_part_id] - except KeyError: - fn = f"{parquet_dir}/part.{out_part_id}.parquet" - writer = cudf.io.parquet.ParquetWriter(fn, index=False) - writers[out_part_id] = writer - - dfs = out_part_id_to_dataframe_list.pop(out_part_id) - for df in dfs: - writer.write_table(df) - del dfs - await asyncio.sleep(0) + if append_files: + writers = _get_worker_cache("writers") + try: + writer = writers[out_part_id] + except KeyError: + fn = f"{parquet_dir}/part.{out_part_id}.parquet" + fns.append(fn) + writer = cudf.io.parquet.ParquetWriter(fn, index=False) + writers[out_part_id] = writer + dfs = out_part_id_to_dataframe_list.pop(out_part_id) + dfs = [df for df in dfs if len(dfs) > 0] + for df in dfs: + writer.write_table(df) + del dfs + else: + dfs = out_part_id_to_dataframe_list.pop(out_part_id) + id = str(uuid.uuid4())[:8] + fn = f"{parquet_dir}/part.{out_part_id}.{id}.parquet" + fns.append(fn) + dfs = [df for df in dfs if len(dfs) > 0] + if len(dfs) > 1: + with cudf.io.parquet.ParquetWriter(fn, index=False) as writer: + for df in dfs: + writer.write_table(df) + elif dfs: + dfs[0].to_parquet(fn, index=False) + del dfs + await asyncio.sleep(0) if parquet_dir: - out_part_ids = list(writers.keys()) - if final_task: - for out_part_id in out_part_ids: - writers.pop(out_part_id).close() - await asyncio.sleep(0) - del writers - return out_part_ids + if append_files: + if final_task: + for out_part_id in list(writers.keys()): + writers.pop(out_part_id).close() + await asyncio.sleep(0) + del writers + return fns + return fns # Finally, we concatenate the output dataframes into the final output partitions ret = [] @@ -620,10 +639,15 @@ def shuffle_to_parquet( pre_shuffle: Optional[int] = None, ) -> DataFrame: + import os + import dask_cudf c = comms.default_comms() + if not os.path.isdir(parquet_dir): + os.mkdir(parquet_dir) + # The ranks of the output workers ranks = list(range(len(c.worker_addresses))) From f67966f702b99ba60eb392b19a1d36931f008a9d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 12 Dec 2023 16:21:21 -0800 Subject: [PATCH 35/35] basic to_worker_storage and from_worker_storage API --- dask_cuda/explicit_comms/dataframe/shuffle.py | 28 ++-- dask_cuda/explicit_comms/dataframe/utils.py | 140 ++++++++++++++++++ 2 files changed, 158 insertions(+), 10 deletions(-) create mode 100644 dask_cuda/explicit_comms/dataframe/utils.py diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 8d8036452..fb61c4a7d 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -25,6 +25,11 @@ from .. 
import comms +try: + from tqdm import tqdm +except ImportError: + tqdm = lambda x: x + T = TypeVar("T") @@ -638,17 +643,20 @@ def shuffle_to_parquet( npartitions: Optional[int] = None, ignore_index: bool = False, batchsize: int = 2, - pre_shuffle: Optional = None, -) -> DataFrame: - - import os - - import dask_cudf + pre_shuffle: Optional[int] = None, + overwrite: bool = False, +) -> None: + from dask_cuda.explicit_comms.dataframe.utils import ( + _clean_worker_storage, + _prepare_dir, + ) c = comms.default_comms() - if not os.path.isdir(parquet_dir): - os.mkdir(parquet_dir) + # Assume we are writing to local worker storage + if overwrite: + wait(c.client.run(_clean_worker_storage, parquet_dir)) + wait(c.client.run(_prepare_dir, parquet_dir)) # The ranks of the output workers ranks = list(range(len(c.worker_addresses))) @@ -667,7 +675,7 @@ def shuffle_to_parquet( parts_per_batch = len(ranks) * batchsize num_rounds = ceil(full_df.npartitions / parts_per_batch) - for stage in range(num_rounds): + for stage in tqdm(range(num_rounds)): offset = parts_per_batch * stage df = full_df.partitions[offset : offset + parts_per_batch] @@ -708,7 +716,7 @@ def shuffle_to_parquet( ) wait(list(shuffle_result.values())) - return dask_cudf.read_parquet(parquet_dir, split_row_groups=False) + return def _use_explicit_comms() -> bool: diff --git a/dask_cuda/explicit_comms/dataframe/utils.py b/dask_cuda/explicit_comms/dataframe/utils.py new file mode 100644 index 000000000..9d1783b3e --- /dev/null +++ b/dask_cuda/explicit_comms/dataframe/utils.py @@ -0,0 +1,140 @@ +import glob +import os +import pickle +from collections import defaultdict + +from dask.blockwise import BlockIndex +from distributed import wait +from distributed.protocol import dask_deserialize, dask_serialize + +from dask_cuda.explicit_comms import comms + + +class LazyLoad: + def __init__(self, path, index, **kwargs): + self.path = path + self.index = index + self.kwargs = kwargs + + def pre_serialize(self): + """Make the unloaded partition serializable""" + return self.load() + + def load(self): + """Load the partition into memory""" + import cudf + + fn = glob.glob(f"{self.path}/*.{self.index}.parquet") + return cudf.read_parquet(fn, **self.kwargs) + + +@dask_serialize.register(LazyLoad) +def _serialize_unloaded(obj): + return None, [pickle.dumps(obj.pre_serialize())] + + +@dask_deserialize.register(LazyLoad) +def _deserialize_unloaded(header, frames): + return pickle.loads(frames[0]) + + +def _prepare_dir(dirpath: str): + os.makedirs(dirpath, exist_ok=True) + + +def _clean_worker_storage(dirpath: str): + import shutil + + if os.path.isdir(dirpath): + shutil.rmtree(dirpath) + + +def _write_partition(part, dirpath, index, token=None): + if token is None: + fn = f"{dirpath}/part.{index[0]}.parquet" + else: + fn = f"{dirpath}/part.{token}.{index[0]}.parquet" + part.to_parquet(fn) + return index + + +def _get_partition(dirpath, index): + return LazyLoad(dirpath, index) + + +def _get_metadata(dirpath, index): + import glob + + import pyarrow.parquet as pq + + import cudf + + fn = glob.glob(f"{dirpath}/*.{index}.parquet")[0] + return cudf.DataFrame.from_arrow( + pq.ParquetFile(fn).schema.to_arrow_schema().empty_table() + ) + + +def _load_partition(data): + if isinstance(data, LazyLoad): + data = data.load() + return data + + +def to_worker_storage(df, dirpath, shuffle_on=None, overwrite=False, **kwargs): + + if shuffle_on: + from dask_cuda.explicit_comms.dataframe.shuffle import shuffle_to_parquet + + if not isinstance(shuffle_on, list): + shuffle_on 
= [shuffle_on] + return shuffle_to_parquet( + df, shuffle_on, dirpath, overwrite=overwrite, **kwargs + ) + + c = comms.default_comms() + if overwrite: + wait(c.client.run(_clean_worker_storage, dirpath)) + wait(c.client.run(_prepare_dir, dirpath)) + df.map_partitions( + _write_partition, + dirpath, + BlockIndex((df.npartitions,)), + **kwargs, + ).compute() + + +def from_worker_storage(dirpath): + import dask_cudf + + c = comms.default_comms() + + def get_indices(path): + return {int(fn.split(".")[-2]) for fn in glob.glob(path + "/*.parquet")} + + worker_indices = c.client.run(get_indices, dirpath) + + summary = defaultdict(list) + for worker, indices in worker_indices.items(): + for index in indices: + summary[index].append(worker) + + assignments = {} + futures = [] + meta = None + for i, (worker, indices) in enumerate(summary.items()): + assignments[worker] = indices[i % len(indices)] + futures.append( + c.client.submit(_get_partition, dirpath, i, workers=[assignments[i]]) + ) + if meta is None: + meta = c.client.submit(_get_metadata, dirpath, i, workers=[assignments[i]]) + wait(meta) + meta = meta.result() + wait(futures) + + return dask_cudf.from_delayed(futures, meta=meta, verify_meta=False).map_partitions( + _load_partition, + meta=meta, + enforce_metadata=False, + )
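
Patches 07 and 08 amount to one pattern: only construct a `Security` object when all three TLS files are given, and fall back to `None` otherwise so non-TLS clusters keep working. A minimal standalone sketch of that pattern (the helper name `make_security` is illustrative, not part of the patches):

```python
from typing import Optional

from distributed.security import Security


def make_security(
    tls_ca_file: Optional[str],
    tls_cert: Optional[str],
    tls_key: Optional[str],
) -> Optional[Security]:
    # Build a Security object only when every TLS file is provided;
    # otherwise return None so the worker starts without TLS.
    if tls_ca_file and tls_cert and tls_key:
        return Security(
            tls_ca_file=tls_ca_file,
            tls_worker_cert=tls_cert,
            tls_worker_key=tls_key,
        )
    return None
```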
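
Patch 32's `_get_worker_cache` relies on two facts: `distributed.get_worker()` raises `ValueError` when called outside a worker, and arbitrary attributes can be hung off the `Worker` object. A simplified sketch of that per-worker cache pattern (the function name `get_named_cache` and the fallback dict are illustrative; the `worker_cache` attribute name mirrors the patch):

```python
import threading

from distributed import get_worker

_LOCAL_CACHE: dict = {}  # fallback when no distributed worker exists
_LOCAL_CACHE_LOCK = threading.RLock()


def get_named_cache(name: str) -> dict:
    """Return the `name` sub-dictionary of the current worker's cache."""
    try:
        worker = get_worker()
    except ValueError:
        # No dask.distributed worker: assume client and "worker" share a
        # process and use a module-global cache instead.
        with _LOCAL_CACHE_LOCK:
            return _LOCAL_CACHE.setdefault(name, {})
    if not hasattr(worker, "worker_cache"):
        worker.worker_cache = {}
    return worker.worker_cache.setdefault(name, {})
```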
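
The parquet output path added in patches 32 and 34 leans on `cudf.io.parquet.ParquetWriter`, which appends tables incrementally to a single file across multiple `write_table()` calls and only finalizes the footer on `close()`. A minimal sketch of that append pattern on its own (the file name and dataframes are made up):

```python
import cudf
from cudf.io.parquet import ParquetWriter

# Two chunks destined for the same output partition file.
chunks = [
    cudf.DataFrame({"key": [0, 1], "val": [10.0, 11.0]}),
    cudf.DataFrame({"key": [2, 3], "val": [12.0, 13.0]}),
]

writer = ParquetWriter("part.0.parquet", index=False)
for chunk in chunks:
    writer.write_table(chunk)  # append this chunk to the open file
writer.close()  # write the parquet footer

# The file now holds both chunks back to back.
print(cudf.read_parquet("part.0.parquet"))
```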
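
The `pre_shuffle` hook from patch 33 is a callable applied to each lazy batch (a slice of the input's partitions) before that batch is persisted and shuffled; the code guards it with `callable(pre_shuffle)` even though the diff annotates it as `Optional[int]`. A hedged example of such a callback (`drop_null_keys` and the `"key"` column are placeholders):

```python
def drop_null_keys(batch):
    # `batch` is a lazy dask_cudf DataFrame covering one batch of input
    # partitions; whatever is returned here is what actually gets shuffled.
    return batch.dropna(subset=["key"])


# Illustrative call into the API added by patches 32/33:
# shuffle_to_parquet(ddf, ["key"], "/tmp/shuffled", pre_shuffle=drop_null_keys)
```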
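
In patch 35, `from_worker_storage` hands out `LazyLoad` placeholders that defer reading a partition until it is loaded or serialized, and custom `dask_serialize`/`dask_deserialize` handlers materialize the data if a placeholder ever has to leave its worker. A simplified sketch of that hook (the class `LazyParquetPart` is illustrative and omits the patch's glob-based file lookup):

```python
import pickle

import cudf
from distributed.protocol import dask_deserialize, dask_serialize


class LazyParquetPart:
    """Placeholder that reads a parquet file only when asked to."""

    def __init__(self, path: str):
        self.path = path

    def load(self) -> "cudf.DataFrame":
        return cudf.read_parquet(self.path)


@dask_serialize.register(LazyParquetPart)
def _serialize_lazy_part(obj):
    # Materialize on the sending side so the receiver gets real data.
    return {}, [pickle.dumps(obj.load())]


@dask_deserialize.register(LazyParquetPart)
def _deserialize_lazy_part(header, frames):
    return pickle.loads(frames[0])
```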
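
Putting patch 35 together, `to_worker_storage` shuffles a dataframe straight into per-worker parquet files and `from_worker_storage` rebuilds a `dask_cudf` collection from them. A hedged end-to-end sketch, assuming a branch with this series applied, a running `LocalCUDACluster`, and placeholder paths and column names:

```python
import dask_cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from dask_cuda.explicit_comms.dataframe.utils import (
    from_worker_storage,
    to_worker_storage,
)

if __name__ == "__main__":
    cluster = LocalCUDACluster()
    client = Client(cluster)

    ddf = dask_cudf.read_parquet("input/*.parquet")

    # Shuffle on "key" with explicit comms, writing the shuffled output
    # partitions to local storage on each worker instead of keeping them
    # in GPU memory.
    to_worker_storage(ddf, "/tmp/shuffled", shuffle_on=["key"], overwrite=True)

    # Later, lazily reassemble the shuffled partitions, each one pinned to
    # the worker that wrote it.
    shuffled = from_worker_storage("/tmp/shuffled")
    print(shuffled.npartitions)
```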