author    Shanqing Cai <cais@google.com>  2017-07-05 17:15:30 -0700
committer TensorFlower Gardener <gardener@tensorflow.org>  2017-07-05 17:19:37 -0700
commit    3f099e7d7f1c8b8d3f4121357fc7c40391f9eafe (patch)
tree      fe0d599c34d4fdd936102e24daf326b3198d56be /tensorflow/core/debug
parent    dd951bb50d454f026083097b6d6122dae696259c (diff)
tfdbg: send large Tensor Event protos through gRPC stream in chunks
Also fixes flakiness in session_debug_grpc_test.

PiperOrigin-RevId: 161025848
Diffstat (limited to 'tensorflow/core/debug')
-rw-r--r--  tensorflow/core/debug/debug_io_utils.cc               200
-rw-r--r--  tensorflow/core/debug/debug_io_utils.h                 58
-rw-r--r--  tensorflow/core/debug/debugger_event_metadata.proto     2
3 files changed, 209 insertions, 51 deletions
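The change below splits oversized payloads with plain ceiling division: a payload of total_length bytes sent under a limit of kGrpcMessageSizeLimitBytes becomes ceil(total_length / limit) chunks, where chunk i covers bytes [i * limit, i * limit + len). A minimal standalone sketch of that arithmetic, mirroring (but not copied from) the WrapTensorAsEvents and PublishEncodedGraphDefInChunks code in the diff:

#include <string>
#include <vector>

// Splits a byte buffer into chunks of at most `limit` bytes. A limit of 0
// means "do not chunk"; an empty buffer still yields one (empty) chunk, which
// matches the num_chunks handling in the diff below.
std::vector<std::string> SplitIntoChunks(const std::string& data, size_t limit) {
  const size_t total = data.size();
  const size_t ub = (limit > 0) ? limit : (total > 0 ? total : 1);
  const size_t num_chunks = (total == 0) ? 1 : (total + ub - 1) / ub;
  std::vector<std::string> chunks;
  for (size_t i = 0; i < num_chunks; ++i) {
    const size_t pos = i * ub;
    const size_t len = (i == num_chunks - 1) ? (total - pos) : ub;
    chunks.push_back(data.substr(pos, len));
  }
  return chunks;
}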
diff --git a/tensorflow/core/debug/debug_io_utils.cc b/tensorflow/core/debug/debug_io_utils.cc
index 875a4763f8..4e185b37a8 100644
--- a/tensorflow/core/debug/debug_io_utils.cc
+++ b/tensorflow/core/debug/debug_io_utils.cc
@@ -29,6 +29,7 @@ limitations under the License.
#include "tensorflow/core/debug/debugger_event_metadata.pb.h"
#include "tensorflow/core/framework/graph.pb.h"
#include "tensorflow/core/framework/summary.pb.h"
+#include "tensorflow/core/lib/core/bits.h"
#include "tensorflow/core/lib/hash/hash.h"
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/strings/str_util.h"
@@ -45,27 +46,36 @@ namespace tensorflow {
namespace {
-// Encapsulate the tensor value inside a Summary proto, and then inside an
-// Event proto.
-Event WrapTensorAsEvent(const DebugNodeKey& debug_node_key,
- const Tensor& tensor, const uint64 wall_time_us) {
+// Creates an Event proto representing a chunk of a Tensor. This method only
+// populates the fields of the Event proto that represent the envelope
+// information (e.g., timestamp, device_name, num_chunks, chunk_index, dtype,
+// shape). It does not set the value.tensor field, which should be set by the
+// caller separately.
+Event PrepareChunkEventProto(const DebugNodeKey& debug_node_key,
+ const uint64 wall_time_us, const size_t num_chunks,
+ const size_t chunk_index,
+ const DataType& tensor_dtype,
+ const TensorShapeProto& tensor_shape) {
Event event;
event.set_wall_time(static_cast<double>(wall_time_us));
- Summary::Value* summ_val = event.mutable_summary()->add_value();
+ Summary::Value* value = event.mutable_summary()->add_value();
// Create the debug node_name in the Summary proto.
// For example, if tensor_name = "foo/node_a:0", and the debug_op is
// "DebugIdentity", the debug node_name in the Summary proto will be
// "foo/node_a:0:DebugIdentity".
- summ_val->set_node_name(debug_node_key.debug_node_name);
+ value->set_node_name(debug_node_key.debug_node_name);
- // Tag by the node name. This allows TensorBoard to quickly fetch data per op.
- summ_val->set_tag(debug_node_key.node_name);
+ // Tag by the node name. This allows TensorBoard to quickly fetch data
+ // per op.
+ value->set_tag(debug_node_key.node_name);
// Store data within debugger metadata to be stored for each event.
third_party::tensorflow::core::debug::DebuggerEventMetadata metadata;
metadata.set_device(debug_node_key.device_name);
metadata.set_output_slot(debug_node_key.output_slot);
+ metadata.set_num_chunks(num_chunks);
+ metadata.set_chunk_index(chunk_index);
// Encode the data in JSON.
string json_output;
@@ -76,8 +86,8 @@ Event WrapTensorAsEvent(const DebugNodeKey& debug_node_key,
if (status.ok()) {
// Store summary metadata. Set the plugin to use this data as "debugger".
SummaryMetadata::PluginData* plugin_data =
- summ_val->mutable_metadata()->add_plugin_data();
- plugin_data->set_plugin_name("debugger");
+ value->mutable_metadata()->add_plugin_data();
+ plugin_data->set_plugin_name(DebugIO::kDebuggerPluginName);
plugin_data->set_content(json_output);
} else {
LOG(WARNING) << "Failed to convert DebuggerEventMetadata proto to JSON. "
@@ -85,19 +95,129 @@ Event WrapTensorAsEvent(const DebugNodeKey& debug_node_key,
<< ".";
}
+ value->mutable_tensor()->set_dtype(tensor_dtype);
+ *value->mutable_tensor()->mutable_tensor_shape() = tensor_shape;
+
+ return event;
+}
+
+// Translates the length of a string to number of bytes when the string is
+// encoded as bytes in protobuf. Note that this makes a conservative estimate
+// (i.e., an estimate that is usually too large, but never too small under the
+// gRPC message size limit) of the Varint-encoded length, to work around the
+// lack of a portable length function.
+const size_t StringValMaxBytesInProto(const string& str) {
+#if defined(PLATFORM_GOOGLE)
+ return str.size() + DebugGrpcIO::kGrpcMaxVarintLengthSize;
+#else
+ return str.size();
+#endif
+}
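The 6-byte cushion (kGrpcMaxVarintLengthSize, defined later in this diff) can be justified as follows, though the commit itself does not spell this out: a length-delimited protobuf field costs one tag byte plus a Varint length that carries 7 payload bits per byte, so any string small enough to fit a 4 MB gRPC message needs at most 4 length bytes, well under the 6-byte bound. A rough helper for checking that claim (illustrative only, not part of the change):

// Bytes occupied by the Varint encoding of n (7 payload bits per byte).
inline size_t VarintLength(size_t n) {
  size_t bytes = 1;
  while (n >= 0x80) {
    n >>= 7;
    ++bytes;
  }
  return bytes;
}
// VarintLength(4000 * 1024) == 4, so tag (1 byte) + length (4 bytes) < 6.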
+
+// Breaks a string Tensor (represented as a TensorProto) into a vector of
+// Event protos.
+Status WrapStringTensorAsEvents(const DebugNodeKey& debug_node_key,
+ const uint64 wall_time_us,
+ const size_t chunk_size_limit,
+ TensorProto* tensor_proto,
+ std::vector<Event>* events) {
+ const protobuf::RepeatedPtrField<string>& strs = tensor_proto->string_val();
+ const size_t num_strs = strs.size();
+ const size_t chunk_size_ub = chunk_size_limit > 0
+ ? chunk_size_limit
+ : std::numeric_limits<size_t>::max();
+
+  // E.g., if cutoffs ends up as {a, b, num_strs}, the chunks will cover the
+  // index ranges [0:a), [a:b), [b:num_strs).
+ std::vector<size_t> cutoffs;
+ size_t chunk_size = 0;
+ for (size_t i = 0; i < num_strs; ++i) {
+ // Take into account the extra bytes in proto buffer.
+ if (StringValMaxBytesInProto(strs[i]) > chunk_size_ub) {
+ return errors::FailedPrecondition(
+ "string value at index ", i, " from debug node ",
+ debug_node_key.debug_node_name,
+ " does not fit gRPC message size limit (", chunk_size_ub, ")");
+ }
+ if (chunk_size + StringValMaxBytesInProto(strs[i]) > chunk_size_ub) {
+ cutoffs.push_back(i);
+ chunk_size = 0;
+ }
+ chunk_size += StringValMaxBytesInProto(strs[i]);
+ }
+ cutoffs.push_back(num_strs);
+ const size_t num_chunks = cutoffs.size();
+
+ for (size_t i = 0; i < num_chunks; ++i) {
+ Event event = PrepareChunkEventProto(debug_node_key, wall_time_us,
+ num_chunks, i, tensor_proto->dtype(),
+ tensor_proto->tensor_shape());
+ Summary::Value* value = event.mutable_summary()->mutable_value(0);
+
+ if (cutoffs.size() == 1) {
+ value->mutable_tensor()->mutable_string_val()->Swap(
+ tensor_proto->mutable_string_val());
+ } else {
+ const size_t begin = (i == 0) ? 0 : cutoffs[i - 1];
+ const size_t end = cutoffs[i];
+ for (size_t j = begin; j < end; ++j) {
+ value->mutable_tensor()->add_string_val(strs[j]);
+ }
+ }
+
+ events->push_back(std::move(event));
+ }
+
+ return Status::OK();
+}
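As a worked example of the cutoff computation above (the element sizes are assumed, not taken from the change): with a chunk_size_limit of 4 MB and three string elements whose estimated proto sizes are about 3 MB, 2 MB and 1 MB, the loop runs as follows:

//   i = 0: 0 + 3 MB <= 4 MB           -> chunk_size = 3 MB
//   i = 1: 3 MB + 2 MB >  4 MB        -> cutoffs = {1}, chunk_size = 2 MB
//   i = 2: 2 MB + 1 MB <= 4 MB        -> chunk_size = 3 MB
//   after the loop: cutoffs = {1, 3}  -> two Events carrying string_val
//                                        ranges [0:1) and [1:3)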
+
+// Encapsulates the tensor value inside a vector of Event protos. Large tensors
+// are broken up into multiple protos to fit the chunk_size_limit. In each Event
+// proto the field summary.tensor carries the content of the tensor.
+// If chunk_size_limit <= 0, the tensor will not be broken into chunks, i.e., a
+// length-1 vector will be returned, regardless of the size of the tensor.
+Status WrapTensorAsEvents(const DebugNodeKey& debug_node_key,
+ const Tensor& tensor, const uint64 wall_time_us,
+ const size_t chunk_size_limit,
+ std::vector<Event>* events) {
+ TensorProto tensor_proto;
if (tensor.dtype() == DT_STRING) {
- // Treat DT_STRING specially, so that tensor_util.MakeNdarray can convert
- // the TensorProto to string-type numpy array. MakeNdarray does not work
- // with strings encoded by AsProtoTensorContent() in tensor_content.
- tensor.AsProtoField(summ_val->mutable_tensor());
+ // Treat DT_STRING specially, so that tensor_util.MakeNdarray in Python can
+ // convert the TensorProto to string-type numpy array. MakeNdarray does not
+ // work with strings encoded by AsProtoTensorContent() in tensor_content.
+ tensor.AsProtoField(&tensor_proto);
+
+ TF_RETURN_IF_ERROR(WrapStringTensorAsEvents(
+ debug_node_key, wall_time_us, chunk_size_limit, &tensor_proto, events));
} else {
- tensor.AsProtoTensorContent(summ_val->mutable_tensor());
+ tensor.AsProtoTensorContent(&tensor_proto);
+
+ const size_t total_length = tensor_proto.tensor_content().size();
+ const size_t chunk_size_ub =
+ chunk_size_limit > 0 ? chunk_size_limit : total_length;
+ const size_t num_chunks =
+ (total_length == 0)
+ ? 1
+ : (total_length + chunk_size_ub - 1) / chunk_size_ub;
+ for (size_t i = 0; i < num_chunks; ++i) {
+ const size_t pos = i * chunk_size_ub;
+ const size_t len =
+ (i == num_chunks - 1) ? (total_length - pos) : chunk_size_ub;
+ Event event = PrepareChunkEventProto(debug_node_key, wall_time_us,
+ num_chunks, i, tensor_proto.dtype(),
+ tensor_proto.tensor_shape());
+ event.mutable_summary()
+ ->mutable_value(0)
+ ->mutable_tensor()
+ ->set_tensor_content(tensor_proto.tensor_content().substr(pos, len));
+ events->push_back(std::move(event));
+ }
}
- return event;
+ return Status::OK();
}
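A receiver of these chunked Events could restore the original tensor bytes by concatenating the summary.value(0).tensor().tensor_content() fields in chunk_index order once num_chunks pieces (as recorded in DebuggerEventMetadata) have arrived. A minimal sketch under that assumption; the helper name is hypothetical and not part of this commit:

// Assumes `events` holds all chunks of one tensor, sorted by chunk_index.
string ReassembleTensorContent(const std::vector<Event>& events) {
  string content;
  for (const Event& event : events) {
    content += event.summary().value(0).tensor().tensor_content();
  }
  return content;
}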
-// Append an underscore and a timestamp to a file path. If the path already
+// Appends an underscore and a timestamp to a file path. If the path already
// exists on the file system, append a hyphen and a 1-up index. Consecutive
// values of the index will be tried until the first unused one is found.
// TOCTOU race condition is not of concern here due to the fact that tfdbg
@@ -115,23 +235,28 @@ string AppendTimestampToFilePath(const string& in, const uint64 timestamp) {
}
#if defined(PLATFORM_GOOGLE)
+// Publishes encoded GraphDef through a gRPC debugger stream, in chunks,
+// conforming to the gRPC message size limit.
Status PublishEncodedGraphDefInChunks(const string& encoded_graph_def,
const string& device_name,
const int64 wall_time,
const string& debug_url) {
- static const size_t kChunkSizeLimitBytes = 4000 * 1024;
const uint64 hash = ::tensorflow::Hash64(encoded_graph_def);
const size_t total_length = encoded_graph_def.size();
- const size_t num_chunks = static_cast<size_t>(
- std::ceil(static_cast<float>(total_length) / kChunkSizeLimitBytes));
+ const size_t num_chunks =
+ static_cast<size_t>(std::ceil(static_cast<float>(total_length) /
+ DebugGrpcIO::kGrpcMessageSizeLimitBytes));
for (size_t i = 0; i < num_chunks; ++i) {
- const size_t pos = i * kChunkSizeLimitBytes;
- const size_t len =
- (i == num_chunks - 1) ? (total_length - pos) : kChunkSizeLimitBytes;
+ const size_t pos = i * DebugGrpcIO::kGrpcMessageSizeLimitBytes;
+ const size_t len = (i == num_chunks - 1)
+ ? (total_length - pos)
+ : DebugGrpcIO::kGrpcMessageSizeLimitBytes;
Event event;
event.set_wall_time(static_cast<double>(wall_time));
// Prefix the chunk with
// <hash64>,<device_name>,<wall_time>|<index>|<num_chunks>|.
+ // TODO(cais): Use DebuggerEventMetadata to store device_name, num_chunks
+ // and chunk_index, instead.
event.set_graph_def(strings::StrCat(hash, ",", device_name, ",", wall_time,
"|", i, "|", num_chunks, "|",
encoded_graph_def.substr(pos, len)));
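With the prefix format above, the graph_def field of, say, the second of three chunks would look roughly like the following (the hash, device name, and timestamp are made-up values):

//   "<hash64>,<device_name>,<wall_time>|<index>|<num_chunks>|<payload>", e.g.
//   "16224103926344903733,/job:localhost/replica:0/task:0/cpu:0,1499300130000000|1|3|..."
//   where "..." is the second 4000 * 1024-byte slice of the encoded GraphDef.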
@@ -148,6 +273,9 @@ Status PublishEncodedGraphDefInChunks(const string& encoded_graph_def,
} // namespace
// static
+const char* const DebugIO::kDebuggerPluginName = "debugger";
+
+// static
const char* const DebugIO::kMetadataFilePrefix = "_tfdbg_";
// static
@@ -215,6 +343,7 @@ const char* const DebugIO::kFileURLScheme = "file://";
// static
const char* const DebugIO::kGrpcURLScheme = "grpc://";
+// Publishes debug metadata to a set of debug URLs.
// static
Status DebugIO::PublishDebugMetadata(
const int64 global_step, const int64 session_run_index,
@@ -532,9 +661,11 @@ Status DebugFileIO::DumpTensorToEventFile(const DebugNodeKey& debug_node_key,
const Tensor& tensor,
const uint64 wall_time_us,
const string& file_path) {
- return DumpEventProtoToFile(
- WrapTensorAsEvent(debug_node_key, tensor, wall_time_us),
- io::Dirname(file_path).ToString(), io::Basename(file_path).ToString());
+ std::vector<Event> events;
+ TF_RETURN_IF_ERROR(
+ WrapTensorAsEvents(debug_node_key, tensor, wall_time_us, 0, &events));
+ return DumpEventProtoToFile(events[0], io::Dirname(file_path).ToString(),
+ io::Basename(file_path).ToString());
}
// static
@@ -642,6 +773,12 @@ int64 DebugGrpcIO::channel_connection_timeout_micros = 900 * 1000 * 1000;
// TODO(cais): Make this configurable?
// static
+const size_t DebugGrpcIO::kGrpcMessageSizeLimitBytes = 4000 * 1024;
+
+// static
+const size_t DebugGrpcIO::kGrpcMaxVarintLengthSize = 6;
+
+// static
std::unordered_map<string, std::shared_ptr<DebugGrpcChannel>>*
DebugGrpcIO::GetStreamChannels() {
static std::unordered_map<string, std::shared_ptr<DebugGrpcChannel>>*
@@ -658,9 +795,14 @@ Status DebugGrpcIO::SendTensorThroughGrpcStream(
if (gated && !IsGateOpen(debug_node_key.debug_node_name, grpc_stream_url)) {
return Status::OK();
} else {
- return SendEventProtoThroughGrpcStream(
- WrapTensorAsEvent(debug_node_key, tensor, wall_time_us),
- grpc_stream_url);
+ std::vector<Event> events;
+ TF_RETURN_IF_ERROR(WrapTensorAsEvents(debug_node_key, tensor, wall_time_us,
+ kGrpcMessageSizeLimitBytes, &events));
+ for (const Event& event : events) {
+ TF_RETURN_IF_ERROR(
+ SendEventProtoThroughGrpcStream(event, grpc_stream_url));
+ }
+ return Status::OK();
}
}
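For context, a caller-side sketch of the chunked gRPC send path; the node key, tensor, timestamp source, and URL below are placeholders rather than anything introduced by this commit:

DebugNodeKey key("/job:localhost/replica:0/task:0/cpu:0", "foo/node_a", 0,
                 "DebugIdentity");
Tensor tensor(DT_FLOAT, TensorShape({1024, 1024}));
Status s = DebugGrpcIO::SendTensorThroughGrpcStream(
    key, tensor, Env::Default()->NowMicros(), "grpc://localhost:6064",
    /*gated=*/false);
// A tensor larger than kGrpcMessageSizeLimitBytes arrives as several Event
// protos whose DebuggerEventMetadata carries num_chunks and chunk_index.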
diff --git a/tensorflow/core/debug/debug_io_utils.h b/tensorflow/core/debug/debug_io_utils.h
index 4caa4b5e04..98effed425 100644
--- a/tensorflow/core/debug/debug_io_utils.h
+++ b/tensorflow/core/debug/debug_io_utils.h
@@ -44,6 +44,9 @@ struct DebugNodeKey {
DebugNodeKey(const string& device_name, const string& node_name,
const int32 output_slot, const string& debug_op);
+ // Converts a device name string to a device path string.
+ // E.g., /job:localhost/replica:0/task:0/cpu:0 will be converted to
+ // ,job_localhost,replica_0,task_0,cpu_0.
static const string DeviceNameToDevicePath(const string& device_name);
const string device_name;
@@ -56,6 +59,17 @@ struct DebugNodeKey {
class DebugIO {
public:
+ static const char* const kDebuggerPluginName;
+
+ static const char* const kMetadataFilePrefix;
+ static const char* const kCoreMetadataTag;
+ static const char* const kDeviceTag;
+ static const char* const kGraphTag;
+ static const char* const kHashTag;
+
+ static const char* const kFileURLScheme;
+ static const char* const kGrpcURLScheme;
+
static Status PublishDebugMetadata(
const int64 global_step, const int64 session_run_index,
const int64 executor_step_index, const std::vector<string>& input_names,
@@ -63,7 +77,7 @@ class DebugIO {
const std::vector<string>& target_nodes,
const std::unordered_set<string>& debug_urls);
- // Publish a tensor to a debug target URL.
+ // Publishes a tensor to a debug target URL.
//
// Args:
// debug_node_key: A DebugNodeKey identifying the debug node.
@@ -84,7 +98,7 @@ class DebugIO {
const uint64 wall_time_us,
const gtl::ArraySlice<string>& debug_urls);
- // Publish a graph to a set of debug URLs.
+ // Publishes a graph to a set of debug URLs.
//
// Args:
// graph: The graph to be published.
@@ -92,7 +106,7 @@ class DebugIO {
static Status PublishGraph(const Graph& graph, const string& device_name,
const std::unordered_set<string>& debug_urls);
- // Determine whether a copy node needs to perform deep-copy of input tensor.
+ // Determines whether a copy node needs to perform deep-copy of input tensor.
//
// The input arguments contain sufficient information about the attached
// downstream debug ops for this method to determine whether all the said
@@ -109,7 +123,7 @@ class DebugIO {
static bool IsCopyNodeGateOpen(
const std::vector<DebugWatchAndURLSpec>& specs);
- // Determine whether a debug node needs to proceed given the current gRPC
+ // Determines whether a debug node needs to proceed given the current gRPC
// gating status.
//
// Args:
@@ -122,7 +136,7 @@ class DebugIO {
static bool IsDebugNodeGateOpen(const string& watch_key,
const std::vector<string>& debug_urls);
- // Determine whether debug information should be sent through a grpc://
+ // Determines whether debug information should be sent through a grpc://
// debug URL given the current gRPC gating status.
//
// Args:
@@ -138,21 +152,12 @@ class DebugIO {
const string& debug_url);
static Status CloseDebugURL(const string& debug_url);
-
- static const char* const kMetadataFilePrefix;
- static const char* const kCoreMetadataTag;
- static const char* const kDeviceTag;
- static const char* const kGraphTag;
- static const char* const kHashTag;
-
- static const char* const kFileURLScheme;
- static const char* const kGrpcURLScheme;
};
// Helper class for debug ops.
class DebugFileIO {
public:
- // Encapsulate the Tensor in an Event protobuf and write it to a directory.
+  // Encapsulates the Tensor in an Event protobuf and writes it to a directory.
  // The actual path of the dump file will be a concatenation of
// dump_root_dir, tensor_name, along with the wall_time.
//
@@ -189,12 +194,18 @@ class DebugFileIO {
const DebugNodeKey& debug_node_key,
const uint64 wall_time_us);
+ // Dumps an Event proto to a file.
+ //
+ // Args:
+  //   event_proto: The Event proto to be dumped.
+ // dir_name: Directory path.
+ // file_name: Base file name.
static Status DumpEventProtoToFile(const Event& event_proto,
const string& dir_name,
const string& file_name);
private:
- // Encapsulate the Tensor in an Event protobuf and write it to file.
+  // Encapsulates the Tensor in an Event protobuf and writes it to a file.
static Status DumpTensorToEventFile(const DebugNodeKey& debug_node_key,
const Tensor& tensor,
const uint64 wall_time_us,
@@ -267,20 +278,23 @@ class DebugGrpcChannel {
class DebugGrpcIO {
public:
- // Send a tensor through a debug gRPC stream.
+ static const size_t kGrpcMessageSizeLimitBytes;
+ static const size_t kGrpcMaxVarintLengthSize;
+
+ // Sends a tensor through a debug gRPC stream.
static Status SendTensorThroughGrpcStream(const DebugNodeKey& debug_node_key,
const Tensor& tensor,
const uint64 wall_time_us,
const string& grpc_stream_url,
const bool gated);
- // Send an Event proto through a debug gRPC stream.
+ // Sends an Event proto through a debug gRPC stream.
// Thread-safety: Safe with respect to other calls to the same method and
// calls to CloseGrpcStream().
static Status SendEventProtoThroughGrpcStream(const Event& event_proto,
const string& grpc_stream_url);
- // Check whether a debug watch key is allowed to send data to a given grpc://
+ // Checks whether a debug watch key is allowed to send data to a given grpc://
// debug URL given the current gating status.
//
// Args:
@@ -293,16 +307,16 @@ class DebugGrpcIO {
// proceed.
static bool IsGateOpen(const string& watch_key, const string& grpc_debug_url);
- // Close a gRPC stream to the given address, if it exists.
+ // Closes a gRPC stream to the given address, if it exists.
// Thread-safety: Safe with respect to other calls to the same method and
// calls to SendTensorThroughGrpcStream().
static Status CloseGrpcStream(const string& grpc_stream_url);
- // Enable a debug watch key at a grpc:// debug URL.
+ // Enables a debug watch key at a grpc:// debug URL.
static void EnableWatchKey(const string& grpc_debug_url,
const string& watch_key);
- // Disable a debug watch key at a grpc:// debug URL.
+ // Disables a debug watch key at a grpc:// debug URL.
static void DisableWatchKey(const string& grpc_debug_url,
const string& watch_key);
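As a quick recap of the naming conventions these declarations rely on, restating comments from elsewhere in this diff with illustrative values:

string path = DebugNodeKey::DeviceNameToDevicePath(
    "/job:localhost/replica:0/task:0/cpu:0");
// path == ",job_localhost,replica_0,task_0,cpu_0" per the header comment above.
DebugNodeKey key("/job:localhost/replica:0/task:0/cpu:0", "foo/node_a", 0,
                 "DebugIdentity");
// key.debug_node_name is "foo/node_a:0:DebugIdentity" per the comment in
// debug_io_utils.cc above.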
diff --git a/tensorflow/core/debug/debugger_event_metadata.proto b/tensorflow/core/debug/debugger_event_metadata.proto
index 44ef305f5a..8bdedb1a50 100644
--- a/tensorflow/core/debug/debugger_event_metadata.proto
+++ b/tensorflow/core/debug/debugger_event_metadata.proto
@@ -6,4 +6,6 @@ package third_party.tensorflow.core.debug;
message DebuggerEventMetadata {
string device = 1;
int32 output_slot = 2;
+ int32 num_chunks = 3;
+ int32 chunk_index = 4;
};
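For completeness, the metadata above is what debug_io_utils.cc serializes to JSON and attaches as the "debugger" plugin_data content. A sketch of that step, assuming protobuf's standard MessageToJsonString and the proto3 JSON mapping (field names become lowerCamelCase; zero-valued fields may be omitted unless print options say otherwise):

third_party::tensorflow::core::debug::DebuggerEventMetadata metadata;
metadata.set_device("/job:localhost/replica:0/task:0/cpu:0");
metadata.set_output_slot(0);
metadata.set_num_chunks(3);
metadata.set_chunk_index(1);
string json_output;
auto status =
    tensorflow::protobuf::util::MessageToJsonString(metadata, &json_output);
// json_output is roughly:
//   {"device":"/job:localhost/replica:0/task:0/cpu:0","numChunks":3,"chunkIndex":1}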