Skip to content

Commit

Permalink
feat(interactive): Add three builtin procedures (#4249)
Browse files Browse the repository at this point in the history
This pull request introduces three built-in stored procedures as per the
OSPP requirements. The following stored procedures have been
implemented:

**Shortest Path Between Three Points**: A procedure to compute the
shortest path between any given three points in the graph.

**K-hop Neighbor Traversal from a Specified Node**: A procedure that
traverses the graph and retrieves all nodes within k hops from a
specified starting node.

**PageRank Calculation**: A procedure to compute the PageRank values of
nodes within the graph.

At present, both **k_neighbors** and **shortest_path_among_three** only
support types where the id input parameter is of type int64. For
example, CALL k_neighbors(1L, "person", 2);

This PR addresses the following issue:
#3737
Closes #3737
  • Loading branch information
zhichao-shi authored Oct 10, 2024
1 parent af119f0 commit d163d5b
Show file tree
Hide file tree
Showing 13 changed files with 803 additions and 15 deletions.
135 changes: 135 additions & 0 deletions flex/engines/graph_db/app/builtin/k_hop_neighbors.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flex/engines/graph_db/app/builtin/k_hop_neighbors.h"

namespace gs {

bool KNeighbors::DoQuery(GraphDBSession& sess, Decoder& input,
Encoder& output) {
auto txn = sess.GetReadTransaction();
Schema schema_ = txn.schema();
if (input.empty()) {
return false;
}
int64_t vertex_id_ = input.get_long();
std::string label_name{input.get_string()};
int k = input.get_int();

if (k <= 0) {
output.put_string_view("k must be greater than 0.");
return false;
}
if (!schema_.has_vertex_label(label_name)) {
output.put_string_view("The requested label doesn't exits.");
return false; // The requested label doesn't exits.
}
label_t vertex_label_ = schema_.get_vertex_label_id(label_name);
struct pair_hash {
std::size_t operator()(const std::pair<label_t, vid_t>& p) const {
auto hash1 = std::hash<label_t>{}(p.first);
auto hash2 = std::hash<vid_t>{}(p.second);
return hash1 ^ (hash2 << 1);
}
};
std::unordered_set<std::pair<label_t, vid_t>, pair_hash> k_neighbors;

int vertex_size_ = (int) schema_.vertex_label_num();
int edge_size_ = (int) schema_.edge_label_num();

std::vector<vid_t> nei_index_;
std::vector<label_t> nei_label_;
std::vector<vid_t> next_nei_indexs_;
std::vector<label_t> next_nei_label_;

nei_label_.push_back(vertex_label_);
vid_t vertex_index{};
if (!txn.GetVertexIndex(vertex_label_, (int64_t) vertex_id_, vertex_index)) {
output.put_string_view("get index fail.");
return false;
}
nei_index_.push_back(vertex_index);
// get k hop neighbors
while (!nei_index_.empty() && k > 0) {
for (long unsigned int i = 0; i < nei_index_.size(); i++) {
for (int j = 0; j < vertex_size_; j++) {
for (int k = 0; k < edge_size_; k++) {
if (schema_.has_edge_label(label_t(nei_label_[i]), label_t(j),
label_t(k))) {
auto outedges = txn.GetOutEdgeIterator(
nei_label_[i], nei_index_[i], j,
k); // 1.self_label 2.self_index 3.edge_label 4.nei_label
while (outedges.IsValid()) {
auto neighbor = outedges.GetNeighbor();
if (k_neighbors.find(std::make_pair(j, neighbor)) ==
k_neighbors.end()) {
next_nei_label_.push_back(j);
next_nei_indexs_.push_back(neighbor);
k_neighbors.insert(std::make_pair(j, neighbor));
}
outedges.Next();
}
}
if (schema_.has_edge_label(label_t(j), label_t(nei_label_[i]),
label_t(k))) {
auto inedges = txn.GetInEdgeIterator(
nei_label_[i], nei_index_[i], j,
k); // 1.self_label 2.self_index 3.edge_label 4.nei_label
while (inedges.IsValid()) {
auto neighbor = inedges.GetNeighbor();
if (k_neighbors.find(std::make_pair(j, neighbor)) ==
k_neighbors.end()) {
next_nei_label_.push_back(j);
next_nei_indexs_.push_back(neighbor);
k_neighbors.insert(std::make_pair(j, neighbor));
}
inedges.Next();
}
}
}
}
}
nei_index_ = next_nei_indexs_;
nei_label_ = next_nei_label_;
next_nei_label_.clear();
next_nei_indexs_.clear();
k--;
}
results::CollectiveResults results;
for (auto vertex_ : k_neighbors) {
std::string label_name_ = schema_.get_vertex_label_name(vertex_.first);
auto result = results.add_results();
result->mutable_record()
->add_columns()
->mutable_entry()
->mutable_element()
->mutable_object()
->set_str(label_name_);
result->mutable_record()
->add_columns()
->mutable_entry()
->mutable_element()
->mutable_object()
->set_i64(txn.GetVertexId(vertex_.first, vertex_.second).AsInt64());
}
output.put_string_view(results.SerializeAsString());

txn.Commit();
return true;
}

AppWrapper KNeighborsFactory::CreateApp(const GraphDB& db) {
return AppWrapper(new KNeighbors(), NULL);
}
} // namespace gs
38 changes: 38 additions & 0 deletions flex/engines/graph_db/app/builtin/k_hop_neighbors.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_GRAPH_DB_APP_BUILDIN_K_HOP_NEIGHBORS_
#define ENGINES_GRAPH_DB_APP_BUILDIN_K_HOP_NEIGHBORS_
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/hqps_db/app/interactive_app_base.h"

namespace gs {
class KNeighbors : public CypherInternalPbWriteAppBase {
public:
KNeighbors() {}
bool DoQuery(GraphDBSession& sess, Decoder& input, Encoder& output) override;
};

class KNeighborsFactory : public AppFactoryBase {
public:
KNeighborsFactory() = default;
~KNeighborsFactory() = default;

AppWrapper CreateApp(const GraphDB& db) override;
};

} // namespace gs

#endif // ENGINES_GRAPH_DB_APP_BUILDIN_K_HOP_NEIGHBORS_
153 changes: 153 additions & 0 deletions flex/engines/graph_db/app/builtin/pagerank.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flex/engines/graph_db/app/builtin/pagerank.h"

namespace gs {

bool PageRank::DoQuery(GraphDBSession& sess, Decoder& input, Encoder& output) {
auto txn = sess.GetReadTransaction();
if (input.empty()) {
output.put_string_view(
"Arguments required(vertex_label, edge_label, damping_factor_, max_iterations_, epsilon_)\n \
for example:(\"person\", \"knows\", 0.85, 100, 0.000001)");
return false;
}

std::string vertex_label{input.get_string()};
std::string edge_label{input.get_string()};

damping_factor_ = input.get_double();
max_iterations_ = input.get_int();
epsilon_ = input.get_double();

if (!sess.schema().has_vertex_label(vertex_label)) {
output.put_string_view("The requested vertex label doesn't exits.");
return false;
}
if (!sess.schema().has_edge_label(vertex_label, vertex_label, edge_label)) {
output.put_string_view("The requested edge label doesn't exits.");
return false;
}
if (damping_factor_ < 0 || damping_factor_ >= 1) {
output.put_string_view(
"The value of the damping_factor_ is between 0 and 1.");
return false;
}
if (max_iterations_ <= 0) {
output.put_string_view("max_iterations_ must be greater than 0.");
return false;
}
if (epsilon_ < 0 || epsilon_ >= 1) {
output.put_string_view("The value of the epsilon_ is between 0 and 1.");
return false;
}

vertex_label_id_ = sess.schema().get_vertex_label_id(vertex_label);
edge_label_id_ = sess.schema().get_edge_label_id(edge_label);

auto num_vertices = txn.GetVertexNum(vertex_label_id_);

std::unordered_map<vid_t, double> pagerank;
std::unordered_map<vid_t, double> new_pagerank;

auto vertex_iter = txn.GetVertexIterator(vertex_label_id_);

while (vertex_iter.IsValid()) {
vid_t vid = vertex_iter.GetIndex();
pagerank[vid] = 1.0 / num_vertices;
new_pagerank[vid] = 0.0;
vertex_iter.Next();
}

std::unordered_map<vid_t, double> outdegree;

for (int iter = 0; iter < max_iterations_; ++iter) {
for (auto& kv : new_pagerank) {
kv.second = 0.0;
}

auto vertex_iter = txn.GetVertexIterator(vertex_label_id_);
while (vertex_iter.IsValid()) {
vid_t v = vertex_iter.GetIndex();

double sum = 0.0;
auto edges = txn.GetInEdgeIterator(vertex_label_id_, v, vertex_label_id_,
edge_label_id_);
while (edges.IsValid()) {
auto neighbor = edges.GetNeighbor();
if (outdegree[neighbor] == 0) {
auto out_edges = txn.GetOutEdgeIterator(
vertex_label_id_, neighbor, vertex_label_id_, edge_label_id_);
while (out_edges.IsValid()) {
outdegree[neighbor]++;
out_edges.Next();
}
}
sum += pagerank[neighbor] / outdegree[neighbor];
edges.Next();
}

new_pagerank[v] =
damping_factor_ * sum + (1.0 - damping_factor_) / num_vertices;
vertex_iter.Next();
}

double diff = 0.0;
for (const auto& kv : pagerank) {
diff += std::abs(new_pagerank[kv.first] - kv.second);
}

if (diff < epsilon_) {
break;
}

std::swap(pagerank, new_pagerank);
}

results::CollectiveResults results;

for (auto kv : pagerank) {
int64_t oid_ = txn.GetVertexId(vertex_label_id_, kv.first).AsInt64();
auto result = results.add_results();
result->mutable_record()
->add_columns()
->mutable_entry()
->mutable_element()
->mutable_object()
->set_str(vertex_label);
result->mutable_record()
->add_columns()
->mutable_entry()
->mutable_element()
->mutable_object()
->set_i64(oid_);
result->mutable_record()
->add_columns()
->mutable_entry()
->mutable_element()
->mutable_object()
->set_f64(kv.second);
}

output.put_string_view(results.SerializeAsString());

txn.Commit();
return true;
}

AppWrapper PageRankFactory::CreateApp(const GraphDB& db) {
return AppWrapper(new PageRank(), NULL);
}
} // namespace gs
51 changes: 51 additions & 0 deletions flex/engines/graph_db/app/builtin/pagerank.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_GRAPH_DB_APP_BUILDIN_PAGERANK_H_
#define ENGINES_GRAPH_DB_APP_BUILDIN_PAGERANK_H_
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/hqps_db/app/interactive_app_base.h"

namespace gs {
class PageRank : public CypherInternalPbWriteAppBase {
public:
PageRank()
: damping_factor_(0.85),
max_iterations_(100),
epsilon_(1e-6),
vertex_label_id_(0),
edge_label_id_(0) {}
bool DoQuery(GraphDBSession& sess, Decoder& input, Encoder& output) override;

private:
double damping_factor_;
int max_iterations_;
double epsilon_;

label_t vertex_label_id_;
label_t edge_label_id_;
};

class PageRankFactory : public AppFactoryBase {
public:
PageRankFactory() = default;
~PageRankFactory() = default;

AppWrapper CreateApp(const GraphDB& db) override;
};

} // namespace gs

#endif
Loading

0 comments on commit d163d5b

Please sign in to comment.