
Commit

Bugfixes:
- Removed the data_service_dataset_op metrics dump (it requires a newer version of TF, >= 2.8)
- Switched from dataset key to dataset fingerprint in ShouldUseLocalWorkers
amariucaitheodor committed Feb 4, 2022
1 parent d7a04b7 commit 0b8526d
Showing 7 changed files with 37 additions and 63 deletions.
29 changes: 17 additions & 12 deletions tensorflow/core/data/service/dispatcher_impl.cc
@@ -832,9 +832,14 @@ Status DataServiceDispatcherImpl::GetOrCreateJob(
       GetOrCreateJobRequest::kNumConsumers) {
     num_consumers = request->num_consumers();
   }
+
+  absl::flat_hash_set<std::string> local_workers;
+  local_workers.insert(request->local_workers().cbegin(),
+                       request->local_workers().cend());
+
   TF_RETURN_IF_ERROR(CreateJob(request->dataset_id(),
                                requested_processing_mode, key, num_consumers,
-                               job));
+                               job, local_workers));
   int64 job_client_id;
   TF_RETURN_IF_ERROR(AcquireJobClientId(job, job_client_id));
   response->set_job_client_id(job_client_id);
@@ -943,7 +948,9 @@ Status DataServiceDispatcherImpl::ValidateMatchingJob(
 Status DataServiceDispatcherImpl::CreateJob(
     int64 dataset_id, ProcessingMode processing_mode,
     absl::optional<NamedJobKey> named_job_key,
-    absl::optional<int64> num_consumers, std::shared_ptr<const Job>& job)
+    absl::optional<int64> num_consumers,
+    std::shared_ptr<const Job>& job,
+    absl::flat_hash_set<std::string> local_workers)
     TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   switch (processing_mode) {
     case ProcessingMode::PARALLEL_EPOCHS:
@@ -1005,24 +1012,23 @@ Status DataServiceDispatcherImpl::CreateJob(
 
     bool should_use_local_workers; // Do we have enough throughput to decide to use local workers to save network bandwidth?
     TF_RETURN_IF_ERROR(service::easl::local_workers_utils::ShouldUseLocalWorkers(
-        config_, metadata_store_, compute_dataset_key, should_use_local_workers
+        config_, metadata_store_, dataset_fingerprint, should_use_local_workers
     ));
 
-    if(should_use_local_workers && request.local_workers().size() >= 1) {
+    if(should_use_local_workers && local_workers.size() >= 1) {
       target_remote_workers = suggested_worker_count - 1;
       target_local_workers = 1;
     } else {
       target_remote_workers = suggested_worker_count;
       target_local_workers = 0;
     }
   } else if(config_.scaling_policy() == 2) { // Use all available workers
-    target_remote_workers = total_workers - request.local_workers().size();
-    target_local_workers = request.local_workers().size();
+    target_remote_workers = total_workers - local_workers.size();
+    target_local_workers = local_workers.size();
   } else if(config_.scaling_policy() == 3) { // Grid search over local and remote workers
     TF_RETURN_IF_ERROR(service::easl::local_workers_utils::DecideTargetWorkersGridSearch(
-        config_, metadata_store_, compute_dataset_key,
-        total_workers - request.local_workers().size(), request.local_workers().size(),
-        target_remote_workers, target_local_workers
+        total_workers - local_workers.size(), local_workers.size(),
+        target_remote_workers, target_local_workers // passed by reference
     ));
   }
 
@@ -1061,7 +1067,7 @@ Status DataServiceDispatcherImpl::CreateJob(
   create_job->set_target_worker_count(suggested_worker_count);
   create_job->set_target_local_workers(target_local_workers);
   create_job->set_target_remote_workers(target_remote_workers);
-  *create_job->mutable_local_workers() = {request.local_workers().begin(), request.local_workers().end()};
+  *create_job->mutable_local_workers() = {local_workers.begin(), local_workers.end()};
   if (named_job_key.has_value()) {
     NamedJobKeyDef* key = create_job->mutable_named_job_key();
     key->set_name(named_job_key->name);
@@ -1120,7 +1126,7 @@ Status DataServiceDispatcherImpl::CreateTasksForJob(
     std::vector<std::shared_ptr<const Task>>& tasks)
     TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   std::vector<std::shared_ptr<const Worker>> workers = state_.ReserveWorkers(
-      job->job_id, job->target_worker_count, job->target_remote_workers, job->target_local_workers, job->local_workers);
+      job->job_id, job->target_remote_workers, job->target_local_workers, job->local_workers);
   if (workers.size() < job->target_worker_count){
     VLOG(0)
         << "EASL - Not enough workers for job. Elasticity policy requires "
@@ -1415,7 +1421,6 @@ Status DataServiceDispatcherImpl::ClientHeartbeat(
   }
   response->set_job_finished(job->finished);
   response->set_target_local_workers(job->target_local_workers);
-  response->set_target_remote_workers(job->target_remote_workers);
   VLOG(4) << "Found " << response->task_info_size()
           << " tasks for job client id " << request->job_client_id();
 
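Editor's note: the hunks above thread the client-supplied local worker addresses through CreateJob and split the target worker count by scaling policy. Below is a minimal standalone sketch of that split logic; SplitWorkers and its bare int/bool parameters are illustrative stand-ins (the real method reads dispatcher state under a lock and returns a Status), and the enclosing condition for policy 1 is an assumption since it sits above the visible hunk.

#include <cstdint>
#include <string>
#include <unordered_set>

// Sketch of the target-worker split. Policy 1 autoscales toward a suggested
// count, policy 2 uses every available worker; policy 3 (grid search) is
// delegated to DecideTargetWorkersGridSearch and omitted here.
void SplitWorkers(int64_t suggested_worker_count, int64_t total_workers,
                  bool should_use_local_workers,
                  const std::unordered_set<std::string>& local_workers,
                  int scaling_policy,
                  int64_t& target_remote_workers,
                  int64_t& target_local_workers) {
  const int64_t num_local = static_cast<int64_t>(local_workers.size());
  if (scaling_policy == 1) {  // assumed: autoscaling branch
    if (should_use_local_workers && num_local >= 1) {
      target_remote_workers = suggested_worker_count - 1;  // one slot goes local
      target_local_workers = 1;
    } else {
      target_remote_workers = suggested_worker_count;
      target_local_workers = 0;
    }
  } else if (scaling_policy == 2) {  // use all available workers
    target_remote_workers = total_workers - num_local;
    target_local_workers = num_local;
  }
}
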
3 changes: 2 additions & 1 deletion tensorflow/core/data/service/dispatcher_impl.h
@@ -197,7 +197,8 @@ class DataServiceDispatcherImpl {
   Status CreateJob(int64 dataset_id, ProcessingMode processing_mode,
                    absl::optional<DispatcherState::NamedJobKey> named_job_key,
                    absl::optional<int64> num_consumers,
-                   std::shared_ptr<const DispatcherState::Job>& job)
+                   std::shared_ptr<const DispatcherState::Job>& job,
+                   absl::flat_hash_set<std::string> local_workers)
       TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
   // Creates tasks for the specified worker, one task for every unfinished job.
   Status CreateTasksForWorker(const std::string& worker_address);
4 changes: 2 additions & 2 deletions tensorflow/core/data/service/dispatcher_state.cc
@@ -431,10 +431,10 @@ DispatcherState::ReserveWorkers(
     workers.push_back(it->second);
     VLOG(0) << "(ReserveWorkers) Assigning worker at address "
             << it->second->address << " to job " << job_id;
-    workers_by_job_[job_id].push_back(it->second);
+    workers_by_job_[job_id][it->second->address] = it->second;
     jobs_by_worker_[it->second->address][job_id] = jobs_[job_id];
     avail_workers_.erase(it++);
-    if (target_worker_count == 0)
+    if (target_local_workers + target_remote_workers == 0)
       break;
   }
   VLOG(0) << "(ReserveWorkers) Number of workers for job " << job_id << " is: "
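Editor's note: the ReserveWorkers change swaps the per-job vector for a map keyed by worker address, so reserving the same worker twice for one job overwrites rather than duplicates the entry. A sketch under assumed types (the real code uses the dispatcher's Worker struct and likely absl containers):

#include <cstdint>
#include <map>
#include <memory>
#include <string>

struct Worker { std::string address; };

// Address-keyed assignment: repeating a reservation of the same worker for
// the same job is idempotent instead of appending a duplicate.
std::map<int64_t, std::map<std::string, std::shared_ptr<Worker>>> workers_by_job;

void AssignWorker(int64_t job_id, const std::shared_ptr<Worker>& w) {
  workers_by_job[job_id][w->address] = w;
}

The loop-exit test likewise changes from a single target_worker_count counter to the sum of the two per-kind targets, which the surrounding loop presumably decrements as it hands out matching workers.
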
35 changes: 14 additions & 21 deletions tensorflow/core/data/service/easl/local_workers_utils.cc
@@ -14,15 +14,15 @@ namespace local_workers_utils {
 Status ShouldUseLocalWorkers(
     const experimental::DispatcherConfig& dispatcher_config,
     const ::tensorflow::data::easl::MetadataStore& metadata_store,
-    const std::string& dataset_key,
+    const int64 dataset_fingerprint,
     bool& should_use_local_workers) {
   using NodeMetrics = ::tensorflow::data::easl::NodeMetrics;
   using ModelMetrics = ::tensorflow::data::easl::ModelMetrics;
 
   // Check if we have any metrics for this dataset
   std::shared_ptr<data::easl::InputPipelineMetrics> job_metrics;
-  Status s = metadata_store.GetLastNodeMetricsByDatasetFingerprint(
-      dataset_key, job_metrics);
+  Status s = metadata_store.GetInputPipelineMetricsByDatasetFingerprint(
+      dataset_fingerprint, job_metrics);
 
   // We do not yet have the metrics for this dataset --> use 1 worker
   if(errors::IsNotFound(s)) {
@@ -36,8 +36,7 @@ Status ShouldUseLocalWorkers(
 
   // Pipeline stats: last TF node metrics
   std::shared_ptr<NodeMetrics> last_tf_node_metrics;
-
-  s = metadata_store.GetLastNodeMetricsByDatasetKey(dataset_key, last_tf_node_metrics);
+  s = metadata_store.GetLastNodeMetricsByDatasetFingerprint(dataset_fingerprint, last_tf_node_metrics);
   if (!s.ok()) {
     VLOG(0) << "DSL (ShouldUseLocalWorkers) Failed to get the last TF node metrics";
     return s;
@@ -55,9 +54,9 @@ Status ShouldUseLocalWorkers(
   VLOG(0) << "DSL (ShouldUseLocalWorkers) Total bytes produced: " << total_bytes_produced << "\n"
           << "Total num elements: " << total_num_elements << "\n"
           << "Avg bytes produced per element: " << avg_bytes_per_element << "\n"
-          << "Decision Threshold: " << dispatcher_config.avg_bytes_per_element_local_thres() << "\n";
+          << "Decision Threshold: " << dispatcher_config.avg_bytes_per_element_local_workers_threshold() << "\n";
 
-  if (avg_bytes_per_element > dispatcher_config.avg_bytes_per_element_local_thres()) {
+  if (avg_bytes_per_element > dispatcher_config.avg_bytes_per_element_local_workers_threshold()) {
     should_use_local_workers = true;
     VLOG(0) << "DSL (ShouldUseLocalWorkers) Using local workers! (because avg. bytes per element > threshold) \n";
   }
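
Editor's note: the decision itself is one division and one comparison, average bytes per element against the configured threshold. Illustrative arithmetic only, with made-up numbers:

#include <cstdint>
#include <iostream>

int main() {
  // With 8 GiB produced over one million elements, the average element is
  // ~8.6 KB, so a 4096-byte threshold would route the job to local workers.
  int64_t total_bytes_produced = 8LL * 1024 * 1024 * 1024;
  int64_t total_num_elements = 1000000;
  int64_t threshold = 4096;  // stand-in for avg_bytes_per_element_local_workers_threshold
  int64_t avg_bytes_per_element = total_bytes_produced / total_num_elements;  // 8589
  std::cout << (avg_bytes_per_element > threshold ? "use local workers"
                                                  : "remote only")
            << std::endl;
  return 0;
}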
@@ -71,8 +70,14 @@ Status ShouldUseLocalWorkers(
 
 std::vector<int64> records;
 
-void grid_search(int64 num_worker_remote_avail, int64 num_worker_local_avail,
-                 int64& num_worker_remote_target, int64& num_worker_local_target) {
+Status DecideTargetWorkersGridSearch(
+    int64 num_worker_remote_avail,
+    int64 num_worker_local_avail,
+    int64& num_worker_remote_target,
+    int64& num_worker_local_target) {
+  std::time_t t = std::time(nullptr);
+  records.push_back(t);
+
   std::vector<std::pair<int64, int64>> test_set = std::vector<std::pair<int64, int64>>();
   for(int64 n_r = 0; n_r <= num_worker_remote_avail; n_r++) {
     for(int64 n_l = 0; n_l <= num_worker_local_avail; n_l++) {
@@ -95,19 +100,7 @@ void grid_search(int64 num_worker_remote_avail, int64 num_worker_local_avail,
   auto p = test_set[index];
   num_worker_remote_target = p.first;
   num_worker_local_target = p.second;
-}
 
-Status DecideTargetWorkersGridSearch(
-    const experimental::DispatcherConfig& dispatcher_config,
-    const ::tensorflow::data::easl::MetadataStore& metadata_store,
-    const std::string& dataset_key,
-    int64 num_worker_remote_avail,
-    int64 num_worker_local_avail,
-    int64& num_worker_remote_target,
-    int64& num_worker_local_target) {
-  std::time_t t = std::time(nullptr);
-  records.push_back(t);
-  grid_search(num_worker_remote_avail, num_worker_local_avail, num_worker_remote_target, num_worker_local_target);
   VLOG(0) << "DSL (DecideTargetWorkersGridSearch)" << "\n"
           << "Available remote: " << num_worker_remote_avail << "\n"
           << "Available local: " << num_worker_local_avail << "\n"
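Editor's note: after this refactor, DecideTargetWorkersGridSearch both records the call time and performs the search that the former grid_search helper did: enumerate every (remote, local) pair up to the available counts and pick one per invocation. The index computation is elided in the diff, so the round-robin step below is an assumption; GridSearchStep and call_count are illustrative names.

#include <cstdint>
#include <utility>
#include <vector>

// Enumerates all (n_remote, n_local) candidates and walks the grid one
// point per call. Assumption: the real index is derived from the recorded
// call history; here we simply cycle through the grid.
void GridSearchStep(int64_t num_remote_avail, int64_t num_local_avail,
                    int64_t call_count,
                    int64_t& num_remote_target, int64_t& num_local_target) {
  std::vector<std::pair<int64_t, int64_t>> test_set;
  for (int64_t n_r = 0; n_r <= num_remote_avail; ++n_r) {
    for (int64_t n_l = 0; n_l <= num_local_avail; ++n_l) {
      test_set.emplace_back(n_r, n_l);
    }
  }
  const auto& p = test_set[call_count % test_set.size()];
  num_remote_target = p.first;
  num_local_target = p.second;
}
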
5 changes: 1 addition & 4 deletions tensorflow/core/data/service/easl/local_workers_utils.h
@@ -23,13 +23,10 @@ namespace local_workers_utils {
 Status ShouldUseLocalWorkers(
     const experimental::DispatcherConfig& dispatcher_config,
     const ::tensorflow::data::easl::MetadataStore& metadata_store,
-    const std::string& dataset_key,
+    const int64 dataset_key,
     bool& should_use_local_workers);
 
 Status DecideTargetWorkersGridSearch(
-    const experimental::DispatcherConfig& dispatcher_config,
-    const ::tensorflow::data::easl::MetadataStore& metadata_store,
-    const std::string& dataset_key,
     int64 num_worker_remote_avail,
     int64 num_worker_local_avail,
     int64& num_worker_remote_target,
22 changes: 0 additions & 22 deletions tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc
@@ -1019,29 +1019,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
       }
 
       if (enqueue_result && !result.end_of_sequence) {
-        uint64 current_micro_timestamp = Env::Default()->NowMicros();
-        std::string data_source = task.info.worker_address();
-        bool if_local = false;
-        int result_size = result.element.size();
-        if (local_tasks_.contains(task.info.worker_address())) {
-          if_local = true;
-          local_results_buffer_.push(std::move(result));
-        } else {
          results_.push(std::move(result));
-        }
-
-        const char* log_location = std::getenv("EASL_MUYU_WORKER_METRICS");
-        if (log_location) {
-          std::ofstream file(log_location, std::ios_base::app);
-
-          file << current_micro_timestamp << ","
-               << data_source << ","
-               << if_local << ","
-               << result_size << "\n";
-
-          file.flush();
-          file.clear();
-        }
       }
       get_next_cv_.notify_all();
     }
2 changes: 1 addition & 1 deletion tensorflow/core/protobuf/service_config.proto
@@ -44,7 +44,7 @@ message DispatcherConfig {
   // The interval at which the dispatcher should dump log files.
   int64 log_dumps_interval_ms = 14;
   // MUYU's modification
-  int64 avg_bytes_per_element_local_thres = 15;
+  int64 avg_bytes_per_element_local_workers_threshold = 15;
 }
 
 // Configuration for a tf.data service WorkerServer.
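Editor's note: renaming the proto field keeps field number 15, so previously serialized DispatcherConfig messages still parse; only the generated accessors change. A hypothetical caller-side update (the 1 MiB value is made up):

#include "tensorflow/core/protobuf/service_config.pb.h"

void ConfigureDispatcher() {
  tensorflow::data::experimental::DispatcherConfig config;
  // Old accessor: config.set_avg_bytes_per_element_local_thres(...);
  config.set_avg_bytes_per_element_local_workers_threshold(1 << 20);  // 1 MiB
}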
