Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flex): Support using property of string type as primary key for vertex #3296

Merged
merged 31 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions flex/codegen/src/hqps/hqps_scan_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ static constexpr const char* SCAN_OP_TEMPLATE_NO_EXPR_STR =
/// 4. vertex label
/// 5. oid
static constexpr const char* SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR =
"auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, %4%, %5%);\n";
"auto %1% = Engine::template ScanVertexWithOid<%2%,%3%>(%4%, %5%, %6%);\n";

/// Args
/// 1. res_ctx_name
Expand Down Expand Up @@ -179,14 +179,20 @@ class ScanOpBuilder {
switch (const_value.item_case()) {
case common::Value::kI32:
oid_ = std::to_string(const_value.i32());
oid_type_name_ = "int32_t";
break;
case common::Value::kI64:
oid_ = std::to_string(const_value.i64());
oid_type_name_ = "int64_t";
break;
case common::Value::kStr:
oid_ = const_value.str();
oid_type_name_ = "std::string_view";
default:
LOG(FATAL) << "Currently only support int, long as primary key";
}
VLOG(1) << "Found oid: " << oid_ << " in index scan";
VLOG(1) << "Found oid: " << oid_
<< " in index scan, type: " << oid_type_name_;
} else {
// dynamic param
auto dyn_param_pb = triplet.param();
Expand All @@ -209,7 +215,7 @@ class ScanOpBuilder {
// If oid_ not empty, scan with oid
if (!oid_.empty()) {
VLOG(1) << "Scan with oid: " << oid_;
return scan_with_oid(labels_ids_, oid_);
return scan_with_oid(labels_ids_, oid_, oid_type_name_);
} else {
// If no oid, scan without expression
VLOG(1) << "Scan without expression";
Expand All @@ -220,15 +226,16 @@ class ScanOpBuilder {

private:
std::string scan_with_oid(const std::vector<int32_t>& label_ids,
const std::string& oid) const {
const std::string& oid,
const std::string& oid_type_name) const {
VLOG(10) << "Scan with oid: " << oid;
std::string next_ctx_name = ctx_.GetCurCtxName();
auto append_opt = res_alias_to_append_opt(res_alias_);

if (label_ids.size() == 1) {
boost::format formater(SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_ids[0] %
oid;
formater % next_ctx_name % append_opt % oid_type_name % ctx_.GraphVar() %
label_ids[0] % oid;
return formater.str();
} else {
boost::format formater(SCAN_OP_WITH_OID_MUL_LABEL_TEMPLATE_STR);
Expand Down Expand Up @@ -302,6 +309,7 @@ class ScanOpBuilder {
std::string expr_var_name_, expr_func_name_, expr_construct_params_,
selectors_str_; // The expression decode from params.
std::string oid_; // the oid decode from idx predicate, or param name.
std::string oid_type_name_;
int res_alias_;
};

Expand Down
12 changes: 6 additions & 6 deletions flex/engines/graph_db/app/server_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static uint32_t get_vertex_vid(const gs::ReadTransaction& txn, uint8_t label,
uint32_t vid = std::numeric_limits<uint32_t>::max();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using oid_t = int64_t; declared in flex/storages/rt_mutable_graph/types.h should be removed, and we shall ensure can remove all usage of oid_t is this PR?

auto vit = txn.GetVertexIterator(label);
for (; vit.IsValid(); vit.Next()) {
if (vit.GetId() == id) {
if (vit.GetId().AsInt64() == id) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the result of vit.GetId() must be int64(), do the check first?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked inside the AsXX() function

vid = vit.GetIndex();
break;
}
Expand Down Expand Up @@ -99,7 +99,7 @@ bool ServerApp::Query(Decoder& input, Encoder& output) {
uint8_t vertex_label_id = txn.schema().get_vertex_label_id(vertex_label);
auto vit = txn.GetVertexIterator(vertex_label_id);
for (; vit.IsValid(); vit.Next()) {
if (vit.GetId() == vertex_id) {
if (vit.GetId().AsInt64() == vertex_id) {
output.put_int(1);
int field_num = vit.FieldNum();
for (int i = 0; i < field_num; ++i) {
Expand Down Expand Up @@ -233,13 +233,13 @@ bool ServerApp::Query(Decoder& input, Encoder& output) {

std::vector<std::tuple<int64_t, int64_t, std::string>> match_edges;
for (uint32_t v = dst_range.from; v != dst_range.to; ++v) {
int64_t v_oid = txn.GetVertexId(dst_label_id, v);
int64_t v_oid = txn.GetVertexId(dst_label_id, v).AsInt64();
auto ieit = txn.GetInEdgeIterator(dst_label_id, v, src_label_id,
edge_label_id);
while (ieit.IsValid()) {
uint32_t u = ieit.GetNeighbor();
if (src_range.contains(u)) {
int64_t u_oid = txn.GetVertexId(src_label_id, u);
int64_t u_oid = txn.GetVertexId(src_label_id, u).AsInt64();
match_edges.emplace_back(u_oid, v_oid,
ieit.GetData().to_string());
}
Expand All @@ -248,13 +248,13 @@ bool ServerApp::Query(Decoder& input, Encoder& output) {
}
if (match_edges.empty()) {
for (uint32_t u = src_range.from; u != src_range.to; ++u) {
int64_t u_oid = txn.GetVertexId(src_label_id, u);
int64_t u_oid = txn.GetVertexId(src_label_id, u).AsInt64();
auto oeit = txn.GetOutEdgeIterator(src_label_id, u, dst_label_id,
edge_label_id);
while (oeit.IsValid()) {
uint32_t v = oeit.GetNeighbor();
if (dst_range.contains(v)) {
int64_t v_oid = txn.GetVertexId(dst_label_id, v);
int64_t v_oid = txn.GetVertexId(dst_label_id, v).AsInt64();
match_edges.emplace_back(u_oid, v_oid,
oeit.GetData().to_string());
}
Expand Down
14 changes: 12 additions & 2 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,18 @@ std::shared_ptr<ColumnBase> GraphDBSession::get_vertex_property_column(

std::shared_ptr<RefColumnBase> GraphDBSession::get_vertex_id_column(
uint8_t label) const {
return std::make_shared<TypedRefColumn<oid_t>>(
db_.graph().lf_indexers_[label].get_keys(), StorageStrategy::kMem);
if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kInt64) {
return std::make_shared<TypedRefColumn<int64_t>>(
dynamic_cast<const TypedColumn<int64_t>&>(
db_.graph().lf_indexers_[label].get_keys()));
} else if (db_.graph().lf_indexers_[label].get_type() ==
PropertyType::kString) {
return std::make_shared<TypedRefColumn<std::string_view>>(
dynamic_cast<const TypedColumn<std::string_view>&>(
db_.graph().lf_indexers_[label].get_keys()));
} else {
return nullptr;
}
}

#define likely(x) __builtin_expect(!!(x), 1)
Expand Down
40 changes: 22 additions & 18 deletions flex/engines/graph_db/database/insert_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ InsertTransaction::InsertTransaction(MutablePropertyFragment& graph,

InsertTransaction::~InsertTransaction() { Abort(); }

bool InsertTransaction::AddVertex(label_t label, oid_t id,
bool InsertTransaction::AddVertex(label_t label, const Any& id,
const std::vector<Any>& props) {
size_t arc_size = arc_.GetSize();
arc_ << static_cast<uint8_t>(0) << label << id;
arc_ << static_cast<uint8_t>(0) << label;
serialize_field(arc_, id);
const std::vector<PropertyType>& types =
graph_.schema().get_vertex_properties(label);
if (types.size() != props.size()) {
Expand Down Expand Up @@ -65,15 +66,15 @@ bool InsertTransaction::AddVertex(label_t label, oid_t id,
return true;
}

bool InsertTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label,
oid_t dst, label_t edge_label,
const Any& prop) {
bool InsertTransaction::AddEdge(label_t src_label, const Any& src,
label_t dst_label, const Any& dst,
label_t edge_label, const Any& prop) {
vid_t lid;
if (!graph_.get_lid(src_label, src, lid)) {
if (added_vertices_.find(std::make_pair(src_label, src)) ==
added_vertices_.end()) {
std::string label_name = graph_.schema().get_vertex_label_name(src_label);
LOG(ERROR) << "Source vertex " << label_name << "[" << src
LOG(ERROR) << "Source vertex " << label_name << "[" << src.to_string()
<< "] not found...";
return false;
}
Expand All @@ -82,8 +83,8 @@ bool InsertTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label,
if (added_vertices_.find(std::make_pair(dst_label, dst)) ==
added_vertices_.end()) {
std::string label_name = graph_.schema().get_vertex_label_name(dst_label);
LOG(ERROR) << "Destination vertex " << label_name << "[" << dst
<< "] not found...";
LOG(ERROR) << "Destination vertex " << label_name << "["
<< dst.to_string() << "] not found...";
return false;
}
}
Expand All @@ -95,8 +96,11 @@ bool InsertTransaction::AddEdge(label_t src_label, oid_t src, label_t dst_label,
<< type << ", got " << prop.type;
return false;
}
arc_ << static_cast<uint8_t>(1) << src_label << src << dst_label << dst
<< edge_label;
arc_ << static_cast<uint8_t>(1) << src_label;
serialize_field(arc_, src);
arc_ << dst_label;
serialize_field(arc_, dst);
arc_ << edge_label;
serialize_field(arc_, prop);
return true;
}
Expand Down Expand Up @@ -143,17 +147,17 @@ void InsertTransaction::IngestWal(MutablePropertyFragment& graph,
arc >> op_type;
if (op_type == 0) {
label_t label;
oid_t id;

arc >> label >> id;
Any id;
label = deserialize_oid(graph, arc, id);
vid_t lid = graph.add_vertex(label, id);
graph.get_vertex_table(label).ingest(lid, arc);
} else if (op_type == 1) {
label_t src_label, dst_label, edge_label;
oid_t src, dst;
Any src, dst;
vid_t src_lid, dst_lid;

arc >> src_label >> src >> dst_label >> dst >> edge_label;
src_label = deserialize_oid(graph, arc, src);
dst_label = deserialize_oid(graph, arc, dst);
arc >> edge_label;

CHECK(get_vertex_with_retries(graph, src_label, src, src_lid));
CHECK(get_vertex_with_retries(graph, dst_label, dst, dst_lid));
Expand All @@ -177,7 +181,7 @@ void InsertTransaction::clear() {
#define likely(x) __builtin_expect(!!(x), 1)

bool InsertTransaction::get_vertex_with_retries(MutablePropertyFragment& graph,
label_t label, oid_t oid,
label_t label, const Any& oid,
vid_t& lid) {
if (likely(graph.get_lid(label, oid, lid))) {
return true;
Expand All @@ -189,7 +193,7 @@ bool InsertTransaction::get_vertex_with_retries(MutablePropertyFragment& graph,
}
}

LOG(ERROR) << "get_vertex [" << oid << "] failed";
LOG(ERROR) << "get_vertex [" << oid.to_string() << "] failed";
return false;
}

Expand Down
11 changes: 6 additions & 5 deletions flex/engines/graph_db/database/insert_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class InsertTransaction {

~InsertTransaction();

bool AddVertex(label_t label, oid_t id, const std::vector<Any>& props);
bool AddVertex(label_t label, const Any& id, const std::vector<Any>& props);

bool AddEdge(label_t src_label, oid_t src, label_t dst_label, oid_t dst,
label_t edge_label, const Any& prop);
bool AddEdge(label_t src_label, const Any& src, label_t dst_label,
const Any& dst, label_t edge_label, const Any& prop);

void Commit();

Expand All @@ -55,11 +55,12 @@ class InsertTransaction {
void clear();

static bool get_vertex_with_retries(MutablePropertyFragment& graph,
label_t label, oid_t oid, vid_t& lid);
label_t label, const Any& oid,
vid_t& lid);

grape::InArchive arc_;

std::set<std::pair<label_t, oid_t>> added_vertices_;
std::set<std::pair<label_t, Any>> added_vertices_;

MutablePropertyFragment& graph_;
ArenaAllocator& alloc_;
Expand Down
10 changes: 5 additions & 5 deletions flex/engines/graph_db/database/read_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void ReadTransaction::vertex_iterator::Goto(vid_t target) {
cur_ = std::min(target, num_);
}

oid_t ReadTransaction::vertex_iterator::GetId() const {
Any ReadTransaction::vertex_iterator::GetId() const {
return graph_.get_oid(label_, cur_);
}
vid_t ReadTransaction::vertex_iterator::GetIndex() const { return cur_; }
Expand Down Expand Up @@ -89,8 +89,8 @@ ReadTransaction::vertex_iterator ReadTransaction::GetVertexIterator(
return {label, 0, graph_.vertex_num(label), graph_};
}

ReadTransaction::vertex_iterator ReadTransaction::FindVertex(label_t label,
oid_t id) const {
ReadTransaction::vertex_iterator ReadTransaction::FindVertex(
label_t label, const Any& id) const {
vid_t lid;
if (graph_.get_lid(label, id, lid)) {
return {label, lid, graph_.vertex_num(label), graph_};
Expand All @@ -99,7 +99,7 @@ ReadTransaction::vertex_iterator ReadTransaction::FindVertex(label_t label,
}
}

bool ReadTransaction::GetVertexIndex(label_t label, oid_t id,
bool ReadTransaction::GetVertexIndex(label_t label, const Any& id,
vid_t& index) const {
return graph_.get_lid(label, id, index);
}
Expand All @@ -108,7 +108,7 @@ vid_t ReadTransaction::GetVertexNum(label_t label) const {
return graph_.vertex_num(label);
}

oid_t ReadTransaction::GetVertexId(label_t label, vid_t index) const {
Any ReadTransaction::GetVertexId(label_t label, vid_t index) const {
return graph_.get_oid(label, index);
}

Expand Down
8 changes: 4 additions & 4 deletions flex/engines/graph_db/database/read_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class ReadTransaction {
void Next();
void Goto(vid_t target);

oid_t GetId() const;
Any GetId() const;
vid_t GetIndex() const;

Any GetField(int col_id) const;
Expand Down Expand Up @@ -183,13 +183,13 @@ class ReadTransaction {

vertex_iterator GetVertexIterator(label_t label) const;

vertex_iterator FindVertex(label_t label, oid_t id) const;
vertex_iterator FindVertex(label_t label, const Any& id) const;

bool GetVertexIndex(label_t label, oid_t id, vid_t& index) const;
bool GetVertexIndex(label_t label, const Any& id, vid_t& index) const;

vid_t GetVertexNum(label_t label) const;

oid_t GetVertexId(label_t label, vid_t index) const;
Any GetVertexId(label_t label, vid_t index) const;

edge_iterator GetOutEdgeIterator(label_t label, vid_t u,
label_t neighnor_label,
Expand Down
27 changes: 19 additions & 8 deletions flex/engines/graph_db/database/single_edge_insert_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ SingleEdgeInsertTransaction::SingleEdgeInsertTransaction(

SingleEdgeInsertTransaction::~SingleEdgeInsertTransaction() { Abort(); }

bool SingleEdgeInsertTransaction::AddEdge(label_t src_label, oid_t src,
label_t dst_label, oid_t dst,
bool SingleEdgeInsertTransaction::AddEdge(label_t src_label, const Any& src,
label_t dst_label, const Any& dst,
label_t edge_label, const Any& prop) {
if (!graph_.get_lid(src_label, src, src_vid_)) {
std::string label_name = graph_.schema().get_vertex_label_name(src_label);
LOG(ERROR) << "Source vertex " << label_name << "[" << src
LOG(ERROR) << "Source vertex " << label_name << "[" << src.to_string()
<< "] not found...";
return false;
}
if (!graph_.get_lid(dst_label, dst, dst_vid_)) {
std::string label_name = graph_.schema().get_vertex_label_name(dst_label);
LOG(ERROR) << "Destination vertex " << label_name << "[" << dst
LOG(ERROR) << "Destination vertex " << label_name << "[" << dst.to_string()
<< "] not found...";
return false;
}
Expand All @@ -63,8 +63,11 @@ bool SingleEdgeInsertTransaction::AddEdge(label_t src_label, oid_t src,
src_label_ = src_label;
dst_label_ = dst_label;
edge_label_ = edge_label;
arc_ << static_cast<uint8_t>(1) << src_label << src << dst_label << dst
<< edge_label;
arc_ << static_cast<uint8_t>(1) << src_label;
serialize_field(arc_, src);
arc_ << dst_label;
serialize_field(arc_, dst);
arc_ << edge_label;
serialize_field(arc_, prop);
return true;
}
Expand Down Expand Up @@ -93,8 +96,16 @@ void SingleEdgeInsertTransaction::Commit() {
logger_.append(arc_.GetBuffer(), arc_.GetSize());

grape::OutArchive arc;
arc.SetSlice(arc_.GetBuffer() + sizeof(WalHeader) + 20,
arc_.GetSize() - sizeof(WalHeader) - 20);
{
arc.SetSlice(arc_.GetBuffer() + sizeof(WalHeader),
arc_.GetSize() - sizeof(WalHeader));
label_t op_type, label;
Any temp;
arc >> op_type;
deserialize_oid(graph_, arc, temp);
deserialize_oid(graph_, arc, temp);
arc >> label;
}
graph_.IngestEdge(src_label_, src_vid_, dst_label_, dst_vid_, edge_label_,
timestamp_, arc, alloc_);
vm_.release_insert_timestamp(timestamp_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class SingleEdgeInsertTransaction {
VersionManager& vm, timestamp_t timestamp);
~SingleEdgeInsertTransaction();

bool AddEdge(label_t src_label, oid_t src, label_t dst_label, oid_t dst,
label_t edge_label, const Any& prop);
bool AddEdge(label_t src_label, const Any& src, label_t dst_label,
const Any& dst, label_t edge_label, const Any& prop);

void Abort();

Expand Down
Loading
Loading