diff options
Diffstat (limited to 'tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc')
-rw-r--r-- | tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc b/tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc index cb2fde7dba..f91e377049 100644 --- a/tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc +++ b/tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc @@ -35,14 +35,18 @@ limitations under the License. namespace tensorflow { +static void StartAbortRendevous(Rendezvous* rendez, const Status& s) { + rendez->StartAbort(s); + rendez->Unref(); +} + BaseRendezvousMgr::BaseRendezvousMgr(const WorkerEnv* worker_env) : worker_env_(worker_env) {} BaseRendezvousMgr::~BaseRendezvousMgr() { for (auto& p : table_) { - BaseRemoteRendezvous* rendez = p.second; - rendez->StartAbort(errors::Aborted("Shutdown")); - rendez->Unref(); + auto rendez = p.second; + StartAbortRendevous(rendez, errors::Aborted("Shutdown")); } } @@ -52,7 +56,7 @@ RemoteRendezvous* BaseRendezvousMgr::Find(int64 step_id) { BaseRemoteRendezvous* BaseRendezvousMgr::FindOrCreate(int64 step_id) { mutex_lock l(mu_); - Table::iterator iter = table_.find(step_id); + auto iter = table_.find(step_id); if (iter == table_.end()) { auto rr = Create(step_id, worker_env_); iter = table_.insert({step_id, rr}).first; @@ -64,7 +68,7 @@ BaseRemoteRendezvous* BaseRendezvousMgr::FindOrCreate(int64 step_id) { void BaseRendezvousMgr::RecvLocalAsync(int64 step_id, const Rendezvous::ParsedKey& parsed, Rendezvous::DoneCallback done) { - BaseRemoteRendezvous* rendez = FindOrCreate(step_id); + auto rendez = FindOrCreate(step_id); using namespace std::placeholders; Rendezvous::DoneCallback done_cb = std::bind( [rendez](Rendezvous::DoneCallback done, @@ -101,15 +105,15 @@ void BaseRendezvousMgr::Cleanup(int64 step_id) { Rendezvous* rendez = nullptr; { mutex_lock l(mu_); - Table::iterator iter = table_.find(step_id); + auto iter = table_.find(step_id); if (iter != table_.end()) { rendez = iter->second; table_.erase(iter); } } - if (!rendez) return; - rendez->StartAbort(errors::Aborted("Cleanup ", step_id)); - rendez->Unref(); + if (rendez) { + StartAbortRendevous(rendez, errors::Aborted("Cleanup ", step_id)); + } } void BaseRendezvousMgr::CleanupAll() { @@ -122,8 +126,7 @@ void BaseRendezvousMgr::CleanupAll() { table_.clear(); } for (auto rendez : rendezs) { - rendez->StartAbort(errors::Aborted("Shutdown")); - rendez->Unref(); + StartAbortRendevous(rendez, errors::Aborted("Shutdown")); } } @@ -165,7 +168,7 @@ Status BaseRemoteRendezvous::Initialize(WorkerSession* session) { session_ = session; std::swap(deferred_calls, deferred_calls_); } - for (DeferredCall& call : deferred_calls) { + for (auto& call : deferred_calls) { RecvLocalAsyncInternal(call.parsed, std::move(call.done)); } return Status::OK(); |