aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/framework/rendezvous.cc
diff options
context:
space:
mode:
authorGravatar A. Unique TensorFlower <gardener@tensorflow.org>2017-07-21 19:00:21 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-07-21 19:04:28 -0700
commit7a9a1f55afbf1f5c7c62239ea65bb3d4d0e8609c (patch)
tree78acd1c36122fc0e20fa1b016868e1ac6761a4fc /tensorflow/core/framework/rendezvous.cc
parentceeb9ea3499b97c9dc9ccf8021df1ed2fc7d60e3 (diff)
Internal cleanup.
PiperOrigin-RevId: 162809937
Diffstat (limited to 'tensorflow/core/framework/rendezvous.cc')
-rw-r--r--tensorflow/core/framework/rendezvous.cc19
1 files changed, 7 insertions, 12 deletions
diff --git a/tensorflow/core/framework/rendezvous.cc b/tensorflow/core/framework/rendezvous.cc
index 8edbacbf50..7a03cb31ba 100644
--- a/tensorflow/core/framework/rendezvous.cc
+++ b/tensorflow/core/framework/rendezvous.cc
@@ -155,12 +155,12 @@ class LocalRendezvousImpl : public Rendezvous {
Args recv_args;
uint64 key_hash = KeyHash(key.FullKey());
VLOG(2) << "Send " << this << " " << key_hash << " " << key.FullKey();
+ Item* item = nullptr;
{
mutex_lock l(mu_);
if (!status_.ok()) {
return status_;
}
- Item* item = nullptr;
Table::iterator iter = table_.find(key_hash);
if (iter == table_.end()) {
// There is no waiter for this message. Insert the message
@@ -189,8 +189,6 @@ class LocalRendezvousImpl : public Rendezvous {
// Should not happen unless it has a waiter.
return errors::Aborted("Duplicated send: ", key.FullKey());
}
- // Mark item as complete.
- item->has_been_recvd = true;
// Get item->waiter function into waiter and set item->waiter to null
std::swap(item->waiter, waiter);
@@ -201,8 +199,10 @@ class LocalRendezvousImpl : public Rendezvous {
recv_args.device_context = item->recv_dev_context;
recv_args.alloc_attrs = item->recv_alloc_attrs;
item->recv_dev_context = nullptr;
+ table_.erase(iter);
}
} // mutex
+ delete item;
// Notify the waiter by invoking its done closure, outside scope
// of the table lock.
waiter(Status::OK(), send_args, recv_args, val, is_dead);
@@ -225,17 +225,11 @@ class LocalRendezvousImpl : public Rendezvous {
Table::iterator iter = table_.find(key_hash);
if (iter != table_.end()) {
Item* item = iter->second;
- if (item->has_been_recvd) {
- mu_.unlock();
- done(errors::Aborted("Duplicated recv: ", key.FullKey()), Args(),
- recv_args, Tensor(), false);
- } else if (item->waiter == nullptr) {
+ if (item->waiter == nullptr) {
// A message has already arrived and is stored in the table
// under this key. Consumes the message and invokes the done
// closure.
- Tensor v = item->value;
- item->value = Tensor();
- item->has_been_recvd = true;
+ Tensor v = std::move(item->value);
// Before dropping the table lock, capture the item values.
// DeviceContext is only non-null for non-CPU devices.
// If we capture the send_dev_context, we need to hold a ref on
@@ -247,7 +241,9 @@ class LocalRendezvousImpl : public Rendezvous {
Args send_args;
send_args.device_context = item->send_dev_context;
send_args.alloc_attrs = item->send_alloc_attrs;
+ table_.erase(iter);
mu_.unlock();
+ delete item;
done(Status::OK(), send_args, recv_args, v, is_dead);
if (send_dev_context) send_dev_context->Unref();
} else {
@@ -299,7 +295,6 @@ class LocalRendezvousImpl : public Rendezvous {
DoneCallback waiter = nullptr;
Tensor value;
bool is_dead = false;
- bool has_been_recvd = false;
DeviceContext* send_dev_context = nullptr;
DeviceContext* recv_dev_context = nullptr;
AllocatorAttributes send_alloc_attrs;