aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/batching
diff options
context:
space:
mode:
authorGravatar A. Unique TensorFlower <gardener@tensorflow.org>2017-12-20 13:31:06 -0800
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-12-20 13:43:29 -0800
commit3d01a46171b5d256617627537187ab819c196aa4 (patch)
treeafea1473bb19a15a2c4adc3988f74c6f37a0a01b /tensorflow/contrib/batching
parent47249f349d13f5a11a8dc8c4026c54b49c88cfe0 (diff)
Destroy batch_thread_pool_ first. For the new in flight batches implementation, the callbacks scheduled on this thread pool refer to other class members which must remain alive until the thread pool is empty.
Also fix a similar lifetime issue in the unit tests. PiperOrigin-RevId: 179726389
Diffstat (limited to 'tensorflow/contrib/batching')
-rw-r--r--tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h5
-rw-r--r--tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc19
2 files changed, 14 insertions, 10 deletions
diff --git a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h
index a2cb146b8d..6773accc6f 100644
--- a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h
+++ b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler.h
@@ -93,6 +93,11 @@ class AdaptiveSharedBatchScheduler
: public std::enable_shared_from_this<
AdaptiveSharedBatchScheduler<TaskType>> {
public:
+ ~AdaptiveSharedBatchScheduler() {
+ // Finish processing batches before destorying other class members.
+ batch_thread_pool_.reset();
+ }
+
struct Options {
// The name to use for the pool of batch threads.
string thread_pool_name = {"batch_threads"};
diff --git a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc
index 18f1e55452..68ee277327 100644
--- a/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc
+++ b/tensorflow/contrib/batching/adaptive_shared_batch_scheduler_test.cc
@@ -450,10 +450,6 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesImplementation) {
options.use_in_flight_batches_implementation = true;
options.initial_in_flight_batches_limit = 2;
options.batches_to_average_over = 1000;
- std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
- TF_ASSERT_OK(
- AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
- std::unique_ptr<BatchScheduler<FakeTask>> queue;
mutex mu;
int processed_batches = 0;
Notification finish_processing;
@@ -474,7 +470,10 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesImplementation) {
}
finish_processing.WaitForNotification();
};
-
+ std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+ TF_ASSERT_OK(
+ AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+ std::unique_ptr<BatchScheduler<FakeTask>> queue;
TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
// Enqueue 3 batches.
@@ -494,10 +493,6 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesLimitTuning) {
options.use_in_flight_batches_implementation = true;
options.initial_in_flight_batches_limit = 2;
options.batches_to_average_over = 1;
- std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
- TF_ASSERT_OK(
- AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
- std::unique_ptr<BatchScheduler<FakeTask>> queue;
auto queue_callback = [&env](std::unique_ptr<Batch<FakeTask>> batch) {
ASSERT_TRUE(batch->IsClosed());
switch (batch->size()) {
@@ -515,8 +510,12 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesLimitTuning) {
break;
}
};
-
+ std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+ TF_ASSERT_OK(
+ AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+ std::unique_ptr<BatchScheduler<FakeTask>> queue;
TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
+
TF_ASSERT_OK(ScheduleTask(0, queue.get()));
double in_flight_batches_limit = 2;
while (scheduler->in_flight_batches_limit() == in_flight_batches_limit) {