Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented Parallel Graph Loading from efile only. #171

Merged
merged 4 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/analytical_apps/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ DEFINE_string(serialization_prefix, "",
"where to load/store the serialization files");

DEFINE_int32(app_concurrency, -1, "concurrency of application");
DEFINE_int32(load_concurrency, 1, "concurrency of loading graph");

DEFINE_string(lb, "cta",
"Load balancing policy, these options can be used: "
Expand Down
1 change: 1 addition & 0 deletions examples/analytical_apps/flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ DECLARE_bool(deserialize);
DECLARE_string(serialization_prefix);

DECLARE_int32(app_concurrency);
DECLARE_int32(load_concurrency);

DECLARE_string(lb);
#endif // EXAMPLES_ANALYTICAL_APPS_FLAGS_H_
2 changes: 2 additions & 0 deletions examples/analytical_apps/run_app.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ void CreateAndQuery(const CommSpec& comm_spec, const std::string& out_prefix,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
} else if (FLAGS_serialize) {
Expand Down Expand Up @@ -194,6 +195,7 @@ void CreateAndQueryStagedApp(const CommSpec& comm_spec,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
} else if (FLAGS_serialize) {
Expand Down
5 changes: 5 additions & 0 deletions examples/analytical_apps/run_app_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ void RunUndirectedPageRankOpt(const CommSpec& comm_spec,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
Expand Down Expand Up @@ -171,6 +172,7 @@ void RunDirectedCDLP(const CommSpec& comm_spec, const std::string& out_prefix,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
Expand Down Expand Up @@ -208,6 +210,7 @@ void RunUndirectedCDLP(const CommSpec& comm_spec, const std::string& out_prefix,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
Expand Down Expand Up @@ -263,6 +266,7 @@ void CreateAndQueryOpt(const CommSpec& comm_spec, const std::string& out_prefix,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
Expand Down Expand Up @@ -293,6 +297,7 @@ void CreateAndQueryStagedAppOpt(const CommSpec& comm_spec,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
} else if (FLAGS_serialize) {
Expand Down
22 changes: 22 additions & 0 deletions grape/communication/shuffle.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,14 @@ void foreach_helper(const Tuple& t, const Func& func,
}
}

template <typename Tuple, typename Func, std::size_t... index>
void range_foreach_helper(const Tuple& t, size_t begin, size_t end,
const Func& func, index_sequence<index...>) {
for (size_t i = begin; i < end; ++i) {
func(get_const_buffer<index>(t)[i]...);
}
}

template <typename Tuple, typename Func, std::size_t... index>
void foreach_rval_helper(Tuple& t, const Func& func, index_sequence<index...>) {
size_t size = t.size();
Expand All @@ -314,11 +322,25 @@ void foreach_rval_helper(Tuple& t, const Func& func, index_sequence<index...>) {
}
}

template <typename Tuple, typename Func, std::size_t... index>
void range_foreach_rval_helper(Tuple& t, size_t begin, size_t end,
const Func& func, index_sequence<index...>) {
for (size_t i = begin; i < end; ++i) {
func(std::move(get_buffer<index>(t)[i])...);
}
}

template <typename Tuple, typename Func>
void foreach(Tuple& t, const Func& func) {
foreach_helper(t, func, make_index_sequence<Tuple::tuple_size>{});
}

template <typename Tuple, typename Func>
void range_foreach_rval(Tuple& t, size_t begin, size_t end, const Func& func) {
range_foreach_rval_helper(t, begin, end, func,
make_index_sequence<Tuple::tuple_size>{});
}

template <typename Tuple, typename Func>
void foreach_rval(Tuple& t, const Func& func) {
foreach_rval_helper(t, func, make_index_sequence<Tuple::tuple_size>{});
Expand Down
211 changes: 190 additions & 21 deletions grape/fragment/basic_efile_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.
#ifndef GRAPE_FRAGMENT_BASIC_EFILE_FRAGMENT_LOADER_H_
#define GRAPE_FRAGMENT_BASIC_EFILE_FRAGMENT_LOADER_H_

#include <atomic>

#include "grape/communication/shuffle.h"
#include "grape/fragment/basic_fragment_loader_base.h"
#include "grape/fragment/rebalancer.h"
Expand All @@ -25,6 +27,15 @@ limitations under the License.

namespace grape {

// Scrambles `val` through a fixed sequence of xor, shift, add and multiply
// steps and returns the mixed value. The result is deterministic for a given
// input and size_t width; arithmetic wraps modulo the size_t range.
inline size_t rehash_oid(size_t val) {
  val ^= 61 ^ (val >> 16);  // same as (val ^ 61) ^ (val >> 16)
  val += val << 3;
  val ^= val >> 4;
  val *= 0x27d4eb2d;
  val ^= val >> 15;
  return val;
}

template <typename FRAG_T>
class BasicEFileFragmentLoader : public BasicFragmentLoaderBase<FRAG_T> {
using fragment_t = FRAG_T;
Expand Down Expand Up @@ -62,6 +73,8 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase<FRAG_T> {
edge_recv_thread_ =
std::thread(&BasicEFileFragmentLoader::edgeRecvRoutine, this);
recv_thread_running_ = true;

concurrency_ = spec.load_concurrency;
}

~BasicEFileFragmentLoader() {
Expand Down Expand Up @@ -106,43 +119,198 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase<FRAG_T> {
std::move(edges_to_frag_[comm_spec_.fid()].buffers()));
edges_to_frag_[comm_spec_.fid()].Clear();

double t0 = -grape::GetCurrentTime();
std::unique_ptr<VertexMap<oid_t, vid_t>> vm_ptr(
new VertexMap<oid_t, vid_t>());
{
VertexMapBuilder<oid_t, vid_t> builder(
comm_spec_.fid(), comm_spec_.fnum(), std::move(partitioner_),
spec_.idxer_type);
for (auto& buffers : got_edges_) {
foreach_helper(
buffers,
[&builder](const internal_oid_t& src, const internal_oid_t& dst) {
builder.add_vertex(src);
builder.add_vertex(dst);
},
make_index_sequence<2>{});
if (concurrency_ == 1) {
for (auto& buffers : got_edges_) {
foreach_helper(
buffers,
[&builder](const internal_oid_t& src, const internal_oid_t& dst) {
builder.add_vertex(src);
builder.add_vertex(dst);
},
make_index_sequence<2>{});
}
} else {
std::atomic<size_t> idx(0);
std::vector<std::vector<internal_oid_t>> vertices(concurrency_);
std::vector<std::vector<internal_oid_t>> vertices_mat(concurrency_ *
concurrency_);
std::vector<std::thread> threads;
for (int i = 0; i < concurrency_; ++i) {
threads.emplace_back(
[&, this](int tid) {
fid_t fid = comm_spec_.fid();
for (auto& buffer : got_edges_) {
size_t size = buffer.size();
size_t chunk = (size + concurrency_ - 1) / concurrency_;
size_t start = std::min(size, chunk * tid);
size_t end = std::min(size, start + chunk);
if (spec_.idxer_type == IdxerType::kLocalIdxer) {
range_foreach_helper(
buffer, start, end,
[&](const internal_oid_t& src,
const internal_oid_t& dst) {
int src_hash =
rehash_oid(std::hash<internal_oid_t>()(src)) %
concurrency_;
vertices_mat[tid * concurrency_ + src_hash]
.emplace_back(src);
int dst_hash =
rehash_oid(std::hash<internal_oid_t>()(dst)) %
concurrency_;
vertices_mat[tid * concurrency_ + dst_hash]
.emplace_back(dst);
},
make_index_sequence<2>{});
} else {
range_foreach_helper(
buffer, start, end,
[&](const internal_oid_t& src,
const internal_oid_t& dst) {
if (builder.get_fragment_id(src) == fid) {
int src_hash =
rehash_oid(std::hash<internal_oid_t>()(src)) %
concurrency_;
vertices_mat[tid * concurrency_ + src_hash]
.emplace_back(src);
}
if (builder.get_fragment_id(dst) == fid) {
int dst_hash =
rehash_oid(std::hash<internal_oid_t>()(dst)) %
concurrency_;
vertices_mat[tid * concurrency_ + dst_hash]
.emplace_back(dst);
}
},
make_index_sequence<2>{});
}
}
},
i);
}
for (auto& thrd : threads) {
thrd.join();
}
std::vector<std::thread> aggregate_threads;
for (int i = 0; i < concurrency_; ++i) {
aggregate_threads.emplace_back(
[&, this](int tid) {
auto& vec = vertices[tid];
for (int j = 0; j < concurrency_; ++j) {
vec.insert(vec.end(),
vertices_mat[j * concurrency_ + tid].begin(),
vertices_mat[j * concurrency_ + tid].end());
}
DistinctSort(vec);
},
i);
}
for (auto& thrd : aggregate_threads) {
thrd.join();
}
// TODO(luoxiaojian): parallelize this part
for (auto& vec : vertices) {
for (auto& v : vec) {
builder.add_vertex(v);
}
}
}
builder.finish(comm_spec_, *vm_ptr);
}
MPI_Barrier(comm_spec_.comm());
t0 += grape::GetCurrentTime();
if (comm_spec_.worker_id() == 0) {
VLOG(1) << "finished constructing vertex_map, time: " << t0 << " s";
}

double t1 = -grape::GetCurrentTime();
std::vector<Edge<vid_t, edata_t>> processed_edges;
for (auto& buffers : got_edges_) {
foreach_rval(buffers, [&processed_edges, &vm_ptr](internal_oid_t&& src,
internal_oid_t&& dst,
edata_t&& data) {
vid_t src_gid, dst_gid;
if (vm_ptr->GetGid(oid_t(src), src_gid) &&
vm_ptr->GetGid(oid_t(dst), dst_gid)) {
processed_edges.emplace_back(src_gid, dst_gid, std::move(data));
}
});
if (concurrency_ == 1) {
for (auto& buffers : got_edges_) {
foreach_rval(buffers, [&processed_edges, &vm_ptr](internal_oid_t&& src,
internal_oid_t&& dst,
edata_t&& data) {
vid_t src_gid, dst_gid;
if (vm_ptr->GetGid(oid_t(src), src_gid) &&
vm_ptr->GetGid(oid_t(dst), dst_gid)) {
processed_edges.emplace_back(src_gid, dst_gid, std::move(data));
}
});
}
} else {
std::vector<size_t> offsets;
size_t total = 0;
for (auto& buffers : got_edges_) {
offsets.emplace_back(total);
total += buffers.size();
}
processed_edges.resize(total);
std::vector<std::thread> threads;
for (int i = 0; i < concurrency_; ++i) {
threads.emplace_back(
[&, this](int tid) {
size_t global_offset = 0;
for (auto& buffer : got_edges_) {
size_t size = buffer.size();
size_t chunk = (size + concurrency_ - 1) / concurrency_;
size_t start = std::min(size, chunk * tid);
size_t end = std::min(size, start + chunk);
size_t local_offset = global_offset + start;
global_offset += size;
range_foreach_rval(
buffer, start, end,
[&](internal_oid_t&& src, internal_oid_t&& dst,
edata_t&& data) {
vid_t src_gid, dst_gid;
if (vm_ptr->GetGidFromInternalOid(src, src_gid) &&
vm_ptr->GetGidFromInternalOid(dst, dst_gid)) {
processed_edges[local_offset++] = Edge<vid_t, edata_t>(
src_gid, dst_gid, std::move(data));
} else {
processed_edges[local_offset++] = Edge<vid_t, edata_t>(
std::numeric_limits<vid_t>::max(),
std::numeric_limits<vid_t>::max(), std::move(data));
}
});
}
},
i);
}
for (auto& thrd : threads) {
thrd.join();
}
}
MPI_Barrier(comm_spec_.comm());
t1 += grape::GetCurrentTime();
if (comm_spec_.worker_id() == 0) {
VLOG(1) << "finished parsing edges, time: " << t1 << " s";
}

double t2 = -grape::GetCurrentTime();
fragment = std::make_shared<fragment_t>();
std::vector<internal::Vertex<vid_t, vdata_t>> fake_vertices;
fragment->Init(comm_spec_, spec_.directed, std::move(vm_ptr), fake_vertices,
processed_edges);
if (concurrency_ == 1) {
fragment->Init(comm_spec_, spec_.directed, std::move(vm_ptr),
fake_vertices, processed_edges);
} else {
fragment->ParallelInit(comm_spec_, spec_.directed, std::move(vm_ptr),
fake_vertices, processed_edges, concurrency_);
}
MPI_Barrier(comm_spec_.comm());
t2 += grape::GetCurrentTime();
if (comm_spec_.worker_id() == 0) {
VLOG(1) << "finished initializing fragment, time: " << t2 << " s";
}

this->InitOuterVertexData(fragment);
if (!std::is_same<EmptyType, vdata_t>::value) {
this->InitOuterVertexData(fragment);
}
}

private:
Expand Down Expand Up @@ -172,6 +340,7 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase<FRAG_T> {

std::vector<ShuffleBufferTuple<internal_oid_t, internal_oid_t, edata_t>>
got_edges_;
int concurrency_;

using BasicFragmentLoaderBase<FRAG_T>::comm_spec_;
using BasicFragmentLoaderBase<FRAG_T>::spec_;
Expand Down
3 changes: 3 additions & 0 deletions grape/fragment/basic_fragment_loader_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ struct LoadGraphSpec {
PartitionerType partitioner_type;
IdxerType idxer_type;

int load_concurrency;

void set_directed(bool val = true) { directed = val; }
void set_rebalance(bool flag, int weight) {
rebalance = flag;
Expand Down Expand Up @@ -100,6 +102,7 @@ inline LoadGraphSpec DefaultLoadGraphSpec() {
spec.deserialize = false;
spec.partitioner_type = PartitionerType::kHashPartitioner;
spec.idxer_type = IdxerType::kHashMapIdxer;
spec.load_concurrency = 1;
return spec;
}

Expand Down
Loading
Loading