diff options
-rw-r--r-- | tensorflow/core/distributed_runtime/master.cc | 28 | ||||
-rw-r--r-- | tensorflow/core/distributed_runtime/master_session.cc | 39 | ||||
-rw-r--r-- | tensorflow/core/distributed_runtime/master_session.h | 4 |
3 files changed, 49 insertions, 22 deletions
diff --git a/tensorflow/core/distributed_runtime/master.cc b/tensorflow/core/distributed_runtime/master.cc index de468f00b7..2537a9f138 100644 --- a/tensorflow/core/distributed_runtime/master.cc +++ b/tensorflow/core/distributed_runtime/master.cc @@ -102,6 +102,7 @@ void Master::GC() { << "on a staggered delay, session_gc_seconds may need " << "to be raised."; sess->Close(); + sess->Unref(); }); } } @@ -256,13 +257,13 @@ void Master::CreateSession(const CreateSessionRequest* req, const_cast<CreateSessionRequest*>(req)->mutable_graph_def(); Status create_status = session->Create(gdef); if (!create_status.ok()) { - // Takes ownership of `session` (and destroys it). session->Close(); + session->Unref(); done(create_status); return; } resp->set_session_handle(session->handle()); - // Insert into the session map. + // Insert into the session map, which takes ownership of the session. { mutex_lock l(mu_); CHECK(sessions_.insert({session->handle(), session}).second); @@ -282,6 +283,7 @@ void Master::ExtendSession(const ExtendSessionRequest* req, done(errors::Aborted("Session ", req->session_handle(), " is not found.")); return; } + session->Ref(); mu_.unlock(); SchedClosure([session, req, resp, done]() { @@ -289,6 +291,7 @@ void Master::ExtendSession(const ExtendSessionRequest* req, if (status.ok()) { status = session->Extend(req, resp); } + session->Unref(); done(status); }); } @@ -302,10 +305,13 @@ void Master::PartialRunSetup(const PartialRunSetupRequest* req, done(errors::Aborted("Session ", req->session_handle(), " is not found.")); return; } + session->Ref(); mu_.unlock(); SchedClosure([this, session, req, resp, done]() { - done(session->PartialRunSetup(req, resp)); + Status s = session->PartialRunSetup(req, resp); + session->Unref(); + done(s); }); } @@ -319,10 +325,12 @@ void Master::RunStep(CallOptions* opts, const RunStepRequest* req, done(errors::Aborted("Session ", req->session_handle(), " is not found.")); return; } + session->Ref(); mu_.unlock(); SchedClosure([this, start_time, session, opts, req, resp, done]() { Status status = session->Run(opts, req, resp); + session->Unref(); uint64 done_time = env_->env->NowMicros(); done(status); mutex_lock l(mu_); @@ -344,6 +352,8 @@ void Master::CloseSession(const CloseSessionRequest* req, " is not found. Possibly, this master has restarted.")); return; } + // NOTE(mrry): One reference to the session is transferred from + // `sessions_[req->session_handle()]` to `session`. session = iter->second; sessions_.erase(iter); mu_.unlock(); @@ -353,6 +363,7 @@ void Master::CloseSession(const CloseSessionRequest* req, // delete it in non-critical thread. SchedClosure([session, done]() { Status s = session->Close(); + session->Unref(); done(s); }); } @@ -409,21 +420,24 @@ void Master::Reset(const ResetRequest* req, ResetResponse* resp, MyClosure done) { // Vector to hold the session pointers present in the sessions_ // (string->Session*) map. - std::vector<MasterSession*> sessions; + std::vector<MasterSession*> sessions_to_close; { mutex_lock l(mu_); + // NOTE(mrry): Transfer one reference to each session from the + // `sessions_` map to the `sessions_to_close` vector. for (const auto& entry : sessions_) { - sessions.push_back(entry.second); + sessions_to_close.push_back(entry.second); } sessions_.clear(); } CleanupWorkers(*req); - SchedClosure([sessions, done]() { + SchedClosure([sessions_to_close, done]() { Status s; - for (MasterSession* session : sessions) { + for (MasterSession* session : sessions_to_close) { s.Update(session->Close()); + session->Unref(); } done(s); }); diff --git a/tensorflow/core/distributed_runtime/master_session.cc b/tensorflow/core/distributed_runtime/master_session.cc index 0baff5c190..fc09f8190e 100644 --- a/tensorflow/core/distributed_runtime/master_session.cc +++ b/tensorflow/core/distributed_runtime/master_session.cc @@ -881,19 +881,23 @@ void MasterSession::ReffedClientGraph::DeregisterPartitions() { DeregisterGraphResponse resp; }; for (Part& part : partitions_) { - Call* c = new Call; - c->req.set_graph_handle(part.graph_handle); - WorkerInterface* w = part.worker; - auto cb = [c, w](const Status& s) { - if (!s.ok()) { - // This error is potentially benign, so we don't log at the - // error level. - LOG(INFO) << "DeregisterGraph error: " << s; - } - delete c; - delete w; - }; - w->DeregisterGraphAsync(&c->req, &c->resp, cb); + // The graph handle may be empty if we failed during partition registration. + if (!part.graph_handle.empty()) { + Call* c = new Call; + c->req.set_graph_handle(part.graph_handle); + WorkerInterface* w = part.worker; + CHECK_NOTNULL(w); + auto cb = [c, w](const Status& s) { + if (!s.ok()) { + // This error is potentially benign, so we don't log at the + // error level. + LOG(INFO) << "DeregisterGraph error: " << s; + } + delete c; + delete w; + }; + w->DeregisterGraphAsync(&c->req, &c->resp, cb); + } } } @@ -1028,6 +1032,10 @@ Status MasterSession::Extend(const ExtendSessionRequest* req, std::unique_ptr<SimpleGraphExecutionState> extended_execution_state; { mutex_lock l(mu_); + if (closed_) { + return errors::FailedPrecondition("Session is closed."); + } + // TODO(mrry): Redesign the locking with reader/writer locks to prevent // starvation due to concurrent steps being issued. This is not // immediately important because we expect Extend to be used in @@ -1152,6 +1160,9 @@ Status MasterSession::Run(CallOptions* opts, const RunStepRequest* req, UpdateLastAccessTime(); { mutex_lock l(mu_); + if (closed_) { + return errors::FailedPrecondition("Session is closed."); + } ++num_running_; } Status status; @@ -1384,11 +1395,11 @@ Status MasterSession::Close() { while (num_running_ != 0) { num_running_is_zero_.wait(l); } + closed_ = true; // All subsequent calls to Run() or Extend() will fail. ClearRunsTable(&to_unref, &run_graphs_); ClearRunsTable(&to_unref, &partial_run_graphs_); } for (ReffedClientGraph* rcg : to_unref) rcg->Unref(); - delete this; return Status::OK(); } diff --git a/tensorflow/core/distributed_runtime/master_session.h b/tensorflow/core/distributed_runtime/master_session.h index 4af6ab6681..12d242be65 100644 --- a/tensorflow/core/distributed_runtime/master_session.h +++ b/tensorflow/core/distributed_runtime/master_session.h @@ -36,7 +36,7 @@ struct MasterEnv; // A session encapsulates a graph computation (resource allocation, // placement, execution, etc.). -class MasterSession { +class MasterSession : public core::RefCounted { public: // This session encapsulates the graph computation for a graph. // @@ -156,6 +156,8 @@ class MasterSession { condition_variable num_running_is_zero_; int32 num_running_ GUARDED_BY(mu_) = 0; + bool closed_ GUARDED_BY(mu_) = false; + std::unordered_map<uint64, int64> subgraph_execution_counts_ GUARDED_BY(mu_); // We need to ensure that certain nodes added (e.g., send and recv |