Skip to content

Commit

Permalink
feat(flex): Support using property of string type as primary key for …
Browse files Browse the repository at this point in the history
…vertex (#3296)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

Support using a string-typed property as the primary key of a vertex.

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes

---------

Co-authored-by: xiaolei.zl <[email protected]>
  • Loading branch information
liulx20 and zhanglei1949 authored Oct 20, 2023
1 parent 6d7825d commit 2d19869
Show file tree
Hide file tree
Showing 37 changed files with 802 additions and 334 deletions.
20 changes: 14 additions & 6 deletions flex/codegen/src/hqps/hqps_scan_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ static constexpr const char* SCAN_OP_TEMPLATE_NO_EXPR_STR =
/// 4. vertex label
/// 5. oid
static constexpr const char* SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR =
"auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, %4%, %5%);\n";
"auto %1% = Engine::template ScanVertexWithOid<%2%,%3%>(%4%, %5%, %6%);\n";

/// Args
/// 1. res_ctx_name
Expand Down Expand Up @@ -179,14 +179,20 @@ class ScanOpBuilder {
// Decode the constant primary-key value of the index predicate into its
// textual form (oid_) and the C++ type name that is substituted into the
// generated ScanVertexWithOid call (oid_type_name_).
switch (const_value.item_case()) {
  case common::Value::kI32:
    oid_ = std::to_string(const_value.i32());
    oid_type_name_ = "int32_t";
    break;
  case common::Value::kI64:
    oid_ = std::to_string(const_value.i64());
    oid_type_name_ = "int64_t";
    break;
  case common::Value::kStr:
    // NOTE(review): the string is stored verbatim; confirm whether it must
    // be quoted/escaped before being spliced into the generated source.
    oid_ = const_value.str();
    oid_type_name_ = "std::string_view";
    // Without this break the string case fell through to the default
    // branch and aborted via LOG(FATAL).
    break;
  default:
    LOG(FATAL) << "Currently only support int, long, string as primary key";
}
VLOG(1) << "Found oid: " << oid_ << " in index scan";
VLOG(1) << "Found oid: " << oid_
<< " in index scan, type: " << oid_type_name_;
} else {
// dynamic param
auto dyn_param_pb = triplet.param();
Expand All @@ -209,7 +215,7 @@ class ScanOpBuilder {
// If oid_ not empty, scan with oid
if (!oid_.empty()) {
VLOG(1) << "Scan with oid: " << oid_;
return scan_with_oid(labels_ids_, oid_);
return scan_with_oid(labels_ids_, oid_, oid_type_name_);
} else {
// If no oid, scan without expression
VLOG(1) << "Scan without expression";
Expand All @@ -220,15 +226,16 @@ class ScanOpBuilder {

private:
std::string scan_with_oid(const std::vector<int32_t>& label_ids,
const std::string& oid) const {
const std::string& oid,
const std::string& oid_type_name) const {
VLOG(10) << "Scan with oid: " << oid;
std::string next_ctx_name = ctx_.GetCurCtxName();
auto append_opt = res_alias_to_append_opt(res_alias_);

if (label_ids.size() == 1) {
boost::format formater(SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_ids[0] %
oid;
formater % next_ctx_name % append_opt % oid_type_name % ctx_.GraphVar() %
label_ids[0] % oid;
return formater.str();
} else {
boost::format formater(SCAN_OP_WITH_OID_MUL_LABEL_TEMPLATE_STR);
Expand Down Expand Up @@ -302,6 +309,7 @@ class ScanOpBuilder {
std::string expr_var_name_, expr_func_name_, expr_construct_params_,
selectors_str_; // The expression decode from params.
std::string oid_; // the oid decode from idx predicate, or param name.
std::string oid_type_name_;
int res_alias_;
};

Expand Down
12 changes: 6 additions & 6 deletions flex/engines/graph_db/app/server_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static uint32_t get_vertex_vid(const gs::ReadTransaction& txn, uint8_t label,
uint32_t vid = std::numeric_limits<uint32_t>::max();
auto vit = txn.GetVertexIterator(label);
for (; vit.IsValid(); vit.Next()) {
if (vit.GetId() == id) {
if (vit.GetId().AsInt64() == id) {
vid = vit.GetIndex();
break;
}
Expand Down Expand Up @@ -99,7 +99,7 @@ bool ServerApp::Query(Decoder& input, Encoder& output) {
uint8_t vertex_label_id = txn.schema().get_vertex_label_id(vertex_label);
auto vit = txn.GetVertexIterator(vertex_label_id);
for (; vit.IsValid(); vit.Next()) {
if (vit.GetId() == vertex_id) {
if (vit.GetId().AsInt64() == vertex_id) {
output.put_int(1);
int field_num = vit.FieldNum();
for (int i = 0; i < field_num; ++i) {
Expand Down Expand Up @@ -233,13 +233,13 @@ bool ServerApp::Query(Decoder& input, Encoder& output) {

std::vector<std::tuple<int64_t, int64_t, std::string>> match_edges;
for (uint32_t v = dst_range.from; v != dst_range.to; ++v) {
int64_t v_oid = txn.GetVertexId(dst_label_id, v);
int64_t v_oid = txn.GetVertexId(dst_label_id, v).AsInt64();
auto ieit = txn.GetInEdgeIterator(dst_label_id, v, src_label_id,
edge_label_id);
while (ieit.IsValid()) {
uint32_t u = ieit.GetNeighbor();
if (src_range.contains(u)) {
int64_t u_oid = txn.GetVertexId(src_label_id, u);
int64_t u_oid = txn.GetVertexId(src_label_id, u).AsInt64();
match_edges.emplace_back(u_oid, v_oid,
ieit.GetData().to_string());
}
Expand All @@ -248,13 +248,13 @@ bool ServerApp::Query(Decoder& input, Encoder& output) {
}
if (match_edges.empty()) {
for (uint32_t u = src_range.from; u != src_range.to; ++u) {
int64_t u_oid = txn.GetVertexId(src_label_id, u);
int64_t u_oid = txn.GetVertexId(src_label_id, u).AsInt64();
auto oeit = txn.GetOutEdgeIterator(src_label_id, u, dst_label_id,
edge_label_id);
while (oeit.IsValid()) {
uint32_t v = oeit.GetNeighbor();
if (dst_range.contains(v)) {
int64_t v_oid = txn.GetVertexId(dst_label_id, v);
int64_t v_oid = txn.GetVertexId(dst_label_id, v).AsInt64();
match_edges.emplace_back(u_oid, v_oid,
oeit.GetData().to_string());
}
Expand Down
14 changes: 12 additions & 2 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,18 @@ std::shared_ptr<ColumnBase> GraphDBSession::get_vertex_property_column(

std::shared_ptr<RefColumnBase> GraphDBSession::get_vertex_id_column(
uint8_t label) const {
return std::make_shared<TypedRefColumn<oid_t>>(
db_.graph().lf_indexers_[label].get_keys(), StorageStrategy::kMem);
if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kInt64) {
return std::make_shared<TypedRefColumn<int64_t>>(
dynamic_cast<const TypedColumn<int64_t>&>(
db_.graph().lf_indexers_[label].get_keys()));
} else if (db_.graph().lf_indexers_[label].get_type() ==
PropertyType::kString) {
return std::make_shared<TypedRefColumn<std::string_view>>(
dynamic_cast<const TypedColumn<std::string_view>&>(
db_.graph().lf_indexers_[label].get_keys()));
} else {
return nullptr;
}
}

#define likely(x) __builtin_expect(!!(x), 1)
Expand Down
40 changes: 22 additions & 18 deletions flex/engines/graph_db/database/insert_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ InsertTransaction::InsertTransaction(MutablePropertyFragment& graph,

InsertTransaction::~InsertTransaction() { Abort(); }

bool InsertTransaction::AddVertex(label_t label, oid_t id,
bool InsertTransaction::AddVertex(label_t label, const Any& id,
const std::vector<Any>& props) {
size_t arc_size = arc_.GetSize();
arc_ << static_cast<uint8_t>(0) << label << id;
arc_ << static_cast<uint8_t>(0) << label;
serialize_field(arc_, id);
const std::vector<PropertyType>& types =
graph_.schema().get_vertex_properties(label);
if (types.size() != props.size()) {
Expand Down Expand Up @@ -65,15 +66,15 @@ bool InsertTransaction::AddVertex(label_t label, oid_t id,
return true;
}

bool InsertTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label,
oid_t dst, label_t edge_label,
const Any& prop) {
bool InsertTransaction::AddEdge(label_t src_label, const Any& src,
label_t dst_label, const Any& dst,
label_t edge_label, const Any& prop) {
vid_t lid;
if (!graph_.get_lid(src_label, src, lid)) {
if (added_vertices_.find(std::make_pair(src_label, src)) ==
added_vertices_.end()) {
std::string label_name = graph_.schema().get_vertex_label_name(src_label);
LOG(ERROR) << "Source vertex " << label_name << "[" << src
LOG(ERROR) << "Source vertex " << label_name << "[" << src.to_string()
<< "] not found...";
return false;
}
Expand All @@ -82,8 +83,8 @@ bool InsertTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label,
if (added_vertices_.find(std::make_pair(dst_label, dst)) ==
added_vertices_.end()) {
std::string label_name = graph_.schema().get_vertex_label_name(dst_label);
LOG(ERROR) << "Destination vertex " << label_name << "[" << dst
<< "] not found...";
LOG(ERROR) << "Destination vertex " << label_name << "["
<< dst.to_string() << "] not found...";
return false;
}
}
Expand All @@ -95,8 +96,11 @@ bool InsertTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label,
<< type << ", got " << prop.type;
return false;
}
arc_ << static_cast<uint8_t>(1) << src_label << src << dst_label << dst
<< edge_label;
arc_ << static_cast<uint8_t>(1) << src_label;
serialize_field(arc_, src);
arc_ << dst_label;
serialize_field(arc_, dst);
arc_ << edge_label;
serialize_field(arc_, prop);
return true;
}
Expand Down Expand Up @@ -143,17 +147,17 @@ void InsertTransaction::IngestWal(MutablePropertyFragment& graph,
arc >> op_type;
if (op_type == 0) {
label_t label;
oid_t id;

arc >> label >> id;
Any id;
label = deserialize_oid(graph, arc, id);
vid_t lid = graph.add_vertex(label, id);
graph.get_vertex_table(label).ingest(lid, arc);
} else if (op_type == 1) {
label_t src_label, dst_label, edge_label;
oid_t src, dst;
Any src, dst;
vid_t src_lid, dst_lid;

arc >> src_label >> src >> dst_label >> dst >> edge_label;
src_label = deserialize_oid(graph, arc, src);
dst_label = deserialize_oid(graph, arc, dst);
arc >> edge_label;

CHECK(get_vertex_with_retries(graph, src_label, src, src_lid));
CHECK(get_vertex_with_retries(graph, dst_label, dst, dst_lid));
Expand All @@ -177,7 +181,7 @@ void InsertTransaction::clear() {
#define likely(x) __builtin_expect(!!(x), 1)

bool InsertTransaction::get_vertex_with_retries(MutablePropertyFragment& graph,
label_t label, oid_t oid,
label_t label, const Any& oid,
vid_t& lid) {
if (likely(graph.get_lid(label, oid, lid))) {
return true;
Expand All @@ -189,7 +193,7 @@ bool InsertTransaction::get_vertex_with_retries(MutablePropertyFragment& graph,
}
}

LOG(ERROR) << "get_vertex [" << oid << "] failed";
LOG(ERROR) << "get_vertex [" << oid.to_string() << "] failed";
return false;
}

Expand Down
11 changes: 6 additions & 5 deletions flex/engines/graph_db/database/insert_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class InsertTransaction {

~InsertTransaction();

bool AddVertex(label_t label, oid_t id, const std::vector<Any>& props);
bool AddVertex(label_t label, const Any& id, const std::vector<Any>& props);

bool AddEdge(label_t src_label, oid_t src, label_t dst_label, oid_t dst,
label_t edge_label, const Any& prop);
bool AddEdge(label_t src_label, const Any& src, label_t dst_label,
const Any& dst, label_t edge_label, const Any& prop);

void Commit();

Expand All @@ -55,11 +55,12 @@ class InsertTransaction {
void clear();

static bool get_vertex_with_retries(MutablePropertyFragment& graph,
label_t label, oid_t oid, vid_t& lid);
label_t label, const Any& oid,
vid_t& lid);

grape::InArchive arc_;

std::set<std::pair<label_t, oid_t>> added_vertices_;
std::set<std::pair<label_t, Any>> added_vertices_;

MutablePropertyFragment& graph_;
ArenaAllocator& alloc_;
Expand Down
10 changes: 5 additions & 5 deletions flex/engines/graph_db/database/read_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void ReadTransaction::vertex_iterator::Goto(vid_t target) {
cur_ = std::min(target, num_);
}

oid_t ReadTransaction::vertex_iterator::GetId() const {
Any ReadTransaction::vertex_iterator::GetId() const {
return graph_.get_oid(label_, cur_);
}
vid_t ReadTransaction::vertex_iterator::GetIndex() const { return cur_; }
Expand Down Expand Up @@ -89,8 +89,8 @@ ReadTransaction::vertex_iterator ReadTransaction::GetVertexIterator(
return {label, 0, graph_.vertex_num(label), graph_};
}

ReadTransaction::vertex_iterator ReadTransaction::FindVertex(label_t label,
oid_t id) const {
ReadTransaction::vertex_iterator ReadTransaction::FindVertex(
label_t label, const Any& id) const {
vid_t lid;
if (graph_.get_lid(label, id, lid)) {
return {label, lid, graph_.vertex_num(label), graph_};
Expand All @@ -99,7 +99,7 @@ ReadTransaction::vertex_iterator ReadTransaction::FindVertex(label_t label,
}
}

bool ReadTransaction::GetVertexIndex(label_t label, oid_t id,
bool ReadTransaction::GetVertexIndex(label_t label, const Any& id,
vid_t& index) const {
return graph_.get_lid(label, id, index);
}
Expand All @@ -108,7 +108,7 @@ vid_t ReadTransaction::GetVertexNum(label_t label) const {
return graph_.vertex_num(label);
}

oid_t ReadTransaction::GetVertexId(label_t label, vid_t index) const {
Any ReadTransaction::GetVertexId(label_t label, vid_t index) const {
return graph_.get_oid(label, index);
}

Expand Down
8 changes: 4 additions & 4 deletions flex/engines/graph_db/database/read_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class ReadTransaction {
void Next();
void Goto(vid_t target);

oid_t GetId() const;
Any GetId() const;
vid_t GetIndex() const;

Any GetField(int col_id) const;
Expand Down Expand Up @@ -183,13 +183,13 @@ class ReadTransaction {

vertex_iterator GetVertexIterator(label_t label) const;

vertex_iterator FindVertex(label_t label, oid_t id) const;
vertex_iterator FindVertex(label_t label, const Any& id) const;

bool GetVertexIndex(label_t label, oid_t id, vid_t& index) const;
bool GetVertexIndex(label_t label, const Any& id, vid_t& index) const;

vid_t GetVertexNum(label_t label) const;

oid_t GetVertexId(label_t label, vid_t index) const;
Any GetVertexId(label_t label, vid_t index) const;

edge_iterator GetOutEdgeIterator(label_t label, vid_t u,
label_t neighnor_label,
Expand Down
27 changes: 19 additions & 8 deletions flex/engines/graph_db/database/single_edge_insert_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ SingleEdgeInsertTransaction::SingleEdgeInsertTransaction(

SingleEdgeInsertTransaction::~SingleEdgeInsertTransaction() { Abort(); }

bool SingleEdgeInsertTransaction::AddEdge(label_t src_label, oid_t src,
label_t dst_label, oid_t dst,
bool SingleEdgeInsertTransaction::AddEdge(label_t src_label, const Any& src,
label_t dst_label, const Any& dst,
label_t edge_label, const Any& prop) {
if (!graph_.get_lid(src_label, src, src_vid_)) {
std::string label_name = graph_.schema().get_vertex_label_name(src_label);
LOG(ERROR) << "Source vertex " << label_name << "[" << src
LOG(ERROR) << "Source vertex " << label_name << "[" << src.to_string()
<< "] not found...";
return false;
}
if (!graph_.get_lid(dst_label, dst, dst_vid_)) {
std::string label_name = graph_.schema().get_vertex_label_name(dst_label);
LOG(ERROR) << "Destination vertex " << label_name << "[" << dst
LOG(ERROR) << "Destination vertex " << label_name << "[" << dst.to_string()
<< "] not found...";
return false;
}
Expand All @@ -63,8 +63,11 @@ bool SingleEdgeInsertTransaction::AddEdge(label_t src_label, oid_t src,
src_label_ = src_label;
dst_label_ = dst_label;
edge_label_ = edge_label;
arc_ << static_cast<uint8_t>(1) << src_label << src << dst_label << dst
<< edge_label;
arc_ << static_cast<uint8_t>(1) << src_label;
serialize_field(arc_, src);
arc_ << dst_label;
serialize_field(arc_, dst);
arc_ << edge_label;
serialize_field(arc_, prop);
return true;
}
Expand Down Expand Up @@ -93,8 +96,16 @@ void SingleEdgeInsertTransaction::Commit() {
logger_.append(arc_.GetBuffer(), arc_.GetSize());

grape::OutArchive arc;
arc.SetSlice(arc_.GetBuffer() + sizeof(WalHeader) + 20,
arc_.GetSize() - sizeof(WalHeader) - 20);
// Position the read archive at the serialized edge property before handing
// it to IngestEdge. The record written by AddEdge is laid out as:
//   op type, src label, src oid, dst label, dst oid, edge label, property.
// Each prefix field is read and discarded here; the locals exist only to
// consume bytes from the archive.
{
// Start reading right after the WAL header.
arc.SetSlice(arc_.GetBuffer() + sizeof(WalHeader),
arc_.GetSize() - sizeof(WalHeader));
label_t op_type, label;
Any temp;
// Skip the op-type byte.
arc >> op_type;
// Skip the source and destination ids; deserialize_oid presumably consumes
// the vertex label followed by the oid (TODO confirm against its definition).
deserialize_oid(graph_, arc, temp);
deserialize_oid(graph_, arc, temp);
// Skip the edge label; the archive is now expected to point at the property.
arc >> label;
}
graph_.IngestEdge(src_label_, src_vid_, dst_label_, dst_vid_, edge_label_,
timestamp_, arc, alloc_);
vm_.release_insert_timestamp(timestamp_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class SingleEdgeInsertTransaction {
VersionManager& vm, timestamp_t timestamp);
~SingleEdgeInsertTransaction();

bool AddEdge(label_t src_label, oid_t src, label_t dst_label, oid_t dst,
label_t edge_label, const Any& prop);
bool AddEdge(label_t src_label, const Any& src, label_t dst_label,
const Any& dst, label_t edge_label, const Any& prop);

void Abort();

Expand Down
Loading

0 comments on commit 2d19869

Please sign in to comment.