diff options
author | Shanqing Cai <cais@google.com> | 2017-02-08 04:32:26 -0800 |
---|---|---|
committer | TensorFlower Gardener <gardener@tensorflow.org> | 2017-02-08 04:46:20 -0800 |
commit | aabc7972b94af5a678550427534d4fba7fda327c (patch) | |
tree | 31fbd7ff13df006e04554c98441cb17794bfc4ea /tensorflow/core/debug/debug_io_utils.cc | |
parent | 6bbbd7e9d2016dfd201797d1f1354ccc48bd9e13 (diff) |
tfdbg core: add core metadata to debugger data stream + better support of concurrent debugged runs
* Let the debugger send/dump an Event proto holding a JSON string in its log_message.message field. The JSON metadata includes:
1) An optional, client-specified global_step field that defaults to -1 if not supplied
2) A session run count
3) An executor invocation step count
4) Input names (feed keys)
5) Output names (fetched Tensor names)
6) Target node names
* grpc_debug_server.EventListenerBaseServicer now requires a constructor of the type EventListenerBaseStreamHandler and will construct a new handler object from it, for every stream. This leads to better support of concurrent debugged Session::Run() calls.
* Add support for path names in grpc:// URLs, such as "grpc://localhost:6000/thread1". Different path names will lead to separate gRPC streams being opened to the same server:port, supporting concurrent debugged Session::Run() calls.
Change: 146896481
Diffstat (limited to 'tensorflow/core/debug/debug_io_utils.cc')
-rw-r--r-- | tensorflow/core/debug/debug_io_utils.cc | 126 |
1 files changed, 103 insertions, 23 deletions
diff --git a/tensorflow/core/debug/debug_io_utils.cc b/tensorflow/core/debug/debug_io_utils.cc
index 7819600765..f4136dcaba 100644
--- a/tensorflow/core/debug/debug_io_utils.cc
+++ b/tensorflow/core/debug/debug_io_utils.cc
@@ -99,6 +99,87 @@ const char* const DebugIO::kFileURLScheme = "file://";
 const char* const DebugIO::kGrpcURLScheme = "grpc://";
 
 // static
+Status DebugIO::PublishDebugMetadata(
+    const int64 global_step, const int64 session_run_count,
+    const int64 executor_step_count, const std::vector<string>& input_names,
+    const std::vector<string>& output_names,
+    const std::vector<string>& target_nodes,
+    const std::unordered_set<string>& debug_urls) {
+  // Serializes the run metadata into a JSON string, stores it in the
+  // log_message of an Event, and publishes that Event to every file:// and
+  // grpc:// URL in debug_urls. Per-URL statuses are merged into one Status.
+  std::ostringstream oss;
+
+  // Construct a JSON string to carry the metadata.
+  // NOTE: names are written verbatim; no JSON escaping is applied.
+  oss << "{";
+  oss << "\"global_step\":" << global_step << ",";
+  oss << "\"session_run_count\":" << session_run_count << ",";
+  oss << "\"executor_step_count\":" << executor_step_count << ",";
+  oss << "\"input_names\":[";
+  for (size_t i = 0; i < input_names.size(); ++i) {
+    oss << "\"" << input_names[i] << "\"";
+    if (i < input_names.size() - 1) {
+      oss << ",";
+    }
+  }
+  oss << "],";
+  oss << "\"output_names\":[";
+  for (size_t i = 0; i < output_names.size(); ++i) {
+    oss << "\"" << output_names[i] << "\"";
+    if (i < output_names.size() - 1) {
+      oss << ",";
+    }
+  }
+  oss << "],";
+  oss << "\"target_nodes\":[";
+  for (size_t i = 0; i < target_nodes.size(); ++i) {
+    oss << "\"" << target_nodes[i] << "\"";
+    if (i < target_nodes.size() - 1) {
+      oss << ",";
+    }
+  }
+  oss << "]";
+  oss << "}";
+
+  const string json_metadata = oss.str();
+  Event event;
+  event.set_wall_time(static_cast<double>(Env::Default()->NowMicros()));
+  LogMessage* log_message = event.mutable_log_message();
+  log_message->set_message(json_metadata);
+
+  Status status;
+  for (const string& url : debug_urls) {
+    if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) {
+      Event grpc_event;
+
+      // Determine the path (if any) in the grpc:// URL, and add it as a field
+      // of the JSON string.
+      const string address = url.substr(strlen(DebugIO::kGrpcURLScheme));
+      const string path = address.find("/") == string::npos
+                              ? ""
+                              : address.substr(address.find("/"));
+      grpc_event.set_wall_time(event.wall_time());
+      LogMessage* log_message_grpc = grpc_event.mutable_log_message();
+      log_message_grpc->set_message(
+          strings::StrCat(json_metadata.substr(0, json_metadata.size() - 1),
+                          ",\"grpc_path\":\"", path, "\"}"));
+
+      status.Update(
+          DebugGrpcIO::SendEventProtoThroughGrpcStream(grpc_event, url));
+    } else if (str_util::Lowercase(url).find(kFileURLScheme) == 0) {
+      const string dump_root_dir = url.substr(strlen(kFileURLScheme));
+      const string file_name =
+          strings::StrCat("_tfdbg_core_metadata_", Env::Default()->NowMicros());
+      status.Update(
+          DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name));
+    }
+  }
+
+  return status;
+}
+
+// static
 Status DebugIO::PublishDebugTensor(const string& tensor_name,
                                    const string& debug_op, const Tensor& tensor,
                                    const uint64 wall_time_us,
@@ -136,10 +217,8 @@ Status DebugIO::PublishDebugTensor(const string& tensor_name,
         fail_statuses.push_back(s);
       }
     } else if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) {
-      const string grpc_server_stream_addr = url.substr(strlen(kGrpcURLScheme));
       Status s = DebugGrpcIO::SendTensorThroughGrpcStream(
-          node_name, output_slot, debug_op, tensor, wall_time_us,
-          grpc_server_stream_addr);
+          node_name, output_slot, debug_op, tensor, wall_time_us, url);
 
       if (!s.ok()) {
         num_failed_urls++;
@@ -189,8 +268,7 @@ Status DebugIO::PublishGraph(const Graph& graph,
       status.Update(
           DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name));
     } else if (debug_url.find(kGrpcURLScheme) == 0) {
-      DebugGrpcIO::SendEventProtoThroughGrpcStream(
-          event, debug_url.substr(strlen(kGrpcURLScheme)));
+      DebugGrpcIO::SendEventProtoThroughGrpcStream(event, debug_url);
     }
   }
 
