aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/verbs
diff options
context:
space:
mode:
authorGravatar Jonathan Hseu <jhseu@google.com>2017-08-25 14:01:05 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-08-25 14:04:48 -0700
commit008910f1122d115a6d7430bfcc63cf4296c7467d (patch)
treee50199dcceed004cecc8510f9251f5e04734800f /tensorflow/contrib/verbs
parent005a88f6cc6e4e8c94a4f2d1980737855c4592f4 (diff)
Merge changes from github.
END_PUBLIC --- Commit b30ce4714 authored by James Qin<jamesqin@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Revamp CudnnRNN Saveables 1. Use a lossy way to save/restore cudnn biases during checkpointing. Cudnn uses 2 biases each gate for all RNNs while tf uses one. To allow cudnn checkpoints to be compatible with both Cudnn and platform-independent impls, previously both individual bias and summed biases each gate were stored. The new way only stores the bias sum for each gate, and split it half-half when restoring from a cudnn graph. Doing this does not cause problems since RNNs do not use weight-decay to regularize. 2. Use inheritance instead of branching * Split RNNParamsSaveable to 1 base class and 4 subclasses. * Extract common routines and only overwrite rnn-type-specific pieces in subclasses. PiperOrigin-RevId: 166413989 --- Commit ebc421daf authored by Alan Yee<alyee@ucsd.edu> Committed by Jonathan Hseu<vomjom@vomjom.net>: Update documentation for contrib (#12424) * Update __init__.py Remove ## for standardization of api docs * Create README.md Add README to define this directory's purpose * Update __init.py Markdown styling does not show up well in api docs * Update README.md Add short mention of describing what to deprecate * Update README.md Capitalize title * Update README.md Revert README change * Delete README.md --- Commit fd295394d authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Use latest version of nsync library, which now allows use of cmake on MacOS. PiperOrigin-RevId: 166411437 --- Commit 587d728e0 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: [XLA] Refactor reduce-precision-insertion filters, add several more options. 
In particular, this adds the ability to add reduce-precision operations after fusion nodes based on the contents of those fusion nodes, and the ability to filter operations based on the "op_name" metadata. PiperOrigin-RevId: 166408392 --- Commit 3142f8ef5 authored by Ali Yahya<alive@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Steps toward making ResourceVariables compatible with Eager. This change forces the value of the reuse flag in variable scopes to be tf.AUTO_REUSE when in Eager mode. This change also adds comprehensive Eager tests for ResourceVariable. PiperOrigin-RevId: 166408161 --- Commit b2ce45150 authored by Igor Ganichev<iga@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Make Graph::IsValidNode public It can be reimplemented with existing public APIs, but instead of doing so, making this one public seems better. PiperOrigin-RevId: 166407897 --- Commit 0a2f40e92 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: [XLA::CPU] Fix HLO profiling in parallel CPU backend. PiperOrigin-RevId: 166400211 --- Commit c4a58e3fd authored by Yao Zhang<yaozhang@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Identify frame ids for all nodes in a graph. PiperOrigin-RevId: 166397615 --- Commit 989713f26 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: BEGIN_PUBLIC Automated g4 rollback of changelist 166294015 PiperOrigin-RevId: 166521502
Diffstat (limited to 'tensorflow/contrib/verbs')
-rw-r--r--tensorflow/contrib/verbs/rdma.cc197
-rw-r--r--tensorflow/contrib/verbs/rdma.h7
-rw-r--r--tensorflow/contrib/verbs/rdma_rendezvous_mgr.cc51
-rw-r--r--tensorflow/contrib/verbs/verbs_util.cc49
-rw-r--r--tensorflow/contrib/verbs/verbs_util.h14
5 files changed, 152 insertions, 166 deletions
diff --git a/tensorflow/contrib/verbs/rdma.cc b/tensorflow/contrib/verbs/rdma.cc
index 445cbe290a..ec5adfdaa0 100644
--- a/tensorflow/contrib/verbs/rdma.cc
+++ b/tensorflow/contrib/verbs/rdma.cc
@@ -707,7 +707,6 @@ void RdmaTensorBuffer::SendNextItem() {
bool can_memcpy = DataTypeCanUseMemcpy(in.dtype());
// string tensor needs to be serialized
Tensor copy;
- StringPiece copy_buf;
TensorProto proto;
if (src_dev->tensorflow_gpu_device_info() &&
(!send_args.alloc_attrs.on_host())) {
@@ -721,109 +720,133 @@ void RdmaTensorBuffer::SendNextItem() {
host_alloc_attrs.set_on_host(true);
Allocator* alloc = ProcessState::singleton()->GetCUDAHostAllocator(0);
copy = Tensor(alloc, in.dtype(), in.shape());
- s = VerbsUtil::CopyGPUTensorToCPUSync(
- src_dev, send_args.device_context, &in, &copy);
- CHECK(s.ok()) << "copy tensor from gpu sync";
- copy_buf = copy.tensor_data();
+ tensor_bytes = in.TotalBytes();
+ buffer_size += tensor_bytes;
+ GPUUtil::CopyGPUTensorToCPU(
+ src_dev, send_args.device_context, &in, &copy,
+ [this, copy, tensor_bytes, buffer_size, key, in, step_id,
+ key_with_step_id, is_dead](const Status& s) {
+ CHECK(s.ok()) << "copy tensor from gpu sync";
+ StringPiece copy_buf;
+ copy_buf = copy.tensor_data();
+ PostCopyOperations(true, buffer_size, tensor_bytes, key, in,
+ step_id, is_dead, key_with_step_id, &copy,
+ NULL, &copy_buf);
+ });
} else {
- // "val" is on a GPU. Uses GPUUtil to fill the proto.
- s = VerbsUtil::SetProtoFromGPUSync(
- in, src_dev, send_args.device_context, &proto, is_dead);
- CHECK(s.ok()) << "set proto from gpu sync";
+ // "val" is on a GPU. No longer uses GPUUtil to fill the proto, use
+ // async instead
+ GPUUtil::SetProtoFromGPU(
+ in, src_dev, send_args.device_context, &proto, is_dead,
+ [this, proto, buffer_size, key, in, step_id, key_with_step_id,
+ is_dead](const Status& s) mutable {
+ CHECK(s.ok()) << "copy proto from gpu sync";
+ auto tensor_bytes = proto.ByteSize();
+ buffer_size += tensor_bytes;
+ PostCopyOperations(false, buffer_size, tensor_bytes, key, in,
+ step_id, is_dead, key_with_step_id, NULL,
+ &proto, NULL);
+ });
}
} else {
// tensor is in CPU memory.
+ StringPiece copy_buf;
if (can_memcpy) {
copy_buf = in.tensor_data();
+ tensor_bytes = in.TotalBytes();
} else {
in.AsProtoTensorContent(&proto);
+ tensor_bytes = proto.ByteSize();
}
- }
- if (can_memcpy) {
- tensor_bytes = in.TotalBytes();
- } else {
- tensor_bytes = proto.ByteSize();
+ buffer_size += tensor_bytes;
+ PostCopyOperations(can_memcpy, buffer_size, tensor_bytes, key, in,
+ step_id, is_dead, key_with_step_id, &copy, &proto,
+ &copy_buf);
}
// maybe some margin for string tensor?
- buffer_size += tensor_bytes;
- // prepare message
- RdmaMessage rm;
- rm.name_size_ = key.size();
- rm.name_ = key;
- rm.tensor_shape_ = in.shape();
- rm.data_type_ = in.dtype();
- rm.step_id_ = step_id;
- rm.is_dead_ = is_dead;
- rm.tensor_bytes_ = tensor_bytes;
- rm.buffer_size_ = buffer_size;
- mu_.lock();
- if (local_status_ == none ||
- (buffer_size > size_ && local_status_ == idle &&
- remote_status_ == idle)) {
- if ((local_status_ != none) && (buffer_size > size_)) {
- VLOG(2) << "Extend RDMA buffer from " << size_ << " to "
- << buffer_size;
- }
- CreateCPUBuffer(buffer_size, false);
- mu_.unlock();
- // put back the key since it is not sent;
- EnqueueItem(key_with_step_id);
- // ask the remote to create the same buffer
- rm.type_ = RDMA_MESSAGE_BUFFER_REQUEST;
- rm.remote_addr_ = reinterpret_cast<uint64_t>(buffer_);
- rm.rkey_ = self_->rkey;
- string message = RdmaMessage::CreateMessage(rm);
- channel_->tx_message_buffer_->EnqueueItem(message);
- channel_->tx_message_buffer_->SendNextItem();
- } else if ((local_status_ == idle) && (remote_status_ == idle)) {
- // both buffers are ready, send the tensor
- local_status_ = busy;
- remote_status_ = busy;
- // local/remote_status_ won't be set back to idle
- // unitl Write() is successful
- mu_.unlock();
- if (!((buffer_size == size_ && rm.data_type_ != DT_STRING) ||
- (buffer_size <= size_ && rm.data_type_ == DT_STRING))) {
- VLOG(2) << "Tensor and buffer size do not agree,"
- << " buffer_size = " << size_
- << " requested tensor size = "
- << buffer_size << in.DebugString();
- }
- uint32_t imm_data = LookupBufferIndex(key);
- rm.type_ = RDMA_MESSAGE_TENSOR_WRITE;
- string message = RdmaMessage::CreateMessage(rm);
- memcpy(buffer_, message.data(), message.size());
- if (!is_dead) {
- // copy the tensor buffer content
- void* output =
- static_cast<void*>(static_cast<char*>(buffer_) +
- RdmaMessage::kTensorBufferStartIndex);
- CHECK(tensor_bytes + RdmaMessage::kTensorBufferStartIndex <= size_);
- if (can_memcpy) {
- CHECK(copy_buf.size() == tensor_bytes)
- << "unexpected tensor size: "
- << copy_buf.size()
- << " != "
- << tensor_bytes;
- memcpy(output, copy_buf.data(), tensor_bytes);
- } else {
- proto.SerializeToArray(output, tensor_bytes);
- }
- } else {
- buffer_size = RdmaMessage::kMessageTotalBytes;
- }
- Write(imm_data, buffer_size);
- } else {
- mu_.unlock();
- // put back the key since it is not sent;
- EnqueueItem(key_with_step_id);
- }
};
+
channel_->adapter_->worker_env_->rendezvous_mgr->RecvLocalAsync(step_id,
parsed, cb);
}
}
+void RdmaTensorBuffer::PostCopyOperations(
+ bool can_memcpy, size_t buffer_size, size_t tensor_bytes, const string& key,
+ const Tensor& in, int64 step_id, bool is_dead,
+ const string& key_with_step_id, const Tensor* copy,
+ const TensorProto* proto, const StringPiece* copy_buf) {
+ // prepare message
+ RdmaMessage rm;
+ rm.name_size_ = key.size();
+ rm.name_ = key;
+ rm.tensor_shape_ = in.shape();
+ rm.data_type_ = in.dtype();
+ rm.step_id_ = step_id;
+ rm.is_dead_ = is_dead;
+ rm.tensor_bytes_ = tensor_bytes;
+ rm.buffer_size_ = buffer_size;
+ mu_.lock();
+ if (local_status_ == none || (buffer_size > size_ && local_status_ == idle &&
+ remote_status_ == idle)) {
+ if ((local_status_ != none) && (buffer_size > size_)) {
+ VLOG(2) << "Extend RDMA buffer from " << size_ << " to " << buffer_size;
+ }
+ CreateCPUBuffer(buffer_size, false);
+ mu_.unlock();
+ // put back the key since it is not sent;
+ EnqueueItem(key_with_step_id);
+ // ask the remote to create the same buffer
+ rm.type_ = RDMA_MESSAGE_BUFFER_REQUEST;
+ rm.remote_addr_ = reinterpret_cast<uint64_t>(buffer_);
+ rm.rkey_ = self_->rkey;
+ string message = RdmaMessage::CreateMessage(rm);
+ channel_->tx_message_buffer_->EnqueueItem(message);
+ channel_->tx_message_buffer_->SendNextItem();
+ } else if ((local_status_ == idle) && (remote_status_ == idle)) {
+ // both buffers are ready, send the tensor
+ local_status_ = busy;
+ remote_status_ = busy;
+ // local/remote_status_ won't be set back to idle
+ // until Write() is successful
+ mu_.unlock();
+ if (!((buffer_size == size_ && rm.data_type_ != DT_STRING) ||
+ (buffer_size <= size_ && rm.data_type_ == DT_STRING))) {
+ VLOG(2) << "Tensor and buffer size do not agree,"
+ << " buffer_size = " << size_
+ << " requested tensor size = " << buffer_size << in.DebugString();
+ }
+ uint32_t imm_data = LookupBufferIndex(key);
+ rm.type_ = RDMA_MESSAGE_TENSOR_WRITE;
+ string message = RdmaMessage::CreateMessage(rm);
+ memcpy(buffer_, message.data(), message.size());
+ if (!is_dead) {
+ // copy the tensor buffer content
+ void* output = static_cast<void*>(static_cast<char*>(buffer_) +
+ RdmaMessage::kTensorBufferStartIndex);
+ CHECK(tensor_bytes + RdmaMessage::kTensorBufferStartIndex <= size_);
+ if (can_memcpy) {
+ CHECK(copy != NULL) << "callback missing pointer to copy tensor";
+ CHECK(copy_buf != NULL) << "callback missing pointer to copy buffer";
+ CHECK(copy_buf->size() == tensor_bytes)
+ << "unexpected tensor size: " << copy_buf->size()
+ << " != " << tensor_bytes;
+ memcpy(output, copy_buf->data(), tensor_bytes);
+ } else {
+ CHECK(proto != NULL) << "callback missing pointer to proto tensor";
+ proto->SerializeToArray(output, tensor_bytes);
+ }
+ } else {
+ buffer_size = RdmaMessage::kMessageTotalBytes;
+ }
+ Write(imm_data, buffer_size);
+ } else {
+ mu_.unlock();
+ // put back the key since it is not sent;
+ EnqueueItem(key_with_step_id);
+ }
+}
+
// Create a RdmaMessage according to the pre-defined format
// Args:
// rm: the message structure
diff --git a/tensorflow/contrib/verbs/rdma.h b/tensorflow/contrib/verbs/rdma.h
index 10cbbe58d9..16ef58bc62 100644
--- a/tensorflow/contrib/verbs/rdma.h
+++ b/tensorflow/contrib/verbs/rdma.h
@@ -28,6 +28,7 @@ limitations under the License.
#include <vector>
#include "tensorflow/core/distributed_runtime/worker_env.h"
+#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/platform/env.h"
@@ -225,6 +226,12 @@ class RdmaTensorBuffer : public RdmaBuffer {
explicit RdmaTensorBuffer(RdmaChannel* channel, string name);
virtual ~RdmaTensorBuffer() override {}
void SendNextItem() override;
+ void PostCopyOperations(bool can_memcpy, size_t buffer_size,
+ size_t tensor_bytes, const string& key,
+ const Tensor& in, int64 step_id, bool is_dead,
+ const string& key_with_step_id, const Tensor* copy,
+ const TensorProto* proto,
+ const StringPiece* copy_buf);
};
struct RdmaMessage {
diff --git a/tensorflow/contrib/verbs/rdma_rendezvous_mgr.cc b/tensorflow/contrib/verbs/rdma_rendezvous_mgr.cc
index 3ba6510711..ce82ca2883 100644
--- a/tensorflow/contrib/verbs/rdma_rendezvous_mgr.cc
+++ b/tensorflow/contrib/verbs/rdma_rendezvous_mgr.cc
@@ -21,6 +21,7 @@ limitations under the License.
#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device_mgr.h"
#include "tensorflow/core/common_runtime/dma_helper.h"
+#include "tensorflow/core/common_runtime/gpu/gpu_util.h"
#include "tensorflow/core/common_runtime/gpu/process_state.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/strings/numbers.h"
@@ -33,6 +34,11 @@ class RdmaRemoteRendezvous : public BaseRemoteRendezvous {
RdmaRemoteRendezvous(const WorkerEnv* env, int64 step_id, RdmaMgr* rdma_mgr)
: BaseRemoteRendezvous(env, step_id), rdma_mgr_(rdma_mgr) {}
+ void RecvPostCopyOps(const string& key, const string& key_with_step_id,
+ const Rendezvous::Args& recv_args,
+ const DoneCallback& done, const RdmaMessage& rm,
+ RdmaChannel* rc, Tensor& val, const Status& s);
+
protected:
void RecvFromRemoteAsync(const Rendezvous::ParsedKey& parsed,
const Rendezvous::Args& args,
@@ -113,10 +119,18 @@ void RdmaRemoteRendezvous::RecvFromRemoteAsync(
Allocator* dst_alloc = dst_dev->GetAllocator(recv_args.alloc_attrs);
Tensor gpu_copy(dst_alloc, rm.data_type_, rm.tensor_shape_);
- s = VerbsUtil::CopyCPUTensorToGPUSync(&copy, recv_args.device_context,
- dst_dev, &gpu_copy);
- CHECK(s.ok()) << "copy tensor to gpu sync";
- val = std::move(gpu_copy);
+
+ GPUUtil::CopyCPUTensorToGPU(
+ &copy, recv_args.device_context, dst_dev, &gpu_copy,
+ [this, gpu_copy, key, key_with_step_id, recv_args, done, rm,
+ rc](const Status& s) {
+ CHECK(s.ok()) << "copy tensor to gpu sync";
+ Tensor val;
+ val = std::move(gpu_copy);
+ RecvPostCopyOps(key, key_with_step_id, recv_args, done, rm, rc,
+ val, s);
+ });
+ return;
} else {
AllocatorAttributes host_alloc_attrs;
host_alloc_attrs.set_gpu_compatible(true);
@@ -135,18 +149,7 @@ void RdmaRemoteRendezvous::RecvFromRemoteAsync(
s = dst_dev->MakeTensorFromProto(proto, recv_args.alloc_attrs, &val);
}
}
-
- rc->RemoveRecvCallback(key_with_step_id);
- // create message
- RdmaMessage br;
- br.type_ = RDMA_MESSAGE_BUFFER_IDLE;
- br.name_size_ = key.size();
- br.name_ = key;
- string message = RdmaMessage::CreateMessage(br);
- RdmaBuffer* tb = rc->tx_message_buffer_;
- tb->EnqueueItem(message);
- tb->SendNextItem();
- done(s, Args(), recv_args, val, rm.is_dead_);
+ RecvPostCopyOps(key, key_with_step_id, recv_args, done, rm, rc, val, s);
});
// append key to message queue
RdmaBuffer* rb = rc->tx_message_buffer_;
@@ -160,6 +163,22 @@ void RdmaRemoteRendezvous::RecvFromRemoteAsync(
rb->SendNextItem();
}
+void RdmaRemoteRendezvous::RecvPostCopyOps(
+ const string& key, const string& key_with_step_id,
+ const Rendezvous::Args& recv_args, const DoneCallback& done,
+ const RdmaMessage& rm, RdmaChannel* rc, Tensor& val, const Status& s) {
+ rc->RemoveRecvCallback(key_with_step_id);
+ RdmaMessage br;
+ br.type_ = RDMA_MESSAGE_BUFFER_IDLE;
+ br.name_size_ = key.size();
+ br.name_ = key;
+ string message = RdmaMessage::CreateMessage(br);
+ RdmaBuffer* tb = rc->tx_message_buffer_;
+ tb->EnqueueItem(message);
+ tb->SendNextItem();
+ done(s, Args(), recv_args, val, rm.is_dead_);
+}
+
RdmaRendezvousMgr::RdmaRendezvousMgr(const WorkerEnv* env)
: BaseRendezvousMgr(env) {}
diff --git a/tensorflow/contrib/verbs/verbs_util.cc b/tensorflow/contrib/verbs/verbs_util.cc
index 76e44d34a9..4f5c731a18 100644
--- a/tensorflow/contrib/verbs/verbs_util.cc
+++ b/tensorflow/contrib/verbs/verbs_util.cc
@@ -20,55 +20,6 @@ limitations under the License.
#include "tensorflow/core/lib/strings/str_util.h"
namespace tensorflow {
-// static sync wrapper:
-Status VerbsUtil::CopyGPUTensorToCPUSync(Device* gpu_device,
- const DeviceContext* device_context,
- const Tensor* gpu_tensor,
- Tensor* cpu_tensor) {
- Notification n;
- Status status;
- GPUUtil::CopyGPUTensorToCPU(gpu_device, device_context,
- gpu_tensor, cpu_tensor,
- [&n, &status](const Status& s) {
- status = s;
- n.Notify();
- });
- n.WaitForNotification();
- return status;
-}
-
-// static sync wrapper:
-Status VerbsUtil::CopyCPUTensorToGPUSync(const Tensor* cpu_tensor,
- const DeviceContext* device_context,
- Device* gpu_device,
- Tensor* gpu_tensor) {
- Notification n;
- Status status;
- GPUUtil::CopyCPUTensorToGPU(cpu_tensor, device_context,
- gpu_device, gpu_tensor,
- [&n, &status](const Status& s) {
- status = s;
- n.Notify();
- });
- n.WaitForNotification();
- return status;
-}
-
-// static sync wrapper:
-Status VerbsUtil::SetProtoFromGPUSync(const Tensor& tensor, Device* dev,
- const DeviceContext* device_context,
- TensorProto* proto, bool is_dead) {
- Notification n;
- Status status;
- GPUUtil::SetProtoFromGPU(tensor, dev, device_context, proto, is_dead,
- [&n, &status](const Status& s) {
- status = s;
- n.Notify();
- });
- n.WaitForNotification();
- return status;
-}
-
// static
string VerbsUtil::AppendStepidToKey(const string& key, int64 step_id) {
return strings::StrCat(key, ";", step_id);
diff --git a/tensorflow/contrib/verbs/verbs_util.h b/tensorflow/contrib/verbs/verbs_util.h
index d9da396228..8b44adaedc 100644
--- a/tensorflow/contrib/verbs/verbs_util.h
+++ b/tensorflow/contrib/verbs/verbs_util.h
@@ -28,20 +28,6 @@ class TensorProto;
class VerbsUtil {
public:
- // synchronous wrapper of CopyGPUTensorToCPU
- static Status CopyGPUTensorToCPUSync(Device* gpu_device,
- const DeviceContext* device_context,
- const Tensor* gpu_tensor,
- Tensor* cpu_tensor);
- // synchronous wrapper of CopyCPUTensorToGPU
- static Status CopyCPUTensorToGPUSync(const Tensor* cpu_tensor,
- const DeviceContext* device_context,
- Device* gpu_device,
- Tensor* gpu_tensor);
- // synchronous wrapper of SetProtoFromGPU
- static Status SetProtoFromGPUSync(const Tensor& tensor, Device* dev,
- const DeviceContext* device_context,
- TensorProto* proto, bool is_dead);
static string AppendStepidToKey(const string& key, int64 step_id);
static void GetKeyAndStepId(const string& key_with_step_id, string& key,
int64& step_id);