faster load
Authored and committed by Roger Waleffe on Nov 21, 2023
1 parent 9712815 commit 9c87286
Showing 3 changed files with 37 additions and 33 deletions.
src/cpp/src/data/batch.cpp (16 changes: 8 additions & 8 deletions)
@@ -91,7 +91,7 @@ void Batch::remoteTo(shared_ptr<c10d::ProcessGroupGloo> pg, int worker_id, int t
     }

     if (sub_batches_.size() > 0) {
-        // #pragma omp parallel for // TODO: need to look at whether this works or not (e.g., parallel sending)
+        #pragma omp parallel for // TODO: need to look at whether this works or not (e.g., parallel sending)
         for (int i = 0; i < sub_batches_.size(); i++) {
             sub_batches_[i]->remoteTo(pg, worker_id, tag+i, false);
         }
@@ -138,7 +138,7 @@ void Batch::remoteTo(shared_ptr<c10d::ProcessGroupGloo> pg, int worker_id, int t
     //
     // send_tensor(inv_neg_scores_, pg, worker_id, tag);

-    send_tensor(y_pred_, pg, worker_id, tag);
+    // send_tensor(y_pred_, pg, worker_id, tag);

     // can clear batch, it's sent to another machine at this point
     clear();
@@ -164,14 +164,14 @@ void Batch::remoteReceive(shared_ptr<c10d::ProcessGroupGloo> pg, int worker_id,
     }

     if (sub_batches_.size() > 0) {
-        // #pragma omp parallel for // TODO: need to look at whether this works or not (e.g., parallel sending)
+        #pragma omp parallel for // TODO: need to look at whether this works or not (e.g., parallel sending)
         for (int i = 0; i < sub_batches_.size(); i++) {
             sub_batches_[i]->remoteReceive(pg, worker_id, tag + i, false);
         }
         return;
     }
-    Timer t = new Timer(false);
-    t.start();
+    // Timer t = new Timer(false);
+    // t.start();

     // edges_ = receive_tensor(pg, worker_id, tag);
     //
@@ -213,9 +213,9 @@ void Batch::remoteReceive(shared_ptr<c10d::ProcessGroupGloo> pg, int worker_id,

     // inv_neg_scores_ = receive_tensor(pg, worker_id, tag);

-    y_pred_ = receive_tensor(pg, worker_id, tag);
-    t.stop();
-    std::cout<<"batch recv: "<<t.getDuration()<<"\n";
+    // y_pred_ = receive_tensor(pg, worker_id, tag);
+    // t.stop();
+    // std::cout<<"batch recv: "<<t.getDuration()<<"\n";

     t_full.stop();
     std::cout<<"batch recv full: "<<t_full.getDuration()<<"\n";
src/cpp/src/data/dataloader.cpp (18 changes: 9 additions & 9 deletions)
@@ -489,8 +489,8 @@ shared_ptr<Batch> DataLoader::getBatch(at::optional<torch::Device> device, bool

     batch->sub_batches_ = sub_batches;

-    if (compute_worker_)
-        loadCPUParameters(batch, worker_id);
+    // if (compute_worker_)
+    // loadCPUParameters(batch, worker_id);

     return batch;
 }
@@ -664,15 +664,15 @@ void DataLoader::loadCPUParameters(shared_ptr<Batch> batch, int id, bool load) {
     if (batch->sub_batches_.size() > 0) {

         if (batch->sub_batches_[0]->node_features_.defined()) {
-            std::cout<<"ALREADY LOADED\n";
+            // std::cout<<"ALREADY LOADED\n";
             return;
         }

         torch::Tensor unique_indices;

         if (batch->creator_id_ != -1) {
             // received this batch, already have the uniques on the root node_indices_
-            std::cout<<"received completed batch\n";
+            // std::cout<<"received completed batch\n";
             unique_indices = batch->sub_batches_[0]->root_node_indices_;
         } else {
             std::vector <torch::Tensor> all_unique_nodes_vec(batch->sub_batches_.size());
@@ -681,22 +681,22 @@ void DataLoader::loadCPUParameters(shared_ptr<Batch> batch, int id, bool load) {
                 all_unique_nodes_vec[i] = batch->sub_batches_[i]->unique_node_indices_;
             }

-            Timer t = new Timer(false);
-            t.start();
+            // Timer t = new Timer(false);
+            // t.start();
             torch::Tensor all_unique_nodes = torch::cat({all_unique_nodes_vec}, 0);
             unique_indices = computeUniques(all_unique_nodes, graph_storage_->getNumNodesInMemory(), id);

             for (int i = 0; i < batch->sub_batches_.size(); i++) {
                 batch->sub_batches_[i]->root_node_indices_ = unique_indices;
             }

-            t.stop();
-            std::cout<< "calculated and set uniques: " << t.getDuration() << "\n";
+            // t.stop();
+            // std::cout<< "calculated and set uniques: " << t.getDuration() << "\n";
             // std::cout<<unique_indices.size(0)<<" vs "<<all_unique_nodes.size(0)<<"\n";
         }

         if (load) {
-            std::cout<<"load\n";
+            // std::cout<<"load\n";
             // torch::Tensor unique_features = graph_storage_->getNodeFeatures(unique_indices);

             int split_size = (int) ceil((float) unique_indices.size(0) / batch->sub_batches_.size());
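Note on the dataloader.cpp hunks above: getBatch no longer loads CPU parameters eagerly, and when loadCPUParameters does run it concatenates the per-sub-batch unique_node_indices_, deduplicates them once (computeUniques), and later splits the combined set into ceil-sized chunks, one per sub-batch. A rough self-contained sketch of that gather/dedup/split flow follows; the literal index values and the use of torch::_unique in place of the repo's computeUniques are illustrative assumptions.

// Hedged sketch (illustrative, not the repo implementation): gather per-sub-batch
// unique node indices, deduplicate once, then carve the combined set into roughly
// equal chunks using the same ceil-based split sizing as above.
#include <torch/torch.h>
#include <cmath>
#include <iostream>
#include <vector>

int main() {
    // Stand-ins for batch->sub_batches_[i]->unique_node_indices_.
    std::vector<torch::Tensor> per_sub_batch = {
        torch::tensor({0, 3, 5, 7}, torch::kInt64),
        torch::tensor({3, 5, 9, 11}, torch::kInt64),
        torch::tensor({1, 7, 11, 12}, torch::kInt64),
    };

    // Concatenate and deduplicate across sub-batches.
    torch::Tensor all_nodes = torch::cat(per_sub_batch, 0);
    torch::Tensor unique_indices = std::get<0>(torch::_unique(all_nodes));

    // Ceil split so the last chunk absorbs any remainder.
    int num_sub_batches = (int)per_sub_batch.size();
    int split_size = (int)std::ceil((float)unique_indices.size(0) / num_sub_batches);
    std::vector<torch::Tensor> chunks = unique_indices.split(split_size, 0);

    for (size_t i = 0; i < chunks.size(); i++) {
        std::cout << "chunk " << i << ": " << chunks[i].sizes() << "\n";
    }
    return 0;
}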
src/cpp/src/data/graph.cpp (36 changes: 20 additions & 16 deletions)
@@ -277,7 +277,6 @@ void DENSEGraph::to(torch::Device device, CudaStream *compute_stream, CudaStream
     hop_offsets_ = transfer_tensor(hop_offsets_, device, compute_stream, transfer_stream);

     out_offsets_ = transfer_tensor(out_offsets_, device, compute_stream, transfer_stream);
-
     in_offsets_ = transfer_tensor(in_offsets_, device, compute_stream, transfer_stream);

     for (int i = 0; i < in_neighbors_vec_.size(); i++) {
@@ -301,13 +300,18 @@ void DENSEGraph::send(shared_ptr<c10d::ProcessGroupGloo> pg, int worker_id, int
     send_tensor(in_offsets_, pg, worker_id, tag);
     // send_tensor(torch::cat({out_offsets_, in_offsets_}, 0), pg, worker_id, tag);
     //
-    // torch::Tensor tmp = torch::cat({out_neighbors_vec_}, 0);
-    // out_neighbors_vec_ = {};
-    // out_neighbors_vec_.emplace_back(tmp);
-    //
-    // tmp = torch::cat({in_neighbors_vec_}, 0);
-    // in_neighbors_vec_ = {};
-    // in_neighbors_vec_.emplace_back(tmp);
+    torch::Tensor tmp;
+    if (out_neighbors_vec_.size() > 0) {
+        tmp = torch::cat({out_neighbors_vec_}, 0);
+        out_neighbors_vec_ = {};
+        out_neighbors_vec_.emplace_back(tmp);
+    }
+
+    if (in_neighbors_vec_.size() > 0) {
+        tmp = torch::cat({in_neighbors_vec_}, 0);
+        in_neighbors_vec_ = {};
+        in_neighbors_vec_.emplace_back(tmp);
+    }

     int in_size = in_neighbors_vec_.size();
     int out_size = out_neighbors_vec_.size();
@@ -321,16 +325,16 @@
     }

     for (int i = 0; i < in_neighbors_vec_.size(); i++) {
-        send_tensor(in_neighbors_vec_[i], pg, worker_id, tag);
+        send_tensor(in_neighbors_vec_[i].select(1, 0), pg, worker_id, tag);
     }

     for (int i = 0; i < out_neighbors_vec_.size(); i++) {
-        send_tensor(out_neighbors_vec_[i], pg, worker_id, tag);
+        send_tensor(out_neighbors_vec_[i].select(1, -1), pg, worker_id, tag);
     }

-    send_tensor(node_properties_, pg, worker_id, tag);
+    // send_tensor(node_properties_, pg, worker_id, tag);

-    send_tensor(buffer_state_, pg, worker_id, tag);
+    // send_tensor(buffer_state_, pg, worker_id, tag);
 }

 void DENSEGraph::receive(shared_ptr<c10d::ProcessGroupGloo> pg, int worker_id, int tag) {
@@ -361,16 +365,16 @@ void DENSEGraph::receive(shared_ptr<c10d::ProcessGroupGloo> pg, int worker_id, i
     out_neighbors_vec_ = std::vector<torch::Tensor>(out_size);

     for (int i = 0; i < in_neighbors_vec_.size(); i++) {
-        in_neighbors_vec_[i] = receive_tensor(pg, worker_id, tag);
+        in_neighbors_vec_[i] = receive_tensor(pg, worker_id, tag).unsqueeze(1);
     }

     for (int i = 0; i < out_neighbors_vec_.size(); i++) {
-        out_neighbors_vec_[i] = receive_tensor(pg, worker_id, tag);
+        out_neighbors_vec_[i] = receive_tensor(pg, worker_id, tag).unsqueeze(1);
     }

-    node_properties_ = receive_tensor(pg, worker_id, tag);
+    // node_properties_ = receive_tensor(pg, worker_id, tag);

-    buffer_state_ = receive_tensor(pg, worker_id, tag);
+    // buffer_state_ = receive_tensor(pg, worker_id, tag);

     num_nodes_in_memory_ = metadata[2].item<int64_t>();
     partition_size_ = metadata[3].item<int64_t>();
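Note on the graph.cpp hunks above: the send path now ships only a single column of each 2-D neighbor tensor (select(1, 0) for in-neighbors, select(1, -1) for out-neighbors), and the receive path restores a column dimension with unsqueeze(1), so less data crosses the wire but the receiver gets back an [N, 1] tensor rather than the original [N, k]. The sketch below only demonstrates that shape round trip; the [6, 2] layout and random contents are illustrative, and whether the dropped columns are redundant or recomputable on the receiver is an assumption of the commit, not something shown here.

// Hedged sketch (illustrative): sending one column of a 2-D neighbor tensor
// and restoring a column dimension on receive.
#include <torch/torch.h>
#include <iostream>

int main() {
    // Stand-in for an in_neighbors_vec_ entry of shape [num_edges, 2].
    torch::Tensor in_neighbors = torch::randint(0, 100, {6, 2}, torch::kInt64);

    // "Send" only the first column -> shape [6].
    torch::Tensor wire_payload = in_neighbors.select(1, 0);

    // "Receive" and restore a column dimension -> shape [6, 1].
    torch::Tensor restored = wire_payload.unsqueeze(1);

    std::cout << "original: " << in_neighbors.sizes() << "\n";   // [6, 2]
    std::cout << "on wire:  " << wire_payload.sizes() << "\n";   // [6]
    std::cout << "restored: " << restored.sizes() << "\n";       // [6, 1]
    return 0;
}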
