Sarkars/ex create tensor 1 (#193)
sayantan-nervana authored and avijit-nervana committed Aug 22, 2019
1 parent 8ab37fd commit c0760d5
Showing 17 changed files with 694 additions and 31 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -244,7 +244,7 @@ if (NOT USE_PRE_BUILT_NGRAPH)
ExternalProject_Add(
ext_ngraph
GIT_REPOSITORY https://github.com/NervanaSystems/ngraph
-GIT_TAG v0.25.0-rc.3
+GIT_TAG v0.25.1-rc.0
CMAKE_ARGS
-DNGRAPH_DISTRIBUTED_ENABLE=${NGRAPH_DISTRIBUTED_ENABLE}
-DNGRAPH_INSTALL_PREFIX=${NGRAPH_ARTIFACTS_DIR}
2 changes: 1 addition & 1 deletion README.md
@@ -88,7 +88,7 @@ Once TensorFlow's dependencies are installed, clone the `ngraph-bridge` repo:

git clone https://github.com/tensorflow/ngraph-bridge.git
cd ngraph-bridge
-git checkout v0.18.0-rc4
+git checkout v0.19.0-rc0

Run the following Python script to build TensorFlow, nGraph, and the bridge. Use Python 3.5:

2 changes: 2 additions & 0 deletions bazel/BUILD
@@ -33,6 +33,7 @@ cc_library(
"ngraph_bridge/ngraph_encapsulate_op.h",
"ngraph_bridge/ngraph_freshness_tracker.h",
"ngraph_bridge/ngraph_mark_for_clustering.h",
"ngraph_bridge/ngraph_pipelined_tensors.h",
"ngraph_bridge/ngraph_rewrite_for_tracking.h",
"ngraph_bridge/ngraph_timer.h",
"ngraph_bridge/ngraph_utils.h",
@@ -69,6 +70,7 @@ cc_binary(
"ngraph_bridge/ngraph_encapsulate_op.cc",
"ngraph_bridge/ngraph_freshness_tracker.cc",
"ngraph_bridge/ngraph_mark_for_clustering.cc",
"ngraph_bridge/ngraph_pipelined_tensors.cc",
"ngraph_bridge/ngraph_rewrite_for_tracking.cc",
"ngraph_bridge/ngraph_tracked_variable.cc",
"ngraph_bridge/ngraph_utils.cc",
8 changes: 4 additions & 4 deletions bazel/WORKSPACE
@@ -25,11 +25,11 @@ tf_configure(
http_archive(
name = "ngraph",
build_file = "//:bazel/ngraph.BUILD",
sha256 = "0b0cbd617653552d219c05bf975acfbcac513061a7b04465a71db324a9d9d7e3",
strip_prefix = "ngraph-0.25.0-rc.3",
sha256 = "030a0c22a098a958e1856b7930bdd7d1694ec882b61de3108afa8e59ff960e42",
strip_prefix = "ngraph-0.25.1-rc.0",
urls = [
"https://mirror.bazel.build/github.com/NervanaSystems/ngraph/archive/v0.25.0-rc.3.tar.gz",
"https://github.com/NervanaSystems/ngraph/archive/v0.25.0-rc.3.tar.gz"
"https://mirror.bazel.build/github.com/NervanaSystems/ngraph/archive/v0.25.1-rc.0.tar.gz",
"https://github.com/NervanaSystems/ngraph/archive/v0.25.1-rc.0.tar.gz"
],
)

2 changes: 1 addition & 1 deletion build_ngtf.py
@@ -53,7 +53,7 @@ def main():
'''

# Component versions
ngraph_version = "v0.25.0-rc.3"
ngraph_version = "v0.25.1-rc.0"
tf_version = "v1.14.0"

# Command line parser options
1 change: 1 addition & 0 deletions ngraph_bridge/CMakeLists.txt
@@ -42,6 +42,7 @@ set(SRC
ngraph_conversions.cc
ngraph_deassign_clusters.cc
ngraph_encapsulate_clusters.cc
+ngraph_pipelined_tensors.cc
ngraph_encapsulate_impl.cc
ops/ngraph_ops.cc
ngraph_encapsulate_op.cc
85 changes: 71 additions & 14 deletions ngraph_bridge/ngraph_encapsulate_impl.cc
@@ -245,6 +245,7 @@ Status NGraphEncapsulateImpl::GetNgExecutable(
Status NGraphEncapsulateImpl::AllocateNGInputTensors(
const std::vector<Tensor>& tf_input_tensors,
const std::shared_ptr<ngraph::runtime::Executable>& ng_exec,
+const PipelinedTensorVector& inp_group_from_pipeline,
ng::runtime::Backend* const op_backend,
vector<shared_ptr<ng::runtime::Tensor>>& ng_inputs) {
std::vector<std::unique_ptr<ngraph::Event>> input_copy_events;
@@ -291,9 +292,10 @@ Status NGraphEncapsulateImpl::AllocateNGInputTensors(
std::shared_ptr<ng::runtime::Tensor> last_ng_tensor =
input_caches[i].second;
void* current_src_ptr = (void*)DMAHelper::base(&tf_input_tensors[i]);
-std::shared_ptr<ng::runtime::Tensor> current_ng_tensor =
-    GetCurrentNgTensor(current_src_ptr, last_src_ptr, last_ng_tensor, false,
-                       ng_exec, op_backend, ng_element_type, ng_shape);
+std::shared_ptr<ng::runtime::Tensor> current_ng_tensor = GetCurrentNgTensor(
+    current_src_ptr, last_src_ptr, last_ng_tensor, false, ng_exec,
+    op_backend, ng_element_type, ng_shape,
+    m_executable_can_create_tensor ? inp_group_from_pipeline[i] : nullptr);
bool is_cpu = m_op_backend_name == "CPU";

if (!is_cpu && current_ng_tensor->get_stale()) {
@@ -340,6 +342,7 @@ Status NGraphEncapsulateImpl::AllocateNGOutputTensors(
Status NGraphEncapsulateImpl::AllocateNGOutputTensors(
const std::vector<Tensor*>& output_tensors,
const std::shared_ptr<ngraph::runtime::Executable>& ng_exec,
+const PipelinedTensorVector& out_group_from_pipeline,
ng::runtime::Backend* const op_backend,
vector<shared_ptr<ng::runtime::Tensor>>& ng_outputs) {
std::vector<std::pair<void*, std::shared_ptr<ng::runtime::Tensor>>>&
@@ -374,9 +377,10 @@ Status NGraphEncapsulateImpl::AllocateNGOutputTensors(
NGRAPH_VLOG(4) << "NGraphEncapsulateImpl:: Output from non Variable Node";
#endif

-current_ng_tensor =
-    GetCurrentNgTensor(current_dst_ptr, last_dst_ptr, last_ng_tensor, true,
-                       ng_exec, op_backend, ng_element_type, ng_shape);
+current_ng_tensor = GetCurrentNgTensor(
+    current_dst_ptr, last_dst_ptr, last_ng_tensor, true, ng_exec,
+    op_backend, ng_element_type, ng_shape,
+    m_executable_can_create_tensor ? out_group_from_pipeline[i] : nullptr);

current_ng_tensor->set_stale(true);
output_caches[i] = std::make_pair(current_dst_ptr, current_ng_tensor);
@@ -393,7 +397,8 @@ std::shared_ptr<ng::runtime::Tensor> NGraphEncapsulateImpl::GetCurrentNgTensor(
const bool& output_tensor,
const std::shared_ptr<ngraph::runtime::Executable>& ng_exec,
ng::runtime::Backend* const op_backend,
-const ng::element::Type& ng_element_type, const ng::Shape& ng_shape) {
+const ng::element::Type& ng_element_type, const ng::Shape& ng_shape,
+std::shared_ptr<ng::runtime::Tensor> tensor_from_pipeline) {
// NOTE: we assume that TF's pointers WILL change if it actually changes
// values, i.e., it will not reuse the same space if it has rewritten it
bool tf_tensor_has_changed = current_tf_ptr != last_tf_ptr;
@@ -426,20 +431,72 @@ std::shared_ptr<ng::runtime::Tensor> NGraphEncapsulateImpl::GetCurrentNgTensor(
}
// create a new ng tensor or use the last one
std::shared_ptr<ng::runtime::Tensor> current_ng_tensor;
-  if (need_new_tensor_creation) {
-    if (is_cpu) {
-      current_ng_tensor =
-          op_backend->create_tensor(ng_element_type, ng_shape, current_tf_ptr);
+  if (m_executable_can_create_tensor) {
+    current_ng_tensor = tensor_from_pipeline;
+  } else {
+    if (need_new_tensor_creation) {
+      if (is_cpu) {
+        current_ng_tensor = op_backend->create_tensor(ng_element_type, ng_shape,
+                                                      current_tf_ptr);
+      } else {
+        current_ng_tensor =
+            op_backend->create_tensor(ng_element_type, ng_shape);
+      }
     } else {
-      current_ng_tensor = op_backend->create_tensor(ng_element_type, ng_shape);
+      current_ng_tensor = last_ng_tensor;
     }
-  } else {
-    current_ng_tensor = last_ng_tensor;
   }
current_ng_tensor->set_stale(is_stale);
return current_ng_tensor;
}

+Status NGraphEncapsulateImpl::CachePipelinedTensorIfNeeded(
+    std::shared_ptr<ngraph::runtime::Executable> ng_exec) {
+  if (!m_executable_can_create_tensor) {
+    return errors::Internal(
+        "CachePipelinedTensorIfNeeded called, but executable cannot create "
+        "tensors");
+  }
+  auto itr = m_executable_pipelined_tensors_map.find(ng_exec);
+  if (itr == m_executable_pipelined_tensors_map.end()) {
+    // Create these pipelined ng tensors only if needed, else reuse from cache
+    size_t num_inputs = ng_exec->get_parameters().size();
+    size_t num_outputs = ng_exec->get_results().size();
+    PipelinedTensorMatrix pipelined_input_tensors(num_inputs);
+    PipelinedTensorMatrix pipelined_output_tensors(num_outputs);
+    for (size_t i = 0; i < num_inputs; i++) {
+      pipelined_input_tensors[i] = ng_exec->create_input_tensor(i, m_depth);
+    }
+    for (size_t i = 0; i < num_outputs; i++) {
+      pipelined_output_tensors[i] = ng_exec->create_output_tensor(i, m_depth);
+    }
+    m_executable_pipelined_tensors_map.insert(
+        {ng_exec, PipelinedTensorsStore(pipelined_input_tensors,
+                                        pipelined_output_tensors)});
+  }
+  return Status::OK();
+}
+
+std::tuple<int, PipelinedTensorVector, PipelinedTensorVector>
+NGraphEncapsulateImpl::GetTensorsFromPipeline(
+    std::shared_ptr<ngraph::runtime::Executable> ng_exec) {
+  PipelinedTensorsStore pts = m_executable_pipelined_tensors_map.at(ng_exec);
+
+  // TODO: do something about this spin lock
+  // get_tensors returns an integer index that can be -1, 0, ..., depth-1.
+  // If it returns -1, there are no free groups of tensors, i.e., the
+  // pipeline is full. In that case we need to wait, hence the while loop.
+  std::tuple<int, PipelinedTensorVector, PipelinedTensorVector> out_tpl;
+  while (true) {
+    out_tpl = pts.get_tensors();
+
+    if (std::get<0>(out_tpl) >= 0) {
+      break;
+    }
+  }
+  return out_tpl;
+}

} // namespace ngraph_bridge

} // namespace tensorflow
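
The two functions above depend on PipelinedTensorsStore, PipelinedTensorVector, and PipelinedTensorMatrix from the new ngraph_bridge/ngraph_pipelined_tensors.h, whose diff is not expanded in this view. Below is a minimal sketch of the store semantics the code above assumes: a fixed pool of depth tensor groups handed out by get_tensors and reclaimed by return_tensors. Only the member names and the tuple shape are taken from the usage above; the internals are illustrative guesses, not this commit's implementation.

    // Sketch only: the real implementation is in ngraph_pipelined_tensors.cc
    // (added by this commit but not expanded here). Internals are assumed.
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <tuple>
    #include <vector>

    struct Tensor {};  // stand-in for ng::runtime::Tensor
    using PipelinedTensorVector = std::vector<std::shared_ptr<Tensor>>;
    // One row per input (or output), with `depth` pre-created tensors each.
    using PipelinedTensorMatrix = std::vector<PipelinedTensorVector>;

    class PipelinedTensorsStore {
     public:
      PipelinedTensorsStore(PipelinedTensorMatrix in, PipelinedTensorMatrix out)
          : m_in(std::move(in)),
            m_out(std::move(out)),
            m_depth(m_in.empty() ? 0 : m_in[0].size()) {
        for (size_t i = 0; i < m_depth; i++) m_free_depths.push(i);
      }

      // Returns {-1, {}, {}} when every depth group is checked out (pipeline
      // full); GetTensorsFromPipeline above spins until a group frees up.
      std::tuple<int, PipelinedTensorVector, PipelinedTensorVector>
      get_tensors() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_free_depths.empty()) {
          return std::make_tuple(-1, PipelinedTensorVector{},
                                 PipelinedTensorVector{});
        }
        size_t idx = m_free_depths.front();
        m_free_depths.pop();
        return std::make_tuple(static_cast<int>(idx), column(m_in, idx),
                               column(m_out, idx));
      }

      // Marks group `idx` free again; reached via ReturnPipelinedTensors.
      void return_tensors(size_t idx) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_free_depths.push(idx);
      }

     private:
      // Collects the idx-th pipelined tensor of every input (or output).
      static PipelinedTensorVector column(const PipelinedTensorMatrix& m,
                                          size_t idx) {
        PipelinedTensorVector v;
        for (const auto& row : m) v.push_back(row[idx]);
        return v;
      }

      PipelinedTensorMatrix m_in;
      PipelinedTensorMatrix m_out;
      size_t m_depth;
      std::queue<size_t> m_free_depths;
      std::mutex m_mutex;
    };

With depth groups pre-created via create_input_tensor/create_output_tensor, up to depth executions can hold tensors at once before get_tensors starts reporting -1.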
32 changes: 31 additions & 1 deletion ngraph_bridge/ngraph_encapsulate_impl.h
@@ -28,6 +28,7 @@

#include "logging/ngraph_log.h"
#include "ngraph_bridge/ngraph_freshness_tracker.h"
#include "ngraph_bridge/ngraph_pipelined_tensors.h"

namespace tensorflow {

@@ -61,6 +62,7 @@ class NGraphEncapsulateImpl {
Status AllocateNGInputTensors(
const std::vector<Tensor>& tf_input_tensors,
const std::shared_ptr<ngraph::runtime::Executable>& ng_exec,
+const PipelinedTensorVector& inp_group_from_pipeline,
ng::runtime::Backend* const op_backend,
vector<shared_ptr<ng::runtime::Tensor>>& ng_inputs);

@@ -69,6 +71,7 @@ class NGraphEncapsulateImpl {
Status AllocateNGOutputTensors(
const std::vector<Tensor*>& tf_output_tensors,
const std::shared_ptr<ngraph::runtime::Executable>& ng_exec,
+const PipelinedTensorVector& out_group_from_pipeline,
ng::runtime::Backend* const op_backend,
vector<shared_ptr<ng::runtime::Tensor>>& ng_outputs);

@@ -79,7 +82,8 @@ class NGraphEncapsulateImpl {
const bool& output_tensor,
const std::shared_ptr<ngraph::runtime::Executable>& ng_exec,
ng::runtime::Backend* const op_backend,
-const ng::element::Type& ng_element_type, const ng::Shape& ng_shape);
+const ng::element::Type& ng_element_type, const ng::Shape& ng_shape,
+std::shared_ptr<ng::runtime::Tensor> tensor_from_pipeline);

// Accessors (getters and setters) for the private data members of
// NgraphEncapsulateImpl class
@@ -185,6 +189,25 @@ class NGraphEncapsulateImpl {

void SetName(string name) { m_name = name; }

+void SetExecCanCreateTensor(bool b) { m_executable_can_create_tensor = b; }
+
+bool GetExecCanCreateTensor() { return m_executable_can_create_tensor; }
+
+void ClearNgExecPipelinedTensorMap() {
+  m_executable_pipelined_tensors_map.clear();
+}
+
+Status CachePipelinedTensorIfNeeded(
+    std::shared_ptr<ngraph::runtime::Executable> ng_exec);
+
+std::tuple<int, PipelinedTensorVector, PipelinedTensorVector>
+GetTensorsFromPipeline(std::shared_ptr<ngraph::runtime::Executable> ng_exec);
+
+void ReturnPipelinedTensors(
+    std::shared_ptr<ngraph::runtime::Executable> ng_exec, size_t idx) {
+  m_executable_pipelined_tensors_map.at(ng_exec).return_tensors(idx);
+}

// TF Graph for the cluster
Graph m_graph;

@@ -219,6 +242,13 @@ class NGraphEncapsulateImpl {
// A single instance of freshness_tracker is used across all
// nGraphEncapsulateOp and nGraphVariable op
NGraphFreshnessTracker* m_freshness_tracker;

+bool m_executable_can_create_tensor = false;
+std::unordered_map<std::shared_ptr<ngraph::runtime::Executable>,
+                   PipelinedTensorsStore>
+    m_executable_pipelined_tensors_map;
+
+int m_depth{2};  // TODO make this settable
};

} // namespace ngraph_bridge
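
Taken together, the new members define a checkout/return protocol around each run: m_depth{2} means each executable gets two tensor groups, so at most two executions can be in flight at once (double buffering). A condensed caller sequence, mirroring the ngraph_encapsulate_op.cc changes shown below (the wrapper function name is hypothetical and error handling is simplified to Status returns):

    // Condensed from the Compute() changes below; illustrative only.
    Status RunWithPipelinedTensors(
        NGraphEncapsulateImpl& impl,
        std::shared_ptr<ngraph::runtime::Executable> ng_exec) {
      if (!impl.GetExecCanCreateTensor()) return Status::OK();  // legacy path

      // First use of this executable: pre-create m_depth groups of tensors.
      TF_RETURN_IF_ERROR(impl.CachePipelinedTensorIfNeeded(ng_exec));

      int pipeline_idx = -1;
      PipelinedTensorVector inp_group, out_group;
      std::tie(pipeline_idx, inp_group, out_group) =
          impl.GetTensorsFromPipeline(ng_exec);  // waits while pipeline is full

      // ... AllocateNGInputTensors / AllocateNGOutputTensors / call ...

      impl.ReturnPipelinedTensors(ng_exec, pipeline_idx);  // group free again
      return Status::OK();
    }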
51 changes: 49 additions & 2 deletions ngraph_bridge/ngraph_encapsulate_op.cc
@@ -44,6 +44,7 @@
#include "ngraph_bridge/ngraph_encapsulate_op.h"
#include "ngraph_bridge/ngraph_freshness_tracker.h"
#include "ngraph_bridge/ngraph_mark_for_clustering.h"
#include "ngraph_bridge/ngraph_pipelined_tensors.h"
#include "ngraph_bridge/ngraph_timer.h"
#include "ngraph_bridge/ngraph_utils.h"

@@ -203,6 +204,13 @@ NGraphEncapsulateOp::NGraphEncapsulateOp(OpKernelConstruction* ctx)
BackendManager::SetConfig(ng_encap_impl.GetOpBackend(),
additional_attribute_map);

+ng_encap_impl.SetExecCanCreateTensor(
+    BackendManager::GetBackend(ng_encap_impl.GetOpBackend())
+        ->executable_can_create_tensors());
+NGRAPH_VLOG(5) << "Executable can "
+               << (ng_encap_impl.GetExecCanCreateTensor() ? "" : "not")
+               << " create tensors";

event.Stop();
ngraph::Event::write_trace(event);
}
@@ -261,6 +269,7 @@ NGraphEncapsulateOp::~NGraphEncapsulateOp() {
ng_encap_impl.ClearNgExecOutputCache();
ng_encap_impl.ClearNgExecMap();
ng_encap_impl.ClearNgFunctionMap();
+ng_encap_impl.ClearNgExecPipelinedTensorMap();

// Release the backend
NGRAPH_VLOG(2) << "~NGraphEncapsulateOp():: ReleaseBackend";
@@ -319,6 +328,31 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {

Timer create_or_lookup_tensors;

+int pipeline_idx = -1;
+PipelinedTensorVector inp_group_from_pipeline;
+PipelinedTensorVector out_group_from_pipeline;
+if (ng_encap_impl.GetExecCanCreateTensor()) {
+  OP_REQUIRES_OK(ctx, ng_encap_impl.CachePipelinedTensorIfNeeded(ng_exec));
+  // Cache must contain the ng_exec at this point
+
+  try {
+    std::tie(pipeline_idx, inp_group_from_pipeline, out_group_from_pipeline) =
+        ng_encap_impl.GetTensorsFromPipeline(ng_exec);
+  } catch (const std::exception& exp) {
+    OP_REQUIRES(
+        ctx, false,
+        errors::Internal("Caught exception while getting pipelined tensors: ",
+                         exp.what(), "\n"));
+  }
+
+  if (pipeline_idx < 0) {
+    OP_REQUIRES(ctx, false,
+                errors::Internal("Expected GetTensorsFromPipeline to return "
+                                 "an index >= 0, but got ",
+                                 pipeline_idx));
+  }
+}

if (ng_encap_impl.GetNgraphFreshnessTracker() == nullptr) {
auto creator = [](NGraphFreshnessTracker** tracker) {
*tracker = new NGraphFreshnessTracker();
@@ -343,7 +377,8 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
int ng_input_tensor_size_in_bytes = 0;

OP_REQUIRES_OK(ctx, ng_encap_impl.AllocateNGInputTensors(
-tf_input_tensors, ng_exec, op_backend, ng_inputs));
+tf_input_tensors, ng_exec, inp_group_from_pipeline,
+op_backend, ng_inputs));

event_alloc_input.Stop();

@@ -384,7 +419,8 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
}

OP_REQUIRES_OK(ctx, ng_encap_impl.AllocateNGOutputTensors(
-tf_output_tensors, ng_exec, op_backend, ng_outputs));
+tf_output_tensors, ng_exec, out_group_from_pipeline,
+op_backend, ng_outputs));
auto output_caches = ng_encap_impl.GetNgExecOutputCacheMap(ng_exec);

event_alloc_output.Stop();
@@ -659,6 +695,17 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
int time_copy_output_tensors_to_host =
copy_output_tensors_to_host.ElapsedInMS();

+if (ng_encap_impl.GetExecCanCreateTensor()) {
+  try {
+    ng_encap_impl.ReturnPipelinedTensors(ng_exec, pipeline_idx);
+  } catch (const std::exception& exp) {
+    OP_REQUIRES(ctx, false,
+                errors::Internal(
+                    "Caught exception while returning pipelined tensors: ",
+                    exp.what(), "\n"));
+  }
+}

NGRAPH_VLOG(4)
<< "NGraphEncapsulateOp::Compute done marking fresh for cluster "
<< ng_encap_impl.GetNgraphCluster();
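
The checkout in GetTensorsFromPipeline (ngraph_encapsulate_impl.cc above) busy-waits whenever the pipeline is full, which is the "spin lock" flagged by its TODO. One shape the fix could take, assuming a free-index queue inside the store as in the earlier sketch, is to block on a condition variable instead; this is not part of this commit:

    // Hypothetical blocking variant of the free-group checkout.
    #include <condition_variable>
    #include <mutex>
    #include <queue>

    class FreeDepthPool {
     public:
      explicit FreeDepthPool(size_t depth) {
        for (size_t i = 0; i < depth; i++) m_free.push(i);
      }

      // Blocks (instead of spinning) until some group index is free.
      size_t acquire() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return !m_free.empty(); });
        size_t idx = m_free.front();
        m_free.pop();
        return idx;
      }

      // Returns a group index to the pool and wakes one waiter.
      void release(size_t idx) {
        {
          std::lock_guard<std::mutex> lock(m_mutex);
          m_free.push(idx);
        }
        m_cv.notify_one();
      }

     private:
      std::queue<size_t> m_free;
      std::mutex m_mutex;
      std::condition_variable m_cv;
    };

The waiter wakes only when release runs, so a full pipeline costs a blocked thread rather than a spinning core.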