Skip to content

Commit

Permalink
[Cylon] Arrow 14.0.2 Integration
Browse files (browse the repository at this point in the history)
Signed-off-by: Arup Sarker <[email protected]>
  • Loading branch information
arupcsedu committed Apr 18, 2024
1 parent 03e0338 commit a6eec5e
Show file tree
Hide file tree
Showing 19 changed files with 67 additions and 58 deletions.
14 changes: 9 additions & 5 deletions conda/environments/cylon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ channels:
- conda-forge
- defaults
dependencies:
#- python>=3.8
- python>=3.8,<3.10
- cmake>=3.23.1,!=3.25.0
- arrow-cpp=9
- pyarrow=9.0.0
- cmake>=3.23.1
- arrow-cpp=14.0.2
- pyarrow=14.0.2
- glog
- openmpi=4.1.3=ha1ae619_105
#- openmpi=4.1.3=ha1ae619_105
- ucx>=1.12.1
- cython>=0.29.31,<3
- numpy<1.24.4
- pandas>=1.0,<2.0.0
- pandas>=1.0
- fsspec>=0.6.0
- setuptools
# the packages listed below are not needed for using pygcylon or compiling it
- pytest
- pytest-mpi
- mpi4py
#- gcc=11.4
#- gxx=11.4
#- gxx_linux-64=11.4
4 changes: 2 additions & 2 deletions conda/environments/cylon_MacOS.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ channels:
dependencies:
- python>=3.9,<3.10
- cmake>=3.23.1,!=3.25.0
- arrow-cpp=9
- pyarrow=9.0.0
- arrow-cpp=14.0.2
- pyarrow=14.0.2
- glog
- openmpi>=4.1.2
- cython>=0.29.31,<3
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/cylon_NoUCX.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ channels:
dependencies:
- python>=3.8,<3.10
- cmake>=3.23.1,!=3.25.0
- arrow-cpp=9
- pyarrow=9.0.0
- arrow-cpp=14.0.2
- pyarrow=14.0.2
- glog
- openmpi=4.1.3=ha1ae619_105
- cython>=0.29.31,<3
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/cylon_rivanna_1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ channels:
dependencies:
- python>=3.8
- cmake>=3.23.1,!=3.25.0
- arrow-cpp=9
- pyarrow=9.0.0
- arrow-cpp=14.0.2
- pyarrow=14.0.2
- glog
- openmpi=4.1.3=ha1ae619_105
- ucx>=1.12.1
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/cylon_rivanna_2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ channels:
dependencies:
- python=3.10
- cmake>=3.23.1,!=3.25.0
- arrow-cpp=9
- pyarrow=9.0.0
- arrow-cpp=14.0.2
- pyarrow=14.0.2
- glog
#- openmpi=4.1.3=ha1ae619_105
- ucx>=1.12.1
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/gcylon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ channels:
dependencies:
- python>=3.8,<3.10
- cmake>=3.23.1,!=3.25.0
- arrow-cpp=9
- pyarrow=9.0.0
- arrow-cpp=14.0.2
- pyarrow=14.0.2
- cython>=0.29.31,<3
- cudf=22.12.01
- cudatoolkit=11.5
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ channels:
dependencies:
- python>=3.8,<3.10
- cmake>=3.23.1,!=3.25.0
- arrow-cpp=9
- pyarrow=9.0.0
- arrow-cpp=14.0.2
- pyarrow=14.0.2
- glog
- msmpi
- cython>=0.29.31,<3
Expand Down
8 changes: 4 additions & 4 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@
cmake_minimum_required(VERSION 3.17 FATAL_ERROR)
set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)

project(CYLON VERSION 0.6.0)
project(CYLON VERSION 0.7.0)

set(CYLON_VERSION 0.6.0)
set(CYLON_VERSION 0.7.0)

## defaults to release build
if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE Release)
endif ()

# cmake modules directories
set(CYLON_ARROW_VERSION 9.0.0)
set(CYLON_ARROW_VERSION 14.0.2)
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake/Modules/" ${CMAKE_MODULE_PATH})
list(APPEND CMAKE_MODULE_PATH ${CYLON_SOURCE_DIR}/CMake)

Expand Down Expand Up @@ -74,7 +74,7 @@ else ()
endif ()

