From 0b8526dcdf05ca8a613d69503c0beb478e1d6c78 Mon Sep 17 00:00:00 2001 From: Theodor Amariucai Date: Thu, 3 Feb 2022 15:31:09 +0100 Subject: [PATCH] Bugfixes: - Removed data_service_dataset_op metrics dump (as a newer version of TF is required >=2.8) - Switched from dataset key to dataset fingerprint in ShouldUseLocalWorkers --- .../core/data/service/dispatcher_impl.cc | 29 ++++++++------- .../core/data/service/dispatcher_impl.h | 3 +- .../core/data/service/dispatcher_state.cc | 4 +-- .../data/service/easl/local_workers_utils.cc | 35 ++++++++----------- .../data/service/easl/local_workers_utils.h | 5 +-- .../experimental/data_service_dataset_op.cc | 22 ------------ tensorflow/core/protobuf/service_config.proto | 2 +- 7 files changed, 37 insertions(+), 63 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 3efe3804f027a5..70adaf53bba81b 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -832,9 +832,14 @@ Status DataServiceDispatcherImpl::GetOrCreateJob( GetOrCreateJobRequest::kNumConsumers) { num_consumers = request->num_consumers(); } + + absl::flat_hash_set local_workers; + local_workers.insert(request->local_workers().cbegin(), + request->local_workers().cend()); + TF_RETURN_IF_ERROR(CreateJob(request->dataset_id(), requested_processing_mode, key, num_consumers, - job)); + job, local_workers)); int64 job_client_id; TF_RETURN_IF_ERROR(AcquireJobClientId(job, job_client_id)); response->set_job_client_id(job_client_id); @@ -943,7 +948,9 @@ Status DataServiceDispatcherImpl::ValidateMatchingJob( Status DataServiceDispatcherImpl::CreateJob( int64 dataset_id, ProcessingMode processing_mode, absl::optional named_job_key, - absl::optional num_consumers, std::shared_ptr& job) + absl::optional num_consumers, + std::shared_ptr& job, + absl::flat_hash_set local_workers) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { switch (processing_mode) { case ProcessingMode::PARALLEL_EPOCHS: @@ -1005,10 +1012,10 @@ Status DataServiceDispatcherImpl::CreateJob( bool should_use_local_workers; // Do we have enough throughput to decide to use local workers to save network bandwidth? TF_RETURN_IF_ERROR(service::easl::local_workers_utils::ShouldUseLocalWorkers( - config_, metadata_store_, compute_dataset_key, should_use_local_workers + config_, metadata_store_, dataset_fingerprint, should_use_local_workers )); - if(should_use_local_workers && request.local_workers().size() >= 1) { + if(should_use_local_workers && local_workers.size() >= 1) { target_remote_workers = suggested_worker_count - 1; target_local_workers = 1; } else { @@ -1016,13 +1023,12 @@ Status DataServiceDispatcherImpl::CreateJob( target_local_workers = 0; } } else if(config_.scaling_policy() == 2) { // Use all available workers - target_remote_workers = total_workers - request.local_workers().size(); - target_local_workers = request.local_workers().size(); + target_remote_workers = total_workers - local_workers.size(); + target_local_workers = local_workers.size(); } else if(config_.scaling_policy() == 3) { // Grid search over local and remote workers TF_RETURN_IF_ERROR(service::easl::local_workers_utils::DecideTargetWorkersGridSearch( - config_, metadata_store_, compute_dataset_key, - total_workers - request.local_workers().size(), request.local_workers().size(), - target_remote_workers, target_local_workers + total_workers - local_workers.size(), local_workers.size(), + target_remote_workers, target_local_workers // passed by reference )); } @@ -1061,7 +1067,7 @@ Status DataServiceDispatcherImpl::CreateJob( create_job->set_target_worker_count(suggested_worker_count); create_job->set_target_local_workers(target_local_workers); create_job->set_target_remote_workers(target_remote_workers); - *create_job->mutable_local_workers() = {request.local_workers().begin(), request.local_workers().end()}; + *create_job->mutable_local_workers() = {local_workers.begin(), local_workers.end()}; if (named_job_key.has_value()) { NamedJobKeyDef* key = create_job->mutable_named_job_key(); key->set_name(named_job_key->name); @@ -1120,7 +1126,7 @@ Status DataServiceDispatcherImpl::CreateTasksForJob( std::vector>& tasks) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { std::vector> workers = state_.ReserveWorkers( - job->job_id, job->target_worker_count, job->target_remote_workers, job->target_local_workers, job->local_workers); + job->job_id, job->target_remote_workers, job->target_local_workers, job->local_workers); if (workers.size() < job->target_worker_count){ VLOG(0) << "EASL - Not enough workers for job. Elasticity policy requires " @@ -1415,7 +1421,6 @@ Status DataServiceDispatcherImpl::ClientHeartbeat( } response->set_job_finished(job->finished); response->set_target_local_workers(job->target_local_workers); - response->set_target_remote_workers(job->target_remote_workers); VLOG(4) << "Found " << response->task_info_size() << " tasks for job client id " << request->job_client_id(); diff --git a/tensorflow/core/data/service/dispatcher_impl.h b/tensorflow/core/data/service/dispatcher_impl.h index 5d7f25903b7d37..1fa71d33a33f68 100644 --- a/tensorflow/core/data/service/dispatcher_impl.h +++ b/tensorflow/core/data/service/dispatcher_impl.h @@ -197,7 +197,8 @@ class DataServiceDispatcherImpl { Status CreateJob(int64 dataset_id, ProcessingMode processing_mode, absl::optional named_job_key, absl::optional num_consumers, - std::shared_ptr& job) + std::shared_ptr& job, + absl::flat_hash_set local_workers) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_); // Creates tasks for the specified worker, one task for every unfinished job. Status CreateTasksForWorker(const std::string& worker_address); diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index c623fb4930ed7e..6926f8a2cfa508 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -431,10 +431,10 @@ DispatcherState::ReserveWorkers( workers.push_back(it->second); VLOG(0) << "(ReserveWorkers) Assigning worker at address " << it->second->address << " to job " << job_id; - workers_by_job_[job_id].push_back(it->second); + workers_by_job_[job_id][it->second->address] = it->second; jobs_by_worker_[it->second->address][job_id] = jobs_[job_id]; avail_workers_.erase(it++); - if (target_worker_count == 0) + if (target_local_workers + target_remote_workers == 0) break; } VLOG(0) << "(ReserveWorkers) Number of workers for job " << job_id << " is: " diff --git a/tensorflow/core/data/service/easl/local_workers_utils.cc b/tensorflow/core/data/service/easl/local_workers_utils.cc index d7efe8eacfd586..9f203a2f7b2b0d 100644 --- a/tensorflow/core/data/service/easl/local_workers_utils.cc +++ b/tensorflow/core/data/service/easl/local_workers_utils.cc @@ -14,15 +14,15 @@ namespace local_workers_utils { Status ShouldUseLocalWorkers( const experimental::DispatcherConfig& dispatcher_config, const ::tensorflow::data::easl::MetadataStore& metadata_store, - const std::string& dataset_key, + const int64 dataset_fingerprint, bool& should_use_local_workers) { using NodeMetrics = ::tensorflow::data::easl::NodeMetrics; using ModelMetrics = ::tensorflow::data::easl::ModelMetrics; // Check if we have any metrics for this dataset std::shared_ptr job_metrics; - Status s = metadata_store.GetLastNodeMetricsByDatasetFingerprint( - dataset_key, job_metrics); + Status s = metadata_store.GetInputPipelineMetricsByDatasetFingerprint( + dataset_fingerprint, job_metrics); // We do not yet have the metrics for this dataset --> use 1 worker if(errors::IsNotFound(s)) { @@ -36,8 +36,7 @@ Status ShouldUseLocalWorkers( // Pipeline stats: last TF node metrics std::shared_ptr last_tf_node_metrics; - - s = metadata_store.GetLastNodeMetricsByDatasetKey(dataset_key, last_tf_node_metrics); + s = metadata_store.GetLastNodeMetricsByDatasetFingerprint(dataset_fingerprint, last_tf_node_metrics); if (!s.ok()) { VLOG(0) << "DSL (ShouldUseLocalWorkers) Failed to get the last TF node metrics"; return s; @@ -55,9 +54,9 @@ Status ShouldUseLocalWorkers( VLOG(0) << "DSL (ShouldUseLocalWorkers) Total bytes produced: " << total_bytes_produced << "\n" << "Total num elements: " << total_num_elements << "\n" << "Avg bytes produced per element: " << avg_bytes_per_element << "\n" - << "Decision Threshold: " << dispatcher_config.avg_bytes_per_element_local_thres() << "\n"; + << "Decision Threshold: " << dispatcher_config.avg_bytes_per_element_local_workers_threshold() << "\n"; - if (avg_bytes_per_element > dispatcher_config.avg_bytes_per_element_local_thres()) { + if (avg_bytes_per_element > dispatcher_config.avg_bytes_per_element_local_workers_threshold()) { should_use_local_workers = true; VLOG(0) << "DSL (ShouldUseLocalWorkers) Using local workers! (because avg. bytes per element > threshold) \n"; } @@ -71,8 +70,14 @@ Status ShouldUseLocalWorkers( std::vector records; -void grid_search(int64 num_worker_remote_avail, int64 num_worker_local_avail, - int64& num_worker_remote_target, int64& num_worker_local_target) { +Status DecideTargetWorkersGridSearch( + int64 num_worker_remote_avail, + int64 num_worker_local_avail, + int64& num_worker_remote_target, + int64& num_worker_local_target) { + std::time_t t = std::time(nullptr); + records.push_back(t); + std::vector> test_set = std::vector>(); for(int64 n_r = 0; n_r <= num_worker_remote_avail; n_r++) { for(int64 n_l = 0; n_l <= num_worker_local_avail; n_l++) { @@ -95,19 +100,7 @@ void grid_search(int64 num_worker_remote_avail, int64 num_worker_local_avail, auto p = test_set[index]; num_worker_remote_target = p.first; num_worker_local_target = p.second; -} -Status DecideTargetWorkersGridSearch( - const experimental::DispatcherConfig& dispatcher_config, - const ::tensorflow::data::easl::MetadataStore& metadata_store, - const std::string& dataset_key, - int64 num_worker_remote_avail, - int64 num_worker_local_avail, - int64& num_worker_remote_target, - int64& num_worker_local_target) { - std::time_t t = std::time(nullptr); - records.push_back(t); - grid_search(num_worker_remote_avail, num_worker_local_avail, num_worker_remote_target, num_worker_local_target); VLOG(0) << "DSL (DecideTargetWorkersGridSearch)" << "\n" << "Available remote: " << num_worker_remote_avail << "\n" << "Available local: " << num_worker_local_avail << "\n" diff --git a/tensorflow/core/data/service/easl/local_workers_utils.h b/tensorflow/core/data/service/easl/local_workers_utils.h index 06ed141718ad39..c92ba512765685 100644 --- a/tensorflow/core/data/service/easl/local_workers_utils.h +++ b/tensorflow/core/data/service/easl/local_workers_utils.h @@ -23,13 +23,10 @@ namespace local_workers_utils { Status ShouldUseLocalWorkers( const experimental::DispatcherConfig& dispatcher_config, const ::tensorflow::data::easl::MetadataStore& metadata_store, - const std::string& dataset_key, + const int64 dataset_key, bool& should_use_local_workers); Status DecideTargetWorkersGridSearch( - const experimental::DispatcherConfig& dispatcher_config, - const ::tensorflow::data::easl::MetadataStore& metadata_store, - const std::string& dataset_key, int64 num_worker_remote_avail, int64 num_worker_local_avail, int64& num_worker_remote_target, diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc index 2c1934a8bc447f..f351abbd5c9c84 100644 --- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc @@ -1019,29 +1019,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { } if (enqueue_result && !result.end_of_sequence) { - uint64 current_micro_timestamp = Env::Default()->NowMicros(); - std::string data_source = task.info.worker_address(); - bool if_local = false; - int result_size = result.element.size(); - if (local_tasks_.contains(task.info.worker_address())) { - if_local = true; - local_results_buffer_.push(std::move(result)); - } else { results_.push(std::move(result)); - } - - const char* log_location = std::getenv("EASL_MUYU_WORKER_METRICS"); - if (log_location) { - std::ofstream file(log_location, std::ios_base::app); - - file << current_micro_timestamp << "," - << data_source << "," - << if_local << "," - << result_size << "\n"; - - file.flush(); - file.clear(); - } } get_next_cv_.notify_all(); } diff --git a/tensorflow/core/protobuf/service_config.proto b/tensorflow/core/protobuf/service_config.proto index 0cd447b042db58..ad9ef36df8d585 100644 --- a/tensorflow/core/protobuf/service_config.proto +++ b/tensorflow/core/protobuf/service_config.proto @@ -44,7 +44,7 @@ message DispatcherConfig { // The interval at which the dispatcher should dump log files. int64 log_dumps_interval_ms = 14; // MUYU's modification - int64 avg_bytes_per_element_local_thres = 15; + int64 avg_bytes_per_element_local_workers_threshold = 15; } // Configuration for a tf.data service WorkerServer.