Skip to content

Commit

Permalink
fix bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
luoxiaojian committed Sep 19, 2024
1 parent 260ae20 commit 19e0992
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 116 deletions.
1 change: 1 addition & 0 deletions examples/analytical_apps/run_app_vc.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void CreateAndQueryVC(const CommSpec& comm_spec, const std::string& out_prefix,
graph_spec.set_serialize(true, FLAGS_serialization_prefix);
}
graph_spec.single_scan = FLAGS_single_scan_load;
graph_spec.load_concurrency = FLAGS_load_concurrency;

using FRAG_T =
ImmutableVertexcutFragment<int64_t, grape::EmptyType, grape::EmptyType>;
Expand Down
40 changes: 8 additions & 32 deletions grape/fragment/basic_vc_ds_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,22 +162,24 @@ class BasicVCDSFragmentLoader {
(sizeof(oid_t) + sizeof(oid_t) + sizeof(edata_t)) * comm_spec_.fnum() *
shuffle_out_size);
#endif
static constexpr size_t thread_local_cache_size = 128;
for (int i = 0; i < load_concurrency_; ++i) {
edge_move_threads_.emplace_back([this] {
ShuffleBufferTuple<oid_t, oid_t, edata_t> cur;
std::vector<std::vector<edge_t>> edges_cache(bucket_num_ * bucket_num_);
#ifdef TRACKING_MEMORY_ALLOCATIONS
// allocate memory for thread local edge cache
MemoryTracker::GetInstance().allocate(
(sizeof(oid_t) + sizeof(oid_t) + sizeof(edata_t)) * 4096);
(sizeof(oid_t) + sizeof(oid_t) + sizeof(edata_t)) *
thread_local_cache_size);
#endif
while (got_edges_.Get(cur)) {
size_t cur_size = cur.size();
foreach_rval(cur, [&edges_cache, this](oid_t src, oid_t dst,
edata_t data) {
int bucket_id = bucketer_->get_bucket_id(src, dst);
edges_cache[bucket_id].emplace_back(src, dst, data);
if (edges_cache[bucket_id].size() >= 4096) {
if (edges_cache[bucket_id].size() >= thread_local_cache_size) {
size_t cursor = bucket_cursor_[bucket_id].fetch_add(
edges_cache[bucket_id].size());
std::copy(edges_cache[bucket_id].begin(),
Expand All @@ -201,7 +203,8 @@ class BasicVCDSFragmentLoader {
#ifdef TRACKING_MEMORY_ALLOCATIONS
// deallocate memory for thread local edge cache
MemoryTracker::GetInstance().deallocate(
(sizeof(oid_t) + sizeof(oid_t) + sizeof(edata_t)) * 4096);
(sizeof(oid_t) + sizeof(oid_t) + sizeof(edata_t)) *
thread_local_cache_size);
#endif
});
}
Expand Down Expand Up @@ -255,36 +258,9 @@ class BasicVCDSFragmentLoader {
}
bucket_edge_offset.emplace_back(en);

{
std::atomic<int> d2_bucket_idx(0);
int d2_bucket_num = bucket_num_ * bucket_num_;
std::vector<std::thread> sort_threads;
for (int i = 0; i < load_concurrency_; ++i) {
sort_threads.emplace_back(
[this, &d2_bucket_idx, d2_bucket_num,
&bucket_edge_offset](int tid) {
while (true) {
int idx = d2_bucket_idx.fetch_add(1);
if (idx >= d2_bucket_num) {
break;
}
std::sort(edges_.begin() + bucket_edge_offset[idx],
edges_.begin() + bucket_edge_offset[idx + 1],
[](const edge_t& a, const edge_t& b) {
return a.src < b.src ||
(a.src == b.src && a.dst < b.dst);
});
}
},
i);
}
for (auto& thrd : sort_threads) {
thrd.join();
}
}

fragment.reset(new fragment_t());
fragment->Init(comm_spec_, vnum_, std::move(edges_));
fragment->Init(comm_spec_, vnum_, std::move(edges_), bucket_num_,
std::move(bucket_edge_offset));
}

private:
Expand Down
44 changes: 10 additions & 34 deletions grape/fragment/basic_vc_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class BasicVCFragmentLoader {
#endif

{
static constexpr size_t thread_local_cache_size = 4096;
static constexpr size_t thread_local_cache_size = 128;
#ifdef TRACKING_MEMORY_ALLOCATIONS
// allocate thread local cache
MemoryTracker::GetInstance().allocate(
Expand Down Expand Up @@ -209,10 +209,13 @@ class BasicVCFragmentLoader {
}

for (size_t i = 0; i < bucket_num_ * bucket_num_; ++i) {
size_t cursor = bucket_edge_cursor[i].fetch_add(
thread_local_edges[i].size());
std::copy(thread_local_edges[i].begin(),
thread_local_edges[i].end(), edges.begin() + cursor);
if (!thread_local_edges[i].empty()) {
size_t cursor = bucket_edge_cursor[i].fetch_add(
thread_local_edges[i].size());
std::copy(thread_local_edges[i].begin(),
thread_local_edges[i].end(),
edges.begin() + cursor);
}
}
},
i);
Expand All @@ -233,36 +236,9 @@ class BasicVCFragmentLoader {
#endif
}

{
std::atomic<int> d2_bucket_idx(0);
int d2_bucket_num = bucket_num_ * bucket_num_;
std::vector<std::thread> sort_threads;
for (int i = 0; i < load_concurrency_; ++i) {
sort_threads.emplace_back(
[this, &d2_bucket_idx, d2_bucket_num, &edges,
&bucket_edge_offset](int tid) {
while (true) {
int idx = d2_bucket_idx.fetch_add(1);
if (idx >= d2_bucket_num) {
break;
}
std::sort(edges.begin() + bucket_edge_offset[idx],
edges.begin() + bucket_edge_offset[idx + 1],
[](const edge_t& a, const edge_t& b) {
return a.src < b.src ||
(a.src == b.src && a.dst < b.dst);
});
}
},
i);
}
for (auto& thrd : sort_threads) {
thrd.join();
}
}

fragment.reset(new fragment_t());
fragment->Init(comm_spec_, vnum_, std::move(edges));
fragment->Init(comm_spec_, vnum_, std::move(edges), bucket_num_,
std::move(bucket_edge_offset));
}

private:
Expand Down
130 changes: 80 additions & 50 deletions grape/fragment/immutable_vertexcut_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class ImmutableVertexcutFragment<int64_t, EmptyType, EmptyType>
using base_t::fnum_;

void Init(const CommSpec& comm_spec, int64_t vnum,
std::vector<edge_t>&& edges) {
std::vector<edge_t>&& edges, int bucket_num = 1,
std::vector<size_t>&& bucket_edge_offsets = {}) {
base_t::init(comm_spec.fid(), comm_spec.fnum(), false);

partitioner_.init(comm_spec.fnum(), vnum);
Expand Down Expand Up @@ -109,16 +110,36 @@ class ImmutableVertexcutFragment<int64_t, EmptyType, EmptyType>
edges_ = std::move(edges);
#endif

bucket_num_ = 1;
bucket_edge_offsets_.push_back(0);
bucket_edge_offsets_.push_back(edges_.size());
bucket_num_ = bucket_num;
if (bucket_num_ == 1) {
bucket_edge_offsets_.push_back(0);
bucket_edge_offsets_.push_back(edges_.size());
} else {
bucket_edge_offsets_ = std::move(bucket_edge_offsets);
}
}

void PrepareToRunApp(const CommSpec& comm_spec, PrepareConf conf,
const ParallelEngineSpec& pe_spec) override {
if (bucket_num_ < static_cast<int>(pe_spec.thread_num)) {
buildBucket(pe_spec.thread_num);
}
oid_t src_begin = src_vertices_.begin_value();
size_t src_size = src_vertices_.size();
size_t src_chunk = (src_size + pe_spec.thread_num - 1) / pe_spec.thread_num;
oid_t dst_begin = dst_vertices_.begin_value();
size_t dst_size = dst_vertices_.size();
size_t dst_chunk = (dst_size + pe_spec.thread_num - 1) / pe_spec.thread_num;
for (int src_i = 0; src_i < bucket_num_; ++src_i) {
for (int dst_i = 0; dst_i < bucket_num_; ++dst_i) {
for (auto& e : GetEdgesOfBucket(src_i, dst_i)) {
int src_bucket_id = (e.src - src_begin) / src_chunk;
int dst_bucket_id = (e.dst - dst_begin) / dst_chunk;
CHECK_EQ(src_bucket_id, src_i);
CHECK_EQ(dst_bucket_id, dst_i);
}
}
}
}

const vertices_t& SourceVertices() const { return src_vertices_; }
Expand Down Expand Up @@ -147,20 +168,18 @@ class ImmutableVertexcutFragment<int64_t, EmptyType, EmptyType>

InArchive arc;
arc << edges_.size();
arc << src_vertices_ << dst_vertices_ << vertices_ << master_vertices_;
arc << src_vertices_ << dst_vertices_ << vertices_ << master_vertices_
<< bucket_num_ << bucket_edge_offsets_;
CHECK(io_adaptor->WriteArchive(arc));
arc.Clear();

partitioner_.serialize(io_adaptor);

if (std::is_pod<edata_t>::value && std::is_pod<oid_t>::value) {
LOG(INFO) << "is pod";
if (!edges_.empty()) {
io_adaptor->Write(edges_.data(), edges_.size() * sizeof(edge_t));
}
} else {
LOG(FATAL) << "is not pod";

arc << edges_;
if (!arc.Empty()) {
CHECK(io_adaptor->WriteArchive(arc));
Expand All @@ -186,12 +205,12 @@ class ImmutableVertexcutFragment<int64_t, EmptyType, EmptyType>
arc >> edge_num;
CHECK_EQ(fid_, comm_spec.fid());
CHECK_EQ(fnum_, comm_spec.fnum());
arc >> src_vertices_ >> dst_vertices_ >> vertices_ >> master_vertices_;
arc >> src_vertices_ >> dst_vertices_ >> vertices_ >> master_vertices_ >>
bucket_num_ >> bucket_edge_offsets_;

partitioner_.deserialize(io_adaptor);

if (std::is_pod<edata_t>::value && std::is_pod<oid_t>::value) {
LOG(INFO) << "is pod";
edges_.resize(edge_num);
#ifdef TRACKING_MEMORY_ALLOCATIONS
MemoryTracker::GetInstance().allocate(sizeof(edge_t) * edges_.size());
Expand All @@ -200,15 +219,10 @@ class ImmutableVertexcutFragment<int64_t, EmptyType, EmptyType>
CHECK(io_adaptor->Read(edges_.data(), edge_num * sizeof(edge_t)));
}
} else {
LOG(FATAL) << "is not pod";
arc.Clear();
CHECK(io_adaptor->ReadArchive(arc));
arc >> edges_;
}

bucket_num_ = 1;
bucket_edge_offsets_.push_back(0);
bucket_edge_offsets_.push_back(edges_.size());
}

size_t GetTotalVerticesNum() const override {
Expand Down Expand Up @@ -237,59 +251,75 @@ class ImmutableVertexcutFragment<int64_t, EmptyType, EmptyType>
if (bucket_num_ == thread_num) {
return;
}
std::vector<std::vector<size_t>> thread_bucket_edge_nums(thread_num);
std::vector<std::thread> threads;
std::atomic<size_t> edge_idx(0);
size_t edge_num = edges_.size();

oid_t src_begin = src_vertices_.begin_value();
size_t src_size = src_vertices_.size();
size_t src_chunk = (src_size + thread_num - 1) / thread_num;
oid_t dst_begin = dst_vertices_.begin_value();
size_t dst_size = dst_vertices_.size();
size_t dst_chunk = (dst_size + thread_num - 1) / thread_num;

for (int i = 0; i < thread_num; ++i) {
threads.emplace_back(
[&, this](int tid) {
thread_bucket_edge_nums[tid].clear();
thread_bucket_edge_nums[tid].resize(thread_num * thread_num, 0);
while (true) {
size_t idx = std::min(edge_idx.fetch_add(4096), edge_num);
size_t end = std::min(idx + 4096, edge_num);
if (idx == end) {
break;
}
while (idx < end) {
auto& edge = edges_[idx];
int src_bucket_id = (edge.src - src_begin) / src_chunk;
int dst_bucket_id = (edge.dst - dst_begin) / dst_chunk;
thread_bucket_edge_nums[tid][src_bucket_id * thread_num +
dst_bucket_id]++;
++idx;
}
}
},
i);
}
for (auto& t : threads) {
t.join();
}
std::vector<size_t> bucket_edge_num(thread_num * thread_num, 0);
for (int i = 0; i < thread_num; ++i) {
for (int j = 0; j < thread_num * thread_num; ++j) {
bucket_edge_num[j] += thread_bucket_edge_nums[i][j];
{
std::vector<std::vector<size_t>> thread_bucket_edge_nums(thread_num);
std::vector<std::thread> threads;
std::atomic<size_t> edge_idx(0);
size_t edge_num = edges_.size();

for (int i = 0; i < thread_num; ++i) {
threads.emplace_back(
[&, this](int tid) {
thread_bucket_edge_nums[tid].clear();
thread_bucket_edge_nums[tid].resize(thread_num * thread_num, 0);
while (true) {
size_t idx = std::min(edge_idx.fetch_add(4096), edge_num);
size_t end = std::min(idx + 4096, edge_num);
if (idx == end) {
break;
}
while (idx < end) {
auto& edge = edges_[idx];
int src_bucket_id = (edge.src - src_begin) / src_chunk;
int dst_bucket_id = (edge.dst - dst_begin) / dst_chunk;
thread_bucket_edge_nums[tid][src_bucket_id * thread_num +
dst_bucket_id]++;
++idx;
}
}
},
i);
}
for (auto& t : threads) {
t.join();
}
for (int i = 0; i < thread_num; ++i) {
for (int j = 0; j < thread_num * thread_num; ++j) {
bucket_edge_num[j] += thread_bucket_edge_nums[i][j];
}
}
}

bucket_num_ = thread_num;
bucket_edge_offsets_.clear();
edge_num = 0;
size_t edge_num = 0;
for (size_t i = 0; i < bucket_edge_num.size(); ++i) {
bucket_edge_offsets_.push_back(edge_num);
edge_num += bucket_edge_num[i];
}
bucket_edge_offsets_.push_back(edge_num);
std::sort(edges_.begin(), edges_.end(),
[&](const edge_t& a, const edge_t& b) {
int src_bucket_id_a = (a.src - src_begin) / src_chunk;
int src_bucket_id_b = (b.src - src_begin) / src_chunk;

if (src_bucket_id_a != src_bucket_id_b) {
return src_bucket_id_a < src_bucket_id_b;
}

int dst_bucket_id_a = (a.dst - dst_begin) / dst_chunk;
int dst_bucket_id_b = (b.dst - dst_begin) / dst_chunk;

return dst_bucket_id_a < dst_bucket_id_b;
});
}

VCPartitioner<oid_t> partitioner_;
Expand Down

0 comments on commit 19e0992

Please sign in to comment.