about | summary | refs | log | tree | commit | diff | homepage
diff options
context:
space:
mode:
-rw-r--r-- tensorflow/core/distributed_runtime/master.cc | 28
-rw-r--r-- tensorflow/core/distributed_runtime/master_session.cc | 39
-rw-r--r-- tensorflow/core/distributed_runtime/master_session.h | 4
3 files changed, 49 insertions, 22 deletions
diff --git a/tensorflow/core/distributed_runtime/master.cc b/tensorflow/core/distributed_runtime/master.cc
index de468f00b7..2537a9f138 100644
--- a/tensorflow/core/distributed_runtime/master.cc
+++ b/tensorflow/core/distributed_runtime/master.cc
@@ -102,6 +102,7 @@ void Master::GC() {
<< "on a staggered delay, session_gc_seconds may need "
<< "to be raised.";
sess->Close();
+ sess->Unref();
});
}
}
@@ -256,13 +257,13 @@ void Master::CreateSession(const CreateSessionRequest* req,
const_cast<CreateSessionRequest*>(req)->mutable_graph_def();
Status create_status = session->Create(gdef);
if (!create_status.ok()) {
- // Takes ownership of `session` (and destroys it).
session->Close();
+ session->Unref();
done(create_status);
return;
}
resp->set_session_handle(session->handle());
- // Insert into the session map.
+ // Insert into the session map, which takes ownership of the session.
{
mutex_lock l(mu_);
CHECK(sessions_.insert({session->handle(), session}).second);
@@ -282,6 +283,7 @@ void Master::ExtendSession(const ExtendSessionRequest* req,
done(errors::Aborted("Session ", req->session_handle(), " is not found."));
return;
}
+ session->Ref();
mu_.unlock();
SchedClosure([session, req, resp, done]() {
@@ -289,6 +291,7 @@ void Master::ExtendSession(const ExtendSessionRequest* req,
if (status.ok()) {
status = session->Extend(req, resp);
}
+ session->Unref();
done(status);
});
}
@@ -302,10 +305,13 @@ void Master::PartialRunSetup(const PartialRunSetupRequest* req,
done(errors::Aborted("Session ", req->session_handle(), " is not found."));
return;
}
+ session->Ref();
mu_.unlock();
SchedClosure([this, session, req, resp, done]() {
- done(session->PartialRunSetup(req, resp));
+ Status s = session->PartialRunSetup(req, resp);
+ session->Unref();
+ done(s);
});
}
@@ -319,10 +325,12 @@ void Master::RunStep(CallOptions* opts, const RunStepRequest* req,
done(errors::Aborted("Session ", req->session_handle(), " is not found."));
return;
}
+ session->Ref();
mu_.unlock();
SchedClosure([this, start_time, session, opts, req, resp, done]() {
Status status = session->Run(opts, req, resp);
+ session->Unref();
uint64 done_time = env_->env->NowMicros();
done(status);
mutex_lock l(mu_);
@@ -344,6 +352,8 @@ void Master::CloseSession(const CloseSessionRequest* req,
" is not found. Possibly, this master has restarted."));
return;
}
+ // NOTE(mrry): One reference to the session is transferred from
+ // `sessions_[req->session_handle()]` to `session`.
session = iter->second;
sessions_.erase(iter);
mu_.unlock();
@@ -353,6 +363,7 @@ void Master::CloseSession(const CloseSessionRequest* req,
// delete it in non-critical thread.
SchedClosure([session, done]() {
Status s = session->Close();
+ session->Unref();
done(s);
});
}
@@ -409,21 +420,24 @@ void Master::Reset(const ResetRequest* req, ResetResponse* resp,
MyClosure done) {
// Vector to hold the session pointers present in the sessions_
// (string->Session*) map.
- std::vector<MasterSession*> sessions;
+ std::vector<MasterSession*> sessions_to_close;
{
mutex_lock l(mu_);
+ // NOTE(mrry): Transfer one reference to each session from the
+ // `sessions_` map to the `sessions_to_close` vector.
for (const auto& entry : sessions_) {
- sessions.push_back(entry.second);
+ sessions_to_close.push_back(entry.second);
}
sessions_.clear();
}
CleanupWorkers(*req);
- SchedClosure([sessions, done]() {
+ SchedClosure([sessions_to_close, done]() {
Status s;
- for (MasterSession* session : sessions) {
+ for (MasterSession* session : sessions_to_close) {
s.Update(session->Close());
+ session->Unref();
}
done(s);
});
diff --git a/tensorflow/core/distributed_runtime/master_session.cc b/tensorflow/core/distributed_runtime/master_session.cc
index 0baff5c190..fc09f8190e 100644
--- a/tensorflow/core/distributed_runtime/master_session.cc
+++ b/tensorflow/core/distributed_runtime/master_session.cc
@@ -881,19 +881,23 @@ void MasterSession::ReffedClientGraph::DeregisterPartitions() {
DeregisterGraphResponse resp;
};
for (Part& part : partitions_) {
- Call* c = new Call;
- c->req.set_graph_handle(part.graph_handle);
- WorkerInterface* w = part.worker;
- auto cb = [c, w](const Status& s) {
- if (!s.ok()) {
- // This error is potentially benign, so we don't log at the
- // error level.
- LOG(INFO) << "DeregisterGraph error: " << s;
- }
- delete c;
- delete w;
- };
- w->DeregisterGraphAsync(&c->req, &c->resp, cb);
+ // The graph handle may be empty if we failed during partition registration.
+ if (!part.graph_handle.empty()) {
+ Call* c = new Call;
+ c->req.set_graph_handle(part.graph_handle);
+ WorkerInterface* w = part.worker;
+ CHECK_NOTNULL(w);
+ auto cb = [c, w](const Status& s) {
+ if (!s.ok()) {
+ // This error is potentially benign, so we don't log at the
+ // error level.
+ LOG(INFO) << "DeregisterGraph error: " << s;
+ }
+ delete c;
+ delete w;
+ };
+ w->DeregisterGraphAsync(&c->req, &c->resp, cb);
+ }
}
}
@@ -1028,6 +1032,10 @@ Status MasterSession::Extend(const ExtendSessionRequest* req,
std::unique_ptr<SimpleGraphExecutionState> extended_execution_state;
{
mutex_lock l(mu_);
+ if (closed_) {
+ return errors::FailedPrecondition("Session is closed.");
+ }
+
// TODO(mrry): Redesign the locking with reader/writer locks to prevent
// starvation due to concurrent steps being issued. This is not
// immediately important because we expect Extend to be used in
@@ -1152,6 +1160,9 @@ Status MasterSession::Run(CallOptions* opts, const RunStepRequest* req,
UpdateLastAccessTime();
{
mutex_lock l(mu_);
+ if (closed_) {
+ return errors::FailedPrecondition("Session is closed.");
+ }
++num_running_;
}
Status status;
@@ -1384,11 +1395,11 @@ Status MasterSession::Close() {
while (num_running_ != 0) {
num_running_is_zero_.wait(l);
}
+ closed_ = true; // All subsequent calls to Run() or Extend() will fail.
ClearRunsTable(&to_unref, &run_graphs_);
ClearRunsTable(&to_unref, &partial_run_graphs_);
}
for (ReffedClientGraph* rcg : to_unref) rcg->Unref();
- delete this;
return Status::OK();
}
diff --git a/tensorflow/core/distributed_runtime/master_session.h b/tensorflow/core/distributed_runtime/master_session.h
index 4af6ab6681..12d242be65 100644
--- a/tensorflow/core/distributed_runtime/master_session.h
+++ b/tensorflow/core/distributed_runtime/master_session.h
@@ -36,7 +36,7 @@ struct MasterEnv;
// A session encapsulates a graph computation (resource allocation,
// placement, execution, etc.).
-class MasterSession {
+class MasterSession : public core::RefCounted {
public:
// This session encapsulates the graph computation for a graph.
//
@@ -156,6 +156,8 @@ class MasterSession {
condition_variable num_running_is_zero_;
int32 num_running_ GUARDED_BY(mu_) = 0;
+ bool closed_ GUARDED_BY(mu_) = false;
+
std::unordered_map<uint64, int64> subgraph_execution_counts_ GUARDED_BY(mu_);
// We need to ensure that certain nodes added (e.g., send and recv