From 3b28616fab0b9ff06629e620abda5d9be3c1ba3c Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 7 Feb 2024 17:51:45 +0100 Subject: [PATCH] add import --- cpp/src/arrow/c/bridge.cc | 21 ++++++- cpp/src/arrow/c/bridge.h | 7 +++ python/pyarrow/array.pxi | 91 +++++++++++++++++++--------- python/pyarrow/cffi.py | 10 +++ python/pyarrow/includes/libarrow.pxd | 4 ++ python/pyarrow/tests/test_cffi.py | 51 ++++++++++++++++ 6 files changed, 156 insertions(+), 28 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index a4eddfbecb670..3e9c686ab842f 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -587,7 +587,8 @@ struct ArrayExporter { export_.buffers_.resize(n_buffers); std::transform(buffers_begin, data->buffers.end(), export_.buffers_.begin(), [](const std::shared_ptr& buffer) -> const void* { - return buffer ? reinterpret_cast(buffer->address()) : nullptr; + return buffer ? reinterpret_cast(buffer->address()) + : nullptr; }); if (need_variadic_buffer_sizes) { @@ -1977,6 +1978,24 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, return ImportDeviceArray(array, *maybe_type, mapper); } +Result> DefaultDeviceMapper(ArrowDeviceType device_type, + int64_t device_id) { + if (device_type != ARROW_DEVICE_CPU) { + return Status::NotImplemented("Only importing data on CPU is supported"); + } + return default_cpu_memory_manager(); +} + +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + std::shared_ptr type) { + return ImportDeviceArray(array, type, DefaultDeviceMapper); +} + +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + struct ArrowSchema* type) { + return ImportDeviceArray(array, type, DefaultDeviceMapper); +} + Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, std::shared_ptr schema, const DeviceMemoryMapper& mapper) { diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index e98a42818f628..463ca0500de97 100644 --- a/cpp/src/arrow/c/bridge.h +++ 
b/cpp/src/arrow/c/bridge.h @@ -249,6 +249,13 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, struct ArrowSchema* type, const DeviceMemoryMapper& mapper); +ARROW_EXPORT +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + std::shared_ptr type); +ARROW_EXPORT +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + struct ArrowSchema* type); + /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data /// interface. /// diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index caf5904d98e4a..48d02d9661e9b 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1682,33 +1682,6 @@ cdef class Array(_PandasConvertible): c_ptr, c_schema_ptr)) - def _export_to_c_device(self, out_ptr, out_schema_ptr): - """ - Export to a C ArrowDeviceArray struct, given its pointer. - - If a C ArrowSchema struct pointer is also given, the array type - is exported to it at the same time. - - Parameters - ---------- - out_ptr: int - The raw pointer to a C ArrowDeviceArray struct. - out_schema_ptr: int (optional) - The raw pointer to a C ArrowSchema struct. - - Be careful: if you don't pass the ArrowDeviceArray struct to a consumer, - array memory will leak. This is a low-level function intended for - expert users. - """ - cdef: - void* c_ptr = _as_c_pointer(out_ptr) - void* c_schema_ptr = _as_c_pointer(out_schema_ptr, - allow_null=True) - with nogil: - check_status(ExportDeviceArray( - deref(self.sp_array), NULL, - c_ptr, c_schema_ptr)) - @staticmethod def _import_from_c(in_ptr, type): """ @@ -1805,6 +1778,70 @@ cdef class Array(_PandasConvertible): return pyarrow_wrap_array(array) + def _export_to_c_device(self, out_ptr, out_schema_ptr=0): + """ + Export to a C ArrowDeviceArray struct, given its pointer. + + If a C ArrowSchema struct pointer is also given, the array type + is exported to it at the same time. + + Parameters + ---------- + out_ptr: int + The raw pointer to a C ArrowDeviceArray struct. 
+ out_schema_ptr: int (optional) + The raw pointer to a C ArrowSchema struct. + + Be careful: if you don't pass the ArrowDeviceArray struct to a consumer, + array memory will leak. This is a low-level function intended for + expert users. + """ + cdef: + void* c_ptr = _as_c_pointer(out_ptr) + void* c_schema_ptr = _as_c_pointer(out_schema_ptr, + allow_null=True) + with nogil: + check_status(ExportDeviceArray( + deref(self.sp_array), NULL, + c_ptr, c_schema_ptr)) + + @staticmethod + def _import_from_c_device(in_ptr, type): + """ + Import Array from a C ArrowDeviceArray struct, given its pointer + and the imported array type. + + Parameters + ---------- + in_ptr: int + The raw pointer to a C ArrowDeviceArray struct. + type: DataType or int + Either a DataType object, or the raw pointer to a C ArrowSchema + struct. + + This is a low-level function intended for expert users. + """ + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_type_ptr + shared_ptr[CArray] c_array + + c_type = pyarrow_unwrap_data_type(type) + if c_type == nullptr: + # Not a DataType object, perhaps a raw ArrowSchema pointer + c_type_ptr = _as_c_pointer(type) + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, + c_type_ptr) + ) + else: + with nogil: + c_array = GetResultValue( + ImportDeviceArray( c_ptr, c_type) + ) + return pyarrow_wrap_array(c_array) + def __dlpack__(self, stream=None): """Export a primitive array as a DLPack capsule. 
diff --git a/python/pyarrow/cffi.py b/python/pyarrow/cffi.py index 961b61dee59fd..1da1a91691404 100644 --- a/python/pyarrow/cffi.py +++ b/python/pyarrow/cffi.py @@ -64,6 +64,16 @@ // Opaque producer-specific data void* private_data; }; + + typedef int32_t ArrowDeviceType; + + struct ArrowDeviceArray { + struct ArrowArray array; + int64_t device_id; + ArrowDeviceType device_type; + void* sync_event; + int64_t reserved[3]; + }; """ # TODO use out-of-line mode for faster import and avoid C parsing diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 2affdce0a96f2..3334a2c988f00 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2851,6 +2851,10 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent], ArrowDeviceArray* out, ArrowSchema*) + CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*, + shared_ptr[CDataType]) + CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*, + ArrowSchema*) cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil: CResult[int64_t] ReferencedBufferSize(const CArray& array_data) diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index ff81b06440f03..86651479005c0 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -601,3 +601,54 @@ def test_roundtrip_batch_reader_capsule(): assert imported_reader.read_next_batch().equals(batch) with pytest.raises(StopIteration): imported_reader.read_next_batch() + + +@needs_cffi +def test_export_import_device_array(): + c_schema = ffi.new("struct ArrowSchema*") + ptr_schema = int(ffi.cast("uintptr_t", c_schema)) + c_array = ffi.new("struct ArrowDeviceArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + + gc.collect() # Make sure no Arrow data dangles in a ref cycle + old_allocated = pa.total_allocated_bytes() + + # Type is known up front
+ typ = pa.list_(pa.int32()) + arr = pa.array([[1], [2, 42]], type=typ) + py_value = arr.to_pylist() + arr._export_to_c_device(ptr_array) + assert pa.total_allocated_bytes() > old_allocated + + # verify exported struct + assert c_array.device_type == 1 # ARROW_DEVICE_CPU == 1 + assert c_array.device_id == -1 + assert c_array.array.length == 2 + + # Delete and recreate C++ object from exported pointer + del arr + arr_new = pa.Array._import_from_c_device(ptr_array, typ) + assert arr_new.to_pylist() == py_value + assert arr_new.type == pa.list_(pa.int32()) + assert pa.total_allocated_bytes() > old_allocated + del arr_new, typ + assert pa.total_allocated_bytes() == old_allocated + # Now released + with assert_array_released: + pa.Array._import_from_c(ptr_array, pa.list_(pa.int32())) + + # Type is exported and imported at the same time + arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32())) + py_value = arr.to_pylist() + arr._export_to_c_device(ptr_array, ptr_schema) + # Delete and recreate C++ objects from exported pointers + del arr + arr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema) + assert arr_new.to_pylist() == py_value + assert arr_new.type == pa.list_(pa.int32()) + assert pa.total_allocated_bytes() > old_allocated + del arr_new + assert pa.total_allocated_bytes() == old_allocated + # Now released + with assert_schema_released: + pa.Array._import_from_c(ptr_array, ptr_schema)