Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pylibcudf contiguous split APIs in cudf python #17246

Merged
merged 17 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions python/cudf/cudf/_lib/copying.pxd

This file was deleted.

193 changes: 84 additions & 109 deletions python/cudf/cudf/_lib/copying.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@

import pickle

from libc.stdint cimport uint8_t, uintptr_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.utility cimport move
from libcpp.vector cimport vector

from rmm.pylibrmm.device_buffer cimport DeviceBuffer

import pylibcudf

Expand All @@ -20,20 +16,24 @@ from cudf._lib.column cimport Column
from cudf._lib.scalar import as_device_scalar

from cudf._lib.scalar cimport DeviceScalar
from cudf._lib.utils cimport table_view_from_table

from cudf._lib.reduce import minmax
from cudf.core.abc import Serializable

from libcpp.memory cimport make_unique

cimport pylibcudf.libcudf.contiguous_split as cpp_contiguous_split
from pylibcudf.libcudf.column.column cimport column
from pylibcudf.libcudf.column.column_view cimport column_view
from pylibcudf.libcudf.scalar.scalar cimport scalar
from pylibcudf.libcudf.types cimport size_type

from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_table_view
from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table
import pylibcudf as plc
from pylibcudf.contiguous_split cimport PackedColumns as PlcPackedColumns
cimport pylibcudf.libcudf.contiguous_split as cpp_contiguous_split
from libcpp.vector cimport vector
from libc.stdint cimport uint8_t
from rmm.pylibrmm.device_buffer cimport DeviceBuffer

# workaround for https://github.com/cython/cython/issues/3885
ctypedef const scalar constscalar
Expand Down Expand Up @@ -335,54 +335,37 @@ def get_element(Column input_column, size_type index):
)


cdef class _CPackedColumns:

@staticmethod
def from_py_table(input_table, keep_index=True):
"""
Construct a ``PackedColumns`` object from a ``cudf.DataFrame``.
"""
import cudf.core.dtypes

cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns)

if keep_index and (
not isinstance(input_table.index, cudf.RangeIndex)
or input_table.index.start != 0
or input_table.index.stop != len(input_table)
or input_table.index.step != 1
):
input_table_view = table_view_from_table(input_table)
p.index_names = input_table._index_names
else:
input_table_view = table_view_from_table(
input_table, ignore_index=True)

p.column_names = input_table._column_names
p.column_dtypes = {}
for name, col in input_table._column_labels_and_values:
if isinstance(col.dtype, cudf.core.dtypes._BaseDtype):
p.column_dtypes[name] = col.dtype

p.c_obj = move(cpp_contiguous_split.pack(input_table_view))
class PackedColumns(Serializable):
"""
A packed representation of a Frame, with all columns residing
in a single GPU memory buffer.
"""

return p
def __init__(self, data, column_names=None, index_names=None, column_dtypes=None):
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
self._data = data
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
self.column_names=column_names
self.index_names=index_names
self.column_dtypes=column_dtypes

@property
def gpu_data_ptr(self):
return int(<uintptr_t>self.c_obj.gpu_data.get()[0].data())
def __reduce__(self):
return self.deserialize, self.serialize()

@property
def gpu_data_size(self):
return int(<size_t>self.c_obj.gpu_data.get()[0].size())
def __cuda_array_interface__(self):
return {
"data": (self._data.gpu_data_ptr, False),
"shape": (self._data.gpu_data_size,),
"strides": None,
"typestr": "|u1",
"version": 0
}
Matt711 marked this conversation as resolved.
Show resolved Hide resolved

def serialize(self):
header = {}
frames = []

gpu_data = as_buffer(
data=self.gpu_data_ptr,
size=self.gpu_data_size,
data=self._data.gpu_data_ptr,
size=self._data.gpu_data_size,
owner=self,
exposed=True
)
Expand All @@ -392,35 +375,31 @@ cdef class _CPackedColumns:

header["column-names"] = self.column_names
header["index-names"] = self.index_names
if self.c_obj.metadata.get()[0].data() != NULL:
cdef PlcPackedColumns p = self._data
if p.c_obj.get()[0].metadata.get()[0].data() != NULL:
header["metadata"] = list(
<uint8_t[:self.c_obj.metadata.get()[0].size()]>
self.c_obj.metadata.get()[0].data()
<uint8_t[:p.c_obj.get()[0].metadata.get()[0].size()]>
p.c_obj.get()[0].metadata.get()[0].data()
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
)

column_dtypes = {}
for name, dtype in self.column_dtypes.items():
dtype_header, dtype_frames = dtype.serialize()
column_dtypes[name] = (
self.column_dtypes[name] = (
dtype_header,
(len(frames), len(frames) + len(dtype_frames)),
)
frames.extend(dtype_frames)
header["column-dtypes"] = column_dtypes

header["column-dtypes"] = self.column_dtypes
header["type-serialized"] = pickle.dumps(type(self))
return header, frames

@staticmethod
def deserialize(header, frames):
cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns)

@classmethod
def deserialize(cls, header, frames):
gpu_data = Buffer.deserialize(header["data"], frames)

