author     Revan Sopher <rsopher@google.com>    2018-09-27 16:16:20 -0700
committer  TensorFlower Gardener <gardener@tensorflow.org>    2018-09-27 16:21:41 -0700
commit     f41573b7956871b4142c97eb85ddf163ad641976 (patch)
tree       d29d3a1ba427d4f44d606c767e75c3b9cc30096b /tensorflow/core/framework
parent     d8a370274d6ab8c68edcce66849b4e96aed2fa0d (diff)
Automated rollback of commit 750466c6e6624d279de7f9a43accd682d487509c
PiperOrigin-RevId: 214853846
Diffstat (limited to 'tensorflow/core/framework')
-rw-r--r--  tensorflow/core/framework/run_handler.cc            248
-rw-r--r--  tensorflow/core/framework/run_handler.h               95
-rw-r--r--  tensorflow/core/framework/run_handler_util.cc         57
-rw-r--r--  tensorflow/core/framework/run_handler_util.h          43
-rw-r--r--  tensorflow/core/framework/run_handler_util_test.cc    93
5 files changed, 0 insertions, 536 deletions
diff --git a/tensorflow/core/framework/run_handler.cc b/tensorflow/core/framework/run_handler.cc
deleted file mode 100644
index 9c6490a603..0000000000
--- a/tensorflow/core/framework/run_handler.cc
+++ /dev/null
@@ -1,248 +0,0 @@
-/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#define EIGEN_USE_THREADS
-
-#include "tensorflow/core/framework/run_handler.h"
-
-#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
-#include "tensorflow/core/framework/run_handler_util.h"
-#include "tensorflow/core/platform/mutex.h"
-#include "tensorflow/core/util/ptr_util.h"
-
-namespace tensorflow {
-
-// Contains the concrete implementation of RunHandler.
-// The externally visible RunHandler class simply forwards work to this one.
-class RunHandler::Impl {
- public:
- explicit Impl(RunHandlerPool::Impl* pool_impl) : pool_impl_(pool_impl) {
- Reset();
- }
-
- ~Impl() {}
-
- void set_inter_op_scheduling_range(std::uint_fast32_t start,
- std::uint_fast32_t limit) {
- inter_op_scheduling_range_.store(EncodePartition(start, limit),
- std::memory_order_release);
- }
-
- std::uint_fast32_t inter_op_scheduling_range() const {
- return inter_op_scheduling_range_.load(std::memory_order_acquire);
- }
-
- // Returns the time (in microseconds since the Unix epoch) at which the
- // handler was requested via RunHandlerPool::Get().
- uint64 start_time_us() const { return start_time_us_; }
-
- void ScheduleInterOpClosure(std::function<void()> fn);
-
- void Reset();
-
- RunHandlerPool::Impl* pool_impl() { return pool_impl_; }
-
- private:
- // Encoding/decoding logic for storing [start, limit) in a single
- // uint_fast32_t. We assume that pool_num_threads < (1 << 16).
- const int kMaxPartitionBits = 16;
- const int kMaxThreads = 1 << kMaxPartitionBits;
-
- std::uint_fast32_t EncodePartition(std::uint_fast32_t start,
- std::uint_fast32_t limit) {
- return (start << kMaxPartitionBits) | limit;
- }
-
- void DecodePartition(std::uint_fast32_t val, std::uint_fast32_t* start,
- std::uint_fast32_t* limit) {
- *limit = val & (kMaxThreads - 1);
- val >>= kMaxPartitionBits;
- *start = val;
- }
-
- std::atomic_uint_fast32_t inter_op_scheduling_range_;
- RunHandlerPool::Impl* pool_impl_; // NOT OWNED.
- uint64 start_time_us_;
-};
-
-// Contains shared state across all run handlers present in the pool. Also
-// responsible for pool management decisions.
-// This class is thread safe.
-class RunHandlerPool::Impl {
- public:
- // Maximum number of handlers pre-created during pool construction. The
- // number was chosen expecting each handler to want at least one inter-op
- // thread for execution (during compute-intensive workloads like
- // inference).
- static const int kMaxHandlers = 128;
-
- explicit Impl(int num_inter_op_threads)
- : inter_op_thread_pool_(new thread::ThreadPool(
- Env::Default(), ThreadOptions(), "inter_op", num_inter_op_threads)),
- iterations_(0) {
- VLOG(1) << "Creating a RunHandlerPool with max handlers: " << kMaxHandlers;
- for (int i = 0; i < kMaxHandlers; ++i) {
- handlers_.emplace_back(new RunHandler::Impl(this));
- free_handlers_.push_back(handlers_.back().get());
- }
- }
-
- ~Impl() {
- // Sanity check that all handlers have been returned to the pool before
- // destruction.
- DCHECK_EQ(handlers_.size(), kMaxHandlers);
- DCHECK_EQ(free_handlers_.size(), handlers_.size());
- DCHECK_EQ(sorted_active_handlers_.size(), 0);
- }
-
- thread::ThreadPool* inter_op_thread_pool() const {
- return inter_op_thread_pool_.get();
- }
-
- std::unique_ptr<RunHandler> Get() LOCKS_EXCLUDED(mu_) {
- mutex_lock l(mu_);
- while (free_handlers_.empty()) {
- one_handler_free_.wait(l);
- }
- // Remove the last entry from free_handlers_ and add to the end of
- // sorted_active_handlers_.
- auto* handler_impl = free_handlers_.back();
- handler_impl->Reset();
- // Sortedness isn't violated if we simply add at the end of the list, since
- // handlers are expected to be obtained in increasing order of time.
- sorted_active_handlers_.push_back(handler_impl);
- DCHECK_LE(sorted_active_handlers_.size(), kMaxHandlers);
- free_handlers_.pop_back();
-
- RecomputePoolStatsLocked();
- return WrapUnique<RunHandler>(new RunHandler(handler_impl));
- }
-
- void ReleaseHandler(RunHandler::Impl* handler) LOCKS_EXCLUDED(mu_) {
- {
- mutex_lock l(mu_);
- DCHECK_GT(sorted_active_handlers_.size(), 0);
-
- uint64 now = tensorflow::Env::Default()->NowMicros();
- double elapsed = (now - handler->start_time_us()) / 1000.0;
- time_hist_.Add(elapsed);
-
- // Erase from and update sorted_active_handlers_. Add it to the end of
- // free_handlers_.
- auto iter = std::find(sorted_active_handlers_.begin(),
- sorted_active_handlers_.end(), handler);
- DCHECK(iter != sorted_active_handlers_.end())
- << "Unexpected handler: " << handler
- << " is being requested for release";
-
- // Remove this handler from this list and add it to the list of free
- // handlers.
- sorted_active_handlers_.erase(iter);
- free_handlers_.push_back(handler);
- DCHECK_LE(free_handlers_.size(), kMaxHandlers);
-
- RecomputePoolStatsLocked();
- }
- one_handler_free_.notify_one();
- }
-
- private:
- void RecomputePoolStatsLocked() EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
- // Thread safe part.
- const std::unique_ptr<thread::ThreadPool> inter_op_thread_pool_;
-
- // Thread-compatible part, accessed only while holding mu_.
- // Handlers are sorted by start time.
- std::vector<RunHandler::Impl*> sorted_active_handlers_ GUARDED_BY(mu_);
- std::vector<RunHandler::Impl*> free_handlers_ GUARDED_BY(mu_);
- std::vector<std::unique_ptr<RunHandler::Impl>> handlers_ GUARDED_BY(mu_);
- // Histogram of elapsed runtime of every handler (in ms).
- histogram::Histogram time_hist_ GUARDED_BY(mu_);
- std::vector<std::uint_fast32_t> inter_op_start_ GUARDED_BY(mu_);
- std::vector<std::uint_fast32_t> inter_op_limit_ GUARDED_BY(mu_);
- int64 iterations_ GUARDED_BY(mu_);
- condition_variable one_handler_free_;
- mutex mu_;
-};
-
-void RunHandlerPool::Impl::RecomputePoolStatsLocked() {
- int num_active_requests = sorted_active_handlers_.size();
- if (num_active_requests == 0) return;
-
- int num_threads = inter_op_thread_pool_->NumThreads();
-
- inter_op_start_.resize(num_active_requests);
- inter_op_limit_.resize(num_active_requests);
-
- const int kMinThreadsPerRequest = 3;
- ComputeInterOpSchedulingRanges(num_active_requests, num_threads,
- kMinThreadsPerRequest, &inter_op_start_,
- &inter_op_limit_);
-
- for (int i = 0; i < num_active_requests; ++i) {
- sorted_active_handlers_[i]->set_inter_op_scheduling_range(
- inter_op_start_[i], inter_op_limit_[i]);
- }
-
- if (iterations_++ % 5000 == 0 && VLOG_IS_ON(1)) {
- VLOG(1) << "Printing time histogram: " << time_hist_.ToString();
- VLOG(1) << "Active session runs: " << num_active_requests;
- uint64 now = tensorflow::Env::Default()->NowMicros();
- string ranges_str = "";
- string times_str = "";
- for (int i = 0; i < num_active_requests; ++i) {
- if (i > 0) {
- times_str += " ";
- ranges_str += " ";
- }
-
- times_str += strings::StrCat(
- (now - sorted_active_handlers_[i]->start_time_us()) / 1000.0, " ms.");
- ranges_str += strings::StrCat("[", inter_op_start_[i], ", ",
- inter_op_limit_[i], ")");
- }
- VLOG(1) << "Elapsed times are: " << times_str;
- VLOG(1) << "Ranges are: " << ranges_str;
- }
-}
-
-void RunHandler::Impl::ScheduleInterOpClosure(std::function<void()> fn) {
- std::uint_fast32_t start = 0, limit = 0;
- DecodePartition(inter_op_scheduling_range(), &start, &limit);
- pool_impl_->inter_op_thread_pool()->Schedule(std::move(fn));
-}
-
-void RunHandler::Impl::Reset() {
- set_inter_op_scheduling_range(
- 0, pool_impl_->inter_op_thread_pool()->NumThreads());
- start_time_us_ = tensorflow::Env::Default()->NowMicros();
-}
-
-RunHandlerPool::RunHandlerPool(int num_inter_op_threads)
- : impl_(new Impl(num_inter_op_threads)) {}
-
-RunHandlerPool::~RunHandlerPool() {}
-
-std::unique_ptr<RunHandler> RunHandlerPool::Get() { return impl_->Get(); }
-
-RunHandler::RunHandler(Impl* impl) : impl_(impl) {}
-
-void RunHandler::ScheduleInterOpClosure(std::function<void()> fn) {
- impl_->ScheduleInterOpClosure(std::move(fn));
-}
-
-RunHandler::~RunHandler() { impl_->pool_impl()->ReleaseHandler(impl_); }
-} // namespace tensorflow
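
The deleted RunHandler::Impl packs its [start, limit) scheduling range into one atomic word so that readers never observe a torn update of the two bounds. Below is a minimal standalone sketch of that encode/decode scheme, reusing kMaxPartitionBits = 16 from the code above; the main() driver and its values are illustrative only:

    #include <atomic>
    #include <cassert>
    #include <cstdint>

    // Pack [start, limit) into a single word; assumes limit < (1 << 16),
    // matching the deleted code's pool_num_threads < (1 << 16) assumption.
    constexpr int kMaxPartitionBits = 16;
    constexpr std::uint_fast32_t kMaxThreads = 1 << kMaxPartitionBits;

    std::uint_fast32_t EncodePartition(std::uint_fast32_t start,
                                       std::uint_fast32_t limit) {
      return (start << kMaxPartitionBits) | limit;
    }

    void DecodePartition(std::uint_fast32_t val, std::uint_fast32_t* start,
                         std::uint_fast32_t* limit) {
      *limit = val & (kMaxThreads - 1);
      *start = val >> kMaxPartitionBits;
    }

    int main() {
      // One atomic store publishes both bounds; one atomic load reads them.
      std::atomic_uint_fast32_t range{EncodePartition(6, 10)};
      std::uint_fast32_t start = 0, limit = 0;
      DecodePartition(range.load(std::memory_order_acquire), &start, &limit);
      assert(start == 6 && limit == 10);
      return 0;
    }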
diff --git a/tensorflow/core/framework/run_handler.h b/tensorflow/core/framework/run_handler.h
deleted file mode 100644
index 72fa6301b4..0000000000
--- a/tensorflow/core/framework/run_handler.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#ifndef TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_H_
-#define TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_H_
-
-#include "tensorflow/core/lib/core/threadpool.h"
-#include "tensorflow/core/lib/histogram/histogram.h"
-#include "tensorflow/core/platform/mutex.h"
-#include "tensorflow/core/platform/thread_annotations.h"
-#include "tensorflow/core/protobuf/config.pb.h"
-
-namespace tensorflow {
-
-class RunHandler;
-
-// RunHandlerPool is a fixed size pool of pre-allocated RunHandlers
-// that can be used for tracking inter-op work for a given Session::Run().
-// RunHandler(s) in the pool are initially 'inactive'. A RunHandler becomes
-// 'active' when its unique_ptr is returned by Get() and is being used by a
-// client. It becomes 'inactive' once more when its unique_ptr gets destroyed.
-//
-// Expected usage:
-//
-// * Create a single RunHandlerPool (say run_handler_pool_).
-//
-// * When a Session::Run() is invoked, obtain a handler by:
-// auto handler = run_handler_pool_->Get();
-//
-// * Use handler for scheduling all inter-op work by:
-// handler->ScheduleInterOpClosure(closure);
-//
-// This class is thread safe.
-class RunHandlerPool {
- public:
- explicit RunHandlerPool(int num_inter_op_threads);
- ~RunHandlerPool();
-
- // Returns an inactive RunHandler from the pool.
- //
- // RunHandlers in RunHandlerPool are initially 'inactive'.
- // A RunHandler becomes 'active' when its unique_ptr is returned by Get()
- // and is being used by a client. It becomes 'inactive' once more when the
- // unique_ptr is destroyed.
- //
- // Will block unless there is an inactive handler.
- std::unique_ptr<RunHandler> Get();
-
- private:
- class Impl;
- friend class RunHandler;
-
- std::unique_ptr<Impl> impl_;
-};
-
-// RunHandler can be used to schedule inter-op closures to run on a global pool
-// shared across all Session::Run(s).
-//
-// It can only be created via RunHandlerPool::Get().
-//
-// This class can be used instead of directly scheduling closures on a global
-// pool since it maintains a global view across all sessions and optimizes pool
-// scheduling to improve (median and tail) latency.
-//
-// This class is thread safe.
-class RunHandler {
- public:
- void ScheduleInterOpClosure(std::function<void()> fn);
-
- ~RunHandler();
-
- private:
- class Impl;
- friend class RunHandlerPool::Impl;
-
- explicit RunHandler(Impl* impl);
-
- Impl* impl_; // NOT OWNED.
-};
-
-} // end namespace tensorflow.
-
-#endif // TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_H_
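
The "Expected usage" comment above translates directly into code. Here is a minimal sketch of a client driving the deleted API; the pool size of 16 and the closure body are illustrative, not taken from the source:

    #include <memory>

    #include "tensorflow/core/framework/run_handler.h"  // the header deleted above

    void RunStep() {
      // One process-wide pool; 16 inter-op threads is an arbitrary example.
      static tensorflow::RunHandlerPool* run_handler_pool =
          new tensorflow::RunHandlerPool(16);

      // Per Session::Run(): obtain a handler. Get() blocks while all
      // kMaxHandlers (128) handlers are active.
      std::unique_ptr<tensorflow::RunHandler> handler = run_handler_pool->Get();

      // Schedule all inter-op work for this step through the handler.
      handler->ScheduleInterOpClosure([] { /* inter-op closure body */ });

      // When `handler` goes out of scope, it is returned to the pool.
    }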
diff --git a/tensorflow/core/framework/run_handler_util.cc b/tensorflow/core/framework/run_handler_util.cc
deleted file mode 100644
index 3087998c69..0000000000
--- a/tensorflow/core/framework/run_handler_util.cc
+++ /dev/null
@@ -1,57 +0,0 @@
-/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#include "tensorflow/core/framework/run_handler_util.h"
-
-#include <algorithm>
-#include <cmath>
-#include "tensorflow/core/platform/logging.h"
-
-namespace tensorflow {
-
-void ComputeInterOpSchedulingRanges(int num_active_requests, int num_threads,
- int min_threads_per_request,
- std::vector<std::uint_fast32_t>* start_vec,
- std::vector<std::uint_fast32_t>* end_vec) {
- // Each request is expected to have weight W[i] = num_active_requests - i.
- // Therefore, total_weight = sum of all request weights.
- float total_weight = 0.5f * num_active_requests * (num_active_requests + 1);
- float demand_factor = static_cast<float>(num_threads) / total_weight;
- float last_cumulative_weight = 0.0;
- min_threads_per_request = std::max(1, min_threads_per_request);
- for (int i = 0; i != num_active_requests; i++) {
- float cumulative_weight =
- static_cast<float>(i + 1) *
- (num_active_requests - static_cast<float>(i) * 0.5f);
- float weight = cumulative_weight - last_cumulative_weight;
- // Quantize thread_demand by rounding up, and also satisfying
- // `min_threads_per_request` constraint.
- // Note: We subtract a small epsilon (0.00001) to prevent ceil(..) from
- // rounding weights like 4.0 to 5.
- int demand =
- std::max(min_threads_per_request,
- static_cast<int>(ceil(weight * demand_factor - 0.00001f)));
- // For the quantized range [start, end): compute the floor of the real
- // start, give the range length `demand`, and shift it down when it would
- // otherwise run past the last thread.
- int start = last_cumulative_weight * demand_factor;
- int end = std::min(num_threads, start + demand);
- start = std::max(0, std::min(start, end - demand));
- start_vec->at(i) = start;
- end_vec->at(i) = end;
- last_cumulative_weight = cumulative_weight;
- }
-}
-} // namespace tensorflow
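
To make the arithmetic above concrete, here is a self-contained sketch that replays the deleted function on one small case; the function body mirrors the implementation above, while the driver and its inputs are illustrative:

    #include <algorithm>
    #include <cmath>
    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // Mirror of the deleted ComputeInterOpSchedulingRanges().
    void ComputeRanges(int num_active_requests, int num_threads,
                       int min_threads_per_request,
                       std::vector<std::uint_fast32_t>* start_vec,
                       std::vector<std::uint_fast32_t>* end_vec) {
      float total_weight =
          0.5f * num_active_requests * (num_active_requests + 1);
      float demand_factor = static_cast<float>(num_threads) / total_weight;
      float last_cumulative_weight = 0.0;
      min_threads_per_request = std::max(1, min_threads_per_request);
      for (int i = 0; i != num_active_requests; i++) {
        float cumulative_weight =
            static_cast<float>(i + 1) *
            (num_active_requests - static_cast<float>(i) * 0.5f);
        float weight = cumulative_weight - last_cumulative_weight;
        int demand = std::max(
            min_threads_per_request,
            static_cast<int>(std::ceil(weight * demand_factor - 0.00001f)));
        int start = last_cumulative_weight * demand_factor;
        int end = std::min(num_threads, start + demand);
        start = std::max(0, std::min(start, end - demand));
        start_vec->at(i) = start;
        end_vec->at(i) = end;
        last_cumulative_weight = cumulative_weight;
      }
    }

    int main() {
      // 3 active requests, 12 threads, at least 3 threads per request:
      // weights are 3, 2, 1; total_weight = 6; demand_factor = 12 / 6 = 2.
      std::vector<std::uint_fast32_t> start(3), end(3);
      ComputeRanges(3, 12, 3, &start, &end);
      for (int i = 0; i < 3; ++i)
        std::printf("[%u, %u) ", static_cast<unsigned>(start[i]),
                    static_cast<unsigned>(end[i]));
      // Prints "[0, 6) [6, 10) [9, 12)": the oldest request gets the most
      // threads, and the last two ranges overlap, which the contract allows.
      return 0;
    }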
diff --git a/tensorflow/core/framework/run_handler_util.h b/tensorflow/core/framework/run_handler_util.h
deleted file mode 100644
index c0c36aeccb..0000000000
--- a/tensorflow/core/framework/run_handler_util.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#ifndef TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_UTIL_H_
-#define TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_UTIL_H_
-
-#include <cstdint>
-#include <vector>
-
-namespace tensorflow {
-
-// Assign thread ranges to requests.
-// Requests are numbered 0...num_active_requests-1, and
-// threads are numbered 0...num_threads-1.
-// On return, the range start_vec->at(i)...end_vec->at(i)-1
-// indicates the subrange of the threads available to request i.
-// The ranges given to different requests may overlap.
-// Lower numbered requests will tend to be assigned more threads.
-// Thus, a client might associate older requests with lower
-// array indices so they receive access to more threads.
-// However, the routine ensures that each request is given access
-// to at least min(min_threads_per_request, num_threads) threads.
-// Every thread will be assigned to at least one request range,
-// assuming there is at least one request.
-void ComputeInterOpSchedulingRanges(int num_active_requests, int num_threads,
- int min_threads_per_request,
- std::vector<std::uint_fast32_t>* start_vec,
- std::vector<std::uint_fast32_t>* end_vec);
-
-} // end namespace tensorflow
-#endif // TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_UTIL_H_
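
A worked instance of this contract (values chosen for illustration): with num_active_requests = 2, num_threads = 6, and min_threads_per_request = 1, the request weights are 2 and 1, so total_weight = 3 and demand_factor = 6 / 3 = 2. Request 0 is then assigned [0, 4) and request 1 is assigned [4, 6): every thread is covered, and the older request receives twice as many threads as the newer one.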
diff --git a/tensorflow/core/framework/run_handler_util_test.cc b/tensorflow/core/framework/run_handler_util_test.cc
deleted file mode 100644
index a1928c132b..0000000000
--- a/tensorflow/core/framework/run_handler_util_test.cc
+++ /dev/null
@@ -1,93 +0,0 @@
-/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#include "tensorflow/core/framework/run_handler_util.h"
-
-#include <vector>
-#include "tensorflow/core/lib/strings/strcat.h"
-#include "tensorflow/core/platform/logging.h"
-#include "tensorflow/core/platform/test.h"
-namespace tensorflow {
-namespace {
-
-void VerifyFunction(int num_active_requests, int num_threads,
- int min_threads_per_request, bool print_stats = false) {
- if (print_stats) {
- LOG(INFO) << "Test case# num_active_requests: " << num_active_requests
- << " num_threads: " << num_threads
- << " min_threads: " << min_threads_per_request;
- }
- std::vector<std::uint_fast32_t> start(num_active_requests);
- std::vector<std::uint_fast32_t> end(num_active_requests);
-
- ComputeInterOpSchedulingRanges(num_active_requests, num_threads,
- min_threads_per_request, &start, &end);
- string range_str = "";
- for (int i = 0; i < num_active_requests; ++i) {
- if (i > 0) range_str += " ";
- range_str += strings::StrCat("[", start[i], ", ", end[i], ")");
-
- ASSERT_GE(start[i], 0) << range_str;
- ASSERT_LE(end[i], num_threads) << range_str;
- if (i > 0) {
- // Due to linearly decreasing demand, #threads(i - 1) >= #threads(i)
- ASSERT_GE(end[i - 1] - start[i - 1], end[i] - start[i]) << range_str;
- // No missing threads.
- ASSERT_GE(end[i - 1], start[i]) << range_str;
- }
- // Each interval is at least of size 'min_threads_per_request'.
- ASSERT_GE((end[i] - start[i]), min_threads_per_request) << range_str;
- // Verify that the assigned (quantized) thread count does not greatly
- // overestimate the real demand when the demand is high (>=
- // min_threads_per_request).
- float entry_weight = num_active_requests - i;
- float total_weight = 0.5f * num_active_requests * (num_active_requests + 1);
- float thread_demand = (entry_weight * num_threads) / total_weight;
- if (thread_demand > min_threads_per_request) {
- // We expect some over-estimation of threads due to quantization,
- // but we hope it's not more than 1 extra thread.
- ASSERT_NEAR(end[i] - start[i], thread_demand, 1.0)
- << "Ranges: " << range_str << " thread_demand: " << thread_demand
- << " i: " << i;
- }
- }
- ASSERT_EQ(end[num_active_requests - 1], num_threads);
- ASSERT_EQ(start[0], 0);
- if (print_stats) {
- LOG(INFO) << "Assigned ranges: " << range_str;
- }
-}
-
-TEST(RunHandlerUtilTest, TestComputeInterOpSchedulingRanges) {
- const int kMinThreadsPerRequestBound = 12;
- const int kMaxActiveRequests = 128;
- const int kMaxThreads = 128;
-
- for (int min_threads_per_request = 1;
- min_threads_per_request <= kMinThreadsPerRequestBound;
- ++min_threads_per_request) {
- for (int num_active_requests = 1; num_active_requests <= kMaxActiveRequests;
- ++num_active_requests) {
- for (int num_threads = min_threads_per_request;
- num_threads <= kMaxThreads; ++num_threads) {
- VerifyFunction(num_active_requests, num_threads,
- min_threads_per_request);
- }
- }
- }
-}
-
-} // namespace
-} // namespace tensorflow