diff --git a/README.md b/README.md
index 4df538aa2d..aeb6ddfa46 100644
--- a/README.md
+++ b/README.md
@@ -15,9 +15,89 @@ The C++ library is header-only making it easy to include in [existing projects](
 * A Python [Zarr](https://zarr.readthedocs.io/en/stable/) backend for reading and writing GPU data to file seamlessly.
 * Concurrent reads and writes using an internal thread pool.
 * Non-blocking API.
-* Handle both host and device IO seamlessly.
+* Read/write to both host and device memory seamlessly.
+* Compile-time optional remote read from AWS S3 storage, using the [AWS SDK](https://docs.aws.amazon.com/sdkref/latest/guide/overview.html).
 * Provides Python bindings to [nvCOMP](https://github.com/NVIDIA/nvcomp).
+
 ### Documentation
 * Python: <https://docs.rapids.ai/api/kvikio/nightly/>
 * C++: <https://docs.rapids.ai/api/libkvikio/nightly/>
+
+
+### Examples
+
+#### Python
+```python
+import cupy
+import kvikio
+
+def main(path):
+    a = cupy.arange(100)
+    f = kvikio.CuFile(path, "w")
+    # Write whole array to file
+    f.write(a)
+    f.close()
+
+    b = cupy.empty_like(a)
+    f = kvikio.CuFile(path, "r")
+    # Read whole array from file
+    f.read(b)
+    assert all(a == b)
+
+    # Use context manager
+    c = cupy.empty_like(a)
+    with kvikio.CuFile(path, "r") as f:
+        f.read(c)
+    assert all(a == c)
+
+    # Non-blocking read
+    d = cupy.empty_like(a)
+    with kvikio.CuFile(path, "r") as f:
+        future1 = f.pread(d[:50])
+        future2 = f.pread(d[50:], file_offset=d[:50].nbytes)
+        future1.get()  # Wait for first read
+        future2.get()  # Wait for second read
+    assert all(a == d)
+
+
+if __name__ == "__main__":
+    main("/tmp/kvikio-hello-world-file")
+```
+
+#### C++
+```c++
+#include <cstddef>
+#include <cuda_runtime.h>
+#include <kvikio/file_handle.hpp>
+using namespace std;
+
+int main()
+{
+  // Create two arrays `a` and `b`
+  constexpr std::size_t size = 100;
+  void *a = nullptr;
+  void *b = nullptr;
+  cudaMalloc(&a, size);
+  cudaMalloc(&b, size);
+
+  // Write `a` to file
+  kvikio::FileHandle fw("test-file", "w");
+  size_t written = fw.write(a, size);
+  fw.close();
+
+  // Read file into `b`
+  kvikio::FileHandle fr("test-file", "r");
+  size_t read = fr.read(b, size);
+  fr.close();
+
+  // Read file into `b` in parallel using 16 threads
+  kvikio::default_thread_pool::reset(16);
+  {
+    kvikio::FileHandle f("test-file", "r");
+    future<size_t> future = f.pread(b, size, 0);  // Non-blocking
+    size_t read = future.get();                   // Blocking
+    // Notice, `f` closes automatically on destruction.
+  }
+}
+```
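The README examples above cover local file IO only; the remote-read feature added by this PR follows the same open/read pattern. A minimal sketch of how the new S3 API (introduced later in this diff) is used — the bucket and object names are placeholders, and it assumes a build with `KvikIO_AWSSDK_SUPPORT=ON` plus AWS credentials in the environment:

```python
import cupy
import kvikio

def main():
    # The S3 bindings are compiled in only when KvikIO was built with the AWS SDK.
    if not kvikio.is_remote_file_available():
        raise SystemExit("KvikIO was built without AWS S3 support")

    # S3Context initializes the AWS SDK and creates an S3 client.
    context = kvikio.S3Context()

    # "my-bucket"/"data" are placeholders for an existing bucket and object.
    with kvikio.RemoteFile(context, "my-bucket", "data") as f:
        buf = cupy.empty(f.nbytes(), dtype=cupy.uint8)
        n = f.read(buf)  # reads into device (or host) memory
        assert n == f.nbytes()

if __name__ == "__main__":
    main()
```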
 REPODIR=$(cd $(dirname $0); pwd)

-VALIDARGS="clean libkvikio kvikio -v -g -n --pydevelop -h"
-HELP="$0 [clean] [libkvikio] [kvikio] [-v] [-g] [-n] [--cmake-args=\"\"] [-h]
+VALIDARGS="clean libkvikio kvikio -v -g -n --pydevelop --no-s3 -h"
+HELP="$0 [clean] [libkvikio] [kvikio] [--no-s3] [-v] [-g] [-n] [--pydevelop] [--cmake-args=\"\"] [-h]
    clean           - remove all existing build artifacts and configuration (start over)
    libkvikio       - build and install the libkvikio C++ code
    kvikio          - build and install the kvikio Python package (requires libkvikio)
+   --no-s3         - build without AWS S3 support
    -v              - verbose build mode
    -g              - build for debug
    -n              - no install step
-   --pydevelop     - Install Python packages in editable mode
+   --pydevelop     - install Python packages in editable mode
    --cmake-args=\\\"\\\" - pass arbitrary list of CMake configuration options (escape all quotes in argument)
    -h              - print this text
    default action (no args) is to build and install 'libkvikio' and 'kvikio' targets
@@ -36,6 +37,7 @@ KVIKIO_BUILD_DIR="${REPODIR}/python/kvikio/build/"
 BUILD_DIRS="${LIBKVIKIO_BUILD_DIR} ${KVIKIO_BUILD_DIR}"

 # Set defaults for vars modified by flags to this script
+ENABLE_S3_SUPPORT="-DKvikIO_AWSSDK_SUPPORT=ON"
 VERBOSE_FLAG=""
 BUILD_TYPE=Release
 INSTALL_TARGET=install
@@ -86,6 +88,7 @@ function ensureCMakeRan {
         cmake -B "${LIBKVIKIO_BUILD_DIR}" -S . \
               -DCMAKE_INSTALL_PREFIX="${INSTALL_PREFIX}" \
               -DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
+              ${ENABLE_S3_SUPPORT} \
               ${EXTRA_CMAKE_ARGS}
         RAN_CMAKE=1
     fi
@@ -109,6 +112,9 @@ if (( ${NUMARGS} != 0 )); then
 fi

 # Process flags
+if hasArg --no-s3; then
+    ENABLE_S3_SUPPORT="-DKvikIO_AWSSDK_SUPPORT=OFF"
+fi
 if hasArg -v; then
     VERBOSE_FLAG=-v
     export SKBUILD_BUILD_VERBOSE=true
diff --git a/conda/environments/all_cuda-118_arch-aarch64.yaml b/conda/environments/all_cuda-118_arch-aarch64.yaml
index 65ca39fa34..9c3b84b19b 100644
--- a/conda/environments/all_cuda-118_arch-aarch64.yaml
+++ b/conda/environments/all_cuda-118_arch-aarch64.yaml
@@ -6,6 +6,8 @@ channels:
 - conda-forge
 - nvidia
 dependencies:
+- aws-sdk-cpp>=1.11.267
+- boto3>=1.21.21
 - c-compiler
 - cmake>=3.26.4,!=3.30.0
 - cuda-python>=11.7.1,<12.0a0
@@ -17,6 +19,7 @@ dependencies:
 - dask>=2022.05.2
 - doxygen=1.9.1
 - gcc_linux-aarch64=11.*
+- moto>=4.0.8
 - ninja
 - numcodecs !=0.12.0
 - numpy>=1.23,<3.0a0
diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml
index a020690e64..1d168c6ae2 100644
--- a/conda/environments/all_cuda-118_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -6,6 +6,8 @@ channels:
 - conda-forge
 - nvidia
 dependencies:
+- aws-sdk-cpp>=1.11.267
+- boto3>=1.21.21
 - c-compiler
 - cmake>=3.26.4,!=3.30.0
 - cuda-python>=11.7.1,<12.0a0
@@ -19,6 +21,7 @@ dependencies:
 - gcc_linux-64=11.*
 - libcufile-dev=1.4.0.31
 - libcufile=1.4.0.31
+- moto>=4.0.8
 - ninja
 - numcodecs !=0.12.0
 - numpy>=1.23,<3.0a0
diff --git a/conda/environments/all_cuda-125_arch-aarch64.yaml b/conda/environments/all_cuda-125_arch-aarch64.yaml
index 31145241d7..5e1d4483e0 100644
--- a/conda/environments/all_cuda-125_arch-aarch64.yaml
+++ b/conda/environments/all_cuda-125_arch-aarch64.yaml
@@ -6,6 +6,8 @@ channels:
 - conda-forge
 - nvidia
 dependencies:
+- aws-sdk-cpp>=1.11.267
+- boto3>=1.21.21
 - c-compiler
 - cmake>=3.26.4,!=3.30.0
 - cuda-nvcc
@@ -18,6 +20,7 @@ dependencies:
 - doxygen=1.9.1
 - gcc_linux-aarch64=11.*
 - libcufile-dev
+- moto>=4.0.8
 - ninja
 - numcodecs !=0.12.0
 - numpy>=1.23,<3.0a0
diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml
index 4d7d0be7c6..1dc99fdce9 100644
--- a/conda/environments/all_cuda-125_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-125_arch-x86_64.yaml
@@ -6,6 +6,8 @@ channels:
 - conda-forge
 - nvidia
 dependencies:
+- aws-sdk-cpp>=1.11.267
+- boto3>=1.21.21
 - c-compiler
 - cmake>=3.26.4,!=3.30.0
 - cuda-nvcc
@@ -18,6 +20,7 @@ dependencies:
 - doxygen=1.9.1
 - gcc_linux-64=11.*
 - libcufile-dev
+- moto>=4.0.8
 - ninja
 - numcodecs !=0.12.0
 - numpy>=1.23,<3.0a0
diff --git a/conda/recipes/kvikio/meta.yaml b/conda/recipes/kvikio/meta.yaml
index c70e8aebfe..96c0ea4ae6 100644
--- a/conda/recipes/kvikio/meta.yaml
+++ b/conda/recipes/kvikio/meta.yaml
@@ -64,11 +64,13 @@ requirements:
     - rapids-build-backend >=0.3.0,<0.4.0.dev0
     - scikit-build-core >=0.10.0
    - libkvikio ={{ version }}
+    - aws-sdk-cpp>=1.11.267
   run:
     - python
     - numpy >=1.23,<3.0a0
     - cupy >=12.0.0
     - zarr
+    - aws-sdk-cpp>=1.11.267
     # See https://github.com/zarr-developers/numcodecs/pull/475
     - numcodecs !=0.12.0
     - packaging
diff --git a/conda/recipes/libkvikio/meta.yaml b/conda/recipes/libkvikio/meta.yaml
index 186c373f56..1ea46a59e4 100644
--- a/conda/recipes/libkvikio/meta.yaml
+++ b/conda/recipes/libkvikio/meta.yaml
@@ -52,6 +52,7 @@ requirements:
     {% else %}
     - libcufile-dev  # [linux]
     {% endif %}
+    - aws-sdk-cpp>=1.11.267

 outputs:
   - name: libkvikio
@@ -83,6 +84,7 @@ outputs:
         {% else %}
         - libcufile-dev  # [linux]
         {% endif %}
+        - aws-sdk-cpp>=1.11.267
     test:
       commands:
         - test -f $PREFIX/include/kvikio/file_handle.hpp
@@ -106,6 +108,7 @@ outputs:
           - cuda-cudart-dev
          - libcufile-dev  # [linux]
         {% endif %}
+        - aws-sdk-cpp>=1.11.267
     requirements:
       build:
         - cmake {{ cmake_version }}
@@ -118,6 +121,7 @@ outputs:
           - cuda-cudart-dev
           - libcufile-dev  # [linux]
         {% endif %}
+        - aws-sdk-cpp>=1.11.267
       run:
         - {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }}
         {% if cuda_major == "11" %}
@@ -127,6 +131,7 @@ outputs:
           - cuda-cudart
           - libcufile  # [linux]
         {% endif %}
+        - aws-sdk-cpp>=1.11.267

 about:
   home: https://rapids.ai
   license: Apache-2.0
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index f4f3f13109..0afcb888e0 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -37,6 +37,7 @@ rapids_cmake_build_type(Release)

 # build options
 option(KvikIO_BUILD_EXAMPLES "Configure CMake to build examples" ON)
+option(KvikIO_AWSSDK_SUPPORT "Configure CMake to build with AWS S3 support" ON)

 rapids_cmake_support_conda_env(conda_env MODIFY_PREFIX_PATH)

@@ -55,6 +56,27 @@ rapids_find_package(
   INSTALL_EXPORT_SET kvikio-exports
 )

+# If AWSSDK isn't found, the Cython module remote_handle.pyx isn't built and C++ users shouldn't
+# include <kvikio/remote_handle.hpp>
+if(KvikIO_AWSSDK_SUPPORT)
+  include(cmake/thirdparty/get_aws_sdk_cpp.cmake)
+endif()
+
+if(TARGET aws-cpp-sdk-s3)
+  get_property(
+    _lib_type
+    TARGET aws-cpp-sdk-s3
+    PROPERTY TYPE
+  )
+  if(_lib_type STREQUAL "STATIC_LIBRARY")
+    rapids_find_package(
+      ZLIB
+      BUILD_EXPORT_SET kvikio-exports
+      INSTALL_EXPORT_SET kvikio-exports
+    )
+  endif()
+endif()
+
 rapids_find_package(
   cuFile
   BUILD_EXPORT_SET kvikio-exports
@@ -131,6 +153,10 @@ target_include_directories(
 target_link_libraries(
   kvikio INTERFACE Threads::Threads ${CMAKE_DL_LIBS} nvtx3::nvtx3-cpp BS::thread_pool
 )
+if(TARGET aws-cpp-sdk-s3)
+  target_link_libraries(kvikio INTERFACE $<BUILD_INTERFACE:aws-cpp-sdk-s3>)
+  target_compile_definitions(kvikio INTERFACE $<BUILD_INTERFACE:KVIKIO_AWS_SDK_FOUND>)
+endif()
 target_compile_features(kvikio INTERFACE cxx_std_17)

 # optionally build examples
@@ -217,6 +243,14 @@ if(NOT already_set_kvikio)
      target_compile_definitions(kvikio::kvikio INTERFACE KVIKIO_CUFILE_STREAM_API_FOUND)
    endif()
  endif()
+
+  if(KvikIO_AWSSDK_SUPPORT)
+    find_package(AWSSDK COMPONENTS s3 QUIET)
+  endif()
+  if(TARGET aws-cpp-sdk-s3)
+    target_link_libraries(kvikio::kvikio INTERFACE aws-cpp-sdk-s3)
+    target_compile_definitions(kvikio::kvikio INTERFACE KVIKIO_AWS_SDK_FOUND)
+  endif()
 endif()
 ]=]
 )
diff --git a/cpp/cmake/thirdparty/get_aws_sdk_cpp.cmake b/cpp/cmake/thirdparty/get_aws_sdk_cpp.cmake
new file mode 100644
index 0000000000..6d32889e1f
--- /dev/null
+++ b/cpp/cmake/thirdparty/get_aws_sdk_cpp.cmake
@@ -0,0 +1,45 @@
+# =============================================================================
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+# =============================================================================
+
+# This function finds aws-sdk-cpp and sets any additional necessary environment variables.
+function(find_and_configure_aws_sdk_cpp)
+  include(${rapids-cmake-dir}/cpm/find.cmake)
+
+  # Attempt to use find_package() - the patch is only needed if building from source
+  set(CPM_USE_LOCAL_PACKAGES ON)
+
+  rapids_cpm_find(
+    AWSSDK 1.11.267
+    GLOBAL_TARGETS aws-cpp-sdk-s3 COMPONENTS S3
+    BUILD_EXPORT_SET kvikio-exports
+    INSTALL_EXPORT_SET kvikio-exports
+    CPM_ARGS
+    GIT_REPOSITORY https://github.com/aws/aws-sdk-cpp.git
+    GIT_TAG 1.11.393
+    PATCH_COMMAND
+      ${CMAKE_COMMAND}
+      -E
+      env
+      GIT_COMMITTER_NAME=rapids-cmake
+      GIT_COMMITTER_EMAIL=rapids.cmake@rapids.ai
+      git
+      am
+      --no-gpg-sign
+      ${CMAKE_CURRENT_LIST_DIR}/patches/aws-sdk-cpp/0001-Don-t-set-CMP0077-to-OLD.patch
+    OPTIONS "BUILD_ONLY s3" "BUILD_SHARED_LIBS OFF" "ENABLE_TESTING OFF" "ENABLE_UNITY_BUILD ON"
+            "USE_CRT_HTTP_CLIENT ON"
+  )
+endfunction()
+
+find_and_configure_aws_sdk_cpp()
diff --git a/cpp/cmake/thirdparty/patches/aws-sdk-cpp/0001-Don-t-set-CMP0077-to-OLD.patch b/cpp/cmake/thirdparty/patches/aws-sdk-cpp/0001-Don-t-set-CMP0077-to-OLD.patch
new file mode 100644
index 0000000000..b1f4168436
--- /dev/null
+++ b/cpp/cmake/thirdparty/patches/aws-sdk-cpp/0001-Don-t-set-CMP0077-to-OLD.patch
@@ -0,0 +1,26 @@
+From 7b24166a73e422e65b725ffcb0acd20ab493fac0 Mon Sep 17 00:00:00 2001
+From: Kyle Edwards
+Date: Wed, 28 Aug 2024 15:32:07 -0400
+Subject: [PATCH] Don't set CMP0077 to OLD
+
+---
+ CMakeLists.txt | 4 ----
+ 1 file changed, 4 deletions(-)
+
+diff --git a/CMakeLists.txt b/CMakeLists.txt
+index c17ff8a07b1..b30bc81b6df 100644
+--- a/CMakeLists.txt
++++ b/CMakeLists.txt
+@@ -13,10 +13,6 @@ if (LEGACY_BUILD)
+             "update the build flags as mentioned in README.md and set -DLEGACY_BUILD=OFF. "
+             "The legacy support will be removed at 1.12.0 release.")
+
+-    if (POLICY CMP0077)
+-        cmake_policy(SET CMP0077 OLD) # CMP0077: option() honors normal variables. Introduced in 3.13
+-    endif ()
+-
+     get_filename_component(AWS_NATIVE_SDK_ROOT "${CMAKE_CURRENT_SOURCE_DIR}" ABSOLUTE)
+
+     # Cmake invocation variables:
+--
+2.34.1
diff --git a/cpp/doxygen/main_page.md b/cpp/doxygen/main_page.md
index 21a33b1d45..be8a924f48 100644
--- a/cpp/doxygen/main_page.md
+++ b/cpp/doxygen/main_page.md
@@ -108,6 +108,13 @@ KvikIO might have to use intermediate host buffers (one per thread) when copying
 This setting can also be controlled by `defaults::gds_threshold()` and `defaults::gds_threshold_reset()`.
+
+#### Size of the Bounce Buffer (KVIKIO_BOUNCE_BUFFER_SIZE)
+KvikIO might have to use an intermediate host buffer when copying between file and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of this "bounce" buffer. If not set, the default value is 16777216 (16 MiB).
+
+This setting can also be controlled by `defaults::bounce_buffer_size()` and `defaults::bounce_buffer_size_reset()`.
+
 ## Example
diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index c12ddb2e52..284590e943 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -14,6 +14,8 @@
 set(TEST_INSTALL_PATH bin/tests/libkvikio)

+# Example: basic_io
+
 if(CUDAToolkit_FOUND)
   add_executable(BASIC_IO_TEST basic_io.cpp)
   set_target_properties(BASIC_IO_TEST PROPERTIES INSTALL_RPATH "\$ORIGIN/../../lib")
@@ -35,6 +37,8 @@ else()
   message(STATUS "Cannot build the basic_io example when CUDA is not found")
 endif()

+# Example: basic_no_cuda
+
 add_executable(BASIC_NO_CUDA_TEST basic_no_cuda.cpp)
 set_target_properties(BASIC_NO_CUDA_TEST PROPERTIES INSTALL_RPATH "\$ORIGIN/../../lib")
 target_include_directories(BASIC_NO_CUDA_TEST PRIVATE ../include)
diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp
index fbd510d86f..f84e792489 100644
--- a/cpp/include/kvikio/file_handle.hpp
+++ b/cpp/include/kvikio/file_handle.hpp
@@ -286,7 +286,7 @@ class FileHandle {
    *
    * @return The number of bytes
    */
-  [[nodiscard]] inline std::size_t nbytes() const
+  [[nodiscard]] std::size_t nbytes() const
   {
     if (closed()) { return 0; }
     if (_nbytes == 0) { _nbytes = detail::get_file_size(_fd_direct_off); }
diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp
new file mode 100644
index 0000000000..0e0fc09f83
--- /dev/null
+++ b/cpp/include/kvikio/remote_handle.hpp
@@ -0,0 +1,378 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#ifndef KVIKIO_AWS_SDK_FOUND
+#error "cannot include <kvikio/remote_handle.hpp>, configuration did not find AWS SDK"
+#endif
+
+#include <cstddef>
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+#include <aws/core/Aws.h>
+#include <aws/core/auth/AWSCredentialsProviderChain.h>
+#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/GetObjectRequest.h>
+#include <aws/s3/model/HeadObjectRequest.h>
+
+#include <kvikio/defaults.hpp>
+#include <kvikio/error.hpp>
+#include <kvikio/parallel_operation.hpp>
+#include <kvikio/posix_io.hpp>
+#include <kvikio/utils.hpp>
+
+namespace kvikio {
+namespace detail {
+
+/**
+ * Stream implementation of a fixed size buffer.
+ */
+class BufferAsStream : public Aws::IOStream {
+ public:
+  using Base = Aws::IOStream;
+  explicit BufferAsStream(std::streambuf* buf) : Base(buf) {}
+
+  ~BufferAsStream() override = default;
+};
+
+/**
+ * @brief Given a file path like "s3://<bucket>/<object>", return the name of the bucket and object.
+ *
+ * @throws std::invalid_argument if file path is ill-formed or is missing the bucket or object name.
+ *
+ * @param path S3 file path.
+ * @return Pair of strings: [bucket-name, object-name].
+ */
+inline std::pair<std::string, std::string> parse_s3_path(std::string const& path)
+{
+  if (path.empty()) { throw std::invalid_argument("The remote path cannot be an empty string."); }
+  if (path.size() < 5 || path.substr(0, 5) != "s3://") {
+    throw std::invalid_argument("The remote path must start with the S3 scheme (\"s3://\").");
+  }
+  std::string p = path.substr(5);
+  if (p.empty()) { throw std::invalid_argument("The remote path cannot be an empty string."); }
+  size_t pos              = p.find_first_of('/');
+  std::string bucket_name = p.substr(0, pos);
+  if (bucket_name.empty()) {
+    throw std::invalid_argument("The remote path does not contain a bucket name.");
+  }
+  std::string object_name = (pos == std::string::npos) ? "" : p.substr(pos + 1);
+  if (object_name.empty()) {
+    throw std::invalid_argument("The remote path does not contain an object name.");
+  }
+  return std::make_pair(std::move(bucket_name), std::move(object_name));
+}
+
+}  // namespace detail
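For intuition, a rough Python equivalent of `parse_s3_path` above — this is an illustration of the accepted path convention (the same one `RemoteFile.from_url` exposes later in this PR), not part of the bindings:

```python
def parse_s3_path(path: str) -> tuple[str, str]:
    """Split "s3://<bucket>/<object>" into (bucket, object), mirroring the C++ logic."""
    if not path.startswith("s3://"):
        raise ValueError('The remote path must start with the S3 scheme ("s3://").')
    # Everything up to the first "/" is the bucket; the rest (slashes included) is the object.
    bucket, _, obj = path[len("s3://"):].partition("/")
    if not bucket:
        raise ValueError("The remote path does not contain a bucket name.")
    if not obj:
        raise ValueError("The remote path does not contain an object name.")
    return bucket, obj

assert parse_s3_path("s3://my-bucket/my/object") == ("my-bucket", "my/object")
```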
+
+/**
+ * @brief S3 context that initializes and maintains the S3 SDK and client.
+ *
+ * If not given an existing S3 client, S3Context calls `Aws::InitAPI()` and `Aws::ShutdownAPI()`,
+ * which inherit some limitations from the SDK.
+ *  - The SDK for C++ and its dependencies use C++ static objects, and the order of static object
+ *    destruction is not determined by the C++ standard. To avoid memory issues caused by the
+ *    nondeterministic order of static variable destruction, do not wrap `S3Context` in another
+ *    static object.
+ *  - Please construct and destruct `S3Context` from the same thread (use a dedicated thread if
+ *    necessary). This avoids problems in initializing the dependent Common RunTime C libraries.
+ */
+class S3Context {
+ private:
+  // We use a shared pointer since constructing a default `Aws::S3::S3Client` before calling
+  // `Aws::InitAPI` is illegal.
+  std::shared_ptr<Aws::S3::S3Client> _client;
+  // Only call `Aws::ShutdownAPI`, if `Aws::InitAPI` was called on construction.
+  bool const _shutdown_s3_api;
+
+ public:
+  /**
+   * @brief Create a context given an existing S3 client
+   *
+   * The S3 SDK isn't initialized.
+   *
+   * @param client The S3 client
+   */
+  S3Context(std::shared_ptr<Aws::S3::S3Client> client)
+    : _client{std::move(client)}, _shutdown_s3_api{false}
+  {
+    if (!_client) { throw std::invalid_argument("S3Context(): S3 client cannot be null"); }
+  }
+
+  /**
+   * @brief Create a new context with a newly created S3 client.
+   *
+   * The S3 SDK is automatically initialized on construction and shut down on destruction.
+   *
+   * The new S3 client uses the default `Aws::Client::ClientConfiguration`, so please make sure
+   * that AWS credentials have been configured on the system. A common way to do this is to set
+   * the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
+   *
+   * Other relevant options are `AWS_DEFAULT_REGION` and `AWS_ENDPOINT_URL`, see the AWS
+   * documentation on environment variables.
+   *
+   * @throws std::runtime_error If failing to authenticate to the S3 server.
+   *
+   * @param endpoint_override If not empty, the address of the S3 server. This takes precedence
+   * over the AWS system configuration including the `AWS_ENDPOINT_URL` environment variable.
+   */
+  S3Context(std::string const& endpoint_override = "") : _shutdown_s3_api{true}
+  {
+    // NB: `Aws::InitAPI` has to be called before everything in the SDK besides `Aws::SDKOptions`,
+    // even before config structs like `Aws::Client::ClientConfiguration`. However, we are now
+    // allowed to call `Aws::InitAPI` multiple times:
+    Aws::SDKOptions options;
+    Aws::InitAPI(options);
+
+    // Create a client config where `endpoint_override` takes precedence over `AWS_ENDPOINT_URL`
+    Aws::Client::ClientConfiguration config;
+    char const* ep = std::getenv("AWS_ENDPOINT_URL");
+    if (!endpoint_override.empty()) {
+      config.endpointOverride = endpoint_override;
+    } else if (ep != nullptr && !std::string(ep).empty()) {
+      config.endpointOverride = ep;
+    }
+
+    // We check authentication here to trigger an early exception.
+    Aws::Auth::DefaultAWSCredentialsProviderChain provider;
+    if (provider.GetAWSCredentials().IsEmpty()) {
+      throw std::runtime_error("failed authentication to S3 server");
+    }
+    _client = std::make_shared<Aws::S3::S3Client>(config);
+  }
+
+  ~S3Context() noexcept
+  {
+    if (_shutdown_s3_api) {
+      // Since we created the S3 client and we only provide const reference access,
+      // we should be the only reference.
+      if (_client.use_count() != 1) {
+        std::cerr << "~S3Context(): S3 client has multiple owners, cannot shutdown the AWS API"
+                  << std::endl;
+        return;
+      }
+      _client = nullptr;  // Close the client before shutting down the API
+      try {
+        Aws::SDKOptions options;
+        Aws::ShutdownAPI(options);
+      } catch (std::exception const& e) {
+        std::cerr << "~S3Context(): " << e.what() << std::endl;
+      }
+    }
+  }
+
+  /**
+   * @brief Get a reference to the S3 client.
+   *
+   * @return S3 client.
+   */
+  Aws::S3::S3Client const& client() const { return *_client; }
+
+  // No copy and move semantics
+  S3Context(S3Context const&)      = delete;
+  void operator=(S3Context const&) = delete;
+  S3Context(S3Context const&&)      = delete;
+  void operator=(S3Context const&&) = delete;
+
+  /**
+   * @brief Get the size of an S3 file
+   *
+   * @param bucket_name The bucket name.
+   * @param object_name The object name.
+   * @return Size of the file in bytes.
+   */
+  std::size_t get_file_size(std::string const& bucket_name, std::string const& object_name) const
+  {
+    KVIKIO_NVTX_FUNC_RANGE();
+    Aws::S3::Model::HeadObjectRequest req;
+    req.SetBucket(bucket_name.c_str());
+    req.SetKey(object_name.c_str());
+    Aws::S3::Model::HeadObjectOutcome outcome = client().HeadObject(req);
+    if (!outcome.IsSuccess()) {
+      Aws::S3::S3Error const& err = outcome.GetError();
+      throw std::invalid_argument("get_file_size(): " + err.GetExceptionName() + ": " +
+                                  err.GetMessage());
+    }
+    return outcome.GetResult().GetContentLength();
+  }
+};
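From Python, `S3Context` (wrapped later in this PR) follows the same two configuration routes described in the doc comment above. A minimal sketch with placeholder credentials and a placeholder endpoint:

```python
import os
import kvikio

# Route 1: rely on the AWS configuration chain (values here are placeholders).
os.environ["AWS_ACCESS_KEY_ID"] = "my-key-id"
os.environ["AWS_SECRET_ACCESS_KEY"] = "my-secret"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
context = kvikio.S3Context()

# Route 2: point at a specific server, e.g. a local MinIO/moto instance.
# This takes precedence over the AWS_ENDPOINT_URL environment variable.
context = kvikio.S3Context(endpoint_override="http://127.0.0.1:9000")
```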
+
+/**
+ * @brief Handle of remote file (currently, only AWS S3 is supported).
+ */
+class RemoteHandle {
+ private:
+  std::string _bucket_name{};
+  std::string _object_name{};
+  std::size_t _nbytes{};
+  std::shared_ptr<S3Context> _context;
+
+ public:
+  /**
+   * @brief Construct from a bucket and object name pair.
+   *
+   * @param context The S3 context used for the connection to the remote server.
+   * @param bucket_and_object_name Name pair <bucket-name, object-name>.
+   */
+  RemoteHandle(std::shared_ptr<S3Context> context,
+               std::pair<std::string, std::string> bucket_and_object_name)
+  {
+    if (!context) { throw std::invalid_argument("RemoteHandle(): context cannot be null"); }
+    _context     = std::move(context);
+    _bucket_name = std::move(bucket_and_object_name.first);
+    _object_name = std::move(bucket_and_object_name.second);
+    _nbytes      = _context->get_file_size(_bucket_name, _object_name);
+  }
+
+  /**
+   * @brief Construct from a bucket and object name.
+   *
+   * @param context The S3 context used for the connection to the remote server.
+   * @param bucket_name Name of the bucket.
+   * @param object_name Name of the object.
+   */
+  RemoteHandle(std::shared_ptr<S3Context> context, std::string bucket_name, std::string object_name)
+    : RemoteHandle(std::move(context),
+                   std::make_pair(std::move(bucket_name), std::move(object_name)))
+  {
+  }
+
+  /**
+   * @brief Construct from a remote path such as "s3://<bucket>/<object>".
+   *
+   * @param context The S3 context used for the connection to the remote server.
+   * @param remote_path Remote file path.
+   */
+  RemoteHandle(std::shared_ptr<S3Context> context, std::string const& remote_path)
+    : RemoteHandle(std::move(context), detail::parse_s3_path(remote_path))
+  {
+  }
+
+  /**
+   * @brief Get the file size.
+   *
+   * Note, this is very fast, no communication needed.
+   *
+   * @return The number of bytes.
+   */
+  [[nodiscard]] std::size_t nbytes() const { return _nbytes; }
+
+  /**
+   * @brief Read from remote source into host memory.
+   *
+   * @param buf Pointer to host memory.
+   * @param size Number of bytes to read.
+   * @param file_offset File offset in bytes.
+   * @return Number of bytes read.
+   */
+  std::size_t read_to_host(void* buf, std::size_t size, std::size_t file_offset = 0)
+  {
+    KVIKIO_NVTX_FUNC_RANGE("AWS S3 receive", size);
+
+    Aws::S3::Model::GetObjectRequest req;
+    req.SetBucket(_bucket_name.c_str());
+    req.SetKey(_object_name.c_str());
+    std::string const byte_range =
+      "bytes=" + std::to_string(file_offset) + "-" + std::to_string(file_offset + size - 1);
+    req.SetRange(byte_range.c_str());
+
+    // To write directly to `buf`, we register a "factory" that wraps a buffer as an output stream.
+    // Notice, the AWS SDK will handle the freeing of the returned `detail::BufferAsStream`.
+    Aws::Utils::Stream::PreallocatedStreamBuf buf_stream(static_cast<unsigned char*>(buf), size);
+    req.SetResponseStreamFactory(
+      [&]() { return Aws::New<detail::BufferAsStream>("BufferAsStream", &buf_stream); });
+
+    Aws::S3::Model::GetObjectOutcome outcome = _context->client().GetObject(req);
+    if (!outcome.IsSuccess()) {
+      Aws::S3::S3Error const& err = outcome.GetError();
+      throw std::runtime_error(err.GetExceptionName() + ": " + err.GetMessage());
+    }
+    std::size_t const n = outcome.GetResult().GetContentLength();
+    if (n != size) {
+      throw std::runtime_error("S3 read of " + std::to_string(size) + " bytes failed, received " +
+                               std::to_string(n) + " bytes");
+    }
+    return n;
+  }
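`read_to_host` is essentially a ranged GET written straight into the caller's buffer. For intuition, the boto3 equivalent of the same byte-range request (a standalone illustration; bucket/object names and offsets are placeholders):

```python
import boto3

client = boto3.client("s3")  # uses ambient AWS credentials
file_offset, size = 4096, 1024
resp = client.get_object(
    Bucket="my-bucket",
    Key="data",
    # Same header RemoteHandle::read_to_host() builds: an inclusive byte range.
    Range=f"bytes={file_offset}-{file_offset + size - 1}",
)
buf = resp["Body"].read()
assert len(buf) == size
```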
+
+  /**
+   * @brief Read from remote source into buffer (host or device memory).
+   *
+   * @param buf Pointer to host or device memory.
+   * @param size Number of bytes to read.
+   * @param file_offset File offset in bytes.
+   * @return Number of bytes read, which is always `size`.
+   */
+  std::size_t read(void* buf, std::size_t size, std::size_t file_offset = 0)
+  {
+    KVIKIO_NVTX_FUNC_RANGE("RemoteHandle::read()", size);
+    if (is_host_memory(buf)) { return read_to_host(buf, size, file_offset); }
+
+    CUcontext ctx = get_context_from_pointer(buf);
+    PushAndPopContext c(ctx);
+
+    auto alloc         = AllocRetain::instance().get();  // Host memory allocation
+    CUdeviceptr devPtr = convert_void2deviceptr(buf);
+    CUstream stream    = detail::StreamsByThread::get();
+
+    std::size_t cur_file_offset = convert_size2off(file_offset);
+    std::size_t byte_remaining  = convert_size2off(size);
+
+    while (byte_remaining > 0) {
+      std::size_t const nbytes_requested = std::min(alloc.size(), byte_remaining);
+      std::size_t nbytes_got = read_to_host(alloc.get(), nbytes_requested, cur_file_offset);
+      CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream));
+      CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
+      cur_file_offset += nbytes_got;
+      devPtr += nbytes_got;
+      byte_remaining -= nbytes_got;
+    }
+    return size;
+  }
+
+  /**
+   * @brief Read from remote source into buffer (host or device memory) in parallel.
+   *
+   * This API is a parallel async version of `.read()` that partitions the operation
+   * into tasks of size `task_size` for execution in the default thread pool.
+   *
+   * @param buf Pointer to host or device memory.
+   * @param size Number of bytes to read.
+   * @param file_offset File offset in bytes.
+   * @param task_size Size of each task in bytes.
+   * @return Number of bytes read, which is always `size`.
+   */
+  std::future<std::size_t> pread(void* buf,
+                                 std::size_t size,
+                                 std::size_t file_offset = 0,
+                                 std::size_t task_size   = defaults::task_size())
+  {
+    KVIKIO_NVTX_FUNC_RANGE("RemoteHandle::pread()", size);
+    auto task = [this](void* devPtr_base,
+                       std::size_t size,
+                       std::size_t file_offset,
+                       std::size_t devPtr_offset) -> std::size_t {
+      return read(static_cast<char*>(devPtr_base) + devPtr_offset, size, file_offset);
+    };
+    return parallel_io(task, buf, size, file_offset, task_size, 0);
+  }
+};
+
+}  // namespace kvikio
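`pread` hands the byte range to `parallel_io`, which splits it into `task_size` chunks for the thread pool. The Python bindings expose the same knobs through `kvikio.defaults` (the same context managers the tests in this PR use); names below are placeholders:

```python
import numpy
import kvikio
import kvikio.defaults

context = kvikio.S3Context()
with kvikio.defaults.set_num_threads(4), kvikio.defaults.set_task_size(1024 * 1024):
    with kvikio.RemoteFile(context, "my-bucket", "data") as f:
        buf = numpy.empty(f.nbytes(), dtype=numpy.uint8)
        future = f.pread(buf)  # non-blocking; the range runs as ~1 MiB tasks
        nbytes = future.get()  # block until all tasks finish
        assert nbytes == f.nbytes()
```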
diff --git a/dependencies.yaml b/dependencies.yaml
index 123112ac1a..f7b0e6e99e 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -106,6 +106,7 @@ dependencies:
       packages:
         - c-compiler
         - cxx-compiler
+        - aws-sdk-cpp>=1.11.267
     # Need specific:
     - output_types: conda
       matrices:
@@ -319,6 +320,15 @@ dependencies:
         - &dask dask>=2022.05.2
         - pytest
        - pytest-cov
+        - boto3>=1.21.21
+    - output_types: [requirements, pyproject]
+      packages:
+        - moto[server]>=4.0.8
+    - output_types: conda
+      packages:
+        # TODO: Uncomment once https://github.com/rapidsai/cudf/pull/16499 is merged
+        #- cudf==24.10.*,>=0.0.0a0
+        - moto>=4.0.8
   specific:
     - output_types: [conda, requirements, pyproject]
       matrices:
@@ -329,9 +339,19 @@ dependencies:
       - matrix: # All CUDA 11 versions
         packages:
           - cuda-python>=11.7.1,<12.0a0
-  test_python_legate:
-    common:
-      - output_types: [conda, requirements, pyproject]
-        packages:
-          - *dask
-          - distributed>=2022.05.2
+    - output_types: [requirements, pyproject]
+      matrices:
+        - matrix:
+            cuda: "12.*"
+          packages:
+            # TODO: Uncomment once https://github.com/rapidsai/cudf/pull/16499 is merged
+            #- cudf-cu12==24.10.*,>=0.0.0a0
+        - matrix:
+            cuda: "11.*"
+          packages:
+            # TODO: Uncomment once https://github.com/rapidsai/cudf/pull/16499 is merged
+            #- cudf-cu11==24.10.*,>=0.0.0a0
+        - matrix:
+          packages:
+            # TODO: Uncomment once https://github.com/rapidsai/cudf/pull/16499 is merged
+            #- cudf==24.10.*,>=0.0.0a0
diff --git a/docs/source/api.rst b/docs/source/api.rst
index 4d19c09bbb..a06f9f2d58 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -18,6 +18,16 @@ Zarr
 .. autoclass:: GDSStore
    :members:

+RemoteFile
+----------
+.. currentmodule:: kvikio.remote_file
+
+.. autoclass:: S3Context
+   :members:
+
+.. autoclass:: RemoteFile
+   :members:
+
 Defaults
 --------
 .. currentmodule:: kvikio.defaults
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 089a8033f6..603b6736e8 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -82,6 +82,14 @@
 pygments_style = None

+autodoc_default_options = {
+    'members': True,
+    'member-order': 'bysource',
+    'special-members': '__init__',
+    'undoc-members': True,
+    'exclude-members': '__weakref__'
+}
+
 # -- Options for HTML output -------------------------------------------------

 # The theme to use for HTML and HTML Help pages.  See the documentation for
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 4dd491fd96..9e302b5f44 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -23,6 +23,7 @@ Contents
    install
    quickstart
    zarr
+   remote_file
    runtime_settings
    api
    genindex
diff --git a/docs/source/remote_file.rst b/docs/source/remote_file.rst
new file mode 100644
index 0000000000..b1bc9c585b
--- /dev/null
+++ b/docs/source/remote_file.rst
@@ -0,0 +1,7 @@
+Remote File
+===========
+
+KvikIO provides direct access to `AWS S3 storage <https://aws.amazon.com/s3/>`_.
+
+.. literalinclude:: ../../python/kvikio/examples/aws_s3.py
+    :language: python
diff --git a/python/kvikio/examples/aws_s3.py b/python/kvikio/examples/aws_s3.py
new file mode 100644
index 0000000000..0e4292da76
--- /dev/null
+++ b/python/kvikio/examples/aws_s3.py
@@ -0,0 +1,42 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# See file LICENSE for terms.
+
+import os
+
+import boto3
+import cupy
+
+import kvikio
+from kvikio.benchmarks.aws_s3_io import get_local_port, local_s3_server
+
+
+def main():
+    a = cupy.arange(100)
+    b = cupy.empty_like(a)
+
+    # In this example, we launch and use a local S3 server with the
+    # following address:
+    endpoint_url = f"http://127.0.0.1:{get_local_port()}"
+
+    # In order to use a local server instead of an official Amazon S3 server,
+    # we set the AWS_ENDPOINT_URL environment variable.
+    os.environ["AWS_ENDPOINT_URL"] = endpoint_url
+
+    # Start a local S3 server
+    with local_s3_server(lifetime=100):
+        # Create the bucket "my-bucket" and the object "data"
+        client = boto3.client("s3", endpoint_url=endpoint_url)
+        client.create_bucket(Bucket="my-bucket", ACL="public-read-write")
+        client.put_object(Bucket="my-bucket", Key="data", Body=bytes(a))
+
+        # Create an S3 context that connects to AWS_ENDPOINT_URL
+        context = kvikio.S3Context()
+
+        # Using the context, we can open "data" as if it were a regular CuFile
+        with kvikio.RemoteFile(context, "my-bucket", "data") as f:
+            f.read(b)
+            assert all(a == b)
+
+
+if __name__ == "__main__":
+    main()
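The example above opens the file by bucket and object name; the same object can also be opened from an S3 URL via the `RemoteFile.from_url` classmethod added later in this PR:

```python
import cupy
import kvikio

context = kvikio.S3Context()
# Equivalent to kvikio.RemoteFile(context, "my-bucket", "data")
with kvikio.RemoteFile.from_url(context, "s3://my-bucket/data") as f:
    b = cupy.empty(f.nbytes(), dtype=cupy.uint8)
    f.read(b)
```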
diff --git a/python/kvikio/kvikio/__init__.py b/python/kvikio/kvikio/__init__.py
index 883ac9e784..33f6de648a 100644
--- a/python/kvikio/kvikio/__init__.py
+++ b/python/kvikio/kvikio/__init__.py
@@ -4,9 +4,17 @@
 from kvikio._lib import driver_properties  # type: ignore
 from kvikio._version import __git_commit__, __version__
 from kvikio.cufile import CuFile
+from kvikio.remote_file import RemoteFile, S3Context, is_remote_file_available

 # TODO: Wrap nicely, maybe as a dataclass?
 DriverProperties = driver_properties.DriverProperties

-__all__ = ["__git_commit__", "__version__", "CuFile"]
+__all__ = [
+    "__git_commit__",
+    "__version__",
+    "CuFile",
+    "RemoteFile",
+    "S3Context",
+    "is_remote_file_available",
+]
diff --git a/python/kvikio/kvikio/_lib/CMakeLists.txt b/python/kvikio/kvikio/_lib/CMakeLists.txt
index c77d8e3df1..8fdc07d401 100644
--- a/python/kvikio/kvikio/_lib/CMakeLists.txt
+++ b/python/kvikio/kvikio/_lib/CMakeLists.txt
@@ -17,8 +17,16 @@ set(cython_modules arr.pyx buffer.pyx defaults.pyx driver_properties.pyx file_ha
                    libnvcomp.pyx libnvcomp_ll.pyx
 )

+if(TARGET aws-cpp-sdk-core AND TARGET aws-cpp-sdk-s3)
+  message(STATUS "Building remote_handle.pyx (aws-cpp-sdk-s3 found)")
+  list(APPEND cython_modules remote_handle.pyx)
+  set(aws_cpp_sdk_core_dep aws-cpp-sdk-core)
+else()
+  message(WARNING "Skipping remote_handle.pyx (aws-cpp-sdk-s3 not found or disabled)")
+endif()
+
 rapids_cython_create_modules(
   CXX
   SOURCE_FILES "${cython_modules}"
-  LINKED_LIBRARIES kvikio::kvikio nvcomp::nvcomp
+  LINKED_LIBRARIES kvikio::kvikio nvcomp::nvcomp ${aws_cpp_sdk_core_dep}
 )
diff --git a/python/kvikio/kvikio/_lib/remote_handle.pyx b/python/kvikio/kvikio/_lib/remote_handle.pyx
new file mode 100644
index 0000000000..8d9c0a219d
--- /dev/null
+++ b/python/kvikio/kvikio/_lib/remote_handle.pyx
@@ -0,0 +1,99 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# See file LICENSE for terms.
+
+# distutils: language = c++
+# cython: language_level=3
+
+from typing import Optional
+
+from cython.operator cimport dereference as deref
+from libc.stdint cimport uintptr_t
+from libcpp.memory cimport make_shared, make_unique, shared_ptr, unique_ptr
+from libcpp.string cimport string
+from libcpp.utility cimport pair
+
+from kvikio._lib.arr cimport parse_buffer_argument
+from kvikio._lib.future cimport IOFuture, _wrap_io_future, future
+
+
+cdef extern from "<kvikio/remote_handle.hpp>" nogil:
+
+    cdef cppclass cpp_S3Context "kvikio::S3Context":
+        cpp_S3Context() except +
+        cpp_S3Context(string endpoint_override) except +
+
+    cdef cppclass cpp_RemoteHandle "kvikio::RemoteHandle":
+        cpp_RemoteHandle(
+            shared_ptr[cpp_S3Context] context,
+            string bucket_name,
+            string object_name,
+        ) except +
+        cpp_RemoteHandle(
+            shared_ptr[cpp_S3Context] context,
+            string remote_path,
+        ) except +
+        int nbytes()
+        size_t read(
+            void* buf,
+            size_t size,
+            size_t file_offset
+        ) except +
+        future[size_t] pread(
+            void* buf,
+            size_t size,
+            size_t file_offset
+        ) except +
+
+
+cdef class S3Context:
+    cdef shared_ptr[cpp_S3Context] _handle
+
+    def __init__(self, endpoint_override: Optional[str]):
+        if endpoint_override is None:
+            self._handle = make_shared[cpp_S3Context]()
+            return
+        cdef string s = str.encode(str(endpoint_override))
+        self._handle = make_shared[cpp_S3Context](s)
+
+
+cdef class RemoteFile:
+    cdef unique_ptr[cpp_RemoteHandle] _handle
+
+    @classmethod
+    def from_bucket_and_object(
+        cls,
+        S3Context context,
+        bucket_name: str,
+        object_name: str
+    ):
+        cdef RemoteFile ret = RemoteFile()
+        cdef string b = str.encode(str(bucket_name))
+        cdef string o = str.encode(str(object_name))
+        ret._handle = make_unique[cpp_RemoteHandle](context._handle, b, o)
+        return ret
+
+    @classmethod
+    def from_url(cls, S3Context context, url: str):
+        cdef RemoteFile ret = RemoteFile()
+        cdef string u = str.encode(str(url))
+        ret._handle = make_unique[cpp_RemoteHandle](context._handle, u)
+        return ret
+
+    def nbytes(self) -> int:
+        return deref(self._handle).nbytes()
+
+    def read(self, buf, size: Optional[int], file_offset: int) -> int:
+        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
+        return deref(self._handle).read(
+            <void*>info.first,
+            info.second,
+            file_offset,
+        )
+
+    def pread(self, buf, size: Optional[int], file_offset: int) -> IOFuture:
+        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
+        return _wrap_io_future(
+            deref(self._handle).pread(
+                <void*>info.first,
+                info.second,
+                file_offset,
+            )
+        )
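The Cython layer reduces `buf` to a raw pointer and size via `parse_buffer_argument`, so the same `read`/`pread` calls accept both host and device buffers. A small sketch of that duality (object names are placeholders):

```python
import cupy
import numpy
import kvikio

context = kvikio.S3Context()
with kvikio.RemoteFile(context, "my-bucket", "data") as f:
    host = numpy.empty(f.nbytes(), dtype=numpy.uint8)
    dev = cupy.empty(f.nbytes(), dtype=cupy.uint8)
    f.read(host)  # host memory: written directly from the response stream
    f.read(dev)   # device memory: staged through an intermediate host buffer
```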
" + "Alternatively, use the bundled server `--use-bundled-server`\n", + file=sys.stderr, + flush=True, + ) + raise + return client + + +def run_numpy_like(args, xp): + # Upload data to S3 server + data = numpy.arange(args.nelem, dtype=args.dtype) + recv = xp.empty_like(data) + + client = create_client_and_bucket() + client.put_object(Bucket=args.bucket, Key="data1", Body=bytes(data)) + context = kvikio.S3Context() + + def run() -> float: + t0 = time.perf_counter() + with kvikio.RemoteFile( + context=context, bucket_name=args.bucket, object_name="data1" + ) as f: + res = f.read(recv) + t1 = time.perf_counter() + assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}" + xp.testing.assert_array_equal(data, recv) + return t1 - t0 + + for _ in range(args.nruns): + yield run() + + +def run_cudf(args, libcudf_s3_io: bool): + import cudf + + cudf.set_option("libcudf_s3_io", libcudf_s3_io) + + # Upload data to S3 server + create_client_and_bucket() + data = cupy.random.rand(args.nelem).astype(args.dtype) + df = cudf.DataFrame({"a": data}) + df.to_parquet(f"s3://{args.bucket}/data1") + + def run() -> float: + t0 = time.perf_counter() + cudf.read_parquet(f"s3://{args.bucket}/data1") + t1 = time.perf_counter() + return t1 - t0 + + for _ in range(args.nruns): + yield run() + + +API = { + "cupy-kvikio": partial(run_numpy_like, xp=cupy), + "numpy-kvikio": partial(run_numpy_like, xp=numpy), + "cudf-kvikio": partial(run_cudf, libcudf_s3_io=True), + "cudf-fsspec": partial(run_cudf, libcudf_s3_io=False), +} + + +def main(args): + cupy.cuda.set_allocator(None) # Disable CuPy's default memory pool + cupy.arange(10) # Make sure CUDA is initialized + + kvikio.defaults.num_threads_reset(args.nthreads) + print("Roundtrip benchmark") + print("--------------------------------------") + print(f"nelem | {args.nelem} ({format_bytes(args.nbytes)})") + print(f"dtype | {args.dtype}") + print(f"nthreads | {args.nthreads}") + print(f"nruns | {args.nruns}") + print(f"server | {os.getenv('AWS_ENDPOINT_URL', 'http://*.amazonaws.com')}") + if args.use_bundled_server: + print("--------------------------------------") + print("Using the bundled local server is slow") + print("and can be misleading. 
Consider using") + print("a local MinIO or official S3 server.") + print("======================================") + + # Run each benchmark using the requested APIs + for api in args.api: + res = [] + for elapsed in API[api](args): + res.append(elapsed) + + def pprint_api_res(name, samples): + samples = [args.nbytes / s for s in samples] # Convert to throughput + mean = statistics.harmonic_mean(samples) if len(samples) > 1 else samples[0] + ret = f"{api}-{name}".ljust(18) + ret += f"| {format_bytes(mean).rjust(10)}/s".ljust(14) + if len(samples) > 1: + stdev = statistics.stdev(samples) / mean * 100 + ret += " ± %5.2f %%" % stdev + ret += " (" + for sample in samples: + ret += f"{format_bytes(sample)}/s, " + ret = ret[:-2] + ")" # Replace trailing comma + return ret + + print(pprint_api_res("read", res)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Roundtrip benchmark") + parser.add_argument( + "-n", + "--nelem", + metavar="NELEM", + default="1024", + type=int, + help="Number of elements (default: %(default)s).", + ) + parser.add_argument( + "--dtype", + metavar="DATATYPE", + default="float32", + type=numpy.dtype, + help="The data type of each element (default: %(default)s).", + ) + parser.add_argument( + "--nruns", + metavar="RUNS", + default=1, + type=int, + help="Number of runs per API (default: %(default)s).", + ) + parser.add_argument( + "-t", + "--nthreads", + metavar="THREADS", + default=1, + type=int, + help="Number of threads to use (default: %(default)s).", + ) + parser.add_argument( + "--use-bundled-server", + action="store_true", + help="Launch and use a local slow S3 server (ThreadedMotoServer).", + ) + parser.add_argument( + "--bundled-server-lifetime", + metavar="SECONDS", + default=3600, + type=int, + help="Maximum lifetime of the bundled server (default: %(default)s).", + ) + parser.add_argument( + "--bucket", + metavar="NAME", + default="kvikio-s3-benchmark", + type=str, + help="Name of the AWS S3 bucket to use (default: %(default)s).", + ) + parser.add_argument( + "--api", + metavar="API", + default=list(API.keys())[0], # defaults to the first API + nargs="+", + choices=tuple(API.keys()) + ("all",), + help="List of APIs to use {%(choices)s} (default: %(default)s).", + ) + args = parser.parse_args() + args.nbytes = args.nelem * args.dtype.itemsize + if "all" in args.api: + args.api = tuple(API.keys()) + + ctx: ContextManager = contextlib.nullcontext() + if args.use_bundled_server: + os.environ["AWS_ENDPOINT_URL"] = f"http://127.0.0.1:{get_local_port()}" + ctx = local_s3_server(args.bundled_server_lifetime) + with ctx: + main(args) diff --git a/python/kvikio/kvikio/benchmarks/single_node_io.py b/python/kvikio/kvikio/benchmarks/single_node_io.py index bca29ef90d..a9f8d21c68 100644 --- a/python/kvikio/kvikio/benchmarks/single_node_io.py +++ b/python/kvikio/kvikio/benchmarks/single_node_io.py @@ -285,7 +285,7 @@ def main(args): ws.append(args.nbytes / write) def pprint_api_res(name, samples): - mean = statistics.mean(samples) if len(samples) > 1 else samples[0] + mean = statistics.harmonic_mean(samples) if len(samples) > 1 else samples[0] ret = f"{api} {name}".ljust(18) ret += f"| {format_bytes(mean).rjust(10)}/s".ljust(14) if len(samples) > 1: diff --git a/python/kvikio/kvikio/benchmarks/zarr_io.py b/python/kvikio/kvikio/benchmarks/zarr_io.py index fc226c2263..3ebeeea707 100644 --- a/python/kvikio/kvikio/benchmarks/zarr_io.py +++ b/python/kvikio/kvikio/benchmarks/zarr_io.py @@ -150,7 +150,7 @@ def main(args): ws.append(args.nbytes / write) 
     def pprint_api_res(name, samples):
-        mean = statistics.mean(samples) if len(samples) > 1 else samples[0]
+        mean = statistics.harmonic_mean(samples) if len(samples) > 1 else samples[0]
         ret = f"{api} {name}".ljust(18)
         ret += f"| {format_bytes(mean).rjust(10)}/s".ljust(14)
         if len(samples) > 1:
diff --git a/python/kvikio/kvikio/cufile.py b/python/kvikio/kvikio/cufile.py
index ead7bc5f7a..dfe4c0fe05 100644
--- a/python/kvikio/kvikio/cufile.py
+++ b/python/kvikio/kvikio/cufile.py
@@ -140,9 +140,8 @@ def pread(

         Returns
         -------
-        IOFuture
-            Future that on completion returns the size of bytes that were successfully
-            read.
+        Future that on completion returns the size of bytes that were successfully
+        read.

         Notes
         -----
@@ -187,9 +186,8 @@ def pwrite(

         Returns
         -------
-        IOFuture
-            Future that on completion returns the size of bytes that were successfully
-            written.
+        Future that on completion returns the size of bytes that were successfully
+        written.

         Notes
         -----
@@ -307,12 +305,11 @@ def raw_read_async(

         Returns
         -------
-        IOFutureStream
-            Future that when executed ".check_bytes_done()" returns the size of bytes
-            that were successfully read. The instance must be kept alive until
-            all data has been read from disk. One way to do this, is by calling
-            `IOFutureStream.check_bytes_done()`, which will synchronize the associated
-            stream and return the number of bytes read.
+        Future that when executed ".check_bytes_done()" returns the size of bytes
+        that were successfully read. The instance must be kept alive until
+        all data has been read from disk. One way to do this, is by calling
+        `IOFutureStream.check_bytes_done()`, which will synchronize the associated
+        stream and return the number of bytes read.
         """
         return self._handle.read_async(buf, size, file_offset, dev_offset, stream)

@@ -342,12 +339,11 @@ def raw_write_async(

         Returns
         -------
-        IOFutureStream
-            Future that when executed ".check_bytes_done()" returns the size of bytes
-            that were successfully written. The instance must be kept alive until
-            all data has been written to disk. One way to do this, is by calling
-            `IOFutureStream.check_bytes_done()`, which will synchronize the associated
-            stream and return the number of bytes written.
+        Future that when executed ".check_bytes_done()" returns the size of bytes
+        that were successfully written. The instance must be kept alive until
+        all data has been written to disk. One way to do this, is by calling
+        `IOFutureStream.check_bytes_done()`, which will synchronize the associated
+        stream and return the number of bytes written.
         """
         return self._handle.write_async(buf, size, file_offset, dev_offset, stream)
diff --git a/python/kvikio/kvikio/remote_file.py b/python/kvikio/kvikio/remote_file.py
new file mode 100644
index 0000000000..dde7bf9f2b
--- /dev/null
+++ b/python/kvikio/kvikio/remote_file.py
@@ -0,0 +1,151 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# See file LICENSE for terms.
+
+from __future__ import annotations
+
+import functools
+from typing import Optional
+
+from kvikio.cufile import IOFuture
+
+
+@functools.cache
+def is_remote_file_available() -> bool:
+    """Check if the remote module is available"""
+    try:
+        import kvikio._lib.remote_handle  # noqa: F401
+    except ImportError:
+        return False
+    else:
+        return True
+
+
+@functools.cache
+def _get_remote_module():
+    """Get the remote module or raise an error"""
+    if not is_remote_file_available():
+        raise RuntimeError(
+            "RemoteFile not available, please build KvikIO with AWS S3 support"
+        )
+    import kvikio._lib.remote_handle
+
+    return kvikio._lib.remote_handle
+
+
+class S3Context:
+    def __init__(self, endpoint_override: Optional[str] = None):
+        """S3 context, which initializes and maintains the S3 SDK and client.
+
+        The S3Context calls `Aws::InitAPI()` and `Aws::ShutdownAPI`, which inherit
+        some limitations from the SDK: please construct and destruct `S3Context`
+        from the same thread (use a dedicated thread if necessary). This avoids
+        problems in initializing the dependent Common RunTime C libraries.
+
+        Please make sure that AWS credentials have been configured on the system.
+        A common way to do this is to set the environment variables
+        `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
+
+        Other relevant options are `AWS_DEFAULT_REGION` and `AWS_ENDPOINT_URL`, see
+        the AWS documentation on environment variables.
+
+        Parameters
+        ----------
+        endpoint_override
+            If not empty, the address of the S3 server. This takes precedence over
+            the AWS system configuration including the `AWS_ENDPOINT_URL` environment
+            variable.
+        """
+        self._handle = _get_remote_module().S3Context(endpoint_override)
+
+
+class RemoteFile:
+    """File handle of a remote file (currently, only AWS S3 is supported)."""
+
+    def __init__(self, context: S3Context, bucket_name: str, object_name: str):
+        """Open a remote file given a bucket and object name.
+
+        Parameters
+        ----------
+        context
+            The S3 context used for the connection to the remote server.
+        bucket_name
+            Name of the bucket.
+        object_name
+            Name of the object.
+        """
+        self._handle = _get_remote_module().RemoteFile.from_bucket_and_object(
+            context._handle, bucket_name, object_name
+        )
+
+    @classmethod
+    def from_url(cls, context: S3Context, url: str) -> RemoteFile:
+        """Open a remote file given a URL such as "s3://<bucket>/<object>".
+
+        Parameters
+        ----------
+        context
+            The S3 context used for the connection to the remote server.
+        url
+            URL to the remote file.
+
+        Returns
+        -------
+        A newly opened remote file
+        """
+        ret = object.__new__(cls)
+        ret._handle = _get_remote_module().RemoteFile.from_url(context._handle, url)
+        return ret
+
+    def __enter__(self) -> RemoteFile:
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
+        pass
+
+    def nbytes(self) -> int:
+        """Get the file size.
+
+        Note, this is very fast, no communication needed.
+
+        Returns
+        -------
+        The number of bytes.
+        """
+        return self._handle.nbytes()
+
+    def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int:
+        """Read from remote source into buffer (host or device memory) in parallel.
+
+        Parameters
+        ----------
+        buf : buffer-like or array-like
+            Device or host buffer to read into.
+        size
+            Size in bytes to read.
+        file_offset
+            Offset in the file to read from.
+
+        Returns
+        -------
+        The size of bytes that were successfully read.
+        """
+        return self.pread(buf, size, file_offset).get()
+
+    def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture:
+        """Read from remote source into buffer (host or device memory) in parallel.
+
+        Parameters
+        ----------
+        buf : buffer-like or array-like
+            Device or host buffer to read into.
+        size
+            Size in bytes to read.
+        file_offset
+            Offset in the file to read from.
+
+        Returns
+        -------
+        Future that on completion returns the size of bytes that were successfully
+        read.
+        """
+        return IOFuture(self._handle.pread(buf, size, file_offset))
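As the wrapper shows, `read` is simply `pread(...).get()`; keeping the future lets a read overlap with other work. A short sketch of the non-blocking pattern (bucket/object names are placeholders), mirroring the CuFile example in the README:

```python
import numpy
import kvikio

context = kvikio.S3Context()
with kvikio.RemoteFile(context, "my-bucket", "data") as f:
    half = f.nbytes() // 2
    buf = numpy.empty(f.nbytes(), dtype=numpy.uint8)
    first = f.pread(buf[:half])                        # non-blocking
    second = f.pread(buf[half:], file_offset=half)     # non-blocking
    # ... do other work while both halves download ...
    assert first.get() + second.get() == f.nbytes()    # blocking
```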
diff --git a/python/kvikio/pyproject.toml b/python/kvikio/pyproject.toml
index 046e157a11..8a6b7d4b36 100644
--- a/python/kvikio/pyproject.toml
+++ b/python/kvikio/pyproject.toml
@@ -38,8 +38,10 @@ classifiers = [

 [project.optional-dependencies]
 test = [
+    "boto3>=1.21.21",
     "cuda-python>=11.7.1,<12.0a0",
     "dask>=2022.05.2",
+    "moto[server]>=4.0.8",
     "pytest",
     "pytest-cov",
 ]  # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
@@ -138,4 +140,5 @@ regex = "(?P<value>.*)"
 filterwarnings = [
     "error",
     "ignore:Jitify is performing a one-time only warm-up to populate the persistent cache",
+    "ignore::DeprecationWarning:botocore.*",
 ]
diff --git a/python/kvikio/tests/test_aws_s3.py b/python/kvikio/tests/test_aws_s3.py
new file mode 100644
index 0000000000..7e2490cb74
--- /dev/null
+++ b/python/kvikio/tests/test_aws_s3.py
@@ -0,0 +1,147 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# See file LICENSE for terms.
+
+import multiprocessing as mp
+import socket
+import time
+from contextlib import contextmanager
+
+import pytest
+
+import kvikio
+import kvikio.defaults
+
+pytestmark = pytest.mark.skipif(
+    not kvikio.is_remote_file_available(),
+    reason="cannot test remote IO, please build KvikIO with AWS S3 support",
+)
+
+# Notice, we import boto3 and moto after the `is_remote_file_available` check.
+import boto3  # noqa: E402
+import moto  # noqa: E402
+import moto.server  # noqa: E402
+
+
+@pytest.fixture(scope="session")
+def endpoint_ip():
+    return "127.0.0.1"
+
+
+@pytest.fixture(scope="session")
+def endpoint_port():
+    # Return a free port per worker session.
+    sock = socket.socket()
+    sock.bind(("127.0.0.1", 0))
+    port = sock.getsockname()[1]
+    sock.close()
+    return port
+
+
+def start_s3_server(ip_address, port):
+    server = moto.server.ThreadedMotoServer(ip_address=ip_address, port=port)
+    server.start()
+    time.sleep(600)
+    print("ThreadedMotoServer shutting down because of timeout (10min)")
+
+
+@pytest.fixture(scope="session")
+def s3_base(endpoint_ip, endpoint_port):
+    """Fixture to set up moto server in separate process"""
+    with pytest.MonkeyPatch.context() as monkeypatch:
+        # Use fake aws credentials
+        monkeypatch.setenv("AWS_ACCESS_KEY_ID", "foobar_key")
+        monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "foobar_secret")
+        monkeypatch.setenv("AWS_SECURITY_TOKEN", "foobar_security_token")
+        monkeypatch.setenv("AWS_SESSION_TOKEN", "foobar_session_token")
+        monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")
+
+        p = mp.Process(target=start_s3_server, args=(endpoint_ip, endpoint_port))
+        p.start()
+        yield f"http://{endpoint_ip}:{endpoint_port}"
+        p.kill()
+
+
+@contextmanager
+def s3_context(s3_base, bucket, files=None):
+    if files is None:
+        files = {}
+    client = boto3.client("s3", endpoint_url=s3_base)
+    client.create_bucket(Bucket=bucket, ACL="public-read-write")
+    for f, data in files.items():
+        client.put_object(Bucket=bucket, Key=f, Body=data)
+    yield kvikio.S3Context(s3_base)
+    for f, data in files.items():
+        try:
+            client.delete_object(Bucket=bucket, Key=f)
+        except Exception:
+            pass
+
+
+@pytest.mark.parametrize("size", [10, 100, 1000])
+@pytest.mark.parametrize("nthreads", [1, 3])
+@pytest.mark.parametrize("tasksize", [99, 999])
+@pytest.mark.parametrize("buffer_size", [101, 1001])
+def test_read(s3_base, xp, size, nthreads, tasksize, buffer_size):
+    bucket_name = "test_read"
+    object_name = "a1"
+    a = xp.arange(size)
+    with s3_context(
+        s3_base=s3_base, bucket=bucket_name, files={object_name: bytes(a)}
+    ) as ctx:
+        with kvikio.defaults.set_num_threads(nthreads):
+            with kvikio.defaults.set_task_size(tasksize):
+                with kvikio.defaults.set_bounce_buffer_size(buffer_size):
+                    with kvikio.RemoteFile(ctx, bucket_name, object_name) as f:
+                        assert f.nbytes() == a.nbytes
+                        b = xp.empty_like(a)
+                        assert f.read(buf=b) == a.nbytes
+                        xp.testing.assert_array_equal(a, b)
+
+
+@pytest.mark.parametrize(
+    "start,end",
+    [
+        (0, 10 * 4096),
+        (1, int(1.3 * 4096)),
+        (int(2.1 * 4096), int(5.6 * 4096)),
+        (42, int(2**20)),
+    ],
+)
+def test_read_with_file_offset(s3_base, xp, start, end):
+    bucket_name = "test_read_with_file_offset"
+    object_name = "a1"
+    a = xp.arange(end, dtype=xp.int64)
+    with s3_context(
+        s3_base=s3_base, bucket=bucket_name, files={object_name: bytes(a)}
+    ) as ctx:
+        with kvikio.RemoteFile(ctx, bucket_name, object_name) as f:
+            b = xp.zeros(shape=(end - start,), dtype=xp.int64)
+            assert f.read(b, file_offset=start * a.itemsize) == b.nbytes
+            xp.testing.assert_array_equal(a[start:end], b)
+
+        with kvikio.RemoteFile.from_url(ctx, f"s3://{bucket_name}/{object_name}") as f:
+            b = xp.zeros(shape=(end - start,), dtype=xp.int64)
+            assert f.read(b, file_offset=start * a.itemsize) == b.nbytes
+            xp.testing.assert_array_equal(a[start:end], b)
+
+
+def test_remote_path_error(s3_base):
+    bucket_name = "test_remote_path_error"
+    with s3_context(s3_base=s3_base, bucket=bucket_name) as ctx:
+        with pytest.raises(ValueError, match="No response body"):
+            kvikio.RemoteFile.from_url(ctx, "s3://unknown-bucket/unknown-object")
+
+        with pytest.raises(ValueError, match="No response body"):
f"s3://{bucket_name}/unknown-object") + + with pytest.raises(ValueError, match="path must start with the S3 scheme"): + kvikio.RemoteFile.from_url(ctx, f"s3:/{bucket_name}/") + + with pytest.raises(ValueError, match="path does not contain a bucket name"): + kvikio.RemoteFile.from_url(ctx, "s3:///unknown-object") + + with pytest.raises(ValueError, match="path does not contain an object name"): + kvikio.RemoteFile.from_url(ctx, f"s3://{bucket_name}/") + + with pytest.raises(ValueError, match="path does not contain an object name"): + kvikio.RemoteFile.from_url(ctx, f"s3://{bucket_name}") diff --git a/python/kvikio/tests/test_benchmarks.py b/python/kvikio/tests/test_benchmarks.py index 3bdaf6613e..5c597ce253 100644 --- a/python/kvikio/tests/test_benchmarks.py +++ b/python/kvikio/tests/test_benchmarks.py @@ -8,6 +8,8 @@ import pytest +import kvikio + benchmarks_path = ( Path(os.path.realpath(__file__)).parent.parent / "kvikio" / "benchmarks" ) @@ -78,3 +80,60 @@ def test_zarr_io(run_cmd, tmp_path, api): cwd=benchmarks_path, ) assert retcode == 0 + + +def skipif_libcudf_s3_io_option_is_not_available() -> None: + """Call pytest.skip() if cudf or its "libcudf_s3_io" option isn't available + + See + """ + cudf = pytest.importorskip("cudf") + try: + cudf.get_option("libcudf_s3_io") + except KeyError: + pytest.skip( + "cudf doesn't have the 'libcudf_s3_io' option, " + "see " + ) + + +@pytest.mark.parametrize( + "api", + [ + "cupy-kvikio", + "numpy-kvikio", + "cudf-kvikio", + "cudf-fsspec", + ], +) +def test_aws_s3_io(run_cmd, api): + """Test benchmarks/aws_s3_io.py""" + + if not kvikio.is_remote_file_available(): + pytest.skip( + "cannot test remote IO, please build KvikIO with with AWS S3 support" + ) + # Fail early if benchmark dependencies aren't available + import boto3 # noqa: F401 + import moto # noqa: F401 + + if "cudf" in api: + skipif_libcudf_s3_io_option_is_not_available() + + retcode = run_cmd( + cmd=[ + sys.executable or "python", + "aws_s3_io.py", + "--use-bundled-server", + "--bundled-server-lifetime", + "30", + "-n", + "1000", + "-t", + "4", + "--api", + api, + ], + cwd=benchmarks_path, + ) + assert retcode == 0 diff --git a/python/kvikio/tests/test_examples.py b/python/kvikio/tests/test_examples.py index e9e1f83d08..4ec44fbc2c 100644 --- a/python/kvikio/tests/test_examples.py +++ b/python/kvikio/tests/test_examples.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import os @@ -7,6 +7,8 @@ import pytest +import kvikio + examples_path = Path(os.path.realpath(__file__)).parent / ".." / "examples" @@ -26,3 +28,18 @@ def test_zarr_cupy_nvcomp(tmp_path, monkeypatch): monkeypatch.syspath_prepend(str(examples_path)) import_module("zarr_cupy_nvcomp").main(tmp_path / "test-file") + + +@pytest.mark.skipif( + not kvikio.is_remote_file_available(), + reason="KvikIO not built with AWS S3 support", +) +def test_aws_s3(monkeypatch): + """Test examples/aws_s3.py""" + + # Fail early if dependencies isn't available + import boto3 # noqa: F401 + import moto # noqa: F401 + + monkeypatch.syspath_prepend(str(examples_path)) + import_module("aws_s3").main()