diff options
author | Eugene Brevdo <ebrevdo@google.com> | 2017-03-17 17:56:58 -0800 |
---|---|---|
committer | TensorFlower Gardener <gardener@tensorflow.org> | 2017-03-17 19:11:33 -0700 |
commit | 504816b752cea83a848d3b3eb1cdf94b8bea1596 (patch) | |
tree | d1d2cd662bb147f3a8b3077b341f5ff1ab9ad57d | |
parent | 3345311d45a1e9123c72d44515c414126a9634a1 (diff) |
Identify and fix bug in handling of variable-length data in ExampleParserOp.
The bug was that the parsing code generated blocks of output, where each block may contain 0 or more var-len batch entries. I had originally assumed that each block corresponded to a single minibatch entry. The existing unit tests were small enough that this always ended up to be true because for small batch sizes, the thread pool just gave each minibatch entry its own block. But for any reasonable batch size, this assumption broke down and I would accidentally skip copying some minibatch data to the output Tensors.
This has now been fixed.
Change: 150509644
-rw-r--r-- | tensorflow/core/util/example_proto_fast_parsing.cc | 51 | ||||
-rw-r--r-- | tensorflow/python/kernel_tests/parsing_ops_test.py | 61 |
2 files changed, 88 insertions, 24 deletions
diff --git a/tensorflow/core/util/example_proto_fast_parsing.cc b/tensorflow/core/util/example_proto_fast_parsing.cc index 6336cd951e..52044dbe29 100644 --- a/tensorflow/core/util/example_proto_fast_parsing.cc +++ b/tensorflow/core/util/example_proto_fast_parsing.cc @@ -793,8 +793,7 @@ void CopyOrMoveBlock(const string* b, const string* e, string* t) { template <typename T> void FillAndCopyVarLen( const int d, const size_t num_elements, - const size_t num_elements_per_minibatch, const size_t data_stride_size, - const Config& config, + const size_t num_elements_per_minibatch, const Config& config, const std::vector<std::vector<SparseBuffer>>& varlen_dense_buffers, Tensor* values) { const Tensor& default_value = config.dense[d].default_value; @@ -803,23 +802,34 @@ void FillAndCopyVarLen( std::fill(values->flat<T>().data(), values->flat<T>().data() + num_elements, default_value.flat<T>()(0)); + // Data is [batch_size, max_num_elements, data_stride_size] + // and num_elements_per_minibatch = max_num_elements * data_stride_size + auto data = values->flat<T>().data(); + // Iterate over minibatch elements for (size_t i = 0; i < varlen_dense_buffers.size(); ++i) { const SparseBuffer& buffer = varlen_dense_buffers[i][d]; - const size_t offset = i * num_elements_per_minibatch; - const size_t stride_size = config.dense[d].elements_per_stride; + // Number of examples being stored in this buffer + const auto& end_indices = buffer.example_end_indices; + const size_t examples_in_buffer = end_indices.size(); + // const size_t stride_size = config.dense[d].elements_per_stride; - // Copy values over. - auto& list = GetListFromBuffer<T>(buffer); + const auto& list = GetListFromBuffer<T>(buffer); auto list_ptr = list.begin(); - auto data = values->flat<T>().data() + offset; - DCHECK(list.size() % stride_size == 0); - const size_t num_entries = list.size() / stride_size; - for (size_t j = 0; j < num_entries; ++j) { - CopyOrMoveBlock(list_ptr, list_ptr + stride_size, data); - list_ptr += stride_size; - data += data_stride_size; + + size_t elements_tally = 0; + // Iterate through all the examples stored in this buffer. + for (size_t j = 0; j < examples_in_buffer; ++j) { + // Number of elements stored for this example. + const size_t num_elems = end_indices[j] - elements_tally; + CopyOrMoveBlock(list_ptr, list_ptr + num_elems, data); + // Move forward this many elements in the varlen buffer. + list_ptr += num_elems; + // Move forward to the next minibatch entry in the values output. + data += num_elements_per_minibatch; + elements_tally = end_indices[j]; } + DCHECK(elements_tally == list.size()); } } @@ -948,7 +958,7 @@ Status FastParseExample(const Config& config, size_t total_num_features = 0; size_t max_num_features = 0; for (auto& sparse_values_tmp : sparse_buffers) { - std::vector<size_t>& end_indices = + const std::vector<size_t>& end_indices = sparse_values_tmp[d].example_end_indices; total_num_features += end_indices.back(); max_num_features = std::max(max_num_features, end_indices[0]); @@ -1055,28 +1065,21 @@ Status FastParseExample(const Config& config, if (num_elements == 0) return; const size_t num_elements_per_minibatch = num_elements / batch_size; - const size_t data_stride_size = - (max_num_elements == 0) - ? 0 - : (num_elements_per_minibatch / max_num_elements); switch (config.dense[d].dtype) { case DT_INT64: { FillAndCopyVarLen<int64>(d, num_elements, num_elements_per_minibatch, - data_stride_size, config, varlen_dense_buffers, - &values); + config, varlen_dense_buffers, &values); break; } case DT_FLOAT: { FillAndCopyVarLen<float>(d, num_elements, num_elements_per_minibatch, - data_stride_size, config, varlen_dense_buffers, - &values); + config, varlen_dense_buffers, &values); break; } case DT_STRING: { FillAndCopyVarLen<string>(d, num_elements, num_elements_per_minibatch, - data_stride_size, config, - varlen_dense_buffers, &values); + config, varlen_dense_buffers, &values); break; } default: diff --git a/tensorflow/python/kernel_tests/parsing_ops_test.py b/tensorflow/python/kernel_tests/parsing_ops_test.py index 741bc3f392..684c26dc99 100644 --- a/tensorflow/python/kernel_tests/parsing_ops_test.py +++ b/tensorflow/python/kernel_tests/parsing_ops_test.py @@ -18,6 +18,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import copy import itertools import numpy as np @@ -685,6 +686,65 @@ class ParseExampleTest(test.TestCase): } }, expected_output) + def _testSerializedContainingVarLenDenseLargerBatch(self, batch_size): + # During parsing, data read from the serialized proto is stored in buffers. + # For small batch sizes, a buffer will contain one minibatch entry. + # For larger batch sizes, a buffer may contain several minibatch + # entries. This test identified a bug where the code that copied + # data out of the buffers and into the output tensors assumed each + # buffer only contained one minibatch entry. The bug has since been fixed. + truth_int = [i for i in range(batch_size)] + truth_str = [[("foo%d" % i).encode(), ("bar%d" % i).encode()] + for i in range(batch_size)] + + expected_str = copy.deepcopy(truth_str) + + # Delete some intermediate entries + for i in range(batch_size): + col = 1 + if np.random.rand() < 0.25: + # w.p. 25%, drop out the second entry + expected_str[i][col] = b"default" + col -= 1 + truth_str[i].pop() + if np.random.rand() < 0.25: + # w.p. 25%, drop out the second entry (possibly again) + expected_str[i][col] = b"default" + truth_str[i].pop() + + expected_output = { + # Batch size batch_size, 1 time step. + "a": np.array(truth_int, dtype=np.int64).reshape(batch_size, 1), + # Batch size batch_size, 2 time steps. + "b": np.array(expected_str, dtype="|S").reshape(batch_size, 2), + } + + original = [ + example(features=features( + {"a": int64_feature([truth_int[i]]), + "b": bytes_feature(truth_str[i])})) + for i in range(batch_size) + ] + + serialized = [m.SerializeToString() for m in original] + + self._test({ + "serialized": ops.convert_to_tensor(serialized, dtype=dtypes.string), + "features": { + "a": parsing_ops.FixedLenSequenceFeature( + shape=(), dtype=dtypes.int64, allow_missing=True, + default_value=-1), + "b": parsing_ops.FixedLenSequenceFeature( + shape=[], dtype=dtypes.string, allow_missing=True, + default_value="default"), + } + }, expected_output) + + def testSerializedContainingVarLenDenseLargerBatch(self): + np.random.seed(3456) + for batch_size in (1, 10, 20, 100, 256): + self._testSerializedContainingVarLenDenseLargerBatch(batch_size) + def testSerializedContainingVarLenDense(self): aname = "a" bname = "b" @@ -760,6 +820,7 @@ class ParseExampleTest(test.TestCase): [-2, -2, -2, -2], ], dtype=np.float32).reshape(4, 2, 2, 1) + self._test({ "example_names": example_names, "serialized": ops.convert_to_tensor(serialized), |