@@ -200,8 +278,7 @@ Status DebugIO::PublishGraph(const Graph& graph,
 // static
 Status DebugIO::CloseDebugURL(const string& debug_url) {
   if (debug_url.find(DebugIO::kGrpcURLScheme) == 0) {
-    return DebugGrpcIO::CloseGrpcStream(
-        debug_url.substr(strlen(DebugIO::kGrpcURLScheme)));
+    return DebugGrpcIO::CloseGrpcStream(debug_url);
   } else {
     // No-op for non-gRPC URLs.
     return Status::OK();
@@ -348,57 +425,67 @@ std::unordered_map<string, std::shared_ptr<DebugGrpcChannel>>
     DebugGrpcIO::stream_channels;
 
 // static
-Status DebugGrpcIO::SendTensorThroughGrpcStream(
-    const string& node_name, const int32 output_slot, const string& debug_op,
-    const Tensor& tensor, const uint64 wall_time_us,
-    const string& server_stream_addr) {
+Status DebugGrpcIO::SendTensorThroughGrpcStream(const string& node_name,
+                                                const int32 output_slot,
+                                                const string& debug_op,
+                                                const Tensor& tensor,
+                                                const uint64 wall_time_us,
+                                                const string& grpc_stream_url) {
   const string tensor_name = strings::StrCat(node_name, ":", output_slot);
 
   // Prepare tensor Event data to be sent.
   Event event = WrapTensorAsEvent(tensor_name, debug_op, tensor, wall_time_us);
 
-  return SendEventProtoThroughGrpcStream(event, server_stream_addr);
+  return SendEventProtoThroughGrpcStream(event, grpc_stream_url);
 }
 
 // static
 Status DebugGrpcIO::SendEventProtoThroughGrpcStream(
-    const Event& event_proto, const string& server_stream_addr) {
+    const Event& event_proto, const string& grpc_stream_url) {
+  // Strip the scheme prefix, then cut at the first '/' (if any) to obtain the
+  // address handed to DebugGrpcChannel. Channels are cached under the full
+  // URL, so different paths on the same address get separate entries.
+  const string addr_with_path =
+      grpc_stream_url.substr(strlen(DebugIO::kGrpcURLScheme));
+  const string server_stream_addr =
+      addr_with_path.substr(0, addr_with_path.find('/'));
+
   std::shared_ptr<DebugGrpcChannel> debug_grpc_channel;
   {
     mutex_lock l(streams_mu);
-    if (stream_channels.find(server_stream_addr) == stream_channels.end()) {
+    if (stream_channels.find(grpc_stream_url) == stream_channels.end()) {
       debug_grpc_channel.reset(new DebugGrpcChannel(server_stream_addr));
 
       if (!debug_grpc_channel->is_channel_ready()) {
         return errors::FailedPrecondition(
-            strings::StrCat("Channel at the following gRPC address is ",
-                            "not ready: ", server_stream_addr));
+            strings::StrCat("Channel at the following gRPC stream URL is ",
+                            "not ready: ", grpc_stream_url));
       }
 
-      stream_channels[server_stream_addr] = debug_grpc_channel;
+      stream_channels[grpc_stream_url] = debug_grpc_channel;
     } else {
-      debug_grpc_channel = stream_channels[server_stream_addr];
+      debug_grpc_channel = stream_channels[grpc_stream_url];
     }
   }
 
   bool write_ok = debug_grpc_channel->WriteEvent(event_proto);
   if (!write_ok) {
     return errors::Cancelled(strings::StrCat("Write event to stream URL ",
-                                             server_stream_addr, "failed."));
+                                             grpc_stream_url, " failed."));
   }
 
   return Status::OK();
 }
 
-Status DebugGrpcIO::CloseGrpcStream(const string& server_stream_addr) {
+Status DebugGrpcIO::CloseGrpcStream(const string& grpc_stream_url) {
   mutex_lock l(streams_mu);
 
-  if (stream_channels.find(server_stream_addr) != stream_channels.end()) {
+  if (stream_channels.find(grpc_stream_url) != stream_channels.end()) {
     // Stream of the specified address exists. Close it and remove it from
     // record.
     Status s;
-    s = stream_channels[server_stream_addr]->Close();
-    stream_channels.erase(server_stream_addr);
+    s = stream_channels[grpc_stream_url]->Close();
+    stream_channels.erase(grpc_stream_url);
     return s;
   } else {
     // Stream of the specified address does not exist. No action.