Diffstat (limited to 'tensorflow/core/kernels/map_stage_op.cc')
-rw-r--r-- | tensorflow/core/kernels/map_stage_op.cc | 918 |
1 files changed, 918 insertions, 0 deletions
diff --git a/tensorflow/core/kernels/map_stage_op.cc b/tensorflow/core/kernels/map_stage_op.cc
new file mode 100644
index 0000000000..832fa8102b
--- /dev/null
+++ b/tensorflow/core/kernels/map_stage_op.cc
@@ -0,0 +1,918 @@
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include <map>
#include <mutex>
#include <numeric>
#include <unordered_map>
#include <vector>

#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/resource_mgr.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/lib/gtl/optional.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"

namespace tensorflow {

namespace {

// Partial Ordering Comparator for Tensor keys containing scalar int64's
struct KeyTensorLess {
  bool operator()(const Tensor& lhs, const Tensor& rhs) const {
    return std::less<int64>{}(lhs.scalar<int64>()(), rhs.scalar<int64>()());
  }
};

// Key Equality operator for Tensor keys containing scalar int64's
struct KeyTensorEqual {
  bool operator()(const Tensor& lhs, const Tensor& rhs) const {
    return std::equal_to<int64>{}(lhs.scalar<int64>()(),
                                  rhs.scalar<int64>()());
  }
};

// Hash for Tensor keys containing scalar int64's
struct KeyTensorHash {
  std::size_t operator()(const Tensor& key) const {
    return std::hash<int64>{}(key.scalar<int64>()());
  }
};

// General Template Definition
template <bool Ordered, typename Data>
struct MapTraits {};

// Partially specialise for ordered
template <typename Data>
struct MapTraits<true, Data> {
  typedef Tensor KeyType;
  typedef Data DataType;
  typedef std::map<KeyType, Data, KeyTensorLess> MapType;
};

// Partially specialise for unordered
template <typename Data>
struct MapTraits<false, Data> {
  typedef Tensor KeyType;
  typedef Data DataType;
  typedef std::unordered_map<KeyType, Data, KeyTensorHash, KeyTensorEqual>
      MapType;
};

// Wrapper around map/unordered_map
template <bool Ordered>
class StagingMap : public ResourceBase {
 public:
  // Public typedefs
  typedef std::vector<Tensor> Tuple;
  typedef gtl::optional<Tensor> OptionalTensor;
  typedef std::vector<OptionalTensor> OptionalTuple;

  typedef MapTraits<Ordered, OptionalTuple> MapTraits_;
  typedef typename MapTraits_::MapType MapType;
  typedef typename MapTraits_::KeyType KeyType;

  typedef MapTraits<false, OptionalTuple> IncompleteTraits;
  typedef typename IncompleteTraits::MapType IncompleteType;

 private:
  // Private variables
  DataTypeVector dtypes_;
  std::size_t capacity_;
  std::size_t memory_limit_;
  std::size_t current_bytes_;
  std::mutex mu_;
  std::condition_variable not_empty_;
  std::condition_variable full_;
  IncompleteType incomplete_;
  MapType map_;

 private:
  // private methods
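  // Synchronization discipline (summary of the methods below): all map
  // accessors acquire mu_. Inserters block on full_ while a configured
  // capacity or memory limit would be exceeded; removers block on not_empty_
  // until the requested key (or, for popitem, any element) is present.
  // Notification happens after the lock is released, so woken threads do not
  // immediately contend for mu_.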
  // If the map is configured for bounded capacity, notify
  // waiting inserters that space is now available
  void notify_inserters_if_bounded(std::unique_lock<std::mutex>& l) {
    if (has_capacity() || has_memory_limit()) {
      l.unlock();
      full_.notify_one();
    }
  }

  // Notify any removers waiting to extract values
  // that data is now available
  void notify_removers(std::unique_lock<std::mutex>& l) {
    l.unlock();
    not_empty_.notify_one();
  }

  inline bool has_capacity() { return capacity_ > 0; }

  inline bool has_memory_limit() { return memory_limit_ > 0; }

  inline bool would_exceed_memory_limit(std::size_t bytes) {
    return bytes + current_bytes_ > memory_limit_;
  }

  inline bool is_capacity_full() { return map_.size() >= capacity_; }

  // Get number of bytes in the tuple
  inline std::size_t get_tuple_bytes(const Tuple& tuple) {
    return std::accumulate(tuple.begin(), tuple.end(),
                           static_cast<std::size_t>(0),
                           [](const std::size_t& lhs, const Tensor& rhs) {
                             return lhs + rhs.TotalBytes();
                           });
  }

  // Get number of bytes in the incomplete tuple
  inline std::size_t get_tuple_bytes(const OptionalTuple& tuple) {
    return std::accumulate(
        tuple.begin(), tuple.end(), static_cast<std::size_t>(0),
        [](const std::size_t& lhs, const OptionalTensor& rhs) {
          return lhs + (rhs.has_value() ? rhs.value().TotalBytes() : 0);
        });
  }

  // Check that the index is within bounds
  inline Status check_index(const Tensor& key, std::size_t index) {
    if (index >= dtypes_.size()) {
      return Status(errors::InvalidArgument(
          "Index '", index, "' for key '", key.scalar<int64>()(),
          "' was out of bounds '", dtypes_.size(), "'."));
    }

    return Status::OK();
  }

  inline Status copy_or_move_tensors(OptionalTuple& map_tuple,
                                     const Tensor& key, const Tensor& indices,
                                     Tuple* output, bool copy = false) {
    auto findices = indices.flat<int>();

    // Return values at specified indices
    for (std::size_t i = 0; i < findices.dimension(0); ++i) {
      std::size_t index = findices(i);

      TF_RETURN_IF_ERROR(check_index(key, index));

      // Insist on a value present at the specified index
      if (!map_tuple[index].has_value()) {
        return Status(errors::InvalidArgument(
            "Tensor at index '", index, "' for key '", key.scalar<int64>()(),
            "' has already been removed."));
      }

      // Copy the contained tensor into the output
      output->push_back(map_tuple[index].value());

      // Clear out the entry if we're moving (not copying)
      if (!copy) {
        map_tuple[index].reset();
      }
    }

    return Status::OK();
  }

  // Check that the optional value at the specified index
  // is uninitialized
  inline Status check_index_uninitialized(const Tensor& key, std::size_t index,
                                          const OptionalTuple& tuple) {
    if (tuple[index].has_value()) {
      return Status(errors::InvalidArgument(
          "The tensor for index '", index, "' for key '",
          key.scalar<int64>()(), "' was already initialized."));
    }

    return Status::OK();
  }

  // Check that the indices are strictly ordered
  inline Status check_index_ordering(const Tensor& indices) {
    auto findices = indices.flat<int>();

    for (std::size_t i = 0; i < findices.dimension(0) - 1; ++i) {
      if (findices(i) < findices(i + 1)) {
        continue;
      }

      return Status(
          errors::InvalidArgument("Indices are not strictly ordered"));
    }

    return Status::OK();
  }

  // Check that the requested bytes are within the memory limit
  inline Status check_memory_limit(std::size_t bytes) {
    if (has_memory_limit() && bytes > memory_limit_) {
      return Status(errors::ResourceExhausted(
          "Attempted to insert tensors with combined size of '", bytes,
          "' bytes into Staging Area with a memory limit of '", memory_limit_,
          "'."));
    }

    return Status::OK();
  }

  // Insert incomplete data into the Staging Area
  Status put_incomplete(const KeyType& key, const Tensor& indices,
                        OptionalTuple* tuple,
                        std::unique_lock<std::mutex>& l) {
    auto findices = indices.flat<int>();

    // Check that the tuple fits within the memory limit
    std::size_t tuple_bytes = get_tuple_bytes(*tuple);
    TF_RETURN_IF_ERROR(check_memory_limit(tuple_bytes));

    if (has_memory_limit()) {
      full_.wait(l, [tuple_bytes, this]() {
        // Stop waiting if we don't exceed the memory limit
        return !would_exceed_memory_limit(tuple_bytes);
      });
    }

    // Search for the key in our incomplete set.
    // The lookup is done after the wait above, since waiting releases the
    // lock and other threads may modify the incomplete map in the meantime.
    auto it = incomplete_.find(key);

    // This key isn't present in the incomplete set:
    // create an OptionalTuple and insert it
    if (it == incomplete_.end()) {
      OptionalTuple empty(dtypes_.size());

      // Initialize the empty tuple with the given data
      for (std::size_t i = 0; i < findices.dimension(0); ++i) {
        std::size_t index = findices(i);
        TF_RETURN_IF_ERROR(check_index(key, index));

        // Assign tuple at this index
        empty[index] = std::move((*tuple)[i]);
      }

      // Insert into the incomplete map
      incomplete_.insert({key, std::move(empty)});

      // Increment size
      current_bytes_ += tuple_bytes;
    }
    // Found an entry in the incomplete index.
    // Update it with the given data and insert complete entries
    // into the main map.
    else {
      // Reference the existing incomplete tuple
      OptionalTuple& present = it->second;

      // Assign the given data
      for (std::size_t i = 0; i < findices.dimension(0); ++i) {
        std::size_t index = findices(i);
        TF_RETURN_IF_ERROR(check_index(key, index));
        TF_RETURN_IF_ERROR(check_index_uninitialized(key, index, present));

        // Assign tuple at this index
        present[index] = std::move((*tuple)[i]);
      }

      // Increment size
      current_bytes_ += tuple_bytes;

      // Do we have values at all tuple elements?
      bool complete =
          std::all_of(present.begin(), present.end(),
                      [](const OptionalTensor& v) { return v.has_value(); });

      // If so, put the tuple in the actual map
      if (complete) {
        OptionalTuple insert_tuple = std::move(it->second);

        // Remove from incomplete
        incomplete_.erase(it);

        TF_RETURN_IF_ERROR(put_complete(key, &insert_tuple, l));
      }
    }

    return Status::OK();
  }

  // Does the insertion into the actual staging area
  Status put_complete(const KeyType& key, OptionalTuple* tuple,
                      std::unique_lock<std::mutex>& l) {
    // Insert key and tuples into the map
    map_.insert({key, std::move(*tuple)});

    notify_removers(l);

    return Status::OK();
  }

 public:
  // public methods
  explicit StagingMap(const DataTypeVector& dtypes, std::size_t capacity,
                      std::size_t memory_limit)
      : dtypes_(dtypes),
        capacity_(capacity),
        memory_limit_(memory_limit),
        current_bytes_(0) {}

  Status put(KeyType* key, const Tensor* indices, OptionalTuple* tuple) {
    std::unique_lock<std::mutex> l(mu_);

    // Sanity check the indices
    TF_RETURN_IF_ERROR(check_index_ordering(*indices));

    // Handle incomplete inserts
    if (indices->NumElements() != dtypes_.size()) {
      return put_incomplete(*key, *indices, tuple, l);
    }

    std::size_t tuple_bytes = get_tuple_bytes(*tuple);
    // Check that tuple_bytes fits within the memory limit
    TF_RETURN_IF_ERROR(check_memory_limit(tuple_bytes));

    // If map capacity is bounded, wait until the map is not full
    if (has_capacity() || has_memory_limit()) {
      full_.wait(l, [tuple_bytes, this]() {
        // If there's a memory limit, check if there's space for insertion
        bool memory_limit_valid =
            has_memory_limit() ? !would_exceed_memory_limit(tuple_bytes)
                               : true;
        // If we're configured for capacity, check if there's space for
        // insertion
        bool capacity_valid = has_capacity() ? !is_capacity_full() : true;

        // Stop waiting upon success for both conditions
        return memory_limit_valid && capacity_valid;
      });
    }

    // Do the put operation
    TF_RETURN_IF_ERROR(put_complete(*key, tuple, l));

    // Update the current size
    current_bytes_ += tuple_bytes;

    return Status::OK();
  }

  Status get(const KeyType* key, const Tensor* indices, Tuple* tuple) {
    std::unique_lock<std::mutex> l(mu_);

    // Sanity check the indices
    TF_RETURN_IF_ERROR(check_index_ordering(*indices));

    typename MapType::iterator it;

    // Wait until the element with the requested key is present
    not_empty_.wait(l, [&, this]() {
      it = map_.find(*key);
      return it != map_.end();
    });

    TF_RETURN_IF_ERROR(
        copy_or_move_tensors(it->second, *key, *indices, tuple, true));

    // Update bytes in the Staging Area
    current_bytes_ -= get_tuple_bytes(*tuple);

    return Status::OK();
  }

  Status pop(const KeyType* key, const Tensor* indices, Tuple* tuple) {
    std::unique_lock<std::mutex> l(mu_);

    // Sanity check the indices
    TF_RETURN_IF_ERROR(check_index_ordering(*indices));

    typename MapType::iterator it;

    // Wait until the element with the requested key is present
    not_empty_.wait(l, [&, this]() {
      it = map_.find(*key);
      return it != this->map_.end();
    });

    TF_RETURN_IF_ERROR(
        copy_or_move_tensors(it->second, *key, *indices, tuple));

    // Remove the entry if all of the values have been consumed
    bool any_left =
        std::any_of(it->second.begin(), it->second.end(),
                    [](const OptionalTensor& T) { return T.has_value(); });

    if (!any_left) {
      map_.erase(it);
    }

    // Update bytes in the Staging Area
    current_bytes_ -= get_tuple_bytes(*tuple);

    notify_inserters_if_bounded(l);

    return Status::OK();
  }

  Status popitem(KeyType* key, const Tensor* indices, Tuple* tuple) {
    std::unique_lock<std::mutex> l(mu_);

    // Sanity check the indices
    TF_RETURN_IF_ERROR(check_index_ordering(*indices));

    // Wait until map is not empty
    not_empty_.wait(l, [this]() { return !this->map_.empty(); });

    // Move from the first element and erase it
    auto it = map_.begin();

    // Record the key first so that error messages inside
    // copy_or_move_tensors can refer to it
    *key = it->first;

    TF_RETURN_IF_ERROR(
        copy_or_move_tensors(it->second, *key, *indices, tuple));

    // Remove the entry if all of the values have been consumed
    bool any_left =
        std::any_of(it->second.begin(), it->second.end(),
                    [](const OptionalTensor& T) { return T.has_value(); });

    if (!any_left) {
      map_.erase(it);
    }

    // Update bytes in the Staging Area
    current_bytes_ -= get_tuple_bytes(*tuple);

    notify_inserters_if_bounded(l);

    return Status::OK();
  }

  Status clear() {
    std::unique_lock<std::mutex> l(mu_);
    map_.clear();
    incomplete_.clear();
    current_bytes_ = 0;

    notify_inserters_if_bounded(l);

    return Status::OK();
  }

  size_t incomplete_size() {
    std::unique_lock<std::mutex> l(mu_);
    return incomplete_.size();
  }

  size_t size() {
    // Lock the map and return the size
    std::unique_lock<std::mutex> l(mu_);
    return map_.size();
  }

  string DebugString() { return "StagingMap"; }
};

template <bool Ordered>
Status GetStagingMap(OpKernelContext* ctx, const NodeDef& ndef,
                     StagingMap<Ordered>** map) {
  auto rm = ctx->resource_manager();
  ContainerInfo cinfo;

  // Lambda for creating the Staging Area
  auto create_fn = [&ndef](StagingMap<Ordered>** ret) -> Status {
    DataTypeVector dtypes;
    int64 capacity;
    int64 memory_limit;
    TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "dtypes", &dtypes));
    TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "capacity", &capacity));
    TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "memory_limit", &memory_limit));
    *ret = new StagingMap<Ordered>(dtypes, capacity, memory_limit);
    return Status::OK();
  };

  TF_RETURN_IF_ERROR(cinfo.Init(rm, ndef, true /* use name() */));
  TF_RETURN_IF_ERROR(rm->LookupOrCreate<StagingMap<Ordered>>(
      cinfo.container(), cinfo.name(), map, create_fn));
  return Status::OK();
}

template <bool Ordered>
class MapStageOp : public OpKernel {
 public:
  explicit MapStageOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  void Compute(OpKernelContext* ctx) override {
    StagingMap<Ordered>* map = nullptr;
    OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
    core::ScopedUnref scope(map);
    typename StagingMap<Ordered>::OptionalTuple tuple;

    const Tensor* key_tensor;
    const Tensor* indices_tensor;
    OpInputList values_tensor;

    OP_REQUIRES_OK(ctx, ctx->input("key", &key_tensor));
    OP_REQUIRES_OK(ctx, ctx->input("indices", &indices_tensor));
    OP_REQUIRES_OK(ctx, ctx->input_list("values", &values_tensor));

    // Create a copy for insertion into the Staging Area
    Tensor key(*key_tensor);

    // Create the tuple to store
    for (std::size_t i = 0; i < values_tensor.size(); ++i) {
      tuple.push_back(values_tensor[i]);
    }

    // Store the tuple in the map
    OP_REQUIRES_OK(ctx, map->put(&key, indices_tensor, &tuple));
  }
};

REGISTER_KERNEL_BUILDER(Name("MapStage").Device(DEVICE_CPU), MapStageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapStage").Device(DEVICE_CPU),
                        MapStageOp<true>);

#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapStage")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_GPU),
                        MapStageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapStage")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_GPU),
                        MapStageOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapStage").HostMemory("key").Device(DEVICE_SYCL),
                        MapStageOp<false>);
REGISTER_KERNEL_BUILDER(
    Name("OrderedMapStage").HostMemory("key").Device(DEVICE_SYCL),
    MapStageOp<true>);
#endif  // TENSORFLOW_USE_SYCL

template <bool Ordered>
class MapUnstageOp : public OpKernel {
 public:
  explicit MapUnstageOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  // Using this op in such a way that it blocks forever
  // is an error. As such cancellation is not handled.
  void Compute(OpKernelContext* ctx) override {
    StagingMap<Ordered>* map = nullptr;
    OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
    core::ScopedUnref scope(map);
    typename StagingMap<Ordered>::Tuple tuple;

    const Tensor* key_tensor;
    const Tensor* indices_tensor;
    OpInputList values_tensor;

    OP_REQUIRES_OK(ctx, ctx->input("key", &key_tensor));
    OP_REQUIRES_OK(ctx, ctx->input("indices", &indices_tensor));
    OP_REQUIRES_OK(ctx, map->pop(key_tensor, indices_tensor, &tuple));

    OP_REQUIRES(
        ctx, tuple.size() == indices_tensor->NumElements(),
        errors::InvalidArgument("output/indices size mismatch: ", tuple.size(),
                                " vs. ", indices_tensor->NumElements()));

    for (size_t i = 0; i < tuple.size(); ++i) {
      ctx->set_output(i, tuple[i]);
    }
  }
};

REGISTER_KERNEL_BUILDER(Name("MapUnstage").Device(DEVICE_CPU),
                        MapUnstageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstage").Device(DEVICE_CPU),
                        MapUnstageOp<true>);

#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapUnstage")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_GPU),
                        MapUnstageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstage")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_GPU),
                        MapUnstageOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapUnstage")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_SYCL),
                        MapUnstageOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstage")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_SYCL),
                        MapUnstageOp<true>);
#endif  // TENSORFLOW_USE_SYCL

template <bool Ordered>
class MapPeekOp : public OpKernel {
 public:
  explicit MapPeekOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  // Using this op in such a way that it blocks forever
  // is an error. As such cancellation is not handled.
  void Compute(OpKernelContext* ctx) override {
    StagingMap<Ordered>* map = nullptr;
    OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
    core::ScopedUnref scope(map);
    typename StagingMap<Ordered>::Tuple tuple;

    const Tensor* key_tensor;
    const Tensor* indices_tensor;
    OpInputList values_tensor;

    OP_REQUIRES_OK(ctx, ctx->input("key", &key_tensor));
    OP_REQUIRES_OK(ctx, ctx->input("indices", &indices_tensor));
    OP_REQUIRES_OK(ctx, map->get(key_tensor, indices_tensor, &tuple));

    OP_REQUIRES(
        ctx, tuple.size() == indices_tensor->NumElements(),
        errors::InvalidArgument("output/indices size mismatch: ", tuple.size(),
                                " vs. ", indices_tensor->NumElements()));

    for (size_t i = 0; i < tuple.size(); ++i) {
      ctx->set_output(i, tuple[i]);
    }
  }
};

REGISTER_KERNEL_BUILDER(Name("MapPeek").Device(DEVICE_CPU), MapPeekOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapPeek").Device(DEVICE_CPU),
                        MapPeekOp<true>);

#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapPeek")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_GPU),
                        MapPeekOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapPeek")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_GPU),
                        MapPeekOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapPeek")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_SYCL),
                        MapPeekOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapPeek")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_SYCL),
                        MapPeekOp<true>);
#endif  // TENSORFLOW_USE_SYCL

template <bool Ordered>
class MapUnstageNoKeyOp : public OpKernel {
 public:
  explicit MapUnstageNoKeyOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  // Using this op in such a way that it blocks forever
  // is an error. As such cancellation is not handled.
  void Compute(OpKernelContext* ctx) override {
    StagingMap<Ordered>* map = nullptr;
    OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
    core::ScopedUnref scope(map);

    // Pop an arbitrary (key, value) off the map
    typename StagingMap<Ordered>::KeyType key;
    typename StagingMap<Ordered>::Tuple tuple;

    const Tensor* indices_tensor;

    OP_REQUIRES_OK(ctx, ctx->input("indices", &indices_tensor));
    OP_REQUIRES_OK(ctx, map->popitem(&key, indices_tensor, &tuple));

    // Set the key as the first output
    ctx->set_output(0, key);

    // Set the rest of the outputs to the tuple Tensors
    OP_REQUIRES(
        ctx, tuple.size() == indices_tensor->NumElements(),
        errors::InvalidArgument("output/indices size mismatch: ", tuple.size(),
                                " vs. ", indices_tensor->NumElements()));

    for (size_t i = 0; i < tuple.size(); ++i) {
      ctx->set_output(i + 1, tuple[i]);
    }
  }
};

REGISTER_KERNEL_BUILDER(Name("MapUnstageNoKey").Device(DEVICE_CPU),
                        MapUnstageNoKeyOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstageNoKey").Device(DEVICE_CPU),
                        MapUnstageNoKeyOp<true>);

#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapUnstageNoKey")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_GPU),
                        MapUnstageNoKeyOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstageNoKey")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_GPU),
                        MapUnstageNoKeyOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapUnstageNoKey")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_SYCL),
                        MapUnstageNoKeyOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapUnstageNoKey")
                            .HostMemory("key")
                            .HostMemory("indices")
                            .Device(DEVICE_SYCL),
                        MapUnstageNoKeyOp<true>);
#endif  // TENSORFLOW_USE_SYCL

template <bool Ordered>
class MapSizeOp : public OpKernel {
 public:
  explicit MapSizeOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  void Compute(OpKernelContext* ctx) override {
    StagingMap<Ordered>* map = nullptr;
    OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
    core::ScopedUnref scope(map);

    // Allocate size output tensor
    Tensor* size = nullptr;
    OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &size));

    // Set it to the actual size
    size->scalar<int32>().setConstant(map->size());
  }
};

REGISTER_KERNEL_BUILDER(Name("MapSize").Device(DEVICE_CPU), MapSizeOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapSize").Device(DEVICE_CPU),
                        MapSizeOp<true>);

#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapSize").Device(DEVICE_GPU).HostMemory("size"),
                        MapSizeOp<false>);
REGISTER_KERNEL_BUILDER(
    Name("OrderedMapSize").Device(DEVICE_GPU).HostMemory("size"),
    MapSizeOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapSize").Device(DEVICE_SYCL).HostMemory("size"),
                        MapSizeOp<false>);
REGISTER_KERNEL_BUILDER(
    Name("OrderedMapSize").Device(DEVICE_SYCL).HostMemory("size"),
    MapSizeOp<true>);
#endif  // TENSORFLOW_USE_SYCL

template <bool Ordered>
class MapIncompleteSizeOp : public OpKernel {
 public:
  explicit MapIncompleteSizeOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  void Compute(OpKernelContext* ctx) override {
    StagingMap<Ordered>* map = nullptr;
    OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
    core::ScopedUnref scope(map);

    // Allocate size output tensor
    Tensor* size = nullptr;
    OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &size));

    // Set it to the actual size
    size->scalar<int32>().setConstant(map->incomplete_size());
  }
};

REGISTER_KERNEL_BUILDER(Name("MapIncompleteSize").Device(DEVICE_CPU),
                        MapIncompleteSizeOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapIncompleteSize").Device(DEVICE_CPU),
                        MapIncompleteSizeOp<true>);

#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(
    Name("MapIncompleteSize").Device(DEVICE_GPU).HostMemory("size"),
    MapIncompleteSizeOp<false>);
REGISTER_KERNEL_BUILDER(
    Name("OrderedMapIncompleteSize").Device(DEVICE_GPU).HostMemory("size"),
    MapIncompleteSizeOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(
    Name("MapIncompleteSize").Device(DEVICE_SYCL).HostMemory("size"),
    MapIncompleteSizeOp<false>);
REGISTER_KERNEL_BUILDER(
    Name("OrderedMapIncompleteSize").Device(DEVICE_SYCL).HostMemory("size"),
    MapIncompleteSizeOp<true>);
#endif  // TENSORFLOW_USE_SYCL

template <bool Ordered>
class MapClearOp : public OpKernel {
 public:
  explicit MapClearOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  void Compute(OpKernelContext* ctx) override {
    StagingMap<Ordered>* map = nullptr;
    OP_REQUIRES_OK(ctx, GetStagingMap(ctx, def(), &map));
    core::ScopedUnref scope(map);

    OP_REQUIRES_OK(ctx, map->clear());
  }
};

REGISTER_KERNEL_BUILDER(Name("MapClear").Device(DEVICE_CPU), MapClearOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapClear").Device(DEVICE_CPU),
                        MapClearOp<true>);

#if GOOGLE_CUDA
REGISTER_KERNEL_BUILDER(Name("MapClear").Device(DEVICE_GPU), MapClearOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapClear").Device(DEVICE_GPU),
                        MapClearOp<true>);
#endif
#ifdef TENSORFLOW_USE_SYCL
REGISTER_KERNEL_BUILDER(Name("MapClear").Device(DEVICE_SYCL),
                        MapClearOp<false>);
REGISTER_KERNEL_BUILDER(Name("OrderedMapClear").Device(DEVICE_SYCL),
                        MapClearOp<true>);
#endif  // TENSORFLOW_USE_SYCL

}  // namespace

}  // namespace tensorflow
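The kernels above all reduce to the mutex/condition-variable discipline inside StagingMap::put, get, pop and popitem. The stand-alone sketch below is not part of the commit: the class name BoundedStagingMap and the main driver are invented for illustration, only a capacity bound is modelled (no memory limit or incomplete-tuple handling), and plain std::int64_t keys stand in for scalar key Tensors. It shows the same blocking pattern using only the C++ standard library.

// Minimal sketch of the StagingMap blocking pattern (illustrative only).
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

class BoundedStagingMap {
 public:
  explicit BoundedStagingMap(std::size_t capacity) : capacity_(capacity) {}

  // Block until there is room, then insert the value under the given key.
  void put(std::int64_t key, std::vector<float> value) {
    std::unique_lock<std::mutex> l(mu_);
    full_.wait(l, [this]() { return map_.size() < capacity_; });
    map_.emplace(key, std::move(value));
    l.unlock();
    not_empty_.notify_one();
  }

  // Block until the requested key is present, then remove and return its value.
  std::vector<float> pop(std::int64_t key) {
    std::unique_lock<std::mutex> l(mu_);
    std::map<std::int64_t, std::vector<float>>::iterator it;
    not_empty_.wait(l, [&]() {
      it = map_.find(key);
      return it != map_.end();
    });
    std::vector<float> value = std::move(it->second);
    map_.erase(it);
    l.unlock();
    full_.notify_one();
    return value;
  }

 private:
  std::size_t capacity_;
  std::mutex mu_;
  std::condition_variable not_empty_;
  std::condition_variable full_;
  std::map<std::int64_t, std::vector<float>> map_;  // ordered, like StagingMap<true>
};

int main() {
  BoundedStagingMap staging(2);  // capacity of two staged entries

  std::thread producer([&staging]() {
    for (std::int64_t key = 0; key < 4; ++key) {
      staging.put(key, {static_cast<float>(key)});  // blocks while the map is full
    }
  });

  for (std::int64_t key = 0; key < 4; ++key) {
    std::vector<float> value = staging.pop(key);  // blocks until the key arrives
    std::cout << "popped key " << key << " -> " << value[0] << "\n";
  }

  producer.join();
  return 0;
}

Notifying after the lock has been released mirrors notify_inserters_if_bounded and notify_removers in the file above: it avoids waking a waiting thread only to have it immediately block again on the mutex.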