Diffstat (limited to 'tensorflow/contrib/mpi_collectives')
-rw-r--r--  tensorflow/contrib/mpi_collectives/mpi_ops.py   163
-rw-r--r--  tensorflow/contrib/mpi_collectives/ring.cc        80
-rw-r--r--  tensorflow/contrib/mpi_collectives/ring.cu.cc    117
-rw-r--r--  tensorflow/contrib/mpi_collectives/ring.h        327
4 files changed, 0 insertions, 687 deletions
diff --git a/tensorflow/contrib/mpi_collectives/mpi_ops.py b/tensorflow/contrib/mpi_collectives/mpi_ops.py
deleted file mode 100644
index bd7096d9ce..0000000000
--- a/tensorflow/contrib/mpi_collectives/mpi_ops.py
+++ /dev/null
@@ -1,163 +0,0 @@
-# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# =============================================================================
-"""Inter-process communication using MPI."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import tensorflow as tf
-
-from tensorflow.python.framework import errors
-from tensorflow.python.framework import load_library
-from tensorflow.python.framework import ops
-from tensorflow.python.platform import resource_loader
-from tensorflow.python.platform import tf_logging as logging
-
-
-def _load_library(name, op_list=None):
- """Loads a .so file containing the specified operators.
-
- Args:
- name: The name of the .so file to load.
- op_list: A list of names of operators that the library should have. If None
- then the .so file's contents will not be verified.
-
- Raises:
- NameError if one of the required ops is missing.
- """
- try:
- filename = resource_loader.get_path_to_datafile(name)
- library = load_library.load_op_library(filename)
- for expected_op in (op_list or []):
- for lib_op in library.OP_LIST.op:
- if lib_op.name == expected_op:
- break
- else:
- raise NameError('Could not find operator %s in dynamic library %s' %
- (expected_op, name))
- return library
- except errors.NotFoundError:
- logging.warning('%s file could not be loaded.', name)
-
-
-MPI_LIB = _load_library(
- 'mpi_collectives.so',
- ['MPISize', 'MPIRank', 'MPILocalRank', 'MPIAllgather', 'MPIAllreduce'])
-
-
-def size(name=None):
- """An op which returns the number of MPI processes.
-
- This is equivalent to running `MPI_Comm_size(MPI_COMM_WORLD, ...)` to get the
- size of the global communicator.
-
- Returns:
- An integer scalar containing the number of MPI processes.
- """
- return MPI_LIB.mpi_size(name=name)
-
-
-ops.NotDifferentiable('MPISize')
-
-
-def rank(name=None):
- """An op which returns the MPI rank of the calling process.
-
- This is equivalent to running `MPI_Comm_rank(MPI_COMM_WORLD, ...)` to get the
- rank of the current process in the global communicator.
-
- Returns:
- An integer scalar with the MPI rank of the calling process.
- """
- return MPI_LIB.mpi_rank(name=name)
-
-
-ops.NotDifferentiable('MPIRank')
-
-
-def init(name=None):
- """An op which initializes MPI on the device on which it is run.
-
- All future MPI ops must be run on the same device that the `init` op was run
- on.
- """
- return MPI_LIB.mpi_init(name=name)
-
-
-ops.NotDifferentiable('MPIInit')
-
-
-def local_rank(name=None):
- """An op which returns the local MPI rank of the calling process, within the
- node that it is running on. For example, if there are seven processes running
- on a node, their local ranks will be zero through six, inclusive.
-
- This is equivalent to running `MPI_Comm_rank(...)` on a new communicator
- which only includes processes on the same node.
-
- Returns:
- An integer scalar with the local MPI rank of the calling process.
- """
- return MPI_LIB.mpi_local_rank(name=name)
-
-
-ops.NotDifferentiable('MPILocalRank')
-
-
-def _allreduce(tensor, name=None):
- """An op which sums an input tensor over all the MPI processes.
-
- The reduction operation is keyed by the name of the op. The tensor type and
- shape must be the same on all MPI processes for a given name. The reduction
- will not start until all processes are ready to send and receive the tensor.
-
- Returns:
- A tensor of the same shape and type as `tensor`, summed across all
- processes.
- """
- return MPI_LIB.mpi_allreduce(tensor, name=name)
-
-
-ops.NotDifferentiable('MPIAllreduce')
-
-
-def allgather(tensor, name=None):
- """An op which concatenates the input tensor with the same input tensor on
- all other MPI processes.
-
- The concatenation is done on the first dimension, so the input tensors on the
- different processes must have the same rank and shape, except for the first
- dimension, which is allowed to be different.
-
- Returns:
- A tensor of the same type as `tensor`, concatenated on dimension zero
- across all processes. The shape is identical to the input shape, except for
- the first dimension, which may be greater and is the sum of all first
- dimensions of the tensors in different MPI processes.
- """
- # The first allgather collects each process's first-dimension size; this
- # sizing mode is signaled by passing a scalar (0-D tensor) of value 0.
- sizes_flag = tf.constant(0, dtype=tf.int64, name='size_flag_const')
- my_size = tf.slice(
- tf.shape(tensor, out_type=tf.int64), [0], [1], name='size_slice')
- if name is None:
- name = 'allgather'
- sizing_name = '{}_sizing'.format(name)
- sizes = MPI_LIB.mpi_allgather(my_size, sizes_flag, name=sizing_name)
- return MPI_LIB.mpi_allgather(tensor, sizes, name=name)
-
-
-ops.NotDifferentiable('MPIAllgather')
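
For reference, since this file is deleted here: a minimal sketch of driving
these ops, assuming the TF1 Session API of the era and an import path derived
from the file's location (the package's public import surface is not shown in
this diff); run under something like `mpirun -np 4 python script.py`.

# Hypothetical usage sketch -- the import path and Session flow are
# assumptions; init/size/rank/allgather/_allreduce come from the diff above.
import tensorflow as tf
from tensorflow.contrib.mpi_collectives import mpi_ops  # assumed path

init_op = mpi_ops.init()  # must run first; pins MPI to this device
size_op = mpi_ops.size()  # number of MPI processes
rank_op = mpi_ops.rank()  # this process's rank in MPI_COMM_WORLD

values = tf.constant([1.0, 2.0, 3.0])
summed = mpi_ops._allreduce(values)   # elementwise sum across all ranks
gathered = mpi_ops.allgather(values)  # concatenation along dimension 0

with tf.Session() as sess:
    sess.run(init_op)
    n, r = sess.run([size_op, rank_op])
    print('rank %d of %d' % (r, n))
    print(sess.run(summed))    # [1*n, 2*n, 3*n] if all ranks feed the same
    print(sess.run(gathered))  # shape [3*n]
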
diff --git a/tensorflow/contrib/mpi_collectives/ring.cc b/tensorflow/contrib/mpi_collectives/ring.cc
deleted file mode 100644
index d93233eb21..0000000000
--- a/tensorflow/contrib/mpi_collectives/ring.cc
+++ /dev/null
@@ -1,80 +0,0 @@
-/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#ifdef TENSORFLOW_USE_MPI
-
-#define EIGEN_USE_THREADS
-
-#include "tensorflow/contrib/mpi_collectives/ring.h"
-
-namespace tensorflow {
-namespace contrib {
-namespace mpi {
-
-using CPUDevice = Eigen::ThreadPoolDevice;
-
-extern template MPI_Datatype MPIType<float>();
-extern template MPI_Datatype MPIType<int>();
-extern template MPI_Datatype MPIType<long long>();
-extern template DataType TensorFlowDataType<float>();
-extern template DataType TensorFlowDataType<int>();
-extern template DataType TensorFlowDataType<long long>();
-
-// Generate all necessary specializations for RingAllreduce.
-template Status RingAllreduce<CPUDevice, int>(OpKernelContext*, const Tensor*,
- Tensor*, Tensor*);
-template Status RingAllreduce<CPUDevice, long long>(OpKernelContext*,
- const Tensor*, Tensor*,
- Tensor*);
-template Status RingAllreduce<CPUDevice, float>(OpKernelContext*, const Tensor*,
- Tensor*, Tensor*);
-
-// Generate all necessary specializations for RingAllgather.
-template Status RingAllgather<CPUDevice, int>(OpKernelContext*, const Tensor*,
- const std::vector<size_t>&,
- Tensor*);
-template Status RingAllgather<CPUDevice, long long>(OpKernelContext*,
- const Tensor*,
- const std::vector<size_t>&,
- Tensor*);
-template Status RingAllgather<CPUDevice, float>(OpKernelContext*, const Tensor*,
- const std::vector<size_t>&,
- Tensor*);
-
-// Copy data on a CPU using a straightforward memcpy.
-template <>
-void CopyTensorData<CPUDevice>(void* dst, void* src, size_t size) {
- std::memcpy(dst, src, size);
-};
-
-// Accumulate values on a CPU.
-#define GENERATE_ACCUMULATE(type) \
- template <> \
- void AccumulateTensorData<CPUDevice, type>(type * dst, type * src, \
- size_t size) { \
- for (unsigned int i = 0; i < size; i++) { \
- dst[i] += src[i]; \
- } \
- };
-GENERATE_ACCUMULATE(int);
-GENERATE_ACCUMULATE(long long);
-GENERATE_ACCUMULATE(float);
-#undef GENERATE_ACCUMULATE
-
-} // namespace mpi
-} // namespace contrib
-} // namespace tensorflow
-
-#endif // TENSORFLOW_USE_MPI
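
The CPU copy and accumulate specializations above are the only device-specific
primitives the ring algorithm needs. A tiny Python analogue of their contracts
(an illustration, not part of this diff):

def copy_tensor_data(dst, src):
    # CPU CopyTensorData: a plain memcpy-style copy of src into dst.
    dst[:len(src)] = src

def accumulate_tensor_data(dst, src):
    # CPU AccumulateTensorData, as generated by GENERATE_ACCUMULATE:
    # elementwise in-place addition, dst[i] += src[i].
    for i in range(len(src)):
        dst[i] += src[i]
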
diff --git a/tensorflow/contrib/mpi_collectives/ring.cu.cc b/tensorflow/contrib/mpi_collectives/ring.cu.cc
deleted file mode 100644
index 2f3eef366a..0000000000
--- a/tensorflow/contrib/mpi_collectives/ring.cu.cc
+++ /dev/null
@@ -1,117 +0,0 @@
-/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#ifdef TENSORFLOW_USE_MPI
-
-#if GOOGLE_CUDA
-
-#define EIGEN_USE_GPU
-
-#include "tensorflow/contrib/mpi_collectives/ring.h"
-
-namespace tensorflow {
-namespace contrib {
-namespace mpi {
-
-using CPUDevice = Eigen::ThreadPoolDevice;
-
-template <>
-MPI_Datatype MPIType<float>() {
- return MPI_FLOAT;
-};
-template <>
-MPI_Datatype MPIType<int>() {
- return MPI_INT;
-};
-template <>
-MPI_Datatype MPIType<long long>() {
- return MPI_LONG_LONG;
-};
-
-template <>
-DataType TensorFlowDataType<float>() {
- return DT_FLOAT;
-};
-template <>
-DataType TensorFlowDataType<int>() {
- return DT_INT32;
-};
-template <>
-DataType TensorFlowDataType<long long>() {
- return DT_INT64;
-};
-
-// Generate all necessary specializations for RingAllreduce.
-template Status RingAllreduce<GPUDevice, int>(OpKernelContext*, const Tensor*,
- Tensor*, Tensor*);
-template Status RingAllreduce<GPUDevice, long long>(OpKernelContext*,
- const Tensor*, Tensor*,
- Tensor*);
-template Status RingAllreduce<GPUDevice, float>(OpKernelContext*, const Tensor*,
- Tensor*, Tensor*);
-
-// Generate all necessary specializations for RingAllgather.
-template Status RingAllgather<GPUDevice, int>(OpKernelContext*, const Tensor*,
- const std::vector<size_t>&,
- Tensor*);
-template Status RingAllgather<GPUDevice, long long>(OpKernelContext*,
- const Tensor*,
- const std::vector<size_t>&,
- Tensor*);
-template Status RingAllgather<GPUDevice, float>(OpKernelContext*, const Tensor*,
- const std::vector<size_t>&,
- Tensor*);
-
-// Synchronously copy data on the GPU, using a stream separate from both the
-// default stream and TensorFlow's streams, to avoid synchronizing on
-// operations unrelated to the allreduce.
-template <>
-void CopyTensorData<GPUDevice>(void* dst, void* src, size_t size) {
- auto stream = CudaStreamForMPI();
- cudaMemcpyAsync(dst, src, size, cudaMemcpyDeviceToDevice, stream);
- cudaStreamSynchronize(stream);
-};
-
-// Elementwise accumulation kernel for GPU.
-template <typename T>
-__global__ void elemwise_accum(T* out, const T* in, const size_t N) {
- for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < N;
- i += blockDim.x * gridDim.x) {
- out[i] += in[i];
- }
-}
-
-// Synchronously accumulate tensors on the GPU, using a stream separate from
-// both the default stream and TensorFlow's streams, to avoid synchronizing
-// on operations unrelated to the allreduce.
-#define GENERATE_ACCUMULATE(type) \
- template <> \
- void AccumulateTensorData<GPUDevice, type>(type * dst, type * src, \
- size_t size) { \
- auto stream = CudaStreamForMPI(); \
- elemwise_accum<type><<<32, 256, 0, stream>>>(dst, src, size); \
- cudaStreamSynchronize(stream); \
- };
-GENERATE_ACCUMULATE(int);
-GENERATE_ACCUMULATE(long long);
-GENERATE_ACCUMULATE(float);
-#undef GENERATE_ACCUMULATE
-
-} // namespace mpi
-} // namespace contrib
-} // namespace tensorflow
-#endif // GOOGLE_CUDA
-
-#endif // TENSORFLOW_USE_MPI
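
The elemwise_accum kernel above is a standard grid-stride loop, so the fixed
<<<32, 256>>> launch covers tensors of any size. A small Python sketch of the
index pattern (an illustration, not part of this diff):

GRID_DIM, BLOCK_DIM = 32, 256  # matches the <<<32, 256>>> launch above

def indices_for_thread(block_idx, thread_idx, n):
    # Elements visited by one GPU thread: start at its global thread id,
    # then step by the total number of threads in the grid.
    start = block_idx * BLOCK_DIM + thread_idx
    return range(start, n, GRID_DIM * BLOCK_DIM)

# Sanity check: every element in [0, n) is covered exactly once.
n = 100000
seen = sorted(i for b in range(GRID_DIM) for t in range(BLOCK_DIM)
              for i in indices_for_thread(b, t, n))
assert seen == list(range(n))
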
diff --git a/tensorflow/contrib/mpi_collectives/ring.h b/tensorflow/contrib/mpi_collectives/ring.h
deleted file mode 100644
index cae57ce60e..0000000000
--- a/tensorflow/contrib/mpi_collectives/ring.h
+++ /dev/null
@@ -1,327 +0,0 @@
-/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#ifndef TENSORFLOW_CONTRIB_MPI_H_
-#define TENSORFLOW_CONTRIB_MPI_H_
-
-#ifdef TENSORFLOW_USE_MPI
-
-#include "tensorflow/core/framework/op.h"
-#include "tensorflow/core/framework/op_kernel.h"
-#include "tensorflow/core/framework/shape_inference.h"
-
-#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
-#include "tensorflow/core/framework/tensor_types.h"
-
-#if GOOGLE_CUDA
-#include "cuda_runtime.h"
-#endif
-
-// Needed to avoid header issues with C++-supporting MPI implementations
-#define OMPI_SKIP_MPICXX
-#include "third_party/mpi/mpi.h"
-
-#define TAG_TENSOR 12
-
-namespace tensorflow {
-namespace contrib {
-namespace mpi {
-
-using CPUDevice = Eigen::ThreadPoolDevice;
-using GPUDevice = Eigen::GpuDevice;
-
-// Convert from templated types to values we can pass to MPI.
-template <typename T>
-MPI_Datatype MPIType();
-
-// Convert from templated types to TensorFlow data types.
-template <typename T>
-DataType TensorFlowDataType();
-
-#define MPI_REQUIRES_OK(MPI_STATUS) \
- if ((MPI_STATUS) != MPI_SUCCESS) { \
- return errors::Unknown("MPI operation failed unexpectedly."); \
- }
-
-// Copy data from one tensor to another tensor.
-// This uses a custom CUDA stream on GPU, which is necessary to overlap the
-// backpropagation computations with the allreduce.
-template <typename Device>
-void CopyTensorData(void* destination, void* source, size_t size);
-
-// Add a tensor into another tensor, accumulating in place.
-// This uses a custom CUDA stream on GPU, which is necessary to overlap the
-// backpropagation computations with the allreduce.
-template <typename Device, typename T>
-void AccumulateTensorData(T* destination, T* source, size_t size);
-
-// We need to get the right stream for doing CUDA memory transfers and
-// operations, which is possibly different from the standard TensorFlow stream.
-#if GOOGLE_CUDA
-cudaStream_t CudaStreamForMPI();
-#endif
-
-/* Perform a ring allreduce on the data. Allocate the necessary output tensor
- * and store it in the output parameter.
- *
- * Assumes that all MPI processes are doing an allreduce of the same tensor,
- * with the same dimensions.
- *
- * A ring allreduce is a bandwidth-optimal way to do an allreduce. To do the
- * allreduce, the nodes involved are arranged in a ring:
- *
- *       .--0--.
- *      /       \
- *     3         1
- *      \       /
- *       *--2--*
- *
- * Each node always sends to the next clockwise node in the ring, and receives
- * from the previous one.
- *
- * The allreduce is done in two parts: a scatter-reduce and an allgather. In
- * the scatter-reduce, a reduction is done, so that each node ends up with a
- * chunk of the final output tensor which has contributions from all other
- * nodes. In the allgather, those chunks are distributed among all the nodes,
- * so that all nodes have the entire output tensor.
- *
- * Both of these operations are done by dividing the input tensor into N
- * evenly sized chunks (where N is the number of nodes in the ring).
- *
- * The scatter-reduce is done in N-1 steps. In the ith step, node j will send
- * the (j - i)th chunk and receive the (j - i - 1)th chunk, adding it into
- * its existing data for that chunk. For example, in the first iteration with
- * the ring depicted above, you will have the following transfers:
- *
- * Segment 0: Node 0 --> Node 1
- * Segment 1: Node 1 --> Node 2
- * Segment 2: Node 2 --> Node 3
- * Segment 3: Node 3 --> Node 0
- *
- * In the second iteration, you'll have the following transfers:
- *
- * Segment 0: Node 1 --> Node 2
- * Segment 1: Node 2 --> Node 3
- * Segment 2: Node 3 --> Node 0
- * Segment 3: Node 0 --> Node 1
- *
- * After this iteration, Node 2 has three of the four contributions to Segment 0.
- * The last iteration has the following transfers:
- *
- * Segment 0: Node 2 --> Node 3
- * Segment 1: Node 3 --> Node 0
- * Segment 2: Node 0 --> Node 1
- * Segment 3: Node 1 --> Node 2
- *
- * After this iteration, Node 3 has the fully accumulated Segment 0; Node 0
- * has the fully accumulated Segment 1; and so on. The scatter-reduce is
- * complete.
- *
- * Next, the allgather distributes these fully accumulated chunks across all
- * nodes. Communication proceeds in the same ring, once again in N-1 steps. At
- * the ith step, node j will send chunk (j - i + 1) and receive chunk (j - i).
- * For example, at the first iteration, the following transfers will occur:
- *
- * Segment 0: Node 3 --> Node 0
- * Segment 1: Node 0 --> Node 1
- * Segment 2: Node 1 --> Node 2
- * Segment 3: Node 2 --> Node 3
- *
- * After the first iteration, Node 0 will have a fully accumulated Segment 0
- * (from Node 3) and Segment 1. In the next iteration, Node 0 will send its
- * just-received Segment 0 onward to Node 1, and receive Segment 3 from Node 3.
- * After this has continued for N - 1 iterations, all nodes will have the
- * fully accumulated tensor.
- *
- * Each node will do (N-1) sends for the scatter-reduce and (N-1) sends for the
- * allgather. Each send will contain K / N bytes, if there are K bytes in the
- * original tensor on every node. Thus, each node sends and receives 2K(N - 1)/N
- * bytes of data, and the performance of the allreduce (assuming no latency in
- * connections) is constrained by the slowest interconnect between the nodes.
- *
- */
-template <typename Device, typename T>
-Status RingAllreduce(OpKernelContext* context, const Tensor* input,
- Tensor* temp, Tensor* output) {
- // Acquire MPI size and rank
- int n, r;
- MPI_REQUIRES_OK(MPI_Comm_size(MPI_COMM_WORLD, &n));
- MPI_REQUIRES_OK(MPI_Comm_rank(MPI_COMM_WORLD, &r));
-
- T* buffer = (T*)output->tensor_data().data();
-
- CopyTensorData<Device>((void*)buffer, (void*)input->tensor_data().data(),
- output->tensor_data().size());
-
- // Calculate segment sizes and segment ends
- const size_t elements_to_reduce = input->NumElements();
- const size_t segment_size = elements_to_reduce / n;
- std::vector<size_t> segment_sizes(n, segment_size);
-
- const size_t residual = elements_to_reduce % n;
- for (size_t i = 0; i < residual; ++i) {
- segment_sizes[i]++;
- }
-
- std::vector<size_t> segment_starts(n);
- segment_starts[0] = 0;
- for (size_t i = 1; i < segment_starts.size(); ++i) {
- segment_starts[i] = segment_starts[i - 1] + segment_sizes[i - 1];
- }
-
- assert(segment_starts[n - 1] + segment_sizes[n - 1] == elements_to_reduce);
-
- T* segment_recv = (T*)temp->tensor_data().data();
-
- // Receive from your left neighbor with wrap-around
- const size_t recv_from = ((r - 1) + n) % n;
-
- // Send to your right neighbor with wrap-around
- const size_t send_to = (r + 1) % n;
-
- MPI_Status recv_status;
- MPI_Request recv_req;
-
- // Now start the ring scatter-reduce. At every step, for every rank, we
- // iterate through segments with wraparound and send and recv from our
- // neighbors and reduce locally. At the i'th iteration, rank r sends
- // segment (r-i) and receives segment (r-i-1).
- for (int i = 0; i < n - 1; i++) {
- const size_t send_seg_id = ((r - i) + n) % n;
- const size_t recv_seg_id = ((r - i - 1) + n) % n;
-
- T* segment_send = &(buffer[segment_starts[send_seg_id]]);
-
- MPI_REQUIRES_OK(MPI_Irecv(segment_recv, segment_sizes[recv_seg_id],
- MPIType<T>(), recv_from, TAG_TENSOR,
- MPI_COMM_WORLD, &recv_req));
-
- MPI_REQUIRES_OK(MPI_Send(segment_send, segment_sizes[send_seg_id],
- MPIType<T>(), send_to, TAG_TENSOR,
- MPI_COMM_WORLD));
-
- T* segment_update = &(buffer[segment_starts[recv_seg_id]]);
-
- // Wait for recv to complete before reduction
- MPI_REQUIRES_OK(MPI_Wait(&recv_req, &recv_status));
-
- const size_t recv_seg_size = segment_sizes[recv_seg_id];
- AccumulateTensorData<Device, T>(segment_update, segment_recv,
- recv_seg_size);
- }
-
- // Now start the pipelined ring allgather. At every step, for every rank,
- // we iterate through segments with wraparound and send and recv from our
- // neighbors. At the i'th iteration, rank r sends segment (r-i+1) and
- // receives segment (r-i).
- for (size_t i = 0; i < n - 1; ++i) {
- const size_t send_seg_id = ((r - i + 1) + n) % n;
- const size_t recv_seg_id = ((r - i) + n) % n;
-
- // Segment to send - at every iteration we send segment (r-i+1)
- T* segment_send = &(buffer[segment_starts[send_seg_id]]);
-
- // Segment to recv - at every iteration we receive segment (r-i)
- T* segment_recv = &(buffer[segment_starts[recv_seg_id]]);
-
- MPI_REQUIRES_OK(MPI_Sendrecv(
- segment_send, segment_sizes[send_seg_id], MPIType<T>(), send_to,
- TAG_TENSOR, segment_recv, segment_sizes[recv_seg_id], MPIType<T>(),
- recv_from, TAG_TENSOR, MPI_COMM_WORLD, &recv_status));
- }
-
- return Status::OK();
-}
-
-// Perform a ring allgather on a Tensor. Other ranks may allgather with a
-// tensor which differs in the first dimension only; all other dimensions must
-// be the same.
-//
-// For more information on the ring allgather, read the documentation for the
-// ring allreduce, which includes a ring allgather.
-template <typename Device, typename T>
-Status RingAllgather(OpKernelContext* context, const Tensor* input,
- const std::vector<size_t>& sizes, Tensor* output) {
- // Acquire MPI size and rank
- int n, r;
- MPI_REQUIRES_OK(MPI_Comm_size(MPI_COMM_WORLD, &n));
- MPI_REQUIRES_OK(MPI_Comm_rank(MPI_COMM_WORLD, &r));
-
- assert(sizes.size() == n);
- assert(input->dim_size(0) == sizes[r]);
-
- // Compute number of elements in every "row". We can't compute number of
- // elements in every chunks, because those chunks are variable length.
- size_t elements_per_row = 1;
- for (int i = 1; i < input->shape().dims(); i++) {
- elements_per_row *= input->dim_size(i);
- }
-
- // Copy data from input tensor to correct place in output tensor.
- std::vector<size_t> segment_starts(n);
- segment_starts[0] = 0;
- for (int i = 1; i < n; i++) {
- segment_starts[i] = segment_starts[i - 1] + elements_per_row * sizes[i - 1];
- }
- size_t offset = segment_starts[r];
-
- // Copy data to the right offset for this rank.
- T* buffer = (T*)output->tensor_data().data();
- CopyTensorData<Device>((void*)(buffer + offset),
- (void*)input->tensor_data().data(),
- elements_per_row * sizes[r] * sizeof(T));
-
- // Receive from your left neighbor with wrap-around
- const size_t recv_from = ((r - 1) + n) % n;
-
- // Send to your right neighbor with wrap-around
- const size_t send_to = (r + 1) % n;
-
- // Perform a ring allgather. At every step, for every rank, we iterate
- // through segments with wraparound and send and recv from our neighbors.
- // At the i'th iteration, rank r sends segment (r-i) and receives segment
- // (r-1-i).
- MPI_Status recv_status;
- for (size_t i = 0; i < n - 1; ++i) {
- const size_t send_seg_id = ((r - i) + n) % n;
- const size_t recv_seg_id = ((r - i - 1) + n) % n;
-
- // Segment to send - at every iteration we send segment (r-i)
- size_t offset_send = segment_starts[send_seg_id];
- size_t rows_send = sizes[send_seg_id];
- T* segment_send = &(buffer[offset_send]);
-
- // Segment to recv - at every iteration we receive segment (r-1-i)
- size_t offset_recv = segment_starts[recv_seg_id];
- size_t rows_recv = sizes[recv_seg_id];
- T* segment_recv = &(buffer[offset_recv]);
-
- MPI_REQUIRES_OK(MPI_Sendrecv(
- segment_send, elements_per_row * rows_send, MPIType<T>(), send_to,
- TAG_TENSOR, segment_recv, elements_per_row * rows_recv, MPIType<T>(),
- recv_from, TAG_TENSOR, MPI_COMM_WORLD, &recv_status));
- }
-
- return Status::OK();
-}
-
-} // namespace mpi
-} // namespace contrib
-} // namespace tensorflow
-
-#endif // TENSORFLOW_USE_MPI
-
-#undef TENSORFLOW_CONTRIB_MPI_H_
-#endif // TENSORFLOW_CONTRIB_MPI_H_
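
To make the schedule described in ring.h concrete: a runnable Python
simulation of RingAllreduce's two phases, including the residual-aware
segment sizing (an illustration of the documented schedule, not part of this
diff). RingAllgather forwards variable-sized finished segments around the
same ring using the same pattern.

def ring_allreduce_schedule(inputs):
    # inputs[r] is rank r's tensor, flattened to a list; all equal length.
    n = len(inputs)
    elements = len(inputs[0])

    # Segment sizing as in RingAllreduce: elements / n each, with the
    # first (elements % n) segments taking one extra element.
    base, residual = divmod(elements, n)
    sizes = [base + (1 if i < residual else 0) for i in range(n)]
    starts = [sum(sizes[:i]) for i in range(n)]
    buffers = [list(x) for x in inputs]

    # Scatter-reduce: at step i, rank r sends segment (r-i) to rank r+1
    # and accumulates segment (r-i-1), received from rank r-1.
    for i in range(n - 1):
        sent = []
        for r in range(n):
            seg = (r - i) % n
            sent.append(buffers[r][starts[seg]:starts[seg] + sizes[seg]])
        for r in range(n):
            seg = (r - i - 1) % n
            for k, v in enumerate(sent[(r - 1) % n]):
                buffers[r][starts[seg] + k] += v

    # Allgather: at step i, rank r overwrites segment (r-i) with the
    # finished copy received from rank r-1 (which forwards it onward).
    for i in range(n - 1):
        snapshot = [list(b) for b in buffers]
        for r in range(n):
            seg = (r - i) % n
            s, e = starts[seg], starts[seg] + sizes[seg]
            buffers[r][s:e] = snapshot[(r - 1) % n][s:e]
    return buffers

# Three simulated ranks, five elements each, so segment sizes are 2, 2, 1.
ranks = [[1, 2, 3, 4, 5], [10, 20, 30, 40, 50], [100, 200, 300, 400, 500]]
expected = [sum(col) for col in zip(*ranks)]
assert all(b == expected for b in ring_allreduce_schedule(ranks))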