-rw-r--r--  tensorflow/contrib/mpi/README.md             |  10
-rw-r--r--  tensorflow/contrib/mpi/mpi_rendezvous_mgr.cc | 135
-rw-r--r--  tensorflow/contrib/mpi/mpi_rendezvous_mgr.h  |   6
-rw-r--r--  tensorflow/contrib/mpi/mpi_utils.h           |  16
4 files changed, 85 insertions(+), 82 deletions(-)
diff --git a/tensorflow/contrib/mpi/README.md b/tensorflow/contrib/mpi/README.md
index c48b354926..b0d03d05a2 100644
--- a/tensorflow/contrib/mpi/README.md
+++ b/tensorflow/contrib/mpi/README.md
@@ -10,7 +10,7 @@
## Overview
-By using this protocol the TensorFlow can take advantage of the high performance networking primitives that are offered via the MPI API. This enables TensorFlow to take advantage of high performance low latency networks such as Infiniband. These changes are largely transparent to the user who only has to change the offered protocol and launch the script using the 'mpirun' launcher. For example:
+By using this protocol TensorFlow can take advantage of the high performance networking primitives that are offered via the MPI API. This enables TensorFlow to take advantage of high performance low latency networks such as Infiniband. These changes are largely transparent to the user who only has to change the offered protocol and launch the script using the 'mpirun' launcher. For example:
```mpirun -np 2 python my_neuralnet.py ```
@@ -28,7 +28,7 @@ This environment variable allows you to disable the MPI path before launch (e.g.
**MPI_OPTIMAL_PATH=[0,1]**
When set to 0 it will use the default path where tensors are encoded to ProtoText before being copied to a remote process. When set to 1 a more optimal path will be taken where only the tensor description is encoded while the actual tensor data is transferred directly from the source buffer to the destination buffer.
-This path is disabled by default as it requires that MPI library can directly access the pointer to the data. For CPU backed buffers this is no problem, however for GPU backed buffers this requires MPI libraries that are built with CUDA support (CUDA Aware). When using non-CUDA aware MPI libraries and GPU buffers you will get segmentation faults.
+This path is disabled by default as it requires that the MPI library can directly access the pointer to the data. For CPU backed buffers this is no problem, however for GPU backed buffers this requires MPI libraries that are built with CUDA support (CUDA Aware). When using non-CUDA aware MPI libraries and GPU buffers you will get segmentation faults.
@@ -50,7 +50,7 @@ The implementation takes over the responsibility for sending and receiving tenso
To this end once the code is loaded a dedicated thread will be launched that handles all MPI operations. This thread will loop through a set of operations:
* Send requests placed on the request queue to the sending process
-Once a request for a tensor is received two callbacks are created. The first one is to request the tensor and the second one is executed once the requested data has arrived. To this end the request is placed in a queue and will be send once the MPI thread serves the queue. This sending is done using non-blocking MPI_Isend operations.
+Once a request for a tensor is received two callbacks are created. The first one is to request the tensor and the second one is executed once the requested data has arrived. To this end the request is placed in a queue and will be sent once the MPI thread services the queue. This sending is done using non-blocking MPI_Isend operations.
* Send tensor data in response to a request call
Once a request has arrived from a remote process the request is forwarded to the original TensorFlow code which looks up the tensor in the waiting table. Once the tensor has been found a callback is executed which places the found tensor on the sendQueue for the MPI thread. Once the sendQueue is served the tensor data will be send using non-blocking send operations (MP_Isend) to the remote process.
@@ -59,11 +59,11 @@ Once a request has arrived from a remote process the request is forwarded to the
The MPI thread will check if there are any incoming tensor request messages on the communication lines using MPI_Iprobe. Once a request has been received it will be passed on to the standard TensorFlow code and eventually will be placed on the sendQueue.
* Receive tensor
-At some point after a request has been send the remote process will transmit the tensor. This tensor will be received and we look-up the callback that is associated with this tensor in our request table and execute the callback on the received data.
+At some point after a request has been sent the remote process will transmit the tensor. This tensor will be received and we look-up the callback that is associated with this tensor in our request table and execute the callback on the received data.
In the implementation all send operations are non-blocking, all probe operations are non-blocking and all receive-operations are blocking. The receive-operations are only executed after the probe has determined that there is something to receive.
-The MPI processes identify each other using an MPI process ID. The TensorFlow gRPC processes identify each other using a name, during launch we create a mapping between the TensorFlow process name and the MPI process ID to allow the processes to communicate with the correct destinations when using MPI operations.
+The MPI processes identify each other using an MPI process ID. The TensorFlow gRPC processes identify each other using a name. During launch we create a mapping between the TensorFlow process name and the MPI process ID to allow the processes to communicate with the correct destinations when using MPI operations.
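The bullets above describe the service loop of the dedicated MPI thread; the hunks below only show the declaration of `MPIRendezvousMgr::MPIBackgroundThread`. A minimal sketch of such a loop, assuming hypothetical queue and entry types (not the contrib implementation): queued messages go out with non-blocking `MPI_Isend` and are retired once `MPI_Test` reports completion, arrivals are detected with non-blocking `MPI_Iprobe`, and the blocking `MPI_Recv` is issued only after a successful probe.

```
// Hypothetical sketch of the MPI service thread described above; the queue and
// entry types are made up, and a real implementation would guard the shared
// queue with a mutex.
#include <mpi.h>

#include <iterator>
#include <list>
#include <queue>
#include <vector>

struct PendingSend {
  std::vector<char> buf;  // serialized request or tensor response
  int dst;                // destination MPI rank
  MPI_Request req;
};

void ServiceLoop(std::queue<PendingSend>* send_queue, volatile bool* shutdown) {
  std::list<PendingSend> active_sends;  // keep buffers alive until completion
  while (!*shutdown) {
    // Drain the send queue with non-blocking sends.
    while (!send_queue->empty()) {
      active_sends.push_back(std::move(send_queue->front()));
      send_queue->pop();
      PendingSend& s = active_sends.back();
      MPI_Isend(s.buf.data(), static_cast<int>(s.buf.size()), MPI_CHAR, s.dst,
                /*tag=*/0, MPI_COMM_WORLD, &s.req);
    }
    // Retire sends whose request has completed.
    for (auto it = active_sends.begin(); it != active_sends.end();) {
      int done = 0;
      MPI_Test(&it->req, &done, MPI_STATUS_IGNORE);
      it = done ? active_sends.erase(it) : std::next(it);
    }
    // Non-blocking probe for incoming messages.
    int flag = 0;
    MPI_Status status;
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status);
    if (flag) {
      int count = 0;
      MPI_Get_count(&status, MPI_CHAR, &count);
      std::vector<char> msg(count);
      // Blocking receive, issued only after the probe reported a message.
      MPI_Recv(msg.data(), count, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG,
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      // ... hand msg to the TensorFlow side (request table / send callbacks) ...
    }
  }
}
```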
diff --git a/tensorflow/contrib/mpi/mpi_rendezvous_mgr.cc b/tensorflow/contrib/mpi/mpi_rendezvous_mgr.cc
index fac3a45cc7..2e815477ee 100644
--- a/tensorflow/contrib/mpi/mpi_rendezvous_mgr.cc
+++ b/tensorflow/contrib/mpi/mpi_rendezvous_mgr.cc
@@ -30,6 +30,30 @@ limitations under the License.
namespace tensorflow {
+MPIRendezvousMgr::MPIRendezvousMgr(const WorkerEnv* env)
+ : BaseRendezvousMgr(env), worker_env_2(env), use_optimal_transfer_(false) {
+
+ const char* mpienv = getenv("MPI_OPTIMAL_PATH");
+ if (mpienv && mpienv[0] == '1') {
+ LOG(INFO) << "MPI Optimal copy path enabled (Requires CUDA-Aware MPI when "
+ "using GPUs)\n";
+ use_optimal_transfer_ = true;
+ }
+
+ // extract worker-name
+ auto parsed = env->local_devices[0]->parsed_name();
+ const std::string task_id = strings::StrCat(parsed.job, ":", parsed.replica);
+
+ mpiutils_ = new MPIUtils(task_id);
+ background_thread_ =
+ std::thread(&MPIRendezvousMgr::MPIBackgroundThread, this);
+}
+
+BaseRemoteRendezvous* MPIRendezvousMgr::Create(int64 step_id,
+ const WorkerEnv* worker_env) {
+ return new MPIRemoteRendezvous(worker_env, step_id, mpiutils_, this);
+}
+
void MPIRemoteRendezvous::RecvFromRemoteAsync(
const Rendezvous::ParsedKey& parsed, const Rendezvous::Args& recv_args,
DoneCallback done) {
@@ -38,13 +62,16 @@ void MPIRemoteRendezvous::RecvFromRemoteAsync(
MPIRequestTensorCall* rendezvous_call = new MPIRequestTensorCall();
VLOG(2) << "MPI User requested " << parsed.FullKey()
- << " @ step: " << step_id_ << std::endl;
+ << " @ step: " << step_id_;
- const int dst = mpiutils_->GetSourceID(parsed.FullKey().ToString());
+ std::string src_task =
+ strings::StrCat(parsed.src.job, ":", parsed.src.replica);
+ const int dst = mpiutils_->GetSourceID(src_task);
Device* dst_device;
if (s.ok()) {
s = env_->device_mgr->LookupDevice(parsed.dst_device, &dst_device);
+ CHECK(s.ok()) << "Device lookup failed";
} else {
done(s, Args(), recv_args, Tensor{}, false);
return;
@@ -70,26 +97,28 @@ void MPIRemoteRendezvous::RecvFromRemoteAsync(
// Create the function which is called when the Tensor is send by remote
const int64 temp1 = step_id_;
- rendezvous_call->recv_call_ = [this, parsed, recv_args, done, dst, temp1,
- rendezvous_call](MPIRecvTensorResponse mRes) {
+ rendezvous_call->recv_call_ =
+ [this, parsed, recv_args, done, dst, temp1, rendezvous_call](
+ MPIRecvTensorResponse mpi_response) {
Status s;
Device* dst_device;
if (s.ok()) {
s = env_->device_mgr->LookupDevice(parsed.dst_device, &dst_device);
+ CHECK(s.ok()) << "Device lookup failed";
}
VLOG(3) << "MPI Received tensor " << parsed.FullKey()
- << " @ step: " << temp1 << " single-send: " << mRes.singlesend()
- << std::endl;
+ << " @ step: " << temp1
+ << " single-send: " << mpi_response.singlesend();
Tensor val;
- if (mRes.singlesend()) {
- dst_device->MakeTensorFromProto(mRes.response().tensor(),
+ if (mpi_response.singlesend()) {
+ dst_device->MakeTensorFromProto(mpi_response.response().tensor(),
recv_args.alloc_attrs, &val);
} else {
TensorResponse tr;
tr.InitAlloc(dst_device, recv_args.alloc_attrs);
- tr.InitPartial(mRes.response());
+ tr.InitPartial(mpi_response.response());
const size_t nBytes = tr.tensor().TotalBytes();
void* data = const_cast<void*>(DMAHelper::base(&tr.tensor()));
MPI_Status status;
@@ -98,16 +127,18 @@ void MPIRemoteRendezvous::RecvFromRemoteAsync(
val = std::move(tr.tensor());
}
- done(s, Args(), recv_args, val, mRes.response().is_dead());
+ done(s, Args(), recv_args, val, mpi_response.response().is_dead());
};
- auto mgr = dynamic_cast<MPIRendezvousMgr*>(this->rendezvous_mgr_);
+ MPIRendezvousMgr* mgr =
+ reinterpret_cast<MPIRendezvousMgr*>(this->rendezvous_mgr_);
mgr->QueueRequest(parsed.FullKey().ToString(), step_id_,
std::move(request_call), rendezvous_call);
}
MPIRemoteRendezvous::~MPIRemoteRendezvous() {
- auto mgr = dynamic_cast<MPIRendezvousMgr*>(this->rendezvous_mgr_);
+ MPIRendezvousMgr* mgr =
+ reinterpret_cast<MPIRendezvousMgr*>(this->rendezvous_mgr_);
mgr->RemoveStepID(step_id_);
}
@@ -122,25 +153,28 @@ void MPIRendezvousMgr::AddRequest(RecvTensorRequest request,
const int64 step_id = request.step_id();
const std::string& key = request.rendezvous_key();
Rendezvous::ParsedKey parsed;
- Status s = Rendezvous::ParseKey(key, &parsed);
+ TF_CHECK_OK(Rendezvous::ParseKey(key, &parsed));
MPIRecvTensorCallBack send_cb = [this, mpi_dst, parsed](
const Status& status, const Rendezvous::Args& send_args,
const Rendezvous::Args& recv_args, const Tensor& val, bool is_dead,
- MPISendTensorCall* mpiSC) {
+ MPISendTensorCall* mpi_send_call) {
// TODO(jbedorf) this should be a loop over max size
- CHECK(mpiSC->mRes_.ByteSize() < INT_MAX)
+ CHECK(mpi_send_call->mRes_.ByteSize() < INT_MAX)
<< "Buffer too large for single transfer";
- MPI_CHECK(MPI_Alloc_mem(mpiSC->mRes_.ByteSize(), MPI_INFO_NULL,
- &mpiSC->send_buffer_));
- mpiSC->mRes_.SerializeToArray(mpiSC->send_buffer_, mpiSC->mRes_.ByteSize());
-
- MPI_CHECK(MPI_Isend(
- mpiSC->send_buffer_, static_cast<int>(mpiSC->mRes_.ByteSize()),
- MPI_CHAR, mpi_dst, TAG_SENDTENSOR, MPI_COMM_WORLD, &(mpiSC->msg1_)));
- MPI_CHECK(MPI_Test(&mpiSC->msg1_, &mpiSC->done1_, MPI_STATUS_IGNORE));
-
- if (!mpiSC->mRes_.singlesend()) {
+ MPI_CHECK(MPI_Alloc_mem(mpi_send_call->mRes_.ByteSize(), MPI_INFO_NULL,
+ &mpi_send_call->send_buffer_));
+ mpi_send_call->mRes_.SerializeToArray(mpi_send_call->send_buffer_,
+ mpi_send_call->mRes_.ByteSize());
+
+ MPI_CHECK(MPI_Isend(mpi_send_call->send_buffer_,
+ static_cast<int>(mpi_send_call->mRes_.ByteSize()),
+ MPI_CHAR, mpi_dst, TAG_SENDTENSOR, MPI_COMM_WORLD,
+ &(mpi_send_call->msg1_)));
+ MPI_CHECK(MPI_Test(&mpi_send_call->msg1_, &mpi_send_call->done1_,
+ MPI_STATUS_IGNORE));
+
+ if (!mpi_send_call->mRes_.singlesend()) {
const int tensor_size = static_cast<int>(val.TotalBytes());
void* temp = const_cast<void*>(DMAHelper::base(&val));
@@ -150,10 +184,10 @@ void MPIRendezvousMgr::AddRequest(RecvTensorRequest request,
// TODO(jbedorf) this should be a loop over max size
MPI_CHECK(MPI_Isend(temp, tensor_size, MPI_CHAR, mpi_dst, TAG_SENDTENSOR2,
- MPI_COMM_WORLD, &mpiSC->msg2_));
- mpiSC->done2_ = 0;
+ MPI_COMM_WORLD, &mpi_send_call->msg2_));
+ mpi_send_call->done2_ = 0;
}
- return mpiSC;
+ return mpi_send_call;
};
// Wrapper around the read callback to place the callback on our queue
@@ -170,8 +204,8 @@ void MPIRendezvousMgr::AddRequest(RecvTensorRequest request,
VLOG(3) << "MPI Sending tensor " << parsed.FullKey()
<< " @ step: " << step_id << std::endl;
- auto mpiSC = new MPISendTensorCall();
- mpiSC->Init(parsed, step_id, is_dead);
+ auto mpi_send_call = new MPISendTensorCall();
+ mpi_send_call->Init(parsed, step_id, is_dead);
Device* src_dev = nullptr;
Status s = this->worker_env_2->device_mgr->LookupDevice(parsed.src_device,
@@ -188,11 +222,12 @@ void MPIRendezvousMgr::AddRequest(RecvTensorRequest request,
if (doOptimalTransfer) {
// First send the Tensor description and in a follow up transfer the data
- mpiSC->mRes_.mutable_response()->mutable_tensor()->set_dtype(val.dtype());
- val.shape().AsProto(mpiSC->mRes_.mutable_response()
+ mpi_send_call->mRes_.mutable_response()->mutable_tensor()->set_dtype(
+ val.dtype());
+ val.shape().AsProto(mpi_send_call->mRes_.mutable_response()
->mutable_tensor()
->mutable_tensor_shape());
- mpiSC->mRes_.set_singlesend(false);
+ mpi_send_call->mRes_.set_singlesend(false);
} else {
// Send the Tensor description and data in a single transfer
if (src_dev->tensorflow_gpu_device_info() &&
@@ -200,7 +235,7 @@ void MPIRendezvousMgr::AddRequest(RecvTensorRequest request,
Notification n;
GPUUtil::SetProtoFromGPU(
val, src_dev, send_args.device_context,
- mpiSC->mRes_.mutable_response()->mutable_tensor(), is_dead,
+ mpi_send_call->mRes_.mutable_response()->mutable_tensor(), is_dead,
[&n, &s](const Status& s_) {
s = s_;
n.Notify();
@@ -208,12 +243,12 @@ void MPIRendezvousMgr::AddRequest(RecvTensorRequest request,
n.WaitForNotification();
} else {
val.AsProtoTensorContent(
- mpiSC->mRes_.mutable_response()->mutable_tensor());
+ mpi_send_call->mRes_.mutable_response()->mutable_tensor());
}
}
- std::function<MPISendTensorCall*()> res =
- std::bind(send_cb, status, send_args, recv_args, val, is_dead, mpiSC);
+ std::function<MPISendTensorCall*()> res = std::bind(
+ send_cb, status, send_args, recv_args, val, is_dead, mpi_send_call);
SendQueueEntry req(parsed.FullKey().ToString().c_str(), std::move(res));
@@ -222,7 +257,7 @@ void MPIRendezvousMgr::AddRequest(RecvTensorRequest request,
// Wait for the notification that indicates the tensor has been
// succesfully transmitted to the remote process. Only needed if we
// have not parsed the tensor to proto
- if (doOptimalTransfer) mpiSC->n_.WaitForNotification();
+ if (doOptimalTransfer) mpi_send_call->n_.WaitForNotification();
}; // done_cb
worker_env_2->compute_pool->Schedule([this, step_id, parsed, done_cb]() {
@@ -230,30 +265,6 @@ void MPIRendezvousMgr::AddRequest(RecvTensorRequest request,
});
}
-MPIRendezvousMgr::MPIRendezvousMgr(const WorkerEnv* env)
- : BaseRendezvousMgr(env), worker_env_2(env), use_optimal_transfer_(false) {
-
- const char* mpienv = getenv("MPI_OPTIMAL_PATH");
- if (mpienv && mpienv[0] == '1') {
- LOG(INFO) << "MPI Optimal copy path enabled (Requires CUDA-Aware MPI when "
- "using GPUs)\n";
- use_optimal_transfer_ = true;
- }
-
- // extract worker-name from somewhere
- std::string worker_name = env->local_devices[0]->name();
- worker_name = worker_name.substr(0, worker_name.rfind('/')); // Strip device
-
- mpiutils_ = new MPIUtils(worker_name);
- background_thread_ =
- std::thread(&MPIRendezvousMgr::MPIBackgroundThread, this);
-}
-
-BaseRemoteRendezvous* MPIRendezvousMgr::Create(int64 step_id,
- const WorkerEnv* worker_env) {
- return new MPIRemoteRendezvous(worker_env, step_id, mpiutils_, this);
-}
-
void MPIRendezvousMgr::MPIBackgroundThread() {
std::list<std::unique_ptr<MPISendTensorCall>> active_sends;
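As a reading aid for the change above: the MPI lookup key is now the `<job>:<replica>` task string built with `strings::StrCat`, both in the relocated constructor and in `RecvFromRemoteAsync`, instead of a substring of the full rendezvous key. The fragment below annotates those lines with hypothetical values (the device name, `"worker:0"`, and `"ps:0"` are invented for illustration):

```
// Illustration only: hypothetical device name "/job:worker/replica:0/task:0/cpu:0".

// Constructor: derive this process's own task id from its first local device.
auto parsed_device = env->local_devices[0]->parsed_name();  // job="worker", replica=0
const std::string task_id =
    strings::StrCat(parsed_device.job, ":", parsed_device.replica);  // "worker:0"

// RecvFromRemoteAsync: derive the peer's task id from the parsed rendezvous key
// and map it to an MPI rank via the table built at startup.
const std::string src_task =
    strings::StrCat(parsed.src.job, ":", parsed.src.replica);  // e.g. "ps:0"
const int dst = mpiutils_->GetSourceID(src_task);              // MPI rank of "ps:0"
```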
diff --git a/tensorflow/contrib/mpi/mpi_rendezvous_mgr.h b/tensorflow/contrib/mpi/mpi_rendezvous_mgr.h
index a7b66227fc..8c908e635f 100644
--- a/tensorflow/contrib/mpi/mpi_rendezvous_mgr.h
+++ b/tensorflow/contrib/mpi/mpi_rendezvous_mgr.h
@@ -40,7 +40,8 @@ limitations under the License.
namespace tensorflow {
-struct MPISendTensorCall {
+class MPISendTensorCall {
+ public:
char* send_buffer_;
char* send_buffer2_;
@@ -82,7 +83,8 @@ struct MPISendTensorCall {
}
};
-struct MPIRequestTensorCall {
+class MPIRequestTensorCall {
+ public:
Rendezvous::DoneCallback done_;
RecvTensorRequest req_;
MPI_Request mpi_request_;
diff --git a/tensorflow/contrib/mpi/mpi_utils.h b/tensorflow/contrib/mpi/mpi_utils.h
index 8745846f03..4ee060efec 100644
--- a/tensorflow/contrib/mpi/mpi_utils.h
+++ b/tensorflow/contrib/mpi/mpi_utils.h
@@ -39,10 +39,10 @@ class MPIUtils {
public:
explicit MPIUtils(const std::string& worker_name);
- const int GetSourceID(const std::string& key) const {
- auto it = name_to_id_.find(GetWorkerName(key, 0));
+ const int GetSourceID(const std::string& task_id) const {
+ auto it = name_to_id_.find(task_id);
if (it == name_to_id_.end()) {
- LOG(FATAL) << "Failed to convert worker name to MPI index: " << key;
+ LOG(FATAL) << "Failed to convert worker name to MPI index: " << task_id;
}
return it->second;
}
@@ -50,16 +50,6 @@ class MPIUtils {
private:
void InitMPI();
- // Returns the name of the destination specified in a rendezvous key
- // For idx=0 it is the source, for idx=2 it is the destination
- std::string GetWorkerName(const std::string& key, const int idx) const {
- const std::vector<std::string> num_strings = str_util::Split(key, ';');
- // Sanity check, should be 5 src;id;dst;name;frame_iter
- assert(num_strings.size() == 5);
- // Strip the device eg /cpu:0 to get the worker name
- return num_strings[idx].substr(0, num_strings[idx].find_last_of('/'));
- }
-
std::map<std::string, int> name_to_id_;
};
} // namespace tensorflow
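With `GetSourceID` now taking the `<job>:<replica>` string directly, the `name_to_id_` table has to be keyed the same way when it is built during MPI initialization. `InitMPI` is not shown in this diff; the sketch below is one plausible way such a table could be populated, exchanging fixed-length task names with `MPI_Allgather` (the buffer size and helper name are assumptions, not the contrib's actual code).

```
// One plausible way a name-to-rank table could be built during MPI
// initialization (the actual MPIUtils::InitMPI is not part of this diff).
// Every rank contributes its own "<job>:<replica>" string and learns the
// others' via MPI_Allgather; the 128-byte name buffer is an assumption.
#include <mpi.h>

#include <cstring>
#include <map>
#include <string>
#include <vector>

std::map<std::string, int> BuildNameToRankMap(const std::string& my_name) {
  int size = 0;
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  constexpr int kMaxNameLen = 128;
  std::vector<char> local(kMaxNameLen, '\0');
  std::strncpy(local.data(), my_name.c_str(), kMaxNameLen - 1);

  std::vector<char> all(static_cast<size_t>(kMaxNameLen) * size, '\0');
  MPI_Allgather(local.data(), kMaxNameLen, MPI_CHAR,
                all.data(), kMaxNameLen, MPI_CHAR, MPI_COMM_WORLD);

  std::map<std::string, int> name_to_id;
  for (int r = 0; r < size; ++r) {
    name_to_id[std::string(&all[r * kMaxNameLen])] = r;  // rank r's task name
  }
  return name_to_id;
}
```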