Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds pylibcudf Scalar #14055

Closed
wants to merge 40 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
3aa461f
Initial version of scalar
vyasr Aug 31, 2023
1c4f990
Implement basic scatter algorithm with Scalar
vyasr Aug 31, 2023
8ad8367
Build DeviceScalar around pylibcudf.Scalar
vyasr Sep 1, 2023
c147972
Fix some initialization
vyasr Sep 1, 2023
0e5e5d3
Add missing no_gc_clear
vyasr Sep 1, 2023
e300884
Implement from_libcudf
vyasr Sep 1, 2023
4a2a436
Remove mr from DeviceScalar
vyasr Sep 1, 2023
abd5550
Inline _set_value
vyasr Sep 1, 2023
e8f6847
Add a comment
vyasr Sep 5, 2023
8360f80
Implement from_arrow
vyasr Sep 6, 2023
8814772
Add to package
vyasr Sep 7, 2023
3a04947
Implement release for gpumemoryview
vyasr Sep 7, 2023
032adbb
Add proper constructor for scalar from pyarrow scalar
vyasr Sep 7, 2023
16f4528
Construct pylibcudf.Scalar for numeric types
vyasr Sep 7, 2023
b75949a
Construct pylibcudf.Scalar for string types
vyasr Sep 7, 2023
8d46046
Add comment
vyasr Sep 7, 2023
ce164a0
Stop using reduction in favor of get_element
vyasr Sep 7, 2023
a8d8cb6
Move construction from pyarrow scalar to a factory
vyasr Sep 8, 2023
3fd071e
Implement from_arrow at the C level
vyasr Sep 8, 2023
33abe2b
Enable datetime
vyasr Sep 8, 2023
c6b595d
Enable timedeltas
vyasr Sep 8, 2023
ec4b510
Enable lists
vyasr Sep 8, 2023
636242b
Enable structs
vyasr Sep 9, 2023
4c08dc3
Make list and struct code paths more parallel
vyasr Sep 9, 2023
50bbdc0
Unify builders and use recursion for all paths
vyasr Sep 9, 2023
7b34df4
Enable decimals
vyasr Sep 9, 2023
392407f
Some cleanup
vyasr Sep 9, 2023
f5def95
Combine nestrepl
vyasr Sep 9, 2023
16cef15
Unify constructor
vyasr Sep 9, 2023
edcca20
Copy to avoid overwriting inputs
vyasr Sep 9, 2023
c81e158
Add error check for nested null in scalar and fix test
vyasr Sep 11, 2023
14b2f86
Initial implementation of to_arrow
vyasr Sep 11, 2023
cd80fe0
Add from_arrow_scalar to Cython API
vyasr Sep 11, 2023
5fffc4f
Add Python wrappers for to_arrow
vyasr Sep 11, 2023
cb52d2b
Add pylibcudf converter
vyasr Sep 11, 2023
643d992
Fix bugs in impl
vyasr Sep 11, 2023
309cdfa
Make to_arrow work for all column types
vyasr Sep 11, 2023
4e4bc64
Add to_arrow overload for 32-bit decimal
vyasr Sep 12, 2023
c62c71b
Add support for list/struct up to output field naming
vyasr Sep 14, 2023
d6eee4b
Fully implement metadata
vyasr Sep 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions cpp/include/cudf/interop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ std::shared_ptr<arrow::Table> to_arrow(table_view input,
std::vector<column_metadata> const& metadata = {},
arrow::MemoryPool* ar_mr = arrow::default_memory_pool());

/**
* @brief Create `arrow::Scalar` from cudf scalar `input`
*
* Converts the `cudf::scalar` to `arrow::Scalar`.
*
* @param input scalar that needs to be converted to arrow Scalar
* @param metadata Contains hierarchy of names of columns and children
* @param ar_mr arrow memory pool to allocate memory for arrow Scalar
* @return arrow Scalar generated from `input`
*/
std::shared_ptr<arrow::Scalar> to_arrow(cudf::scalar const& input,
column_metadata const& metadata = {},
arrow::MemoryPool* ar_mr = arrow::default_memory_pool());
/**
* @brief Create `cudf::table` from given arrow Table input
*
Expand All @@ -145,5 +158,17 @@ std::unique_ptr<table> from_arrow(
arrow::Table const& input,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create `cudf::table` from given arrow Scalar input
*
* @param input arrow:Scalar that needs to be converted to `cudf::scalar`
* @param mr Device memory resource used to allocate `cudf::scalar`
* @return cudf scalar generated from given arrow Scalar
*/

