diff options
author | Shanqing Cai <cais@google.com> | 2017-07-05 17:15:30 -0700 |
---|---|---|
committer | TensorFlower Gardener <gardener@tensorflow.org> | 2017-07-05 17:19:37 -0700 |
commit | 3f099e7d7f1c8b8d3f4121357fc7c40391f9eafe (patch) | |
tree | fe0d599c34d4fdd936102e24daf326b3198d56be /tensorflow/core/debug | |
parent | dd951bb50d454f026083097b6d6122dae696259c (diff) |
tfdbg: send large Tensor Event protos through gRPC stream in chunks
Also fixes flakiness in session_debug_grpc_test
PiperOrigin-RevId: 161025848
Diffstat (limited to 'tensorflow/core/debug')
-rw-r--r-- | tensorflow/core/debug/debug_io_utils.cc | 200 | ||||
-rw-r--r-- | tensorflow/core/debug/debug_io_utils.h | 58 | ||||
-rw-r--r-- | tensorflow/core/debug/debugger_event_metadata.proto | 2 |
3 files changed, 209 insertions, 51 deletions
diff --git a/tensorflow/core/debug/debug_io_utils.cc b/tensorflow/core/debug/debug_io_utils.cc index 875a4763f8..4e185b37a8 100644 --- a/tensorflow/core/debug/debug_io_utils.cc +++ b/tensorflow/core/debug/debug_io_utils.cc @@ -29,6 +29,7 @@ limitations under the License. #include "tensorflow/core/debug/debugger_event_metadata.pb.h" #include "tensorflow/core/framework/graph.pb.h" #include "tensorflow/core/framework/summary.pb.h" +#include "tensorflow/core/lib/core/bits.h" #include "tensorflow/core/lib/hash/hash.h" #include "tensorflow/core/lib/io/path.h" #include "tensorflow/core/lib/strings/str_util.h" @@ -45,27 +46,36 @@ namespace tensorflow { namespace { -// Encapsulate the tensor value inside a Summary proto, and then inside an -// Event proto. -Event WrapTensorAsEvent(const DebugNodeKey& debug_node_key, - const Tensor& tensor, const uint64 wall_time_us) { +// Creates an Event proto representing a chunk of a Tensor. This method only +// populates the fields of the Event proto that represent the envelope +// information (e.g., timestamp, device_name, num_chunks, chunk_index, dtype, +// shape). It does not set the value.tensor field, which should be set by the +// caller separately. +Event PrepareChunkEventProto(const DebugNodeKey& debug_node_key, + const uint64 wall_time_us, const size_t num_chunks, + const size_t chunk_index, + const DataType& tensor_dtype, + const TensorShapeProto& tensor_shape) { Event event; event.set_wall_time(static_cast<double>(wall_time_us)); - Summary::Value* summ_val = event.mutable_summary()->add_value(); + Summary::Value* value = event.mutable_summary()->add_value(); // Create the debug node_name in the Summary proto. // For example, if tensor_name = "foo/node_a:0", and the debug_op is // "DebugIdentity", the debug node_name in the Summary proto will be // "foo/node_a:0:DebugIdentity". - summ_val->set_node_name(debug_node_key.debug_node_name); + value->set_node_name(debug_node_key.debug_node_name); - // Tag by the node name. 
This allows TensorBoard to quickly fetch data per op. - summ_val->set_tag(debug_node_key.node_name); + // Tag by the node name. This allows TensorBoard to quickly fetch data + // per op. + value->set_tag(debug_node_key.node_name); // Store data within debugger metadata to be stored for each event. third_party::tensorflow::core::debug::DebuggerEventMetadata metadata; metadata.set_device(debug_node_key.device_name); metadata.set_output_slot(debug_node_key.output_slot); + metadata.set_num_chunks(num_chunks); + metadata.set_chunk_index(chunk_index); // Encode the data in JSON. string json_output; @@ -76,8 +86,8 @@ Event WrapTensorAsEvent(const DebugNodeKey& debug_node_key, if (status.ok()) { // Store summary metadata. Set the plugin to use this data as "debugger". SummaryMetadata::PluginData* plugin_data = - summ_val->mutable_metadata()->add_plugin_data(); - plugin_data->set_plugin_name("debugger"); + value->mutable_metadata()->add_plugin_data(); + plugin_data->set_plugin_name(DebugIO::kDebuggerPluginName); plugin_data->set_content(json_output); } else { LOG(WARNING) << "Failed to convert DebuggerEventMetadata proto to JSON. " @@ -85,19 +95,129 @@ Event WrapTensorAsEvent(const DebugNodeKey& debug_node_key, << "."; } + value->mutable_tensor()->set_dtype(tensor_dtype); + *value->mutable_tensor()->mutable_tensor_shape() = tensor_shape; + + return event; +} + +// Translates the length of a string to number of bytes when the string is +// encoded as bytes in protobuf. Note that this makes a conservative estimate +// (i.e., an estimate that is usually too large, but never too small under the +// gRPC message size limit) of the Varint-encoded length, to workaround the lack +// of a portable length function. 
+const size_t StringValMaxBytesInProto(const string& str) { +#if defined(PLATFORM_GOOGLE) + return str.size() + DebugGrpcIO::kGrpcMaxVarintLengthSize; +#else + return str.size(); +#endif +} + +// Breaks a string Tensor (represented as a TensorProto) into a vector of Event +// protos. +Status WrapStringTensorAsEvents(const DebugNodeKey& debug_node_key, + const uint64 wall_time_us, + const size_t chunk_size_limit, + TensorProto* tensor_proto, + std::vector<Event>* events) { + const protobuf::RepeatedPtrField<string>& strs = tensor_proto->string_val(); + const size_t num_strs = strs.size(); + const size_t chunk_size_ub = chunk_size_limit > 0 + ? chunk_size_limit + : std::numeric_limits<size_t>::max(); + + // E.g., if cutoffs is {j, k, l}, the chunks will have index ranges: + // [0:j), [j:k), [k:l). + std::vector<size_t> cutoffs; + size_t chunk_size = 0; + for (size_t i = 0; i < num_strs; ++i) { + // Take into account the extra bytes in proto buffer. + if (StringValMaxBytesInProto(strs[i]) > chunk_size_ub) { + return errors::FailedPrecondition( + "string value at index ", i, " from debug node ", + debug_node_key.debug_node_name, + " does not fit gRPC message size limit (", chunk_size_ub, ")"); + } + if (chunk_size + StringValMaxBytesInProto(strs[i]) > chunk_size_ub) { + cutoffs.push_back(i); + chunk_size = 0; + } + chunk_size += StringValMaxBytesInProto(strs[i]); + } + cutoffs.push_back(num_strs); + const size_t num_chunks = cutoffs.size(); + + for (size_t i = 0; i < num_chunks; ++i) { + Event event = PrepareChunkEventProto(debug_node_key, wall_time_us, + num_chunks, i, tensor_proto->dtype(), + tensor_proto->tensor_shape()); + Summary::Value* value = event.mutable_summary()->mutable_value(0); + + if (cutoffs.size() == 1) { + value->mutable_tensor()->mutable_string_val()->Swap( + tensor_proto->mutable_string_val()); + } else { + const size_t begin = (i == 0) ? 
0 : cutoffs[i - 1]; + const size_t end = cutoffs[i]; + for (size_t j = begin; j < end; ++j) { + value->mutable_tensor()->add_string_val(strs[j]); + } + } + + events->push_back(std::move(event)); + } + + return Status::OK(); +} + +// Encapsulates the tensor value inside a vector of Event protos. Large tensors +// are broken up to multiple protos to fit the chunk_size_limit. In each Event +// proto the field summary.tensor carries the content of the tensor. +// If chunk_size_limit <= 0, the tensor will not be broken into chunks, i.e., a +// length-1 vector will be returned, regardless of the size of the tensor. +Status WrapTensorAsEvents(const DebugNodeKey& debug_node_key, + const Tensor& tensor, const uint64 wall_time_us, + const size_t chunk_size_limit, + std::vector<Event>* events) { + TensorProto tensor_proto; if (tensor.dtype() == DT_STRING) { - // Treat DT_STRING specially, so that tensor_util.MakeNdarray can convert - // the TensorProto to string-type numpy array. MakeNdarray does not work - // with strings encoded by AsProtoTensorContent() in tensor_content. - tensor.AsProtoField(summ_val->mutable_tensor()); + // Treat DT_STRING specially, so that tensor_util.MakeNdarray in Python can + // convert the TensorProto to string-type numpy array. MakeNdarray does not + // work with strings encoded by AsProtoTensorContent() in tensor_content. + tensor.AsProtoField(&tensor_proto); + + TF_RETURN_IF_ERROR(WrapStringTensorAsEvents( + debug_node_key, wall_time_us, chunk_size_limit, &tensor_proto, events)); } else { - tensor.AsProtoTensorContent(summ_val->mutable_tensor()); + tensor.AsProtoTensorContent(&tensor_proto); + + const size_t total_length = tensor_proto.tensor_content().size(); + const size_t chunk_size_ub = + chunk_size_limit > 0 ? chunk_size_limit : total_length; + const size_t num_chunks = + (total_length == 0) + ? 
1 + : (total_length + chunk_size_ub - 1) / chunk_size_ub; + for (size_t i = 0; i < num_chunks; ++i) { + const size_t pos = i * chunk_size_ub; + const size_t len = + (i == num_chunks - 1) ? (total_length - pos) : chunk_size_ub; + Event event = PrepareChunkEventProto(debug_node_key, wall_time_us, + num_chunks, i, tensor_proto.dtype(), + tensor_proto.tensor_shape()); + event.mutable_summary() + ->mutable_value(0) + ->mutable_tensor() + ->set_tensor_content(tensor_proto.tensor_content().substr(pos, len)); + events->push_back(std::move(event)); + } } - return event; + return Status::OK(); } -// Append an underscore and a timestamp to a file path. If the path already +// Appends an underscore and a timestamp to a file path. If the path already // exists on the file system, append a hyphen and a 1-up index. Consecutive // values of the index will be tried until the first unused one is found. // TOCTOU race condition is not of concern here due to the fact that tfdbg @@ -115,23 +235,28 @@ string AppendTimestampToFilePath(const string& in, const uint64 timestamp) { } #if defined(PLATFORM_GOOGLE) +// Publishes encoded GraphDef through a gRPC debugger stream, in chunks, +// conforming to the gRPC message size limit. Status PublishEncodedGraphDefInChunks(const string& encoded_graph_def, const string& device_name, const int64 wall_time, const string& debug_url) { - static const size_t kChunkSizeLimitBytes = 4000 * 1024; const uint64 hash = ::tensorflow::Hash64(encoded_graph_def); const size_t total_length = encoded_graph_def.size(); - const size_t num_chunks = static_cast<size_t>( - std::ceil(static_cast<float>(total_length) / kChunkSizeLimitBytes)); + const size_t num_chunks = + static_cast<size_t>(std::ceil(static_cast<float>(total_length) / + DebugGrpcIO::kGrpcMessageSizeLimitBytes)); for (size_t i = 0; i < num_chunks; ++i) { - const size_t pos = i * kChunkSizeLimitBytes; - const size_t len = - (i == num_chunks - 1) ? 
(total_length - pos) : kChunkSizeLimitBytes; + const size_t pos = i * DebugGrpcIO::kGrpcMessageSizeLimitBytes; + const size_t len = (i == num_chunks - 1) + ? (total_length - pos) + : DebugGrpcIO::kGrpcMessageSizeLimitBytes; Event event; event.set_wall_time(static_cast<double>(wall_time)); // Prefix the chunk with // <hash64>,<device_name>,<wall_time>|<index>|<num_chunks>|. + // TODO(cais): Use DebuggerEventMetadata to store device_name, num_chunks + // and chunk_index, instead. event.set_graph_def(strings::StrCat(hash, ",", device_name, ",", wall_time, "|", i, "|", num_chunks, "|", encoded_graph_def.substr(pos, len))); @@ -148,6 +273,9 @@ Status PublishEncodedGraphDefInChunks(const string& encoded_graph_def, } // namespace // static +const char* const DebugIO::kDebuggerPluginName = "debugger"; + +// static const char* const DebugIO::kMetadataFilePrefix = "_tfdbg_"; // static @@ -215,6 +343,7 @@ const char* const DebugIO::kFileURLScheme = "file://"; // static const char* const DebugIO::kGrpcURLScheme = "grpc://"; +// Publishes debug metadata to a set of debug URLs. // static Status DebugIO::PublishDebugMetadata( const int64 global_step, const int64 session_run_index, @@ -532,9 +661,11 @@ Status DebugFileIO::DumpTensorToEventFile(const DebugNodeKey& debug_node_key, const Tensor& tensor, const uint64 wall_time_us, const string& file_path) { - return DumpEventProtoToFile( - WrapTensorAsEvent(debug_node_key, tensor, wall_time_us), - io::Dirname(file_path).ToString(), io::Basename(file_path).ToString()); + std::vector<Event> events; + TF_RETURN_IF_ERROR( + WrapTensorAsEvents(debug_node_key, tensor, wall_time_us, 0, &events)); + return DumpEventProtoToFile(events[0], io::Dirname(file_path).ToString(), + io::Basename(file_path).ToString()); } // static @@ -642,6 +773,12 @@ int64 DebugGrpcIO::channel_connection_timeout_micros = 900 * 1000 * 1000; // TODO(cais): Make this configurable? 
// static +const size_t DebugGrpcIO::kGrpcMessageSizeLimitBytes = 4000 * 1024; + +// static +const size_t DebugGrpcIO::kGrpcMaxVarintLengthSize = 6; + +// static std::unordered_map<string, std::shared_ptr<DebugGrpcChannel>>* DebugGrpcIO::GetStreamChannels() { static std::unordered_map<string, std::shared_ptr<DebugGrpcChannel>>* @@ -658,9 +795,14 @@ Status DebugGrpcIO::SendTensorThroughGrpcStream( if (gated && !IsGateOpen(debug_node_key.debug_node_name, grpc_stream_url)) { return Status::OK(); } else { - return SendEventProtoThroughGrpcStream( - WrapTensorAsEvent(debug_node_key, tensor, wall_time_us), - grpc_stream_url); + std::vector<Event> events; + TF_RETURN_IF_ERROR(WrapTensorAsEvents(debug_node_key, tensor, wall_time_us, + kGrpcMessageSizeLimitBytes, &events)); + for (const Event& event : events) { + TF_RETURN_IF_ERROR( + SendEventProtoThroughGrpcStream(event, grpc_stream_url)); + } + return Status::OK(); } } diff --git a/tensorflow/core/debug/debug_io_utils.h b/tensorflow/core/debug/debug_io_utils.h index 4caa4b5e04..98effed425 100644 --- a/tensorflow/core/debug/debug_io_utils.h +++ b/tensorflow/core/debug/debug_io_utils.h @@ -44,6 +44,9 @@ struct DebugNodeKey { DebugNodeKey(const string& device_name, const string& node_name, const int32 output_slot, const string& debug_op); + // Converts a device name string to a device path string. + // E.g., /job:localhost/replica:0/task:0/cpu:0 will be converted to + // ,job_localhost,replica_0,task_0,cpu_0. 
static const string DeviceNameToDevicePath(const string& device_name); const string device_name; @@ -56,6 +59,17 @@ struct DebugNodeKey { class DebugIO { public: + static const char* const kDebuggerPluginName; + + static const char* const kMetadataFilePrefix; + static const char* const kCoreMetadataTag; + static const char* const kDeviceTag; + static const char* const kGraphTag; + static const char* const kHashTag; + + static const char* const kFileURLScheme; + static const char* const kGrpcURLScheme; + static Status PublishDebugMetadata( const int64 global_step, const int64 session_run_index, const int64 executor_step_index, const std::vector<string>& input_names, @@ -63,7 +77,7 @@ class DebugIO { const std::vector<string>& target_nodes, const std::unordered_set<string>& debug_urls); - // Publish a tensor to a debug target URL. + // Publishes a tensor to a debug target URL. // // Args: // debug_node_key: A DebugNodeKey identifying the debug node. @@ -84,7 +98,7 @@ class DebugIO { const uint64 wall_time_us, const gtl::ArraySlice<string>& debug_urls); - // Publish a graph to a set of debug URLs. + // Publishes a graph to a set of debug URLs. // // Args: // graph: The graph to be published. @@ -92,7 +106,7 @@ class DebugIO { static Status PublishGraph(const Graph& graph, const string& device_name, const std::unordered_set<string>& debug_urls); - // Determine whether a copy node needs to perform deep-copy of input tensor. + // Determines whether a copy node needs to perform deep-copy of input tensor. // // The input arguments contain sufficient information about the attached // downstream debug ops for this method to determine whether all the said @@ -109,7 +123,7 @@ class DebugIO { static bool IsCopyNodeGateOpen( const std::vector<DebugWatchAndURLSpec>& specs); - // Determine whether a debug node needs to proceed given the current gRPC + // Determines whether a debug node needs to proceed given the current gRPC // gating status. 
// // Args: @@ -122,7 +136,7 @@ class DebugIO { static bool IsDebugNodeGateOpen(const string& watch_key, const std::vector<string>& debug_urls); - // Determine whether debug information should be sent through a grpc:// + // Determines whether debug information should be sent through a grpc:// // debug URL given the current gRPC gating status. // // Args: @@ -138,21 +152,12 @@ class DebugIO { const string& debug_url); static Status CloseDebugURL(const string& debug_url); - - static const char* const kMetadataFilePrefix; - static const char* const kCoreMetadataTag; - static const char* const kDeviceTag; - static const char* const kGraphTag; - static const char* const kHashTag; - - static const char* const kFileURLScheme; - static const char* const kGrpcURLScheme; }; // Helper class for debug ops. class DebugFileIO { public: - // Encapsulate the Tensor in an Event protobuf and write it to a directory. + // Encapsulates the Tensor in an Event protobuf and writes it to a directory. // The actual path of the dump file will be a concatenation of // dump_root_dir, tensor_name, along with the wall_time. // @@ -189,12 +194,18 @@ class DebugFileIO { const DebugNodeKey& debug_node_key, const uint64 wall_time_us); + // Dumps an Event proto to a file. + // + // Args: + // event_proto: The Event proto to be dumped. + // dir_name: Directory path. + // file_name: Base file name. static Status DumpEventProtoToFile(const Event& event_proto, const string& dir_name, const string& file_name); private: - // Encapsulate the Tensor in an Event protobuf and write it to file. + // Encapsulates the Tensor in an Event protobuf and writes it to file. static Status DumpTensorToEventFile(const DebugNodeKey& debug_node_key, const Tensor& tensor, const uint64 wall_time_us, @@ -267,20 +278,23 @@ class DebugGrpcChannel { class DebugGrpcIO { public: - // Send a tensor through a debug gRPC stream. 
+ static const size_t kGrpcMessageSizeLimitBytes; + static const size_t kGrpcMaxVarintLengthSize; + + // Sends a tensor through a debug gRPC stream. static Status SendTensorThroughGrpcStream(const DebugNodeKey& debug_node_key, const Tensor& tensor, const uint64 wall_time_us, const string& grpc_stream_url, const bool gated); - // Send an Event proto through a debug gRPC stream. + // Sends an Event proto through a debug gRPC stream. // Thread-safety: Safe with respect to other calls to the same method and // calls to CloseGrpcStream(). static Status SendEventProtoThroughGrpcStream(const Event& event_proto, const string& grpc_stream_url); - // Check whether a debug watch key is allowed to send data to a given grpc:// + // Checks whether a debug watch key is allowed to send data to a given grpc:// // debug URL given the current gating status. // // Args: @@ -293,16 +307,16 @@ class DebugGrpcIO { // proceed. static bool IsGateOpen(const string& watch_key, const string& grpc_debug_url); - // Close a gRPC stream to the given address, if it exists. + // Closes a gRPC stream to the given address, if it exists. // Thread-safety: Safe with respect to other calls to the same method and // calls to SendTensorThroughGrpcStream(). static Status CloseGrpcStream(const string& grpc_stream_url); - // Enable a debug watch key at a grpc:// debug URL. + // Enables a debug watch key at a grpc:// debug URL. static void EnableWatchKey(const string& grpc_debug_url, const string& watch_key); - // Disable a debug watch key at a grpc:// debug URL. + // Disables a debug watch key at a grpc:// debug URL. 
static void DisableWatchKey(const string& grpc_debug_url, const string& watch_key); diff --git a/tensorflow/core/debug/debugger_event_metadata.proto b/tensorflow/core/debug/debugger_event_metadata.proto index 44ef305f5a..8bdedb1a50 100644 --- a/tensorflow/core/debug/debugger_event_metadata.proto +++ b/tensorflow/core/debug/debugger_event_metadata.proto @@ -6,4 +6,6 @@ package third_party.tensorflow.core.debug; message DebuggerEventMetadata { string device = 1; int32 output_slot = 2; + int32 num_chunks = 3; + int32 chunk_index = 4; }; |