diff options
author | Peter Hawkins <phawkins@google.com> | 2018-09-26 13:48:21 -0700 |
---|---|---|
committer | TensorFlower Gardener <gardener@tensorflow.org> | 2018-09-26 13:51:50 -0700 |
commit | 1736e0bbbfdeeba178dff37c970b5a0180ee013f (patch) | |
tree | 390c309b5997a752644d2c50bb4ee5bf8fc1654d /tensorflow/core/framework | |
parent | 652ce1aaefdadd04a9905a0788ab26c6fff93658 (diff) |
[TF] Add new internal ops _VarHandlesOp and _ReadVariablesOp.
The purpose of these ops is to fix a latency problem observed for an inference benchmark. Often a inference step starts by reading the value of many (hundreds) of weights. For a resource variable, this requires a VarHandleOp and a ReadVariableOp per variable. Running hundreds of trivial ops can add hundreds of microseconds of latency to the critical path of an inference step. The inter-op latency of the executor can be hundreds of nanoseconds, which rapidly adds up.
This change introduces two fused ops _VarHandlesOp and _ReadVariablesOp that allow us to read many variables in a pair of larger ops, rather than many tiny ops.
PiperOrigin-RevId: 214662338
Diffstat (limited to 'tensorflow/core/framework')
-rw-r--r-- | tensorflow/core/framework/resource_mgr.cc | 9 | ||||
-rw-r--r-- | tensorflow/core/framework/resource_mgr.h | 106 |
2 files changed, 115 insertions, 0 deletions
diff --git a/tensorflow/core/framework/resource_mgr.cc b/tensorflow/core/framework/resource_mgr.cc index ebdaaec153..508a8d3149 100644 --- a/tensorflow/core/framework/resource_mgr.cc +++ b/tensorflow/core/framework/resource_mgr.cc @@ -288,4 +288,13 @@ Status DeleteResource(OpKernelContext* ctx, const ResourceHandle& p) { return ctx->resource_manager()->Delete(p); } +Status ResourceHandlesShape(shape_inference::InferenceContext* c) { + int n; + TF_RETURN_IF_ERROR(c->GetAttr("N", &n)); + for (int i = 0; i < n; ++i) { + c->set_output(i, c->Scalar()); + } + return Status::OK(); +} + } // end namespace tensorflow diff --git a/tensorflow/core/framework/resource_mgr.h b/tensorflow/core/framework/resource_mgr.h index d58deaa3fc..abb6635984 100644 --- a/tensorflow/core/framework/resource_mgr.h +++ b/tensorflow/core/framework/resource_mgr.h @@ -16,6 +16,7 @@ limitations under the License. #ifndef TENSORFLOW_CORE_FRAMEWORK_RESOURCE_MGR_H_ #define TENSORFLOW_CORE_FRAMEWORK_RESOURCE_MGR_H_ +#include <memory> #include <string> #include <typeindex> #include <typeinfo> @@ -127,6 +128,14 @@ class ResourceMgr { Status Lookup(const string& container, const string& name, T** resource) const TF_MUST_USE_RESULT; + // Similar to Lookup, but looks up multiple resources at once, with only a + // single lock acquisition. + template <typename T> + Status LookupMany(absl::Span<std::pair<const string*, const string*> const> + containers_and_names, + std::vector<std::unique_ptr<T, core::RefCountDeleter>>* + resource) const TF_MUST_USE_RESULT; + // If "container" has a resource "name", returns it in // "*resource". Otherwise, invokes creator() to create the resource. // The caller takes the ownership of one ref on "*resource". @@ -246,6 +255,12 @@ Status CreateResource(OpKernelContext* ctx, const ResourceHandle& p, T* value); template <typename T> Status LookupResource(OpKernelContext* ctx, const ResourceHandle& p, T** value); +// Looks up multiple resources pointed by a sequence of resource handles. +template <typename T> +Status LookupResources( + OpKernelContext* ctx, absl::Span<ResourceHandle const> p, + std::vector<std::unique_ptr<T, core::RefCountDeleter>>* values); + // Looks up or creates a resource. template <typename T> Status LookupOrCreateResource(OpKernelContext* ctx, const ResourceHandle& p, @@ -358,6 +373,26 @@ class ResourceHandleOp : public OpKernel { std::atomic<bool> initialized_{false}; }; +// Utility op kernel to produce a handle to a resource of type T. +template <typename T> +class ResourceHandlesOp : public OpKernel { + public: + explicit ResourceHandlesOp(OpKernelConstruction* context); + + void Compute(OpKernelContext* ctx) override; + + bool IsExpensive() override { return false; } + + private: + std::vector<string> containers_; + std::vector<string> names_; + mutex mutex_; + std::vector<Tensor> resources_; + std::atomic<bool> initialized_{false}; +}; + +Status ResourceHandlesShape(shape_inference::InferenceContext* c); + // Registers a kernel for an op which produces a handle to a resource of the // specified type. #define REGISTER_RESOURCE_HANDLE_KERNEL(Type) \ @@ -390,6 +425,24 @@ Status ResourceMgr::Lookup(const string& container, const string& name, } template <typename T> +Status ResourceMgr::LookupMany( + absl::Span<std::pair<const string*, const string*> const> + containers_and_names, + std::vector<std::unique_ptr<T, core::RefCountDeleter>>* resources) const { + CheckDeriveFromResourceBase<T>(); + tf_shared_lock l(mu_); + resources->resize(containers_and_names.size()); + for (size_t i = 0; i < containers_and_names.size(); ++i) { + T* resource; + TF_RETURN_IF_ERROR(LookupInternal(*containers_and_names[i].first, + *containers_and_names[i].second, + &resource)); + (*resources)[i].reset(resource); + } + return Status::OK(); +} + +template <typename T> Status ResourceMgr::LookupInternal(const string& container, const string& name, T** resource) const { ResourceBase* found = nullptr; @@ -499,6 +552,19 @@ Status LookupResource(OpKernelContext* ctx, const ResourceHandle& p, } template <typename T> +Status LookupResources( + OpKernelContext* ctx, absl::Span<ResourceHandle const* const> p, + std::vector<std::unique_ptr<T, core::RefCountDeleter>>* values) { + std::vector<std::pair<const string*, const string*>> containers_and_names( + p.size()); + for (size_t i = 0; i < p.size(); ++i) { + TF_RETURN_IF_ERROR(internal::ValidateDeviceAndType<T>(ctx, *p[i])); + containers_and_names[i] = {&p[i]->container(), &p[i]->name()}; + } + return ctx->resource_manager()->LookupMany(containers_and_names, values); +} + +template <typename T> Status LookupOrCreateResource(OpKernelContext* ctx, const ResourceHandle& p, T** value, std::function<Status(T**)> creator) { TF_RETURN_IF_ERROR(internal::ValidateDeviceAndType<T>(ctx, p)); @@ -555,6 +621,46 @@ void ResourceHandleOp<T>::Compute(OpKernelContext* ctx) { ctx->set_output(0, resource_); } +template <typename T> +ResourceHandlesOp<T>::ResourceHandlesOp(OpKernelConstruction* context) + : OpKernel(context) { + int n; + OP_REQUIRES_OK(context, context->GetAttr("N", &n)); + OP_REQUIRES_OK(context, context->GetAttr("containers", &containers_)); + OP_REQUIRES_OK(context, context->GetAttr("shared_names", &names_)); + OP_REQUIRES( + context, containers_.size() == n, + errors::InvalidArgument("Number of containers (", containers_.size(), + ") must be equal to N (", n, ")")); + OP_REQUIRES(context, names_.size() == n, + errors::InvalidArgument("Number of names (", containers_.size(), + ") must be equal to N (", n, ")")); + resources_.resize(n); +} + +template <typename T> +void ResourceHandlesOp<T>::Compute(OpKernelContext* ctx) { + if (!initialized_.load()) { + mutex_lock ml(mutex_); + // Checking again to see if another thread has initialized the resource. + if (!initialized_.load()) { + AllocatorAttributes attr; + attr.set_on_host(true); + for (size_t i = 0; i < resources_.size(); ++i) { + OP_REQUIRES_OK(ctx, ctx->allocate_temp(DT_RESOURCE, TensorShape({}), + &resources_[i], attr)); + ResourceHandle h = + MakeResourceHandle<T>(ctx, containers_[i], names_[i]); + resources_[i].template scalar<ResourceHandle>()() = h; + } + initialized_.store(true); + } + } + for (size_t i = 0; i < resources_.size(); ++i) { + ctx->set_output(i, resources_[i]); + } +} + } // end namespace tensorflow #endif // TENSORFLOW_CORE_FRAMEWORK_RESOURCE_MGR_H_ |