std::unique_ptr<cudf::scalar> from_arrow(
arrow::Scalar const& input,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
} // namespace cudf
136 changes: 136 additions & 0 deletions cpp/src/interop/from_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <arrow/array/array_base.h>
#include <arrow/array/builder_nested.h>
#include <arrow/type.h>
#include <arrow/type_fwd.h>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/concatenate.hpp>
Expand All @@ -35,6 +39,7 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <stdexcept>
#include <thrust/gather.h>

namespace cudf {
Expand Down Expand Up @@ -472,4 +477,135 @@ std::unique_ptr<table> from_arrow(arrow::Table const& input_table,
return detail::from_arrow(input_table, cudf::get_default_stream(), mr);
}

template <typename Functor, typename... Ts>
constexpr decltype(auto) arrow_type_dispatcher(arrow::DataType const& dtype,
Functor f,
Ts&&... args)
{
switch (dtype.id()) {
case arrow::Type::INT8:
return f.template operator()<arrow::Int8Type>(std::forward<Ts>(args)...);
case arrow::Type::INT16:
return f.template operator()<arrow::Int16Type>(std::forward<Ts>(args)...);
case arrow::Type::INT32:
return f.template operator()<arrow::Int32Type>(std::forward<Ts>(args)...);
case arrow::Type::INT64:
return f.template operator()<arrow::Int64Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT8:
return f.template operator()<arrow::UInt8Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT16:
return f.template operator()<arrow::UInt16Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT32:
return f.template operator()<arrow::UInt32Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT64:
return f.template operator()<arrow::UInt64Type>(std::forward<Ts>(args)...);
case arrow::Type::FLOAT:
return f.template operator()<arrow::FloatType>(std::forward<Ts>(args)...);
case arrow::Type::DOUBLE:
return f.template operator()<arrow::DoubleType>(std::forward<Ts>(args)...);
case arrow::Type::BOOL:
return f.template operator()<arrow::BooleanType>(std::forward<Ts>(args)...);
case arrow::Type::TIMESTAMP:
return f.template operator()<arrow::TimestampType>(std::forward<Ts>(args)...);
case arrow::Type::DURATION:
return f.template operator()<arrow::DurationType>(std::forward<Ts>(args)...);
// case arrow::Type::DICTIONARY32:
// return f.template operator()<arrow::Type::DICTIONARY32>(
// std::forward<Ts>(args)...);
case arrow::Type::STRING:
return f.template operator()<arrow::StringType>(std::forward<Ts>(args)...);
case arrow::Type::LIST:
return f.template operator()<arrow::ListType>(std::forward<Ts>(args)...);
case arrow::Type::DECIMAL128:
return f.template operator()<arrow::Decimal128Type>(std::forward<Ts>(args)...);
case arrow::Type::STRUCT:
return f.template operator()<arrow::StructType>(std::forward<Ts>(args)...);
default: {
CUDF_FAIL("Invalid type.");
}
}
}

struct BuilderGenerator {
template <typename T>
std::shared_ptr<arrow::ArrayBuilder> operator()(std::shared_ptr<arrow::DataType> const& type)
{
return std::make_shared<typename arrow::TypeTraits<T>::BuilderType>(
type, arrow::default_memory_pool());
}
};

template <>
std::shared_ptr<arrow::ArrayBuilder> BuilderGenerator::operator()<arrow::ListType>(
std::shared_ptr<arrow::DataType> const& type)
{
CUDF_FAIL("Not implemented");
}

template <>
std::shared_ptr<arrow::ArrayBuilder> BuilderGenerator::operator()<arrow::StructType>(
std::shared_ptr<arrow::DataType> const& type)
{
CUDF_FAIL("Not implemented");
}

std::shared_ptr<arrow::ArrayBuilder> make_builder(std::shared_ptr<arrow::DataType> const& type)
{
switch (type->id()) {
case arrow::Type::STRUCT: {
std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders;

for (auto i = 0; i < type->num_fields(); ++i) {
auto const vt = type->field(i)->type();
if (vt->id() == arrow::Type::STRUCT || vt->id() == arrow::Type::LIST) {
field_builders.push_back(make_builder(vt));
} else {
field_builders.push_back(arrow_type_dispatcher(*vt, BuilderGenerator{}, vt));
}
}
return std::make_shared<arrow::StructBuilder>(
type, arrow::default_memory_pool(), field_builders);
}
case arrow::Type::LIST: {
return std::make_shared<arrow::ListBuilder>(arrow::default_memory_pool(),
make_builder(type->field(0)->type()));
}
default: {
return arrow_type_dispatcher(*type, BuilderGenerator{}, type);
}
}
}

std::unique_ptr<cudf::scalar> from_arrow(arrow::Scalar const& input,
rmm::mr::device_memory_resource* mr)
{
// Get a builder for the scalar type
auto builder = make_builder(input.type);

auto status = builder->AppendScalar(input);
if (status != arrow::Status::OK()) {
if (status.IsNotImplemented()) {
// The only known failure case here is for nulls
CUDF_FAIL("Cannot create untyped null scalars or nested types with untyped null leaf nodes",
std::invalid_argument);
}
CUDF_FAIL("Arrow ArrayBuilder::AppendScalar failed");
}

auto maybe_array = builder->Finish();
if (!maybe_array.ok()) { CUDF_FAIL("Arrow ArrayBuilder::Finish failed"); }
auto array = *maybe_array;

auto field = arrow::field("", input.type);

auto table = arrow::Table::Make(arrow::schema({field}), {array});

auto cudf_table = from_arrow(*table);

auto col = cudf_table->get_column(0);

auto cv = col.view();
return get_element(cv, 0);
}

} // namespace cudf
58 changes: 58 additions & 0 deletions cpp/src/interop/to_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
*/

