Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pylibcudf contiguous split APIs in cudf python #17246

Merged
merged 17 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions python/cudf/cudf/_lib/copying.pxd

This file was deleted.

213 changes: 84 additions & 129 deletions python/cudf/cudf/_lib/copying.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,31 @@

import pickle

from libc.stdint cimport uint8_t, uintptr_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.utility cimport move
from libcpp.vector cimport vector

from rmm.pylibrmm.device_buffer cimport DeviceBuffer

import pylibcudf

import cudf
from cudf.core.buffer import Buffer, acquire_spill_lock, as_buffer

from cudf.core.buffer import acquire_spill_lock, as_buffer
from cudf.core.abc import Serializable
from cudf._lib.column cimport Column

from cudf._lib.scalar import as_device_scalar

from cudf._lib.scalar cimport DeviceScalar
from cudf._lib.utils cimport table_view_from_table

from cudf._lib.reduce import minmax
from cudf.core.abc import Serializable

from libcpp.memory cimport make_unique

cimport pylibcudf.libcudf.contiguous_split as cpp_contiguous_split
from pylibcudf.libcudf.column.column cimport column
from pylibcudf.libcudf.column.column_view cimport column_view
from pylibcudf.libcudf.types cimport size_type

from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_table_view
from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table
import pylibcudf as plc
from pylibcudf.contiguous_split cimport PackedColumns as PlcPackedColumns


def _gather_map_is_valid(
Expand Down Expand Up @@ -331,54 +325,37 @@ def get_element(Column input_column, size_type index):
)


cdef class _CPackedColumns:

@staticmethod
def from_py_table(input_table, keep_index=True):
"""
Construct a ``PackedColumns`` object from a ``cudf.DataFrame``.
"""
import cudf.core.dtypes

cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns)

if keep_index and (
not isinstance(input_table.index, cudf.RangeIndex)
or input_table.index.start != 0
or input_table.index.stop != len(input_table)
or input_table.index.step != 1
):
input_table_view = table_view_from_table(input_table)
p.index_names = input_table._index_names
else:
input_table_view = table_view_from_table(
input_table, ignore_index=True)

p.column_names = input_table._column_names
p.column_dtypes = {}
for name, col in input_table._column_labels_and_values:
if isinstance(col.dtype, cudf.core.dtypes._BaseDtype):
p.column_dtypes[name] = col.dtype

p.c_obj = move(cpp_contiguous_split.pack(input_table_view))
class PackedColumns(Serializable):
"""
A packed representation of a Frame, with all columns residing
in a single GPU memory buffer.
"""

return p
def __init__(
self,
PlcPackedColumns data,
object column_names = None,
object index_names = None,
object column_dtypes = None
):
self._metadata, self._gpu_data = data.release()
self.column_names=column_names
self.index_names=index_names
self.column_dtypes=column_dtypes

@property
def gpu_data_ptr(self):
return int(<uintptr_t>self.c_obj.gpu_data.get()[0].data())
def __reduce__(self):
return self.deserialize, self.serialize()

@property
def gpu_data_size(self):
return int(<size_t>self.c_obj.gpu_data.get()[0].size())
def __cuda_array_interface__(self):
return self._gpu_data.__cuda_array_interface__

def serialize(self):
header = {}
frames = []

gpu_data = as_buffer(
data=self.gpu_data_ptr,
size=self.gpu_data_size,
data = self._gpu_data.obj.ptr,
size = self._gpu_data.obj.size,
owner=self,
exposed=True
)
Expand All @@ -388,65 +365,83 @@ cdef class _CPackedColumns:

header["column-names"] = self.column_names
header["index-names"] = self.index_names
if self.c_obj.metadata.get()[0].data() != NULL:
header["metadata"] = list(
<uint8_t[:self.c_obj.metadata.get()[0].size()]>
self.c_obj.metadata.get()[0].data()
)

column_dtypes = {}
header["metadata"] = self._metadata.tobytes()
for name, dtype in self.column_dtypes.items():
dtype_header, dtype_frames = dtype.serialize()
column_dtypes[name] = (
self.column_dtypes[name] = (
dtype_header,
(len(frames), len(frames) + len(dtype_frames)),
)
frames.extend(dtype_frames)
header["column-dtypes"] = column_dtypes

header["column-dtypes"] = self.column_dtypes
header["type-serialized"] = pickle.dumps(type(self))
return header, frames

@staticmethod
def deserialize(header, frames):
cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns)

gpu_data = Buffer.deserialize(header["data"], frames)

