Diffstat (limited to 'tensorflow/core/kernels/dynamic_partition_op_gpu.cu.cc')
-rw-r--r-- | tensorflow/core/kernels/dynamic_partition_op_gpu.cu.cc | 376
1 file changed, 0 insertions, 376 deletions
diff --git a/tensorflow/core/kernels/dynamic_partition_op_gpu.cu.cc b/tensorflow/core/kernels/dynamic_partition_op_gpu.cu.cc
deleted file mode 100644
index 7249c8c66c..0000000000
--- a/tensorflow/core/kernels/dynamic_partition_op_gpu.cu.cc
+++ /dev/null
@@ -1,376 +0,0 @@
-/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-// The algorithm for dynamic partition has the following steps:
-// 1. Let N be the size of partitions. We initialize a new vector indices_in
-// with the values 0, 1, 2, ..., N-1.
-// 2. We apply cub::DeviceRadixSort::SortPairs to the key - value pairs given
-// by partitions and indices_in. This will result in two new vectors
-// partitions_out and indices_out, with partitions_out sorted.
-// 3. The first dimension of outputs[i] is equal to the length of the interval
-// of i-values in partitions_out. We determine it in two steps:
-// - compute the starting and ending point of each interval,
-// - subtract the starting and ending points to find the length.
-// The result is placed in partition_count.
-// 4. Because partition_count is on the GPU, we bring it asynchronously to
-// the CPU. Then we can allocate the output tensors.
-// 5. Finally, we use indices_out and the gather functor to collect the output.
-// This works, because for each interval of i-values, indices_out points
-// to the slices which should form output[i].
-
-#if GOOGLE_CUDA
-
-#define EIGEN_USE_GPU
-
-#include "external/cub_archive/cub/device/device_radix_sort.cuh"
-#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
-#include "tensorflow/core/framework/op_kernel.h"
-#include "tensorflow/core/framework/register_types.h"
-#include "tensorflow/core/framework/tensor.h"
-#include "tensorflow/core/framework/types.h"
-#include "tensorflow/core/kernels/bounds_check.h"
-#include "tensorflow/core/kernels/fill_functor.h"
-#include "tensorflow/core/kernels/gather_functor_gpu.cu.h"
-#include "tensorflow/core/util/cuda_kernel_helper.h"
-
-namespace tensorflow {
-
-typedef Eigen::GpuDevice GPUDevice;
-
-namespace {
-
-template <typename T>
-__global__ void RangeInitKernel(const T start, const T delta, const int32 size,
-                                T* out) {
-  CUDA_1D_KERNEL_LOOP(i, size) { out[i] = start + i * delta; }
-}
-
-__global__ void FindEndpointsKernel(const int32* partitions, int32 size,
-                                    int32 nump, int32* start, int32* end) {
-  CUDA_1D_KERNEL_LOOP(i, size) {
-    int32 current = ldg(partitions + i);
-    if (FastBoundsCheck(current, nump)) {
-      if (i == 0)
-        start[current] = i;
-      else {
-        int32 before = ldg(partitions + i - 1);
-        if (before != current) start[current] = i;
-      }
-      if (i == size - 1)
-        end[current] = i + 1;
-      else {
-        int32 after = ldg(partitions + i + 1);
-        if (after != current) end[current] = i + 1;
-      }
-    }
-  }
-}
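To make the endpoint computation concrete, here is a minimal host-side C++ sketch of the same logic that FindEndpointsKernel applies in parallel. The sample values are made up for illustration; in the real op the start/end buffers are zero-initialized beforehand, as CountAndSortParts does further down.

#include <cstdio>
#include <vector>

int main() {
  const int nump = 4;                             // num_partitions
  std::vector<int> partitions = {0, 0, 1, 3, 3};  // already sorted
  std::vector<int> start(nump, 0), end(nump, 0);  // zeroed, as in the op
  const int size = static_cast<int>(partitions.size());
  for (int i = 0; i < size; ++i) {
    int current = partitions[i];
    // An interval starts where the previous id differs ...
    if (i == 0 || partitions[i - 1] != current) start[current] = i;
    // ... and ends one past the position where the next id differs.
    if (i == size - 1 || partitions[i + 1] != current) end[current] = i + 1;
  }
  // Prints lengths 2, 1, 0, 2; id 2 never occurs, so both of its
  // endpoints stay 0 and its length is 0.
  for (int p = 0; p < nump; ++p)
    printf("id %d: [%d, %d), length %d\n", p, start[p], end[p],
           end[p] - start[p]);
}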
-// We create a local version of subtract, because the tf.subtract kernel
-// is not defined for int32. We use it to compute the length of an interval
-// by subtracting the endpoints.
-__global__ void IntervalLengthKernel(int32* start, int32 size, int32* end) {
-  CUDA_1D_KERNEL_LOOP(i, size) {
-    int32 start_point = ldg(start + i);
-    end[i] = end[i] - start_point;
-  }
-}
-
-// Initialize out with range start, start + delta, start + 2 * delta, ...
-// This is needed because tf.range has no GPU implementation.
-template <typename T>
-void RangeInit(const GPUDevice& d, const T start, const T delta,
-               const int32 size, typename TTypes<T>::Flat out) {
-  CudaLaunchConfig config = GetCudaLaunchConfig(size, d);
-  RangeInitKernel<
-      T><<<config.block_count, config.thread_per_block, 0, d.stream()>>>(
-      start, delta, size, out.data());
-}
-
-// Partitions is a sorted vector of N non-negative integer numbers.
-// This function computes the starting and ending points of each interval
-// of values.
-void ComputeIntervals(const GPUDevice& d, Tensor* partitions, int32 N,
-                      int32 nump, int32* start_ptr, int32* end_ptr) {
-  CudaLaunchConfig config = GetCudaLaunchConfig(N, d);
-  FindEndpointsKernel<<<config.block_count, config.thread_per_block, 0,
-                        d.stream()>>>(partitions->flat<int32>().data(), N,
-                                      nump, start_ptr, end_ptr);
-}
-
-// Subtract the ending points of each interval to obtain the interval length.
-void ComputeItvLength(const GPUDevice& d, int32 num, int32* start_ptr,
-                      int32* end_ptr) {
-  CudaLaunchConfig config = GetCudaLaunchConfig(num, d);
-  IntervalLengthKernel<<<config.block_count, config.thread_per_block, 0,
-                         d.stream()>>>(start_ptr, num, end_ptr);
-}
-
-template <typename T>
-void CallGatherKernel(const GPUDevice& d, const T* params, const int32* indices,
-                      T* out, int64 gather_dim_size, int64 indices_size,
-                      int64 slice_size, int64 out_size) {
-  CudaLaunchConfig config = GetCudaLaunchConfig(out_size, d);
-  GatherOpKernel<
-      T, int32,
-      true><<<config.block_count, config.thread_per_block, 0, d.stream()>>>(
-      params, indices, out, gather_dim_size, indices_size, slice_size,
-      out_size);
-}
-
-}  // namespace
-
-// The current implementation has memory cost on GPU
-// I + P + max(3N + R, O + N), where:
-// I - the size of the input
-// N - the size of the partitions tensor
-// R - the temporary storage used by cub::RadixSort, about 2N
-// P - the number of partitions
-// O - the size of the output
-// So roughly the cost is I + P + max(5N, O + N).
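As a concrete instance of this bound: for a 1-D input, each slice is a single element, so I = N and the output holds at most N elements (O <= N). Then max(3N + R, O + N) is about 5N, and the whole op needs roughly I + P + 5N = 6N + P words of device memory, i.e. about six buffers of the input's length plus one counter per partition.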
-template <typename T>
-class DynamicPartitionOpGPU : public AsyncOpKernel {
- public:
-  explicit DynamicPartitionOpGPU(OpKernelConstruction* c) : AsyncOpKernel(c) {
-    OP_REQUIRES_OK(c, c->GetAttr("num_partitions", &num_partitions_));
-    OP_REQUIRES(c, num_partitions_ >= 1,
-                errors::InvalidArgument("num_partitions must be at least 1"));
-  }
-
-  void AllocateTempSpace(OpKernelContext* c, int32 N, Tensor* indices_in,
-                         Tensor* partitions_out, Tensor* indices_out,
-                         DoneCallback done) {
-    int32 M = std::max(N, num_partitions_);
-    // indices_in will be made slightly larger to accommodate
-    // later computations.
-    OP_REQUIRES_OK_ASYNC(
-        c, c->allocate_temp(DT_INT32, TensorShape({M}), indices_in), done);
-    OP_REQUIRES_OK_ASYNC(
-        c, c->allocate_temp(DT_INT32, TensorShape({N}), partitions_out), done);
-    OP_REQUIRES_OK_ASYNC(
-        c, c->allocate_temp(DT_INT32, TensorShape({N}), indices_out), done);
-  }
-
-  void AllocateOutputs(OpKernelContext* c, const Tensor* data,
-                       const Tensor* partitions, const Tensor* partition_count,
-                       OpOutputList* Tout, DoneCallback done) {
-    auto e_part_count = partition_count->flat<int32>();
-    // Allocate output tensors of the right size
-    OP_REQUIRES_OK_ASYNC(c, c->output_list("outputs", Tout), done);
-    for (int p = 0; p < num_partitions_; p++) {
-      TensorShape shape;
-      shape.AddDim(e_part_count(p));
-      for (int i = partitions->dims(); i < data->dims(); i++) {
-        shape.AddDim(data->dim_size(i));
-      }
-      Tensor* out;
-      OP_REQUIRES_OK_ASYNC(c, Tout->allocate(p, shape, &out), done);
-    }
-  }
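A quick shape example for AllocateOutputs: with data of shape [6, 4], partitions of shape [6], num_partitions = 3, and partition_count = {3, 1, 2}, the loop above produces outputs of shapes [3, 4], [1, 4], and [2, 4]: the counted first dimension, followed by whatever dimensions of data extend past partitions.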
-  void ComputeAsync(OpKernelContext* c, DoneCallback done) {
-    const Tensor& data = c->input(0);
-    const Tensor& partitions = c->input(1);
-
-    OP_REQUIRES_ASYNC(
-        c, TensorShapeUtils::StartsWith(data.shape(), partitions.shape()),
-        errors::InvalidArgument("data.shape must start with partitions.shape, ",
-                                "got data.shape = ", data.shape().DebugString(),
-                                ", partitions.shape = ",
-                                partitions.shape().DebugString()),
-        done);
-
-    Tensor partition_count;
-
-    // We must handle the case of empty partitions separately,
-    // because kernels don't work with 0-sized tensors.
-    if (partitions.NumElements() == 0) {
-      AllocatorAttributes alloc_attr;
-      alloc_attr.set_on_host(true);
-      OP_REQUIRES_OK_ASYNC(
-          c, c->allocate_temp(DT_INT32, TensorShape({num_partitions_}),
-                              &partition_count, alloc_attr),
-          done);
-      auto e_part_count = partition_count.flat<int32>();
-      for (int i = 0; i < num_partitions_; i++) e_part_count(i) = 0;
-      OpOutputList outputs;
-      this->AllocateOutputs(c, &data, &partitions, &partition_count, &outputs,
-                            done);
-      if (c->status().ok()) done();
-      return;
-    }
-
-    // Prepare for counting.
-    OP_REQUIRES_OK_ASYNC(
-        c, c->allocate_temp(DT_INT32, TensorShape({num_partitions_}),
-                            &partition_count),
-        done);
-    Tensor indices_out;
-    // Count how many times each partition index occurs.
-    // Also sort the info in partitions and output it in indices_out,
-    // in preparation for the next step.
-    this->CountAndSortParts(c, &partitions, &partition_count, &indices_out,
-                            done);
-    if (!c->status().ok()) return;
-
-    // In order to allocate the output tensor we have to move partition_count
-    // to CPU.
-    auto* stream = c->op_device_context()->stream();
-    OP_REQUIRES_ASYNC(c, stream, errors::Internal("No GPU stream available."),
-                      done);
-    Tensor cpu_tensor;
-    AllocatorAttributes alloc_attr;
-    alloc_attr.set_on_host(true);
-    alloc_attr.set_gpu_compatible(true);
-    OP_REQUIRES_OK_ASYNC(
-        c, c->allocate_temp(partition_count.dtype(), partition_count.shape(),
-                            &cpu_tensor, alloc_attr),
-        done);
-    perftools::gputools::DeviceMemoryBase wrapped(
-        partition_count.flat<int32>().data(), num_partitions_ * sizeof(int32));
-    const bool status =
-        stream
-            ->ThenMemcpy(cpu_tensor.flat<int32>().data(), wrapped,
-                         num_partitions_ * sizeof(int32))
-            .ok();
-    OP_REQUIRES_ASYNC(
-        c, status,
-        errors::Internal("Failed to launch copy from device to host."), done);
-
-    // Keep a reference to partition_count so that the buffer
-    // is not deallocated at the end of the function, before
-    // memcpy is completed.
-    TensorReference partition_ref(partition_count);
-    auto wrapped_callback = [this, c, &data, &partitions, indices_out,
-                             partition_ref, cpu_tensor, done]() {
-      OpOutputList outputs;
-      this->AllocateOutputs(c, &data, &partitions, &cpu_tensor, &outputs,
-                            done);
-      if (!c->status().ok()) {
-        partition_ref.Unref();
-        return;
-      }
-      int32 N = partitions.NumElements();
-      int64 slice_size = data.NumElements() / N;
-      this->GatherSlices(c, &data, &indices_out, N, slice_size, outputs);
-      partition_ref.Unref();
-      done();
-    };
-
-    c->device()->tensorflow_gpu_device_info()->event_mgr->ThenExecute(
-        stream, wrapped_callback);
-  }
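Outside TensorFlow's stream and event-manager wrappers, the copy-then-continue pattern that ComputeAsync uses (pinned host buffer, asynchronous device-to-host memcpy, host callback once the stream reaches that point) looks roughly like the plain-CUDA sketch below. The function names and the printf are illustrative only, and error handling is elided.

#include <cstdio>
#include <cuda_runtime.h>

// Runs on a CUDA-internal thread once all prior work on the stream,
// including the copy below, has completed (the role EventMgr::ThenExecute
// plays in the kernel above).
void CUDART_CB OnCountsReady(void* user_data) {
  int* counts = static_cast<int*>(user_data);
  printf("first partition count: %d\n", counts[0]);
  // Hand the counts back to application code. Note: unlike EventMgr
  // callbacks, a host function must not itself call CUDA APIs.
}

void CopyCountsThenContinue(const int* d_counts, int num_partitions,
                            cudaStream_t stream) {
  int* h_counts = nullptr;
  // Pinned host memory, the analogue of the gpu_compatible host tensor.
  cudaMallocHost(&h_counts, num_partitions * sizeof(int));
  cudaMemcpyAsync(h_counts, d_counts, num_partitions * sizeof(int),
                  cudaMemcpyDeviceToHost, stream);
  // Enqueue the continuation; this call does not block the host.
  cudaLaunchHostFunc(stream, OnCountsReady, h_counts);
}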
-
- protected:
-  void RadixSort(OpKernelContext* c, const Tensor* partitions,
-                 Tensor* indices_in, Tensor* partitions_out,
-                 Tensor* indices_out, DoneCallback done) {
-    int32 N = partitions->NumElements();
-    const GPUDevice& device = c->eigen_device<GPUDevice>();
-    const cudaStream_t& cu_stream = GetCudaStream(c);
-
-    // Initialize the indices_in tensor using the Range GPU kernel.
-    RangeInit(device, 0, 1, N, indices_in->flat<int32>());
-    // Obtain the pointers to inner buffers.
-    const int32* partitions_ptr = partitions->flat<int32>().data();
-    int32* partitions_out_ptr = partitions_out->flat<int32>().data();
-    int32* indices_in_ptr = indices_in->flat<int32>().data();
-    int32* indices_out_ptr = indices_out->flat<int32>().data();
-    // Determine temporary device storage requirements.
-    Tensor cub_temp_storage;
-    size_t temp_storage_bytes = 0;
-    cub::DeviceRadixSort::SortPairs(
-        NULL, temp_storage_bytes, partitions_ptr, partitions_out_ptr,
-        indices_in_ptr, indices_out_ptr, N, 0, sizeof(int32) * 8, cu_stream);
-    // Allocate temporary storage.
-    OP_REQUIRES_OK_ASYNC(
-        c, c->allocate_temp(
-               DT_INT8, TensorShape({static_cast<int64>(temp_storage_bytes)}),
-               &cub_temp_storage),
-        done);
-    // Radix-sort the partition information.
-    cub::DeviceRadixSort::SortPairs(
-        cub_temp_storage.flat<int8>().data(), temp_storage_bytes,
-        partitions_ptr, partitions_out_ptr, indices_in_ptr, indices_out_ptr,
-        N, 0, sizeof(int32) * 8, cu_stream);
-  }  // At this point cub_temp_storage will be marked for deallocation.
-
-  void CountAndSortParts(OpKernelContext* c, const Tensor* partitions,
-                         Tensor* partition_count, Tensor* indices_out,
-                         DoneCallback done) {
-    const GPUDevice& device = c->eigen_device<GPUDevice>();
-    int32 N = partitions->NumElements();
-    Tensor indices_in;
-    Tensor partitions_out;
-
-    // Allocate memory for Radix-Sort.
-    this->AllocateTempSpace(c, N, &indices_in, &partitions_out, indices_out,
-                            done);
-    if (!c->status().ok()) return;
-    this->RadixSort(c, partitions, &indices_in, &partitions_out, indices_out,
-                    done);
-    if (!c->status().ok()) return;
-    // We still need a little bit of additional memory. However,
-    // we can reuse the indices_in tensor. We could also use atomic
-    // operations and no additional memory, but this approach seems faster.
-
-    // Zero-out the allocated memory.
-    functor::SetZeroFunctor<GPUDevice, int32> zero_functor;
-    zero_functor(device, partition_count->flat<int32>());
-    zero_functor(device, indices_in.flat<int32>());
-    // Obtain the pointers to inner buffers.
-    int32* start_ptr = indices_in.flat<int32>().data();
-    int32* end_ptr = partition_count->flat<int32>().data();
-    // Obtain the starting and ending points of each interval.
-    ComputeIntervals(device, &partitions_out, N, num_partitions_, start_ptr,
-                     end_ptr);
-    // Subtract to compute the number of appearances of each id.
-    ComputeItvLength(device, num_partitions_, start_ptr, end_ptr);
-  }  // At this point indices_in and partitions_out will be marked
-     // for deallocation.
-
-  void GatherSlices(OpKernelContext* c, const Tensor* data,
-                    const Tensor* indices, int32 N, int64 slice_size,
-                    OpOutputList& outs) {
-    const GPUDevice& device = c->eigen_device<GPUDevice>();
-    const int32* ind_base = indices->flat<int32>().data();
-    const T* data_base = data->flat<T>().data();
-
-    for (int p = 0; p < num_partitions_; p++) {
-      int32 indices_size = outs[p]->dim_size(0);
-      int64 out_size = outs[p]->NumElements();
-      T* out_base = outs[p]->flat<T>().data();
-      if (out_size > 0)
-        CallGatherKernel<T>(device, data_base, ind_base, out_base, N,
-                            indices_size, slice_size, out_size);
-      ind_base += indices_size;
-    }
-  }
-
-  int num_partitions_;
-};
-
-#define REGISTER_DYNAMIC_PARTITION_GPU(T)                                 \
-  REGISTER_KERNEL_BUILDER(                                                \
-      Name("DynamicPartition").Device(DEVICE_GPU).TypeConstraint<T>("T"), \
-      DynamicPartitionOpGPU<T>)
-
-TF_CALL_GPU_NUMBER_TYPES(REGISTER_DYNAMIC_PARTITION_GPU);
-TF_CALL_complex64(REGISTER_DYNAMIC_PARTITION_GPU);
-TF_CALL_complex128(REGISTER_DYNAMIC_PARTITION_GPU);
-#undef REGISTER_DYNAMIC_PARTITION_GPU
-
-}  // namespace tensorflow
-
-#endif  // GOOGLE_CUDA
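The NULL-then-allocate sequence in RadixSort above is CUB's standard two-phase pattern: the first SortPairs call only reports the required scratch size, the second performs the sort. A standalone sketch of the same call sequence, using raw cudaMalloc in place of allocate_temp (the function name here is illustrative, not part of the original file):

#include <cub/device/device_radix_sort.cuh>
#include <cuda_runtime.h>

// Sort (partition id, original index) pairs, i.e. step 2 of the algorithm.
void SortPairsTwoPhase(const int* d_keys_in, int* d_keys_out,
                       const int* d_values_in, int* d_values_out, int n,
                       cudaStream_t stream) {
  void* d_temp = nullptr;
  size_t temp_bytes = 0;
  // Pass 1: with d_temp == nullptr, CUB only writes the required
  // scratch size into temp_bytes and launches nothing.
  cub::DeviceRadixSort::SortPairs(d_temp, temp_bytes, d_keys_in, d_keys_out,
                                  d_values_in, d_values_out, n, 0,
                                  sizeof(int) * 8, stream);
  cudaMalloc(&d_temp, temp_bytes);
  // Pass 2: identical arguments, now with real scratch space; this sorts
  // over all 32 key bits.
  cub::DeviceRadixSort::SortPairs(d_temp, temp_bytes, d_keys_in, d_keys_out,
                                  d_values_in, d_values_out, n, 0,
                                  sizeof(int) * 8, stream);
  cudaStreamSynchronize(stream);  // ensure the sort finished ...
  cudaFree(d_temp);               // ... before releasing the scratch buffer
}

In the kernel itself the scratch buffer is a temporary Tensor instead, so it is returned to TensorFlow's allocator when RadixSort goes out of scope, as the trailing comment on that method notes.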