aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Eugene Brevdo <ebrevdo@google.com>2017-03-17 17:56:58 -0800
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-03-17 19:11:33 -0700
commit504816b752cea83a848d3b3eb1cdf94b8bea1596 (patch)
treed1d2cd662bb147f3a8b3077b341f5ff1ab9ad57d
parent3345311d45a1e9123c72d44515c414126a9634a1 (diff)
Identify and fix bug in handling of variable-length data in ExampleParserOp.
The bug was that the parsing code generated blocks of output, where each block may contain 0 or more var-len batch entries. I had originally assumed that each block corresponded to a single minibatch entry. The existing unit tests were small enough that this always ended up being true, because for small batch sizes the thread pool gave each minibatch entry its own block. But for any reasonable batch size, this assumption broke down and some minibatch data was accidentally skipped when copying to the output Tensors. This has now been fixed. Change: 150509644
-rw-r--r--tensorflow/core/util/example_proto_fast_parsing.cc51
-rw-r--r--tensorflow/python/kernel_tests/parsing_ops_test.py61
2 files changed, 88 insertions, 24 deletions
diff --git a/tensorflow/core/util/example_proto_fast_parsing.cc b/tensorflow/core/util/example_proto_fast_parsing.cc
index 6336cd951e..52044dbe29 100644
--- a/tensorflow/core/util/example_proto_fast_parsing.cc
+++ b/tensorflow/core/util/example_proto_fast_parsing.cc
@@ -793,8 +793,7 @@ void CopyOrMoveBlock(const string* b, const string* e, string* t) {
template <typename T>
void FillAndCopyVarLen(
const int d, const size_t num_elements,
- const size_t num_elements_per_minibatch, const size_t data_stride_size,
- const Config& config,
+ const size_t num_elements_per_minibatch, const Config& config,
const std::vector<std::vector<SparseBuffer>>& varlen_dense_buffers,
Tensor* values) {
const Tensor& default_value = config.dense[d].default_value;
@@ -803,23 +802,34 @@ void FillAndCopyVarLen(
std::fill(values->flat<T>().data(), values->flat<T>().data() + num_elements,
default_value.flat<T>()(0));
+ // Data is [batch_size, max_num_elements, data_stride_size]
+ // and num_elements_per_minibatch = max_num_elements * data_stride_size
+ auto data = values->flat<T>().data();
+
// Iterate over minibatch elements
for (size_t i = 0; i < varlen_dense_buffers.size(); ++i) {
const SparseBuffer& buffer = varlen_dense_buffers[i][d];
- const size_t offset = i * num_elements_per_minibatch;
- const size_t stride_size = config.dense[d].elements_per_stride;
+ // Number of examples being stored in this buffer
+ const auto& end_indices = buffer.example_end_indices;
+ const size_t examples_in_buffer = end_indices.size();
+ // const size_t stride_size = config.dense[d].elements_per_stride;
- // Copy values over.
- auto& list = GetListFromBuffer<T>(buffer);
+ const auto& list = GetListFromBuffer<T>(buffer);
auto list_ptr = list.begin();
- auto data = values->flat<T>().data() + offset;
- DCHECK(list.size() % stride_size == 0);
- const size_t num_entries = list.size() / stride_size;
- for (size_t j = 0; j < num_entries; ++j) {
- CopyOrMoveBlock(list_ptr, list_ptr + stride_size, data);
- list_ptr += stride_size;
- data += data_stride_size;
+
+ size_t elements_tally = 0;
+ // Iterate through all the examples stored in this buffer.
+ for (size_t j = 0; j < examples_in_buffer; ++j) {
+ // Number of elements stored for this example.
+ const size_t num_elems = end_indices[j] - elements_tally;
+ CopyOrMoveBlock(list_ptr, list_ptr + num_elems, data);
+ // Move forward this many elements in the varlen buffer.
+ list_ptr += num_elems;
+ // Move forward to the next minibatch entry in the values output.
+ data += num_elements_per_minibatch;
+ elements_tally = end_indices[j];
}
+ DCHECK(elements_tally == list.size());
}
}
@@ -948,7 +958,7 @@ Status FastParseExample(const Config& config,
size_t total_num_features = 0;
size_t max_num_features = 0;
for (auto& sparse_values_tmp : sparse_buffers) {
- std::vector<size_t>& end_indices =
+ const std::vector<size_t>& end_indices =
sparse_values_tmp[d].example_end_indices;
total_num_features += end_indices.back();
max_num_features = std::max(max_num_features, end_indices[0]);
@@ -1055,28 +1065,21 @@ Status FastParseExample(const Config& config,
if (num_elements == 0) return;
const size_t num_elements_per_minibatch = num_elements / batch_size;
- const size_t data_stride_size =
- (max_num_elements == 0)
- ? 0
- : (num_elements_per_minibatch / max_num_elements);
switch (config.dense[d].dtype) {
case DT_INT64: {
FillAndCopyVarLen<int64>(d, num_elements, num_elements_per_minibatch,
- data_stride_size, config, varlen_dense_buffers,
- &values);
+ config, varlen_dense_buffers, &values);
break;
}
case DT_FLOAT: {
FillAndCopyVarLen<float>(d, num_elements, num_elements_per_minibatch,
- data_stride_size, config, varlen_dense_buffers,
- &values);
+ config, varlen_dense_buffers, &values);
break;
}
case DT_STRING: {
FillAndCopyVarLen<string>(d, num_elements, num_elements_per_minibatch,
- data_stride_size, config,
- varlen_dense_buffers, &values);
+ config, varlen_dense_buffers, &values);
break;
}
default:
diff --git a/tensorflow/python/kernel_tests/parsing_ops_test.py b/tensorflow/python/kernel_tests/parsing_ops_test.py
index 741bc3f392..684c26dc99 100644
--- a/tensorflow/python/kernel_tests/parsing_ops_test.py
+++ b/tensorflow/python/kernel_tests/parsing_ops_test.py
@@ -18,6 +18,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
+import copy
import itertools
import numpy as np
@@ -685,6 +686,65 @@ class ParseExampleTest(test.TestCase):
}
}, expected_output)
+ def _testSerializedContainingVarLenDenseLargerBatch(self, batch_size):
+ # During parsing, data read from the serialized proto is stored in buffers.
+ # For small batch sizes, a buffer will contain one minibatch entry.
+ # For larger batch sizes, a buffer may contain several minibatch
+ # entries. This test identified a bug where the code that copied
+ # data out of the buffers and into the output tensors assumed each
+ # buffer only contained one minibatch entry. The bug has since been fixed.
+ truth_int = [i for i in range(batch_size)]
+ truth_str = [[("foo%d" % i).encode(), ("bar%d" % i).encode()]
+ for i in range(batch_size)]
+
+ expected_str = copy.deepcopy(truth_str)
+
+ # Delete some intermediate entries
+ for i in range(batch_size):
+ col = 1
+ if np.random.rand() < 0.25:
+ # w.p. 25%, drop out the second entry
+ expected_str[i][col] = b"default"
+ col -= 1
+ truth_str[i].pop()
+ if np.random.rand() < 0.25:
+ # w.p. 25%, drop out the second entry (possibly again)
+ expected_str[i][col] = b"default"
+ truth_str[i].pop()
+
+ expected_output = {
+ # Batch size batch_size, 1 time step.
+ "a": np.array(truth_int, dtype=np.int64).reshape(batch_size, 1),
+ # Batch size batch_size, 2 time steps.
+ "b": np.array(expected_str, dtype="|S").reshape(batch_size, 2),
+ }
+
+ original = [
+ example(features=features(
+ {"a": int64_feature([truth_int[i]]),
+ "b": bytes_feature(truth_str[i])}))
+ for i in range(batch_size)
+ ]
+
+ serialized = [m.SerializeToString() for m in original]
+
+ self._test({
+ "serialized": ops.convert_to_tensor(serialized, dtype=dtypes.string),
+ "features": {
+ "a": parsing_ops.FixedLenSequenceFeature(
+ shape=(), dtype=dtypes.int64, allow_missing=True,
+ default_value=-1),
+ "b": parsing_ops.FixedLenSequenceFeature(
+ shape=[], dtype=dtypes.string, allow_missing=True,
+ default_value="default"),
+ }
+ }, expected_output)
+
+ def testSerializedContainingVarLenDenseLargerBatch(self):
+ np.random.seed(3456)
+ for batch_size in (1, 10, 20, 100, 256):
+ self._testSerializedContainingVarLenDenseLargerBatch(batch_size)
+
def testSerializedContainingVarLenDense(self):
aname = "a"
bname = "b"
@@ -760,6 +820,7 @@ class ParseExampleTest(test.TestCase):
[-2, -2, -2, -2],
],
dtype=np.float32).reshape(4, 2, 2, 1)
+
self._test({
"example_names": example_names,
"serialized": ops.convert_to_tensor(serialized),