cdef PlcPackedColumns p = PlcPackedColumns.__new__(PlcPackedColumns)
dbuf = DeviceBuffer(
ptr=gpu_data.get_ptr(mode="write"),
size=gpu_data.nbytes
)

cdef cpp_contiguous_split.packed_columns data
data.metadata = move(
make_unique[vector[uint8_t]](
Expand All @@ -429,28 +408,64 @@ cdef class _CPackedColumns:
)
data.gpu_data = move(dbuf.c_obj)

p.c_obj = move(data)
p.column_names = header["column-names"]
p.index_names = header["index-names"]
p.c_obj = move(make_unique[cpp_contiguous_split.packed_columns](move(data)))
Matt711 marked this conversation as resolved.
Show resolved Hide resolved

column_dtypes = {}
for name, dtype in header["column-dtypes"].items():
dtype_header, (start, stop) = dtype
column_dtypes[name] = pickle.loads(
dtype_header["type-serialized"]
).deserialize(dtype_header, frames[start:stop])
p.column_dtypes = column_dtypes

return p
return cls(
p,
header["column-names"],
header["index-names"],
column_dtypes,
)

@classmethod
def from_py_table(cls, input_table, keep_index=True):
if keep_index and (
not isinstance(input_table.index, cudf.RangeIndex)
or input_table.index.start != 0
or input_table.index.stop != len(input_table)
or input_table.index.step != 1
):
columns = input_table._index._columns + input_table._columns
index_names = input_table._index_names
else:
columns = input_table._columns
index_names = None

column_names = input_table._column_names
column_dtypes = {}
for name, col in input_table._column_labels_and_values:
if isinstance(
col.dtype,
(cudf.core.dtypes._BaseDtype, cudf.core.dtypes.CategoricalDtype)
):
column_dtypes[name] = col.dtype

return cls(
plc.contiguous_split.PackedColumns.from_plc_table(
plc.Table(
[
col.to_pylibcudf(mode="read") for col in columns
]
)
),
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
column_names,
index_names,
column_dtypes,
)

def unpack(self):
output_table = cudf.DataFrame._from_data(*data_from_table_view(
cpp_contiguous_split.unpack(self.c_obj),
self,
output_table = cudf.DataFrame._from_data(*data_from_pylibcudf_table(
plc.contiguous_split.unpack(self._data),
self.column_names,
self.index_names
))

for name, dtype in self.column_dtypes.items():
output_table._data[name] = (
output_table._data[name]._with_type_metadata(dtype)
Expand All @@ -459,46 +474,6 @@ cdef class _CPackedColumns:
return output_table


class PackedColumns(Serializable):
"""
A packed representation of a Frame, with all columns residing
in a single GPU memory buffer.
"""

def __init__(self, data):
self._data = data

def __reduce__(self):
return self.deserialize, self.serialize()

@property
def __cuda_array_interface__(self):
return {
"data": (self._data.gpu_data_ptr, False),
"shape": (self._data.gpu_data_size,),
"strides": None,
"typestr": "|u1",
"version": 0
}

def serialize(self):
header, frames = self._data.serialize()
header["type-serialized"] = pickle.dumps(type(self))

return header, frames

@classmethod
def deserialize(cls, header, frames):
return cls(_CPackedColumns.deserialize(header, frames))

@classmethod
def from_py_table(cls, input_table, keep_index=True):
return cls(_CPackedColumns.from_py_table(input_table, keep_index))

def unpack(self):
return self._data.unpack()


def pack(input_table, keep_index=True):
"""
Pack the columns of a cudf Frame into a single GPU memory buffer.
Expand Down
30 changes: 29 additions & 1 deletion python/pylibcudf/pylibcudf/contiguous_split.pyx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from cython.operator cimport dereference
from libc.stdint cimport uint8_t
from libc.stdint cimport uint8_t, uintptr_t
from libcpp.memory cimport make_unique, unique_ptr
from libcpp.utility cimport move
from libcpp.vector cimport vector
Expand Down Expand Up @@ -63,6 +63,7 @@ cdef class HostBuffer:
def __releasebuffer__(self, Py_buffer *buffer):
pass


cdef class PackedColumns:
"""Column data in a serialized format.

Expand All @@ -87,6 +88,33 @@ cdef class PackedColumns:
out.c_obj = move(data)
return out

@staticmethod
def from_plc_table(Table input):
"""
Construct a ``PackedColumns`` object from a ``pylibcudf.Table``.
"""
cdef unique_ptr[packed_columns] c_packed_columns = move(
make_unique[packed_columns](
move(
cpp_pack(
input.view()
)
)
)
)

return PackedColumns.from_libcudf(move(c_packed_columns))

@property
def gpu_data_ptr(self):
if self.c_obj.get() != NULL:
return int(<uintptr_t>self.c_obj.get()[0].gpu_data.get()[0].data())

@property
def gpu_data_size(self):
if self.c_obj.get() != NULL:
return int(<size_t>self.c_obj.get()[0].gpu_data.get()[0].size())

Matt711 marked this conversation as resolved.
Show resolved Hide resolved
def release(self):
"""Releases and returns the underlying serialized metadata and gpu data.

Expand Down
Loading