aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/debug/debug_io_utils.cc
diff options
context:
space:
mode:
authorGravatar Shanqing Cai <cais@google.com>2017-02-08 04:32:26 -0800
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-02-08 04:46:20 -0800
commitaabc7972b94af5a678550427534d4fba7fda327c (patch)
tree31fbd7ff13df006e04554c98441cb17794bfc4ea /tensorflow/core/debug/debug_io_utils.cc
parent6bbbd7e9d2016dfd201797d1f1354ccc48bd9e13 (diff)
tfdbg core: add core metadata to debugger data stream + better support of concurrent debugged runs
* Let the debugger send/dump an Event proto holding a JSON string in its log_message.message field. The JSON metadata includes, 1) An optional, client-specified global_step field that defaults to -1 if not supplied 2) A session run count 3) An executor invocation step count 4) Input names (feed keys) 5) Output names (fetched Tensor names) 6) Target node names * grpc_debug_server.EventListenerBaseServicer now requires a constructor of the type EventListenerBaseStreamHandler and will construct a new handler object from it, for every stream. This leads to better support of concurrent debugged Session::Run() calls. * Add support for path names in grpc:// URLs, such as "grpc://localhost:6000/thread1". Different path names will lead to separate gRPC streams being opened to the same server:port, supporting concurrent debugged Session::Run() calls. Change: 146896481
Diffstat (limited to 'tensorflow/core/debug/debug_io_utils.cc')
-rw-r--r--tensorflow/core/debug/debug_io_utils.cc126
1 files changed, 103 insertions, 23 deletions
diff --git a/tensorflow/core/debug/debug_io_utils.cc b/tensorflow/core/debug/debug_io_utils.cc
index 7819600765..f4136dcaba 100644
--- a/tensorflow/core/debug/debug_io_utils.cc
+++ b/tensorflow/core/debug/debug_io_utils.cc
@@ -99,6 +99,83 @@ const char* const DebugIO::kFileURLScheme = "file://";
const char* const DebugIO::kGrpcURLScheme = "grpc://";
// static
+Status DebugIO::PublishDebugMetadata(
+ const int64 global_step, const int64 session_run_count,
+ const int64 executor_step_count, const std::vector<string>& input_names,
+ const std::vector<string>& output_names,
+ const std::vector<string>& target_nodes,
+ const std::unordered_set<string>& debug_urls) {
+ std::ostringstream oss;
+
+ // Construct a JSON string to carry the metadata.
+ oss << "{";
+ oss << "\"global_step\":" << global_step << ",";
+ oss << "\"session_run_count\":" << session_run_count << ",";
+ oss << "\"executor_step_count\":" << executor_step_count << ",";
+ oss << "\"input_names\":[";
+ for (size_t i = 0; i < input_names.size(); ++i) {
+ oss << "\"" << input_names[i] << "\"";
+ if (i < input_names.size() - 1) {
+ oss << ",";
+ }
+ }
+ oss << "],";
+ oss << "\"output_names\":[";
+ for (size_t i = 0; i < output_names.size(); ++i) {
+ oss << "\"" << output_names[i] << "\"";
+ if (i < output_names.size() - 1) {
+ oss << ",";
+ }
+ }
+ oss << "],";
+ oss << "\"target_nodes\":[";
+ for (size_t i = 0; i < target_nodes.size(); ++i) {
+ oss << "\"" << target_nodes[i] << "\"";
+ if (i < target_nodes.size() - 1) {
+ oss << ",";
+ }
+ }
+ oss << "]";
+ oss << "}";
+
+ const string json_metadata = oss.str();
+ Event event;
+ event.set_wall_time(static_cast<double>(Env::Default()->NowMicros()));
+ LogMessage* log_message = event.mutable_log_message();
+ log_message->set_message(json_metadata);
+
+ Status status;
+ for (const string& url : debug_urls) {
+ if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) {
+ Event grpc_event;
+
+ // Determine the path (if any) in the grpc:// URL, and add it as a field
+ // of the JSON string.
+ const string address = url.substr(strlen(DebugIO::kFileURLScheme));
+ const string path = address.find("/") == string::npos
+ ? ""
+ : address.substr(address.find("/"));
+ grpc_event.set_wall_time(event.wall_time());
+ LogMessage* log_message_grpc = grpc_event.mutable_log_message();
+ log_message_grpc->set_message(
+ strings::StrCat(json_metadata.substr(0, json_metadata.size() - 1),
+ ",\"grpc_path\":\"", path, "\"}"));
+
+ status.Update(
+ DebugGrpcIO::SendEventProtoThroughGrpcStream(grpc_event, url));
+ } else if (str_util::Lowercase(url).find(kFileURLScheme) == 0) {
+ const string dump_root_dir = url.substr(strlen(kFileURLScheme));
+ const string file_name =
+ strings::StrCat("_tfdbg_core_metadata_", Env::Default()->NowMicros());
+ status.Update(
+ DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name));
+ }
+ }
+
+ return status;
+}
+
+// static
Status DebugIO::PublishDebugTensor(const string& tensor_name,
const string& debug_op, const Tensor& tensor,
const uint64 wall_time_us,
@@ -136,10 +213,8 @@ Status DebugIO::PublishDebugTensor(const string& tensor_name,
fail_statuses.push_back(s);
}
} else if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) {
- const string grpc_server_stream_addr = url.substr(strlen(kGrpcURLScheme));
Status s = DebugGrpcIO::SendTensorThroughGrpcStream(
- node_name, output_slot, debug_op, tensor, wall_time_us,
- grpc_server_stream_addr);
+ node_name, output_slot, debug_op, tensor, wall_time_us, url);
if (!s.ok()) {
num_failed_urls++;
@@ -189,8 +264,7 @@ Status DebugIO::PublishGraph(const Graph& graph,
status.Update(
DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name));
} else if (debug_url.find(kGrpcURLScheme) == 0) {
- DebugGrpcIO::SendEventProtoThroughGrpcStream(
- event, debug_url.substr(strlen(kGrpcURLScheme)));
+ DebugGrpcIO::SendEventProtoThroughGrpcStream(event, debug_url);
}
}
@@ -200,8 +274,7 @@ Status DebugIO::PublishGraph(const Graph& graph,
// static
Status DebugIO::CloseDebugURL(const string& debug_url) {
if (debug_url.find(DebugIO::kGrpcURLScheme) == 0) {
- return DebugGrpcIO::CloseGrpcStream(
- debug_url.substr(strlen(DebugIO::kGrpcURLScheme)));
+ return DebugGrpcIO::CloseGrpcStream(debug_url);
} else {
// No-op for non-gRPC URLs.
return Status::OK();
@@ -348,57 +421,64 @@ std::unordered_map<string, std::shared_ptr<DebugGrpcChannel>>
DebugGrpcIO::stream_channels;
// static
-Status DebugGrpcIO::SendTensorThroughGrpcStream(
- const string& node_name, const int32 output_slot, const string& debug_op,
- const Tensor& tensor, const uint64 wall_time_us,
- const string& server_stream_addr) {
+Status DebugGrpcIO::SendTensorThroughGrpcStream(const string& node_name,
+ const int32 output_slot,
+ const string& debug_op,
+ const Tensor& tensor,
+ const uint64 wall_time_us,
+ const string& grpc_stream_url) {
const string tensor_name = strings::StrCat(node_name, ":", output_slot);
// Prepare tensor Event data to be sent.
Event event = WrapTensorAsEvent(tensor_name, debug_op, tensor, wall_time_us);
- return SendEventProtoThroughGrpcStream(event, server_stream_addr);
+ return SendEventProtoThroughGrpcStream(event, grpc_stream_url);
}
// static
Status DebugGrpcIO::SendEventProtoThroughGrpcStream(
- const Event& event_proto, const string& server_stream_addr) {
+ const Event& event_proto, const string& grpc_stream_url) {
+ const string addr_with_path =
+ grpc_stream_url.substr(strlen(DebugIO::kFileURLScheme));
+ const string server_stream_addr =
+ addr_with_path.substr(0, addr_with_path.find('/'));
+
std::shared_ptr<DebugGrpcChannel> debug_grpc_channel;
{
mutex_lock l(streams_mu);
- if (stream_channels.find(server_stream_addr) == stream_channels.end()) {
+ if (stream_channels.find(grpc_stream_url) == stream_channels.end()) {
debug_grpc_channel.reset(new DebugGrpcChannel(server_stream_addr));
if (!debug_grpc_channel->is_channel_ready()) {
return errors::FailedPrecondition(
- strings::StrCat("Channel at the following gRPC address is ",
- "not ready: ", server_stream_addr));
+ strings::StrCat("Channel at the following gRPC stream URL is ",
+ "not ready: ", grpc_stream_url));
}
- stream_channels[server_stream_addr] = debug_grpc_channel;
+ stream_channels[grpc_stream_url] = debug_grpc_channel;
} else {
- debug_grpc_channel = stream_channels[server_stream_addr];
+ debug_grpc_channel = stream_channels[grpc_stream_url];
}
}
bool write_ok = debug_grpc_channel->WriteEvent(event_proto);
if (!write_ok) {
return errors::Cancelled(strings::StrCat("Write event to stream URL ",
- server_stream_addr, "failed."));
+ grpc_stream_url, "failed."));
}
return Status::OK();
}
-Status DebugGrpcIO::CloseGrpcStream(const string& server_stream_addr) {
+Status DebugGrpcIO::CloseGrpcStream(const string& grpc_stream_url) {
mutex_lock l(streams_mu);
- if (stream_channels.find(server_stream_addr) != stream_channels.end()) {
+ if (stream_channels.find(grpc_stream_url) != stream_channels.end()) {
// Stream of the specified address exists. Close it and remove it from
// record.
Status s;
- s = stream_channels[server_stream_addr]->Close();
- stream_channels.erase(server_stream_addr);
+ s = stream_channels[grpc_stream_url]->Close();
+ stream_channels.erase(grpc_stream_url);
return s;
} else {
// Stream of the specified address does not exist. No action.