author     A. Unique TensorFlower <gardener@tensorflow.org>  2017-12-13 14:43:00 -0800
committer  TensorFlower Gardener <gardener@tensorflow.org>   2017-12-13 14:46:18 -0800
commit     82c1af4a693395489cb43c1bbb7c7be41af654b8 (patch)
tree       02337187780b3d05fafd35b28907205d2fac7ad4 /tensorflow/contrib/batching
parent     38b20f83dbaada96902bdd5b419feb5a8e47395c (diff)
Simplify and improve AdaptiveSharedBatchScheduler implementation. The new implementation will exist alongside the old one (selectable through the scheduler options) until its superiority is confirmed, at which point the old rate-based implementation will be removed.
The new implementation requires fewer options and no user feedback to achieve low-latency batching. Instead of processing batches at an adjustable rate, we limit the number of batches that can be processed concurrently. Below the limit, batches are processed immediately upon creation. At the limit, the oldest batch is processed once an in-processing batch finishes. The scheduler continuously adjusts the limit to minimize overall latency.

PiperOrigin-RevId: 178960621
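In outline, the new policy looks like the following minimal, self-contained sketch (an illustration of the idea only, not the code in this patch; all names are placeholders):

    #include <functional>
    #include <mutex>
    #include <queue>
    #include <vector>

    // A batch; the scheduler orders batches by the age of their oldest task.
    struct Batch {
      long creation_time_micros;
    };

    // Comparator so that the oldest batch sits on top of the priority queue.
    struct OldestFirst {
      bool operator()(const Batch* a, const Batch* b) const {
        return a->creation_time_micros > b->creation_time_micros;
      }
    };

    class InFlightLimiter {
     public:
      InFlightLimiter(int limit, std::function<void(const Batch*)> run)
          : limit_(limit), run_(std::move(run)) {}

      // Called when a queue forms a new batch: below the limit, the oldest
      // pending batch is dispatched immediately.
      void AddBatch(const Batch* batch) {
        std::lock_guard<std::mutex> l(mu_);
        pending_.push(batch);
        MaybeDispatch();
      }

      // Called when a batch finishes processing: a slot opened up, so the
      // oldest pending batch (if any) may now be dispatched.
      void OnBatchDone() {
        std::lock_guard<std::mutex> l(mu_);
        --in_flight_;
        MaybeDispatch();
      }

     private:
      void MaybeDispatch() {
        if (pending_.empty() || in_flight_ >= limit_) return;
        const Batch* oldest = pending_.top();
        pending_.pop();
        ++in_flight_;
        run_(oldest);  // the real scheduler hands this to a thread pool
      }

      std::mutex mu_;
      const int limit_;
      std::function<void(const Batch*)> run_;
      int in_flight_ = 0;
      std::priority_queue<const Batch*, std::vector<const Batch*>, OldestFirst>
          pending_;
    };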
Diffstat (limited to 'tensorflow/contrib/batching')
-rw-r--r--  tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h      | 218
-rw-r--r--  tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc | 111
2 files changed, 315 insertions(+), 14 deletions(-)
diff --git a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h
index 9e32bee505..a2cb146b8d 100644
--- a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h
+++ b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h
@@ -16,9 +16,11 @@ limitations under the License.
#ifndef THIRD_PARTY_TENSORFLOW_CONTRIB_BATCHING_ADAPTIVE_SHARED_BATCH_SCHEDULER_H_
#define THIRD_PARTY_TENSORFLOW_CONTRIB_BATCHING_ADAPTIVE_SHARED_BATCH_SCHEDULER_H_
+#include <algorithm>
#include <functional>
#include <memory>
#include <queue>
+#include <random>
#include <unordered_map>
#include <vector>
@@ -42,19 +44,36 @@ template <typename TaskType>
class ASBSQueue;
} // namespace internal
+// EXPERIMENTAL: API MAY BE SUBJECT TO SUDDEN CHANGES.
+//
// Shared batch scheduler designed to minimize latency. The scheduler keeps
// track of a number of queues (one per model or model version) which are
// continuously enqueuing requests. The scheduler groups the requests into
// batches which it periodically sends off for processing (see
// shared_batch_scheduler.h for more details). The AdaptiveSharedBatchScheduler
// prioritizes batches by age (i.e. the batch's oldest request) irrespective of
-// queue. The scheduler will process the oldest batch at an adjustable rate,
-// regardless of batch size. The user can provide feedback to help set this rate
-// to achieve some goal (i.e. minimize overall latency, limit cpu usage, etc).
+// queue or batch size.
+//
+// The scheduling decision currently comes in two flavors, controlled by the
+// option use_in_flight_batches_implementation. Setting this option to true is
+// expected to give universally better results; once testing confirms this, the
+// old implementation will be removed.
//
-// The rate (or rather, the corresponding period) is adjusted each time a batch
-// is processed, using an exponentially weighted moving average to smooth
-// potentially noisy feedback:
+// If use_in_flight_batches_implementation is set to true, the scheduler
+// limits the number of batches which can be processed concurrently. If a new
+// batch is created, and the number of in flight batches is below the limit,
+// the next (i.e. oldest) batch is immediately scheduled. Similarly, when a
+// batch finishes processing, the limit is rechecked, and another batch may be
+// scheduled. To avoid the need to carefully tune the limit for each workload,
+// model type, platform, etc., the limit is adjusted dynamically to provide the
+// lowest latency.
+//
+// If use_in_flight_batches_implementation is set to false, the scheduler will
+// process the oldest batch at an adjustable rate, regardless of batch size.
+// The user can provide feedback to help set this rate to achieve some goal
+// (e.g. minimize overall latency, limit CPU usage, etc.). The rate (or rather,
+// the corresponding period) is adjusted each time a batch is processed, using
+// an exponentially weighted moving average to smooth noisy feedback:
// ewma_feedback = ((N - 1) * ewma_feedback + feedback()) / N
// period *= (1 + K * ewma_feedback)
//
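For concreteness, one update step of this rate control, with an assumed smoothing window N = 10 (feedback_smoothing_batches) and K = 0.001 (matching the kFeedbackMultiplier constant used later in this file); the feedback value is hypothetical:

    #include <cstdio>

    // One EWMA update step; N, K, and the feedback value are assumptions.
    int main() {
      double ewma_feedback = 0.0;    // no feedback observed yet
      double period_micros = 500.0;  // initial_scheduling_period_micros
      const double N = 10, K = 0.001;
      const double feedback = 2.0;   // hypothetical "slow down" signal
      ewma_feedback = ((N - 1) * ewma_feedback + feedback) / N;    // -> 0.2
      period_micros *= (1 + K * ewma_feedback);  // 500 -> 500.1 microseconds
      std::printf("ewma=%f period=%f\n", ewma_feedback, period_micros);
      return 0;
    }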
@@ -82,6 +101,20 @@ class AdaptiveSharedBatchScheduler
int64 num_batch_threads = port::NumSchedulableCPUs();
// The environment to use (typically only overridden by test code).
Env* env = Env::Default();
+ // Which implementation to use (described in class comments above).
+ bool use_in_flight_batches_implementation = false;
+ // Initial limit for number of batches being concurrently processed.
+ // Non-integer values correspond to probabilistic limits - e.g. a value of
+ // 3.2 results in a cap of 3 for 80% of the time, and 4 for 20% of the time.
+ double initial_in_flight_batches_limit = 3;
+ // Number of batches between adjustments of in_flight_batches_limit. Larger
+ // numbers will give less noisy latency measurements, but will be less
+ // responsive to changes in workload.
+ int64 batches_to_average_over = 1000;
+
+ // TODO(kte): remove the rate-based implementation and corresponding options
+ // below once testing confirms the superiority of the in flight batches
+ // implementation.
// Initial batch scheduling period in microseconds. Will be altered for
// non-zero rate_feedback.
double initial_scheduling_period_micros = 500;
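The probabilistic handling of a non-integer limit described above can be sketched as a stand-alone check (illustrative only; the patch's version lives in MaybeScheduleNextBatch further down):

    #include <random>

    // Returns true if another batch may start under a possibly fractional
    // limit. With limit = 3.2 and 3 batches in flight, the headroom is 0.2,
    // so a fourth batch is allowed to start 20% of the time.
    bool UnderLimit(double limit, int in_flight, std::mt19937& rng) {
      const double headroom = limit - in_flight;
      if (headroom <= 0) return false;  // hard cap reached
      if (headroom >= 1) return true;   // clearly below the limit
      std::uniform_real_distribution<double> uniform(0.0, 1.0);
      return uniform(rng) < headroom;   // fractional part, probabilistically
    }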
@@ -122,6 +155,11 @@ class AdaptiveSharedBatchScheduler
BatchProcessor process_batch_callback,
std::unique_ptr<BatchScheduler<TaskType>>* queue);
+ double in_flight_batches_limit() {
+ mutex_lock l(mu_);
+ return in_flight_batches_limit_;
+ }
+
private:
// access to AddBatch, RemoveQueue, GetEnv.
friend class internal::ASBSQueue<TaskType>;
@@ -129,10 +167,20 @@ class AdaptiveSharedBatchScheduler
explicit AdaptiveSharedBatchScheduler(const Options& options);
// Batch scheduling function which runs every scheduling_period_ microseconds.
+ // Only used when options_.use_in_flight_batches_implementation == false.
void ProcessOneBatch();
+ // Tracks batch latency and adjusts in_flight_batches_limit to minimize it.
+ // Only used when options_.use_in_flight_batches_implementation == true.
+ void CallbackWrapper(const internal::ASBSBatch<TaskType>* batch,
+ BatchProcessor callback);
+
+ // Schedules a batch if in_flight_batches_limit_ has not been reached.
+ // Only used when options_.use_in_flight_batches_implementation == true.
+ void MaybeScheduleNextBatch() EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
// Notifies scheduler of non-empty batch which is eligible for processing.
- void AddBatch(internal::ASBSBatch<TaskType>*);
+ void AddBatch(const internal::ASBSBatch<TaskType>* batch);
// Removes queue from scheduler.
void RemoveQueue(const internal::ASBSQueue<TaskType>* queue);
@@ -149,7 +197,8 @@ class AdaptiveSharedBatchScheduler
// Collection of batches added by AddBatch, ordered by age. Owned by scheduler
// until they are released for processing.
std::priority_queue<const internal::ASBSBatch<TaskType>*,
- std::vector<internal::ASBSBatch<TaskType>*>, BatchCompare>
+ std::vector<const internal::ASBSBatch<TaskType>*>,
+ BatchCompare>
batches_ GUARDED_BY(mu_);
// Unowned queues and callbacks added by AddQueue.
@@ -160,19 +209,56 @@ class AdaptiveSharedBatchScheduler
// Responsible for running ProcessOneBatch. PeriodicFunction was used in order
// to check for deletion so that the thread can be shut down.
+ // Only used when options_.use_in_flight_batches_implementation == false.
std::unique_ptr<PeriodicFunction> scheduling_thread_;
// Responsible for running the batch processing callbacks.
std::unique_ptr<thread::ThreadPool> batch_thread_pool_;
// Time interval in microseconds between successive ProcessOneBatch calls.
+ // Only used when options_.use_in_flight_batches_implementation == false.
double scheduling_period_;
// Exponentially weighted moving average of
// options_.scheduling_period_feedback() evaluated in each ProcessOneBatch
// call.
+ // Only used when options_.use_in_flight_batches_implementation == false.
double ewma_feedback_ = 0;
+ // Limit on number of batches which can be concurrently processed.
+ // Non-integer values correspond to probabilistic limits - e.g. a value of 3.2
+ // results in a cap of 3 for 80% of the time, and 4 for 20% of the time.
+ // Only used when options_.use_in_flight_batches_implementation == true.
+ double in_flight_batches_limit_ GUARDED_BY(mu_);
+
+ // Number of batches currently being processed.
+ // Only used when options_.use_in_flight_batches_implementation == true.
+ int64 in_flight_batches_ GUARDED_BY(mu_) = 0;
+
+ // RNG engine and distribution.
+ // Only used when options_.use_in_flight_batches_implementation == true.
+ std::default_random_engine rand_engine_;
+ std::uniform_real_distribution<double> rand_double_;
+
+ // Fields controlling the dynamic adjustment of in_flight_batches_limit_.
+ // Only used when options_.use_in_flight_batches_implementation == true.
+ // Number of batches since the last in_flight_batches_limit_ adjustment.
+ int64 batch_count_ GUARDED_BY(mu_) = 0;
+ // Sum of processing latency for batches counted by batch_count_.
+ int64 batch_latency_sum_ GUARDED_BY(mu_) = 0;
+ // Average batch latency for previous value of in_flight_batches_limit_.
+ double last_avg_latency_ms_ GUARDED_BY(mu_) = 0;
+ // Did last_avg_latency_ms_ decrease from the previous last_avg_latency_ms_?
+ bool last_latency_decreased_ GUARDED_BY(mu_) = false;
+ // Current direction (+/-) in which to adjust in_flight_batches_limit_.
+ int step_direction_ GUARDED_BY(mu_) = 1;
+ // Max adjustment size (as a fraction of in_flight_batches_limit_).
+ constexpr static double kMaxStepSizeMultiplier = 0.125; // 1/8
+ // Min adjustment size (as a fraction of in_flight_batches_limit_).
+ constexpr static double kMinStepSizeMultiplier = 0.0078125; // 1/128
+ // Current adjustment size (as a fraction of in_flight_batches_limit_).
+ double step_size_multiplier_ GUARDED_BY(mu_) = kMaxStepSizeMultiplier;
+
TF_DISALLOW_COPY_AND_ASSIGN(AdaptiveSharedBatchScheduler);
};
@@ -244,6 +330,12 @@ class ASBSBatch : public Batch<TaskType> {
// ---------------- AdaptiveSharedBatchScheduler ----------------
template <typename TaskType>
+constexpr double AdaptiveSharedBatchScheduler<TaskType>::kMaxStepSizeMultiplier;
+
+template <typename TaskType>
+constexpr double AdaptiveSharedBatchScheduler<TaskType>::kMinStepSizeMultiplier;
+
+template <typename TaskType>
Status AdaptiveSharedBatchScheduler<TaskType>::Create(
const Options& options,
std::shared_ptr<AdaptiveSharedBatchScheduler<TaskType>>* scheduler) {
@@ -277,6 +369,25 @@ Status AdaptiveSharedBatchScheduler<TaskType>::Create(
"feedback_smoothing_batches must be positive; was ",
options.feedback_smoothing_batches);
}
+ if (options.initial_in_flight_batches_limit > options.num_batch_threads) {
+ return errors::InvalidArgument(
+ "initial_in_flight_batches_limit (",
+ options.initial_in_flight_batches_limit,
+ ") should not be larger than num_batch_threads (",
+ options.num_batch_threads, ")");
+ }
+ if (options.initial_in_flight_batches_limit < 1) {
+ return errors::InvalidArgument(
+ "initial_in_flight_batches_limit should be "
+ "greater than or equal to 1; was ",
+ options.initial_in_flight_batches_limit);
+ }
+ if (options.batches_to_average_over < 1) {
+ return errors::InvalidArgument(
+ "batches_to_average_over should be "
+ "greater than or equal to 1; was ",
+ options.batches_to_average_over);
+ }
scheduler->reset(new AdaptiveSharedBatchScheduler<TaskType>(options));
return Status::OK();
}
@@ -285,14 +396,20 @@ template <typename TaskType>
AdaptiveSharedBatchScheduler<TaskType>::AdaptiveSharedBatchScheduler(
const Options& options)
: options_(options),
- scheduling_period_(options.initial_scheduling_period_micros) {
+ scheduling_period_(options.initial_scheduling_period_micros),
+ in_flight_batches_limit_(options.initial_in_flight_batches_limit),
+ rand_double_(0.0, 1.0) {
+ std::random_device device;
+ rand_engine_.seed(device());
PeriodicFunction::Options opts;
opts.thread_name_prefix = "scheduling_thread";
opts.env = GetEnv();
- scheduling_thread_.reset(
- new PeriodicFunction([this] { ProcessOneBatch(); }, 0, opts));
batch_thread_pool_.reset(new thread::ThreadPool(
GetEnv(), options.thread_pool_name, options.num_batch_threads));
+ if (!options.use_in_flight_batches_implementation) {
+ scheduling_thread_.reset(
+ new PeriodicFunction([this] { ProcessOneBatch(); }, 0, opts));
+ }
}
template <typename TaskType>
@@ -318,9 +435,12 @@ Status AdaptiveSharedBatchScheduler<TaskType>::AddQueue(
template <typename TaskType>
void AdaptiveSharedBatchScheduler<TaskType>::AddBatch(
- internal::ASBSBatch<TaskType>* batch) {
+ const internal::ASBSBatch<TaskType>* batch) {
mutex_lock l(mu_);
batches_.push(batch);
+ if (options_.use_in_flight_batches_implementation) {
+ MaybeScheduleNextBatch();
+ }
}
template <typename TaskType>
@@ -331,9 +451,77 @@ void AdaptiveSharedBatchScheduler<TaskType>::RemoveQueue(
}
template <typename TaskType>
+void AdaptiveSharedBatchScheduler<TaskType>::MaybeScheduleNextBatch() {
+ if (batches_.empty() || in_flight_batches_ >= in_flight_batches_limit_)
+ return;
+ // Non-integer limit handled probabilistically.
+ if (in_flight_batches_limit_ - in_flight_batches_ < 1 &&
+ rand_double_(rand_engine_) >
+ (in_flight_batches_limit_ - in_flight_batches_))
+ return;
+ const internal::ASBSBatch<TaskType>* batch = batches_.top();
+ batches_.pop();
+ // Queue may destroy itself after ReleaseBatch is called.
+ batch->queue()->ReleaseBatch(batch);
+ batch_thread_pool_->Schedule(
+ std::bind(&AdaptiveSharedBatchScheduler<TaskType>::CallbackWrapper, this,
+ batch, queues_and_callbacks_[batch->queue()]));
+ in_flight_batches_++;
+}
+
+template <typename TaskType>
+void AdaptiveSharedBatchScheduler<TaskType>::CallbackWrapper(
+ const internal::ASBSBatch<TaskType>* batch,
+ AdaptiveSharedBatchScheduler<TaskType>::BatchProcessor callback) {
+ int64 start_time = batch->creation_time_micros();
+ callback(std::unique_ptr<Batch<TaskType>>(
+ const_cast<internal::ASBSBatch<TaskType>*>(batch)));
+ int64 end_time = GetEnv()->NowMicros();
+ mutex_lock l(mu_);
+ in_flight_batches_--;
+ batch_count_++;
+ batch_latency_sum_ += end_time - start_time;
+ // Occasionally adjust in_flight_batches_limit_ to minimize average latency.
+ // Although the optimal value may depend on the workload, the latency should
+ // be a simple convex function of in_flight_batches_limit_, allowing us to
+ // locate the global minimum relatively quickly.
+ if (batch_count_ == options_.batches_to_average_over) {
+ double current_avg_latency_ms = (batch_latency_sum_ / 1000.) / batch_count_;
+ bool current_latency_decreased =
+ current_avg_latency_ms < last_avg_latency_ms_;
+ if (current_latency_decreased) {
+ // If latency improvement was because we're moving in the correct
+ // direction, increase step_size so that we can get to the minimum faster.
+ // If latency improvement was due to backtracking from a previous failure,
+ // decrease step_size in order to refine our location.
+ step_size_multiplier_ *= (last_latency_decreased_ ? 2 : 0.5);
+ step_size_multiplier_ =
+ std::min(step_size_multiplier_, kMaxStepSizeMultiplier);
+ step_size_multiplier_ =
+ std::max(step_size_multiplier_, kMinStepSizeMultiplier);
+ } else {
+ // Return (nearly) to previous position and confirm that latency is better
+ // there before decreasing step size.
+ step_direction_ = -step_direction_;
+ }
+ in_flight_batches_limit_ +=
+ step_direction_ * in_flight_batches_limit_ * step_size_multiplier_;
+ in_flight_batches_limit_ =
+ std::min(in_flight_batches_limit_,
+ static_cast<double>(options_.num_batch_threads));
+ in_flight_batches_limit_ = std::max(in_flight_batches_limit_, 1.0);
+ last_avg_latency_ms_ = current_avg_latency_ms;
+ last_latency_decreased_ = current_latency_decreased;
+ batch_count_ = 0;
+ batch_latency_sum_ = 0;
+ }
+ MaybeScheduleNextBatch();
+}
+
+template <typename TaskType>
void AdaptiveSharedBatchScheduler<TaskType>::ProcessOneBatch() {
static const double kFeedbackMultiplier = .001;
- internal::ASBSBatch<TaskType>* batch = nullptr;
+ const internal::ASBSBatch<TaskType>* batch = nullptr;
BatchProcessor callback;
const int64 start_time_micros = GetEnv()->NowMicros();
{
@@ -357,7 +545,8 @@ void AdaptiveSharedBatchScheduler<TaskType>::ProcessOneBatch() {
// Queue may destroy itself after ReleaseBatch is called.
batch->queue()->ReleaseBatch(batch);
batch_thread_pool_->Schedule([callback, batch] {
- callback(std::unique_ptr<Batch<TaskType>>(batch));
+ callback(std::unique_ptr<Batch<TaskType>>(
+ const_cast<internal::ASBSBatch<TaskType>*>(batch)));
});
}
const int64 sleep_time =
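The adjustment in CallbackWrapper above amounts to an adaptive step search on average latency: keep moving while latency improves, reverse and shrink the step when it worsens. Extracted into a stand-alone sketch (structure and names are illustrative, not the class's API):

    #include <algorithm>

    struct TunerState {
      double limit;                // plays the role of in_flight_batches_limit_
      double step = 0.125;         // kMaxStepSizeMultiplier
      int direction = 1;           // step_direction_
      double last_latency_ms = 0;
      bool last_decreased = false;
    };

    void AdjustLimit(TunerState& s, double avg_latency_ms, double max_limit) {
      const bool decreased = avg_latency_ms < s.last_latency_ms;
      if (decreased) {
        // Improving while moving forward: double the step to converge faster.
        // Improving right after a reversal: halve the step to refine.
        s.step = s.last_decreased ? s.step * 2 : s.step * 0.5;
        s.step = std::min(std::max(s.step, 0.0078125), 0.125);
      } else {
        s.direction = -s.direction;  // latency worsened: backtrack
      }
      s.limit += s.direction * s.limit * s.step;
      s.limit = std::min(std::max(s.limit, 1.0), max_limit);
      s.last_latency_ms = avg_latency_ms;
      s.last_decreased = decreased;
    }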
@@ -427,6 +616,7 @@ Status ASBSQueue<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
current_batch_->AddTask(std::move(*task));
num_enqueued_tasks_++;
}
+ // AddBatch must be called outside the lock, since it may call ReleaseBatch.
if (new_batch != nullptr) scheduler_->AddBatch(new_batch);
return Status::OK();
}
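For reference, a usage sketch of the new options, modeled on the tests in the next file (MyTask is a placeholder for a user-defined BatchTask subclass; error handling is condensed to TF_RETURN_IF_ERROR):

    #include <memory>

    #include "tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h"

    // Placeholder task type; real users define their own BatchTask subclass.
    struct MyTask : public tensorflow::serving::BatchTask {
      size_t size() const override { return 1; }
    };

    tensorflow::Status SetUpScheduler() {
      using Scheduler =
          tensorflow::serving::AdaptiveSharedBatchScheduler<MyTask>;
      Scheduler::Options options;
      options.use_in_flight_batches_implementation = true;  // new code path
      options.initial_in_flight_batches_limit = 2;
      options.batches_to_average_over = 1000;
      std::shared_ptr<Scheduler> scheduler;
      TF_RETURN_IF_ERROR(Scheduler::Create(options, &scheduler));

      std::unique_ptr<tensorflow::serving::BatchScheduler<MyTask>> queue;
      TF_RETURN_IF_ERROR(scheduler->AddQueue(
          {},  // default queue options
          [](std::unique_ptr<tensorflow::serving::Batch<MyTask>> batch) {
            // Process the closed batch; latency is measured from creation.
          },
          &queue));
      // Tasks enqueued via `queue` are now batched and scheduled adaptively.
      return tensorflow::Status::OK();
    }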
diff --git a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc
index e2aac54eeb..18f1e55452 100644
--- a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc
+++ b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc
@@ -141,6 +141,16 @@ TEST(AdaptiveSharedBatchSchedulerTest, BadOptions) {
options = Scheduler::Options();
options.feedback_smoothing_batches = 0;
EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
+ options = Scheduler::Options();
+ options.initial_in_flight_batches_limit = 0.5;
+ EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
+ options = Scheduler::Options();
+ options.num_batch_threads = 5;
+ options.initial_in_flight_batches_limit = 8;
+ EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
+ options = Scheduler::Options();
+ options.batches_to_average_over = -5;
+ EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
}
TEST(AdaptiveSharedBatchSchedulerTest, ObeysQueueOptions) {
@@ -434,6 +444,107 @@ TEST(AdaptiveSharedBatchSchedulerTest, QueueCapacityInfo) {
}
stop_teardown.Notify();
}
+
+TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesImplementation) {
+ AdaptiveSharedBatchScheduler<FakeTask>::Options options;
+ options.use_in_flight_batches_implementation = true;
+ options.initial_in_flight_batches_limit = 2;
+ options.batches_to_average_over = 1000;
+ std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+ TF_ASSERT_OK(
+ AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+ std::unique_ptr<BatchScheduler<FakeTask>> queue;
+ mutex mu;
+ int processed_batches = 0;
+ Notification finish_processing;
+ auto queue_callback = [&mu, &processed_batches, &finish_processing](
+ std::unique_ptr<Batch<FakeTask>> batch) {
+ ASSERT_TRUE(batch->IsClosed());
+ EXPECT_GT(batch->num_tasks(), 0);
+ mu.lock();
+ int batch_num = ++processed_batches;
+ mu.unlock();
+ if (batch_num == 2) {
+ // Give third batch a chance to process if it's going to.
+ Env::Default()->SleepForMicroseconds(1000);
+ finish_processing.Notify();
+ }
+ if (batch_num == 3) {
+ ASSERT_TRUE(finish_processing.HasBeenNotified());
+ }
+ finish_processing.WaitForNotification();
+ };
+
+ TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
+
+ // Enqueue 3 batches.
+ for (int i = 0; i < 3; i++) {
+ TF_ASSERT_OK(ScheduleTask(100, queue.get()));
+ }
+}
+
+TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesLimitTuning) {
+ test_util::FakeClockEnv env(Env::Default());
+ Notification start_teardown, stop_teardown;
+ std::unique_ptr<Thread> teardown_thread =
+ CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
+ {
+ AdaptiveSharedBatchScheduler<FakeTask>::Options options;
+ options.env = &env;
+ options.use_in_flight_batches_implementation = true;
+ options.initial_in_flight_batches_limit = 2;
+ options.batches_to_average_over = 1;
+ std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+ TF_ASSERT_OK(
+ AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+ std::unique_ptr<BatchScheduler<FakeTask>> queue;
+ auto queue_callback = [&env](std::unique_ptr<Batch<FakeTask>> batch) {
+ ASSERT_TRUE(batch->IsClosed());
+ switch (batch->size()) {
+ case 0:
+ env.AdvanceByMicroseconds(10);
+ break;
+ case 1:
+ env.AdvanceByMicroseconds(15);
+ break;
+ case 2:
+ env.AdvanceByMicroseconds(10);
+ break;
+ case 3:
+ env.AdvanceByMicroseconds(11);
+ break;
+ }
+ };
+
+ TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
+ TF_ASSERT_OK(ScheduleTask(0, queue.get()));
+ double in_flight_batches_limit = 2;
+ while (scheduler->in_flight_batches_limit() == in_flight_batches_limit) {
+ }
+ // Initial direction will be negative.
+ EXPECT_LT(scheduler->in_flight_batches_limit(), in_flight_batches_limit);
+ in_flight_batches_limit = scheduler->in_flight_batches_limit();
+ TF_ASSERT_OK(ScheduleTask(1, queue.get()));
+ while (scheduler->in_flight_batches_limit() == in_flight_batches_limit) {
+ }
+ // Latency increased -> change direction.
+ EXPECT_GT(scheduler->in_flight_batches_limit(), in_flight_batches_limit);
+ in_flight_batches_limit = scheduler->in_flight_batches_limit();
+ TF_ASSERT_OK(ScheduleTask(2, queue.get()));
+ while (scheduler->in_flight_batches_limit() == in_flight_batches_limit) {
+ }
+ // Latency decreased -> keep going in same direction.
+ EXPECT_GT(scheduler->in_flight_batches_limit(), in_flight_batches_limit);
+ in_flight_batches_limit = scheduler->in_flight_batches_limit();
+ TF_ASSERT_OK(ScheduleTask(3, queue.get()));
+ while (scheduler->in_flight_batches_limit() == in_flight_batches_limit) {
+ }
+ // Latency increased -> change direction.
+ EXPECT_LT(scheduler->in_flight_batches_limit(), in_flight_batches_limit);
+ start_teardown.Notify();
+ }
+ stop_teardown.Notify();
+}
} // namespace anonymous
} // namespace serving
} // namespace tensorflow