diff options
Diffstat (limited to 'tensorflow/core/kernels/boosted_trees/quantile_ops.cc')
-rw-r--r-- | tensorflow/core/kernels/boosted_trees/quantile_ops.cc | 453 |
1 file changed, 453 insertions, 0 deletions
diff --git a/tensorflow/core/kernels/boosted_trees/quantile_ops.cc b/tensorflow/core/kernels/boosted_trees/quantile_ops.cc new file mode 100644 index 0000000000..d1840941c1 --- /dev/null +++ b/tensorflow/core/kernels/boosted_trees/quantile_ops.cc @@ -0,0 +1,453 @@ +// Copyright 2017 The TensorFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ============================================================================= +#include <algorithm> +#include <iterator> +#include <string> +#include <vector> + +#include "tensorflow/core/framework/device_base.h" +#include "tensorflow/core/framework/op_kernel.h" +#include "tensorflow/core/framework/resource_mgr.h" +#include "tensorflow/core/framework/tensor.h" +#include "tensorflow/core/framework/tensor_shape.h" +#include "tensorflow/core/framework/types.h" +#include "tensorflow/core/kernels/boosted_trees/quantiles/quantile_stream_resource.h" +#include "tensorflow/core/kernels/boosted_trees/quantiles/weighted_quantiles_stream.h" +#include "tensorflow/core/kernels/boosted_trees/quantiles/weighted_quantiles_summary.h" +#include "tensorflow/core/lib/core/errors.h" +#include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/lib/strings/stringprintf.h" +#include "tensorflow/core/platform/types.h" +#include "tensorflow/core/util/work_sharder.h" + +namespace tensorflow { + +const char* const kExampleWeightsName = "example_weights"; +const char* const kMaxElementsName = 
"max_elements"; +const char* const kGenerateQuantiles = "generate_quantiles"; +const char* const kNumBucketsName = "num_buckets"; +const char* const kEpsilonName = "epsilon"; +const char* const kBucketBoundariesName = "bucket_boundaries"; +const char* const kBucketsName = "buckets"; +const char* const kSummariesName = "summaries"; +const char* const kNumStreamsName = "num_streams"; +const char* const kNumFeaturesName = "num_features"; +const char* const kFloatFeaturesName = "float_values"; +const char* const kResourceHandleName = "quantile_stream_resource_handle"; + +using QuantileStreamResource = BoostedTreesQuantileStreamResource; +using QuantileStream = + boosted_trees::quantiles::WeightedQuantilesStream<float, float>; +using QuantileSummary = + boosted_trees::quantiles::WeightedQuantilesSummary<float, float>; +using QuantileSummaryEntry = + boosted_trees::quantiles::WeightedQuantilesSummary<float, + float>::SummaryEntry; + +// Generates quantiles on a finalized QuantileStream. +std::vector<float> GenerateBoundaries(const QuantileStream& stream, + const int64 num_boundaries) { + std::vector<float> boundaries = stream.GenerateBoundaries(num_boundaries); + + // Uniquify elements as we may get dupes. + auto end_it = std::unique(boundaries.begin(), boundaries.end()); + boundaries.resize(std::distance(boundaries.begin(), end_it)); + return boundaries; +} + +// Generates quantiles on a finalized QuantileStream. +std::vector<float> GenerateQuantiles(const QuantileStream& stream, + const int64 num_quantiles) { + // Do not de-dup boundaries. Exactly num_quantiles+1 boundary values + // will be returned. 
+ std::vector<float> boundaries = stream.GenerateQuantiles(num_quantiles - 1); + CHECK_EQ(boundaries.size(), num_quantiles); + return boundaries; +} + +std::vector<float> GetBuckets(const int32 feature, + const OpInputList& buckets_list) { + const auto& buckets = buckets_list[feature].flat<float>(); + std::vector<float> buckets_vector(buckets.data(), + buckets.data() + buckets.size()); + return buckets_vector; +} + +REGISTER_RESOURCE_HANDLE_KERNEL(BoostedTreesQuantileStreamResource); + +REGISTER_KERNEL_BUILDER( + Name("IsBoostedTreesQuantileStreamResourceInitialized").Device(DEVICE_CPU), + IsResourceInitialized<BoostedTreesQuantileStreamResource>); + +class BoostedTreesCreateQuantileStreamResourceOp : public OpKernel { + public: + explicit BoostedTreesCreateQuantileStreamResourceOp( + OpKernelConstruction* const context) + : OpKernel(context) { + OP_REQUIRES_OK(context, context->GetAttr(kMaxElementsName, &max_elements_)); + } + + void Compute(OpKernelContext* context) override { + // Only create one, if one does not exist already. Report status for all + // other exceptions. If one already exists, it unrefs the new one. + // An epsilon value of zero could cause perfoamance issues and is therefore, + // disallowed. 
+ const Tensor* epsilon_t; + OP_REQUIRES_OK(context, context->input(kEpsilonName, &epsilon_t)); + float epsilon = epsilon_t->scalar<float>()(); + OP_REQUIRES( + context, epsilon > 0, + errors::InvalidArgument("An epsilon value of zero is not allowed.")); + + const Tensor* num_streams_t; + OP_REQUIRES_OK(context, context->input(kNumStreamsName, &num_streams_t)); + int64 num_streams = num_streams_t->scalar<int64>()(); + + auto result = + new QuantileStreamResource(epsilon, max_elements_, num_streams); + auto status = CreateResource(context, HandleFromInput(context, 0), result); + if (!status.ok() && status.code() != tensorflow::error::ALREADY_EXISTS) { + OP_REQUIRES(context, false, status); + } + } + + private: + // An upper bound on the number of entries that the summaries might have + // for a feature. + int64 max_elements_; +}; + +REGISTER_KERNEL_BUILDER( + Name("BoostedTreesCreateQuantileStreamResource").Device(DEVICE_CPU), + BoostedTreesCreateQuantileStreamResourceOp); + +class BoostedTreesMakeQuantileSummariesOp : public OpKernel { + public: + explicit BoostedTreesMakeQuantileSummariesOp( + OpKernelConstruction* const context) + : OpKernel(context) { + OP_REQUIRES_OK(context, context->GetAttr(kNumFeaturesName, &num_features_)); + } + + void Compute(OpKernelContext* const context) override { + // Read float features list; + OpInputList float_features_list; + OP_REQUIRES_OK( + context, context->input_list(kFloatFeaturesName, &float_features_list)); + + // Parse example weights and get batch size. 
+ const Tensor* example_weights_t; + OP_REQUIRES_OK(context, + context->input(kExampleWeightsName, &example_weights_t)); + auto example_weights = example_weights_t->flat<float>(); + const int64 batch_size = example_weights.size(); + const Tensor* epsilon_t; + OP_REQUIRES_OK(context, context->input(kEpsilonName, &epsilon_t)); + float epsilon = epsilon_t->scalar<float>()(); + + OpOutputList summaries_output_list; + OP_REQUIRES_OK( + context, context->output_list(kSummariesName, &summaries_output_list)); + + auto do_quantile_summary_gen = [&](const int64 begin, const int64 end) { + // Iterating features. + for (int64 index = begin; index < end; index++) { + const auto feature_values = float_features_list[index].flat<float>(); + QuantileStream stream(epsilon, batch_size + 1); + // Run quantile summary generation. + for (int64 j = 0; j < batch_size; j++) { + stream.PushEntry(feature_values(j), example_weights(j)); + } + stream.Finalize(); + const auto summary_entry_list = stream.GetFinalSummary().GetEntryList(); + Tensor* output_t; + OP_REQUIRES_OK( + context, + summaries_output_list.allocate( + index, + TensorShape({static_cast<int64>(summary_entry_list.size()), 4}), + &output_t)); + auto output = output_t->matrix<float>(); + for (auto row = 0; row < summary_entry_list.size(); row++) { + const auto& entry = summary_entry_list[row]; + output(row, 0) = entry.value; + output(row, 1) = entry.weight; + output(row, 2) = entry.min_rank; + output(row, 3) = entry.max_rank; + } + } + }; + // TODO(tanzheny): comment on the magic number. 
+ const int64 kCostPerUnit = 500 * batch_size; + const DeviceBase::CpuWorkerThreads& worker_threads = + *context->device()->tensorflow_cpu_worker_threads(); + Shard(worker_threads.num_threads, worker_threads.workers, num_features_, + kCostPerUnit, do_quantile_summary_gen); + } + + private: + int64 num_features_; +}; + +REGISTER_KERNEL_BUILDER( + Name("BoostedTreesMakeQuantileSummaries").Device(DEVICE_CPU), + BoostedTreesMakeQuantileSummariesOp); + +class BoostedTreesQuantileStreamResourceAddSummariesOp : public OpKernel { + public: + explicit BoostedTreesQuantileStreamResourceAddSummariesOp( + OpKernelConstruction* const context) + : OpKernel(context) {} + + void Compute(OpKernelContext* context) override { + ResourceHandle handle; + OP_REQUIRES_OK(context, + HandleFromInput(context, kResourceHandleName, &handle)); + QuantileStreamResource* stream_resource; + // Create a reference to the underlying resource using the handle. + OP_REQUIRES_OK(context, LookupResource(context, handle, &stream_resource)); + // Remove the reference at the end of this scope. + mutex_lock l(*stream_resource->mutex()); + core::ScopedUnref unref_me(stream_resource); + + OpInputList summaries_list; + OP_REQUIRES_OK(context, + context->input_list(kSummariesName, &summaries_list)); + int32 num_streams = stream_resource->num_streams(); + CHECK_EQ(static_cast<int>(num_streams), summaries_list.size()); + + auto do_quantile_add_summary = [&](const int64 begin, const int64 end) { + // Iterating all features. 
+ for (int64 feature_idx = begin; feature_idx < end; ++feature_idx) { + const Tensor& summaries = summaries_list[feature_idx]; + const auto summary_values = summaries.matrix<float>(); + const auto& tensor_shape = summaries.shape(); + const int64 entries_size = tensor_shape.dim_size(0); + CHECK_EQ(tensor_shape.dim_size(1), 4); + std::vector<QuantileSummaryEntry> summary_entries; + summary_entries.reserve(entries_size); + for (int64 i = 0; i < entries_size; i++) { + float value = summary_values(i, 0); + float weight = summary_values(i, 1); + float min_rank = summary_values(i, 2); + float max_rank = summary_values(i, 3); + QuantileSummaryEntry entry(value, weight, min_rank, max_rank); + summary_entries.push_back(entry); + } + stream_resource->stream(feature_idx)->PushSummary(summary_entries); + } + }; + + // TODO(tanzheny): comment on the magic number. + const int64 kCostPerUnit = 500 * num_streams; + const DeviceBase::CpuWorkerThreads& worker_threads = + *context->device()->tensorflow_cpu_worker_threads(); + Shard(worker_threads.num_threads, worker_threads.workers, num_streams, + kCostPerUnit, do_quantile_add_summary); + } +}; + +REGISTER_KERNEL_BUILDER( + Name("BoostedTreesQuantileStreamResourceAddSummaries").Device(DEVICE_CPU), + BoostedTreesQuantileStreamResourceAddSummariesOp); + +class BoostedTreesQuantileStreamResourceFlushOp : public OpKernel { + public: + explicit BoostedTreesQuantileStreamResourceFlushOp( + OpKernelConstruction* const context) + : OpKernel(context) { + OP_REQUIRES_OK(context, + context->GetAttr(kGenerateQuantiles, &generate_quantiles_)); + } + + void Compute(OpKernelContext* context) override { + ResourceHandle handle; + OP_REQUIRES_OK(context, + HandleFromInput(context, kResourceHandleName, &handle)); + QuantileStreamResource* stream_resource; + // Create a reference to the underlying resource using the handle. + OP_REQUIRES_OK(context, LookupResource(context, handle, &stream_resource)); + // Remove the reference at the end of this scope. 
+ mutex_lock l(*stream_resource->mutex()); + core::ScopedUnref unref_me(stream_resource); + + const Tensor* num_buckets_t; + OP_REQUIRES_OK(context, context->input(kNumBucketsName, &num_buckets_t)); + const int64 num_buckets = num_buckets_t->scalar<int64>()(); + const int64 num_streams = stream_resource->num_streams(); + + auto do_quantile_flush = [&](const int64 begin, const int64 end) { + // Iterating over all streams. + for (int64 stream_idx = begin; stream_idx < end; ++stream_idx) { + QuantileStream* stream = stream_resource->stream(stream_idx); + stream->Finalize(); + stream_resource->set_boundaries( + generate_quantiles_ ? GenerateQuantiles(*stream, num_buckets) + : GenerateBoundaries(*stream, num_buckets), + stream_idx); + } + }; + + // TODO(tanzheny): comment on the magic number. + const int64 kCostPerUnit = 500 * num_streams; + const DeviceBase::CpuWorkerThreads& worker_threads = + *context->device()->tensorflow_cpu_worker_threads(); + Shard(worker_threads.num_threads, worker_threads.workers, num_streams, + kCostPerUnit, do_quantile_flush); + + stream_resource->set_buckets_ready(true); + } + + private: + bool generate_quantiles_; +}; + +REGISTER_KERNEL_BUILDER( + Name("BoostedTreesQuantileStreamResourceFlush").Device(DEVICE_CPU), + BoostedTreesQuantileStreamResourceFlushOp); + +class BoostedTreesQuantileStreamResourceGetBucketBoundariesOp + : public OpKernel { + public: + explicit BoostedTreesQuantileStreamResourceGetBucketBoundariesOp( + OpKernelConstruction* const context) + : OpKernel(context) { + OP_REQUIRES_OK(context, context->GetAttr(kNumFeaturesName, &num_features_)); + } + + void Compute(OpKernelContext* const context) override { + ResourceHandle handle; + OP_REQUIRES_OK(context, + HandleFromInput(context, kResourceHandleName, &handle)); + QuantileStreamResource* stream_resource; + // Create a reference to the underlying resource using the handle. 
+ OP_REQUIRES_OK(context, LookupResource(context, handle, &stream_resource)); + // Remove the reference at the end of this scope. + mutex_lock l(*stream_resource->mutex()); + core::ScopedUnref unref_me(stream_resource); + + const int64 num_streams = stream_resource->num_streams(); + CHECK_EQ(num_features_, num_streams); + OpOutputList bucket_boundaries_list; + OP_REQUIRES_OK(context, context->output_list(kBucketBoundariesName, + &bucket_boundaries_list)); + + auto do_quantile_get_buckets = [&](const int64 begin, const int64 end) { + // Iterating over all streams. + for (int64 stream_idx = begin; stream_idx < end; stream_idx++) { + const auto& boundaries = stream_resource->boundaries(stream_idx); + Tensor* bucket_boundaries_t = nullptr; + OP_REQUIRES_OK(context, + bucket_boundaries_list.allocate( + stream_idx, {static_cast<int64>(boundaries.size())}, + &bucket_boundaries_t)); + auto* quantiles_flat = bucket_boundaries_t->flat<float>().data(); + memcpy(quantiles_flat, boundaries.data(), + sizeof(float) * boundaries.size()); + } + }; + + // TODO(tanzheny): comment on the magic number. + const int64 kCostPerUnit = 500 * num_streams; + const DeviceBase::CpuWorkerThreads& worker_threads = + *context->device()->tensorflow_cpu_worker_threads(); + Shard(worker_threads.num_threads, worker_threads.workers, num_streams, + kCostPerUnit, do_quantile_get_buckets); + } + + private: + int64 num_features_; +}; + +REGISTER_KERNEL_BUILDER( + Name("BoostedTreesQuantileStreamResourceGetBucketBoundaries") + .Device(DEVICE_CPU), + BoostedTreesQuantileStreamResourceGetBucketBoundariesOp); + +// Given the calculated quantiles thresholds and input data, this operation +// converts the input features into the buckets (categorical values), depending +// on which quantile they fall into. 
+class BoostedTreesBucketizeOp : public OpKernel { + public: + explicit BoostedTreesBucketizeOp(OpKernelConstruction* const context) + : OpKernel(context) { + OP_REQUIRES_OK(context, context->GetAttr(kNumFeaturesName, &num_features_)); + } + + void Compute(OpKernelContext* const context) override { + // Read float features list; + OpInputList float_features_list; + OP_REQUIRES_OK( + context, context->input_list(kFloatFeaturesName, &float_features_list)); + OpInputList bucket_boundaries_list; + OP_REQUIRES_OK(context, context->input_list(kBucketBoundariesName, + &bucket_boundaries_list)); + OP_REQUIRES(context, + tensorflow::TensorShapeUtils::IsVector( + bucket_boundaries_list[0].shape()), + errors::InvalidArgument( + strings::Printf("Buckets should be flat vectors."))); + OpOutputList buckets_list; + OP_REQUIRES_OK(context, context->output_list(kBucketsName, &buckets_list)); + + auto do_quantile_get_quantiles = [&](const int64 begin, const int64 end) { + // Iterating over all resources + for (int64 feature_idx = begin; feature_idx < end; feature_idx++) { + const Tensor& values_tensor = float_features_list[feature_idx]; + const int64 num_values = values_tensor.dim_size(0); + + Tensor* output_t = nullptr; + OP_REQUIRES_OK( + context, buckets_list.allocate( + feature_idx, TensorShape({num_values, 1}), &output_t)); + auto output = output_t->matrix<int32>(); + + const std::vector<float>& bucket_boundaries_vector = + GetBuckets(feature_idx, bucket_boundaries_list); + CHECK(!bucket_boundaries_vector.empty()) + << "Got empty buckets for feature " << feature_idx; + auto flat_values = values_tensor.flat<float>(); + for (int64 instance = 0; instance < num_values; instance++) { + const float value = flat_values(instance); + auto bucket_iter = + std::lower_bound(bucket_boundaries_vector.begin(), + bucket_boundaries_vector.end(), value); + if (bucket_iter == bucket_boundaries_vector.end()) { + --bucket_iter; + } + const int32 bucket = static_cast<int32>( + bucket_iter - 
bucket_boundaries_vector.begin()); + // Bucket id. + output(instance, 0) = bucket; + } + } + }; + + // TODO(tanzheny): comment on the magic number. + const int64 kCostPerUnit = 500 * num_features_; + const DeviceBase::CpuWorkerThreads& worker_threads = + *context->device()->tensorflow_cpu_worker_threads(); + Shard(worker_threads.num_threads, worker_threads.workers, num_features_, + kCostPerUnit, do_quantile_get_quantiles); + } + + private: + int64 num_features_; +}; + +REGISTER_KERNEL_BUILDER(Name("BoostedTreesBucketize").Device(DEVICE_CPU), + BoostedTreesBucketizeOp); + +} // namespace tensorflow |