Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): Support PTHash indexer #3558

Merged
merged 18 commits into from
Feb 21, 2024
27 changes: 26 additions & 1 deletion flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ option(BUILD_TEST "Whether to build test" ON)
option(BUILD_DOC "Whether to build doc" ON)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON)

option(USE_PTHASH "Whether to use pthash" OFF)

#print options
message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}")
message(STATUS "Build test: ${BUILD_TEST}")
message(STATUS "Build doc: ${BUILD_DOC}")
message(STATUS "Build odps fragment loader: ${BUILD_ODPS_FRAGMENT_LOADER}")

message(STATUS "Use pthash indexer : ${USE_PTHASH}")

# ------------------------------------------------------------------------------
# cmake configs
# ------------------------------------------------------------------------------
Expand All @@ -40,6 +44,11 @@ if (BUILD_HQPS)
add_definitions(-DBUILD_HQPS)
endif ()

# When PTHash support is requested, expose it to the C++ sources via the
# USE_PTHASH preprocessor definition (checked with #ifdef/#ifndef in the code).
if(USE_PTHASH)
message("Use PTHash")
add_definitions(-DUSE_PTHASH)
endif()

execute_process(COMMAND uname -r OUTPUT_VARIABLE LINUX_KERNEL_VERSION)
string(STRIP ${LINUX_KERNEL_VERSION} LINUX_KERNEL_VERSION)
message(${LINUX_KERNEL_VERSION})
Expand All @@ -50,6 +59,11 @@ endif()

include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../)

# PTHash and its murmurhash dependency are vendored under third_party/;
# add their headers to the include path only when the option is enabled.
if(USE_PTHASH)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/pthash)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/murmurhash)
endif()

set(DEFAULT_BUILD_TYPE "Release")
if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
message(STATUS "Setting build type to '${DEFAULT_BUILD_TYPE}' as none was specified.")
Expand All @@ -71,7 +85,8 @@ else ()
endif ()
endif ()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp -Werror -std=c++17 -Wall -fPIC")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp -Werror -std=c++17 -Wall -fPIC -march=native")

set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")

Expand Down Expand Up @@ -225,4 +240,14 @@ set(CPACK_DEB_COMPONENT_INSTALL YES)
#install CMakeLists.txt.template to resources/
install(FILES resources/hqps/CMakeLists.txt.template DESTINATION lib/flex/)

# Install the vendored pthash/murmurhash headers (headers only, matched by
# *.h / *.hpp) so code built against the installed package can still compile
# with USE_PTHASH enabled.
if(USE_PTHASH)
install(DIRECTORY ${PROJECT_SOURCE_DIR}/third_party/murmurhash
${PROJECT_SOURCE_DIR}/third_party/pthash
DESTINATION include
FILES_MATCHING
PATTERN "*.h"
PATTERN "*.hpp"
)
endif()

