From f7aa02dced31d7d06d1b5baae53e769c3c0dcaca Mon Sep 17 00:00:00 2001 From: liulx20 <68941872+liulx20@users.noreply.github.com> Date: Tue, 20 Feb 2024 15:16:22 +0800 Subject: [PATCH] fix(interactive): support src and dst of an edge with different type (#3552) --- .../loader/abstract_arrow_fragment_loader.h | 154 +++++++++++------- .../modern_graph_string_edge.yaml | 3 +- .../string_edge_property_test.cc | 13 +- 3 files changed, 106 insertions(+), 64 deletions(-) diff --git a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h index ddb1e7f5c91c..112703d9d940 100644 --- a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h @@ -137,6 +137,55 @@ struct _add_vertex { }; template +void _append(bool is_dst, size_t cur_ind, std::shared_ptr col, + const LFIndexer& indexer, + std::vector>& parsed_edges, + std::vector& degree) { + if constexpr (std::is_same_v) { + if (col->type()->Equals(arrow::utf8())) { + auto casted = std::static_pointer_cast(col); + for (auto j = 0; j < casted->length(); ++j) { + auto str = casted->GetView(j); + std::string_view str_view(str.data(), str.size()); + auto vid = indexer.get_index(Any::From(str_view)); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + degree[vid]++; + } + } else { + // must be large utf8 + auto casted = std::static_pointer_cast(col); + for (auto j = 0; j < casted->length(); ++j) { + auto str = casted->GetView(j); + std::string_view str_view(str.data(), str.size()); + auto vid = indexer.get_index(Any::From(str_view)); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + degree[vid]++; + } + } + } else { + using arrow_array_type = typename gs::TypeConverter::ArrowArrayType; + auto casted = std::static_pointer_cast(col); + for (auto j = 0; j < casted->length(); ++j) { + auto vid = indexer.get_index(Any::From(casted->Value(j))); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + degree[vid]++; + } + } +} + +template static void append_edges( std::shared_ptr src_col, std::shared_ptr dst_col, const LFIndexer& src_indexer, @@ -169,54 +218,6 @@ static void append_edges( VLOG(10) << "resize parsed_edges from" << old_size << " to " << parsed_edges.size(); - auto _append = [&](bool is_dst) { - size_t cur_ind = old_size; - const auto& col = is_dst ? dst_col : src_col; - const auto& indexer = is_dst ? dst_indexer : src_indexer; - if constexpr (std::is_same_v) { - if (col->type()->Equals(arrow::utf8())) { - auto casted = std::static_pointer_cast(col); - for (auto j = 0; j < casted->length(); ++j) { - auto str = casted->GetView(j); - std::string_view str_view(str.data(), str.size()); - auto vid = indexer.get_index(Any::From(str_view)); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else { - // must be large utf8 - auto casted = std::static_pointer_cast(col); - for (auto j = 0; j < casted->length(); ++j) { - auto str = casted->GetView(j); - std::string_view str_view(str.data(), str.size()); - auto vid = indexer.get_index(Any::From(str_view)); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } - } else { - using arrow_array_type = typename gs::TypeConverter::ArrowArrayType; - auto casted = std::static_pointer_cast(col); - for (auto j = 0; j < casted->length(); ++j) { - auto vid = indexer.get_index(Any::From(casted->Value(j))); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } - }; - // if EDATA_T is grape::EmptyType, no need to read columns auto edata_col_thread = std::thread([&]() { if constexpr (!std::is_same::value) { @@ -250,8 +251,15 @@ static void append_edges( VLOG(10) << "Finish inserting: " << src_col->length() << " edges"; } }); - auto src_col_thread = std::thread([&]() { _append(false); }); - auto dst_col_thread = std::thread([&]() { _append(true); }); + size_t cur_ind = old_size; + auto src_col_thread = std::thread([&]() { + _append(false, cur_ind, src_col, src_indexer, + parsed_edges, oe_degree); + }); + auto dst_col_thread = std::thread([&]() { + _append(true, cur_ind, dst_col, dst_indexer, + parsed_edges, ie_degree); + }); src_col_thread.join(); dst_col_thread.join(); edata_col_thread.join(); @@ -374,6 +382,39 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer); } + template + void _append_edges( + std::shared_ptr src_col, + std::shared_ptr dst_col, + const LFIndexer& src_indexer, const LFIndexer& dst_indexer, + std::vector>& property_cols, + const PropertyType& edge_property, + std::vector>& parsed_edges, + std::vector& ie_degree, std::vector& oe_degree) { + auto dst_col_type = dst_col->type(); + if (dst_col_type->Equals(arrow::int64())) { + append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } else if (dst_col_type->Equals(arrow::uint64())) { + append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } else if (dst_col_type->Equals(arrow::int32())) { + append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } else if (dst_col_type->Equals(arrow::uint32())) { + append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } else { + // must be string + append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } + } template void addEdgesRecordBatchImpl( label_t src_label_id, label_t dst_label_id, label_t e_label_id, @@ -444,9 +485,6 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { << "unsupported src_col type: " << src_col_type->ToString(); CHECK(check_primary_key_type(dst_col_type)) << "unsupported dst_col type: " << dst_col_type->ToString(); - CHECK(src_col_type->Equals(dst_col_type)) - << "src_col type: " << src_col_type->ToString() - << " neq dst_col type: " << dst_col_type->ToString(); std::vector> property_cols; for (size_t i = 2; i < columns.size(); ++i) { @@ -463,24 +501,24 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { // add edges to vector CHECK(src_col->length() == dst_col->length()); if (src_col_type->Equals(arrow::int64())) { - append_edges( + _append_edges( src_col, dst_col, src_indexer, dst_indexer, property_cols, edge_property, parsed_edges, ie_degree, oe_degree); } else if (src_col_type->Equals(arrow::uint64())) { - append_edges( + _append_edges( src_col, dst_col, src_indexer, dst_indexer, property_cols, edge_property, parsed_edges, ie_degree, oe_degree); } else if (src_col_type->Equals(arrow::int32())) { - append_edges( + _append_edges( src_col, dst_col, src_indexer, dst_indexer, property_cols, edge_property, parsed_edges, ie_degree, oe_degree); } else if (src_col_type->Equals(arrow::uint32())) { - append_edges( + _append_edges( src_col, dst_col, src_indexer, dst_indexer, property_cols, edge_property, parsed_edges, ie_degree, oe_degree); } else { // must be string - append_edges( + _append_edges( src_col, dst_col, src_indexer, dst_indexer, property_cols, edge_property, parsed_edges, ie_degree, oe_degree); } diff --git a/flex/tests/rt_mutable_graph/modern_graph_string_edge.yaml b/flex/tests/rt_mutable_graph/modern_graph_string_edge.yaml index 3b18fc4dbcd2..17a4e62e6fc4 100644 --- a/flex/tests/rt_mutable_graph/modern_graph_string_edge.yaml +++ b/flex/tests/rt_mutable_graph/modern_graph_string_edge.yaml @@ -30,7 +30,8 @@ schema: - property_id: 0 property_name: id property_type: - primitive_type: DT_SIGNED_INT64 + varchar: + max_length: 64 - property_id: 1 property_name: name property_type: diff --git a/flex/tests/rt_mutable_graph/string_edge_property_test.cc b/flex/tests/rt_mutable_graph/string_edge_property_test.cc index bda1011be08c..39117e8d9bf0 100644 --- a/flex/tests/rt_mutable_graph/string_edge_property_test.cc +++ b/flex/tests/rt_mutable_graph/string_edge_property_test.cc @@ -27,17 +27,20 @@ class TestStringEdgeProperty { src_label_(db.graph().schema().get_vertex_label_id("person")), dst_label_(db.graph().schema().get_vertex_label_id("software")), edge_label_(db.graph().schema().get_edge_label_id("created")) {} + void test() { - int64_t src, dst; + int64_t src; + std::string dst{}; src = 1; test_get_edge(src); src = 1; - dst = 3; + dst = "5"; test_get_graph_view(src, dst); src = 3; - dst = 5; + dst = "5"; test_add_edge(src, dst); } + void test_get_edge(int64_t oid) { vid_t src_lid; CHECK(db_.graph().get_lid(src_label_, oid, src_lid)); @@ -50,7 +53,7 @@ class TestStringEdgeProperty { LOG(INFO) << "Finish test get edge\n"; } - void test_get_graph_view(int64_t src, int64_t dst) { + void test_get_graph_view(int64_t src, const std::string& dst) { auto txn = db_.GetReadTransaction(); vid_t src_lid, dst_lid; @@ -77,7 +80,7 @@ class TestStringEdgeProperty { LOG(INFO) << "Finish test get GraphView\n"; } - void test_add_edge(int64_t src, int64_t dst) { + void test_add_edge(int64_t src, const std::string& dst) { { auto txn = db_.GetSingleVertexInsertTransaction(); std::string name = "test-3";