uniques offloading to batch workers
Roger Waleffe authored and committed Nov 21, 2023
1 parent edb983f commit 8f17391
Showing 6 changed files with 84 additions and 71 deletions.
6 changes: 4 additions & 2 deletions src/cpp/include/data/dataloader.h
@@ -79,6 +79,8 @@ class DataLoader {
bool batch_worker_;
bool compute_worker_;

std::vector<torch::Tensor> hash_maps_;

DataLoader(shared_ptr<GraphModelStorage> graph_storage, LearningTask learning_task, bool use_partition_embeddings,
shared_ptr<TrainingConfig> training_config, shared_ptr<EvaluationConfig> evaluation_config, shared_ptr<EncoderConfig> encoder_config, bool batch_worker = true, bool compute_worker = true);

@@ -149,7 +151,7 @@ class DataLoader {
* Loads CPU parameters into batch
* @param batch: Batch object to load parameters into.
*/
void loadCPUParameters(shared_ptr<Batch> batch);
void loadCPUParameters(shared_ptr<Batch> batch, int id = 0, bool load = true);

/**
* Loads GPU parameters into batch
@@ -182,7 +184,7 @@
*/
void unloadStorage(bool write = false) { graph_storage_->unload(write); }

torch::Tensor computeUniques(torch::Tensor node_ids, int64_t num_nodes_in_memory);
torch::Tensor computeUniques(torch::Tensor node_ids, int64_t num_nodes_in_memory, int id);

/**
* Gets the number of edges from the graph storage.
8 changes: 6 additions & 2 deletions src/cpp/include/pipeline/pipeline_gpu.h
@@ -10,7 +10,9 @@

class BatchToDeviceWorker : public Worker {
public:
BatchToDeviceWorker(Pipeline *pipeline) : Worker{pipeline} {};
int worker_id_;

BatchToDeviceWorker(Pipeline *pipeline, int worker_id) : Worker{pipeline}, worker_id_{worker_id} {};

void run() override;
};
@@ -51,7 +53,9 @@ class RemoteLoadWorker {

class RemoteToDeviceWorker : public Worker {
public:
RemoteToDeviceWorker(Pipeline *pipeline) : Worker{pipeline} {};
int worker_id_;

RemoteToDeviceWorker(Pipeline *pipeline, int worker_id) : Worker{pipeline}, worker_id_{worker_id} {};

void run() override;
};
4 changes: 2 additions & 2 deletions src/cpp/src/data/batch.cpp
@@ -102,7 +102,7 @@ void Batch::remoteTo(shared_ptr<c10d::ProcessGroupGloo> pg, int worker_id, int t
//
// send_tensor(neg_edges_, pg, worker_id, tag);
//
// send_tensor(root_node_indices_, pg, worker_id, tag);
send_tensor(root_node_indices_, pg, worker_id, tag);

send_tensor(unique_node_indices_, pg, worker_id, tag);

@@ -177,7 +177,7 @@ void Batch::remoteReceive(shared_ptr<c10d::ProcessGroupGloo> pg, int worker_id,
//
// neg_edges_ = receive_tensor(pg, worker_id, tag);

// root_node_indices_ = receive_tensor(pg, worker_id, tag);
root_node_indices_ = receive_tensor(pg, worker_id, tag);

unique_node_indices_ = receive_tensor(pg, worker_id, tag);

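The batch.cpp changes re-enable the root_node_indices_ transfer on both endpoints. Because the tensor exchange is point-to-point and in program order, remoteTo and remoteReceive must name the same tensors in the same sequence; restoring the send without the matching receive (or vice versa) would misalign every transfer that follows. A minimal sketch of that symmetry contract, with hypothetical wrapper names (sendBatchTensors/receiveBatchTensors are illustrations, not functions from this repo):

// Sketch only: shared_ptr, c10d::ProcessGroupGloo, send_tensor and
// receive_tensor are the surrounding codebase's types and helpers.
void sendBatchTensors(std::shared_ptr<c10d::ProcessGroupGloo> pg, int dst, int tag,
                      torch::Tensor root_ids, torch::Tensor unique_ids) {
    send_tensor(root_ids, pg, dst, tag);    // 1st: root node indices
    send_tensor(unique_ids, pg, dst, tag);  // 2nd: unique node indices
}

void receiveBatchTensors(std::shared_ptr<c10d::ProcessGroupGloo> pg, int src, int tag,
                         torch::Tensor &root_ids, torch::Tensor &unique_ids) {
    root_ids = receive_tensor(pg, src, tag);    // must mirror the send order,
    unique_ids = receive_tensor(pg, src, tag);  // tensor for tensor
}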
103 changes: 53 additions & 50 deletions src/cpp/src/data/dataloader.cpp
@@ -92,6 +92,15 @@ DataLoader::DataLoader(shared_ptr<GraphModelStorage> graph_storage, LearningTask
pg_gloo_ = nullptr;
dist_config_ = nullptr;
// dist_ = false;


int num_hash_maps = training_config_->pipeline->batch_transfer_threads;
if (num_hash_maps > 0) {
auto bool_device_options = torch::TensorOptions().dtype(torch::kBool).device(torch::kCPU);
for (int i = 0; i < num_hash_maps; i++) {
hash_maps_.emplace_back(torch::zeros({graph_storage_->getNumNodes()}, bool_device_options));
}
}
}
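The constructor now preallocates one boolean scratch tensor per batch transfer thread (training_config_->pipeline->batch_transfer_threads), each sized to the full node count, so computeUniques can reuse a zeroed bitmap instead of allocating and zero-filling one per batch. A standalone sketch of the pattern, assuming only libtorch (the free function and its name are illustrative):

#include <torch/torch.h>
#include <vector>

// One num_nodes-sized boolean bitmap per transfer worker. Workers index the
// vector by their own id, so no scratch space is shared across threads.
std::vector<torch::Tensor> makeHashMaps(int num_workers, int64_t num_nodes) {
    auto opts = torch::TensorOptions().dtype(torch::kBool).device(torch::kCPU);
    std::vector<torch::Tensor> maps;
    maps.reserve(num_workers);
    for (int i = 0; i < num_workers; i++) {
        maps.emplace_back(torch::zeros({num_nodes}, opts));
    }
    return maps;
}

The cost is one byte per node per transfer thread; the payoff is that the per-batch allocation and zero-fill leave the hot path, since each bitmap is re-zeroed incrementally as computeUniques drains it (see the change further down).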

DataLoader::DataLoader(shared_ptr<GraphModelStorage> graph_storage, LearningTask learning_task, int batch_size, shared_ptr<NegativeSampler> negative_sampler,
@@ -631,7 +640,7 @@ void DataLoader::negativeSample(shared_ptr<Batch> batch) {
negative_sampler_->getNegatives(graph_storage_->current_subgraph_state_->in_memory_subgraph_, batch->edges_, false);
}

void DataLoader::loadCPUParameters(shared_ptr<Batch> batch) {
void DataLoader::loadCPUParameters(shared_ptr<Batch> batch, int id, bool load) {
if (graph_storage_->storage_ptrs_.node_embeddings != nullptr) {
if (graph_storage_->storage_ptrs_.node_embeddings->device_ != torch::kCUDA) {
batch->node_embeddings_ = graph_storage_->getNodeEmbeddings(batch->unique_node_indices_);
@@ -642,7 +651,7 @@ void DataLoader::loadCPUParameters(shared_ptr<Batch> batch) {
}

// if (graph_storage_->storage_ptrs_.node_features != nullptr) {
if (graph_storage_->storage_ptrs_.node_features != nullptr and compute_worker_) {
if (graph_storage_->storage_ptrs_.node_features != nullptr) {
if (graph_storage_->storage_ptrs_.node_features->device_ != torch::kCUDA) {
if (only_root_features_) {
batch->node_features_ = graph_storage_->getNodeFeatures(batch->root_node_indices_);
@@ -652,58 +661,51 @@ void DataLoader::loadCPUParameters(shared_ptr<Batch> batch) {


if (batch->sub_batches_.size() > 0) {
// std::cout << "start\n";
std::vector<torch::Tensor> all_unique_nodes_vec(batch->sub_batches_.size());
// int total_unique_nodes = 0;

// #pragma omp parallel for
for (int i = 0; i < batch->sub_batches_.size(); i++) {
all_unique_nodes_vec[i] = batch->sub_batches_[i]->unique_node_indices_;
// total_unique_nodes += batch->sub_batches_[i]->unique_node_indices_.size(0);
torch::Tensor unique_indices;

// std::cout << batch->sub_batches_[i]->unique_node_indices_.sizes() << " "
// << batch->sub_batches_[i]->unique_node_indices_.device() << "\n";
}
if (batch->creator_id_ != -1) {
// received this batch, already have the uniques on the root node_indices_
std::cout<<"received completed batch\n";
unique_indices = batch->sub_batches_[0]->root_node_indices_;
} else {
std::vector <torch::Tensor> all_unique_nodes_vec(batch->sub_batches_.size());

for (int i = 0; i < batch->sub_batches_.size(); i++) {
all_unique_nodes_vec[i] = batch->sub_batches_[i]->unique_node_indices_;
}

Timer t = new Timer(false);
t.start();
// std::cout << "cat\n";
torch::Tensor all_unique_nodes = torch::cat({all_unique_nodes_vec}, 0);
// std::cout << all_unique_nodes.sizes() << "\n";
// auto unique_nodes = torch::_unique2(all_unique_nodes, true, true, false);
// torch::Tensor unique_indices = std::get<0>(unique_nodes);
// torch::Tensor inverse = std::get<1>(unique_nodes);
// torch::Tensor unique_features = graph_storage_->getNodeFeatures(unique_indices);
// std::cout << unique_indices.sizes() << "\n";
// std::cout << inverse.sizes() << " " << inverse.device() << "\n";
// std::cout << unique_features.sizes() << " " << unique_features.device() << "\n";

torch::Tensor unique_indices = computeUniques(all_unique_nodes, graph_storage_->getNumNodesInMemory());
torch::Tensor unique_features = graph_storage_->getNodeFeatures(unique_indices);

std::cout<<unique_indices.size(0)<<" vs " <<all_unique_nodes.size(0)<<"\n";
t.stop();
std::cout<<"uniques: "<<t.getDuration()<<"\n";

// std::cout << "end cat\n";
int count = 0;
int count1 = 0;
int split_size = (int) ceil((float) unique_features.size(0) / batch->sub_batches_.size());
for (int i = 0; i < batch->sub_batches_.size(); i++) {
if (!batch->sub_batches_[i]->node_features_.defined()) {
// batch->sub_batches_[i]->unique_node_indices_ = inverse.narrow(0, count, batch->sub_batches_[i]->unique_node_indices_.size(0));
// count += batch->sub_batches_[i]->unique_node_indices_.size(0);
// std::cout << batch->sub_batches_[i]->unique_node_indices_.sizes() << "\n";
Timer t = new Timer(false);
t.start();
torch::Tensor all_unique_nodes = torch::cat({all_unique_nodes_vec}, 0);
unique_indices = computeUniques(all_unique_nodes, graph_storage_->getNumNodesInMemory(), id);

for (int i = 0; i < batch->sub_batches_.size(); i++) {
batch->sub_batches_[i]->root_node_indices_ = unique_indices;
}

t.stop();
std::cout<< "calculated and set uniques: " << t.getDuration() << "\n";
// std::cout<<unique_indices.size(0)<<" vs "<<all_unique_nodes.size(0)<<"\n";
}

if (load) {
std::cout<<"load\n";
torch::Tensor unique_features = graph_storage_->getNodeFeatures(unique_indices);

int count = 0;
int split_size = (int) ceil((float) unique_features.size(0) / batch->sub_batches_.size());
for (int i = 0; i < batch->sub_batches_.size(); i++) {
int size = split_size;
if (count1 + split_size > unique_features.size(0)) size = unique_features.size(0) - count1;
batch->sub_batches_[i]->node_features_ = unique_features.narrow(0, count1, size);
count1 += size;
// std::cout << batch->sub_batches_[i]->node_features_.sizes() << "\n";
if (count + split_size > unique_features.size(0))
size = unique_features.size(0) - count;

batch->sub_batches_[i]->node_features_ = unique_features.narrow(0, count, size);
count += size;
}
}
// std::cout << "end\n";


} else {
batch->node_features_ = graph_storage_->getNodeFeatures(batch->unique_node_indices_);
}
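For batches with sub-batches, the loader now computes a single global unique-node set, or, when creator_id_ != -1 (the batch arrived from another machine), reuses the set already carried in root_node_indices_. Features are gathered once for the global set and dealt out to sub-batches in even slices; since narrow() returns a view, the split itself copies no feature data. A self-contained sketch of the slicing arithmetic (splitFeatures is an illustrative name, not a repo function):

#include <torch/torch.h>
#include <algorithm>
#include <cmath>
#include <vector>

// Deal `features` out to num_sub_batches contiguous shards of (nearly) equal
// size. Each shard is a zero-copy view into the gathered feature tensor.
std::vector<torch::Tensor> splitFeatures(torch::Tensor features, int num_sub_batches) {
    int64_t total = features.size(0);
    int64_t split_size = (int64_t)std::ceil((double)total / num_sub_batches);
    std::vector<torch::Tensor> shards;
    int64_t offset = 0;
    for (int i = 0; i < num_sub_batches; i++) {
        int64_t size = std::min(split_size, total - offset);  // last shard may be short
        shards.push_back(features.narrow(0, offset, size));
        offset += size;
    }
    return shards;
}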
@@ -803,7 +805,7 @@ void DataLoader::loadStorage() {
}
}

torch::Tensor DataLoader::computeUniques(torch::Tensor node_ids, int64_t num_nodes_in_memory) {
torch::Tensor DataLoader::computeUniques(torch::Tensor node_ids, int64_t num_nodes_in_memory, int id) {
unsigned int num_threads = 1;

#ifdef MARIUS_OMP
@@ -816,8 +818,9 @@

int64_t chunk_size = ceil((double)num_nodes_in_memory / num_threads);

auto bool_device_options = torch::TensorOptions().dtype(torch::kBool).device(node_ids.device());
torch::Tensor hash_map = torch::zeros({num_nodes_in_memory}, bool_device_options);
// auto bool_device_options = torch::TensorOptions().dtype(torch::kBool).device(node_ids.device());
// torch::Tensor hash_map = torch::zeros({num_nodes_in_memory}, bool_device_options);
torch::Tensor hash_map = hash_maps_[id];

auto hash_map_accessor = hash_map.accessor<bool, 1>();
auto nodes_accessor = node_ids.accessor<int64_t, 1>();
@@ -868,7 +871,7 @@ torch::Tensor DataLoader::computeUniques(torch::Tensor node_ids, int64_t num_nod
for (int64_t j = start; j < end; j++) {
if (hash_map_accessor[j]) {
delta_ids_accessor[private_count++] = j;
// hash_map_accessor[j] = 0;
hash_map_accessor[j] = 0;
grow_count++;

if (grow_count == upper_bound) {
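computeUniques deduplicates node ids without sorting them: it marks each incoming id in a node-count-sized boolean array, then sweeps that array in index order and emits the set indices. The two edits here follow from the preallocation above: the function now indexes the thread-private hash_maps_[id] instead of allocating a fresh bitmap, and the previously commented-out hash_map_accessor[j] = 0; is restored so the sweep leaves the bitmap all-zero for the next batch on the same worker. A single-threaded sketch of the idea (the committed version shards the sweep across OpenMP threads and grows its output buffer in chunks):

#include <torch/torch.h>

// Bitmap dedup: mark, then sweep. Clearing each bit as it is emitted returns
// the reusable map to the all-zero state, which is what makes the
// preallocated per-worker bitmaps safe to reuse across batches.
torch::Tensor computeUniquesSketch(torch::Tensor node_ids, torch::Tensor hash_map) {
    auto map_acc = hash_map.accessor<bool, 1>();
    auto ids_acc = node_ids.accessor<int64_t, 1>();

    for (int64_t i = 0; i < node_ids.size(0); i++) map_acc[ids_acc[i]] = true;  // mark phase

    torch::Tensor uniques = torch::empty({node_ids.size(0)}, node_ids.options());
    auto out_acc = uniques.accessor<int64_t, 1>();
    int64_t count = 0;
    for (int64_t j = 0; j < hash_map.size(0); j++) {  // sweep phase
        if (map_acc[j]) {
            out_acc[count++] = j;
            map_acc[j] = false;  // re-zero for the next reuse
        }
    }
    return uniques.narrow(0, 0, count);
}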
4 changes: 2 additions & 2 deletions src/cpp/src/pipeline/pipeline.cpp
@@ -133,7 +133,7 @@ shared_ptr<Worker> Pipeline::initWorkerOfType(int worker_type, int gpu_id, int w
if (worker_type == LOAD_BATCH_ID) {
worker = std::make_shared<LoadBatchWorker>(this, worker_id);
} else if (worker_type == H2D_TRANSFER_ID) {
worker = std::make_shared<BatchToDeviceWorker>(this);
worker = std::make_shared<BatchToDeviceWorker>(this, worker_id);
} else if (worker_type == CPU_COMPUTE_ID) {
worker = std::make_shared<ComputeWorkerCPU>(this);
} else if (worker_type == GPU_COMPUTE_ID) {
@@ -153,7 +153,7 @@
else if (worker_type == REMOTE_LOADER_ID) {
worker = std::make_shared<RemoteLoadWorker>(this);
} else if (worker_type == REMOTE_TO_DEVICE_ID) {
worker = std::make_shared<RemoteToDeviceWorker>(this);
worker = std::make_shared<RemoteToDeviceWorker>(this, worker_id);
} else if (worker_type == REMOTE_TO_HOST_ID) {
worker = std::make_shared<RemoteToHostWorker>(this);
} else if (worker_type == REMOTE_LISTEN_FOR_UPDATES_ID) {
30 changes: 17 additions & 13 deletions src/cpp/src/pipeline/pipeline_gpu.cpp
@@ -118,10 +118,11 @@ void RemoteToDeviceWorker::run() {

if (child == pipeline_->model_->pg_gloo_->pg->getRank()) { // child is self
// need to call regular to device here
batchToDevice(pipeline_, batch);
batchToDevice(pipeline_, batch); // TODO this would need to load cpu parameters given recent changes
continue;
}

pipeline_->dataloader_->loadCPUParameters(batch, worker_id_, false);
batch->creator_id_ = pipeline_->model_->pg_gloo_->pg->getRank();
batch->remoteTo(pipeline_->model_->pg_gloo_->pg, child, tag);
// t.stop();
@@ -150,18 +151,21 @@ void BatchToDeviceWorker::run() {
break;
}

if (batch->sub_batches_.size() > 0) {
if (!batch->sub_batches_[0]->node_features_.defined()) {
pipeline_->dataloader_->loadCPUParameters(batch);
}
} else {
if (!batch->node_features_.defined())
pipeline_->dataloader_->loadCPUParameters(batch);
// batch->node_features_ = pipeline_->dataloader_->graph_storage_->getNodeFeatures(batch->unique_node_indices_);
// batch->node_labels_ = pipeline_->dataloader_->graph_storage_->getNodeLabels(
// batch->dense_graph_.node_ids_.narrow(0, batch->dense_graph_.hop_offsets_[-2].item<int64_t>(),
// (batch->dense_graph_.node_ids_.size(0)-batch->dense_graph_.hop_offsets_[-2]).item<int64_t>())).flatten(0, 1);
}

// if (batch->sub_batches_.size() > 0) {
// if (!batch->sub_batches_[0]->node_features_.defined()) {
// pipeline_->dataloader_->loadCPUParameters(batch);
// }
// } else {
// if (!batch->node_features_.defined())
// pipeline_->dataloader_->loadCPUParameters(batch);
//// batch->node_features_ = pipeline_->dataloader_->graph_storage_->getNodeFeatures(batch->unique_node_indices_);
//// batch->node_labels_ = pipeline_->dataloader_->graph_storage_->getNodeLabels(
//// batch->dense_graph_.node_ids_.narrow(0, batch->dense_graph_.hop_offsets_[-2].item<int64_t>(),
//// (batch->dense_graph_.node_ids_.size(0)-batch->dense_graph_.hop_offsets_[-2]).item<int64_t>())).flatten(0, 1);
// }
pipeline_->dataloader_->loadCPUParameters(batch, worker_id_);


batchToDevice(pipeline_, batch);
t.stop();
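The worker changes complete the offload named in the commit title: BatchToDeviceWorker now always calls loadCPUParameters(batch, worker_id_) rather than only when features are undefined, and RemoteToDeviceWorker pre-computes the uniques with load = false before shipping a batch to a child, so the receiver finds them in root_node_indices_ and skips the recomputation. Threading worker_id_ through the constructors is what keeps the shared hash_maps_ safe: each worker only ever touches its own entry. A minimal, self-contained illustration of that id-plumbing pattern (generic code, not from the repo):

#include <iostream>
#include <thread>
#include <vector>

struct Scratch {
    std::vector<bool> bitmap;  // stands in for one hash_maps_ entry
};

int main() {
    int num_workers = 2;
    std::vector<Scratch> scratch(num_workers, Scratch{std::vector<bool>(1024, false)});

    std::vector<std::thread> workers;
    for (int id = 0; id < num_workers; id++) {
        // Each worker captures its id at construction, like the worker_id_
        // member, and indexes only scratch[id]: no locking needed.
        workers.emplace_back([id, &scratch] { scratch[id].bitmap[id] = true; });
    }
    for (auto &w : workers) w.join();

    std::cout << "each worker used its own scratch entry\n";
    return 0;
}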