include(CPack)
1 change: 0 additions & 1 deletion flex/engines/graph_db/grin/src/property/property.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ const void* grin_get_vertex_property_value(GRIN_GRAPH g, GRIN_VERTEX v,
}
case GRIN_DATATYPE::String: {
auto _col = static_cast<const gs::StringColumn*>(col);
auto view = _col->get_view(vid);
auto s = _col->get_view(vid);
auto len = s.size() + 1;
char* out = new char[len];
Expand Down
169 changes: 163 additions & 6 deletions flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ void check_edge_invariant(

template <typename KEY_T>
struct _add_vertex {
#ifndef USE_PTHASH
void operator()(const std::shared_ptr<arrow::Array>& col,
IdIndexer<KEY_T, vid_t>& indexer, std::vector<vid_t>& vids) {
size_t row_num = col->length();
Expand Down Expand Up @@ -134,11 +135,51 @@ struct _add_vertex {
}
}
}

#else
// Feed every value of the primary-key column `col` into the PTHash indexer
// builder. Unlike the IdIndexer overload, no vids are produced here: keys
// are only collected, and the perfect-hash function is built later by
// PTIndexerBuilder::finish().
void operator()(const std::shared_ptr<arrow::Array>& col,
                PTIndexerBuilder<KEY_T, vid_t>& indexer) {
  const size_t num_rows = col->length();
  if constexpr (std::is_same<std::string_view, KEY_T>::value) {
    // String keys may arrive as either utf8 or large_utf8 arrays.
    if (col->type()->Equals(arrow::utf8())) {
      auto arr = std::static_pointer_cast<arrow::StringArray>(col);
      for (size_t i = 0; i < num_rows; ++i) {
        auto str = arr->GetView(i);
        indexer.add_vertex(std::string_view(str.data(), str.size()));
      }
    } else if (col->type()->Equals(arrow::large_utf8())) {
      auto arr = std::static_pointer_cast<arrow::LargeStringArray>(col);
      for (size_t i = 0; i < num_rows; ++i) {
        auto str = arr->GetView(i);
        indexer.add_vertex(std::string_view(str.data(), str.size()));
      }
    } else {
      LOG(FATAL) << "Not support type: " << col->type()->ToString();
    }
  } else {
    // Non-string keys: the arrow array type must match KEY_T exactly.
    auto expected_type = gs::TypeConverter<KEY_T>::ArrowTypeValue();
    using arrow_array_t = typename gs::TypeConverter<KEY_T>::ArrowArrayType;
    if (!col->type()->Equals(expected_type)) {
      LOG(FATAL) << "Inconsistent data type, expect "
                 << expected_type->ToString() << ", but got "
                 << col->type()->ToString();
    }
    auto arr = std::static_pointer_cast<arrow_array_t>(col);
    for (size_t i = 0; i < num_rows; ++i) {
      indexer.add_vertex(arr->Value(i));
    }
  }
}
#endif
};

template <typename PK_T, typename EDATA_T>
void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
const LFIndexer<vid_t>& indexer,
const IndexerType& indexer,
std::vector<std::tuple<vid_t, vid_t, EDATA_T>>& parsed_edges,
std::vector<int32_t>& degree) {
if constexpr (std::is_same_v<PK_T, std::string_view>) {
Expand Down Expand Up @@ -188,14 +229,14 @@ void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
template <typename SRC_PK_T, typename DST_PK_T, typename EDATA_T>
static void append_edges(
std::shared_ptr<arrow::Array> src_col,
std::shared_ptr<arrow::Array> dst_col, const LFIndexer<vid_t>& src_indexer,
const LFIndexer<vid_t>& dst_indexer,
std::shared_ptr<arrow::Array> dst_col, const IndexerType& src_indexer,
const IndexerType& dst_indexer,
std::vector<std::shared_ptr<arrow::Array>>& edata_cols,
const PropertyType& edge_prop,
std::vector<std::tuple<vid_t, vid_t, EDATA_T>>& parsed_edges,
std::vector<int32_t>& ie_degree, std::vector<int32_t>& oe_degree) {
CHECK(src_col->length() == dst_col->length());
auto indexer_check_lambda = [](const LFIndexer<vid_t>& cur_indexer,
auto indexer_check_lambda = [](const IndexerType& cur_indexer,
const std::shared_ptr<arrow::Array>& cur_col) {
if (cur_indexer.get_type() == PropertyType::kInt64) {
CHECK(cur_col->type()->Equals(arrow::int64()));
Expand Down Expand Up @@ -325,6 +366,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader {
VLOG(10) << "Insert rows: " << row_num;
}

#ifndef USE_PTHASH
template <typename KEY_T>
void addVertexRecordBatchImpl(
label_t v_label_id, const std::vector<std::string>& v_files,
Expand Down Expand Up @@ -381,12 +423,127 @@ class AbstractArrowFragmentLoader : public IFragmentLoader {
}
basic_fragment_loader_.FinishAddingVertex<KEY_T>(v_label_id, indexer);
}
#else
// PTHash variant of addVertexRecordBatchImpl: parses the vertex files for
// `v_label_id`, builds a perfect-hash indexer over the primary-key column,
// then fills the vertex property table in parallel. Unlike the IdIndexer
// path, every batch must be read once to collect all keys before the indexer
// can be finalized, so batches are buffered in memory (`batchs`) and replayed
// in a second pass to resolve vids.
template <typename KEY_T>
void addVertexRecordBatchImpl(
label_t v_label_id, const std::vector<std::string>& v_files,
std::function<std::shared_ptr<IRecordBatchSupplier>(
label_t, const std::string&, const LoadingConfig&)>
supplier_creator) {
std::string v_label_name = schema_.get_vertex_label_name(v_label_id);
VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label "
<< v_label_name;
auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0];
auto primary_key_name = std::get<1>(primary_key);
size_t primary_key_ind = std::get<2>(primary_key);
PTIndexerBuilder<KEY_T, vid_t> indexer_builder;
// All record batches are kept alive so the property-filling pass below can
// re-iterate them after the indexer has been built.
std::vector<std::shared_ptr<arrow::RecordBatch>> batchs;
// Pass 1: feed every primary-key value into the indexer builder.
for (auto& v_file : v_files) {
VLOG(10) << "Parsing vertex file:" << v_file << " for label "
<< v_label_name;
auto record_batch_supplier =
supplier_creator(v_label_id, v_file, loading_config_);

bool first_batch = true;
while (true) {
auto batch = record_batch_supplier->GetNextBatch();
if (!batch) {
break;
}
batchs.emplace_back(batch);
if (first_batch) {
// Validate the file header against the schema once per file
// (the +1 accounts for the primary-key column itself).
auto header = batch->schema()->field_names();
auto schema_column_names =
schema_.get_vertex_property_names(v_label_id);
CHECK(schema_column_names.size() + 1 == header.size())
<< "File header of size: " << header.size()
<< " does not match schema column size: "
<< schema_column_names.size() + 1;
first_batch = false;
}
auto columns = batch->columns();
CHECK(primary_key_ind < columns.size());
auto primary_key_column = columns[primary_key_ind];
_add_vertex<KEY_T>()(primary_key_column, indexer_builder);
}
VLOG(10) << "Finish parsing vertex file:" << v_file << " for label "
<< v_label_name;
}
// Finalize the perfect-hash indexer, then fetch the finished read-only
// indexer used for key -> vid lookups below.
basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer_builder);
const auto& indexer = basic_fragment_loader_.GetLFIndexer(v_label_id);

// Pass 2: resolve vids and scatter the remaining columns into the vertex
// table. Worker threads claim whole batches via an atomic counter, so no
// two threads ever touch the same batch.
std::vector<std::thread> work_threads;
std::atomic<size_t> cur_batch_id(0);
for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
work_threads.emplace_back([&]() {
while (true) {
auto id = cur_batch_id.fetch_add(1);
if (id >= batchs.size()) {
break;
}
auto batch = batchs[id];
auto columns = batch->columns();
auto other_columns_array = columns;
auto primary_key_column = columns[primary_key_ind];
size_t row_num = primary_key_column->length();
// Map each primary-key value to the vid assigned by the indexer.
std::vector<vid_t> vids;
if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
using arrow_array_t =
typename gs::TypeConverter<KEY_T>::ArrowArrayType;
auto casted_array =
std::static_pointer_cast<arrow_array_t>(primary_key_column);
for (size_t i = 0; i < row_num; ++i) {
vids.emplace_back(indexer.get_index(casted_array->Value(i)));
}
} else {
// String keys: handle both utf8 and large_utf8 encodings.
if (primary_key_column->type()->Equals(arrow::utf8())) {
auto casted_array = std::static_pointer_cast<arrow::StringArray>(
primary_key_column);
for (size_t i = 0; i < row_num; ++i) {
auto str = casted_array->GetView(i);
std::string_view str_view(str.data(), str.size());
vids.emplace_back(indexer.get_index(str_view));
}
} else if (primary_key_column->type()->Equals(
arrow::large_utf8())) {
auto casted_array =
std::static_pointer_cast<arrow::LargeStringArray>(
primary_key_column);
for (size_t i = 0; i < row_num; ++i) {
auto str = casted_array->GetView(i);
std::string_view str_view(str.data(), str.size());
vids.emplace_back(indexer.get_index(str_view));
}
}
}
// Drop the key column; the remaining columns line up positionally
// with the vertex table's property columns.
other_columns_array.erase(other_columns_array.begin() +
primary_key_ind);

for (size_t j = 0; j < other_columns_array.size(); ++j) {
auto array = other_columns_array[j];
auto chunked_array = std::make_shared<arrow::ChunkedArray>(array);
set_vertex_properties(
basic_fragment_loader_.GetVertexTable(v_label_id)
.column_ptrs()[j],
chunked_array, vids);
}
}
});
}
for (auto& t : work_threads) {
t.join();
}

VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label "
<< v_label_name;
}
#endif

template <typename SRC_PK_T, typename EDATA_T>
void _append_edges(
std::shared_ptr<arrow::Array> src_col,
std::shared_ptr<arrow::Array> dst_col,
const LFIndexer<vid_t>& src_indexer, const LFIndexer<vid_t>& dst_indexer,
std::shared_ptr<arrow::Array> dst_col, const IndexerType& src_indexer,
const IndexerType& dst_indexer,
std::vector<std::shared_ptr<arrow::Array>>& property_cols,
const PropertyType& edge_property,
std::vector<std::tuple<vid_t, vid_t, EDATA_T>>& parsed_edges,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,12 @@ void BasicFragmentLoader::AddVertexBatch(
}
}

const LFIndexer<vid_t>& BasicFragmentLoader::GetLFIndexer(
label_t v_label) const {
// Read-only access to the vertex-id indexer for `v_label`. IndexerType is
// presumably an alias selected by the USE_PTHASH build flag (LFIndexer vs.
// the PTHash-based indexer) — confirm against its declaration.
const IndexerType& BasicFragmentLoader::GetLFIndexer(label_t v_label) const {
CHECK(v_label < vertex_label_num_);
return lf_indexers_[v_label];
}

// Mutable overload: lets loaders (e.g. the PTHash build path) write into the
// indexer for `v_label` while the fragment is being constructed.
IndexerType& BasicFragmentLoader::GetLFIndexer(label_t v_label) {
CHECK(v_label < vertex_label_num_);
return lf_indexers_[v_label];
}
Expand Down
25 changes: 19 additions & 6 deletions flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class BasicFragmentLoader {
CHECK(col_ind < dst_columns.size());
dst_columns[col_ind]->set_any(vid, prop);
}

#ifndef USE_PTHASH
template <typename KEY_T>
void FinishAddingVertex(label_t v_label,
const IdIndexer<KEY_T, vid_t>& indexer) {
Expand All @@ -65,10 +65,22 @@ class BasicFragmentLoader {
auto primary_keys = schema_.get_vertex_primary_key(v_label);
auto type = std::get<0>(primary_keys[0]);

build_lf_indexer<KEY_T, vid_t>(indexer, filename, lf_indexers_[v_label],
snapshot_dir(work_dir_, 0),
tmp_dir(work_dir_), type);
build_lf_indexer<KEY_T, vid_t>(
indexer, LFIndexer<vid_t>::prefix() + "_" + filename,
lf_indexers_[v_label], snapshot_dir(work_dir_, 0), tmp_dir(work_dir_),
type);
}
#else
// PTHash counterpart of FinishAddingVertex: builds the perfect-hash indexer
// from the keys collected in `indexer_builder` and serializes it under the
// snapshot directory, storing the result in lf_indexers_[v_label].
// The on-disk name is prefixed with PTIndexer's prefix so the two indexer
// formats cannot be confused at load time.
template <typename KEY_T>
void FinishAddingVertex(label_t v_label,
PTIndexerBuilder<KEY_T, vid_t>& indexer_builder) {
CHECK(v_label < vertex_label_num_);
std::string filename =
vertex_map_prefix(schema_.get_vertex_label_name(v_label));
indexer_builder.finish(PTIndexer<vid_t>::prefix() + "_" + filename,
snapshot_dir(work_dir_, 0), lf_indexers_[v_label]);
}
#endif

template <typename EDATA_T>
void AddNoPropEdgeBatch(label_t src_label_id, label_t dst_label_id,
Expand Down Expand Up @@ -178,14 +190,15 @@ class BasicFragmentLoader {
}

// get lf_indexer
const LFIndexer<vid_t>& GetLFIndexer(label_t v_label) const;
const IndexerType& GetLFIndexer(label_t v_label) const;
IndexerType& GetLFIndexer(label_t v_label);

private:
void init_vertex_data();
const Schema& schema_;
std::string work_dir_;
size_t vertex_label_num_, edge_label_num_;
std::vector<LFIndexer<vid_t>> lf_indexers_;
std::vector<IndexerType> lf_indexers_;
std::vector<CsrBase*> ie_, oe_;
std::vector<DualCsrBase*> dual_csr_list_;
std::vector<Table> vertex_data_;
Expand Down
Loading
Loading