Skip to content

Commit

Permalink
add import
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisvandenbossche committed Feb 7, 2024
1 parent 4dfd0d6 commit 3b28616
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 28 deletions.
21 changes: 20 additions & 1 deletion cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,8 @@ struct ArrayExporter {
export_.buffers_.resize(n_buffers);
std::transform(buffers_begin, data->buffers.end(), export_.buffers_.begin(),
[](const std::shared_ptr<Buffer>& buffer) -> const void* {
return buffer ? reinterpret_cast<const void*>(buffer->address()) : nullptr;
return buffer ? reinterpret_cast<const void*>(buffer->address())
: nullptr;
});

if (need_variadic_buffer_sizes) {
Expand Down Expand Up @@ -1977,6 +1978,24 @@ Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
return ImportDeviceArray(array, *maybe_type, mapper);
}

Result<std::shared_ptr<MemoryManager>> DefaultDeviceMapper(ArrowDeviceType device_type,
int64_t device_id) {
if (device_type != ARROW_DEVICE_CPU) {
return Status::NotImplemented("Only importing data on CPU is supported");
}
return default_cpu_memory_manager();
}

Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
std::shared_ptr<DataType> type) {
return ImportDeviceArray(array, type, DefaultDeviceMapper);
}

Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
struct ArrowSchema* type) {
return ImportDeviceArray(array, type, DefaultDeviceMapper);
}

Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
const DeviceMemoryMapper& mapper) {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,13 @@ Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
struct ArrowSchema* type,
const DeviceMemoryMapper& mapper);

ARROW_EXPORT
Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
std::shared_ptr<DataType> type);
ARROW_EXPORT
Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
struct ArrowSchema* type);

/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data
/// interface.
///
Expand Down
91 changes: 64 additions & 27 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1682,33 +1682,6 @@ cdef class Array(_PandasConvertible):
<ArrowArray*> c_ptr,
<ArrowSchema*> c_schema_ptr))

def _export_to_c_device(self, out_ptr, out_schema_ptr):
"""
Export to a C ArrowDeviceArray struct, given its pointer.
If a C ArrowSchema struct pointer is also given, the array type
is exported to it at the same time.
Parameters
----------
out_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
out_schema_ptr: int (optional)
The raw pointer to a C ArrowSchema struct.
Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
array memory will leak. This is a low-level function intended for
expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(out_ptr)
void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
allow_null=True)
with nogil:
check_status(ExportDeviceArray(
deref(self.sp_array), <shared_ptr[CSyncEvent]>NULL,
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))

@staticmethod
def _import_from_c(in_ptr, type):
"""
Expand Down Expand Up @@ -1805,6 +1778,70 @@ cdef class Array(_PandasConvertible):

return pyarrow_wrap_array(array)

def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowDeviceArray struct, given its pointer.
If a C ArrowSchema struct pointer is also given, the array type
is exported to it at the same time.
Parameters
----------
out_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
out_schema_ptr: int (optional)
The raw pointer to a C ArrowSchema struct.
Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
array memory will leak. This is a low-level function intended for
expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(out_ptr)
void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
allow_null=True)
with nogil:
check_status(ExportDeviceArray(
deref(self.sp_array), <shared_ptr[CSyncEvent]>NULL,
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))

@staticmethod
def _import_from_c_device(in_ptr, type):
"""
Import Array from a C ArrowDeviceArray struct, given its pointer
and the imported array type.
Parameters
----------
in_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
type: DataType or int
Either a DataType object, or the raw pointer to a C ArrowSchema
struct.
This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_type_ptr
shared_ptr[CArray] c_array

c_type = pyarrow_unwrap_data_type(type)
if c_type == nullptr:
# Not a DataType object, perhaps a raw ArrowSchema pointer
c_type_ptr = _as_c_pointer(type)
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
<ArrowSchema*> c_type_ptr)
)
else:
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type)
)
return pyarrow_wrap_array(c_array)

def __dlpack__(self, stream=None):
"""Export a primitive array as a DLPack capsule.
Expand Down
10 changes: 10 additions & 0 deletions python/pyarrow/cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@
// Opaque producer-specific data
void* private_data;
};
typedef int32_t ArrowDeviceType;
struct ArrowDeviceArray {
struct ArrowArray array;
int64_t device_id;
ArrowDeviceType device_type;
void* sync_event;
int64_t reserved[3];
};
"""

# TODO use out-of-line mode for faster import and avoid C parsing
Expand Down
4 changes: 4 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2851,6 +2851,10 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:

CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent],
ArrowDeviceArray* out, ArrowSchema*)
CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*,
shared_ptr[CDataType])
CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*,
ArrowSchema*)

cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
Expand Down
51 changes: 51 additions & 0 deletions python/pyarrow/tests/test_cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,54 @@ def test_roundtrip_batch_reader_capsule():
assert imported_reader.read_next_batch().equals(batch)
with pytest.raises(StopIteration):
imported_reader.read_next_batch()


@needs_cffi
def test_export_import_device_array():
c_schema = ffi.new("struct ArrowSchema*")
ptr_schema = int(ffi.cast("uintptr_t", c_schema))
c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))

gc.collect() # Make sure no Arrow data dangles in a ref cycle
old_allocated = pa.total_allocated_bytes()

# Type is known up front
typ = pa.list_(pa.int32())
arr = pa.array([[1], [2, 42]], type=typ)
py_value = arr.to_pylist()
arr._export_to_c_device(ptr_array)
assert pa.total_allocated_bytes() > old_allocated

# verify exported struct
assert c_array.device_type == 1 # ARROW_DEVICE_CPU 1
assert c_array.device_id == -1
assert c_array.array.length == 2

# Delete recreate C++ object from exported pointer
del arr
arr_new = pa.Array._import_from_c_device(ptr_array, typ)
assert arr_new.to_pylist() == py_value
assert arr_new.type == pa.list_(pa.int32())
assert pa.total_allocated_bytes() > old_allocated
del arr_new, typ
assert pa.total_allocated_bytes() == old_allocated
# Now released
with assert_array_released:
pa.Array._import_from_c(ptr_array, pa.list_(pa.int32()))

# Type is exported and imported at the same time
arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))
py_value = arr.to_pylist()
arr._export_to_c(ptr_array, ptr_schema)
# Delete and recreate C++ objects from exported pointers
del arr
arr_new = pa.Array._import_from_c(ptr_array, ptr_schema)
assert arr_new.to_pylist() == py_value
assert arr_new.type == pa.list_(pa.int32())
assert pa.total_allocated_bytes() > old_allocated
del arr_new
assert pa.total_allocated_bytes() == old_allocated
# Now released
with assert_schema_released:
pa.Array._import_from_c(ptr_array, ptr_schema)

0 comments on commit 3b28616

Please sign in to comment.