diff options
-rw-r--r-- | tensorflow/core/kernels/example_parsing_ops.cc | 24 | ||||
-rw-r--r-- | tensorflow/core/kernels/example_parsing_ops_test.cc | 54 | ||||
-rw-r--r-- | tensorflow/core/ops/parsing_ops.cc | 13 | ||||
-rw-r--r-- | tensorflow/core/ops/parsing_ops_test.cc | 42 | ||||
-rw-r--r-- | tensorflow/core/util/example_proto_fast_parsing.cc | 317 | ||||
-rw-r--r-- | tensorflow/core/util/example_proto_fast_parsing.h | 1 | ||||
-rw-r--r-- | tensorflow/core/util/example_proto_helper.h | 26 | ||||
-rw-r--r-- | tensorflow/python/kernel_tests/parsing_ops_test.py | 98 | ||||
-rw-r--r-- | tensorflow/python/ops/parsing_ops.py | 48 |
9 files changed, 525 insertions, 98 deletions
diff --git a/tensorflow/core/kernels/example_parsing_ops.cc b/tensorflow/core/kernels/example_parsing_ops.cc index d03f8fa33a..f4c4460fa4 100644 --- a/tensorflow/core/kernels/example_parsing_ops.cc +++ b/tensorflow/core/kernels/example_parsing_ops.cc @@ -92,7 +92,18 @@ class ExampleParserOp : public OpKernel { for (int d = 0; d < static_cast<int>(attrs_.num_dense); ++d) { const Tensor& def_value = dense_defaults[d]; - if (def_value.NumElements() > 0) { + if (attrs_.variable_length[d]) { + OP_REQUIRES(ctx, def_value.NumElements() == 1, + errors::InvalidArgument( + "dense_shape[", d, "] is a variable length shape: ", + attrs_.dense_shapes[d].DebugString(), + ", therefore " + "def_value[", + d, + "] must contain a single element (" + "the padding element). But its shape is: ", + def_value.shape().DebugString())); + } else if (def_value.NumElements() > 0) { OP_REQUIRES(ctx, attrs_.dense_shapes[d].IsCompatibleWith(def_value.shape()), errors::InvalidArgument( @@ -100,12 +111,12 @@ class ExampleParserOp : public OpKernel { "].shape() == ", def_value.shape().DebugString(), " is not compatible with dense_shapes_[", d, "] == ", attrs_.dense_shapes[d].DebugString())); - OP_REQUIRES(ctx, def_value.dtype() == attrs_.dense_types[d], - errors::InvalidArgument( - "dense_defaults[", d, "].dtype() == ", - DataTypeString(def_value.dtype()), " != dense_types_[", - d, "] == ", DataTypeString(attrs_.dense_types[d]))); } + OP_REQUIRES(ctx, def_value.dtype() == attrs_.dense_types[d], + errors::InvalidArgument( + "dense_defaults[", d, "].dtype() == ", + DataTypeString(def_value.dtype()), " != dense_types_[", d, + "] == ", DataTypeString(attrs_.dense_types[d]))); } example::Result result; @@ -114,6 +125,7 @@ class ExampleParserOp : public OpKernel { for (int d = 0; d < attrs_.num_dense; ++d) { config.dense.push_back({dense_keys_t[d], attrs_.dense_types[d], attrs_.dense_shapes[d], dense_defaults[d], + attrs_.variable_length[d], attrs_.elements_per_stride[d]}); } for (int d = 0; d < attrs_.num_sparse; ++d) { diff --git a/tensorflow/core/kernels/example_parsing_ops_test.cc b/tensorflow/core/kernels/example_parsing_ops_test.cc index 67ac477713..29dbfd3b1b 100644 --- a/tensorflow/core/kernels/example_parsing_ops_test.cc +++ b/tensorflow/core/kernels/example_parsing_ops_test.cc @@ -127,9 +127,11 @@ template <> ExampleTensorMap ExampleStore<FloatFiller>::serialized_example = ExampleStore<FloatFiller>::GetSerializedExamples(); -template <typename S, bool BenchmarkDense> +enum BenchmarkType { kDense, kSparse, kVarLenDense }; + +template <typename S, BenchmarkType b_type> struct BenchmarkOptions { - bool benchmark_dense = BenchmarkDense; + int benchmark_type = b_type; typedef S Store; typename S::Filler filler; }; @@ -145,19 +147,28 @@ static Graph* ParseExample(int batch_size, int num_keys, int feature_size) { std::vector<NodeBuilder::NodeOut> dense_keys; std::vector<NodeBuilder::NodeOut> dense_defaults; std::vector<DataType> sparse_types; - std::vector<TensorShape> dense_shapes; + std::vector<PartialTensorShape> dense_shapes; Options opt; for (int i = 0; i < num_keys; ++i) { Tensor key(DT_STRING, TensorShape()); key.scalar<string>()() = strings::Printf("feature_%d", i); - if (opt.benchmark_dense) { - dense_keys.emplace_back(test::graph::Constant(g, key)); - dense_defaults.emplace_back(test::graph::Constant( - g, opt.filler.make_dense_default(feature_size))); - dense_shapes.push_back(TensorShape({feature_size})); - } else { - sparse_keys.emplace_back(test::graph::Constant(g, key)); - sparse_types.push_back(opt.filler.dtype); + switch (opt.benchmark_type) { + case kDense: + dense_keys.emplace_back(test::graph::Constant(g, key)); + dense_defaults.emplace_back(test::graph::Constant( + g, opt.filler.make_dense_default(feature_size))); + dense_shapes.push_back(PartialTensorShape({feature_size})); + break; + case kVarLenDense: + dense_keys.emplace_back(test::graph::Constant(g, key)); + dense_defaults.emplace_back( + test::graph::Constant(g, opt.filler.make_dense_default(1))); + dense_shapes.push_back(PartialTensorShape({-1})); + break; + case kSparse: + sparse_keys.emplace_back(test::graph::Constant(g, key)); + sparse_types.push_back(opt.filler.dtype); + break; } } @@ -176,12 +187,18 @@ static Graph* ParseExample(int batch_size, int num_keys, int feature_size) { } // Benchmark settings (Sparse, Dense) X (Bytes, Int64, Float) -typedef BenchmarkOptions<ExampleStore<BytesFiller>, false> SparseString; -typedef BenchmarkOptions<ExampleStore<BytesFiller>, true> DenseString; -typedef BenchmarkOptions<ExampleStore<Int64Filler>, false> SparseInt64; -typedef BenchmarkOptions<ExampleStore<Int64Filler>, true> DenseInt64; -typedef BenchmarkOptions<ExampleStore<FloatFiller>, false> SparseFloat; -typedef BenchmarkOptions<ExampleStore<FloatFiller>, true> DenseFloat; +typedef BenchmarkOptions<ExampleStore<BytesFiller>, kSparse> SparseString; +typedef BenchmarkOptions<ExampleStore<BytesFiller>, kDense> DenseString; +typedef BenchmarkOptions<ExampleStore<BytesFiller>, kVarLenDense> + VarLenDenseString; +typedef BenchmarkOptions<ExampleStore<Int64Filler>, kSparse> SparseInt64; +typedef BenchmarkOptions<ExampleStore<Int64Filler>, kDense> DenseInt64; +typedef BenchmarkOptions<ExampleStore<Int64Filler>, kVarLenDense> + VarLenDenseInt64; +typedef BenchmarkOptions<ExampleStore<FloatFiller>, kSparse> SparseFloat; +typedef BenchmarkOptions<ExampleStore<FloatFiller>, kDense> DenseFloat; +typedef BenchmarkOptions<ExampleStore<FloatFiller>, kVarLenDense> + VarLenDenseFloat; // B == batch_size, K == num_keys. F == feature_size. // K must be one of 10, 100, 1000 @@ -205,9 +222,12 @@ typedef BenchmarkOptions<ExampleStore<FloatFiller>, true> DenseFloat; BM_AllParseExample(SparseString); BM_AllParseExample(DenseString); +BM_AllParseExample(VarLenDenseString); BM_AllParseExample(SparseInt64); BM_AllParseExample(DenseInt64); +BM_AllParseExample(VarLenDenseInt64); BM_AllParseExample(SparseFloat); BM_AllParseExample(DenseFloat); +BM_AllParseExample(VarLenDenseFloat); } // end namespace tensorflow diff --git a/tensorflow/core/ops/parsing_ops.cc b/tensorflow/core/ops/parsing_ops.cc index 4ca3f2e07e..b563656f39 100644 --- a/tensorflow/core/ops/parsing_ops.cc +++ b/tensorflow/core/ops/parsing_ops.cc @@ -113,7 +113,11 @@ dense_defaults: A list of Ndense Tensors (some may be empty). when the example's feature_map lacks dense_key[j]. If an empty Tensor is provided for dense_defaults[j], then the Feature dense_keys[j] is required. The input type is inferred from dense_defaults[j], even when it's empty. - If dense_defaults[j] is not empty, its shape must match dense_shapes[j]. + If dense_defaults[j] is not empty, and dense_shapes[j] is fully defined, + then the shape of dense_defaults[j] must match that of dense_shapes[j]. + If dense_shapes[j] has an undefined major dimension (variable strides dense + feature), dense_defaults[j] must contain a single element: + the padding element. dense_shapes: A list of Ndense shapes; the shapes of data in each Feature given in dense_keys. The number of elements in the Feature corresponding to dense_key[j] @@ -121,6 +125,13 @@ dense_shapes: A list of Ndense shapes; the shapes of data in each Feature If dense_shapes[j] == (D0, D1, ..., DN) then the shape of output Tensor dense_values[j] will be (|serialized|, D0, D1, ..., DN): The dense outputs are just the inputs row-stacked by batch. + This works for dense_shapes[j] = (-1, D1, ..., DN). In this case + the shape of the output Tensor dense_values[j] will be + (|serialized|, M, D1, .., DN), where M is the maximum number of blocks + of elements of length D1 * .... * DN, across all minibatch entries + in the input. Any minibatch entry with less than M blocks of elements of + length D1 * ... * DN will be padded with the corresponding default_value + scalar element along the second dimension. sparse_keys: A list of Nsparse string Tensors (scalars). The keys expected in the Examples' features associated with sparse values. sparse_types: A list of Nsparse types; the data types of data in each Feature diff --git a/tensorflow/core/ops/parsing_ops_test.cc b/tensorflow/core/ops/parsing_ops_test.cc index 6167c136b1..dc2aa19ee1 100644 --- a/tensorflow/core/ops/parsing_ops_test.cc +++ b/tensorflow/core/ops/parsing_ops_test.cc @@ -61,11 +61,19 @@ TEST(ParsingOpsTest, DecodeCSV_ShapeFn) { } static std::vector<TensorShapeProto> MakeDenseShapes(int size, - bool add_extra_shape) { + bool add_extra_shape, + int unknown_outer_dims) { std::vector<TensorShapeProto> shapes(size); for (int i = 0; i < size; ++i) { - // Make shapes be the sequence [1]; [1,2], [1,2,3]... - if (i > 0) shapes[i] = shapes[i - 1]; + // Make shapes be the sequence [?,1]; [?,1,2], [?,1,2,3]... + // where the number of prefixed ? depends on unknown_outer_dims. + if (i == 0) { + for (int d = 0; d < unknown_outer_dims; ++d) { + shapes[i].add_dim()->set_size(-1); + } + } else { + shapes[i] = shapes[i - 1]; + } shapes[i].add_dim()->set_size(i + 1); } if (add_extra_shape) { @@ -77,7 +85,8 @@ static std::vector<TensorShapeProto> MakeDenseShapes(int size, TEST(ParsingOpsTest, ParseExample_ShapeFn) { ShapeInferenceTestOp op("ParseExample"); auto set_outputs = [&op](int num_sparse, int num_dense, - bool add_extra_shape = false) { + bool add_extra_shape = false, + int unknown_outer_dims = 0) { using NodeOutList = std::vector<NodeDefBuilder::NodeOut>; using DataTypeList = std::vector<DataType>; NodeDefBuilder::NodeOut string_in{"a", 0, DT_STRING}; @@ -91,7 +100,8 @@ TEST(ParsingOpsTest, ParseExample_ShapeFn) { .Input(NodeOutList(num_dense, string_in)) .Attr("sparse_types", DataTypeList(num_sparse, DT_FLOAT)) .Attr("dense_types", DataTypeList(num_dense, DT_FLOAT)) - .Attr("dense_shapes", MakeDenseShapes(num_dense, add_extra_shape)) + .Attr("dense_shapes", MakeDenseShapes(num_dense, add_extra_shape, + unknown_outer_dims)) .Finalize(&op.node_def)); }; @@ -115,6 +125,24 @@ TEST(ParsingOpsTest, ParseExample_ShapeFn) { set_outputs(2, 3, true /* add_extra_shape */); INFER_ERROR("len(dense_keys) != len(dense_shapes)", op, "?;?;?;?;?;?;?;?;?;?"); + + // Allow variable strides + set_outputs(2, 3, false /* add_extra_shape */, 1 /* unknown_outer_dims */); + INFER_OK(op, "?;?;?;?;?;?;?;?;?;?", + ("[?,2];[?,2];[?];[?];[2];[2];" // sparse outputs + "[?,?,1];[?,?,1,2];[?,?,1,2,3]")); // dense outputs + INFER_OK(op, "[10];?;?;?;?;?;?;?;?;?", + ("[?,2];[?,2];[?];[?];[2];[2];" // sparse outputs + "[d0_0,?,1];[d0_0,?,1,2];[d0_0,?,1,2,3]")); // dense outputs + + set_outputs(2, 3, true /* add_extra_shape */, 1 /* unknown_outer_dims */); + INFER_ERROR("len(dense_keys) != len(dense_shapes)", op, + "?;?;?;?;?;?;?;?;?;?"); + + // Variable inner dimensions are not supported + set_outputs(2, 3, false /* add_extra_shape */, 2 /* unknown_outer_dims */); + INFER_ERROR("shapes[0] has unknown rank or unknown inner dimensions", op, + "?;?;?;?;?;?;?;?;?;?"); } TEST(ParsingOpsTest, ParseSingleSequenceExample_ShapeFn) { @@ -142,13 +170,13 @@ TEST(ParsingOpsTest, ParseSingleSequenceExample_ShapeFn) { .Attr("context_dense_types", DataTypeList(num_context_dense, DT_FLOAT)) .Attr("context_dense_shapes", - MakeDenseShapes(num_context_dense, add_extra_shape)) + MakeDenseShapes(num_context_dense, add_extra_shape, 0)) .Attr("feature_list_sparse_types", DataTypeList(num_feature_list_sparse, DT_FLOAT)) .Attr("feature_list_dense_types", DataTypeList(num_feature_list_dense, DT_FLOAT)) .Attr("feature_list_dense_shapes", - MakeDenseShapes(num_feature_list_dense, add_extra_shape)) + MakeDenseShapes(num_feature_list_dense, add_extra_shape, 0)) .Finalize(&op.node_def)); }; diff --git a/tensorflow/core/util/example_proto_fast_parsing.cc b/tensorflow/core/util/example_proto_fast_parsing.cc index e14f50551e..facb092dbc 100644 --- a/tensorflow/core/util/example_proto_fast_parsing.cc +++ b/tensorflow/core/util/example_proto_fast_parsing.cc @@ -424,6 +424,7 @@ Status FastParseSerializedExample( const size_t example_index, const Config& config, const PresizedCuckooMap<std::pair<size_t, Type>>& config_index, SeededHasher hasher, std::vector<Tensor>* output_dense, + std::vector<SparseBuffer>* output_varlen_dense, std::vector<SparseBuffer>* output_sparse) { DCHECK(output_dense != nullptr); DCHECK(output_sparse != nullptr); @@ -463,9 +464,9 @@ Status FastParseSerializedExample( } auto example_error = [&](StringPiece suffix) { - return errors::InvalidArgument("Name: ", example_name, ", Key: ", - feature_name, ", Index: ", example_index, - ". ", suffix); + return errors::InvalidArgument("Name: ", example_name, + ", Key: ", feature_name, + ", Index: ", example_index, ". ", suffix); }; auto parse_error = [&] { @@ -494,54 +495,117 @@ Status FastParseSerializedExample( dense_feature_last_example[d] = example_index; if (example_dtype != config.dense[d].dtype) { - return example_error( - strings::StrCat("Data types don't match. Data type: ", - DataTypeString(example_dtype), "Expected type: ", - DataTypeString(config.dense[d].dtype))); + return example_error(strings::StrCat( + "Data types don't match. Data type: ", + DataTypeString(example_dtype), + "Expected type: ", DataTypeString(config.dense[d].dtype))); } - Tensor& out = (*output_dense)[d]; + if (!config.dense[d].variable_length) { + Tensor& out = (*output_dense)[d]; + + const std::size_t num_elements = config.dense[d].elements_per_stride; + const std::size_t offset = example_index * num_elements; + + auto shape_error = [&](size_t size, StringPiece type_str) { + return example_error(strings::StrCat( + "Number of ", type_str, + " values != expected. " + "Values size: ", + size, + " but output shape: ", config.dense[d].shape.DebugString())); + }; + + switch (config.dense[d].dtype) { + case DT_INT64: { + auto out_p = out.flat<int64>().data() + offset; + LimitedArraySlice<int64> slice(out_p, num_elements); + if (!feature.ParseInt64List(&slice)) return parse_error(); + if (slice.EndDistance() != 0) { + return shape_error(num_elements - slice.EndDistance(), "int64"); + } + break; + } + case DT_FLOAT: { + auto out_p = out.flat<float>().data() + offset; + LimitedArraySlice<float> slice(out_p, num_elements); + if (!feature.ParseFloatList(&slice)) return parse_error(); + if (slice.EndDistance() != 0) { + return shape_error(num_elements - slice.EndDistance(), "float"); + } + break; + } + case DT_STRING: { + auto out_p = out.flat<string>().data() + offset; + LimitedArraySlice<string> slice(out_p, num_elements); + if (!feature.ParseBytesList(&slice)) return parse_error(); + if (slice.EndDistance() != 0) { + return shape_error(num_elements - slice.EndDistance(), "bytes"); + } + break; + } + default: + CHECK(false) << "Should not happen."; + } + } else { // if variable length + SparseBuffer& out = (*output_varlen_dense)[d]; - const std::size_t num_elements = config.dense[d].elements_per_stride; - const std::size_t offset = example_index * num_elements; + const std::size_t num_elements = config.dense[d].elements_per_stride; - auto shape_error = [&](size_t size, StringPiece type_str) { - return example_error(strings::StrCat( - "Number of ", type_str, - " values != expected. " - "Values size: ", - size, " but output shape: ", config.dense[d].shape.DebugString())); - }; + if (example_dtype != DT_INVALID && + example_dtype != config.dense[d].dtype) { + return example_error(strings::StrCat( + "Data types don't match. ", + "Expected type: ", DataTypeString(config.dense[d].dtype))); + } - switch (config.dense[d].dtype) { - case DT_INT64: { - auto out_p = out.flat<int64>().data() + offset; - LimitedArraySlice<int64> slice(out_p, num_elements); - if (!feature.ParseInt64List(&slice)) return parse_error(); - if (slice.EndDistance() != 0) { - return shape_error(num_elements - slice.EndDistance(), "int64"); + auto shape_error = [&](size_t size, StringPiece type_str) { + return example_error(strings::StrCat( + "Number of ", type_str, + " values is not a multiple of stride length. Saw ", size, + " values but output shape is: ", + config.dense[d].shape.DebugString())); + }; + + switch (config.dense[d].dtype) { + case DT_INT64: { + if (example_dtype != DT_INVALID) { + if (!feature.ParseInt64List(&out.int64_list)) { + return parse_error(); + } + if (out.int64_list.size() % num_elements != 0) { + return shape_error(out.int64_list.size(), "int64"); + } + } + out.example_end_indices.push_back(out.int64_list.size()); + break; } - break; - } - case DT_FLOAT: { - auto out_p = out.flat<float>().data() + offset; - LimitedArraySlice<float> slice(out_p, num_elements); - if (!feature.ParseFloatList(&slice)) return parse_error(); - if (slice.EndDistance() != 0) { - return shape_error(num_elements - slice.EndDistance(), "float"); + case DT_FLOAT: { + if (example_dtype != DT_INVALID) { + if (!feature.ParseFloatList(&out.float_list)) { + return parse_error(); + } + if (out.float_list.size() % num_elements != 0) { + return shape_error(out.float_list.size(), "float"); + } + } + out.example_end_indices.push_back(out.float_list.size()); + break; } - break; - } - case DT_STRING: { - auto out_p = out.flat<string>().data() + offset; - LimitedArraySlice<string> slice(out_p, num_elements); - if (!feature.ParseBytesList(&slice)) return parse_error(); - if (slice.EndDistance() != 0) { - return shape_error(num_elements - slice.EndDistance(), "bytes"); + case DT_STRING: { + if (example_dtype != DT_INVALID) { + if (!feature.ParseBytesList(&out.bytes_list)) { + return parse_error(); + } + if (out.bytes_list.size() % num_elements != 0) { + return shape_error(out.bytes_list.size(), "bytes"); + } + } + out.example_end_indices.push_back(out.bytes_list.size()); + break; } - break; + default: + CHECK(false) << "Should not happen."; } - default: - CHECK(false) << "Should not happen."; } } else { // If feature was already visited, skip. @@ -563,9 +627,9 @@ Status FastParseSerializedExample( SparseBuffer& out = (*output_sparse)[d]; if (example_dtype != DT_INVALID && example_dtype != config.sparse[d].dtype) { - return example_error( - strings::StrCat("Data types don't match. ", "Expected type: ", - DataTypeString(config.sparse[d].dtype))); + return example_error(strings::StrCat( + "Data types don't match. ", + "Expected type: ", DataTypeString(config.sparse[d].dtype))); } switch (config.sparse[d].dtype) { @@ -602,8 +666,9 @@ Status FastParseSerializedExample( } } - // Handle missing dense features. + // Handle missing dense features for fixed strides. for (size_t d = 0; d < config.dense.size(); ++d) { + if (config.dense[d].variable_length) continue; if (dense_feature_last_example[d] == example_index) continue; if (config.dense[d].default_value.NumElements() == 0) { return errors::InvalidArgument( @@ -637,6 +702,16 @@ Status FastParseSerializedExample( } } + // Handle missing varlen dense features. + for (size_t d = 0; d < config.dense.size(); ++d) { + if (!config.dense[d].variable_length) continue; + if (dense_feature_last_example[d] == example_index) continue; + SparseBuffer& out = (*output_varlen_dense)[d]; + size_t prev_example_end_index = + out.example_end_indices.empty() ? 0 : out.example_end_indices.back(); + out.example_end_indices.push_back(prev_example_end_index); + } + // Handle missing sparse features. for (size_t d = 0; d < config.sparse.size(); ++d) { if (sparse_feature_last_example[d] == example_index) continue; @@ -661,6 +736,65 @@ Status CheckConfigDataType(DataType dtype) { } } +template <typename T> +const SmallVector<T>& GetListFromBuffer(const SparseBuffer& buffer); + +template <> +const SmallVector<int64>& GetListFromBuffer<int64>(const SparseBuffer& buffer) { + return buffer.int64_list; +} +template <> +const SmallVector<float>& GetListFromBuffer<float>(const SparseBuffer& buffer) { + return buffer.float_list; +} +template <> +const SmallVector<string>& GetListFromBuffer<string>( + const SparseBuffer& buffer) { + return buffer.bytes_list; +} + +template <typename T> +void CopyOrMoveBlock(const T* b, const T* e, T* t) { + std::copy(b, e, t); +} +template <> +void CopyOrMoveBlock(const string* b, const string* e, string* t) { + std::move(b, e, t); +} + +template <typename T> +void FillAndCopyVarLen( + const int d, const size_t num_elements, + const size_t num_elements_per_minibatch, const size_t data_stride_size, + const Config& config, + const std::vector<std::vector<SparseBuffer>>& varlen_dense_buffers, + Tensor* values) { + const Tensor& default_value = config.dense[d].default_value; + + // Copy-fill the tensors (creating the zero/fill-padding) + std::fill(values->flat<T>().data(), values->flat<T>().data() + num_elements, + default_value.flat<T>()(0)); + + // Iterate over minibatch elements + for (size_t i = 0; i < varlen_dense_buffers.size(); ++i) { + const SparseBuffer& buffer = varlen_dense_buffers[i][d]; + const size_t offset = i * num_elements_per_minibatch; + const size_t stride_size = config.dense[d].elements_per_stride; + + // Copy values over. + auto& list = GetListFromBuffer<T>(buffer); + auto list_ptr = list.begin(); + auto data = values->flat<T>().data() + offset; + DCHECK(list.size() % stride_size == 0); + const size_t num_entries = list.size() / stride_size; + for (size_t j = 0; j < num_entries; ++j) { + CopyOrMoveBlock(list_ptr, list_ptr + stride_size, data); + list_ptr += stride_size; + data += data_stride_size; + } + } +} + } // namespace Status FastParseExample(const Config& config, @@ -701,14 +835,17 @@ Status FastParseExample(const Config& config, "Could not avoid collision. This should not happen."); } - // Allocate dense output (sparse have to be buffered). + // Allocate dense output for fixed length dense values + // (variable-length dense and sparse have to be buffered). + std::vector<Tensor> fixed_dense_values(config.dense.size()); for (size_t d = 0; d < config.dense.size(); ++d) { + if (config.dense[d].variable_length) continue; TensorShape out_shape; out_shape.AddDim(serialized.size()); for (const int64 dim : config.dense[d].shape.dim_sizes()) { out_shape.AddDim(dim); } - result->dense_values.emplace_back(config.dense[d].dtype, out_shape); + fixed_dense_values[d] = Tensor(config.dense[d].dtype, out_shape); } // This parameter affects performance in a big and data-dependent way. @@ -750,17 +887,19 @@ Status FastParseExample(const Config& config, // Do minibatches in parallel. std::vector<std::vector<SparseBuffer>> sparse_buffers(num_minibatches); + std::vector<std::vector<SparseBuffer>> varlen_dense_buffers(num_minibatches); std::vector<Status> status_of_minibatch(num_minibatches); auto ProcessMiniBatch = [&](size_t minibatch) { sparse_buffers[minibatch].resize(config.sparse.size()); + varlen_dense_buffers[minibatch].resize(config.dense.size()); size_t start = first_example_of_minibatch(minibatch); size_t end = first_example_of_minibatch(minibatch + 1); for (size_t e = start; e < end; ++e) { status_of_minibatch[minibatch] = FastParseSerializedExample( serialized[e], (example_names.size() > 0 ? example_names[e] : "<unknown>"), e, - config, config_index, hasher, &result->dense_values, - &sparse_buffers[minibatch]); + config, config_index, hasher, &fixed_dense_values, + &varlen_dense_buffers[minibatch], &sparse_buffers[minibatch]); if (!status_of_minibatch[minibatch].ok()) break; } }; @@ -771,8 +910,12 @@ Status FastParseExample(const Config& config, TF_RETURN_IF_ERROR(status); } + for (size_t d = 0; d < config.dense.size(); ++d) { + result->dense_values.push_back(std::move(fixed_dense_values[d])); + } + // Merge SparseBuffers from all minibatches for every config.sparse. - auto MergeMinibatches = [&](size_t d) { + auto MergeSparseMinibatches = [&](size_t d) { // Loop over minibatches size_t total_num_features = 0; size_t max_num_features = 0; @@ -849,8 +992,76 @@ Status FastParseExample(const Config& config, } }; + // Merge SparseBuffers from all minibatches for every config.dense having + // variable_length. + auto MergeDenseVarLenMinibatches = [&](size_t d) { + if (!config.dense[d].variable_length) return; + + // Loop over minibatches + size_t max_num_features = 0; + for (auto& dense_values_tmp : varlen_dense_buffers) { + std::vector<size_t>& end_indices = + dense_values_tmp[d].example_end_indices; + max_num_features = std::max(max_num_features, end_indices[0]); + for (size_t i = 1; i < end_indices.size(); ++i) { + size_t example_size = end_indices[i] - end_indices[i - 1]; + max_num_features = std::max(max_num_features, example_size); + } + } + + const size_t stride_size = config.dense[d].elements_per_stride; + const size_t max_num_elements = max_num_features / stride_size; + TensorShape values_shape; + DCHECK(max_num_features % config.dense[d].elements_per_stride == 0); + const size_t batch_size = serialized.size(); + values_shape.AddDim(batch_size); + values_shape.AddDim(max_num_elements); + for (int i = 1; i < config.dense[d].shape.dims(); ++i) { + values_shape.AddDim(config.dense[d].shape.dim_size(i)); + } + Tensor values(config.dense[d].dtype, values_shape); + result->dense_values[d] = values; + const size_t num_elements = values.NumElements(); + + // Nothing to write, exit early. + if (num_elements == 0) return; + + const size_t num_elements_per_minibatch = num_elements / batch_size; + const size_t data_stride_size = + (max_num_elements == 0) + ? 0 + : (num_elements_per_minibatch / max_num_elements); + + switch (config.dense[d].dtype) { + case DT_INT64: { + FillAndCopyVarLen<int64>(d, num_elements, num_elements_per_minibatch, + data_stride_size, config, varlen_dense_buffers, + &values); + break; + } + case DT_FLOAT: { + FillAndCopyVarLen<float>(d, num_elements, num_elements_per_minibatch, + data_stride_size, config, varlen_dense_buffers, + &values); + break; + } + case DT_STRING: { + FillAndCopyVarLen<string>(d, num_elements, num_elements_per_minibatch, + data_stride_size, config, + varlen_dense_buffers, &values); + break; + } + default: + CHECK(false) << "Should not happen."; + } + }; + + for (size_t d = 0; d < config.dense.size(); ++d) { + MergeDenseVarLenMinibatches(d); + } + for (size_t d = 0; d < config.sparse.size(); ++d) { - MergeMinibatches(d); + MergeSparseMinibatches(d); } return Status::OK(); diff --git a/tensorflow/core/util/example_proto_fast_parsing.h b/tensorflow/core/util/example_proto_fast_parsing.h index 4878199802..5f8b4af5fe 100644 --- a/tensorflow/core/util/example_proto_fast_parsing.h +++ b/tensorflow/core/util/example_proto_fast_parsing.h @@ -48,6 +48,7 @@ struct FastParseExampleConfig { // Documentation is avaliable in: tensorflow/core/ops/parsing_ops.cc PartialTensorShape shape; Tensor default_value; + bool variable_length; std::size_t elements_per_stride; }; diff --git a/tensorflow/core/util/example_proto_helper.h b/tensorflow/core/util/example_proto_helper.h index 971d97266c..44838d2e54 100644 --- a/tensorflow/core/util/example_proto_helper.h +++ b/tensorflow/core/util/example_proto_helper.h @@ -161,13 +161,32 @@ class ParseSingleExampleAttrs { // Temporary check until we start allowing a variable length outer // dimension. for (int i = 0; i < dense_shapes.size(); ++i) { - if (!dense_shapes[i].IsFullyDefined()) { + bool shape_ok = true; + if (dense_shapes[i].dims() == -1) { + shape_ok = false; + } else { + for (int d = 1; d < dense_shapes[i].dims(); ++d) { + if (dense_shapes[i].dim_size(d) == -1) { + shape_ok = false; + } + } + } + if (!shape_ok) { return errors::InvalidArgument( "dense_shapes[", i, - "] is not fully defined: ", dense_shapes[i].DebugString()); + "] has unknown rank or unknown inner dimensions: ", + dense_shapes[i].DebugString()); } TensorShape dense_shape; - dense_shapes[i].AsTensorShape(&dense_shape); + if (dense_shapes[i].dims() > 0 && dense_shapes[i].dim_size(0) == -1) { + variable_length.push_back(true); + for (int d = 1; d < dense_shapes[i].dims(); ++d) { + dense_shape.AddDim(dense_shapes[i].dim_size(d)); + } + } else { + variable_length.push_back(false); + dense_shapes[i].AsTensorShape(&dense_shape); + } elements_per_stride.push_back(dense_shape.num_elements()); } return FinishInit(); @@ -178,6 +197,7 @@ class ParseSingleExampleAttrs { std::vector<DataType> sparse_types; std::vector<DataType> dense_types; std::vector<PartialTensorShape> dense_shapes; + std::vector<bool> variable_length; std::vector<std::size_t> elements_per_stride; private: diff --git a/tensorflow/python/kernel_tests/parsing_ops_test.py b/tensorflow/python/kernel_tests/parsing_ops_test.py index b66d271f3c..c19d5a9536 100644 --- a/tensorflow/python/kernel_tests/parsing_ops_test.py +++ b/tensorflow/python/kernel_tests/parsing_ops_test.py @@ -92,6 +92,7 @@ class ParseExampleTest(test.TestCase): expected_err[1]): out = parsing_ops.parse_example(**kwargs) sess.run(flatten_values_tensors_or_sparse(out.values())) + return else: # Returns dict w/ Tensors and SparseTensors. out = parsing_ops.parse_example(**kwargs) @@ -636,6 +637,103 @@ class ParseExampleTest(test.TestCase): } }, expected_output) + def testSerializedContainingVarLenDense(self): + aname = "a" + bname = "b" + cname = "c" + dname = "d" + example_names = ["in1", "in2", "in3", "in4"] + original = [ + example(features=features({ + cname: int64_feature([2]), + })), + example(features=features({ + aname: float_feature([1, 1]), + bname: bytes_feature([b"b0_str", b"b1_str"]), + })), + example(features=features({ + aname: float_feature([-1, -1, 2, 2]), + bname: bytes_feature([b"b1"]), + })), + example(features=features({ + aname: float_feature([]), + cname: int64_feature([3]), + })), + ] + + serialized = [m.SerializeToString() for m in original] + + expected_output = { + aname: + np.array( + [ + [0, 0, 0, 0], + [1, 1, 0, 0], + [-1, -1, 2, 2], + [0, 0, 0, 0], + ], + dtype=np.float32).reshape(4, 2, 2, 1), + bname: + np.array( + [["", ""], ["b0_str", "b1_str"], ["b1", ""], ["", ""]], + dtype=bytes).reshape(4, 2, 1, 1, 1), + cname: + np.array([2, 0, 0, 3], dtype=np.int64).reshape(4, 1), + dname: + np.empty(shape=(4, 0), dtype=bytes), + } + + self._test({ + "example_names": example_names, + "serialized": ops.convert_to_tensor(serialized), + "features": { + aname: + parsing_ops.FixedLenFeature((None, 2, 1), dtype=dtypes.float32), + bname: + parsing_ops.FixedLenFeature( + (None, 1, 1, 1), dtype=dtypes.string), + cname: + parsing_ops.FixedLenFeature((None,), dtype=dtypes.int64), + dname: + parsing_ops.FixedLenFeature((None,), dtype=dtypes.string), + } + }, expected_output) + + # Change number of required values so the inputs are not a + # multiple of this size. + self._test( + { + "example_names": example_names, + "serialized": ops.convert_to_tensor(serialized), + "features": { + aname: + parsing_ops.FixedLenFeature( + (None, 2, 1), dtype=dtypes.float32), + bname: + parsing_ops.FixedLenFeature( + (None, 2, 1, 1), dtype=dtypes.string), + } + }, + expected_err=( + errors_impl.OpError, "Name: in3, Key: b, Index: 2. " + "Number of bytes values is not a multiple of stride length.")) + + self._test( + { + "example_names": example_names, + "serialized": ops.convert_to_tensor(serialized), + "features": { + aname: + parsing_ops.FixedLenFeature( + (None, 2, 1), dtype=dtypes.float32, default_value=[]), + bname: + parsing_ops.FixedLenFeature( + (None, 2, 1, 1), dtype=dtypes.string), + } + }, + expected_err=(ValueError, + "Cannot reshape a tensor with 0 elements to shape")) + class ParseSingleExampleTest(test.TestCase): diff --git a/tensorflow/python/ops/parsing_ops.py b/tensorflow/python/ops/parsing_ops.py index 079837bce3..77c7cd397a 100644 --- a/tensorflow/python/ops/parsing_ops.py +++ b/tensorflow/python/ops/parsing_ops.py @@ -476,8 +476,13 @@ def _parse_example_raw(serialized, The keys of the dict must match the dense_keys of the feature. dense_shapes: A list of tuples with the same length as `dense_keys`. The shape of the data for each dense feature referenced by `dense_keys`. - Required for any input tensors identified by `dense_keys` whose shapes are - anything other than `[]` or `[1]`. + Required for any input tensors identified by `dense_keys`. Must be + either fully defined, or may contain an unknown first dimension. + An unknown first dimension means the feature is treated as having + a variable number of blocks, and the output shape along this dimension + is considered unknown at graph build time. Padding is applied for + minibatch elements smaller than the maximum number of blocks for the + given feature along this dimension. name: A name for this operation (optional). Returns: @@ -516,21 +521,42 @@ def _parse_example_raw(serialized, "Dense and sparse keys must not intersect; intersection: %s" % set(dense_keys).intersection(set(sparse_keys))) + # Convert dense_shapes to TensorShape object. + dense_shapes = [tensor_shape.as_shape(shape) for shape in dense_shapes] + dense_defaults_vec = [] for i, key in enumerate(dense_keys): default_value = dense_defaults.get(key) - if default_value is None: - default_value = constant_op.constant([], dtype=dense_types[i]) - elif not isinstance(default_value, ops.Tensor): - key_name = "key_" + re.sub("[^A-Za-z0-9_.\\-/]", "_", key) - default_value = ops.convert_to_tensor( - default_value, dtype=dense_types[i], name=key_name) - default_value = array_ops.reshape(default_value, dense_shapes[i]) + dense_shape = dense_shapes[i] + if (dense_shape.ndims is not None and dense_shape.ndims > 0 and + dense_shape[0].value is None): + # Variable stride dense shape, the default value should be a + # scalar padding value + if default_value is None: + default_value = ops.convert_to_tensor( + "" if dense_types[i] == dtypes.string else 0, + dtype=dense_types[i]) + else: + # Reshape to a scalar to ensure user gets an error if they + # provide a tensor that's not intended to be a padding value + # (0 or 2+ elements). + key_name = "padding_" + re.sub("[^A-Za-z0-9_.\\-/]", "_", key) + default_value = ops.convert_to_tensor( + default_value, dtype=dense_types[i], name=key_name) + default_value = array_ops.reshape(default_value, []) + else: + if default_value is None: + default_value = constant_op.constant([], dtype=dense_types[i]) + elif not isinstance(default_value, ops.Tensor): + key_name = "key_" + re.sub("[^A-Za-z0-9_.\\-/]", "_", key) + default_value = ops.convert_to_tensor( + default_value, dtype=dense_types[i], name=key_name) + default_value = array_ops.reshape(default_value, dense_shape) dense_defaults_vec.append(default_value) - dense_shapes = [tensor_shape.as_shape(shape).as_proto() - for shape in dense_shapes] + # Finally, convert dense_shapes to TensorShapeProto + dense_shapes = [shape.as_proto() for shape in dense_shapes] # pylint: disable=protected-access outputs = gen_parsing_ops._parse_example( |