#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/interop.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/scatter.hpp>
#include <cudf/detail/unary.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/interop.hpp>
Expand Down Expand Up @@ -139,6 +143,46 @@ struct dispatch_to_arrow {
}
};

template <>
std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<numeric::decimal32>(
column_view input,
cudf::type_id,
column_metadata const&,
arrow::MemoryPool* ar_mr,
rmm::cuda_stream_view stream)
{
using DeviceType = int32_t;
size_type const BIT_WIDTH_RATIO = 4; // Array::Type:type::DECIMAL (128) / int32_t

rmm::device_uvector<__int128_t> buf(input.size() * BIT_WIDTH_RATIO, stream);

auto count = thrust::make_counting_iterator(0);

thrust::for_each(rmm::exec_policy(cudf::get_default_stream()),
count,
count + input.size(),
[in = input.begin<DeviceType>(), out = buf.data()] __device__(auto in_idx) {
auto const out_idx = in_idx;
auto unsigned_value = in[in_idx] < 0 ? -in[in_idx] : in[in_idx];
auto unsigned_128bit = static_cast<__int128_t>(unsigned_value);
auto signed_128bit = in[in_idx] < 0 ? -unsigned_128bit : unsigned_128bit;
out[out_idx] = signed_128bit;
});

auto const buf_size_in_bytes = buf.size() * sizeof(DeviceType);
auto data_buffer = allocate_arrow_buffer(buf_size_in_bytes, ar_mr);

CUDF_CUDA_TRY(cudaMemcpyAsync(
data_buffer->mutable_data(), buf.data(), buf_size_in_bytes, cudaMemcpyDefault, stream.value()));

auto type = arrow::decimal(9, -input.type().scale());
auto mask = fetch_mask_buffer(input, ar_mr, stream);
auto buffers = std::vector<std::shared_ptr<arrow::Buffer>>{mask, std::move(data_buffer)};
auto data = std::make_shared<arrow::ArrayData>(type, input.size(), buffers);

return std::make_shared<arrow::Decimal128Array>(data);
}

template <>
std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<numeric::decimal64>(
column_view input,
Expand Down Expand Up @@ -413,4 +457,18 @@ std::shared_ptr<arrow::Table> to_arrow(table_view input,
return detail::to_arrow(input, metadata, cudf::get_default_stream(), ar_mr);
}

std::shared_ptr<arrow::Scalar> to_arrow(cudf::scalar const& input,
column_metadata const& metadata,

arrow::MemoryPool* ar_mr)
{
auto stream = cudf::get_default_stream();
auto column = cudf::make_column_from_scalar(input, 1);
cudf::table_view tv{{column->view()}};
auto arrow_table = cudf::to_arrow(tv, {metadata});
auto ac = arrow_table->column(0);
auto maybe_scalar = ac->GetScalar(0);
if (!maybe_scalar.ok()) { CUDF_FAIL("Failed to produce a scalar"); }
return maybe_scalar.ValueOrDie();
}
} // namespace cudf
11 changes: 9 additions & 2 deletions python/cudf/cudf/_lib/cpp/interop.pxd
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from libcpp.memory cimport shared_ptr, unique_ptr
from libcpp.string cimport string
from libcpp.vector cimport vector
from pyarrow.lib cimport CTable
from pyarrow.lib cimport CScalar, CTable

