diff options
-rw-r--r-- | tensorflow/core/debug/debug_io_utils.cc | 38 | ||||
-rw-r--r-- | tensorflow/core/debug/debug_io_utils.h | 8 | ||||
-rw-r--r-- | tensorflow/core/debug/debug_io_utils_test.cc | 28 | ||||
-rw-r--r-- | tensorflow/core/debug/grpc_session_debug_test.cc | 10 | ||||
-rw-r--r-- | tensorflow/core/kernels/debug_ops_test.cc | 21 | ||||
-rw-r--r-- | tensorflow/python/debug/BUILD | 16 | ||||
-rw-r--r-- | tensorflow/python/debug/lib/debug_data.py | 809 | ||||
-rw-r--r-- | tensorflow/python/debug/lib/debug_data_test.py | 140 | ||||
-rw-r--r-- | tensorflow/python/debug/lib/session_debug_multi_gpu_test.py | 93 | ||||
-rw-r--r-- | tensorflow/python/debug/lib/session_debug_testlib.py | 47 |
10 files changed, 884 insertions, 326 deletions
diff --git a/tensorflow/core/debug/debug_io_utils.cc b/tensorflow/core/debug/debug_io_utils.cc index 25847a20a4..54366ce249 100644 --- a/tensorflow/core/debug/debug_io_utils.cc +++ b/tensorflow/core/debug/debug_io_utils.cc @@ -119,6 +119,18 @@ Status PublishEncodedGraphDefInChunks(const string& encoded_graph_def, } // namespace +// static +const char* const DebugIO::kMetadataFilePrefix = "_tfdbg_"; + +// static +const char* const DebugIO::kCoreMetadataTag = "core_metadata_"; + +// static +const char* const DebugIO::kDeviceTag = "device_"; + +// static +const char* const DebugIO::kGraphTag = "graph_"; + DebugNodeKey::DebugNodeKey(const string& device_name, const string& node_name, const int32 output_slot, const string& debug_op) : device_name(device_name), @@ -126,7 +138,8 @@ DebugNodeKey::DebugNodeKey(const string& device_name, const string& node_name, output_slot(output_slot), debug_op(debug_op), debug_node_name( - strings::StrCat(node_name, ":", output_slot, ":", debug_op)) {} + strings::StrCat(node_name, ":", output_slot, ":", debug_op)), + device_path(DeviceNameToDevicePath(device_name)) {} Status ReadEventFromFile(const string& dump_file_path, Event* event) { Env* env(Env::Default()); @@ -158,6 +171,15 @@ Status ReadEventFromFile(const string& dump_file_path, Event* event) { } // static +const string DebugNodeKey::DeviceNameToDevicePath(const string& device_name) { + return strings::StrCat( + DebugIO::kMetadataFilePrefix, DebugIO::kDeviceTag, + str_util::StringReplace( + str_util::StringReplace(device_name, ":", "_", true), "/", ",", + true)); +} + +// static const char* const DebugIO::kFileURLScheme = "file://"; // static const char* const DebugIO::kGrpcURLScheme = "grpc://"; @@ -236,7 +258,8 @@ Status DebugIO::PublishDebugMetadata( const string core_metadata_path = AppendTimestampToFilePath( io::JoinPath( dump_root_dir, - strings::StrCat("_tfdbg_core_metadata_", "sessionrun", + strings::StrCat(DebugIO::kMetadataFilePrefix, + DebugIO::kCoreMetadataTag, "sessionrun", strings::Printf("%.14lld", session_run_index))), Env::Default()->NowMicros()); status.Update(DebugFileIO::DumpEventProtoToFile( @@ -325,10 +348,11 @@ Status DebugIO::PublishGraph(const Graph& graph, const string& device_name, Status status = Status::OK(); for (const string& debug_url : debug_urls) { if (debug_url.find(kFileURLScheme) == 0) { - const string dump_root_dir = debug_url.substr(strlen(kFileURLScheme)); - // TODO(cais): (b/38325442) Serialize the GraphDef to a directory that - // reflects the device name. 
- const string file_name = strings::StrCat("_tfdbg_graph_", now_micros); + const string dump_root_dir = + io::JoinPath(debug_url.substr(strlen(kFileURLScheme)), + DebugNodeKey::DeviceNameToDevicePath(device_name)); + const string file_name = strings::StrCat(DebugIO::kMetadataFilePrefix, + DebugIO::kGraphTag, now_micros); status.Update( DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name)); @@ -437,7 +461,7 @@ string DebugFileIO::GetDumpFilePath(const string& dump_root_dir, const DebugNodeKey& debug_node_key, const uint64 wall_time_us) { return AppendTimestampToFilePath( - io::JoinPath(dump_root_dir, + io::JoinPath(dump_root_dir, debug_node_key.device_path, strings::StrCat(debug_node_key.node_name, "_", debug_node_key.output_slot, "_", debug_node_key.debug_op)), diff --git a/tensorflow/core/debug/debug_io_utils.h b/tensorflow/core/debug/debug_io_utils.h index f3e76cc0ee..69d8c7bd4e 100644 --- a/tensorflow/core/debug/debug_io_utils.h +++ b/tensorflow/core/debug/debug_io_utils.h @@ -44,11 +44,14 @@ struct DebugNodeKey { DebugNodeKey(const string& device_name, const string& node_name, const int32 output_slot, const string& debug_op); + static const string DeviceNameToDevicePath(const string& device_name); + const string device_name; const string node_name; const int32 output_slot; const string debug_op; const string debug_node_name; + const string device_path; }; class DebugIO { @@ -136,6 +139,11 @@ class DebugIO { static Status CloseDebugURL(const string& debug_url); + static const char* const kMetadataFilePrefix; + static const char* const kCoreMetadataTag; + static const char* const kDeviceTag; + static const char* const kGraphTag; + static const char* const kFileURLScheme; static const char* const kGrpcURLScheme; }; diff --git a/tensorflow/core/debug/debug_io_utils_test.cc b/tensorflow/core/debug/debug_io_utils_test.cc index 406bcae07f..77039aa4ab 100644 --- a/tensorflow/core/debug/debug_io_utils_test.cc +++ b/tensorflow/core/debug/debug_io_utils_test.cc @@ -19,6 +19,7 @@ limitations under the License. #include "tensorflow/core/lib/core/notification.h" #include "tensorflow/core/lib/core/status_test_util.h" #include "tensorflow/core/lib/core/threadpool.h" +#include "tensorflow/core/lib/io/path.h" #include "tensorflow/core/lib/strings/str_util.h" #include "tensorflow/core/platform/env.h" #include "tensorflow/core/util/event.pb.h" @@ -47,6 +48,18 @@ class DebugIOUtilsTest : public ::testing::Test { std::unique_ptr<Tensor> tensor_b_; }; +TEST_F(DebugIOUtilsTest, ConstructDebugNodeKey) { + DebugNodeKey debug_node_key("/job:worker/replica:1/task:0/gpu:2", + "hidden_1/MatMul", 0, "DebugIdentity"); + EXPECT_EQ("/job:worker/replica:1/task:0/gpu:2", debug_node_key.device_name); + EXPECT_EQ("hidden_1/MatMul", debug_node_key.node_name); + EXPECT_EQ(0, debug_node_key.output_slot); + EXPECT_EQ("DebugIdentity", debug_node_key.debug_op); + EXPECT_EQ("hidden_1/MatMul:0:DebugIdentity", debug_node_key.debug_node_name); + EXPECT_EQ("_tfdbg_device_,job_worker,replica_1,task_0,gpu_2", + debug_node_key.device_path); +} + TEST_F(DebugIOUtilsTest, DumpFloatTensorToFileSunnyDay) { Initialize(); @@ -138,10 +151,14 @@ TEST_F(DebugIOUtilsTest, DumpTensorToFileCannotCreateDirectory) { // First, create the file at the path. 
const string test_dir = testing::TmpDir(); - const string txt_file_name = strings::StrCat(test_dir, "/baz"); - - if (!env_->FileExists(test_dir).ok()) { - ASSERT_TRUE(env_->CreateDir(test_dir).ok()); + const string kDeviceName = "/job:localhost/replica:0/task:0/cpu:0"; + const DebugNodeKey kDebugNodeKey(kDeviceName, "baz/tensor_a", 0, + "DebugIdentity"); + const string txt_file_dir = + io::JoinPath(test_dir, DebugNodeKey::DeviceNameToDevicePath(kDeviceName)); + const string txt_file_name = io::JoinPath(txt_file_dir, "baz"); + if (!env_->FileExists(txt_file_dir).ok()) { + ASSERT_TRUE(env_->RecursivelyCreateDir(txt_file_dir).ok()); } ASSERT_EQ(error::Code::NOT_FOUND, env_->FileExists(txt_file_name).code()); @@ -157,8 +174,7 @@ TEST_F(DebugIOUtilsTest, DumpTensorToFileCannotCreateDirectory) { // Second, try to dump the tensor to a path that requires "baz" to be a // directory, which should lead to an error. - const DebugNodeKey kDebugNodeKey("/job:localhost/replica:0/task:0/cpu:0", - "baz/tensor_a", 0, "DebugIdentity"); + const uint64 wall_time = env_->NowMicros(); string dump_file_name; diff --git a/tensorflow/core/debug/grpc_session_debug_test.cc b/tensorflow/core/debug/grpc_session_debug_test.cc index 6c68729410..9584d8b9f3 100644 --- a/tensorflow/core/debug/grpc_session_debug_test.cc +++ b/tensorflow/core/debug/grpc_session_debug_test.cc @@ -187,7 +187,10 @@ TEST_F(GrpcSessionDebugTest, FileDebugURL) { IsSingleFloatValue(outputs[0], 4.0); std::vector<Tensor> dumped_tensors; - LoadTensorDumps("n", &dumped_tensors); + LoadTensorDumps(io::JoinPath(DebugNodeKey::DeviceNameToDevicePath( + cluster->devices()[0].name()), + "n"), + &dumped_tensors); if (i == 0 || i == 5) { ASSERT_EQ(0, dumped_tensors.size()); @@ -267,7 +270,10 @@ TEST_F(GrpcSessionDebugTest, MultiDevices_String) { TF_CHECK_OK(session->Close()); std::vector<Tensor> dumped_tensors; - LoadTensorDumps("n", &dumped_tensors); + LoadTensorDumps( + io::JoinPath(DebugNodeKey::DeviceNameToDevicePath(a_dev.name()), + "n"), + &dumped_tensors); ASSERT_EQ(1, dumped_tensors.size()); ASSERT_EQ(TensorShape({2, 2}), dumped_tensors[0].shape()); for (size_t i = 0; i < 4; ++i) { diff --git a/tensorflow/core/kernels/debug_ops_test.cc b/tensorflow/core/kernels/debug_ops_test.cc index 037272e009..1830b2a178 100644 --- a/tensorflow/core/kernels/debug_ops_test.cc +++ b/tensorflow/core/kernels/debug_ops_test.cc @@ -28,6 +28,7 @@ limitations under the License. 
#include "tensorflow/core/framework/types.pb.h" #include "tensorflow/core/kernels/ops_testutil.h" #include "tensorflow/core/kernels/ops_util.h" +#include "tensorflow/core/lib/io/path.h" #include "tensorflow/core/lib/strings/strcat.h" #include "tensorflow/core/platform/env.h" #include "tensorflow/core/platform/test.h" @@ -94,7 +95,22 @@ TEST_F(DebugIdentityOpTest, Int32Success_6_FileURLs) { ASSERT_TRUE(env_->FileExists(dump_roots[i]).ok()); ASSERT_TRUE(env_->IsDirectory(dump_roots[i]).ok()); - DIR* dir = opendir(dump_roots[i].c_str()); + std::vector<string> device_roots; + DIR* dir0 = opendir(dump_roots[i].c_str()); + struct dirent* ent0; + const string kDeviceDirPrefix = + strings::StrCat(DebugIO::kMetadataFilePrefix, DebugIO::kDeviceTag); + while ((ent0 = readdir(dir0)) != nullptr) { + if (!strncmp(ent0->d_name, kDeviceDirPrefix.c_str(), + kDeviceDirPrefix.size())) { + device_roots.push_back(io::JoinPath(dump_roots[i], ent0->d_name)); + } + } + ASSERT_EQ(1, device_roots.size()); + closedir(dir0); + + const string& device_root = device_roots[0]; + DIR* dir = opendir(device_root.c_str()); struct dirent* ent; int dump_files_found = 0; while ((ent = readdir(dir)) != nullptr) { @@ -102,8 +118,7 @@ TEST_F(DebugIdentityOpTest, Int32Success_6_FileURLs) { dump_files_found++; // Try reading the file into a Event proto. - const string dump_file_path = - strings::StrCat(dump_roots[i], "/", ent->d_name); + const string dump_file_path = io::JoinPath(device_root, ent->d_name); std::fstream ifs(dump_file_path, std::ios::in | std::ios::binary); Event event; event.ParseFromIstream(&ifs); diff --git a/tensorflow/python/debug/BUILD b/tensorflow/python/debug/BUILD index dfb04a9744..07d0a9ec73 100644 --- a/tensorflow/python/debug/BUILD +++ b/tensorflow/python/debug/BUILD @@ -545,6 +545,22 @@ cuda_py_test( tags = ["notsan"], ) +cuda_py_test( + name = "session_debug_multi_gpu_test", + size = "small", + srcs = ["lib/session_debug_multi_gpu_test.py"], + additional_deps = [ + ":debug_data", + ":debug_utils", + "//tensorflow/python:client", + "//tensorflow/python:framework_for_generated_wrappers", + "//tensorflow/python:framework_test_lib", + "//tensorflow/python:math_ops", + "//tensorflow/python:platform_test", + "//tensorflow/python:variables", + ], +) + py_test( name = "debugger_cli_common_test", size = "small", diff --git a/tensorflow/python/debug/lib/debug_data.py b/tensorflow/python/debug/lib/debug_data.py index 35e9fef29f..748e5d78ba 100644 --- a/tensorflow/python/debug/lib/debug_data.py +++ b/tensorflow/python/debug/lib/debug_data.py @@ -19,10 +19,12 @@ from __future__ import division from __future__ import print_function import collections +import glob import json import os import numpy as np +import six from six.moves import xrange # pylint: disable=redefined-builtin from tensorflow.core.framework import graph_pb2 @@ -32,9 +34,12 @@ from tensorflow.python.framework import tensor_util from tensorflow.python.platform import gfile +# TODO(cais): Tie these string constants in with C++? 
METADATA_FILE_PREFIX = "_tfdbg_" CORE_METADATA_TAG = "core_metadata_" GRAPH_FILE_TAG = "graph_" +DEVICE_TAG = "device_" + FETCHES_INFO_FILE_TAG = "fetches_info_" FEED_KEYS_INFO_FILE_TAG = "feed_keys_info_" @@ -158,10 +163,6 @@ def parse_node_or_tensor_name(name): return name, None -def _is_core_metadata_file(file_name): - return file_name.startswith(METADATA_FILE_PREFIX + CORE_METADATA_TAG) - - def _is_graph_file(file_name): return file_name.startswith(METADATA_FILE_PREFIX + GRAPH_FILE_TAG) @@ -344,6 +345,28 @@ def extract_core_metadata_from_event_proto(event): json_metadata["target_nodes"]) +def device_name_to_device_path(device_name): + """Convert device name to device path.""" + device_name_items = device_name.split("/") + device_name_items = [item.replace(":", "_") for item in device_name_items] + return METADATA_FILE_PREFIX + DEVICE_TAG + ",".join(device_name_items) + + +def device_path_to_device_name(device_dir): + """Parse device name from device path. + + Args: + device_dir: (str) a directory name for the device. + + Returns: + (str) parsed device name. + """ + path_items = os.path.basename(device_dir)[ + len(METADATA_FILE_PREFIX) + len(DEVICE_TAG):].split(",") + return "/".join([ + path_item.replace("_", ":", 1) for path_item in path_items]) + + class DebugTensorDatum(object): """A single tensor dumped by TensorFlow Debugger (tfdbg). @@ -360,13 +383,17 @@ class DebugTensorDatum(object): """`DebugTensorDatum` constructor. Args: - dump_root: (`str`) Debug dump root directory. + dump_root: (`str`) Debug dump root directory. This path should not include + the path component that represents the device name (see also below). debug_dump_rel_path: (`str`) Path to a debug dump file, relative to the - `dump_root`. For example, suppose the debug dump root - directory is `/tmp/tfdbg_1` and the dump file is at - `/tmp/tfdbg_1/ns_1/node_a_0_DebugIdentity_123456789`, then - the value of the debug_dump_rel_path should be - `ns_1/node_a_0_DebugIdenity_1234456789`. + `dump_root`. The first item of this relative path is assumed to be + a path representing the name of the device that the Tensor belongs to. + See `device_path_to_device_name` for more details on the device path. + For example, suppose the debug dump root + directory is `/tmp/tfdbg_1` and the dump file is at + `/tmp/tfdbg_1/<device_path>/>ns_1/node_a_0_DebugIdentity_123456789`, + then the value of the debug_dump_rel_path should be + `<device_path>/ns_1/node_a_0_DebugIdenity_1234456789`. Raises: ValueError: If the base file name of the dump file does not conform to @@ -374,15 +401,13 @@ class DebugTensorDatum(object): `node_name`_`output_slot`_`debug_op`_`timestamp` """ - base = os.path.basename(debug_dump_rel_path) - + path_components = os.path.normpath(debug_dump_rel_path).split(os.sep) + self._device_name = device_path_to_device_name(path_components[0]) + base = path_components[-1] if base.count("_") < 3: raise ValueError( "Dump file path does not conform to the naming pattern: %s" % base) - # TODO(cais): Add hostname and pid to support dumps from distributed - # sessions. - self._extended_timestamp = base.split("_")[-1] # It may include an index suffix at the end if file path collision happened # due to identical timestamps. 
@@ -395,31 +420,23 @@ class DebugTensorDatum(object): self._debug_op = base.split("_")[-2] self._output_slot = int(base.split("_")[-3]) - namespace = os.path.dirname(debug_dump_rel_path).replace("\\", "/") node_base_name = "_".join(base.split("_")[:-3]) - if not namespace or namespace == ".": - self._node_name = node_base_name - else: - self._node_name = namespace + "/" + node_base_name + self._node_name = "/".join(path_components[1:-1] + [node_base_name]) self._file_path = os.path.join(dump_root, debug_dump_rel_path) self._dump_size_bytes = (gfile.Stat(self._file_path).length if gfile.Exists(self._file_path) else None) - self._run_fetches_info = None - self._run_feed_keys_info = None - def __str__(self): - return "{DebugTensorDatum: %s:%d @ %s @ %d}" % (self.node_name, - self.output_slot, - self.debug_op, - self.timestamp) + return "{DebugTensorDatum (%s) %s:%d @ %s @ %d}" % (self.device_name, + self.node_name, + self.output_slot, + self.debug_op, + self.timestamp) def __repr__(self): return self.__str__() - # TODO(cais): (b/38325442) Add device name information to this class. - def get_tensor(self): """Get tensor from the dump (`Event`) file. @@ -465,6 +482,16 @@ class DebugTensorDatum(object): return self._debug_op @property + def device_name(self): + """Name of the device that the tensor belongs to. + + Returns: + (`str`) device name. + """ + + return self._device_name + + @property def node_name(self): """Name of the node from which the tensor value was dumped. @@ -529,6 +556,8 @@ class WatchKeyDoesNotExistInDebugDumpDirError(ValueError): pass +# TODO(cais): This class is getting too large in line count. Refactor to make it +# smaller and easier to maintain. class DebugDumpDir(object): """Data set from a debug-dump directory on filesystem. @@ -548,23 +577,54 @@ class DebugDumpDir(object): Raises: IOError: If dump_root does not exist as a directory. + ValueError: If more than one core metadata file is found under the dump + root directory. """ if not gfile.IsDirectory(dump_root): raise IOError("Dump root directory %s does not exist" % dump_root) - self._core_metadata = None - self._load_dumps(dump_root) - self._create_tensor_watch_maps() - self._load_partition_graphs(partition_graphs, validate) + self._core_metadata = [] + + # Find the list of devices. + self._dump_root = dump_root + + self._load_core_metadata() + self._load_fetches_info() + self._load_feeds_info() + self._load_all_device_dumps(partition_graphs, validate) self._python_graph = None - def _load_dumps(self, dump_root): - """Load `DebugTensorDatum` instances from the dump root. + def _load_all_device_dumps(self, partition_graphs, validate): + """Load the dump data for all devices.""" + device_dirs = glob.glob(os.path.join( + self._dump_root, METADATA_FILE_PREFIX + DEVICE_TAG + "*")) + + self._device_names = [] + self._t0s = {} + self._dump_tensor_data = {} + self._dump_graph_file_paths = {} + self._debug_watches = {} + self._watch_key_to_devices = {} + self._watch_key_to_datum = {} + self._watch_key_to_rel_time = {} + self._watch_key_to_dump_size_bytes = {} + for device_dir in device_dirs: + device_name = device_path_to_device_name(device_dir) + self._device_names.append(device_name) + self._load_device_dumps(device_name, device_dir) + self._load_partition_graphs(partition_graphs, validate) + self._calculate_t0() + + for device_name in self._device_names: + self._create_tensor_watch_maps(device_name) - Populates a list of `DebugTensorDatum` instance and sorts the list by - ascending timestamp. 
+ def _load_device_dumps(self, device_name, device_root): + """Load `DebugTensorDatum` instances from the dump root of a given device. + + Populates a map {device_name: a list of `DebugTensorDatum`}, where the list + is sorted by ascending timestamp. This sorting order reflects the order in which the TensorFlow executor processed the nodes of the graph. It is (one of many possible) topological @@ -584,55 +644,67 @@ class DebugDumpDir(object): graphs may not be available, e.g., when the run errors out. Args: - dump_root: (`str`) Dump root directory. - """ + device_name: (`str`) name of the device. + device_root: (`str`) dump root directory of the given device. - self._dump_root = dump_root - self._dump_tensor_data = [] - self._dump_graph_file_paths = [] + Raises: + ValueError: If GraphDef for the device is not available. + """ - self._debug_watches = collections.defaultdict( + self._dump_tensor_data[device_name] = [] + self._debug_watches[device_name] = collections.defaultdict( lambda: collections.defaultdict(set)) - for root, _, files in gfile.Walk(self._dump_root): + for root, _, files in gfile.Walk(device_root): for f in files: - if f.startswith(METADATA_FILE_PREFIX): - if _is_core_metadata_file(f): - self._load_core_metadata(os.path.join(self._dump_root, root, f)) - - if _is_graph_file(f): - self._dump_graph_file_paths.append( - os.path.join(self._dump_root, root, f)) - - if _is_run_fetches_info_file(f): - self._run_fetches_info = _load_log_message_from_event_file( - os.path.join(root, f)) - - if _is_run_feed_keys_info_file(f): - self._run_feed_keys_info = _load_log_message_from_event_file( - os.path.join(root, f)) - - continue - - datum = self._dump_file_name_to_datum(root, f) - self._dump_tensor_data.append(datum) - - self._debug_watches[datum.node_name][datum.output_slot].add( - datum.debug_op) - - self._dump_tensor_data = sorted( - self._dump_tensor_data, key=lambda x: x.extended_timestamp) - - if self._dump_tensor_data: - self._t0 = self._dump_tensor_data[0].timestamp + if _is_graph_file(f): + self._dump_graph_file_paths[device_name] = os.path.join( + device_root, root, f) + else: + datum = self._dump_file_name_to_datum(root, f) + self._dump_tensor_data[device_name].append(datum) + self._debug_watches[device_name][datum.node_name][ + datum.output_slot].add(datum.debug_op) + + self._dump_tensor_data[device_name] = sorted( + self._dump_tensor_data[device_name], + key=lambda x: x.extended_timestamp) + + if self._dump_tensor_data[device_name]: + self._t0s[device_name] = self._dump_tensor_data[device_name][0].timestamp else: - self._t0 = None - - def _load_core_metadata(self, event_file_path): - event = event_pb2.Event() - with gfile.Open(event_file_path, "rb") as f: - event.ParseFromString(f.read()) - self._core_metadata = extract_core_metadata_from_event_proto(event) + self._t0s[device_name] = None + + def _calculate_t0(self): + """Calculate the first timestamp across all devices.""" + t0s = [t0 for t0 in six.itervalues(self._t0s) if t0 is not None] + self._t0 = min(t0s) if t0s else None + + def _load_core_metadata(self): + core_metadata_files = glob.glob(os.path.join( + self._dump_root, METADATA_FILE_PREFIX + CORE_METADATA_TAG + "*")) + for core_metadata_file in core_metadata_files: + with gfile.Open(core_metadata_file, "rb") as f: + event = event_pb2.Event() + event.ParseFromString(f.read()) + self._core_metadata.append( + extract_core_metadata_from_event_proto(event)) + + def _load_fetches_info(self): + fetches_info_files = glob.glob(os.path.join( + self._dump_root, 
METADATA_FILE_PREFIX + FETCHES_INFO_FILE_TAG + "*")) + self._run_fetches_info = [] + for fetches_info_file in fetches_info_files: + self._run_fetches_info.append( + _load_log_message_from_event_file(fetches_info_file)) + + def _load_feeds_info(self): + feeds_info_files = glob.glob(os.path.join( + self._dump_root, METADATA_FILE_PREFIX + FEED_KEYS_INFO_FILE_TAG + "*")) + self._run_feed_keys_info = [] + for feeds_info_file in feeds_info_files: + self._run_feed_keys_info.append( + _load_log_message_from_event_file(feeds_info_file)) def _dump_file_name_to_datum(self, dir_name, file_name): """Obtain a DebugTensorDatum from the directory and file name. @@ -648,34 +720,39 @@ class DebugDumpDir(object): # Calculate the relative path of the dump file with respect to the root. debug_dump_rel_path = os.path.join( os.path.relpath(dir_name, self._dump_root), file_name) - return DebugTensorDatum(self._dump_root, debug_dump_rel_path) - def _create_tensor_watch_maps(self): + def _create_tensor_watch_maps(self, device_name): """Create maps from tensor watch keys to datum and to timestamps. Create a map from watch key (tensor name + debug op) to `DebugTensorDatum` item. Also make a map from watch key to relative timestamp. "relative" means (absolute timestamp - t0). + + Args: + device_name: (str) name of the device. """ - self._watch_key_to_datum = {} - self._watch_key_to_rel_time = {} - self._watch_key_to_dump_size_bytes = {} - for datum in self._dump_tensor_data: - if datum.watch_key not in self._watch_key_to_datum: - self._watch_key_to_datum[datum.watch_key] = [datum] - self._watch_key_to_rel_time[datum.watch_key] = [ - datum.timestamp - self._t0 - ] - self._watch_key_to_dump_size_bytes[datum.watch_key] = [ - datum.dump_size_bytes - ] + self._watch_key_to_datum[device_name] = {} + self._watch_key_to_rel_time[device_name] = {} + self._watch_key_to_dump_size_bytes[device_name] = {} + for datum in self._dump_tensor_data[device_name]: + if datum.watch_key not in self._watch_key_to_devices: + self._watch_key_to_devices[datum.watch_key] = {device_name} + else: + self._watch_key_to_devices[datum.watch_key].add(device_name) + + if datum.watch_key not in self._watch_key_to_datum[device_name]: + self._watch_key_to_datum[device_name][datum.watch_key] = [datum] + self._watch_key_to_rel_time[device_name][datum.watch_key] = [ + datum.timestamp - self._t0] + self._watch_key_to_dump_size_bytes[device_name][datum.watch_key] = [ + datum.dump_size_bytes] else: - self._watch_key_to_datum[datum.watch_key].append(datum) - self._watch_key_to_rel_time[datum.watch_key].append(datum.timestamp - - self._t0) - self._watch_key_to_dump_size_bytes[datum.watch_key].append( + self._watch_key_to_datum[device_name][datum.watch_key].append(datum) + self._watch_key_to_rel_time[device_name][datum.watch_key].append( + datum.timestamp - self._t0) + self._watch_key_to_dump_size_bytes[device_name][datum.watch_key].append( datum.dump_size_bytes) def set_python_graph(self, python_graph): @@ -733,22 +810,32 @@ class DebugDumpDir(object): `output_names`: Names of the output (fetched) Tensors. `target_nodes`: Names of the target nodes. If the core metadata have not been loaded, `None`. + If more than one core metadata files exist, return a list of the + `nametuple` described above. 
""" - return self._core_metadata + output = self._core_metadata + return output[0] if len(output) == 1 else output @property def dumped_tensor_data(self): - return self._dump_tensor_data + """Retrieve dumped tensor data.""" + if len(self.devices()) == 1: + return self._dump_tensor_data[self.devices()[0]] + else: + all_devices_data = six.itervalues(self._dump_tensor_data) + data = [] + for device_data in all_devices_data: + data.extend(device_data) + return sorted(data, key=lambda x: x.extended_timestamp) @property def t0(self): - """Absolute timestamp of the first dumped tensor. + """Absolute timestamp of the first dumped tensor across all devices. Returns: (`int`) absolute timestamp of the first dumped tensor, in microseconds. """ - return self._t0 @property @@ -756,10 +843,10 @@ class DebugDumpDir(object): """Total number of dumped tensors in the dump root directory. Returns: - (`int`) total number of dumped tensors in the dump root directory. + (`int`) The total number of dumped tensors in the dump root directory. """ - - return len(self._dump_tensor_data) + return sum(len(self._dump_tensor_data[device_name]) + for device_name in self._dump_tensor_data) def _load_partition_graphs(self, partition_graphs, validate): """Load and process partition graphs. @@ -770,56 +857,73 @@ class DebugDumpDir(object): tensor dumps. Args: - partition_graphs: Partition graphs executed by the TensorFlow runtime, - represented as repeated fields of GraphDef. - If no partition_graph is available, use None. + partition_graphs: A repeated field of GraphDefs representing the + partition graphs executed by the TensorFlow runtime. validate: (`bool`) Whether the dump files are to be validated against the partition graphs. - """ - if partition_graphs: - self._partition_graphs = partition_graphs - elif self._dump_graph_file_paths: - # In case partition graphs are not available from arguments, load them - # from the dump directory. - self._partition_graphs = [ - _load_graph_def_from_event_file(dump_file_path) - for dump_file_path in self._dump_graph_file_paths - ] - else: - self._partition_graphs = None - return + Raises: + ValueError: If the partition GraphDef of one or more devices fail to be + loaded. 
+ """ self._node_attributes = {} - self._node_inputs = {} self._node_ctrl_inputs = {} - self._node_recipients = {} self._node_ctrl_recipients = {} - - self._devices = [] self._node_devices = {} self._node_op_types = {} + self._copy_send_nodes = {} + + self._partition_graphs = {} + for device_name in self._device_names: + partition_graph = None + if device_name in self._dump_graph_file_paths: + partition_graph = _load_graph_def_from_event_file( + self._dump_graph_file_paths[device_name]) + else: + partition_graph = self._find_partition_graph(partition_graphs, + device_name) + + if partition_graph: + self._partition_graphs[device_name] = partition_graph - self._copy_send_nodes = [] + self._node_attributes[device_name] = {} + self._node_inputs[device_name] = {} + self._node_ctrl_inputs[device_name] = {} + self._node_recipients[device_name] = {} + self._node_ctrl_recipients[device_name] = {} + self._node_op_types[device_name] = {} + self._copy_send_nodes[device_name] = [] - for pg in self._partition_graphs: - for node in pg.node: - self._process_partition_graph_node(node) + if partition_graph: + for node in partition_graph.node: + self._process_partition_graph_node(device_name, node) - self._prune_non_control_edges_of_debug_ops() - self._prune_control_edges_of_debug_ops() + self._prune_non_control_edges_of_debug_ops(device_name) + self._prune_control_edges_of_debug_ops(device_name) - self._populate_recipient_maps() + self._populate_recipient_maps(device_name) - if validate: - self._validate_dump_with_graphs() + if device_name in self._partition_graphs and validate: + self._validate_dump_with_graphs(device_name) + + def _find_partition_graph(self, partition_graphs, device_name): + if partition_graphs is None: + return None + else: + for graph_def in partition_graphs: + for node_def in graph_def.node: + if node_def.device == device_name: + return graph_def + return None - def _process_partition_graph_node(self, node): + def _process_partition_graph_node(self, device_name, node): """Process a node from the partition graphs. Args: + device_name: (str) device name. node: (NodeDef) A partition-graph node to be processed. 
Raises: @@ -833,84 +937,91 @@ class DebugDumpDir(object): (watched_node_name, watched_output_slot, _, debug_op) = parse_debug_node_name(node.name) - self._debug_watches[watched_node_name][watched_output_slot].add( - debug_op) + self._debug_watches[device_name][watched_node_name][ + watched_output_slot].add(debug_op) return - if node.name in self._node_inputs: - raise ValueError("Duplicate node name: '%s'" % node.name) + if node.name in self._node_inputs[device_name]: + raise ValueError("Duplicate node name on device %s: '%s'" % + (device_name, node.name)) - self._node_attributes[node.name] = node.attr + self._node_attributes[device_name][node.name] = node.attr - if node.device not in self._devices and node.device: - self._devices.append(node.device) + self._node_inputs[device_name][node.name] = [] + self._node_ctrl_inputs[device_name][node.name] = [] + self._node_recipients[device_name][node.name] = [] + self._node_ctrl_recipients[device_name][node.name] = [] - self._node_inputs[node.name] = [] - self._node_ctrl_inputs[node.name] = [] - self._node_recipients[node.name] = [] - self._node_ctrl_recipients[node.name] = [] - - self._node_devices[node.name] = node.device - self._node_op_types[node.name] = node.op + if node.name not in self._node_devices: + self._node_devices[node.name] = set() + self._node_devices[node.name].add(node.device) + self._node_op_types[device_name][node.name] = node.op for inp in node.input: if is_copy_node(inp) and (node.op == "_Send" or node.op == "_Retval"): - self._copy_send_nodes.append(node.name) + self._copy_send_nodes[device_name].append(node.name) if inp.startswith("^"): cinp = inp[1:] - self._node_ctrl_inputs[node.name].append(cinp) + self._node_ctrl_inputs[device_name][node.name].append(cinp) else: - self._node_inputs[node.name].append(inp) + self._node_inputs[device_name][node.name].append(inp) - def _prune_nodes_from_input_and_recipient_maps(self, nodes_to_prune): + def _prune_nodes_from_input_and_recipient_maps(self, + device_name, + nodes_to_prune): """Prune nodes out of input and recipient maps. Args: + device_name: (`str`) device name. nodes_to_prune: (`list` of `str`) Names of the nodes to be pruned. """ for node in nodes_to_prune: - del self._node_inputs[node] - del self._node_ctrl_inputs[node] - del self._node_recipients[node] - del self._node_ctrl_recipients[node] + del self._node_inputs[device_name][node] + del self._node_ctrl_inputs[device_name][node] + del self._node_recipients[device_name][node] + del self._node_ctrl_recipients[device_name][node] - def _prune_non_control_edges_of_debug_ops(self): + def _prune_non_control_edges_of_debug_ops(self, device_name): """Prune (non-control) edges related to debug ops. Prune the Copy ops and associated _Send ops inserted by the debugger out from the non-control inputs and output recipients map. Replace the inputs and recipients with original ones. + + Args: + device_name: (`str`) device name. """ copy_nodes = [] - for node in self._node_inputs: - if node in self._copy_send_nodes: + for node in self._node_inputs[device_name]: + if node in self._copy_send_nodes[device_name]: continue if is_copy_node(node): copy_nodes.append(node) - inputs = self._node_inputs[node] + inputs = self._node_inputs[device_name][node] for i in xrange(len(inputs)): inp = inputs[i] if is_copy_node(inp): # Find the input to the Copy node, which should be the original # input to the node. 
- orig_inp = self._node_inputs[inp][0] + orig_inp = self._node_inputs[device_name][inp][0] inputs[i] = orig_inp - self._prune_nodes_from_input_and_recipient_maps(copy_nodes) - self._prune_nodes_from_input_and_recipient_maps(self._copy_send_nodes) + self._prune_nodes_from_input_and_recipient_maps(device_name, copy_nodes) + self._prune_nodes_from_input_and_recipient_maps( + device_name, self._copy_send_nodes[device_name]) - def _prune_control_edges_of_debug_ops(self): + def _prune_control_edges_of_debug_ops(self, device_name): """Prune control edges related to the debug ops.""" - for node in self._node_ctrl_inputs: - ctrl_inputs = self._node_ctrl_inputs[node] + for node in self._node_ctrl_inputs[device_name]: + ctrl_inputs = self._node_ctrl_inputs[device_name][node] debug_op_inputs = [] for ctrl_inp in ctrl_inputs: if is_debug_node(ctrl_inp): @@ -918,33 +1029,36 @@ class DebugDumpDir(object): for debug_op_inp in debug_op_inputs: ctrl_inputs.remove(debug_op_inp) - def _populate_recipient_maps(self): + def _populate_recipient_maps(self, device_name): """Populate the map from node name to recipient(s) of its output(s).""" - for node in self._node_inputs: - inputs = self._node_inputs[node] + for node in self._node_inputs[device_name]: + inputs = self._node_inputs[device_name][node] for inp in inputs: inp = get_node_name(inp) - if inp not in self._node_recipients: - self._node_recipients[inp] = [] - self._node_recipients[inp].append(node) + if inp not in self._node_recipients[device_name]: + self._node_recipients[device_name][inp] = [] + self._node_recipients[device_name][inp].append(node) - for node in self._node_ctrl_inputs: - ctrl_inputs = self._node_ctrl_inputs[node] + for node in self._node_ctrl_inputs[device_name]: + ctrl_inputs = self._node_ctrl_inputs[device_name][node] for ctrl_inp in ctrl_inputs: - if ctrl_inp in self._copy_send_nodes: + if ctrl_inp in self._copy_send_nodes[device_name]: continue - if ctrl_inp not in self._node_ctrl_recipients: - self._node_ctrl_recipients[ctrl_inp] = [] - self._node_ctrl_recipients[ctrl_inp].append(node) + if ctrl_inp not in self._node_ctrl_recipients[device_name]: + self._node_ctrl_recipients[device_name][ctrl_inp] = [] + self._node_ctrl_recipients[device_name][ctrl_inp].append(node) - def _validate_dump_with_graphs(self): + def _validate_dump_with_graphs(self, device_name): """Validate the dumped tensor data against the partition graphs. Only the watched nodes are validated by this method, because tfdbg allows clients to watch only a subset of the nodes. + Args: + device_name: (`str`) device name. + Raises: LookupError: If the partition graphs have not been loaded yet. ValueError: If dumps contain node names not found in partition graph. @@ -952,33 +1066,35 @@ class DebugDumpDir(object): input relations on the partition graphs. """ - if not self._partition_graphs: - raise LookupError("No partition graphs loaded.") + if not self._partition_graphs[device_name]: + raise LookupError( + "No partition graphs loaded for device %s" % device_name) # Verify that the node names in the dump data are all present in the # partition graphs. - for datum in self._dump_tensor_data: - if datum.node_name not in self._node_inputs: - raise ValueError("Node name '%s' is not found in partition graphs." % - datum.node_name) + for datum in self._dump_tensor_data[device_name]: + if datum.node_name not in self._node_inputs[device_name]: + raise ValueError("Node name '%s' is not found in partition graphs of " + "device %s." 
% (datum.node_name, device_name)) pending_inputs = {} - for node in self._node_inputs: + for node in self._node_inputs[device_name]: pending_inputs[node] = [] - inputs = self._node_inputs[node] + inputs = self._node_inputs[device_name][node] for inp in inputs: inp_node = get_node_name(inp) inp_output_slot = get_output_slot(inp) # Inputs from Enter and NextIteration nodes are not validated because # DebugNodeInserter::InsertNodes() in the debugger core skips creating # control edges from debug ops watching these types of nodes. - if (inp_node in self._debug_watches and - inp_output_slot in self._debug_watches[inp_node] and - self._node_op_types.get(inp) not in ("Enter", "NextIteration") and + if (inp_node in self._debug_watches[device_name] and + inp_output_slot in self._debug_watches[device_name][inp_node] and + self._node_op_types[device_name].get(inp) not in ( + "Enter", "NextIteration") and (inp_node, inp_output_slot) not in pending_inputs[node]): pending_inputs[node].append((inp_node, inp_output_slot)) - for i, datum in enumerate(self._dump_tensor_data): + for i, datum in enumerate(self._dump_tensor_data[device_name]): node = datum.node_name slot = datum.output_slot # In some cases (e.g., system clocks with insufficient precision), @@ -986,13 +1102,13 @@ class DebugDumpDir(object): # following check examines this possibility and avoids raising an error if # that is the case. if not self._satisfied_at_timestamp( - pending_inputs[node], datum.timestamp, start_i=i + 1): + device_name, pending_inputs[node], datum.timestamp, start_i=i + 1): raise ValueError("Causality violated in timing relations of debug " "dumps: %s (%d): " "these input(s) are not satisfied: %s" % (node, datum.timestamp, repr(pending_inputs[node]))) - recipients = self._node_recipients[node] + recipients = self._node_recipients[device_name][node] for recipient in recipients: recipient_pending_inputs = pending_inputs[recipient] if (node, slot) in recipient_pending_inputs: @@ -1004,12 +1120,13 @@ class DebugDumpDir(object): del recipient_pending_inputs[ recipient_pending_inputs.index((node, slot))] - def _satisfied_at_timestamp(self, pending, timestamp, start_i=0): + def _satisfied_at_timestamp(self, device_name, pending, timestamp, start_i=0): """Determine whether pending inputs are satisfied at given timestamp. Note: This method mutates the input argument "pending". Args: + device_name: (str) device name. pending: A list of 2-tuple (node_name, output_slot): the dependencies to check. timestamp: (int) the timestamp in question. @@ -1023,7 +1140,7 @@ class DebugDumpDir(object): if not pending: return True - for datum in self._dump_tensor_data[start_i:]: + for datum in self._dump_tensor_data[device_name][start_i:]: if datum.timestamp > timestamp: break if (datum.timestamp == timestamp and @@ -1042,7 +1159,7 @@ class DebugDumpDir(object): """Get the partition graphs. Returns: - Partition graphs as repeated fields of GraphDef. + Partition graphs as a list of GraphDef. Raises: LookupError: If no partition graphs have been loaded. @@ -1051,50 +1168,90 @@ class DebugDumpDir(object): if self._partition_graphs is None: raise LookupError("No partition graphs have been loaded.") - return self._partition_graphs + return self._partition_graphs.values() @property def run_fetches_info(self): """Get a str representation of the fetches used in the Session.run() call. Returns: - If the information is available, a `str` obtained from `repr(fetches)`. 
+ If the information is available from one `Session.run` call, a `str` + obtained from `repr(fetches)`. + If the information is available from multiple `Session.run` calls, a + `list` of `str` from `repr(fetches)`. If the information is not available, `None`. """ - return self._run_fetches_info + output = self._run_fetches_info + return output[0] if len(output) == 1 else output @property def run_feed_keys_info(self): """Get a str representation of the feed_dict used in the Session.run() call. Returns: - If the information is available, a `str` obtained from `repr(feed_dict)`. + If the information is available from one `Session.run` call, a `str` + obtained from `repr(feed_dict)`. + If the information is available from multiple `Session.run` calls, a + `list` of `str` obtained from `repr(feed_dict)`. If the information is not available, `None`. """ - return self._run_feed_keys_info + output = self._run_feed_keys_info + return output[0] if len(output) == 1 else output + + def _infer_device_name(self, device_name, node_name): + if device_name is None: + if len(self.devices()) == 1: + return self.devices()[0] + else: + if node_name in self._node_devices: + if len(self._node_devices[node_name]) == 1: + return list(self._node_devices[node_name])[0] + else: + raise ValueError( + "There are multiple (%d) devices with nodes named '%s' but " + "device_name is not specified." % + (len(self._node_devices[node_name]), node_name)) + else: + raise ValueError("None of the %d devices has a node named '%s'." % + (len(self._device_names), node_name)) + else: + return device_name - def nodes(self): + def nodes(self, device_name=None): """Get a list of all nodes from the partition graphs. + Args: + device_name: (`str`) name of device. If there is only one device, this + argumnet is optional. + Returns: All nodes' names, as a list of str. Raises: LookupError: If no partition graphs have been loaded. + ValueError: If there are multiple devices, but device_name is not + specified. """ - if self._partition_graphs is None: raise LookupError("No partition graphs have been loaded.") + if device_name is None: + if len(self.devices()) == 1: + device_name = self.devices()[0] + else: + raise ValueError( + "There are multiple (%d) devices, but " + "device_name is not specified." % len(self.devices())) + return [node_name for node_name in self._node_inputs[device_name]] - return [node_name for node_name in self._node_inputs] - - def node_attributes(self, node_name): + def node_attributes(self, node_name, device_name=None): """Get the attributes of a node. Args: node_name: Name of the node in question. + device_name: (`str`) name of the device. If there is only one device or if + node_name exists on only one device, this argumnet is optional. Returns: Attributes of the node. @@ -1103,22 +1260,25 @@ class DebugDumpDir(object): LookupError: If no partition graphs have been loaded. ValueError: If no node named node_name exists. """ - if self._partition_graphs is None: raise LookupError("No partition graphs have been loaded.") - if node_name in self._node_attributes: - return self._node_attributes[node_name] + device_name = self._infer_device_name(device_name, node_name) + if node_name in self._node_attributes[device_name]: + return self._node_attributes[device_name][node_name] else: - raise ValueError("No node named \"%s\" exists." % node_name) + raise ValueError("No node named \"%s\" exists on device %s." 
% ( + node_name, device_name)) - def node_inputs(self, node_name, is_control=False): + def node_inputs(self, node_name, is_control=False, device_name=None): """Get the inputs of given node according to partition graphs. Args: node_name: Name of the node. is_control: (`bool`) Whether control inputs, rather than non-control inputs, are to be returned. + device_name: (`str`) name of the device. If there is only one device or if + node_name exists on only one device, this argumnet is optional. Returns: (`list` of `str`) inputs to the node, as a list of node names. @@ -1133,21 +1293,27 @@ class DebugDumpDir(object): raise LookupError( "Node inputs are not loaded from partition graphs yet.") - if node_name not in self._node_inputs: - raise ValueError("Node '%s' does not exist in partition graphs." % - node_name) + device_name = self._infer_device_name(device_name, node_name) + if node_name not in self._node_inputs[device_name]: + raise ValueError("Node '%s' does not exist in the partition graph of " + "device %s." % (node_name, device_name)) if is_control: - return self._node_ctrl_inputs[node_name] + return self._node_ctrl_inputs[device_name][node_name] else: - return self._node_inputs[node_name] + return self._node_inputs[device_name][node_name] - def transitive_inputs(self, node_name, include_control=True): + def transitive_inputs(self, + node_name, + include_control=True, + device_name=None): """Get the transitive inputs of given node according to partition graphs. Args: - node_name: Name of the node + node_name: Name of the node. include_control: Include control inputs (True by default). + device_name: (`str`) name of the device. If there is only one device or if + node_name exists on only one device, this argumnet is optional. Returns: (`list` of `str`) all transitive inputs to the node, as a list of node @@ -1163,9 +1329,11 @@ class DebugDumpDir(object): raise LookupError( "Node inputs are not loaded from partition graphs yet.") - if node_name not in self._node_inputs: - raise ValueError("Node '%s' does not exist in partition graphs." % - node_name) + device_name = self._infer_device_name(device_name, node_name) + if node_name not in self._node_inputs[device_name]: + raise ValueError( + "Node '%s' does not exist in the partition graph of device %s." % + (node_name, device_name)) inputs = [] @@ -1186,21 +1354,21 @@ class DebugDumpDir(object): # Stop the tracing at a Merge op, as it is generally impossible to infer # outside the runtime which input to the Merge op is alive. - if self._node_op_types[node] == "Merge": + if self._node_op_types[device_name][node] == "Merge": return if node in visited_nodes: return visited_nodes.append(node) - for inp in self._node_inputs[node]: + for inp in self._node_inputs[device_name][node]: if inp == node_name: continue inputs.append(inp) trace_inputs(inp) if include_control: - for ctrl_inp in self._node_ctrl_inputs[node]: + for ctrl_inp in self._node_ctrl_inputs[device_name][node]: if ctrl_inp == node_name: continue inputs.append(ctrl_inp) @@ -1210,13 +1378,15 @@ class DebugDumpDir(object): return inputs - def node_recipients(self, node_name, is_control=False): + def node_recipients(self, node_name, is_control=False, device_name=None): """Get recipient of the given node's output according to partition graphs. Args: node_name: (`str`) name of the node. is_control: (`bool`) whether control outputs, rather than non-control outputs, are to be returned. + device_name: (`str`) name of the device. 
If there is only one device or if + node_name exists on only one device, this argumnet is optional. Returns: (`list` of `str`) all inputs to the node, as a list of node names. @@ -1231,58 +1401,67 @@ class DebugDumpDir(object): raise LookupError( "Node recipients are not loaded from partition graphs yet.") - if node_name not in self._node_recipients: - raise ValueError("Node '%s' does not exist in partition graphs." % - node_name) + device_name = self._infer_device_name(device_name, node_name) + if node_name not in self._node_recipients[device_name]: + raise ValueError( + "Node '%s' does not exist in the partition graph of device %s." % + (node_name, device_name)) if is_control: - return self._node_ctrl_recipients[node_name] + return self._node_ctrl_recipients[device_name][node_name] else: - return self._node_recipients[node_name] + return self._node_recipients[device_name][node_name] def devices(self): - """Get the list of devices. + """Get the list of device names. Returns: (`list` of `str`) names of the devices. - - Raises: - LookupError: If node inputs and control inputs have not been loaded - from partition graphs yet. """ - if self._partition_graphs is None: - raise LookupError("Devices are not loaded from partition graphs yet.") - - return self._devices + return self._device_names - def node_exists(self, node_name): + def node_exists(self, node_name, device_name=None): """Test if a node exists in the partition graphs. Args: node_name: (`str`) name of the node to be checked. + device_name: optional device name. If None, will search for the node + on all available devices. Otherwise, search for the node only on + the given device. Returns: A boolean indicating whether the node exists. Raises: LookupError: If no partition graphs have been loaded yet. + ValueError: If device_name is specified but cannot be found. """ if self._node_inputs is None: raise LookupError( "Nodes have not been loaded from partition graphs yet.") - return node_name in self._node_inputs + if (device_name is not None) and device_name not in self._node_inputs: + raise ValueError( + "The specified device_name '%s' cannot be found." % device_name) + + node_inputs_all_devices = (self._node_inputs if device_name is None + else (self._node_inputs[device_name],)) + + return any(node_name in node_inputs_all_devices[dev_name] + for dev_name in node_inputs_all_devices) def node_device(self, node_name): - """Get the device of a node. + """Get the names of the devices that has nodes of the specified name. Args: node_name: (`str`) name of the node. Returns: - (`str`) name of the device on which the node is placed. + (`str` or `list` of `str`) name of the device(s) on which the node of the + given name is found. Returns a `str` if there is only one such device, + otherwise return a `list` of `str`. Raises: LookupError: If node inputs and control inputs have not been loaded @@ -1298,13 +1477,16 @@ class DebugDumpDir(object): raise ValueError("Node '%s' does not exist in partition graphs." % node_name) - return self._node_devices[node_name] + output = list(self._node_devices[node_name]) + return output[0] if len(output) == 1 else output - def node_op_type(self, node_name): + def node_op_type(self, node_name, device_name=None): """Get the op type of given node. Args: node_name: (`str`) name of the node. + device_name: (`str`) name of the device. If there is only one device or if + node_name exists on only one device, this argumnet is optional. Returns: (`str`) op type of the node. 
@@ -1319,17 +1501,21 @@ class DebugDumpDir(object): raise LookupError( "Node op types are not loaded from partition graphs yet.") - if node_name not in self._node_op_types: - raise ValueError("Node '%s' does not exist in partition graphs." % - node_name) + device_name = self._infer_device_name(device_name, node_name) + if node_name not in self._node_op_types[device_name]: + raise ValueError( + "Node '%s' does not exist in the partition graph of device '%s'. " % + (node_name, device_name)) - return self._node_op_types[node_name] + return self._node_op_types[device_name][node_name] - def debug_watch_keys(self, node_name): + def debug_watch_keys(self, node_name, device_name=None): """Get all tensor watch keys of given node according to partition graphs. Args: node_name: (`str`) name of the node. + device_name: (`str`) name of the device. If there is only one device or if + node_name exists on only one device, this argumnet is optional. Returns: (`list` of `str`) all debug tensor watch keys. Returns an empty list if @@ -1340,35 +1526,61 @@ class DebugDumpDir(object): partition graphs yet. """ - if node_name not in self._debug_watches: + try: + device_name = self._infer_device_name(device_name, node_name) + except ValueError: + return [] + + if node_name not in self._debug_watches[device_name]: return [] watch_keys = [] - for watched_slot in self._debug_watches[node_name]: - debug_ops = self._debug_watches[node_name][watched_slot] + for watched_slot in self._debug_watches[device_name][node_name]: + debug_ops = self._debug_watches[device_name][node_name][watched_slot] for debug_op in debug_ops: watch_keys.append( _get_tensor_watch_key(node_name, watched_slot, debug_op)) return watch_keys - def watch_key_to_data(self, debug_watch_key): + def watch_key_to_data(self, debug_watch_key, device_name=None): """Get all `DebugTensorDatum` instances corresponding to a debug watch key. Args: debug_watch_key: (`str`) debug watch key. + device_name: (`str`) name of the device. If there is only one device or if + the specified debug_watch_key exists on only one device, this argumnet + is optional. Returns: A list of `DebugTensorDatum` instances that correspond to the debug watch key. If the watch key does not exist, returns an empty list. Raises: - ValueError: If the debug watch key does not exist. + ValueError: If there are multiple devices that have the debug_watch_key, + but device_name is not specified. """ + if device_name is None: + matching_device_names = [ + name for name in self._watch_key_to_datum + if debug_watch_key in self._watch_key_to_datum[name]] + if not matching_device_names: + return [] + elif len(matching_device_names) == 1: + device_name = matching_device_names[0] + else: + raise ValueError( + "The debug watch key '%s' exists on multiple (%d) devices, but " + "device name is not specified." % + (debug_watch_key, len(matching_device_names))) + elif device_name not in self._debug_key_to_datum: + raise ValueError( + "There is no device named '%s' consisting of debug watch keys." % + device_name) - return self._watch_key_to_datum.get(debug_watch_key, []) + return self._watch_key_to_datum[device_name].get(debug_watch_key, []) - def find(self, predicate, first_n=0): + def find(self, predicate, first_n=0, device_name=None): """Find dumped tensor data by a certain predicate. Args: @@ -1386,6 +1598,7 @@ class DebugDumpDir(object): first_n: (`int`) return only the first n `DebugTensotDatum` instances (in time order) for which the predicate returns True. 
To return all the `DebugTensotDatum` instances, let first_n be <= 0. + device_name: optional device name. Returns: A list of all `DebugTensorDatum` objects in this `DebugDumpDir` object @@ -1394,22 +1607,31 @@ class DebugDumpDir(object): """ matched_data = [] - for datum in self._dump_tensor_data: - if predicate(datum, datum.get_tensor()): - matched_data.append(datum) + for device in (self._dump_tensor_data if device_name is None + else (self._dump_tensor_data[device_name],)): + for datum in self._dump_tensor_data[device]: + if predicate(datum, datum.get_tensor()): + matched_data.append(datum) - if first_n > 0 and len(matched_data) >= first_n: - break + if first_n > 0 and len(matched_data) >= first_n: + return matched_data return matched_data - def get_tensor_file_paths(self, node_name, output_slot, debug_op): + def get_tensor_file_paths(self, + node_name, + output_slot, + debug_op, + device_name=None): """Get the file paths from a debug-dumped tensor. Args: node_name: (`str`) name of the node that the tensor is produced by. output_slot: (`int`) output slot index of tensor. debug_op: (`str`) name of the debug op. + device_name: (`str`) name of the device. If there is only one device or if + the specified debug_watch_key exists on only one device, this argumnet + is optional. Returns: List of file path(s) loaded. This is a list because each debugged tensor @@ -1420,14 +1642,17 @@ class DebugDumpDir(object): the debug-dump data. """ + device_name = self._infer_device_name(device_name, node_name) watch_key = _get_tensor_watch_key(node_name, output_slot, debug_op) - if watch_key not in self._watch_key_to_datum: + if watch_key not in self._watch_key_to_datum[device_name]: raise WatchKeyDoesNotExistInDebugDumpDirError( - "Watch key \"%s\" does not exist in the debug dump" % watch_key) + "Watch key \"%s\" does not exist in the debug dump of device %s" % + (watch_key, device_name)) - return [datum.file_path for datum in self._watch_key_to_datum[watch_key]] + return [datum.file_path for datum in + self._watch_key_to_datum[device_name][watch_key]] - def get_tensors(self, node_name, output_slot, debug_op): + def get_tensors(self, node_name, output_slot, debug_op, device_name=None): """Get the tensor value from for a debug-dumped tensor. The tensor may be dumped multiple times in the dump root directory, so a @@ -1437,6 +1662,9 @@ class DebugDumpDir(object): node_name: (`str`) name of the node that the tensor is produced by. output_slot: (`int`) output slot index of tensor. debug_op: (`str`) name of the debug op. + device_name: (`str`) name of the device. If there is only one device or if + the specified debug_watch_key exists on only one device, this argumnet + is optional. Returns: List of tensors (`numpy.ndarray`) loaded from the debug-dump file(s). 
 
-  def get_tensor_file_paths(self, node_name, output_slot, debug_op):
+  def get_tensor_file_paths(self,
+                            node_name,
+                            output_slot,
+                            debug_op,
+                            device_name=None):
     """Get the file paths from a debug-dumped tensor.
 
     Args:
       node_name: (`str`) name of the node that the tensor is produced by.
       output_slot: (`int`) output slot index of tensor.
       debug_op: (`str`) name of the debug op.
+      device_name: (`str`) name of the device. If there is only one device or if
+        the specified debug_watch_key exists on only one device, this argument
+        is optional.
 
     Returns:
       List of file path(s) loaded. This is a list because each debugged tensor
@@ -1420,14 +1642,17 @@ class DebugDumpDir(object):
       the debug-dump data.
     """
 
+    device_name = self._infer_device_name(device_name, node_name)
     watch_key = _get_tensor_watch_key(node_name, output_slot, debug_op)
-    if watch_key not in self._watch_key_to_datum:
+    if watch_key not in self._watch_key_to_datum[device_name]:
       raise WatchKeyDoesNotExistInDebugDumpDirError(
-          "Watch key \"%s\" does not exist in the debug dump" % watch_key)
+          "Watch key \"%s\" does not exist in the debug dump of device %s" %
+          (watch_key, device_name))
 
-    return [datum.file_path for datum in self._watch_key_to_datum[watch_key]]
+    return [datum.file_path for datum in
+            self._watch_key_to_datum[device_name][watch_key]]
 
-  def get_tensors(self, node_name, output_slot, debug_op):
+  def get_tensors(self, node_name, output_slot, debug_op, device_name=None):
     """Get the tensor value from a debug-dumped tensor.
 
     The tensor may be dumped multiple times in the dump root directory, so a
@@ -1437,6 +1662,9 @@ class DebugDumpDir(object):
       node_name: (`str`) name of the node that the tensor is produced by.
       output_slot: (`int`) output slot index of tensor.
       debug_op: (`str`) name of the debug op.
+      device_name: (`str`) name of the device. If there is only one device or if
+        the specified debug_watch_key exists on only one device, this argument
+        is optional.
 
     Returns:
       List of tensors (`numpy.ndarray`) loaded from the debug-dump file(s).
@@ -1447,13 +1675,20 @@ class DebugDumpDir(object):
     """
 
     watch_key = _get_tensor_watch_key(node_name, output_slot, debug_op)
-    if watch_key not in self._watch_key_to_datum:
+    try:
+      device_name = self._infer_device_name(device_name, node_name)
+      return [datum.get_tensor() for datum in
+              self._watch_key_to_datum[device_name][watch_key]]
+    except (ValueError, KeyError):
       raise WatchKeyDoesNotExistInDebugDumpDirError(
-          "Watch key \"%s\" does not exist in the debug dump" % watch_key)
-
-    return [datum.get_tensor() for datum in self._watch_key_to_datum[watch_key]]
-
-  def get_rel_timestamps(self, node_name, output_slot, debug_op):
+          "Watch key \"%s\" does not exist in the debug dump of device %s" %
+          (watch_key, device_name))
+
+  def get_rel_timestamps(self,
+                         node_name,
+                         output_slot,
+                         debug_op,
+                         device_name=None):
     """Get the relative timestamp from a debug-dumped tensor.
 
     Relative timestamp means (absolute timestamp - `t0`), where `t0` is the
@@ -1465,6 +1700,9 @@ class DebugDumpDir(object):
       node_name: (`str`) name of the node that the tensor is produced by.
       output_slot: (`int`) output slot index of tensor.
       debug_op: (`str`) name of the debug op.
+      device_name: (`str`) name of the device. If there is only one device or if
+        the specified debug_watch_key exists on only one device, this argument
+        is optional.
 
     Returns:
       (`list` of `int`) list of relative timestamps.
@@ -1474,14 +1712,20 @@ class DebugDumpDir(object):
       exist in the debug dump data.
     """
 
+    device_name = self._infer_device_name(device_name, node_name)
     watch_key = _get_tensor_watch_key(node_name, output_slot, debug_op)
-    if watch_key not in self._watch_key_to_datum:
+    if watch_key not in self._watch_key_to_datum[device_name]:
       raise WatchKeyDoesNotExistInDebugDumpDirError(
           "Watch key \"%s\" does not exist in the debug dump" % watch_key)
 
-    return self._watch_key_to_rel_time[watch_key]
+    # TODO(cais): Figure out whether this should be relative to the global t0.
+    return self._watch_key_to_rel_time[device_name][watch_key]
 
-  def get_dump_sizes_bytes(self, node_name, output_slot, debug_op):
+  def get_dump_sizes_bytes(self,
+                           node_name,
+                           output_slot,
+                           debug_op,
+                           device_name=None):
     """Get the sizes of the dump files for a debug-dumped tensor.
 
     Unit of the file size: byte.
 
@@ -1490,6 +1734,9 @@ class DebugDumpDir(object):
       node_name: (`str`) name of the node that the tensor is produced by.
       output_slot: (`int`) output slot index of tensor.
       debug_op: (`str`) name of the debug op.
+      device_name: (`str`) name of the device. If there is only one device or if
+        the specified debug_watch_key exists on only one device, this argument
+        is optional.
 
     Returns:
       (`list` of `int`): list of dump file sizes in bytes.
@@ -1499,12 +1746,14 @@ class DebugDumpDir(object):
       exist in the debug dump data.
     """
 
+    device_name = self._infer_device_name(device_name, node_name)
     watch_key = _get_tensor_watch_key(node_name, output_slot, debug_op)
-    if watch_key not in self._watch_key_to_datum:
+    if watch_key not in self._watch_key_to_datum[device_name]:
       raise WatchKeyDoesNotExistInDebugDumpDirError(
-          "Watch key \"%s\" does not exist in the debug dump" % watch_key)
+          "Watch key \"%s\" does not exist in the debug dump of device %s" %
+          (watch_key, device_name))
 
-    return self._watch_key_to_dump_size_bytes[watch_key]
+    return self._watch_key_to_dump_size_bytes[device_name][watch_key]
 
   def node_traceback(self, element_name):
     """Try to retrieve the Python traceback of node's construction.
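All of the per-device lookups above key into directories produced by the device-path mangling exercised by `DeviceNamePathConversionTest` in the test file below. A self-contained sketch of the scheme, assuming device names of the usual `/job:*/replica:*/task:*/cpu|gpu:*` shape; the constants mirror `debug_data.METADATA_FILE_PREFIX` and `debug_data.DEVICE_TAG`:

    METADATA_FILE_PREFIX = "_tfdbg_"
    DEVICE_TAG = "device_"

    def device_name_to_device_path(device_name):
      # ":" and "/" are replaced so the device name is file-system-safe.
      return (METADATA_FILE_PREFIX + DEVICE_TAG +
              device_name.replace(":", "_").replace("/", ","))

    def device_path_to_device_name(device_path):
      # Inverse mapping; assumes each path component held exactly one ":".
      items = device_path[len(METADATA_FILE_PREFIX) + len(DEVICE_TAG):]
      return "/".join(
          item.replace("_", ":", 1) for item in items.split(","))

    assert (device_name_to_device_path("/job:ps/replica:1/task:2/cpu:0") ==
            "_tfdbg_device_,job_ps,replica_1,task_2,cpu_0")
    assert (device_path_to_device_name(
        "_tfdbg_device_,job_ps,replica_1,task_2,cpu_0") ==
            "/job:ps/replica:1/task:2/cpu:0")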
diff --git a/tensorflow/python/debug/lib/debug_data_test.py b/tensorflow/python/debug/lib/debug_data_test.py
index dc45e8df6c..dd621a5afc 100644
--- a/tensorflow/python/debug/lib/debug_data_test.py
+++ b/tensorflow/python/debug/lib/debug_data_test.py
@@ -23,12 +23,29 @@ import tempfile
 
 import numpy as np
 
+from tensorflow.core.framework import graph_pb2
 from tensorflow.core.framework import tensor_pb2
 from tensorflow.python.debug.lib import debug_data
 from tensorflow.python.framework import test_util
 from tensorflow.python.platform import googletest
 
 
+class DeviceNamePathConversionTest(test_util.TensorFlowTestCase):
+
+  def testDeviceNameToDevicePath(self):
+    self.assertEqual(
+        debug_data.METADATA_FILE_PREFIX + debug_data.DEVICE_TAG +
+        ",job_ps,replica_1,task_2,cpu_0",
+        debug_data.device_name_to_device_path("/job:ps/replica:1/task:2/cpu:0"))
+
+  def testDevicePathToDeviceName(self):
+    self.assertEqual(
+        "/job:ps/replica:1/task:2/cpu:0",
+        debug_data.device_path_to_device_name(
+            debug_data.METADATA_FILE_PREFIX + debug_data.DEVICE_TAG +
+            ",job_ps,replica_1,task_2,cpu_0"))
+
+
 class ParseNodeOrTensorNameTest(test_util.TensorFlowTestCase):
 
   def testParseNodeName(self):
@@ -163,7 +180,10 @@ class DebugTensorDatumTest(test_util.TensorFlowTestCase):
 
   def testDebugDatum(self):
     dump_root = "/tmp/tfdbg_1"
-    debug_dump_rel_path = "ns1/ns2/node_a_1_2_DebugIdentity_1472563253536385"
+    debug_dump_rel_path = (
+        debug_data.METADATA_FILE_PREFIX + debug_data.DEVICE_TAG +
+        ",job_localhost,replica_0,task_0,cpu_0" +
+        "/ns1/ns2/node_a_1_2_DebugIdentity_1472563253536385")
 
     datum = debug_data.DebugTensorDatum(dump_root, debug_dump_rel_path)
 
@@ -175,16 +195,18 @@ class DebugTensorDatumTest(test_util.TensorFlowTestCase):
     self.assertEqual("ns1/ns2/node_a_1:2:DebugIdentity", datum.watch_key)
     self.assertEqual(
         os.path.join(dump_root, debug_dump_rel_path), datum.file_path)
-    self.assertEqual("{DebugTensorDatum: %s:%d @ %s @ %d}" % (datum.node_name,
-                                                              datum.output_slot,
-                                                              datum.debug_op,
-                                                              datum.timestamp),
-                     str(datum))
-    self.assertEqual("{DebugTensorDatum: %s:%d @ %s @ %d}" % (datum.node_name,
-                                                              datum.output_slot,
-                                                              datum.debug_op,
-                                                              datum.timestamp),
-                     repr(datum))
+    self.assertEqual(
+        "{DebugTensorDatum (/job:localhost/replica:0/task:0/cpu:0) "
+        "%s:%d @ %s @ %d}" % (datum.node_name,
+                              datum.output_slot,
+                              datum.debug_op,
+                              datum.timestamp), str(datum))
+    self.assertEqual(
+        "{DebugTensorDatum (/job:localhost/replica:0/task:0/cpu:0) "
+        "%s:%d @ %s @ %d}" % (datum.node_name,
+                              datum.output_slot,
+                              datum.debug_op,
+                              datum.timestamp), repr(datum))
 
   def testDumpSizeBytesIsNoneForNonexistentFilePath(self):
     dump_root = "/tmp/tfdbg_1"
@@ -204,18 +226,112 @@ class DebugDumpDirTest(test_util.TensorFlowTestCase):
     # Tear down temporary dump directory.
     shutil.rmtree(self._dump_root)
 
+  def _makeDataDirWithMultipleDevicesAndDuplicateNodeNames(self):
+    cpu_0_dir = os.path.join(
+        self._dump_root,
+        debug_data.METADATA_FILE_PREFIX + debug_data.DEVICE_TAG +
+        ",job_localhost,replica_0,task_0,cpu_0")
+    gpu_0_dir = os.path.join(
+        self._dump_root,
+        debug_data.METADATA_FILE_PREFIX + debug_data.DEVICE_TAG +
+        ",job_localhost,replica_0,task_0,gpu_0")
+    gpu_1_dir = os.path.join(
+        self._dump_root,
+        debug_data.METADATA_FILE_PREFIX + debug_data.DEVICE_TAG +
+        ",job_localhost,replica_0,task_0,gpu_1")
+    os.makedirs(cpu_0_dir)
+    os.makedirs(gpu_0_dir)
+    os.makedirs(gpu_1_dir)
+    open(os.path.join(
+        cpu_0_dir, "node_foo_1_2_DebugIdentity_1472563253536386"), "wb")
+    open(os.path.join(
+        gpu_0_dir, "node_foo_1_2_DebugIdentity_1472563253536385"), "wb")
+    open(os.path.join(
+        gpu_1_dir, "node_foo_1_2_DebugIdentity_1472563253536387"), "wb")
+
   def testDebugDumpDir_nonexistentDumpRoot(self):
     with self.assertRaisesRegexp(IOError, "does not exist"):
       debug_data.DebugDumpDir(tempfile.mktemp() + "_foo")
 
   def testDebugDumpDir_invalidFileNamingPattern(self):
     # File name with too few underscores should lead to an exception.
-    open(os.path.join(self._dump_root, "node1_DebugIdentity_1234"), "wb")
+    device_dir = os.path.join(
+        self._dump_root,
+        debug_data.METADATA_FILE_PREFIX + debug_data.DEVICE_TAG +
+        ",job_localhost,replica_0,task_0,cpu_0")
+    os.makedirs(device_dir)
+    open(os.path.join(device_dir, "node1_DebugIdentity_1234"), "wb")
 
     with self.assertRaisesRegexp(ValueError,
                                  "does not conform to the naming pattern"):
       debug_data.DebugDumpDir(self._dump_root)
 
+  def testDebugDumpDir_validDuplicateNodeNamesWithMultipleDevices(self):
+    self._makeDataDirWithMultipleDevicesAndDuplicateNodeNames()
+
+    graph_cpu_0 = graph_pb2.GraphDef()
+    node = graph_cpu_0.node.add()
+    node.name = "node_foo_1"
+    node.op = "FooOp"
+    node.device = "/job:localhost/replica:0/task:0/cpu:0"
+    graph_gpu_0 = graph_pb2.GraphDef()
+    node = graph_gpu_0.node.add()
+    node.name = "node_foo_1"
+    node.op = "FooOp"
+    node.device = "/job:localhost/replica:0/task:0/gpu:0"
+    graph_gpu_1 = graph_pb2.GraphDef()
+    node = graph_gpu_1.node.add()
+    node.name = "node_foo_1"
+    node.op = "FooOp"
+    node.device = "/job:localhost/replica:0/task:0/gpu:1"
+
+    dump_dir = debug_data.DebugDumpDir(
+        self._dump_root,
+        partition_graphs=[graph_cpu_0, graph_gpu_0, graph_gpu_1])
+
+    self.assertItemsEqual(
+        ["/job:localhost/replica:0/task:0/cpu:0",
+         "/job:localhost/replica:0/task:0/gpu:0",
+         "/job:localhost/replica:0/task:0/gpu:1"], dump_dir.devices())
+    self.assertEqual(1472563253536385, dump_dir.t0)
+    self.assertEqual(3, dump_dir.size)
+
+    with self.assertRaisesRegexp(
+        ValueError,
+        r"There are multiple \(3\) devices, but device_name is not specified"):
+      dump_dir.nodes()
+    self.assertItemsEqual(
+        ["node_foo_1"],
+        dump_dir.nodes(device_name="/job:localhost/replica:0/task:0/cpu:0"))
+
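The assertions in the test above fix the cross-device aggregation semantics: `t0` is the earliest timestamp over all devices' dump files and `size` is the total datum count. Worked out against the three timestamps the helper creates:

    timestamps = [1472563253536386,  # cpu:0
                  1472563253536385,  # gpu:0
                  1472563253536387]  # gpu:1
    assert min(timestamps) == 1472563253536385  # dump_dir.t0
    assert len(timestamps) == 3                 # dump_dir.size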
+  def testDuplicateNodeNamesInGraphDefOfSingleDeviceRaisesException(self):
+    self._makeDataDirWithMultipleDevicesAndDuplicateNodeNames()
+    graph_cpu_0 = graph_pb2.GraphDef()
+    node = graph_cpu_0.node.add()
+    node.name = "node_foo_1"
+    node.op = "FooOp"
+    node.device = "/job:localhost/replica:0/task:0/cpu:0"
+    graph_gpu_0 = graph_pb2.GraphDef()
+    node = graph_gpu_0.node.add()
+    node.name = "node_foo_1"
+    node.op = "FooOp"
+    node.device = "/job:localhost/replica:0/task:0/gpu:0"
+    graph_gpu_1 = graph_pb2.GraphDef()
+    node = graph_gpu_1.node.add()
+    node.name = "node_foo_1"
+    node.op = "FooOp"
+    node.device = "/job:localhost/replica:0/task:0/gpu:1"
+
+    node = graph_gpu_1.node.add()  # Here is the duplicate.
+    node.name = "node_foo_1"
+    node.op = "FooOp"
+    node.device = "/job:localhost/replica:0/task:0/gpu:1"
+
+    with self.assertRaisesRegexp(
+        ValueError, r"Duplicate node name on device "):
+      debug_data.DebugDumpDir(
+          self._dump_root,
+          partition_graphs=[graph_cpu_0, graph_gpu_0, graph_gpu_1])
+
   def testDebugDumpDir_emptyDumpDir(self):
     dump_dir = debug_data.DebugDumpDir(self._dump_root)
 
diff --git a/tensorflow/python/debug/lib/session_debug_multi_gpu_test.py b/tensorflow/python/debug/lib/session_debug_multi_gpu_test.py
new file mode 100644
index 0000000000..b0dc25851c
--- /dev/null
+++ b/tensorflow/python/debug/lib/session_debug_multi_gpu_test.py
@@ -0,0 +1,93 @@
+# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Tests for debugger functionalities under multiple (i.e., >1) GPUs."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import os
+import shutil
+import tempfile
+
+from tensorflow.core.protobuf import config_pb2
+from tensorflow.python.client import device_lib
+from tensorflow.python.client import session
+from tensorflow.python.debug.lib import debug_data
+from tensorflow.python.debug.lib import debug_utils
+from tensorflow.python.framework import dtypes
+from tensorflow.python.framework import ops
+from tensorflow.python.framework import test_util
+from tensorflow.python.ops import math_ops
+from tensorflow.python.ops import variables
+from tensorflow.python.platform import googletest
+
+
+class SessionDebugMultiGPUTest(test_util.TensorFlowTestCase):
+
+  def setUp(self):
+    self._dump_root = tempfile.mkdtemp()
+
+  def tearDown(self):
+    ops.reset_default_graph()
+
+    # Tear down temporary dump directory.
+    if os.path.isdir(self._dump_root):
+      shutil.rmtree(self._dump_root)
+
+  def testMultiGPUSessionRun(self):
+    local_devices = device_lib.list_local_devices()
+    gpu_device_names = []
+    for device in local_devices:
+      if device.device_type == "GPU":
+        gpu_device_names.append(device.name)
+    gpu_device_names = sorted(gpu_device_names)
+
+    if len(gpu_device_names) < 2:
+      self.skipTest(
+          "This test requires at least 2 GPUs, but only %d is available." %
+          len(gpu_device_names))
+
+    with session.Session() as sess:
+      v = variables.Variable([10.0, 15.0], dtype=dtypes.float32, name="v")
+      with ops.device(gpu_device_names[0]):
+        u0 = math_ops.add(v, v, name="u0")
+      with ops.device(gpu_device_names[1]):
+        u1 = math_ops.multiply(v, v, name="u1")
+      w = math_ops.subtract(u1, u0, name="w")
+
+      sess.run(v.initializer)
+
+      run_options = config_pb2.RunOptions(output_partition_graphs=True)
+      debug_utils.watch_graph(run_options, sess.graph,
+                              debug_urls="file://" + self._dump_root)
+      run_metadata = config_pb2.RunMetadata()
+      self.assertAllClose(
+          [80.0, 195.0],
+          sess.run(w, options=run_options, run_metadata=run_metadata))
+
+      debug_dump_dir = debug_data.DebugDumpDir(
+          self._dump_root, partition_graphs=run_metadata.partition_graphs)
+      self.assertEqual(3, len(debug_dump_dir.devices()))
+      self.assertAllClose(
+          [10.0, 15.0], debug_dump_dir.get_tensors("v", 0, "DebugIdentity")[0])
+      self.assertAllClose(
+          [20.0, 30.0], debug_dump_dir.get_tensors("u0", 0, "DebugIdentity")[0])
+      self.assertAllClose(
+          [100.0, 225.0],
+          debug_dump_dir.get_tensors("u1", 0, "DebugIdentity")[0])
+
+
+if __name__ == "__main__":
+  googletest.main()
diff --git a/tensorflow/python/debug/lib/session_debug_testlib.py b/tensorflow/python/debug/lib/session_debug_testlib.py
index ef1d1bb2f3..d7da1952a0 100644
--- a/tensorflow/python/debug/lib/session_debug_testlib.py
+++ b/tensorflow/python/debug/lib/session_debug_testlib.py
@@ -249,8 +249,12 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
     self.assertIn(results.v.op.type, results.dump.node_op_type(results.v_name))
     self.assertIn(results.w.op.type, results.dump.node_op_type(results.w_name))
 
-    with self.assertRaisesRegexp(
-        ValueError, "Node 'foo_bar' does not exist in partition graphs."):
+    if test_util.gpu_device_name():
+      expected_error_regexp = r"None of the .* devices has a node named "
+    else:
+      expected_error_regexp = (
+          r"Node \'foo_bar\' does not exist in the partition graph of device")
+    with self.assertRaisesRegexp(ValueError, expected_error_regexp):
       results.dump.node_op_type("foo_bar")
 
   def testDumpStringTensorsWorks(self):
@@ -436,9 +440,11 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
 
     # Verify dump files
     self.assertTrue(os.path.isdir(self._dump_root))
-    self.assertTrue(os.path.isdir(os.path.join(self._dump_root, u_namespace)))
-    self.assertTrue(
-        os.path.isdir(os.path.join(self._dump_root, v_namespace, "v")))
+    u_glob_out = glob.glob(os.path.join(self._dump_root, "*", u_namespace))
+    v_glob_out = glob.glob(os.path.join(
+        self._dump_root, "*", v_namespace, "v"))
+    self.assertTrue(os.path.isdir(u_glob_out[0]))
+    self.assertTrue(os.path.isdir(v_glob_out[0]))
 
     dump = debug_data.DebugDumpDir(
         self._dump_root, partition_graphs=run_metadata.partition_graphs)
@@ -688,7 +694,11 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
     u_read_name = u_name + "/read"
 
     # Test node name list lookup of the DebugDumpDir object.
-    node_names = dump.nodes()
+    if test_util.gpu_device_name():
+      node_names = dump.nodes(
+          device_name="/job:localhost/replica:0/task:0/gpu:0")
+    else:
+      node_names = dump.nodes()
     self.assertTrue(u_name in node_names)
     self.assertTrue(u_read_name in node_names)
 
@@ -698,7 +708,11 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
     self.assertEqual(1, len(u_attr["shape"].shape.dim))
     self.assertEqual(2, u_attr["shape"].shape.dim[0].size)
 
-    with self.assertRaisesRegexp(ValueError, "No node named \"foo\" exists"):
+    if test_util.gpu_device_name():
+      expected_error_regexp = r"None of the .* devices has a node named "
+    else:
+      expected_error_regexp = r"No node named \"foo\" exists"
+    with self.assertRaisesRegexp(ValueError, expected_error_regexp):
       dump.node_attributes("foo")
 
   def testGraphStructureLookupGivesDebugWatchKeys(self):
@@ -721,7 +735,6 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
     self.assertEqual(0, u_data[0].output_slot)
     self.assertEqual("DebugIdentity", u_data[0].debug_op)
     self.assertGreaterEqual(u_data[0].timestamp, 0)
-
     self.assertEqual([], dump.watch_key_to_data("foo"))
 
   def testGraphStructureLookupGivesNodeInputsAndRecipients(self):
@@ -752,12 +765,13 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
     self.assertEqual([], dump.node_recipients(w_name, is_control=True))
 
     # Test errors raised on invalid node names.
-    with self.assertRaisesRegexp(ValueError,
-                                 "does not exist in partition graphs"):
+    if test_util.gpu_device_name():
+      expected_error_regexp = r"None of the .* devices has a node named "
+    else:
+      expected_error_regexp = "does not exist in the partition graph of device "
+    with self.assertRaisesRegexp(ValueError, expected_error_regexp):
       dump.node_inputs(u_name + "foo")
-
-    with self.assertRaisesRegexp(ValueError,
-                                 "does not exist in partition graphs"):
+    with self.assertRaisesRegexp(ValueError, expected_error_regexp):
       dump.node_recipients(u_name + "foo")
 
     # Test transitive_inputs().
@@ -768,8 +782,7 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
     self.assertEqual(
         set([u_name, u_read_name, v_name]),
         set(dump.transitive_inputs(w_name)))
-    with self.assertRaisesRegexp(ValueError,
-                                 "does not exist in partition graphs"):
+    with self.assertRaisesRegexp(ValueError, expected_error_regexp):
       dump.transitive_inputs(u_name + "foo")
 
   def testGraphStructureLookupWithoutPartitionGraphsDoesNotErrorOut(self):
@@ -1066,10 +1079,12 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
       y = array_ops.squeeze(ph, name="mismatch/y")
 
       run_options = config_pb2.RunOptions(output_partition_graphs=True)
+      run_metadata = config_pb2.RunMetadata()
       debug_utils.watch_graph(
           run_options, sess.graph, debug_urls=self._debug_urls(), global_step=1)
 
-      sess.run(x, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options)
+      sess.run(x, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options,
+               run_metadata=run_metadata)
       dump1 = debug_data.DebugDumpDir(self._dump_root)
       self.assertEqual(1, dump1.core_metadata.global_step)
       self.assertGreaterEqual(dump1.core_metadata.session_run_index, 0)
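Taken together, the layout changes mean a dump root now holds one mangled directory per device, with the per-device graph dump inside it and tensor dump files nested under each node's name scope. A small inventory sketch (not part of the commit; "/tmp/tfdbg_1" is an assumed path):

    import os

    dump_root = "/tmp/tfdbg_1"
    for entry in os.listdir(dump_root):
      # Device directories start with "_tfdbg_device_"; core-metadata files
      # ("_tfdbg_core_metadata_*") sit directly under the dump root.
      if not entry.startswith("_tfdbg_device_"):
        continue
      n_tensor_dumps = 0
      for _, _, files in os.walk(os.path.join(dump_root, entry)):
        # Skip the per-device graph dumps ("_tfdbg_graph_*"); tensor dump
        # files follow the <node>_<slot>_<debug_op>_<timestamp> pattern.
        n_tensor_dumps += sum(1 for f in files if not f.startswith("_tfdbg_"))
      print("%s: %d tensor dump file(s)" % (entry, n_tensor_dumps))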