diff options
author | 2018-09-20 11:09:47 -0700 | |
---|---|---|
committer | 2018-09-20 11:15:45 -0700 | |
commit | ae59f459cd1e6bd2f2bdeb3b49cfedf0cdaf51a1 (patch) | |
tree | 74861418a375e96be9bd7d4b1de8373ed705422b /tensorflow/core/kernels/data | |
parent | b874ada5731ca2315600f97a2703561a30b82b89 (diff) |
[data-stats] Adds the number of filtered elements as a scalar summary; also adds the number of filtered elements to the monitoring counter.
PiperOrigin-RevId: 213846793
Diffstat (limited to 'tensorflow/core/kernels/data')
-rw-r--r-- | tensorflow/core/kernels/data/filter_dataset_op.cc | 48 |
1 files changed, 47 insertions, 1 deletions
diff --git a/tensorflow/core/kernels/data/filter_dataset_op.cc b/tensorflow/core/kernels/data/filter_dataset_op.cc index 19c35f94a6..00884314a9 100644 --- a/tensorflow/core/kernels/data/filter_dataset_op.cc +++ b/tensorflow/core/kernels/data/filter_dataset_op.cc @@ -14,11 +14,13 @@ limitations under the License. ==============================================================================*/ #include "tensorflow/core/common_runtime/function.h" #include "tensorflow/core/framework/partial_tensor_shape.h" +#include "tensorflow/core/framework/stats_aggregator.h" #include "tensorflow/core/framework/tensor.h" #include "tensorflow/core/kernels/data/captured_function.h" #include "tensorflow/core/kernels/data/dataset.h" #include "tensorflow/core/lib/gtl/cleanup.h" #include "tensorflow/core/lib/random/random.h" +#include "tensorflow/core/lib/strings/str_util.h" namespace tensorflow { namespace data { @@ -139,7 +141,13 @@ class FilterDatasetOp : public UnaryDatasetOpKernel { class Iterator : public DatasetIterator<FilterDatasetBase> { public: explicit Iterator(const Params& params) - : DatasetIterator<FilterDatasetBase>(params) {} + : DatasetIterator<FilterDatasetBase>(params), + filtered_elements_(0), + dropped_elements_(0) { + std::vector<string> components = + str_util::Split(params.prefix, "::", str_util::SkipEmpty()); + prefix_end_ = components.back(); + } Status Initialize(IteratorContext* ctx) override { TF_RETURN_IF_ERROR( @@ -154,6 +162,7 @@ class FilterDatasetOp : public UnaryDatasetOpKernel { // `input_impl_` and `f` are thread-safe. However, if multiple // threads enter this method, outputs may be observed in a // non-deterministic order. + auto stats_aggregator = ctx->stats_aggregator(); bool matched; do { { @@ -176,8 +185,34 @@ class FilterDatasetOp : public UnaryDatasetOpKernel { if (!matched) { // Clear the output tensor list since it didn't match. 
out_tensors->clear(); + if (stats_aggregator) { + mutex_lock l(mu_); + dropped_elements_++; + stats_aggregator->AddScalar( + strings::StrCat(prefix_end_, "::dropped_elements"), + static_cast<float>((dropped_elements_))); + // TODO(shivaniagrawal): multiple pipelines would collect + // aggregated number of dropped elements for all the pipelines, + // exploit tagged_context here. + stats_aggregator->IncrementCounter( + prefix_end_, "dropped_elements", static_cast<float>(1)); + } } } while (!matched); + // TODO(shivaniagrawal): add ratio of dropped_elements and + // filtered_elements as a histogram. + if (stats_aggregator) { + mutex_lock l(mu_); + filtered_elements_++; + stats_aggregator->AddScalar( + strings::StrCat(prefix_end_, "::filtered_elements"), + static_cast<float>((filtered_elements_))); + // TODO(shivaniagrawal): multiple pipelines would collect aggregated + // number of filtered elements for all the pipelines, exploit + // tagged_context here. + stats_aggregator->IncrementCounter(prefix_end_, "filtered_elements", + static_cast<float>(1)); + } *end_of_sequence = false; return Status::OK(); } @@ -190,6 +225,10 @@ class FilterDatasetOp : public UnaryDatasetOpKernel { else TF_RETURN_IF_ERROR( writer->WriteScalar(full_name("input_impls_empty"), "")); + TF_RETURN_IF_ERROR(writer->WriteScalar(full_name("filtered_elements"), + filtered_elements_)); + TF_RETURN_IF_ERROR(writer->WriteScalar(full_name("dropped_elements"), + dropped_elements_)); return Status::OK(); } @@ -200,12 +239,19 @@ class FilterDatasetOp : public UnaryDatasetOpKernel { input_impl_.reset(); else TF_RETURN_IF_ERROR(RestoreInput(ctx, reader, input_impl_)); + TF_RETURN_IF_ERROR(reader->ReadScalar(full_name("filtered_elements"), + &filtered_elements_)); + TF_RETURN_IF_ERROR(reader->ReadScalar(full_name("dropped_elements"), + &dropped_elements_)); return Status::OK(); } private: mutex mu_; std::unique_ptr<IteratorBase> input_impl_ GUARDED_BY(mu_); + int64 filtered_elements_ GUARDED_BY(mu_); + int64 
dropped_elements_ GUARDED_BY(mu_); + string prefix_end_; }; const DatasetBase* const input_; |