aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/kernels/data
diff options
context:
space:
mode:
authorGravatar Shivani Agrawal <shivaniagrawal@google.com>2018-09-07 10:25:58 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2018-09-07 10:29:55 -0700
commit9b15806d96cdb1ecaac1400582a01e3944b58406 (patch)
treefcdd3bd9dc7a125f827b256d6087e638d91988a6 /tensorflow/core/kernels/data
parent9a96685ec3b9ea4c50b1e8739daa15f870167110 (diff)
[data-stats] Adds `buffer_utilization` statistics for PrefetchDataset.
RELNOTES: n/a PiperOrigin-RevId: 211995741
Diffstat (limited to 'tensorflow/core/kernels/data')
-rw-r--r--tensorflow/core/kernels/data/BUILD1
-rw-r--r--tensorflow/core/kernels/data/prefetch_dataset_op.cc26
2 files changed, 22 insertions, 5 deletions
diff --git a/tensorflow/core/kernels/data/BUILD b/tensorflow/core/kernels/data/BUILD
index 3a1ac73f64..7c75212963 100644
--- a/tensorflow/core/kernels/data/BUILD
+++ b/tensorflow/core/kernels/data/BUILD
@@ -401,6 +401,7 @@ tf_kernel_library(
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:protos_all_cc",
+ "@com_google_absl//absl/strings",
],
)
diff --git a/tensorflow/core/kernels/data/prefetch_dataset_op.cc b/tensorflow/core/kernels/data/prefetch_dataset_op.cc
index baf448e572..0b4d79b02e 100644
--- a/tensorflow/core/kernels/data/prefetch_dataset_op.cc
+++ b/tensorflow/core/kernels/data/prefetch_dataset_op.cc
@@ -12,11 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
-#include <deque>
-
#include "tensorflow/core/kernels/data/prefetch_dataset_op.h"
+#include <deque>
+
+#include "absl/strings/string_view.h"
+#include "absl/strings/util.h"
#include "tensorflow/core/framework/partial_tensor_shape.h"
+#include "tensorflow/core/framework/stats_aggregator.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/error_codes.pb.h"
@@ -71,7 +74,11 @@ class PrefetchDatasetOp::Dataset : public DatasetBase {
public:
explicit Iterator(const Params& params)
: DatasetIterator<Dataset>(params),
- auto_tuner_(params.dataset->buffer_size_) {}
+ auto_tuner_(params.dataset->buffer_size_) {
+ std::vector<string> components =
+ std::move(absl::StrSplit(params.prefix, "::", absl::SkipEmpty()));
+ prefix_end_ = components.back();
+ }
~Iterator() override {
// Signal the prefetch thread to terminate it. We will then
@@ -98,6 +105,7 @@ class PrefetchDatasetOp::Dataset : public DatasetBase {
bool* end_of_sequence) override {
{
mutex_lock l(mu_);
+ auto stats_aggregator = ctx->stats_aggregator();
TF_RETURN_IF_ERROR(EnsurePrefetchThreadStarted(ctx));
// Wait until the next element in the buffer has been
// produced, or we are shutting down.
@@ -113,7 +121,7 @@ class PrefetchDatasetOp::Dataset : public DatasetBase {
}
if (!buffer_.empty()) {
- return Consume(out_tensors, end_of_sequence);
+ return Consume(out_tensors, end_of_sequence, stats_aggregator);
}
if (prefetch_thread_finished_) {
@@ -201,8 +209,15 @@ class PrefetchDatasetOp::Dataset : public DatasetBase {
std::vector<Tensor> value;
};
- Status Consume(std::vector<Tensor>* out_tensors, bool* end_of_sequence)
+ Status Consume(std::vector<Tensor>* out_tensors, bool* end_of_sequence,
+ const std::shared_ptr<StatsAggregator>& stats_aggregator)
EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+ if (stats_aggregator) {
+ stats_aggregator->AddToHistogram(
+ strings::StrCat(prefix_end_, "::buffer_utilization"),
+ {static_cast<float>(buffer_.size()) /
+ static_cast<float>(auto_tuner_.buffer_limit())});
+ }
// A new element is available. Forward the status from computing it, and
// (if we successfully got an element) the output values.
Status s = buffer_.front().status;
@@ -326,6 +341,7 @@ class PrefetchDatasetOp::Dataset : public DatasetBase {
mutex parent_mu_ ACQUIRED_BEFORE(mu_);
std::unique_ptr<IteratorBase> input_impl_ GUARDED_BY(parent_mu_);
condition_variable cond_var_;
+ string prefix_end_;
PrefetchAutotuner auto_tuner_ GUARDED_BY(mu_);
std::deque<BufferElement> buffer_ GUARDED_BY(mu_);
std::unique_ptr<Thread> prefetch_thread_ GUARDED_BY(mu_);