Skip to content

Commit

Permalink
all of the remote-io-s3 branch
Browse files Browse the repository at this point in the history
this is the only commit to cherry-pick
  • Loading branch information
madsbk committed Oct 4, 2024
1 parent 4ed88a2 commit f82173e
Show file tree
Hide file tree
Showing 11 changed files with 638 additions and 0 deletions.
2 changes: 2 additions & 0 deletions conda/environments/all_cuda-118_arch-aarch64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ channels:
- conda-forge
- nvidia
dependencies:
- boto3>=1.21.21
- c-compiler
- cmake>=3.26.4,!=3.30.0
- cuda-python>=11.7.1,<12.0a0
Expand All @@ -18,6 +19,7 @@ dependencies:
- doxygen=1.9.1
- gcc_linux-aarch64=11.*
- libcurl>=7.87.0
- moto>=4.0.8
- ninja
- numcodecs !=0.12.0
- numpy>=1.23,<3.0a0
Expand Down
2 changes: 2 additions & 0 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ channels:
- conda-forge
- nvidia
dependencies:
- boto3>=1.21.21
- c-compiler
- cmake>=3.26.4,!=3.30.0
- cuda-python>=11.7.1,<12.0a0
Expand All @@ -20,6 +21,7 @@ dependencies:
- libcufile-dev=1.4.0.31
- libcufile=1.4.0.31
- libcurl>=7.87.0
- moto>=4.0.8
- ninja
- numcodecs !=0.12.0
- numpy>=1.23,<3.0a0
Expand Down
2 changes: 2 additions & 0 deletions conda/environments/all_cuda-125_arch-aarch64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ channels:
- conda-forge
- nvidia
dependencies:
- boto3>=1.21.21
- c-compiler
- cmake>=3.26.4,!=3.30.0
- cuda-nvcc
Expand All @@ -19,6 +20,7 @@ dependencies:
- gcc_linux-aarch64=11.*
- libcufile-dev
- libcurl>=7.87.0
- moto>=4.0.8
- ninja
- numcodecs !=0.12.0
- numpy>=1.23,<3.0a0
Expand Down
2 changes: 2 additions & 0 deletions conda/environments/all_cuda-125_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ channels:
- conda-forge
- nvidia
dependencies:
- boto3>=1.21.21
- c-compiler
- cmake>=3.26.4,!=3.30.0
- cuda-nvcc
Expand All @@ -19,6 +20,7 @@ dependencies:
- gcc_linux-64=11.*
- libcufile-dev
- libcurl>=7.87.0
- moto>=4.0.8
- ninja
- numcodecs !=0.12.0
- numpy>=1.23,<3.0a0
Expand Down
136 changes: 136 additions & 0 deletions cpp/include/kvikio/remote_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cstddef>
#include <cstring>
#include <memory>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
Expand Down Expand Up @@ -151,6 +152,141 @@ class HttpEndpoint : public RemoteEndpoint {
~HttpEndpoint() override = default;
};

/**
* @brief
*/
class S3Endpoint : public RemoteEndpoint {
private:
std::string _url;
std::string _aws_sigv4;
std::string _aws_userpwd;

static std::string parse_aws_argument(std::optional<std::string> aws_arg,
const std::string& env_var,
const std::string& err_msg,
bool allow_empty = false)
{
if (aws_arg.has_value()) { return std::move(*aws_arg); }

char const* env = std::getenv(env_var.c_str());
if (env == nullptr) {
if (allow_empty) { return std::string(); }
throw std::invalid_argument(err_msg);
}
return std::string(env);
}

static std::string url_from_bucket_and_object(const std::string& bucket_name,
const std::string& object_name,
const std::optional<std::string>& aws_region,
std::optional<std::string> aws_endpoint_url)
{
std::string endpoint_url =
parse_aws_argument(std::move(aws_endpoint_url),
"AWS_ENDPOINT_URL",
"S3: must provide `aws_endpoint_url` if AWS_ENDPOINT_URL isn't set.",
true);
std::stringstream ss;
if (endpoint_url.empty()) {
std::string region =
parse_aws_argument(std::move(aws_region),
"AWS_DEFAULT_REGION",
"S3: must provide `aws_region` if AWS_DEFAULT_REGION isn't set.");
// We default to the official AWS url scheme.
ss << "https://" << bucket_name << ".s3." << region << ".amazonaws.com/" << object_name;
} else {
ss << endpoint_url << "/" << bucket_name << "/" << object_name;
}
return ss.str();
}

public:
/**
* @brief Given an url like "s3://<bucket>/<object>", return the name of the bucket and object.
*
* @throws std::invalid_argument if url is ill-formed or is missing the bucket or object name.
*
* @param s3_url S3 url.
* @return Pair of strings: [bucket-name, object-name].
*/
static std::pair<std::string, std::string> parse_s3_url(std::string const& s3_url)
{
if (s3_url.empty()) { throw std::invalid_argument("The S3 url cannot be an empty string."); }
if (s3_url.size() < 5 || s3_url.substr(0, 5) != "s3://") {
throw std::invalid_argument("The S3 url must start with the S3 scheme (\"s3://\").");
}
std::string p = s3_url.substr(5);
if (p.empty()) { throw std::invalid_argument("The S3 url cannot be an empty string."); }
size_t pos = p.find_first_of('/');
std::string bucket_name = p.substr(0, pos);
if (bucket_name.empty()) {
throw std::invalid_argument("The S3 url does not contain a bucket name.");
}
std::string object_name = (pos == std::string::npos) ? "" : p.substr(pos + 1);
if (object_name.empty()) {
throw std::invalid_argument("The S3 url does not contain an object name.");
}
return std::make_pair(std::move(bucket_name), std::move(object_name));
}

S3Endpoint(std::string url,
std::optional<std::string> aws_region = std::nullopt,
std::optional<std::string> aws_access_key = std::nullopt,
std::optional<std::string> aws_secret_access_key = std::nullopt)
: _url{std::move(url)}
{
std::string region =
parse_aws_argument(std::move(aws_region),
"AWS_DEFAULT_REGION",
"S3: must provide `aws_region` if AWS_DEFAULT_REGION isn't set.");

std::string access_key =
parse_aws_argument(std::move(aws_access_key),
"AWS_ACCESS_KEY_ID",
"S3: must provide `aws_access_key` if AWS_ACCESS_KEY_ID isn't set.");

std::string secret_access_key = parse_aws_argument(
std::move(aws_secret_access_key),
"AWS_SECRET_ACCESS_KEY",
"S3: must provide `aws_secret_access_key` if AWS_SECRET_ACCESS_KEY isn't set.");

// Create the CURLOPT_AWS_SIGV4 option
{
std::stringstream ss;
ss << "aws:amz:" << region << ":s3";
_aws_sigv4 = ss.str();
}
// Create the CURLOPT_USERPWD option
{
std::stringstream ss;
ss << access_key << ":" << secret_access_key;
_aws_userpwd = ss.str();
}
}
S3Endpoint(const std::string& bucket_name,
const std::string& object_name,
std::optional<std::string> aws_region = std::nullopt,
std::optional<std::string> aws_access_key = std::nullopt,
std::optional<std::string> aws_secret_access_key = std::nullopt,
std::optional<std::string> aws_endpoint_url = std::nullopt)
: S3Endpoint(url_from_bucket_and_object(
bucket_name, object_name, aws_region, std::move(aws_endpoint_url)),
std::move(aws_region),
std::move(aws_access_key),
std::move(aws_secret_access_key))
{
}

void setopt(CurlHandle& curl) override
{
curl.setopt(CURLOPT_URL, _url.c_str());
curl.setopt(CURLOPT_AWS_SIGV4, _aws_sigv4.c_str());
curl.setopt(CURLOPT_USERPWD, _aws_userpwd.c_str());
}
std::string str() override { return _url; }
~S3Endpoint() override = default;
};

