diff options
author | 2018-09-07 10:25:58 -0700 | |
---|---|---|
committer | 2018-09-07 10:29:55 -0700 | |
commit | 9b15806d96cdb1ecaac1400582a01e3944b58406 (patch) | |
tree | fcdd3bd9dc7a125f827b256d6087e638d91988a6 /tensorflow/core/kernels/data | |
parent | 9a96685ec3b9ea4c50b1e8739daa15f870167110 (diff) |
[data-stats] Adds `buffer_utilization` statistics for PrefetchDataset.
RELNOTES: n/a
PiperOrigin-RevId: 211995741
Diffstat (limited to 'tensorflow/core/kernels/data')
-rw-r--r-- | tensorflow/core/kernels/data/BUILD | 1 | ||||
-rw-r--r-- | tensorflow/core/kernels/data/prefetch_dataset_op.cc | 26 |
2 files changed, 22 insertions, 5 deletions
diff --git a/tensorflow/core/kernels/data/BUILD b/tensorflow/core/kernels/data/BUILD index 3a1ac73f64..7c75212963 100644 --- a/tensorflow/core/kernels/data/BUILD +++ b/tensorflow/core/kernels/data/BUILD @@ -401,6 +401,7 @@ tf_kernel_library( "//tensorflow/core:lib", "//tensorflow/core:lib_internal", "//tensorflow/core:protos_all_cc", + "@com_google_absl//absl/strings", ], ) diff --git a/tensorflow/core/kernels/data/prefetch_dataset_op.cc b/tensorflow/core/kernels/data/prefetch_dataset_op.cc index baf448e572..0b4d79b02e 100644 --- a/tensorflow/core/kernels/data/prefetch_dataset_op.cc +++ b/tensorflow/core/kernels/data/prefetch_dataset_op.cc @@ -12,11 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include <deque> - #include "tensorflow/core/kernels/data/prefetch_dataset_op.h" +#include <deque> + +#include "absl/strings/string_view.h" +#include "absl/strings/util.h" #include "tensorflow/core/framework/partial_tensor_shape.h" +#include "tensorflow/core/framework/stats_aggregator.h" #include "tensorflow/core/framework/tensor.h" #include "tensorflow/core/lib/core/error_codes.pb.h" @@ -71,7 +74,11 @@ class PrefetchDatasetOp::Dataset : public DatasetBase { public: explicit Iterator(const Params& params) : DatasetIterator<Dataset>(params), - auto_tuner_(params.dataset->buffer_size_) {} + auto_tuner_(params.dataset->buffer_size_) { + std::vector<string> components = + std::move(absl::StrSplit(params.prefix, "::", absl::SkipEmpty())); + prefix_end_ = components.back(); + } ~Iterator() override { // Signal the prefetch thread to terminate it. We will then @@ -98,6 +105,7 @@ class PrefetchDatasetOp::Dataset : public DatasetBase { bool* end_of_sequence) override { { mutex_lock l(mu_); + auto stats_aggregator = ctx->stats_aggregator(); TF_RETURN_IF_ERROR(EnsurePrefetchThreadStarted(ctx)); // Wait until the next element in the buffer has been // produced, or we are shutting down. @@ -113,7 +121,7 @@ class PrefetchDatasetOp::Dataset : public DatasetBase { } if (!buffer_.empty()) { - return Consume(out_tensors, end_of_sequence); + return Consume(out_tensors, end_of_sequence, stats_aggregator); } if (prefetch_thread_finished_) { @@ -201,8 +209,15 @@ class PrefetchDatasetOp::Dataset : public DatasetBase { std::vector<Tensor> value; }; - Status Consume(std::vector<Tensor>* out_tensors, bool* end_of_sequence) + Status Consume(std::vector<Tensor>* out_tensors, bool* end_of_sequence, + const std::shared_ptr<StatsAggregator>& stats_aggregator) EXCLUSIVE_LOCKS_REQUIRED(mu_) { + if (stats_aggregator) { + stats_aggregator->AddToHistogram( + strings::StrCat(prefix_end_, "::buffer_utilization"), + {static_cast<float>(buffer_.size()) / + static_cast<float>(auto_tuner_.buffer_limit())}); + } // A new element is available. Forward the status from computing it, and // (if we successfully got an element) the output values. Status s = buffer_.front().status; @@ -326,6 +341,7 @@ class PrefetchDatasetOp::Dataset : public DatasetBase { mutex parent_mu_ ACQUIRED_BEFORE(mu_); std::unique_ptr<IteratorBase> input_impl_ GUARDED_BY(parent_mu_); condition_variable cond_var_; + string prefix_end_; PrefetchAutotuner auto_tuner_ GUARDED_BY(mu_); std::deque<BufferElement> buffer_ GUARDED_BY(mu_); std::unique_ptr<Thread> prefetch_thread_ GUARDED_BY(mu_); |