aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar A. Unique TensorFlower <gardener@tensorflow.org>2018-02-26 18:05:59 -0800
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2018-02-26 18:12:45 -0800
commit60a4b676df017b4ac51ca84a5e5e3a998912cebc (patch)
treeb523b2bd6f41ef489b19069fda9d2286cd2fbe70
parent19f18e377d8ee2f624406527b21444128da344df (diff)
Remove old implementation of the adaptive shared batcher, the in flight batches implemntation delivers similar performance but is simpler and requires less tuning.
PiperOrigin-RevId: 187111685
-rw-r--r--tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler.h172
-rw-r--r--tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler_test.cc488
2 files changed, 140 insertions, 520 deletions
diff --git a/tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler.h b/tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler.h
index 25c5f9cf42..661ed239d3 100644
--- a/tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler.h
+++ b/tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler.h
@@ -50,43 +50,26 @@ class ASBSQueue;
// track of a number of queues (one per model or model version) which are
// continuously enqueuing requests. The scheduler groups the requests into
// batches which it periodically sends off for processing (see
-// shared_batch_scheduler.h for more details). The AdaptiveSharedBatchScheduler
-// prioritizes batches by age (i.e. the batch's oldest request) irrespective of
-// queue or batch size.
+// shared_batch_scheduler.h for more details). AdaptiveSharedBatchScheduler
+// (ASBS) prioritizes batches by age (i.e. the batch's oldest request)
+// irrespective of queue or batch size.
//
-// The scheduling decision currently exists in two flavors, controlled by the
-// option use_in_flight_batches_implementation. It is expected that setting this
-// option to true will give universally better results; after a period of
-// testing to confirm, the old implementation will be removed.
-//
-// If use_in_flight_batches_implementation is set to true, the scheduler
-// limits the number of batches which can be processed concurrently. If a new
-// batch is created, and the number of in flight batches is below the limit,
-// the next (i.e. oldest) batch is immediately scheduled. Similarly, when a
-// batch finishes processing, the limit is rechecked, and another batch may be
-// scheduled. To avoid the need to carefully tune the limit for workload,
-// model type, platform, etc, it is dynamically adjusted in order to provide the
-// lowest latency.
-//
-// If use_in_flight_batches_implementation is set to false, the scheduler will
-// process the oldest batch at an adjustable rate, regardless of batch size.
-// The user can provide feedback to help set this rate to achieve some goal
-// (i.e. minimize overall latency, limit cpu usage, etc). The rate (or rather,
-// the corresponding period) is adjusted each time a batch is processed, using
-// an exponentially weighted moving average to smooth noisy feedback:
-// ewma_feedback = ((N - 1) * ewma_feedback + feedback()) / N
-// period *= (1 + K * emwa_feedback)
+// ASBS tries to keep the system busy by maintaining an adjustable number of
+// concurrently processed batches. If a new batch is created, and the number of
+// in flight batches is below the target, the next (i.e. oldest) batch is
+// immediately scheduled. Similarly, when a batch finishes processing, the
+// target is rechecked, and another batch may be scheduled. To avoid the need
+// to carefully tune the target for workload, model type, platform, etc, it is
+// dynamically adjusted in order to provide the lowest average latency.
//
// Some potential use cases:
// Hardware Accelerators (GPUs & TPUs) - If some phase of batch processing
// involves serial processing by a device, from a latency perspective it is
// desirable to keep the device evenly loaded, avoiding the need to wait for
// the device to process prior batches.
-// feedback = num_pending_on_device() - desired_pending.
// CPU utilization - If the batch processing is cpu dominated, you can reap
// latency gains when underutilized by increasing the processing rate, but
// back the rate off when the load increases to avoid overload.
-// feedback = cpu_rate() - desired_cpu_rate.
template <typename TaskType>
class AdaptiveSharedBatchScheduler
@@ -101,13 +84,17 @@ class AdaptiveSharedBatchScheduler
struct Options {
// The name to use for the pool of batch threads.
string thread_pool_name = {"batch_threads"};
- // Number of batch processing threads; equivalently the maximum number of
- // concurrently running batches.
+ // Number of batch processing threads - the maximum value of
+ // in_flight_batches_limit_. It is recommended that this value be set by
+ // running the system under load, observing the learned value for
+ // in_flight_batches_limit_, and setting this maximum to ~ 2x the value.
+ // Under low load, in_flight_batches_limit_ has no substantial effect on
+ // latency and therefore undergoes a random walk. Unreasonably large values
+ // for num_batch_threads allows for large in_flight_batches_limit_, which
+ // will harm latency for some time once load increases again.
int64 num_batch_threads = port::NumSchedulableCPUs();
// The environment to use (typically only overridden by test code).
Env* env = Env::Default();
- // Which implementation to use (described in class comments above).
- bool use_in_flight_batches_implementation = false;
// Initial limit for number of batches being concurrently processed.
// Non-integer values correspond to probabilistic limits - i.e. a value of
// 3.2 results in an actual cap of 3 80% of the time, and 4 20% of the time.
@@ -116,28 +103,6 @@ class AdaptiveSharedBatchScheduler
// numbers will give less noisy latency measurements, but will be less
// responsive to changes in workload.
int64 batches_to_average_over = 1000;
-
- // TODO(kte): remove the rate based implementation and corresponding options
- // below once testing confirms the superiority of the in flight batches
- // implementation.
- // Initial batch scheduling period in microseconds. Will be altered for
- // non-zero rate_feedback.
- double initial_scheduling_period_micros = 500;
- // Minimum batch scheduling period in microseconds. Recommend setting this
- // value greater than 0, otherwise it may take a while to recover from a
- // sustained time of negative scheduling_period_feedback (which may occur
- // under low load).
- double min_scheduling_period_micros = 100;
- // Maximum batch scheduling period in microseconds.
- double max_scheduling_period_micros = 10000;
- // Feedback function used to modify the scheduling period each time a batch
- // is scheduled. Should return values roughly O(1), with positive values
- // resulting in an increased period.
- std::function<double()> scheduling_period_feedback{[] { return 0.; }};
- // To handle potentially noisy scheduling_period_feedback, the period is
- // adjusted using an exponentially weighted moving average over the previous
- // feedback_smoothing_batches batches. Must be greater than 0.
- int64 feedback_smoothing_batches = 10;
};
// Ownership is shared between the caller of Create() and any queues created
@@ -171,17 +136,11 @@ class AdaptiveSharedBatchScheduler
explicit AdaptiveSharedBatchScheduler(const Options& options);
- // Batch scheduling function which runs every scheduling_period_ microseconds.
- // Only used when options_.use_in_flight_batches_implementation == false.
- void ProcessOneBatch();
-
// Tracks processing latency and adjusts in_flight_batches_limit to minimize.
- // Only used when options_.use_in_flight_batches_implementation == true.
void CallbackWrapper(const internal::ASBSBatch<TaskType>* batch,
BatchProcessor callback);
// Schedules batch if in_flight_batches_limit_ is not met.
- // Only used when options_.use_in_flight_batches_implementation == true.
void MaybeScheduleNextBatch() EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Notifies scheduler of non-empty batch which is eligible for processing.
@@ -212,41 +171,22 @@ class AdaptiveSharedBatchScheduler
mutex mu_;
- // Responsible for running ProcessOneBatch. PeriodicFunction was used in order
- // to check for deletion so that the thread can be shut down.
- // Only used when options_.use_in_flight_batches_implementation == false.
- std::unique_ptr<PeriodicFunction> scheduling_thread_;
-
// Responsible for running the batch processing callbacks.
std::unique_ptr<thread::ThreadPool> batch_thread_pool_;
- // Time interval in microseconds between successive ProcessOneBatch calls.
- // Only used when options_.use_in_flight_batches_implementation == false.
- double scheduling_period_;
-
- // Exponentially weighted moving average of
- // options_.scheduling_period_feedback() evaluated in each ProcessOneBatch
- // call.
- // Only used when options_.use_in_flight_batches_implementation == false.
- double ewma_feedback_ = 0;
-
// Limit on number of batches which can be concurrently processed.
// Non-integer values correspond to probabilistic limits - i.e. a value of 3.2
// results in an actual cap of 3 80% of the time, and 4 20% of the time.
- // Only used when options_.use_in_flight_batches_implementation == true.
double in_flight_batches_limit_ GUARDED_BY(mu_);
// Number of batches currently being processed.
- // Only used when options_.use_in_flight_batches_implementation == true.
int64 in_flight_batches_ GUARDED_BY(mu_) = 0;
// RNG engine and distribution.
- // Only used when options_.use_in_flight_batches_implementation == true.
std::default_random_engine rand_engine_;
std::uniform_real_distribution<double> rand_double_;
// Fields controlling the dynamic adjustment of in_flight_batches_limit_.
- // Only used when options_.use_in_flight_batches_implementation == true.
// Number of batches since the last in_flight_batches_limit_ adjustment.
int64 batch_count_ GUARDED_BY(mu_) = 0;
// Sum of processing latency for batches counted by batch_count_.
@@ -348,32 +288,6 @@ Status AdaptiveSharedBatchScheduler<TaskType>::Create(
return errors::InvalidArgument("num_batch_threads must be positive; was ",
options.num_batch_threads);
}
- if (options.min_scheduling_period_micros < 0) {
- return errors::InvalidArgument(
- "min_scheduling_period_micros must be >= 0; was ",
- options.min_scheduling_period_micros);
- }
- if (options.min_scheduling_period_micros >
- options.initial_scheduling_period_micros) {
- return errors::InvalidArgument(
- "initial_scheduling_period_micros (",
- options.initial_scheduling_period_micros,
- ") must be >= min_scheduling_period_micros (",
- options.min_scheduling_period_micros, ")");
- }
- if (options.initial_scheduling_period_micros >
- options.max_scheduling_period_micros) {
- return errors::InvalidArgument(
- "initial_scheduling_period_micros (",
- options.initial_scheduling_period_micros,
- ") must be <= max_scheduling_period_micros (",
- options.max_scheduling_period_micros, ")");
- }
- if (options.feedback_smoothing_batches < 1) {
- return errors::InvalidArgument(
- "feedback_smoothing_batches must be positive; was ",
- options.feedback_smoothing_batches);
- }
if (options.initial_in_flight_batches_limit > options.num_batch_threads) {
return errors::InvalidArgument(
"initial_in_flight_batches_limit (",
@@ -401,20 +315,12 @@ template <typename TaskType>
AdaptiveSharedBatchScheduler<TaskType>::AdaptiveSharedBatchScheduler(
const Options& options)
: options_(options),
- scheduling_period_(options.initial_scheduling_period_micros),
in_flight_batches_limit_(options.initial_in_flight_batches_limit),
rand_double_(0.0, 1.0) {
std::random_device device;
rand_engine_.seed(device());
- PeriodicFunction::Options opts;
- opts.thread_name_prefix = "scheduling_thread";
- opts.env = GetEnv();
batch_thread_pool_.reset(new thread::ThreadPool(
GetEnv(), options.thread_pool_name, options.num_batch_threads));
- if (!options.use_in_flight_batches_implementation) {
- scheduling_thread_.reset(
- new PeriodicFunction([this] { ProcessOneBatch(); }, 0, opts));
- }
}
template <typename TaskType>
@@ -443,9 +349,7 @@ void AdaptiveSharedBatchScheduler<TaskType>::AddBatch(
const internal::ASBSBatch<TaskType>* batch) {
mutex_lock l(mu_);
batches_.push(batch);
- if (options_.use_in_flight_batches_implementation) {
- MaybeScheduleNextBatch();
- }
+ MaybeScheduleNextBatch();
}
template <typename TaskType>
@@ -524,44 +428,6 @@ void AdaptiveSharedBatchScheduler<TaskType>::CallbackWrapper(
}
template <typename TaskType>
-void AdaptiveSharedBatchScheduler<TaskType>::ProcessOneBatch() {
- static const double kFeedbackMultiplier = .001;
- const internal::ASBSBatch<TaskType>* batch = nullptr;
- BatchProcessor callback;
- const int64 start_time_micros = GetEnv()->NowMicros();
- {
- mutex_lock l(mu_);
- if (!batches_.empty()) {
- batch = batches_.top();
- batches_.pop();
- callback = queues_and_callbacks_[batch->queue()];
- }
- }
- if (batch != nullptr) {
- double feedback = options_.scheduling_period_feedback();
- const int64 N = options_.feedback_smoothing_batches;
- ewma_feedback_ = ((N - 1) * ewma_feedback_ + feedback) / N;
- scheduling_period_ *= (1 + kFeedbackMultiplier * ewma_feedback_);
- if (scheduling_period_ < options_.min_scheduling_period_micros) {
- scheduling_period_ = options_.min_scheduling_period_micros;
- } else if (scheduling_period_ > options_.max_scheduling_period_micros) {
- scheduling_period_ = options_.max_scheduling_period_micros;
- }
- // Queue may destroy itself after ReleaseBatch is called.
- batch->queue()->ReleaseBatch(batch);
- batch_thread_pool_->Schedule([callback, batch] {
- callback(std::unique_ptr<Batch<TaskType>>(
- const_cast<internal::ASBSBatch<TaskType>*>(batch)));
- });
- }
- const int64 sleep_time =
- scheduling_period_ - (GetEnv()->NowMicros() - start_time_micros);
- if (sleep_time > 0) {
- GetEnv()->SleepForMicroseconds(sleep_time);
- }
-}
-
-template <typename TaskType>
bool AdaptiveSharedBatchScheduler<TaskType>::BatchCompare::operator()(
const internal::ASBSBatch<TaskType>* a,
const internal::ASBSBatch<TaskType>* b) {
diff --git a/tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler_test.cc b/tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler_test.cc
index 8ae8ca02ec..109234287e 100644
--- a/tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler_test.cc
+++ b/tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler_test.cc
@@ -64,59 +64,6 @@ std::unique_ptr<Thread> CreateFakeClockAdvancerThread(
}));
}
-TEST(AdaptiveSharedBatchSchedulerTest, Basic) {
- for (const bool delete_scheduler_early : {false, true}) {
- for (const bool delete_queue_1_early : {false, true}) {
- int queue_0_tasks = 0;
- auto queue_0_callback =
- [&queue_0_tasks](std::unique_ptr<Batch<FakeTask>> batch) {
- ASSERT_TRUE(batch->IsClosed());
- EXPECT_GT(batch->num_tasks(), 0);
- for (int i = 0; i < batch->num_tasks(); i++) {
- queue_0_tasks += batch->task(i).size();
- }
- };
- int queue_1_tasks = 0;
- auto queue_1_callback =
- [&queue_1_tasks](std::unique_ptr<Batch<FakeTask>> batch) {
- ASSERT_TRUE(batch->IsClosed());
- EXPECT_GT(batch->num_tasks(), 0);
- for (int i = 0; i < batch->num_tasks(); i++) {
- queue_1_tasks += batch->task(i).size();
- }
- };
- {
- std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
- TF_ASSERT_OK(
- AdaptiveSharedBatchScheduler<FakeTask>::Create({}, &scheduler));
-
- // Create two queues.
- std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
- TF_ASSERT_OK(scheduler->AddQueue({}, queue_0_callback, &queue_0));
- std::unique_ptr<BatchScheduler<FakeTask>> queue_1;
- TF_ASSERT_OK(scheduler->AddQueue({}, queue_1_callback, &queue_1));
-
- if (delete_scheduler_early) {
- // Delete our copy of the scheduler. The queues should keep it alive
- // under the covers.
- scheduler = nullptr;
- }
- // Submit tasks to the two queues, and (optionally) remove the queues.
- TF_ASSERT_OK(ScheduleTask(1, queue_0.get()));
- TF_ASSERT_OK(ScheduleTask(2, queue_1.get()));
- TF_ASSERT_OK(ScheduleTask(3, queue_0.get()));
- TF_ASSERT_OK(ScheduleTask(4, queue_1.get()));
- if (delete_queue_1_early) {
- queue_1 = nullptr;
- }
- TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
- }
- EXPECT_EQ(queue_0_tasks, 9);
- EXPECT_EQ(queue_1_tasks, 6);
- }
- }
-}
-
TEST(AdaptiveSharedBatchSchedulerTest, BadOptions) {
using Scheduler = AdaptiveSharedBatchScheduler<FakeTask>;
std::shared_ptr<Scheduler> scheduler;
@@ -124,24 +71,6 @@ TEST(AdaptiveSharedBatchSchedulerTest, BadOptions) {
options.num_batch_threads = 0;
EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
options = Scheduler::Options();
- options.min_scheduling_period_micros = 50;
- options.max_scheduling_period_micros = 100;
- options.initial_scheduling_period_micros = 1;
- EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
- options = Scheduler::Options();
- options.min_scheduling_period_micros = 50;
- options.max_scheduling_period_micros = 100;
- options.initial_scheduling_period_micros = 1000;
- EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
- options = Scheduler::Options();
- options.min_scheduling_period_micros = 100;
- options.max_scheduling_period_micros = 50;
- options.initial_scheduling_period_micros = 75;
- EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
- options = Scheduler::Options();
- options.feedback_smoothing_batches = 0;
- EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
- options = Scheduler::Options();
options.initial_in_flight_batches_limit = 0.5;
EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
options = Scheduler::Options();
@@ -153,301 +82,8 @@ TEST(AdaptiveSharedBatchSchedulerTest, BadOptions) {
EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
}
-TEST(AdaptiveSharedBatchSchedulerTest, ObeysQueueOptions) {
- test_util::FakeClockEnv env(Env::Default());
- Notification start_teardown, stop_teardown;
- std::unique_ptr<Thread> teardown_thread =
- CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
- {
- AdaptiveSharedBatchScheduler<FakeTask>::Options options;
- options.initial_scheduling_period_micros = 1000;
- options.env = &env;
- std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
- TF_ASSERT_OK(
- AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
- std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
- std::unique_ptr<BatchScheduler<FakeTask>> queue_1;
- int queue_0_tasks = 0;
- int queue_1_tasks = 0;
- auto queue_0_callback = [&queue_0_tasks,
- &env](std::unique_ptr<Batch<FakeTask>> batch) {
- ASSERT_TRUE(batch->IsClosed());
- EXPECT_GT(batch->num_tasks(), 0);
- for (int i = 0; i < batch->num_tasks(); i++) {
- queue_0_tasks += batch->task(i).size();
- }
- env.SleepForMicroseconds(1);
- };
- auto queue_1_callback = [&queue_1_tasks,
- &env](std::unique_ptr<Batch<FakeTask>> batch) {
- ASSERT_TRUE(batch->IsClosed());
- EXPECT_GT(batch->num_tasks(), 0);
- for (int i = 0; i < batch->num_tasks(); i++) {
- queue_1_tasks += batch->task(i).size();
- }
- env.SleepForMicroseconds(1);
- };
- AdaptiveSharedBatchScheduler<FakeTask>::QueueOptions queue_options;
- queue_options.max_batch_size = 10;
- queue_options.max_enqueued_batches = 0;
- // Queue must have max_enqueued_batchs > 1.
- EXPECT_FALSE(
- scheduler->AddQueue(queue_options, queue_0_callback, &queue_0).ok());
- queue_options.max_enqueued_batches = 2;
- TF_ASSERT_OK(
- scheduler->AddQueue(queue_options, queue_0_callback, &queue_0));
- EXPECT_EQ(10, queue_0->max_task_size());
- queue_options.max_batch_size = 0;
- // Queue must have max_batch_size > 0.
- EXPECT_FALSE(
- scheduler->AddQueue(queue_options, queue_1_callback, &queue_1).ok());
- queue_options.max_batch_size = 2;
- queue_options.max_enqueued_batches = 1;
- TF_ASSERT_OK(
- scheduler->AddQueue(queue_options, queue_1_callback, &queue_1));
-
- // Wait for scheduling_thread to sleep.
- env.BlockUntilThreadsAsleep(1);
- // Task larger than max_batch_size shouldn't schedule.
- EXPECT_FALSE(ScheduleTask(15, queue_0.get()).ok());
- TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
- TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
- env.AdvanceByMicroseconds(1);
-
- // Task larger than max_batch_size shouldn't schedule.
- EXPECT_FALSE(ScheduleTask(3, queue_1.get()).ok());
- TF_ASSERT_OK(ScheduleTask(1, queue_1.get()));
- TF_ASSERT_OK(ScheduleTask(1, queue_1.get()));
- env.AdvanceByMicroseconds(1);
- // Exceeds max_enqueued_batches, shouldn't schedule.
- EXPECT_FALSE(ScheduleTask(1, queue_1.get()).ok());
-
- TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
- // Exceeds max_enqueued_batches, shouldn't schedule.
- EXPECT_FALSE(ScheduleTask(6, queue_0.get()).ok());
- TF_ASSERT_OK(ScheduleTask(4, queue_0.get()));
-
- // Batches should be processed in order from oldest to newest.
- env.AdvanceByMicroseconds(1000);
- env.BlockUntilThreadsAsleep(2);
- EXPECT_EQ(queue_0_tasks, 10);
- EXPECT_EQ(queue_1_tasks, 0);
-
- env.AdvanceByMicroseconds(1000);
- env.BlockUntilThreadsAsleep(2);
- EXPECT_EQ(queue_0_tasks, 10);
- EXPECT_EQ(queue_1_tasks, 2);
-
- env.AdvanceByMicroseconds(1000);
- env.BlockUntilThreadsAsleep(2);
- EXPECT_EQ(queue_0_tasks, 19);
- EXPECT_EQ(queue_1_tasks, 2);
- start_teardown.Notify();
- }
- stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, RateFeedback) {
- test_util::FakeClockEnv env(Env::Default());
- Notification start_teardown, stop_teardown;
- std::unique_ptr<Thread> teardown_thread =
- CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
- {
- double feedback = 0;
- AdaptiveSharedBatchScheduler<FakeTask>::Options options;
- options.initial_scheduling_period_micros = 1000;
- options.min_scheduling_period_micros = 200;
- options.max_scheduling_period_micros = 2000;
- options.env = &env;
- options.scheduling_period_feedback = [&feedback] { return feedback; };
- options.feedback_smoothing_batches = 1;
- std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
- TF_ASSERT_OK(
- AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
- std::unique_ptr<BatchScheduler<FakeTask>> queue;
- int scheduled_items = 0;
- auto queue_callback = [&scheduled_items,
- &env](std::unique_ptr<Batch<FakeTask>> batch) {
- ASSERT_TRUE(batch->IsClosed());
- EXPECT_GT(batch->num_tasks(), 0);
- scheduled_items = 0;
- for (int i = 0; i < batch->num_tasks(); i++) {
- scheduled_items += batch->task(i).size();
- }
- env.SleepForMicroseconds(1);
- };
-
- TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
-
- // Wait for scheduling_thread to sleep.
- env.BlockUntilThreadsAsleep(1);
- // Enqueue 6 batches.
- for (int i = 0; i < 6; i++) {
- TF_ASSERT_OK(ScheduleTask(900 + i, queue.get()));
- env.AdvanceByMicroseconds(1);
- }
- feedback = -500;
- env.AdvanceByMicroseconds(994);
- env.BlockUntilThreadsAsleep(2); // scheduling period = 500 usec.
- EXPECT_EQ(scheduled_items, 900);
- env.AdvanceByMicroseconds(500);
- env.BlockUntilThreadsAsleep(2); // scheduling period = 250 usec.
- EXPECT_EQ(scheduled_items, 901);
- feedback = 0;
- env.AdvanceByMicroseconds(250);
- env.BlockUntilThreadsAsleep(2); // scheduling period = 250 usec.
- EXPECT_EQ(scheduled_items, 902);
- feedback = 10000; // large feedback should hit max_scheduling_period.
- env.AdvanceByMicroseconds(250);
- env.BlockUntilThreadsAsleep(2); // scheduling period = 2000 usec.
- EXPECT_EQ(scheduled_items, 903);
- feedback = -10000; // large feedback should hit min_scheduling_period.
- env.AdvanceByMicroseconds(1999);
- // No callback scheduled, only scheduling thread sleeping.
- env.BlockUntilThreadsAsleep(1);
- EXPECT_EQ(scheduled_items, 903);
- env.AdvanceByMicroseconds(1);
- env.BlockUntilThreadsAsleep(2); // scheduling period = 200 usec.
- EXPECT_EQ(scheduled_items, 904);
- env.AdvanceByMicroseconds(200);
- env.BlockUntilThreadsAsleep(2);
- EXPECT_EQ(scheduled_items, 905);
- start_teardown.Notify();
- }
- stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, FeedbackSmoothing) {
- test_util::FakeClockEnv env(Env::Default());
- Notification start_teardown, stop_teardown;
- std::unique_ptr<Thread> teardown_thread =
- CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
- {
- double feedback = 0;
- AdaptiveSharedBatchScheduler<FakeTask>::Options options;
- options.initial_scheduling_period_micros = 1000;
- options.env = &env;
- options.scheduling_period_feedback = [&feedback] { return feedback; };
- options.feedback_smoothing_batches = 3;
- std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
- TF_ASSERT_OK(
- AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
- std::unique_ptr<BatchScheduler<FakeTask>> queue;
- int scheduled_items = 0;
- auto queue_callback = [&scheduled_items,
- &env](std::unique_ptr<Batch<FakeTask>> batch) {
- ASSERT_TRUE(batch->IsClosed());
- EXPECT_GT(batch->num_tasks(), 0);
- scheduled_items = 0;
- for (int i = 0; i < batch->num_tasks(); i++) {
- scheduled_items += batch->task(i).size();
- }
- env.SleepForMicroseconds(1);
- };
-
- TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
-
- // Wait for scheduling_thread to sleep.
- env.BlockUntilThreadsAsleep(1);
- // Enqueue 4 batches.
- for (int i = 0; i < 4; i++) {
- TF_ASSERT_OK(ScheduleTask(900 + i, queue.get()));
- env.AdvanceByMicroseconds(1);
- }
- feedback = -300;
- env.AdvanceByMicroseconds(996);
- env.BlockUntilThreadsAsleep(2);
- // ewma_feedback = 100, scheduling_period = 900.
- EXPECT_EQ(scheduled_items, 900);
- env.AdvanceByMicroseconds(899);
- // No callback scheduled, only scheduling thread sleeping.
- env.BlockUntilThreadsAsleep(1);
- EXPECT_EQ(scheduled_items, 900);
- env.AdvanceByMicroseconds(1);
- env.BlockUntilThreadsAsleep(2);
- // ewma_feedback = 167, scheduling_period = 750.
- EXPECT_EQ(scheduled_items, 901);
- env.AdvanceByMicroseconds(749);
- // No callback scheduled, only scheduling thread sleeping.
- env.BlockUntilThreadsAsleep(1);
- EXPECT_EQ(scheduled_items, 901);
- feedback = 1000 / 3.;
- env.AdvanceByMicroseconds(1);
- env.BlockUntilThreadsAsleep(2);
- // emwa_feedback = 0, scheduling_period = 750.
- EXPECT_EQ(scheduled_items, 902);
- env.AdvanceByMicroseconds(749);
- // No callback scheduled, only scheduling thread sleeping.
- env.BlockUntilThreadsAsleep(1);
- EXPECT_EQ(scheduled_items, 902);
- env.AdvanceByMicroseconds(1);
- env.BlockUntilThreadsAsleep(2);
- EXPECT_EQ(scheduled_items, 903);
- start_teardown.Notify();
- }
- stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, QueueCapacityInfo) {
- test_util::FakeClockEnv env(Env::Default());
- Notification start_teardown, stop_teardown;
- std::unique_ptr<Thread> teardown_thread =
- CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
- {
- AdaptiveSharedBatchScheduler<FakeTask>::Options options;
- options.initial_scheduling_period_micros = 1000;
- options.env = &env;
- std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
- TF_ASSERT_OK(
- AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
- std::unique_ptr<BatchScheduler<FakeTask>> queue;
- int scheduled_items = 0;
- auto queue_callback = [&scheduled_items,
- &env](std::unique_ptr<Batch<FakeTask>> batch) {
- ASSERT_TRUE(batch->IsClosed());
- EXPECT_GT(batch->num_tasks(), 0);
- scheduled_items = 0;
- for (int i = 0; i < batch->num_tasks(); i++) {
- scheduled_items += batch->task(i).size();
- }
- env.SleepForMicroseconds(1);
- };
- AdaptiveSharedBatchScheduler<FakeTask>::QueueOptions queue_options;
- queue_options.max_batch_size = 10;
- queue_options.max_enqueued_batches = 10;
- TF_ASSERT_OK(scheduler->AddQueue(queue_options, queue_callback, &queue));
-
- // Wait for scheduling_thread to sleep.
- env.BlockUntilThreadsAsleep(1);
- // Enqueue 3 tasks.
- EXPECT_EQ(queue->NumEnqueuedTasks(), 0);
- EXPECT_EQ(queue->SchedulingCapacity(), 100);
- TF_ASSERT_OK(ScheduleTask(5, queue.get()));
- EXPECT_EQ(queue->NumEnqueuedTasks(), 1);
- EXPECT_EQ(queue->SchedulingCapacity(), 95);
- env.AdvanceByMicroseconds(1);
- TF_ASSERT_OK(ScheduleTask(6, queue.get()));
- EXPECT_EQ(queue->NumEnqueuedTasks(), 2);
- EXPECT_EQ(queue->SchedulingCapacity(), 84);
- env.AdvanceByMicroseconds(1);
- TF_ASSERT_OK(ScheduleTask(1, queue.get()));
- EXPECT_EQ(queue->NumEnqueuedTasks(), 3);
- EXPECT_EQ(queue->SchedulingCapacity(), 83);
-
- env.AdvanceByMicroseconds(998);
- env.BlockUntilThreadsAsleep(2);
- EXPECT_EQ(scheduled_items, 5);
- env.AdvanceByMicroseconds(1000);
- env.BlockUntilThreadsAsleep(2);
- EXPECT_EQ(scheduled_items, 7);
- start_teardown.Notify();
- }
- stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesImplementation) {
+TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesLimit) {
AdaptiveSharedBatchScheduler<FakeTask>::Options options;
- options.use_in_flight_batches_implementation = true;
options.initial_in_flight_batches_limit = 2;
options.batches_to_average_over = 1000;
mutex mu;
@@ -476,7 +112,7 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesImplementation) {
std::unique_ptr<BatchScheduler<FakeTask>> queue;
TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
- // Enqueue 3 batches.
+ // Enqueue 3 tasks, should result in 3 batches.
for (int i = 0; i < 3; i++) {
TF_ASSERT_OK(ScheduleTask(100, queue.get()));
}
@@ -490,7 +126,6 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesLimitTuning) {
{
AdaptiveSharedBatchScheduler<FakeTask>::Options options;
options.env = &env;
- options.use_in_flight_batches_implementation = true;
options.initial_in_flight_batches_limit = 2;
options.batches_to_average_over = 1;
auto queue_callback = [&env](std::unique_ptr<Batch<FakeTask>> batch) {
@@ -544,6 +179,125 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesLimitTuning) {
}
stop_teardown.Notify();
}
+
+TEST(AdaptiveSharedBatchSchedulerTest, DeleteQueue) {
+ AdaptiveSharedBatchScheduler<FakeTask>::Options options;
+ options.initial_in_flight_batches_limit = 1;
+ options.batches_to_average_over = 1000;
+ mutex mu;
+ int processed_batches = 0;
+ Notification finish_processing;
+ auto queue_callback = [&mu, &processed_batches, &finish_processing](
+ std::unique_ptr<Batch<FakeTask>> batch) {
+ ASSERT_TRUE(batch->IsClosed());
+ EXPECT_GT(batch->num_tasks(), 0);
+ finish_processing.WaitForNotification();
+ mu.lock();
+ processed_batches++;
+ mu.unlock();
+ };
+
+ std::unique_ptr<Thread> queue_deleter;
+ std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+ TF_ASSERT_OK(
+ AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+ std::unique_ptr<BatchScheduler<FakeTask>> queue;
+ TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
+
+ // Enqueue 2 tasks, should result in 2 batches.
+ for (int i = 0; i < 2; i++) {
+ TF_ASSERT_OK(ScheduleTask(100, queue.get()));
+ }
+ // Delete queue, should be kept alive until empty.
+ queue_deleter.reset(Env::Default()->StartThread(
+ {}, "QueueDeleterThread", [&queue, &mu, &processed_batches] {
+ queue.reset();
+ mutex_lock l(mu);
+ EXPECT_EQ(processed_batches, 2);
+ }));
+ // Give queue_deleter thread time to delete queue.
+ Env::Default()->SleepForMicroseconds(1000);
+ finish_processing.Notify();
+}
+
+TEST(AdaptiveSharedBatchSchedulerTest, DeleteScheduler) {
+ AdaptiveSharedBatchScheduler<FakeTask>::Options options;
+ options.initial_in_flight_batches_limit = 1;
+ options.batches_to_average_over = 1000;
+ mutex mu;
+ int processed_batches = 0;
+ Notification finish_processing;
+ auto queue_callback = [&mu, &processed_batches, &finish_processing](
+ std::unique_ptr<Batch<FakeTask>> batch) {
+ ASSERT_TRUE(batch->IsClosed());
+ EXPECT_GT(batch->num_tasks(), 0);
+ finish_processing.WaitForNotification();
+ mu.lock();
+ processed_batches++;
+ mu.unlock();
+ };
+
+ std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+ TF_ASSERT_OK(
+ AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+ std::unique_ptr<BatchScheduler<FakeTask>> queue;
+ TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
+
+ // Enqueue 2 tasks, should result in 2 batches.
+ for (int i = 0; i < 2; i++) {
+ TF_ASSERT_OK(ScheduleTask(100, queue.get()));
+ }
+ // Delete scheduler, should be kept alive until queues are empty.
+ scheduler.reset();
+ finish_processing.Notify();
+ while (true) {
+ mutex_lock l(mu);
+ if (processed_batches == 2) break;
+ }
+}
+
+TEST(AdaptiveSharedBatchSchedulerTest, QueueCapacityInfo) {
+ AdaptiveSharedBatchScheduler<FakeTask>::Options options;
+ options.initial_in_flight_batches_limit = 1;
+ options.batches_to_average_over = 1000;
+ mutex mu;
+ int processed_batches = 0;
+ Notification finish_processing;
+ auto queue_callback = [&mu, &processed_batches, &finish_processing](
+ std::unique_ptr<Batch<FakeTask>> batch) {
+ ASSERT_TRUE(batch->IsClosed());
+ EXPECT_GT(batch->num_tasks(), 0);
+ mu.lock();
+ int batch_num = ++processed_batches;
+ mu.unlock();
+ if (batch_num == 1) {
+ finish_processing.WaitForNotification();
+ }
+ };
+ std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+ TF_ASSERT_OK(
+ AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+ std::unique_ptr<BatchScheduler<FakeTask>> queue;
+ TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
+
+ // Enqueue 2 tasks, should result in 2 batches.
+ for (int i = 0; i < 2; i++) {
+ TF_ASSERT_OK(ScheduleTask(100, queue.get()));
+ }
+ // First batch was immediately processed, no longer counts as enqueued.
+ EXPECT_EQ(queue->NumEnqueuedTasks(), 1);
+ EXPECT_EQ(queue->SchedulingCapacity(), 9 * 1000 + 900);
+ // Enqueue 2 more tasks, should fall in same batch.
+ TF_ASSERT_OK(ScheduleTask(100, queue.get()));
+ TF_ASSERT_OK(ScheduleTask(200, queue.get()));
+ EXPECT_EQ(queue->NumEnqueuedTasks(), 3);
+ EXPECT_EQ(queue->SchedulingCapacity(), 9 * 1000 + 600);
+ // Enqueue 1 more task, should create new batch.
+ TF_ASSERT_OK(ScheduleTask(700, queue.get()));
+ EXPECT_EQ(queue->NumEnqueuedTasks(), 4);
+ EXPECT_EQ(queue->SchedulingCapacity(), 8 * 1000 + 300);
+ finish_processing.Notify();
+}
} // namespace anonymous
} // namespace serving
} // namespace tensorflow