Skip to content

Commit

Permalink
fix(interactive): support src and dst of an edge with different type (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 authored Feb 20, 2024
1 parent 738f645 commit f7aa02d
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 64 deletions.
154 changes: 96 additions & 58 deletions flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,55 @@ struct _add_vertex {
};

/**
 * Resolves one endpoint column of an edge batch to internal vertex ids.
 *
 * For every element of `col`, the primary key is looked up in `indexer`; the
 * resulting vertex id is written into the source slot (tuple element 0) or the
 * destination slot (tuple element 1) of consecutive `parsed_edges` entries,
 * beginning at index `cur_ind`, and `degree[vid]` is incremented.
 *
 * @tparam PK_T     Primary-key type of the column. std::string_view selects
 *                  the string path (utf8 or large_utf8 array); any other type
 *                  is read through gs::TypeConverter<PK_T>::ArrowArrayType.
 * @tparam EDATA_T  Type of the third tuple element; it is not touched here.
 * @param is_dst        true fills the destination slot, false the source slot.
 * @param cur_ind       Index in parsed_edges of the first entry to fill. Taken
 *                      by value, so the caller's counter is not advanced.
 * @param col           Arrow array holding the primary keys.
 * @param indexer       Maps a primary key to a vertex id.
 * @param parsed_edges  Must already hold at least cur_ind + col->length()
 *                      entries; this function never resizes it.
 * @param degree        Per-vertex counter indexed by vertex id.
 *
 * NOTE(review): degree[vid] is not bounds-checked. Confirm what get_index
 * returns for a key that is absent from the indexer.
 */
template <typename PK_T, typename EDATA_T>
void _append(bool is_dst, size_t cur_ind, std::shared_ptr<arrow::Array> col,
             const LFIndexer<vid_t>& indexer,
             std::vector<std::tuple<vid_t, vid_t, EDATA_T>>& parsed_edges,
             std::vector<int32_t>& degree) {
  // Stores one resolved vertex id into the selected endpoint slot of the next
  // edge entry and counts it in the degree table.
  auto record = [&](auto vid) {
    if (is_dst) {
      std::get<1>(parsed_edges[cur_ind++]) = vid;
    } else {
      std::get<0>(parsed_edges[cur_ind++]) = vid;
    }
    degree[vid]++;
  };

  if constexpr (std::is_same_v<PK_T, std::string_view>) {
    // Both string array flavours are walked the same way; only the concrete
    // array type differs, so the loop is written once.
    auto append_strings = [&](const auto& strings) {
      // Arrow lengths are 64-bit; a plain `int` counter could overflow.
      const int64_t num_rows = strings->length();
      for (int64_t j = 0; j < num_rows; ++j) {
        auto str = strings->GetView(j);
        std::string_view str_view(str.data(), str.size());
        record(indexer.get_index(Any::From(str_view)));
      }
    };
    if (col->type()->Equals(arrow::utf8())) {
      append_strings(std::static_pointer_cast<arrow::StringArray>(col));
    } else {
      // must be large utf8
      append_strings(std::static_pointer_cast<arrow::LargeStringArray>(col));
    }
  } else {
    using arrow_array_type = typename gs::TypeConverter<PK_T>::ArrowArrayType;
    auto casted = std::static_pointer_cast<arrow_array_type>(col);
    // Arrow lengths are 64-bit; a plain `int` counter could overflow.
    const int64_t num_rows = casted->length();
    for (int64_t j = 0; j < num_rows; ++j) {
      record(indexer.get_index(Any::From(casted->Value(j))));
    }
  }
}

template <typename SRC_PK_T, typename DST_PK_T, typename EDATA_T>
static void append_edges(
std::shared_ptr<arrow::Array> src_col,
std::shared_ptr<arrow::Array> dst_col, const LFIndexer<vid_t>& src_indexer,
Expand Down Expand Up @@ -169,54 +218,6 @@ static void append_edges(
VLOG(10) << "resize parsed_edges from" << old_size << " to "
<< parsed_edges.size();

auto _append = [&](bool is_dst) {
size_t cur_ind = old_size;
const auto& col = is_dst ? dst_col : src_col;
const auto& indexer = is_dst ? dst_indexer : src_indexer;
if constexpr (std::is_same_v<PK_T, std::string_view>) {
if (col->type()->Equals(arrow::utf8())) {
auto casted = std::static_pointer_cast<arrow::StringArray>(col);
for (auto j = 0; j < casted->length(); ++j) {
auto str = casted->GetView(j);
std::string_view str_view(str.data(), str.size());
auto vid = indexer.get_index(Any::From(str_view));
if (is_dst) {
std::get<1>(parsed_edges[cur_ind++]) = vid;
} else {
std::get<0>(parsed_edges[cur_ind++]) = vid;
}
is_dst ? ie_degree[vid]++ : oe_degree[vid]++;
}
} else {
// must be large utf8
auto casted = std::static_pointer_cast<arrow::LargeStringArray>(col);
for (auto j = 0; j < casted->length(); ++j) {
auto str = casted->GetView(j);
std::string_view str_view(str.data(), str.size());
auto vid = indexer.get_index(Any::From(str_view));
if (is_dst) {
std::get<1>(parsed_edges[cur_ind++]) = vid;
} else {
std::get<0>(parsed_edges[cur_ind++]) = vid;
}
is_dst ? ie_degree[vid]++ : oe_degree[vid]++;
}
}
} else {
using arrow_array_type = typename gs::TypeConverter<PK_T>::ArrowArrayType;
auto casted = std::static_pointer_cast<arrow_array_type>(col);
for (auto j = 0; j < casted->length(); ++j) {
auto vid = indexer.get_index(Any::From(casted->Value(j)));
if (is_dst) {
std::get<1>(parsed_edges[cur_ind++]) = vid;
} else {
std::get<0>(parsed_edges[cur_ind++]) = vid;
}
is_dst ? ie_degree[vid]++ : oe_degree[vid]++;
}
}
};

// if EDATA_T is grape::EmptyType, no need to read columns
auto edata_col_thread = std::thread([&]() {
if constexpr (!std::is_same<EDATA_T, grape::EmptyType>::value) {
Expand Down Expand Up @@ -250,8 +251,15 @@ static void append_edges(
VLOG(10) << "Finish inserting: " << src_col->length() << " edges";
}
});
auto src_col_thread = std::thread([&]() { _append(false); });
auto dst_col_thread = std::thread([&]() { _append(true); });
size_t cur_ind = old_size;
auto src_col_thread = std::thread([&]() {
_append<SRC_PK_T, EDATA_T>(false, cur_ind, src_col, src_indexer,
parsed_edges, oe_degree);
});
auto dst_col_thread = std::thread([&]() {
_append<DST_PK_T, EDATA_T>(true, cur_ind, dst_col, dst_indexer,
parsed_edges, ie_degree);
});
src_col_thread.join();
dst_col_thread.join();
edata_col_thread.join();
Expand Down Expand Up @@ -374,6 +382,39 @@ class AbstractArrowFragmentLoader : public IFragmentLoader {
basic_fragment_loader_.FinishAddingVertex<KEY_T>(v_label_id, indexer);
}

/**
 * Selects the destination primary-key type from the Arrow type of `dst_col`
 * and forwards all arguments unchanged to
 * append_edges<SRC_PK_T, DST_PK_T, EDATA_T>.
 *
 * Types are tested in the order int64, uint64, int32, uint32; any other type
 * is handled as std::string_view.
 *
 * @tparam SRC_PK_T  Primary-key type of the source column, chosen by the
 *                   caller.
 * @tparam EDATA_T   Edge-data type stored in parsed_edges.
 */
template <typename SRC_PK_T, typename EDATA_T>
void _append_edges(
    std::shared_ptr<arrow::Array> src_col,
    std::shared_ptr<arrow::Array> dst_col,
    const LFIndexer<vid_t>& src_indexer, const LFIndexer<vid_t>& dst_indexer,
    std::vector<std::shared_ptr<arrow::Array>>& property_cols,
    const PropertyType& edge_property,
    std::vector<std::tuple<vid_t, vid_t, EDATA_T>>& parsed_edges,
    std::vector<int32_t>& ie_degree, std::vector<int32_t>& oe_degree) {
  // The tag's type carries the destination key type; its value is never read.
  auto forward_as = [&]([[maybe_unused]] auto dst_pk_tag) {
    using DST_PK_T = decltype(dst_pk_tag);
    append_edges<SRC_PK_T, DST_PK_T, EDATA_T>(
        src_col, dst_col, src_indexer, dst_indexer, property_cols,
        edge_property, parsed_edges, ie_degree, oe_degree);
  };

  const auto dst_type = dst_col->type();
  if (dst_type->Equals(arrow::int64())) {
    forward_as(int64_t{});
    return;
  }
  if (dst_type->Equals(arrow::uint64())) {
    forward_as(uint64_t{});
    return;
  }
  if (dst_type->Equals(arrow::int32())) {
    forward_as(int32_t{});
    return;
  }
  if (dst_type->Equals(arrow::uint32())) {
    forward_as(uint32_t{});
    return;
  }
  // must be string
  forward_as(std::string_view{});
}
template <typename EDATA_T>
void addEdgesRecordBatchImpl(
label_t src_label_id, label_t dst_label_id, label_t e_label_id,
Expand Down Expand Up @@ -444,9 +485,6 @@ class AbstractArrowFragmentLoader : public IFragmentLoader {
<< "unsupported src_col type: " << src_col_type->ToString();
CHECK(check_primary_key_type(dst_col_type))
<< "unsupported dst_col type: " << dst_col_type->ToString();
CHECK(src_col_type->Equals(dst_col_type))
<< "src_col type: " << src_col_type->ToString()
<< " neq dst_col type: " << dst_col_type->ToString();

std::vector<std::shared_ptr<arrow::Array>> property_cols;
for (size_t i = 2; i < columns.size(); ++i) {
Expand All @@ -463,24 +501,24 @@ class AbstractArrowFragmentLoader : public IFragmentLoader {
// add edges to vector
CHECK(src_col->length() == dst_col->length());
if (src_col_type->Equals(arrow::int64())) {
append_edges<int64_t, EDATA_T>(
_append_edges<int64_t, EDATA_T>(
src_col, dst_col, src_indexer, dst_indexer, property_cols,
edge_property, parsed_edges, ie_degree, oe_degree);
} else if (src_col_type->Equals(arrow::uint64())) {
append_edges<uint64_t, EDATA_T>(
_append_edges<uint64_t, EDATA_T>(
src_col, dst_col, src_indexer, dst_indexer, property_cols,
edge_property, parsed_edges, ie_degree, oe_degree);
} else if (src_col_type->Equals(arrow::int32())) {
append_edges<int32_t, EDATA_T>(
_append_edges<int32_t, EDATA_T>(
src_col, dst_col, src_indexer, dst_indexer, property_cols,
edge_property, parsed_edges, ie_degree, oe_degree);
} else if (src_col_type->Equals(arrow::uint32())) {
append_edges<uint32_t, EDATA_T>(
_append_edges<uint32_t, EDATA_T>(
src_col, dst_col, src_indexer, dst_indexer, property_cols,
edge_property, parsed_edges, ie_degree, oe_degree);
} else {
// must be string
append_edges<std::string_view, EDATA_T>(
_append_edges<std::string_view, EDATA_T>(
src_col, dst_col, src_indexer, dst_indexer, property_cols,
edge_property, parsed_edges, ie_degree, oe_degree);
}
Expand Down
3 changes: 2 additions & 1 deletion flex/tests/rt_mutable_graph/modern_graph_string_edge.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ schema:
- property_id: 0
property_name: id
property_type:
primitive_type: DT_SIGNED_INT64
varchar:
max_length: 64
- property_id: 1
property_name: name
property_type:
Expand Down
13 changes: 8 additions & 5 deletions flex/tests/rt_mutable_graph/string_edge_property_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ class TestStringEdgeProperty {
src_label_(db.graph().schema().get_vertex_label_id("person")),
dst_label_(db.graph().schema().get_vertex_label_id("software")),
edge_label_(db.graph().schema().get_edge_label_id("created")) {}

void test() {
int64_t src, dst;
int64_t src;
std::string dst{};
src = 1;
test_get_edge(src);
src = 1;
dst = 3;
dst = "5";
test_get_graph_view(src, dst);
src = 3;
dst = 5;
dst = "5";
test_add_edge(src, dst);
}

void test_get_edge(int64_t oid) {
vid_t src_lid;
CHECK(db_.graph().get_lid(src_label_, oid, src_lid));
Expand All @@ -50,7 +53,7 @@ class TestStringEdgeProperty {
LOG(INFO) << "Finish test get edge\n";
}

void test_get_graph_view(int64_t src, int64_t dst) {
void test_get_graph_view(int64_t src, const std::string& dst) {
auto txn = db_.GetReadTransaction();
vid_t src_lid, dst_lid;

Expand All @@ -77,7 +80,7 @@ class TestStringEdgeProperty {
LOG(INFO) << "Finish test get GraphView\n";
}

void test_add_edge(int64_t src, int64_t dst) {
void test_add_edge(int64_t src, const std::string& dst) {
{
auto txn = db_.GetSingleVertexInsertTransaction();
std::string name = "test-3";
Expand Down

0 comments on commit f7aa02d

Please sign in to comment.