dbuf = DeviceBuffer(
ptr=gpu_data.get_ptr(mode="write"),
size=gpu_data.nbytes
)

cdef cpp_contiguous_split.packed_columns data
data.metadata = move(
make_unique[vector[uint8_t]](
move(<vector[uint8_t]>header.get("metadata", []))
)
)
data.gpu_data = move(dbuf.c_obj)

p.c_obj = move(data)
p.column_names = header["column-names"]
p.index_names = header["index-names"]

@classmethod
def deserialize(cls, header, frames):
column_dtypes = {}
for name, dtype in header["column-dtypes"].items():
dtype_header, (start, stop) = dtype
column_dtypes[name] = pickle.loads(
dtype_header["type-serialized"]
).deserialize(dtype_header, frames[start:stop])
p.column_dtypes = column_dtypes
return cls(
plc.contiguous_split.pack(
plc.contiguous_split.unpack_from_memoryviews(
memoryview(header["metadata"]),
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
plc.gpumemoryview(frames[0]),
)
),
header["column-names"],
header["index-names"],
column_dtypes,
)

return p
@classmethod
def from_py_table(cls, input_table, keep_index=True):
if keep_index and (
not isinstance(input_table.index, cudf.RangeIndex)
or input_table.index.start != 0
or input_table.index.stop != len(input_table)
or input_table.index.step != 1
):
columns = input_table._index._columns + input_table._columns
index_names = input_table._index_names
else:
columns = input_table._columns
index_names = None

column_names = input_table._column_names
column_dtypes = {}
for name, col in input_table._column_labels_and_values:
if isinstance(
col.dtype,
(cudf.core.dtypes._BaseDtype, cudf.core.dtypes.CategoricalDtype)
):
column_dtypes[name] = col.dtype

return cls(
plc.contiguous_split.pack(
plc.Table(
[
col.to_pylibcudf(mode="read") for col in columns
]
)
),
column_names,
index_names,
column_dtypes,
)

def unpack(self):
output_table = cudf.DataFrame._from_data(*data_from_table_view(
cpp_contiguous_split.unpack(self.c_obj),
self,
output_table = cudf.DataFrame._from_data(*data_from_pylibcudf_table(
plc.contiguous_split.unpack_from_memoryviews(
self._metadata,
self._gpu_data
),
self.column_names,
self.index_names
))

for name, dtype in self.column_dtypes.items():
output_table._data[name] = (
output_table._data[name]._with_type_metadata(dtype)
Expand All @@ -455,46 +450,6 @@ cdef class _CPackedColumns:
return output_table


class PackedColumns(Serializable):
"""
A packed representation of a Frame, with all columns residing
in a single GPU memory buffer.
"""

def __init__(self, data):
self._data = data

def __reduce__(self):
return self.deserialize, self.serialize()

@property
def __cuda_array_interface__(self):
return {
"data": (self._data.gpu_data_ptr, False),
"shape": (self._data.gpu_data_size,),
"strides": None,
"typestr": "|u1",
"version": 0
}

def serialize(self):
header, frames = self._data.serialize()
header["type-serialized"] = pickle.dumps(type(self))

return header, frames

@classmethod
def deserialize(cls, header, frames):
return cls(_CPackedColumns.deserialize(header, frames))

@classmethod
def from_py_table(cls, input_table, keep_index=True):
return cls(_CPackedColumns.from_py_table(input_table, keep_index))

def unpack(self):
return self._data.unpack()


def pack(input_table, keep_index=True):
"""
Pack the columns of a cudf Frame into a single GPU memory buffer.
Expand Down
1 change: 1 addition & 0 deletions python/pylibcudf/pylibcudf/contiguous_split.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cdef class PackedColumns:

@staticmethod
cdef PackedColumns from_libcudf(unique_ptr[packed_columns] data)
cpdef tuple release(self)

cpdef PackedColumns pack(Table input)

Expand Down
3 changes: 2 additions & 1 deletion python/pylibcudf/pylibcudf/contiguous_split.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ cdef class HostBuffer:
def __releasebuffer__(self, Py_buffer *buffer):
pass


cdef class PackedColumns:
"""Column data in a serialized format.

Expand All @@ -87,7 +88,7 @@ cdef class PackedColumns:
out.c_obj = move(data)
return out

def release(self):
cpdef tuple release(self):
"""Releases and returns the underlying serialized metadata and gpu data.

The ownership of the memory are transferred to the returned buffers. After
Expand Down
Loading