diff options
author | 2018-09-27 16:16:20 -0700 | |
---|---|---|
committer | 2018-09-27 16:21:41 -0700 | |
commit | f41573b7956871b4142c97eb85ddf163ad641976 (patch) | |
tree | d29d3a1ba427d4f44d606c767e75c3b9cc30096b /tensorflow/core/framework | |
parent | d8a370274d6ab8c68edcce66849b4e96aed2fa0d (diff) |
Automated rollback of commit 750466c6e6624d279de7f9a43accd682d487509c
PiperOrigin-RevId: 214853846
Diffstat (limited to 'tensorflow/core/framework')
-rw-r--r-- | tensorflow/core/framework/run_handler.cc | 248 | ||||
-rw-r--r-- | tensorflow/core/framework/run_handler.h | 95 | ||||
-rw-r--r-- | tensorflow/core/framework/run_handler_util.cc | 57 | ||||
-rw-r--r-- | tensorflow/core/framework/run_handler_util.h | 43 | ||||
-rw-r--r-- | tensorflow/core/framework/run_handler_util_test.cc | 93 |
5 files changed, 0 insertions, 536 deletions
diff --git a/tensorflow/core/framework/run_handler.cc b/tensorflow/core/framework/run_handler.cc deleted file mode 100644 index 9c6490a603..0000000000 --- a/tensorflow/core/framework/run_handler.cc +++ /dev/null @@ -1,248 +0,0 @@ -/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#define EIGEN_USE_THREADS - -#include "tensorflow/core/framework/run_handler.h" - -#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor" -#include "tensorflow/core/framework/run_handler_util.h" -#include "tensorflow/core/platform/mutex.h" -#include "tensorflow/core/util/ptr_util.h" - -namespace tensorflow { - -// Contains the concrete implementation of the RunHandler. -// Externally visible RunHandler class simply forwards the work to this one. -class RunHandler::Impl { - public: - explicit Impl(RunHandlerPool::Impl* pool_impl) : pool_impl_(pool_impl) { - Reset(); - } - - ~Impl() {} - - void set_inter_op_scheduling_range(std::uint_fast32_t start, - std::uint_fast32_t limit) { - inter_op_scheduling_range_.store(EncodePartition(start, limit), - std::memory_order_release); - } - - std::uint_fast32_t inter_op_scheduling_range() const { - return inter_op_scheduling_range_.load(std::memory_order_acquire); - } - - // Stores now time (in microseconds) since unix epoch when the handler is - // requested via RunHandlerPool::Get(). 
- uint64 start_time_us() const { return start_time_us_; } - - void ScheduleInterOpClosure(std::function<void()> fn); - - void Reset(); - - RunHandlerPool::Impl* pool_impl() { return pool_impl_; } - - private: - // Encoding/decoding logic for storing [start, limit) into a single - // uint_fast32_t int. We assume that pool_num_threads < (1 << 16). - const int kMaxPartitionBits = 16; - const int kMaxThreads = 1 << kMaxPartitionBits; - - std::uint_fast32_t EncodePartition(std::uint_fast32_t start, - std::uint_fast32_t limit) { - return (start << kMaxPartitionBits) | limit; - } - - void DecodePartition(std::uint_fast32_t val, std::uint_fast32_t* start, - std::uint_fast32_t* limit) { - *limit = val & (kMaxThreads - 1); - val >>= kMaxPartitionBits; - *start = val; - } - - std::atomic_uint_fast32_t inter_op_scheduling_range_; - RunHandlerPool::Impl* pool_impl_; // NOT OWNED. - uint64 start_time_us_; -}; - -// Contains shared state across all run handlers present in the pool. Also -// responsible for pool management decisions. -// This class is thread safe. -class RunHandlerPool::Impl { - public: - // Maximum number of handlers pre-created during pool construction time. The - // number has been chosen expecting each handler might at least want 1 - // inter-op thread for execution (during compute intensive workloads like - // inference). - static const int kMaxHandlers = 128; - - explicit Impl(int num_inter_op_threads) - : inter_op_thread_pool_(new thread::ThreadPool( - Env::Default(), ThreadOptions(), "inter_op", num_inter_op_threads)), - iterations_(0) { - VLOG(1) << "Creating a RunHandlerPool with max handlers: " << kMaxHandlers; - for (int i = 0; i < kMaxHandlers; ++i) { - handlers_.emplace_back(new RunHandler::Impl(this)); - free_handlers_.push_back(handlers_.back().get()); - } - } - - ~Impl() { - // Sanity check that all handlers have been returned back to the pool before - // destruction. 
- DCHECK_EQ(handlers_.size(), kMaxHandlers); - DCHECK_EQ(free_handlers_.size(), handlers_.size()); - DCHECK_EQ(sorted_active_handlers_.size(), 0); - } - - thread::ThreadPool* inter_op_thread_pool() const { - return inter_op_thread_pool_.get(); - } - - std::unique_ptr<RunHandler> Get() LOCKS_EXCLUDED(mu_) { - mutex_lock l(mu_); - while (free_handlers_.empty()) { - one_handler_free_.wait(l); - } - // Remove the last entry from free_handlers_ and add to the end of - // sorted_active_handlers_. - auto* handler_impl = free_handlers_.back(); - handler_impl->Reset(); - // Sortedness isn't violated if we simply add at the end of the list, since - // handlers are expected to be obtained in increasing order of time. - sorted_active_handlers_.push_back(handler_impl); - DCHECK_LE(sorted_active_handlers_.size(), kMaxHandlers); - free_handlers_.pop_back(); - - RecomputePoolStatsLocked(); - return WrapUnique<RunHandler>(new RunHandler(handler_impl)); - } - - void ReleaseHandler(RunHandler::Impl* handler) LOCKS_EXCLUDED(mu_) { - { - mutex_lock l(mu_); - DCHECK_GT(sorted_active_handlers_.size(), 0); - - uint64 now = tensorflow::Env::Default()->NowMicros(); - double elapsed = (now - handler->start_time_us()) / 1000.0; - time_hist_.Add(elapsed); - - // Erase from and update sorted_active_handlers_. Add it to the end of - // free_handlers_. - auto iter = std::find(sorted_active_handlers_.begin(), - sorted_active_handlers_.end(), handler); - DCHECK(iter != sorted_active_handlers_.end()) - << "Unexpected handler: " << handler - << " is being requested for release"; - - // Remove this handler from this list and add it to the list of free - // handlers. - sorted_active_handlers_.erase(iter); - free_handlers_.push_back(handler); - DCHECK_LE(free_handlers_.size(), kMaxHandlers); - - RecomputePoolStatsLocked(); - } - one_handler_free_.notify_one(); - } - - private: - void RecomputePoolStatsLocked() EXCLUSIVE_LOCKS_REQUIRED(mu_); - - // Thread safe part. 
- const std::unique_ptr<thread::ThreadPool> inter_op_thread_pool_; - - // Thread compatible part used only by lock under RunHandlerPool. - // Handlers are sorted by start time. - std::vector<RunHandler::Impl*> sorted_active_handlers_ GUARDED_BY(mu_); - std::vector<RunHandler::Impl*> free_handlers_ GUARDED_BY(mu_); - std::vector<std::unique_ptr<RunHandler::Impl>> handlers_ GUARDED_BY(mu_); - // Histogram of elapsed runtime of every handler (in ms). - histogram::Histogram time_hist_ GUARDED_BY(mu_); - std::vector<std::uint_fast32_t> inter_op_start_ GUARDED_BY(mu_); - std::vector<std::uint_fast32_t> inter_op_limit_ GUARDED_BY(mu_); - int64 iterations_ GUARDED_BY(mu_); - condition_variable one_handler_free_; - mutex mu_; -}; - -void RunHandlerPool::Impl::RecomputePoolStatsLocked() { - int num_active_requests = sorted_active_handlers_.size(); - if (num_active_requests == 0) return; - - int num_threads = inter_op_thread_pool_->NumThreads(); - - inter_op_start_.resize(num_active_requests); - inter_op_limit_.resize(num_active_requests); - - const int kMinThreadsPerRequest = 3; - ComputeInterOpSchedulingRanges(num_active_requests, num_threads, - kMinThreadsPerRequest, &inter_op_start_, - &inter_op_limit_); - - for (int i = 0; i < num_active_requests; ++i) { - sorted_active_handlers_[i]->set_inter_op_scheduling_range( - inter_op_start_[i], inter_op_limit_[i]); - } - - if (iterations_++ % 5000 == 0 && VLOG_IS_ON(1)) { - VLOG(1) << "Printing time histogram: " << time_hist_.ToString(); - VLOG(1) << "Active session runs: " << num_active_requests; - uint64 now = tensorflow::Env::Default()->NowMicros(); - string ranges_str = ""; - string times_str = ""; - for (int i = 0; i < num_active_requests; ++i) { - if (i > 0) { - times_str += " "; - ranges_str += " "; - } - - times_str += strings::StrCat( - (now - sorted_active_handlers_[i]->start_time_us()) / 1000.0, " ms."); - ranges_str += strings::StrCat("[", inter_op_start_[i], ", ", - inter_op_limit_[i], ")"); - } - VLOG(1) << "Elapsed 
times are: " << times_str; - VLOG(1) << "Ranges are: " << ranges_str; - } -} - -void RunHandler::Impl::ScheduleInterOpClosure(std::function<void()> fn) { - std::uint_fast32_t start = 0, limit = 0; - DecodePartition(inter_op_scheduling_range(), &start, &limit); - pool_impl_->inter_op_thread_pool()->Schedule(std::move(fn)); -} - -void RunHandler::Impl::Reset() { - set_inter_op_scheduling_range( - 0, pool_impl_->inter_op_thread_pool()->NumThreads()); - start_time_us_ = tensorflow::Env::Default()->NowMicros(); -} - -RunHandlerPool::RunHandlerPool(int num_inter_op_threads) - : impl_(new Impl(num_inter_op_threads)) {} - -RunHandlerPool::~RunHandlerPool() {} - -std::unique_ptr<RunHandler> RunHandlerPool::Get() { return impl_->Get(); } - -RunHandler::RunHandler(Impl* impl) : impl_(impl) {} - -void RunHandler::ScheduleInterOpClosure(std::function<void()> fn) { - impl_->ScheduleInterOpClosure(std::move(fn)); -} - -RunHandler::~RunHandler() { impl_->pool_impl()->ReleaseHandler(impl_); } -} // namespace tensorflow diff --git a/tensorflow/core/framework/run_handler.h b/tensorflow/core/framework/run_handler.h deleted file mode 100644 index 72fa6301b4..0000000000 --- a/tensorflow/core/framework/run_handler.h +++ /dev/null @@ -1,95 +0,0 @@ -/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-==============================================================================*/ - -#ifndef TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_H_ -#define TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_H_ - -#include "tensorflow/core/lib/core/threadpool.h" -#include "tensorflow/core/lib/histogram/histogram.h" -#include "tensorflow/core/platform/mutex.h" -#include "tensorflow/core/platform/thread_annotations.h" -#include "tensorflow/core/protobuf/config.pb.h" - -namespace tensorflow { - -class RunHandler; - -// RunHandlerPool is a fixed size pool of pre-allocated RunHandlers -// that can be used for tracking inter-op work for a given Session::Run(). -// RunHandler(s) in the pool are initially 'inactive'. A RunHandler becomes -// 'active' when its unique_ptr is returned by Get() and is being used by a -// client. It becomes 'inactive' once more when its unique_ptr gets destroyed. -// -// Expected usage: -// -// * Create a single RunHandlerPool (say run_handler_pool_). -// -// * When a Session::Run() is invoked, obtain a handler by: -// auto handler = run_handler_pool_->Get(); -// -// * Use handler for scheduling all inter-op work by: -// handler->ScheduleInterOpClosure(closure); -// -// This class is thread safe. -class RunHandlerPool { - public: - explicit RunHandlerPool(int num_inter_op_threads); - ~RunHandlerPool(); - - // Returns an inactive RunHandler from the pool. - // - // RunHandlers in RunHandlerPool are initially 'inactive'. - // A RunHandler becomes 'active' when its unique_ptr is returned by Get() - // and is being used by a client. It becomes 'inactive' once more when the - // unique_ptr is destroyed. - // - // Will block unless there is an inactive handler. - std::unique_ptr<RunHandler> Get(); - - private: - class Impl; - friend class RunHandler; - - std::unique_ptr<Impl> impl_; -}; - -// RunHandler can be used to schedule inter-op closures to run on a global pool -// shared across all Session::Run(s). -// -// It can only be created via RunHandlerPool::Get(). 
-// -// This class can be used instead of directly scheduling closures on a global -// pool since it maintains a global view across all sessions and optimizes pool -// scheduling to improve (median and tail) latency. -// -// This class is thread safe. -class RunHandler { - public: - void ScheduleInterOpClosure(std::function<void()> fn); - - ~RunHandler(); - - private: - class Impl; - friend class RunHandlerPool::Impl; - - explicit RunHandler(Impl* impl); - - Impl* impl_; // NOT OWNED. -}; - -} // end namespace tensorflow. - -#endif // TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_H_ diff --git a/tensorflow/core/framework/run_handler_util.cc b/tensorflow/core/framework/run_handler_util.cc deleted file mode 100644 index 3087998c69..0000000000 --- a/tensorflow/core/framework/run_handler_util.cc +++ /dev/null @@ -1,57 +0,0 @@ -/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#include "tensorflow/core/framework/run_handler_util.h" - -#include <algorithm> -#include <cmath> -#include "tensorflow/core/platform/logging.h" - -namespace tensorflow { - -void ComputeInterOpSchedulingRanges(int num_active_requests, int num_threads, - int min_threads_per_request, - std::vector<std::uint_fast32_t>* start_vec, - std::vector<std::uint_fast32_t>* end_vec) { - // Each request is expected to have weight W[i] = num_active_requests - i. 
- // Therefore, total_weight = sum of all request weights. - float total_weight = 0.5f * num_active_requests * (num_active_requests + 1); - float demand_factor = static_cast<float>(num_threads) / total_weight; - float last_cumulative_weight = 0.0; - min_threads_per_request = std::max(1, min_threads_per_request); - for (int i = 0; i != num_active_requests; i++) { - float cumulative_weight = - static_cast<float>(i + 1) * - (num_active_requests - static_cast<float>(i) * 0.5f); - float weight = cumulative_weight - last_cumulative_weight; - // Quantize thread_demand by rounding up, and also satisfying - // `min_threads_per_request` constraint. - // Note: We subtract a small epsilon (0.00001) to prevent ceil(..) from - // rounding weights like 4.0 to 5. - int demand = - std::max(min_threads_per_request, - static_cast<int>(ceil(weight * demand_factor - 0.00001f))); - // For the quantized range [start, end); compute the floor of real start, - // and expand downwards from there with length `demand` and adjust for - // boundary conditions. - int start = last_cumulative_weight * demand_factor; - int end = std::min(num_threads, start + demand); - start = std::max(0, std::min(start, end - demand)); - start_vec->at(i) = start; - end_vec->at(i) = end; - last_cumulative_weight = cumulative_weight; - } -} -} // namespace tensorflow diff --git a/tensorflow/core/framework/run_handler_util.h b/tensorflow/core/framework/run_handler_util.h deleted file mode 100644 index c0c36aeccb..0000000000 --- a/tensorflow/core/framework/run_handler_util.h +++ /dev/null @@ -1,43 +0,0 @@ -/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#ifndef TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_UTIL_H_ -#define TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_UTIL_H_ - -#include <cstdint> -#include <vector> - -namespace tensorflow { - -// Assign thread ranges to requests. -// Requests are numbered 0...num_active_requests-1, and -// threads are numbered 0...num_threads-1. -// On return, the range start_vec->at(i)...end_vec->at(i)-1 -// indicates the subrange of the threads available to request i. -// The ranges given to different requests may overlap. -// Lower numbered requests will tend to be assigned more threads. -// Thus, a client might associate older requests with lower -// array indices so they receive access to more threads. -// However, the routine ensures that each request is given access -// to at least min(min_threads_per_request, num_threads) threads. -// Every thread will be assigned to at least one request range, -// assuming there is at least one request. 
-void ComputeInterOpSchedulingRanges(int num_active_requests, int num_threads, - int min_threads_per_request, - std::vector<std::uint_fast32_t>* start_vec, - std::vector<std::uint_fast32_t>* end_vec); - -} // end namespace tensorflow -#endif // TENSORFLOW_CORE_FRAMEWORK_RUN_HANDLER_UTIL_H_ diff --git a/tensorflow/core/framework/run_handler_util_test.cc b/tensorflow/core/framework/run_handler_util_test.cc deleted file mode 100644 index a1928c132b..0000000000 --- a/tensorflow/core/framework/run_handler_util_test.cc +++ /dev/null @@ -1,93 +0,0 @@ -/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-==============================================================================*/ - -#include "tensorflow/core/framework/run_handler_util.h" - -#include <vector> -#include "tensorflow/core/lib/strings/strcat.h" -#include "tensorflow/core/platform/logging.h" -#include "tensorflow/core/platform/test.h" -namespace tensorflow { -namespace { - -void VerifyFunction(int num_active_requests, int num_threads, - int min_threads_per_request, bool print_stats = false) { - if (print_stats) { - LOG(INFO) << "Test case# num_active_requests: " << num_active_requests - << " num_threads: " << num_threads - << " min_threads: " << min_threads_per_request; - } - std::vector<std::uint_fast32_t> start(num_active_requests); - std::vector<std::uint_fast32_t> end(num_active_requests); - - ComputeInterOpSchedulingRanges(num_active_requests, num_threads, - min_threads_per_request, &start, &end); - string range_str = ""; - for (int i = 0; i < num_active_requests; ++i) { - if (i > 0) range_str += " "; - range_str += strings::StrCat("[", start[i], ", ", end[i], ")"); - - ASSERT_GE(start[i], 0) << range_str; - ASSERT_LE(end[i], num_threads) << range_str; - if (i > 0) { - // Due to linearly decreasing demand, #threads(i - 1) >= #threads(i) - ASSERT_GE(end[i - 1] - start[i - 1], end[i] - start[i]) << range_str; - // No missing threads. - ASSERT_GE(end[i - 1], start[i]) << range_str; - } - // Each interval is at least of size 'min_threads_per_request'. - ASSERT_GE((end[i] - start[i]), min_threads_per_request) << range_str; - // Verify that assigned (quantized) threads is not overly estimated - // from real demand, when the demand is high (>= - // min_threads_per_request). 
- float entry_weight = num_active_requests - i; - float total_weight = 0.5f * num_active_requests * (num_active_requests + 1); - float thread_demand = (entry_weight * num_threads) / total_weight; - if (thread_demand > min_threads_per_request) { - // We expect some over-estimation of threads due to quantization, - // but we hope it's not more than 1 extra thread. - ASSERT_NEAR(end[i] - start[i], thread_demand, 1.0) - << "Ranges: " << range_str << " thread_demand: " << thread_demand - << " i: " << i; - } - } - ASSERT_EQ(end[num_active_requests - 1], num_threads); - ASSERT_EQ(start[0], 0); - if (print_stats) { - LOG(INFO) << "Assigned ranges: " << range_str; - } -} - -TEST(RunHandlerUtilTest, TestComputeInterOpSchedulingRanges) { - const int kMinThreadsPerRequestBound = 12; - const int kMaxActiveRequests = 128; - const int kMaxThreads = 128; - - for (int min_threads_per_request = 1; - min_threads_per_request <= kMinThreadsPerRequestBound; - ++min_threads_per_request) { - for (int num_active_requests = 1; num_active_requests <= kMaxActiveRequests; - ++num_active_requests) { - for (int num_threads = min_threads_per_request; - num_threads <= kMaxThreads; ++num_threads) { - VerifyFunction(num_active_requests, num_threads, - min_threads_per_request); - } - } - } -} - -} // namespace -} // namespace tensorflow |