From 9c872864584a8c375614cf94b8136a5b22bcd970 Mon Sep 17 00:00:00 2001
From: Roger Waleffe
Date: Tue, 21 Nov 2023 17:23:08 -0600
Subject: [PATCH] faster load

---
 src/cpp/src/data/batch.cpp      | 16 +++++++--------
 src/cpp/src/data/dataloader.cpp | 18 ++++++++---------
 src/cpp/src/data/graph.cpp      | 36 ++++++++++++++++++---------------
 3 files changed, 37 insertions(+), 33 deletions(-)

diff --git a/src/cpp/src/data/batch.cpp b/src/cpp/src/data/batch.cpp
index 03a312fd..eaf8584e 100644
--- a/src/cpp/src/data/batch.cpp
+++ b/src/cpp/src/data/batch.cpp
@@ -91,7 +91,7 @@ void Batch::remoteTo(shared_ptr pg, int worker_id, int t
     }
 
     if (sub_batches_.size() > 0) {
-//        #pragma omp parallel for // TODO: need to look at whether this works or not (e.g., parallel sending)
+        #pragma omp parallel for // TODO: need to look at whether this works or not (e.g., parallel sending)
         for (int i = 0; i < sub_batches_.size(); i++) {
             sub_batches_[i]->remoteTo(pg, worker_id, tag+i, false);
         }
@@ -138,7 +138,7 @@ void Batch::remoteTo(shared_ptr pg, int worker_id, int t
 //
 //    send_tensor(inv_neg_scores_, pg, worker_id, tag);
 
-    send_tensor(y_pred_, pg, worker_id, tag);
+//    send_tensor(y_pred_, pg, worker_id, tag);
 
     // can clear batch, it's sent to another machine at this point
     clear();
 }
@@ -164,14 +164,14 @@ void Batch::remoteReceive(shared_ptr pg, int worker_id,
     }
 
     if (sub_batches_.size() > 0) {
-//        #pragma omp parallel for // TODO: need to look at whether this works or not (e.g., parallel sending)
+        #pragma omp parallel for // TODO: need to look at whether this works or not (e.g., parallel sending)
         for (int i = 0; i < sub_batches_.size(); i++) {
             sub_batches_[i]->remoteReceive(pg, worker_id, tag + i, false);
         }
         return;
     }
 
-    Timer t = new Timer(false);
-    t.start();
+//    Timer t = new Timer(false);
+//    t.start();
 
 //    edges_ = receive_tensor(pg, worker_id, tag);
 //
@@ -213,9 +213,9 @@
 //    inv_neg_scores_ = receive_tensor(pg, worker_id, tag);
 
-    y_pred_ = receive_tensor(pg, worker_id, tag);
-    t.stop();
-    std::cout<<"batch recv: "<<t.getDuration()<<"\n";
+//    y_pred_ = receive_tensor(pg, worker_id, tag);
+//    t.stop();
+//    std::cout<<"batch recv: "<<t.getDuration()<<"\n";
diff --git a/src/cpp/src/data/dataloader.cpp b/src/cpp/src/data/dataloader.cpp
--- a/src/cpp/src/data/dataloader.cpp
+++ b/src/cpp/src/data/dataloader.cpp
@@ ... @@ shared_ptr<Batch> DataLoader::getBatch(at::optional<torch::Device> device, bool
     batch->sub_batches_ = sub_batches;
 
-    if (compute_worker_)
-        loadCPUParameters(batch, worker_id);
+//    if (compute_worker_)
+//        loadCPUParameters(batch, worker_id);
 
     return batch;
 }
@@ -664,7 +664,7 @@ void DataLoader::loadCPUParameters(shared_ptr<Batch> batch, int id, bool load) {
     if (batch->sub_batches_.size() > 0) {
         if (batch->sub_batches_[0]->node_features_.defined()) {
-            std::cout<<"ALREADY LOADED\n";
+//            std::cout<<"ALREADY LOADED\n";
             return;
         }
@@ -672,7 +672,7 @@ void DataLoader::loadCPUParameters(shared_ptr<Batch> batch, int id, bool load) {
         if (batch->creator_id_ != -1) {
             // received this batch, already have the uniques on the root node_indices_
-            std::cout<<"received completed batch\n";
+//            std::cout<<"received completed batch\n";
             unique_indices = batch->sub_batches_[0]->root_node_indices_;
         } else {
             std::vector<torch::Tensor> all_unique_nodes_vec(batch->sub_batches_.size());
@@ -681,8 +681,8 @@ void DataLoader::loadCPUParameters(shared_ptr<Batch> batch, int id, bool load) {
                 all_unique_nodes_vec[i] = batch->sub_batches_[i]->unique_node_indices_;
             }
 
-            Timer t = new Timer(false);
-            t.start();
+//            Timer t = new Timer(false);
+//            t.start();
 
             torch::Tensor all_unique_nodes = torch::cat({all_unique_nodes_vec}, 0);
             unique_indices = computeUniques(all_unique_nodes, graph_storage_->getNumNodesInMemory(), id);
@@ -690,13 +690,13 @@ void DataLoader::loadCPUParameters(shared_ptr<Batch> batch, int id, bool load) {
                 batch->sub_batches_[i]->root_node_indices_ = unique_indices;
             }
 
-            t.stop();
-            std::cout<< "calculated and set uniques: " << t.getDuration() << "\n";
+//            t.stop();
+//            std::cout<< "calculated and set uniques: " << t.getDuration() << "\n";
 
 //            std::cout<< ...
         ... = graph_storage_->getNodeFeatures(unique_indices);
         int split_size = (int) ceil((float) unique_indices.size(0) / batch->sub_batches_.size());
diff --git a/src/cpp/src/data/graph.cpp b/src/cpp/src/data/graph.cpp
index 62410e2f..c6946e80 100644
--- a/src/cpp/src/data/graph.cpp
+++ b/src/cpp/src/data/graph.cpp
@@ -277,7 +277,6 @@ void DENSEGraph::to(torch::Device device, CudaStream *compute_stream, CudaStream
     hop_offsets_ = transfer_tensor(hop_offsets_, device, compute_stream, transfer_stream);
     out_offsets_ = transfer_tensor(out_offsets_, device, compute_stream, transfer_stream);
-    in_offsets_ = transfer_tensor(in_offsets_, device, compute_stream, transfer_stream);
 
     for (int i = 0; i < in_neighbors_vec_.size(); i++) {
@@ -301,13 +300,18 @@ void DENSEGraph::send(shared_ptr pg, int worker_id, int
     send_tensor(in_offsets_, pg, worker_id, tag);
 //    send_tensor(torch::cat({out_offsets_, in_offsets_}, 0), pg, worker_id, tag);
 //
-//    torch::Tensor tmp = torch::cat({out_neighbors_vec_}, 0);
-//    out_neighbors_vec_ = {};
-//    out_neighbors_vec_.emplace_back(tmp);
-//
-//    tmp = torch::cat({in_neighbors_vec_}, 0);
-//    in_neighbors_vec_ = {};
-//    in_neighbors_vec_.emplace_back(tmp);
+    torch::Tensor tmp;
+    if (out_neighbors_vec_.size() > 0) {
+        tmp = torch::cat({out_neighbors_vec_}, 0);
+        out_neighbors_vec_ = {};
+        out_neighbors_vec_.emplace_back(tmp);
+    }
+
+    if (in_neighbors_vec_.size() > 0) {
+        tmp = torch::cat({in_neighbors_vec_}, 0);
+        in_neighbors_vec_ = {};
+        in_neighbors_vec_.emplace_back(tmp);
+    }
 
     int in_size = in_neighbors_vec_.size();
     int out_size = out_neighbors_vec_.size();
@@ -321,16 +325,16 @@
     }
 
     for (int i = 0; i < in_neighbors_vec_.size(); i++) {
-        send_tensor(in_neighbors_vec_[i], pg, worker_id, tag);
+        send_tensor(in_neighbors_vec_[i].select(1, 0), pg, worker_id, tag);
     }
 
     for (int i = 0; i < out_neighbors_vec_.size(); i++) {
-        send_tensor(out_neighbors_vec_[i], pg, worker_id, tag);
+        send_tensor(out_neighbors_vec_[i].select(1, -1), pg, worker_id, tag);
     }
 
-    send_tensor(node_properties_, pg, worker_id, tag);
+//    send_tensor(node_properties_, pg, worker_id, tag);
 
-    send_tensor(buffer_state_, pg, worker_id, tag);
+//    send_tensor(buffer_state_, pg, worker_id, tag);
 }
 
 void DENSEGraph::receive(shared_ptr pg, int worker_id, int tag) {
@@ -361,16 +365,16 @@
     out_neighbors_vec_ = std::vector<torch::Tensor>(out_size);
 
     for (int i = 0; i < in_neighbors_vec_.size(); i++) {
-        in_neighbors_vec_[i] = receive_tensor(pg, worker_id, tag);
+        in_neighbors_vec_[i] = receive_tensor(pg, worker_id, tag).unsqueeze(1);
     }
 
     for (int i = 0; i < out_neighbors_vec_.size(); i++) {
-        out_neighbors_vec_[i] = receive_tensor(pg, worker_id, tag);
+        out_neighbors_vec_[i] = receive_tensor(pg, worker_id, tag).unsqueeze(1);
     }
 
-    node_properties_ = receive_tensor(pg, worker_id, tag);
+//    node_properties_ = receive_tensor(pg, worker_id, tag);
 
-    buffer_state_ = receive_tensor(pg, worker_id, tag);
+//    buffer_state_ = receive_tensor(pg, worker_id, tag);
 
     num_nodes_in_memory_ = metadata[2].item();
     partition_size_ = metadata[3].item();
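
Note on the graph.cpp hunks above: DENSEGraph::send now transmits only a single column of each neighbor tensor (.select(1, 0) for in-neighbors, .select(1, -1) for out-neighbors), and DENSEGraph::receive rebuilds a two-dimensional tensor from the received one-dimensional tensor with .unsqueeze(1), so the receiving worker sees width-1 neighbor tensors. The following standalone libtorch sketch illustrates that select/unsqueeze round trip; the [6, 2] shape and the values are assumptions made only for this example, not values taken from the patch.

// Minimal illustrative sketch (not part of the patch): shows that selecting one
// column before sending and unsqueezing after receiving yields an [N, 1] tensor.
// The [6, 2] shape below is an assumed example value.
#include <torch/torch.h>
#include <iostream>

int main() {
    // Stand-in for one entry of in_neighbors_vec_ with shape [num_edges, num_cols].
    torch::Tensor neighbors = torch::arange(12).reshape({6, 2});

    torch::Tensor wire = neighbors.select(1, 0);  // sender side: keep only column 0 -> shape [6]
    torch::Tensor restored = wire.unsqueeze(1);   // receiver side: back to a column tensor -> shape [6, 1]

    std::cout << restored.sizes() << std::endl;   // prints [6, 1]
    return 0;
}

Because only one column survives the transfer, any receiver-side code that indexed the other neighbor columns would need the full tensors to be sent instead.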