From 1db137298212fb94fc7c3c263b64c8d7f8e2910e Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Tue, 6 Dec 2022 22:53:14 +0800 Subject: [PATCH] add a cpp pregel case --- CMakeLists.txt | 8 +-- README.md | 22 ++++---- src/my_app.h | 121 +++++++++++++++++++++++-------------------- src/my_app_context.h | 64 ----------------------- 4 files changed, 82 insertions(+), 133 deletions(-) delete mode 100644 src/my_app_context.h diff --git a/CMakeLists.txt b/CMakeLists.txt index b7348e5..e3b1324 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -154,14 +154,14 @@ file( "# Do not edit this file!!!\n" "app:\n" "- algo: my_app\n" - " type: cpp_pie\n" - " class_name: gs::MyApp\n" + " type: cpp_pregel\n" + " class_name: gs::SSSP\n" " src: my_app.h\n" - " context_type: vertex_data\n" + " context_type: labeled_vertex_data\n" " compatible_graph:\n" " - gs::ArrowProjectedFragment\n" - " - gs::DynamicProjectedFragment\n" " - vineyard::ArrowFragment\n" + " - vineyard::ArrowFlattenFragment\n" ) # custom target to package app diff --git a/README.md b/README.md index 0b8aa5d..a01d73d 100644 --- a/README.md +++ b/README.md @@ -48,15 +48,19 @@ Here is an example to run the packaged gar file in GraphScope Python interface. import graphscope from graphscope.framework.app import load_app -from graphscope.dataset import load_p2p_network - -sess = graphscope.session() -simple_graph = load_p2p_network(sess)._project_to_simple() - -my_app = load_app('') -result = my_app(simple_graph, 10) # pass 10 as param1 defined in 'MyAppContext.h' - -print(result.to_numpy('r')) +g = graphscope.g(directed=False, generate_eid=False, vertex_map="local") +g = g.add_edges( + f"/Users/siyuan/CLionProjects/gstest/property/p2p-31_property_e_0", + label="knows", + src_label="person", + dst_label="person", +) + +my_app = load_app('/Users/siyuan/CLionProjects/cpp-template/build/my_app.gar') +result = my_app(g, src=1) + +df = result.to_dataframe(selector={'id': 'v:person.id', 'r': 'r:person'}).sort_values(by='id') +print(df) ``` ## Codebase Explained diff --git a/src/my_app.h b/src/my_app.h index 106d186..71774e8 100644 --- a/src/my_app.h +++ b/src/my_app.h @@ -1,10 +1,10 @@ -/** Copyright 2022 Alibaba Group Holding Limited. +/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -13,69 +13,78 @@ * limitations under the License. */ -#ifndef MY_APP_H -#define MY_APP_H +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_SSSP_PREGEL_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_SSSP_PREGEL_H_ -#include "my_app_context.h" +#include +#include +#include + +#include "vineyard/graph/fragment/arrow_fragment.h" + +#include "core/app/pregel/i_vertex_program.h" +#include "core/app/pregel/pregel_compute_context.h" +#include "core/app/pregel/pregel_property_app_base.h" namespace gs { -/** - * @brief Compute the degree for each vertex. - * - * @tparam FRAG_T - */ -template -class MyApp : public grape::ParallelAppBase>, - public grape::ParallelEngine, - public grape::Communicator { +template +class PregelSSSP + : public IPregelProgram< + PregelPropertyVertex, + PregelPropertyComputeContext> { + using fragment_t = FRAG_T; + public: - INSTALL_PARALLEL_WORKER(MyApp, MyAppContext, FRAG_T) - static constexpr grape::MessageStrategy message_strategy = - grape::MessageStrategy::kSyncOnOuterVertex; - static constexpr grape::LoadStrategy load_strategy = - grape::LoadStrategy::kBothOutIn; - using vertex_t = typename fragment_t::vertex_t; - - /** - * @brief Implement your partial evaluation here. - * - * @param fragment - * @param context - * @param messages - */ - void PEval(const fragment_t& fragment, context_t& context, - message_manager_t& messages) { - messages.InitChannels(thread_num()); - // Implement your partial evaluation here. - // We put all compute logic in IncEval phase, thus do nothing but force continue. - messages.ForceContinue(); + void Init(PregelPropertyVertex& v, + PregelPropertyComputeContext& context) + override { + v.set_value(std::numeric_limits::max()); } - void IncEval(const fragment_t& fragment, context_t& context, - message_manager_t& messages) { - // superstep - ++context.step; - - // Process received messages sent by other fragment here. - { - messages.ParallelProcess( - thread_num(), fragment, - [&context](int tid, vertex_t u, const double& msg) { - // Implement your logic here. - }); + void Compute(grape::IteratorPair messages, + PregelPropertyVertex& v, + PregelPropertyComputeContext& + context) override { + bool updated = false; + if (context.superstep() == 0) { + std::string source_id = context.get_config("src"); + if (v.id() == "1") { + std::cout << "Source: " << source_id << " v.id() " << v.id() << std::endl; + } + if (v.id() == source_id) { + updated = true; + v.set_value(0); + } + } else { + double cur_value = v.value(); + double new_value = cur_value; + for (auto msg : messages) { + new_value = std::min(new_value, msg); + } + if (new_value != cur_value) { + v.set_value(new_value); + updated = true; + } } - // Compute the degree for each vertex, set the result in context - auto inner_vertices = fragment.InnerVertices(); - ForEach(inner_vertices.begin(), inner_vertices.end(), - [&context, &fragment](int tid, vertex_t u) { - context.result[u] = - static_cast(fragment.GetOutgoingAdjList(u).Size() + - fragment.GetIncomingAdjList(u).Size()); - }); + if (updated) { + double dist = v.value(); + for (int label_id = 0; label_id < context.edge_label_num(); label_id++) { + for (auto& e : v.outgoing_edges(label_id)) { + double new_dist = dist + static_cast(e.get_int(0)); + v.send(e.vertex(), new_dist); + } + } + } + v.vote_to_halt(); } }; -}; // namespace gs -#endif // MY_APP_H +template +using SSSP = gs::PregelPropertyAppBase>; + +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_SSSP_PREGEL_H_ + diff --git a/src/my_app_context.h b/src/my_app_context.h deleted file mode 100644 index e873365..0000000 --- a/src/my_app_context.h +++ /dev/null @@ -1,64 +0,0 @@ -/** Copyright 2022 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef MY_APP_CONTEXT_H_ -#define MY_APP_CONTEXT_H_ - -#include "grape/grape.h" - -namespace gs { - -/** - * @brief Context for "MyApp" Application. - * - * 'Context' class used to record the intermediate data of each iteration. - * - * @tparam FRAG_T - */ -template -class MyAppContext : public grape::VertexDataContext { - using oid_t = typename FRAG_T::oid_t; - using vid_t = typename FRAG_T::vid_t; - using vertex_t = typename FRAG_T::vertex_t; - - public: - explicit MyAppContext(const FRAG_T& fragment) - : grape::VertexDataContext(fragment, true), - result(this->data()) {} - - /** - * @param param1: algorithm specific parameter, such as - * "source vertex" for SSSP (single source shortest path) - * "delta, max_round" for Pagerank - */ - void Init(grape::ParallelMessageManager& messages, int param1) { - // record current superstep - this->step = 0; - this->param1 = param1; - // init results - result.SetValue(0); - } - - // current superstep - int step = 0; - // algorithm specific parameter - int param1 = 0; - - // result for each vertex, with 'uint64_t' type - typename FRAG_T::template vertex_array_t& result; -}; -} // namespace gs - -#endif // MY_APP_CONTEXT_H_