author    Brennan Saeta <saeta@google.com> 2018-03-28 14:04:01 -0700
committer TensorFlower Gardener <gardener@tensorflow.org> 2018-03-28 14:07:18 -0700
commit    d355f4e2644b68ea643f573c564936ec23b93787
tree      19ec2cfc40018c38bb5037353981f64f391a5393
parent    70666858800a55585ae2775f97a1731db305388a
[tf.data] Autotune prefetch buffer sizes
In order to make it easier for tf.data users to achieve high performance with their input pipelines, this change adds the ability for the prefetch op to automatically tune its buffer size. To use the auto-tuning configuration of the `prefetch` transformation, simply skip passing in a buffer size. Example:

```python
dataset = # ...
dataset = dataset.prefetch()  # Look ma, no buffer value req'd!
```

PiperOrigin-RevId: 190829736
-rw-r--r--  tensorflow/contrib/data/__init__.py                     |  3
-rw-r--r--  tensorflow/core/kernels/data/BUILD                      | 21
-rw-r--r--  tensorflow/core/kernels/data/prefetch_autotuner.cc      | 46
-rw-r--r--  tensorflow/core/kernels/data/prefetch_autotuner.h       | 71
-rw-r--r--  tensorflow/core/kernels/data/prefetch_autotuner_test.cc | 82
-rw-r--r--  tensorflow/core/kernels/data/prefetch_dataset_op.cc     | 13
-rw-r--r--  tensorflow/python/data/ops/dataset_ops.py               |  2
7 files changed, 235 insertions(+), 3 deletions(-)
diff --git a/tensorflow/contrib/data/__init__.py b/tensorflow/contrib/data/__init__.py
index 766721d8d2..7c3a9f82ff 100644
--- a/tensorflow/contrib/data/__init__.py
+++ b/tensorflow/contrib/data/__init__.py
@@ -82,3 +82,6 @@ from tensorflow.python.ops.parsing_ops import parse_single_example_v2 as parse_s
from tensorflow.python.util.all_util import remove_undocumented
remove_undocumented(__name__)
+
+# A constant that can be used to enable auto-tuning.
+AUTOTUNE = -1
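
The `AUTOTUNE` constant added above is simply the `-1` sentinel that the prefetch kernel (further down in this change) interprets as `PrefetchAutotuner::kAutoTune`. A minimal usage sketch under that assumption (contrib-era TF 1.x API; `tf.contrib.data.AUTOTUNE` is the constant added in this file, the surrounding pipeline is illustrative and not part of this commit):

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(1000)
dataset = dataset.map(lambda x: x * 2)
# Passing the -1 sentinel asks the prefetch op to size its own buffer via
# the PrefetchAutotuner introduced in this change.
dataset = dataset.prefetch(tf.contrib.data.AUTOTUNE)
```

Per the commit message, omitting the buffer size in `dataset.prefetch()` is intended to have the same effect.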
diff --git a/tensorflow/core/kernels/data/BUILD b/tensorflow/core/kernels/data/BUILD
index 01754ec21a..a8784e3656 100644
--- a/tensorflow/core/kernels/data/BUILD
+++ b/tensorflow/core/kernels/data/BUILD
@@ -10,6 +10,7 @@ licenses(["notice"]) # Apache 2.0
load(
"//tensorflow:tensorflow.bzl",
"tf_kernel_library",
+ "tf_cc_test",
)
filegroup(
@@ -295,11 +296,31 @@ tf_kernel_library(
],
)
+cc_library(
+ name = "prefetch_autotuner",
+ srcs = ["prefetch_autotuner.cc"],
+ hdrs = ["prefetch_autotuner.h"],
+ deps = [
+ "//tensorflow/core:lib",
+ ],
+)
+
+tf_cc_test(
+ name = "prefetch_autotuner_test",
+ srcs = ["prefetch_autotuner_test.cc"],
+ deps = [
+ ":prefetch_autotuner",
+ "//tensorflow/core:test",
+ "//tensorflow/core:test_main",
+ ],
+)
+
tf_kernel_library(
name = "prefetch_dataset_op",
srcs = ["prefetch_dataset_op.cc"],
deps = [
":dataset",
+ ":prefetch_autotuner",
"//tensorflow/core:core_cpu_internal",
"//tensorflow/core:dataset_ops_op_lib",
"//tensorflow/core:framework",
diff --git a/tensorflow/core/kernels/data/prefetch_autotuner.cc b/tensorflow/core/kernels/data/prefetch_autotuner.cc
new file mode 100644
index 0000000000..b3272f6bcd
--- /dev/null
+++ b/tensorflow/core/kernels/data/prefetch_autotuner.cc
@@ -0,0 +1,46 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/core/kernels/data/prefetch_autotuner.h"
+
+namespace tensorflow {
+
+PrefetchAutotuner::PrefetchAutotuner(int64 initial_buffer_size)
+ : buffer_limit_(initial_buffer_size) {
+ if (initial_buffer_size == kAutoTune) {
+ mode_ = Mode::kUpswing;
+ buffer_limit_ = 1;
+ }
+}
+
+void PrefetchAutotuner::RecordConsumption(size_t current_buffer_size) {
+ switch (mode_) {
+ case Mode::kDisabled:
+ return;
+ case Mode::kUpswing:
+ if (current_buffer_size == buffer_limit_) {
+ mode_ = Mode::kDownswing;
+ }
+ return;
+ case Mode::kDownswing:
+ if (current_buffer_size == 0) {
+ buffer_limit_ *= 2; // Increase the buffer size.
+ mode_ = Mode::kUpswing;
+ }
+ return;
+ }
+}
+
+} // namespace tensorflow
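
For readers skimming the diff, the state machine above can be summarized with an illustrative Python mirror (a sketch of the same upswing/downswing logic; the class and method names are invented for illustration and are not part of this commit):

```python
AUTOTUNE = -1

class PrefetchAutotunerSketch:
    """Mirror of PrefetchAutotuner: start at a limit of 1 when auto-tuning is
    requested, and double the limit whenever a buffer that was previously
    filled to the limit is later found empty by the consumer."""

    def __init__(self, initial_buffer_size):
        self.buffer_limit = initial_buffer_size
        self.mode = "disabled"
        if initial_buffer_size == AUTOTUNE:
            self.mode = "upswing"
            self.buffer_limit = 1

    def record_consumption(self, current_buffer_size):
        if self.mode == "upswing" and current_buffer_size == self.buffer_limit:
            # The prefetch thread managed to fill the buffer at this limit.
            self.mode = "downswing"
        elif self.mode == "downswing" and current_buffer_size == 0:
            # The consumer drained the buffer; grow the limit and try again.
            self.buffer_limit *= 2
            self.mode = "upswing"

t = PrefetchAutotunerSketch(AUTOTUNE)
for size in [0, 1, 0]:   # blocked, filled to limit 1, blocked again
    t.record_consumption(size)
print(t.buffer_limit)     # 2, matching the trace in the "Enabled" test below
```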
diff --git a/tensorflow/core/kernels/data/prefetch_autotuner.h b/tensorflow/core/kernels/data/prefetch_autotuner.h
new file mode 100644
index 0000000000..fa8a184072
--- /dev/null
+++ b/tensorflow/core/kernels/data/prefetch_autotuner.h
@@ -0,0 +1,71 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef TENSORFLOW_CORE_KERNELS_DATA_PREFETCH_AUTOTUNER_H_
+#define TENSORFLOW_CORE_KERNELS_DATA_PREFETCH_AUTOTUNER_H_
+
+#include "tensorflow/core/platform/types.h"
+
+namespace tensorflow {
+
+// PrefetchAutotuner dynamically adjusts the buffer size of a prefetch iterator.
+//
+// PrefetchAutotuner attempts to find the minimum buffer size such that there is
+// always at least 1 element in the prefetch queue every time the downstream
+// iterator calls GetNext().
+//
+// One common failure mode of input pipelines is being throughput bound. No
+// amount of prefetching can address that performance mode. In order to guard
+// against this condition, PrefetchAutotuner will only increase the buffer_limit
+// if the prefetching thread is able to successfully fill the buffer at its
+// current size.
+//
+// Note: in the current implementation, we never decrease the buffer_limit().
+// This should change in the future!
+//
+// PrefetchAutotuner is NOT thread safe.
+class PrefetchAutotuner {
+ public:
+ static const int64 kAutoTune = -1;
+
+ explicit PrefetchAutotuner(int64 initial_buffer_size);
+
+ int64 buffer_limit() const { return buffer_limit_; }
+
+ void RecordConsumption(size_t current_buffer_size);
+ void RecordEmpty() { RecordConsumption(0); }
+
+ private:
+ // PrefetchAutotuner operates as a state machine.
+ enum class Mode {
+ // Disables the autotuning.
+ kDisabled,
+
+ // We have increased the size of the buffer, and will transition to
+ // kDownswing if we successfully fill the buffer.
+ kUpswing,
+
+ // We have successfully filled a buffer of this size. If we ever block the
+ // downstream iterator, we should increase the buffer size.
+ kDownswing,
+ };
+
+ int64 buffer_limit_;
+ Mode mode_ = Mode::kDisabled;
+};
+
+} // namespace tensorflow
+
+#endif // TENSORFLOW_CORE_KERNELS_DATA_PREFETCH_AUTOTUNER_H_
diff --git a/tensorflow/core/kernels/data/prefetch_autotuner_test.cc b/tensorflow/core/kernels/data/prefetch_autotuner_test.cc
new file mode 100644
index 0000000000..2f573dfb35
--- /dev/null
+++ b/tensorflow/core/kernels/data/prefetch_autotuner_test.cc
@@ -0,0 +1,82 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/core/kernels/data/prefetch_autotuner.h"
+
+#include "tensorflow/core/platform/test.h"
+
+namespace tensorflow {
+namespace {
+
+TEST(PrefetchAutotuner, Disabled) {
+ PrefetchAutotuner t(2);
+ EXPECT_EQ(2, t.buffer_limit());
+ t.RecordConsumption(0);
+ t.RecordConsumption(2);
+ t.RecordConsumption(0);
+ t.RecordConsumption(2);
+ EXPECT_EQ(2, t.buffer_limit());
+}
+
+TEST(PrefetchAutotuner, Enabled) {
+ PrefetchAutotuner t(PrefetchAutotuner::kAutoTune);
+ EXPECT_EQ(1, t.buffer_limit());
+ t.RecordConsumption(0); // Expect buffer limit to increase.
+ EXPECT_EQ(1, t.buffer_limit());
+ t.RecordConsumption(1);
+ EXPECT_EQ(1, t.buffer_limit());
+ t.RecordConsumption(0); // Expect buffer limit to increase.
+ EXPECT_EQ(2, t.buffer_limit());
+ t.RecordConsumption(2);
+ EXPECT_EQ(2, t.buffer_limit());
+ t.RecordConsumption(1);
+ EXPECT_EQ(2, t.buffer_limit());
+ t.RecordConsumption(0); // Expect buffer limit to increase.
+ EXPECT_EQ(4, t.buffer_limit());
+ t.RecordConsumption(4);
+ EXPECT_EQ(4, t.buffer_limit());
+ t.RecordConsumption(0); // Expect buffer limit to increase.
+ EXPECT_EQ(8, t.buffer_limit());
+ t.RecordConsumption(0); // Expect buffer limit to stay the same!
+ EXPECT_EQ(8, t.buffer_limit());
+ t.RecordConsumption(0); // Expect buffer limit to stay the same!
+ EXPECT_EQ(8, t.buffer_limit());
+}
+
+TEST(PrefetchAutotuner, EnabledSteady) {
+ PrefetchAutotuner t(PrefetchAutotuner::kAutoTune);
+ EXPECT_EQ(1, t.buffer_limit());
+ t.RecordConsumption(0); // Expect buffer limit to increase.
+ EXPECT_EQ(1, t.buffer_limit());
+ t.RecordConsumption(1);
+ EXPECT_EQ(1, t.buffer_limit());
+ t.RecordConsumption(0); // Expect buffer limit to increase.
+ EXPECT_EQ(2, t.buffer_limit());
+ t.RecordConsumption(2);
+ EXPECT_EQ(2, t.buffer_limit());
+ t.RecordConsumption(0); // Expect buffer limit to increase.
+ EXPECT_EQ(4, t.buffer_limit());
+
+ // Never reach zero again.
+ std::vector<size_t> consumption_values = {2, 3, 1, 4, 1, 2, 3, 1};
+ for (int i = 0; i < consumption_values.size(); ++i) {
+ t.RecordConsumption(consumption_values[i]);
+ EXPECT_EQ(4, t.buffer_limit())
+ << "Failed at index " << i << " with value: " << consumption_values[i];
+ }
+}
+
+} // namespace
+} // namespace tensorflow
diff --git a/tensorflow/core/kernels/data/prefetch_dataset_op.cc b/tensorflow/core/kernels/data/prefetch_dataset_op.cc
index 1c548a30d2..536de81fd8 100644
--- a/tensorflow/core/kernels/data/prefetch_dataset_op.cc
+++ b/tensorflow/core/kernels/data/prefetch_dataset_op.cc
@@ -17,6 +17,7 @@ limitations under the License.
#include "tensorflow/core/framework/partial_tensor_shape.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/kernels/data/dataset.h"
+#include "tensorflow/core/kernels/data/prefetch_autotuner.h"
#include "tensorflow/core/lib/core/error_codes.pb.h"
namespace tensorflow {
@@ -37,7 +38,8 @@ class PrefetchDatasetOp : public UnaryDatasetOpKernel {
int64 buffer_size;
OP_REQUIRES_OK(
ctx, ParseScalarArgument<int64>(ctx, "buffer_size", &buffer_size));
- OP_REQUIRES(ctx, buffer_size > 0,
+ OP_REQUIRES(ctx,
+ buffer_size > 0 || buffer_size == PrefetchAutotuner::kAutoTune,
errors::InvalidArgument("buffer_size must be > 0"));
*output = new Dataset(ctx, input, buffer_size);
@@ -85,7 +87,8 @@ class PrefetchDatasetOp : public UnaryDatasetOpKernel {
public:
explicit Iterator(const Params& params)
: DatasetIterator<Dataset>(params),
- input_impl_(params.dataset->input_->MakeIterator(params.prefix)) {}
+ input_impl_(params.dataset->input_->MakeIterator(params.prefix)),
+ auto_tuner_(params.dataset->buffer_size_) {}
~Iterator() override {
// Signal the prefetch thread to terminate it. We will then
@@ -113,6 +116,7 @@ class PrefetchDatasetOp : public UnaryDatasetOpKernel {
// Wait until the next element in the buffer has been
// produced, or we are shutting down.
while (!cancelled_ && !prefetch_thread_finished_ && buffer_.empty()) {
+ auto_tuner_.RecordEmpty();
cond_var_.wait(l);
}
@@ -129,6 +133,7 @@ class PrefetchDatasetOp : public UnaryDatasetOpKernel {
if (s.ok()) {
*out_tensors = std::move(buffer_.front().value);
}
+ auto_tuner_.RecordConsumption(buffer_.size());
buffer_.pop_front();
*end_of_sequence = false;
@@ -242,7 +247,8 @@ class PrefetchDatasetOp : public UnaryDatasetOpKernel {
// 1. Wait for a slot in the buffer.
{
mutex_lock l(mu_);
- while (!cancelled_ && buffer_.size() == dataset()->buffer_size_) {
+ while (!cancelled_ &&
+ buffer_.size() == auto_tuner_.buffer_limit()) {
cond_var_.wait(l);
}
@@ -323,6 +329,7 @@ class PrefetchDatasetOp : public UnaryDatasetOpKernel {
mutex parent_mu_ ACQUIRED_BEFORE(mu_);
const std::unique_ptr<IteratorBase> input_impl_ GUARDED_BY(parent_mu_);
condition_variable cond_var_;
+ PrefetchAutotuner auto_tuner_ GUARDED_BY(mu_);
std::deque<BufferElement> buffer_ GUARDED_BY(mu_);
std::unique_ptr<Thread> prefetch_thread_ GUARDED_BY(mu_);
bool cancelled_ GUARDED_BY(mu_) = false;
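
The kernel changes above thread the autotuner into both sides of the prefetch buffer: the consumer calls `RecordEmpty()` while blocked on an empty buffer and `RecordConsumption(buffer_.size())` on each successful `GetNext()`, while the prefetch thread now stops filling at `auto_tuner_.buffer_limit()` instead of the fixed `buffer_size_`. A hedged pseudocode sketch of that interplay, using the `PrefetchAutotunerSketch` interface from the earlier block (`make_prefetcher`, `get_next`, and `prefetch_loop` are illustrative names, not the real kernel):

```python
import collections
import threading

def make_prefetcher(tuner, upstream):
    """Sketch of the prefetch wiring; `tuner` exposes buffer_limit and
    record_consumption, as in PrefetchAutotunerSketch above."""
    buffer = collections.deque()
    cond_var = threading.Condition()

    def get_next():
        # Consumer side: mirrors Iterator::GetNextInternal in the diff above.
        with cond_var:
            while not buffer:
                tuner.record_consumption(0)        # RecordEmpty()
                cond_var.wait()
            value = buffer[0]
            tuner.record_consumption(len(buffer))  # recorded before the pop
            buffer.popleft()
            cond_var.notify_all()
            return value

    def prefetch_loop():
        # Producer side: "full" now means the auto-tuned limit, not a fixed size.
        for element in upstream:
            with cond_var:
                while len(buffer) >= tuner.buffer_limit:
                    cond_var.wait()
                buffer.append(element)
                cond_var.notify_all()

    return get_next, prefetch_loop
```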
diff --git a/tensorflow/python/data/ops/dataset_ops.py b/tensorflow/python/data/ops/dataset_ops.py
index c0a6283be4..8729e085a3 100644
--- a/tensorflow/python/data/ops/dataset_ops.py
+++ b/tensorflow/python/data/ops/dataset_ops.py
@@ -2043,6 +2043,8 @@ class PrefetchDataset(Dataset):
"""See `Dataset.prefetch()` for details."""
super(PrefetchDataset, self).__init__()
self._input_dataset = input_dataset
+ if buffer_size is None:
+ buffer_size = -1 # This is the sentinel for auto-tuning.
self._buffer_size = ops.convert_to_tensor(
buffer_size, dtype=dtypes.int64, name="buffer_size")