from cudf._lib.types import cudf_to_np_types, np_to_cudf_types

from cudf._lib.cpp.scalar.scalar cimport scalar
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view

Expand All @@ -24,6 +25,7 @@ cdef extern from "cudf/interop.hpp" namespace "cudf" \
) except +

cdef unique_ptr[table] from_arrow(CTable input) except +
cdef unique_ptr[scalar] from_arrow(CScalar input) except +

cdef cppclass column_metadata:
column_metadata() except +
Expand All @@ -35,3 +37,8 @@ cdef extern from "cudf/interop.hpp" namespace "cudf" \
table_view input,
vector[column_metadata] metadata,
) except +

cdef shared_ptr[CScalar] to_arrow(
const scalar& input,
column_metadata metadata,
) except +
4 changes: 3 additions & 1 deletion python/cudf/cudf/_lib/cpp/libcpp/functional.pxd
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.


# TODO: Can be replaced once https://github.com/cython/cython/pull/5671 is
# merged and released
cdef extern from "<functional>" namespace "std" nogil:
cdef cppclass reference_wrapper[T]:
reference_wrapper()
Expand Down
5 changes: 2 additions & 3 deletions python/cudf/cudf/_lib/cpp/reduce.pxd
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.utility cimport pair

from cudf._lib.aggregation cimport reduce_aggregation, scan_aggregation
from cudf._lib.cpp.aggregation cimport reduce_aggregation, scan_aggregation
from cudf._lib.cpp.column.column cimport column
from cudf._lib.cpp.column.column_view cimport column_view
from cudf._lib.cpp.scalar.scalar cimport scalar
from cudf._lib.cpp.types cimport data_type
from cudf._lib.scalar cimport DeviceScalar


cdef extern from "cudf/reduction.hpp" namespace "cudf" nogil:
Expand Down
6 changes: 4 additions & 2 deletions python/cudf/cudf/_lib/datetime.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from cudf.core.buffer import acquire_spill_lock

Expand All @@ -10,6 +10,7 @@ from cudf._lib.column cimport Column
from cudf._lib.cpp.column.column cimport column
from cudf._lib.cpp.column.column_view cimport column_view
from cudf._lib.cpp.filling cimport calendrical_month_sequence
from cudf._lib.cpp.scalar.scalar cimport scalar
from cudf._lib.cpp.types cimport size_type
from cudf._lib.scalar cimport DeviceScalar

Expand Down Expand Up @@ -166,10 +167,11 @@ def date_range(DeviceScalar start, size_type n, offset):
+ offset.kwds.get("months", 0)
)

cdef const scalar* c_start = start.c_value.get()
with nogil:
c_result = move(calendrical_month_sequence(
n,
start.c_value.get()[0],
c_start[0],
months
))
return Column.from_unique_ptr(move(c_result))
Expand Down
27 changes: 26 additions & 1 deletion python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,35 @@
# the License.
# =============================================================================

set(cython_sources column.pyx copying.pyx gpumemoryview.pyx table.pyx types.pyx utils.pyx)
set(cython_sources column.pyx copying.pyx gpumemoryview.pyx interop.pyx scalar.pyx table.pyx
types.pyx utils.pyx
)
set(linked_libraries cudf::cudf)
rapids_cython_create_modules(
CXX
SOURCE_FILES "${cython_sources}"
LINKED_LIBRARIES "${linked_libraries}" MODULE_PREFIX pylibcudf_ ASSOCIATED_TARGETS cudf
)

find_package(Python 3.9 REQUIRED COMPONENTS Interpreter)

execute_process(
COMMAND "${Python_EXECUTABLE}" -c "import pyarrow; print(pyarrow.get_include())"
OUTPUT_VARIABLE PYARROW_INCLUDE_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)

set(targets_using_arrow_headers pylibcudf_interop pylibcudf_scalar)
foreach(target IN LISTS targets_using_arrow_headers)
target_include_directories(${target} PRIVATE "${PYARROW_INCLUDE_DIR}")
endforeach()

# TODO: Clean up this include when switching to scikit-build-core. See cudf/_lib/CMakeLists.txt for
# more info
find_package(NumPy REQUIRED)
set(targets_using_numpy pylibcudf_interop pylibcudf_scalar)
foreach(target IN LISTS targets_using_numpy)
target_include_directories(${target} PRIVATE "${NumPy_INCLUDE_DIRS}")
# Switch to the line below when we switch back to FindPython.cmake in CMake 3.24.
# target_include_directories(${target} PRIVATE "${Python_NumPy_INCLUDE_DIRS}")
endforeach()
Loading