/**
* @brief Handle of remote file.
*/
Expand Down
7 changes: 7 additions & 0 deletions dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,13 @@ dependencies:
- pytest
- pytest-cov
- rangehttpserver
- boto3>=1.21.21
- output_types: [requirements, pyproject]
packages:
- moto[server]>=4.0.8
- output_types: conda
packages:
- moto>=4.0.8
specific:
- output_types: [conda, requirements, pyproject]
matrices:
Expand Down
62 changes: 62 additions & 0 deletions python/kvikio/kvikio/_lib/remote_handle.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ cdef extern from "<kvikio/remote_handle.hpp>" nogil:
cdef cppclass cpp_HttpEndpoint "kvikio::HttpEndpoint":
cpp_HttpEndpoint(string url) except +

cdef cppclass cpp_S3Endpoint "kvikio::S3Endpoint":
cpp_S3Endpoint(string url) except +

cdef cppclass cpp_S3Endpoint "kvikio::S3Endpoint":
cpp_S3Endpoint(string bucket_name, string object_name) except +

pair[string, string] cpp_parse_s3_url \
"kvikio::S3Endpoint::parse_s3_url"(string url) except +

cdef cppclass cpp_RemoteHandle "kvikio::RemoteHandle":
cpp_RemoteHandle(
unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes
Expand Down Expand Up @@ -67,6 +76,59 @@ cdef class RemoteFile:
ret._handle = make_unique[cpp_RemoteHandle](move(ep), n)
return ret

@classmethod
def open_s3_from_http_url(
cls,
url: str,
nbytes: Optional[int],
):
cdef RemoteFile ret = RemoteFile()
cdef unique_ptr[cpp_S3Endpoint] ep = make_unique[cpp_S3Endpoint](
_to_string(url)
)
if nbytes is None:
ret._handle = make_unique[cpp_RemoteHandle](move(ep))
return ret
cdef size_t n = nbytes
ret._handle = make_unique[cpp_RemoteHandle](move(ep), n)
return ret

@classmethod
def open_s3(
cls,
bucket_name: str,
object_name: str,
nbytes: Optional[int],
):
cdef RemoteFile ret = RemoteFile()
cdef unique_ptr[cpp_S3Endpoint] ep = make_unique[cpp_S3Endpoint](
_to_string(bucket_name), _to_string(object_name)
)
if nbytes is None:
ret._handle = make_unique[cpp_RemoteHandle](move(ep))
return ret
cdef size_t n = nbytes
ret._handle = make_unique[cpp_RemoteHandle](move(ep), n)
return ret

@classmethod
def open_s3_from_s3_url(
cls,
url: str,
nbytes: Optional[int],
):
cdef pair[string, string] bucket_and_object = cpp_parse_s3_url(_to_string(url))
cdef RemoteFile ret = RemoteFile()
cdef unique_ptr[cpp_S3Endpoint] ep = make_unique[cpp_S3Endpoint](
bucket_and_object.first, bucket_and_object.second
)
if nbytes is None:
ret._handle = make_unique[cpp_RemoteHandle](move(ep))
return ret
cdef size_t n = nbytes
ret._handle = make_unique[cpp_RemoteHandle](move(ep), n)
return ret

def nbytes(self) -> int:
return deref(self._handle).nbytes()

Expand Down
Loading

0 comments on commit f82173e

Please sign in to comment.