aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc')
-rw-r--r--tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc27
1 files changed, 15 insertions, 12 deletions
diff --git a/tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc b/tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc
index cb2fde7dba..f91e377049 100644
--- a/tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc
+++ b/tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc
@@ -35,14 +35,18 @@ limitations under the License.
namespace tensorflow {
+static void StartAbortRendevous(Rendezvous* rendez, const Status& s) {
+ rendez->StartAbort(s);
+ rendez->Unref();
+}
+
BaseRendezvousMgr::BaseRendezvousMgr(const WorkerEnv* worker_env)
: worker_env_(worker_env) {}
BaseRendezvousMgr::~BaseRendezvousMgr() {
for (auto& p : table_) {
- BaseRemoteRendezvous* rendez = p.second;
- rendez->StartAbort(errors::Aborted("Shutdown"));
- rendez->Unref();
+ auto rendez = p.second;
+ StartAbortRendevous(rendez, errors::Aborted("Shutdown"));
}
}
@@ -52,7 +56,7 @@ RemoteRendezvous* BaseRendezvousMgr::Find(int64 step_id) {
BaseRemoteRendezvous* BaseRendezvousMgr::FindOrCreate(int64 step_id) {
mutex_lock l(mu_);
- Table::iterator iter = table_.find(step_id);
+ auto iter = table_.find(step_id);
if (iter == table_.end()) {
auto rr = Create(step_id, worker_env_);
iter = table_.insert({step_id, rr}).first;
@@ -64,7 +68,7 @@ BaseRemoteRendezvous* BaseRendezvousMgr::FindOrCreate(int64 step_id) {
void BaseRendezvousMgr::RecvLocalAsync(int64 step_id,
const Rendezvous::ParsedKey& parsed,
Rendezvous::DoneCallback done) {
- BaseRemoteRendezvous* rendez = FindOrCreate(step_id);
+ auto rendez = FindOrCreate(step_id);
using namespace std::placeholders;
Rendezvous::DoneCallback done_cb = std::bind(
[rendez](Rendezvous::DoneCallback done,
@@ -101,15 +105,15 @@ void BaseRendezvousMgr::Cleanup(int64 step_id) {
Rendezvous* rendez = nullptr;
{
mutex_lock l(mu_);
- Table::iterator iter = table_.find(step_id);
+ auto iter = table_.find(step_id);
if (iter != table_.end()) {
rendez = iter->second;
table_.erase(iter);
}
}
- if (!rendez) return;
- rendez->StartAbort(errors::Aborted("Cleanup ", step_id));
- rendez->Unref();
+ if (rendez) {
+ StartAbortRendevous(rendez, errors::Aborted("Cleanup ", step_id));
+ }
}
void BaseRendezvousMgr::CleanupAll() {
@@ -122,8 +126,7 @@ void BaseRendezvousMgr::CleanupAll() {
table_.clear();
}
for (auto rendez : rendezs) {
- rendez->StartAbort(errors::Aborted("Shutdown"));
- rendez->Unref();
+ StartAbortRendevous(rendez, errors::Aborted("Shutdown"));
}
}
@@ -165,7 +168,7 @@ Status BaseRemoteRendezvous::Initialize(WorkerSession* session) {
session_ = session;
std::swap(deferred_calls, deferred_calls_);
}
- for (DeferredCall& call : deferred_calls) {
+ for (auto& call : deferred_calls) {
RecvLocalAsyncInternal(call.parsed, std::move(call.done));
}
return Status::OK();