# C++ standard
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
set(GCC_ABI_COMPILE_FLAGS "-D_GLIBCXX_USE_CXX11_ABI=0")
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/cylon/arrow/arrow_comparator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ struct CompareFunc<ArrowT, Asc, arrow::enable_if_floating_point<ArrowT>> {
template<typename ArrowT, bool Asc>
struct CompareFunc<ArrowT, Asc, arrow::enable_if_has_string_view<ArrowT>> {

static int compare(const arrow::util::string_view &v1, const arrow::util::string_view &v2) {
static int compare(const std::string_view &v1, const std::string_view &v2) {
if (Asc) {
return v1.compare(v2);
} else {
Expand Down Expand Up @@ -220,7 +220,7 @@ class EmptyIndexComparator : public ArrayIndexComparator {
/*
* Single implementation for both numeric and binary comparators. array->GetView(idx) method is
* used here. For Numeric arrays, this would translate to value by pointer offset. For binary,
* this will take arrow::util::string_view.
* this will take std::string_view.
*/
template<typename ArrowT, bool Asc, bool NullOrder>
class ArrayIndexComparatorWithNulls : public ArrayIndexComparator {
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/cylon/arrow/arrow_partition_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ class FixedSizeBinaryHashPartitionKernel : public HashPartitionKernel {
return
visit_chunked_array<arrow::FixedSizeBinaryType>(
idx_col,
[&](uint64_t global_idx, arrow::util::string_view val) {
[&](uint64_t global_idx, std::string_view val) {
uint32_t hash = 0;
util::MurmurHash3_x86_32(&val, len, 0, &hash);
hash += 31 * target_partitions[global_idx];
Expand All @@ -276,7 +276,7 @@ class FixedSizeBinaryHashPartitionKernel : public HashPartitionKernel {
} else {
return visit_chunked_array<arrow::FixedSizeBinaryType>(
idx_col,
[&](uint64_t global_idx, arrow::util::string_view val) {
[&](uint64_t global_idx, std::string_view val) {
uint32_t hash = 0;
util::MurmurHash3_x86_32(&val, len, 0, &hash);
hash += 31 * target_partitions[global_idx];
Expand All @@ -302,7 +302,7 @@ class FixedSizeBinaryHashPartitionKernel : public HashPartitionKernel {

return visit_chunked_array<arrow::FixedSizeBinaryType>
(idx_col,
[&](uint64_t global_idx, arrow::util::string_view val) {
[&](uint64_t global_idx, std::string_view val) {
uint32_t hash = 0;
util::MurmurHash3_x86_32(&val, byte_width, 0, &hash);
hash += 31 * partial_hashes[global_idx];
Expand Down Expand Up @@ -348,7 +348,7 @@ class BinaryHashPartitionKernel : public HashPartitionKernel {
return
visit_chunked_array<ArrowT>(
idx_col,
[&](uint64_t global_idx, arrow::util::string_view val) {
[&](uint64_t global_idx, std::string_view val) {
uint32_t hash = 0;
util::MurmurHash3_x86_32(&val, static_cast<int>(val.size()), 0, &hash);
hash += 31 * target_partitions[global_idx];
Expand All @@ -365,7 +365,7 @@ class BinaryHashPartitionKernel : public HashPartitionKernel {
return
visit_chunked_array<ArrowT>(
idx_col,
[&](uint64_t global_idx, arrow::util::string_view val) {
[&](uint64_t global_idx, std::string_view val) {
uint32_t hash = 0;
util::MurmurHash3_x86_32(&val, static_cast<int>(val.size()), 0, &hash);
hash += 31 * target_partitions[global_idx];
Expand All @@ -390,7 +390,7 @@ class BinaryHashPartitionKernel : public HashPartitionKernel {
return
visit_chunked_array<arrow::FixedSizeBinaryType>(
idx_col,
[&](uint64_t global_idx, arrow::util::string_view val) {
[&](uint64_t global_idx, std::string_view val) {
uint32_t hash = 0;
util::MurmurHash3_x86_32(&val, static_cast<int>(val.size()), 0, &hash);
hash += 31 * partial_hashes[global_idx];
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/cylon/arrow/arrow_type_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ template<typename ArrowT>
struct ArrowTypeTraits<ArrowT, arrow::enable_if_has_string_view<ArrowT>> {
using ScalarT = typename arrow::TypeTraits<ArrowT>::ScalarType;
using ArrayT = typename arrow::TypeTraits<ArrowT>::ArrayType;
using ValueT = arrow::util::string_view;
using ValueT = std::string_view;

static ValueT ExtractFromScalar(const std::shared_ptr<arrow::Scalar> &scalar) {
return ValueT(*(std::static_pointer_cast<ScalarT>(scalar))->value);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/cylon/ctx/arrow_memory_pool_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ arrow::MemoryPool *cylon::ToArrowPool(cylon::MemoryPool *pool) {
return arrow::default_memory_pool();
} else {
// todo this is dangerous! return a smart pointer
return new ProxyMemoryPool(pool);
//return new ProxyMemoryPool(pool);
return arrow::default_memory_pool();
}
}
14 changes: 7 additions & 7 deletions cpp/src/cylon/ctx/arrow_memory_pool_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,31 @@ class ProxyMemoryPool : public arrow::MemoryPool {
this->tx_memory = tx_memory;
}

~ProxyMemoryPool() override {
~ProxyMemoryPool() override{
delete tx_memory;
}

arrow::Status Allocate(int64_t size, uint8_t **out) override {
arrow::Status Allocate(int64_t size, uint8_t **out) {
return ArrowStatus(tx_memory->Allocate(size, out));
}

arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t **ptr) override {
arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t **ptr) {
return ArrowStatus(tx_memory->Reallocate(old_size, new_size, ptr));
};

void Free(uint8_t *buffer, int64_t size) override {
void Free(uint8_t *buffer, int64_t size) {
tx_memory->Free(buffer, size);
}

int64_t bytes_allocated() const override {
int64_t bytes_allocated() const {
return this->tx_memory->bytes_allocated();
}

int64_t max_memory() const override {
int64_t max_memory() const {
return this->tx_memory->max_memory();
}

std::string backend_name() const override {
std::string backend_name() const {
return this->tx_memory->backend_name();
}

Expand Down
6 changes: 4 additions & 2 deletions cpp/src/cylon/indexing/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ Status ArrowLinearIndex::LocationByValue(const std::shared_ptr<arrow::Scalar> &s
auto cast_val = search_param->CastTo(index_array_->type()).ValueOrDie();
for (int64_t ix = 0; ix < index_array_->length(); ix++) {
auto val = index_array_->GetScalar(ix).ValueOrDie();
if (cast_val->Equals(val)) {
if (cast_val->Equals(*(std::static_pointer_cast<arrow::Scalar>(val)))) {
//if (cast_val->Equals(val)) {
find_index.push_back(ix);
}
}
Expand All @@ -128,7 +129,8 @@ Status ArrowLinearIndex::LocationByValue(const std::shared_ptr<arrow::Scalar> &s
auto cast_val = search_param->CastTo(index_array_->type()).ValueOrDie();
for (int64_t ix = 0; ix < index_array_->length(); ix++) {
auto val = index_array_->GetScalar(ix).ValueOrDie();
if (cast_val->Equals(val)) {
if (cast_val->Equals(*(std::static_pointer_cast<arrow::Scalar>(val)))) {
//if (cast_val->Equals(val)) {
*find_index = ix;
break;
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/cylon/join/sort_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,27 +654,27 @@ Status SortJoin(const std::shared_ptr<arrow::Table> &left_tab,
joined_table,
memory_pool);
case arrow::Type::STRING:
return do_single_column_join<arrow::StringType, arrow::util::string_view>(
return do_single_column_join<arrow::StringType, std::string>(
left_tab, right_tab, left_indices[0], right_indices[0],
join_config.GetType(), join_config.GetLeftTableSuffix(),
join_config.GetRightTableSuffix(), joined_table, memory_pool);
case arrow::Type::BINARY:
return do_single_column_join<arrow::BinaryType, arrow::util::string_view>(
return do_single_column_join<arrow::BinaryType, std::string>(
left_tab, right_tab, left_indices[0], right_indices[0],
join_config.GetType(), join_config.GetLeftTableSuffix(),
join_config.GetRightTableSuffix(), joined_table, memory_pool);
case arrow::Type::LARGE_STRING:
return do_single_column_join<arrow::LargeStringType, arrow::util::string_view>(
return do_single_column_join<arrow::LargeStringType, std::string>(
left_tab, right_tab, left_indices[0], right_indices[0],
join_config.GetType(), join_config.GetLeftTableSuffix(),
join_config.GetRightTableSuffix(), joined_table, memory_pool);
case arrow::Type::LARGE_BINARY:
return do_single_column_join<arrow::LargeBinaryType, arrow::util::string_view>(
return do_single_column_join<arrow::LargeBinaryType, std::string>(
left_tab, right_tab, left_indices[0], right_indices[0],
join_config.GetType(), join_config.GetLeftTableSuffix(),
join_config.GetRightTableSuffix(), joined_table, memory_pool);
case arrow::Type::FIXED_SIZE_BINARY:
return do_single_column_join<arrow::FixedSizeBinaryType, arrow::util::string_view>(
return do_single_column_join<arrow::FixedSizeBinaryType, std::string>(
left_tab, right_tab, left_indices[0], right_indices[0],
join_config.GetType(), join_config.GetLeftTableSuffix(),
join_config.GetRightTableSuffix(), joined_table, memory_pool);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/cylon/util/flatten_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ struct BinaryColumnFlattenKernelImpl : public ColumnFlattenKernel {
} else {
int64_t i = 1; // dont update offsets[0]
arrow::VisitArraySpanInline<ArrowT>(*array_data,
[&](const arrow::util::string_view &val) {
[&](const std::string_view &val) {
offsets[i] += static_cast<int32_t>(val.size());
i++;
},
Expand Down
16 changes: 9 additions & 7 deletions cpp/test/comparator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ struct Helper<T, std::enable_if_t<std::is_arithmetic<T>::value>> {
};

template<>
struct Helper<arrow::util::string_view> {
static arrow::util::string_view max() { return "ZZZZ"; }
static arrow::util::string_view min() { return ""; }
struct Helper<std::string_view> {
static std::string_view max() { return "ZZZZ"; }
static std::string_view min() { return ""; }

static int compare(bool asc,
const arrow::util::string_view &v1,
const arrow::util::string_view &v2) {
const std::string_view &v1,
const std::string_view &v2) {
return asc ? v1.compare(v2) : v2.compare(v1);
}
};
Expand Down Expand Up @@ -229,7 +229,8 @@ TEST_CASE("test table", "[comp]") {
return std::all_of(cols.begin(), cols.end(), [&](auto c) {
auto v1 = *c->chunk(0)->GetScalar(i);
auto v2 = *c->chunk(0)->GetScalar(j);
return v1->Equals(v2);
return v1->Equals(*(std::static_pointer_cast<arrow::Scalar>(v2)));
//return v1->Equals(v2);
});
};

Expand All @@ -249,7 +250,8 @@ TEST_CASE("test table", "[comp]") {
for (const auto &c: table->columns()) {
auto v1 = *c->chunk(0)->GetScalar(i);
auto v2 = *c->chunk(0)->GetScalar(j);
if (v1->Equals(v2)) {
if(v1->Equals(*(std::static_pointer_cast<arrow::Scalar>(v2)))){
//if (v1->Equals(v2)) {
continue;
}

Expand Down
8 changes: 4 additions & 4 deletions cpp/test/test_arrow_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ using ArrowBinaryTypes = std::tuple<arrow::StringType, arrow::LargeStringType,
auto array = ArrayFromJSON(type, R"(["a", "b", "c"])");
*/
std::shared_ptr<arrow::Array> ArrayFromJSON(const std::shared_ptr<arrow::DataType> &type,
arrow::util::string_view json) {
std::string_view json) {
const auto &res = arrow::ipc::internal::json::ArrayFromJSON(type, json);
ARROW_ABORT_NOT_OK(res.status());
return res.ValueOrDie();
}

std::shared_ptr<arrow::Array> DictArrayFromJSON(const std::shared_ptr<arrow::DataType> &type,
arrow::util::string_view indices_json,
arrow::util::string_view dictionary_json) {
std::string_view indices_json,
std::string_view dictionary_json) {
std::shared_ptr<arrow::Array> out;
ARROW_ABORT_NOT_OK(arrow::ipc::internal::json::DictArrayFromJSON(type, indices_json, dictionary_json, &out));
return out;
Expand Down Expand Up @@ -105,7 +105,7 @@ std::shared_ptr<arrow::ChunkedArray> ChunkedArrayFromJSON(const std::shared_ptr<
])");
*/
std::shared_ptr<arrow::RecordBatch> RecordBatchFromJSON(const std::shared_ptr<arrow::Schema> &schema,
arrow::util::string_view json) {
std::string_view json) {
// Parse as a StructArray
auto struct_type = struct_(schema->fields());
std::shared_ptr<arrow::Array> struct_array = ArrayFromJSON(struct_type, json);
Expand Down
2 changes: 1 addition & 1 deletion python/pycylon/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
extra_compile_args = []
extra_link_args = []

std_version = '-std=c++14'
std_version = '-std=c++17'
extra_compile_args.extend([std_version, '-DARROW_METADATA_V4 -DNEED_EXCLUSIVE_SCAN'])
extra_compile_args.append('-DOMPI_SKIP_MPICXX=1')

Expand Down

0 comments on commit a6eec5e

Please sign in to comment.