aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/common_runtime
diff options
context:
space:
mode:
authorGravatar Brennan Saeta <saeta@google.com>2017-05-04 19:43:48 -0800
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-05-04 21:08:47 -0700
commitf28935a7d280b6ba75fe93fe35783d87b9cc2ec9 (patch)
treedb03a72f0dd29e8e09dcf7805684268b3d40a54c /tensorflow/core/common_runtime
parentefa08d80a53a95ce6b8beb61ec86a275aed6b6c7 (diff)
Implement ClusterSpec Propagation in TF Master
ClusterSpec propagation is a capability upgrade for TensorFlow that should make it much easier to (1) build distributed TensorFlow clusters, and (2) handle node failures. The ClusterSpec propagation capability allows TensorFlow workers to be booted independently of each other, and with no knowledge about others. The client can then construct a ClusterDef (ClusterSpec), and then send it to the TF master at session creation. The master in turn then propagates the ClusterDef along to all of the workers. Change: 155159972
Diffstat (limited to 'tensorflow/core/common_runtime')
-rw-r--r--tensorflow/core/common_runtime/device.cc3
-rw-r--r--tensorflow/core/common_runtime/device.h3
-rw-r--r--tensorflow/core/common_runtime/device_mgr.cc19
-rw-r--r--tensorflow/core/common_runtime/device_mgr.h2
-rw-r--r--tensorflow/core/common_runtime/device_set.h5
-rw-r--r--tensorflow/core/common_runtime/device_set_test.cc3
-rw-r--r--tensorflow/core/common_runtime/gpu/gpu_device.cc7
-rw-r--r--tensorflow/core/common_runtime/local_device.cc6
-rw-r--r--tensorflow/core/common_runtime/local_device.h4
-rw-r--r--tensorflow/core/common_runtime/renamed_device.cc54
-rw-r--r--tensorflow/core/common_runtime/renamed_device.h119
-rw-r--r--tensorflow/core/common_runtime/simple_placer_test.cc2
-rw-r--r--tensorflow/core/common_runtime/threadpool_device.cc6
13 files changed, 209 insertions, 24 deletions
diff --git a/tensorflow/core/common_runtime/device.cc b/tensorflow/core/common_runtime/device.cc
index 78649afeb9..aa8a2d989b 100644
--- a/tensorflow/core/common_runtime/device.cc
+++ b/tensorflow/core/common_runtime/device.cc
@@ -23,8 +23,7 @@ limitations under the License.
namespace tensorflow {
-Device::Device(Env* env, const DeviceAttributes& device_attributes,
- Allocator* device_allocator)
+Device::Device(Env* env, const DeviceAttributes& device_attributes)
: DeviceBase(env), device_attributes_(device_attributes) {
CHECK(DeviceNameUtils::ParseFullName(name(), &parsed_name_))
<< "Invalid device name: " << name();
diff --git a/tensorflow/core/common_runtime/device.h b/tensorflow/core/common_runtime/device.h
index 07c6bdd683..c0e58f143e 100644
--- a/tensorflow/core/common_runtime/device.h
+++ b/tensorflow/core/common_runtime/device.h
@@ -53,8 +53,7 @@ namespace tensorflow {
class Device : public DeviceBase {
public:
- Device(Env* env, const DeviceAttributes& device_attributes,
- Allocator* device_allocator);
+ Device(Env* env, const DeviceAttributes& device_attributes);
~Device() override;
// Full name of this device (see top comment).
diff --git a/tensorflow/core/common_runtime/device_mgr.cc b/tensorflow/core/common_runtime/device_mgr.cc
index 7807656cb2..31f12d4833 100644
--- a/tensorflow/core/common_runtime/device_mgr.cc
+++ b/tensorflow/core/common_runtime/device_mgr.cc
@@ -29,10 +29,18 @@ DeviceMgr::DeviceMgr(const std::vector<Device*>& devices)
for (Device* d : devices) {
devices_.push_back(d);
- // Register under both the full name and the local name.
+ // Register under the (1) full name, (2) canonical name, and (3) local name.
string full_name = d->name();
device_map_[CopyToBackingStore(full_name)] = d;
+ DeviceNameUtils::ParsedName parsed_name = d->parsed_name();
+ if (parsed_name.has_job && parsed_name.has_replica &&
+ parsed_name.has_task && parsed_name.has_type && parsed_name.has_id) {
+ string canonical_name = DeviceNameUtils::FullName(
+ parsed_name.job, parsed_name.replica, parsed_name.task,
+ parsed_name.type, parsed_name.id);
+ device_map_[CopyToBackingStore(canonical_name)] = d;
+ }
string lname = DeviceNameUtils::LocalName(d->name());
device_map_[CopyToBackingStore(lname)] = d;
device_type_counts_[d->device_type()]++;
@@ -40,7 +48,8 @@ DeviceMgr::DeviceMgr(const std::vector<Device*>& devices)
}
DeviceMgr::~DeviceMgr() {
- for (auto p : devices_) delete p;
+ // TODO(b/37437134): Remove destructor after converting to std::unique_ptr.
+ for (Device* p : devices_) delete p;
}
StringPiece DeviceMgr::CopyToBackingStore(StringPiece s) {
@@ -85,6 +94,12 @@ Status DeviceMgr::LookupDevice(StringPiece name, Device** device) const {
Status s;
auto iter = device_map_.find(name);
if (iter == device_map_.end()) {
+ std::vector<StringPiece> device_names;
+ for (auto&& itr : device_map_) {
+ device_names.push_back(itr.first);
+ }
+ LOG(WARNING) << "Unknown device: " << name
+ << " all devices: " << str_util::Join(device_names, ", ");
return errors::InvalidArgument(name, " unknown device.");
}
*device = iter->second;
diff --git a/tensorflow/core/common_runtime/device_mgr.h b/tensorflow/core/common_runtime/device_mgr.h
index bb1ed72640..d16681ac59 100644
--- a/tensorflow/core/common_runtime/device_mgr.h
+++ b/tensorflow/core/common_runtime/device_mgr.h
@@ -36,6 +36,7 @@ class DeviceMgr {
public:
// Takes ownership of each device in 'devices'.
// TODO(zhifengc): Other initialization information.
+ // TODO(b/37437134): Use std::unique_ptr's to track ownership.
explicit DeviceMgr(const std::vector<Device*>& devices);
~DeviceMgr();
@@ -61,6 +62,7 @@ class DeviceMgr {
int NumDeviceType(const string& type) const;
private:
+ // TODO(b/37437134): Use std::unique_ptr's to track ownership.
typedef gtl::InlinedVector<Device*, 8> DeviceVec;
DeviceVec devices_;
diff --git a/tensorflow/core/common_runtime/device_set.h b/tensorflow/core/common_runtime/device_set.h
index b0540dfa95..4cd56e583c 100644
--- a/tensorflow/core/common_runtime/device_set.h
+++ b/tensorflow/core/common_runtime/device_set.h
@@ -39,7 +39,10 @@ class DeviceSet {
// Set the device designated as the "client". This device
// must also be registered via AddDevice().
- void set_client_device(Device* device) { client_device_ = device; }
+ void set_client_device(Device* device) {
+ DCHECK(client_device_ == nullptr);
+ client_device_ = device;
+ }
// Returns a pointer to the device designated as the "client".
Device* client_device() const { return client_device_; }
diff --git a/tensorflow/core/common_runtime/device_set_test.cc b/tensorflow/core/common_runtime/device_set_test.cc
index ff20ee94a7..0507076c8c 100644
--- a/tensorflow/core/common_runtime/device_set_test.cc
+++ b/tensorflow/core/common_runtime/device_set_test.cc
@@ -27,8 +27,7 @@ namespace {
static Device* Dev(const char* type, const char* name) {
class FakeDevice : public Device {
public:
- explicit FakeDevice(const DeviceAttributes& attr)
- : Device(nullptr, attr, nullptr) {}
+ explicit FakeDevice(const DeviceAttributes& attr) : Device(nullptr, attr) {}
Status Sync() override { return Status::OK(); }
Allocator* GetAllocator(AllocatorAttributes) override { return nullptr; }
};
diff --git a/tensorflow/core/common_runtime/gpu/gpu_device.cc b/tensorflow/core/common_runtime/gpu/gpu_device.cc
index 0e2343cfe3..02f70d835d 100644
--- a/tensorflow/core/common_runtime/gpu/gpu_device.cc
+++ b/tensorflow/core/common_runtime/gpu/gpu_device.cc
@@ -179,10 +179,9 @@ BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
int gpu_id, const string& physical_device_desc,
Allocator* gpu_allocator, Allocator* cpu_allocator,
bool sync_every_op, int32 max_streams)
- : LocalDevice(options,
- Device::BuildDeviceAttributes(name, DEVICE_GPU, memory_limit,
- locality, physical_device_desc),
- gpu_allocator),
+ : LocalDevice(options, Device::BuildDeviceAttributes(name, DEVICE_GPU,
+ memory_limit, locality,
+ physical_device_desc)),
gpu_allocator_(gpu_allocator),
cpu_allocator_(cpu_allocator),
gpu_id_(gpu_id),
diff --git a/tensorflow/core/common_runtime/local_device.cc b/tensorflow/core/common_runtime/local_device.cc
index 0a6342ed73..3f7c9f68db 100644
--- a/tensorflow/core/common_runtime/local_device.cc
+++ b/tensorflow/core/common_runtime/local_device.cc
@@ -60,10 +60,8 @@ struct LocalDevice::EigenThreadPoolInfo {
};
LocalDevice::LocalDevice(const SessionOptions& options,
- const DeviceAttributes& attributes,
- Allocator* device_allocator)
- : Device(options.env, attributes, device_allocator),
- owned_tp_info_(nullptr) {
+ const DeviceAttributes& attributes)
+ : Device(options.env, attributes), owned_tp_info_(nullptr) {
// If we're running on the CPU, log warnings if we're not compiled using the
// best flags for performance.
port::WarnAboutUnusedCPUFeatures();
diff --git a/tensorflow/core/common_runtime/local_device.h b/tensorflow/core/common_runtime/local_device.h
index d1c27c6248..84a4f66db4 100644
--- a/tensorflow/core/common_runtime/local_device.h
+++ b/tensorflow/core/common_runtime/local_device.h
@@ -33,8 +33,8 @@ struct SessionOptions;
// GPUDevice into more 'process-wide' abstractions.
class LocalDevice : public Device {
public:
- LocalDevice(const SessionOptions& options, const DeviceAttributes& attributes,
- Allocator* device_allocator);
+ LocalDevice(const SessionOptions& options,
+ const DeviceAttributes& attributes);
~LocalDevice() override;
private:
diff --git a/tensorflow/core/common_runtime/renamed_device.cc b/tensorflow/core/common_runtime/renamed_device.cc
new file mode 100644
index 0000000000..fa9713735e
--- /dev/null
+++ b/tensorflow/core/common_runtime/renamed_device.cc
@@ -0,0 +1,54 @@
+/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/core/common_runtime/renamed_device.h"
+
+namespace tensorflow {
+
+// TODO(saeta): Convert to returning a std::unique_ptr?
+/* static */
+Device* RenamedDevice::NewRenamedDevice(const string& new_base,
+ Device* underlying,
+ bool owns_underlying) {
+ DeviceNameUtils::ParsedName parsed_name;
+ CHECK(DeviceNameUtils::ParseFullName(new_base, &parsed_name));
+ DeviceNameUtils::ParsedName underlying_parsed_name =
+ underlying->parsed_name();
+ CHECK(underlying_parsed_name.has_type);
+ CHECK(underlying_parsed_name.has_id);
+ parsed_name.type = underlying_parsed_name.type;
+ parsed_name.id = underlying_parsed_name.id;
+ string name = DeviceNameUtils::FullName(parsed_name.job, parsed_name.replica,
+ parsed_name.task, parsed_name.type,
+ parsed_name.id);
+ DeviceAttributes attributes(underlying->attributes());
+ attributes.set_name(name);
+ return new RenamedDevice(underlying, attributes, owns_underlying);
+}
+
+RenamedDevice::RenamedDevice(Device* underlying,
+ const DeviceAttributes& attributes,
+ bool owns_underlying)
+ : Device(underlying->env(), attributes),
+ underlying_(underlying),
+ owns_underlying_(owns_underlying) {}
+
+RenamedDevice::~RenamedDevice() {
+ if (owns_underlying_) {
+ delete underlying_;
+ }
+}
+
+} // namespace tensorflow
diff --git a/tensorflow/core/common_runtime/renamed_device.h b/tensorflow/core/common_runtime/renamed_device.h
new file mode 100644
index 0000000000..0158e18ced
--- /dev/null
+++ b/tensorflow/core/common_runtime/renamed_device.h
@@ -0,0 +1,119 @@
+/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef THIRD_PARTY_TENSORFLOW_CORE_COMMON_RUNTIME_RENAMED_DEVICE_H_
+#define THIRD_PARTY_TENSORFLOW_CORE_COMMON_RUNTIME_RENAMED_DEVICE_H_
+
+#include "tensorflow/core/common_runtime/device.h"
+#include "tensorflow/core/util/device_name_utils.h"
+
+namespace tensorflow {
+
+// Wraps a device with a new name, delegating work to the wrapped device.
+//
+// This class is used to wrap local devices when using clusterspec propagation
+// where the name of a particular device may change in the context of a given
+// session.
+class RenamedDevice : public Device {
+ public:
+ static Device* NewRenamedDevice(const string& new_base, Device* underlying,
+ bool owns_underlying);
+ ~RenamedDevice() override;
+
+ // Below are virtual methods defined on DeviceBase
+ bool RequiresRecordingAccessedTensors() const override {
+ return underlying_->RequiresRecordingAccessedTensors();
+ }
+
+ const CpuWorkerThreads* tensorflow_cpu_worker_threads() const override {
+ return underlying_->tensorflow_cpu_worker_threads();
+ }
+
+ const GpuDeviceInfo* tensorflow_gpu_device_info() const override {
+ return underlying_->tensorflow_gpu_device_info();
+ }
+
+ Allocator* GetAllocator(AllocatorAttributes attr) override {
+ return underlying_->GetAllocator(attr);
+ }
+
+ Allocator* GetStepAllocator(AllocatorAttributes attr,
+ ResourceMgr* step_resource_manager) override {
+ return underlying_->GetStepAllocator(attr, step_resource_manager);
+ }
+
+ const Eigen::ThreadPoolDevice* eigen_cpu_device() override {
+ return underlying_->eigen_cpu_device();
+ }
+
+#ifdef TENSORFLOW_USE_SYCL
+ const Eigen::SyclDevice* eigen_sycl_device() const override {
+ return underlying_->eigen_sycl_device();
+ }
+#endif
+
+ PerOpGpuDevice* MakeGpuDevice() override {
+ return underlying_->MakeGpuDevice();
+ }
+
+ void ReinitializeGpuDevice(OpKernelContext* context, PerOpGpuDevice* device,
+ DeviceContext* dc, Allocator* allocator) override {
+ underlying_->ReinitializeGpuDevice(context, device, dc, allocator);
+ }
+
+ Status MakeTensorFromProto(const TensorProto& tensor_proto,
+ const AllocatorAttributes alloc_attrs,
+ Tensor* tensor) override {
+ return underlying_->MakeTensorFromProto(tensor_proto, alloc_attrs, tensor);
+ }
+
+ // Below are virtual methods defined on Device
+
+ void Compute(OpKernel* op_kernel, OpKernelContext* context) override {
+ underlying_->Compute(op_kernel, context);
+ }
+
+ void ComputeAsync(AsyncOpKernel* op_kernel, OpKernelContext* context,
+ AsyncOpKernel::DoneCallback done) override {
+ underlying_->ComputeAsync(op_kernel, context, std::move(done));
+ }
+
+ void ConsumeListOfAccessedTensors(
+ DeviceContext* context, const TensorReferenceVector& tensors) override {
+ underlying_->ConsumeListOfAccessedTensors(context, tensors);
+ }
+
+ Status Sync() override { return underlying_->Sync(); }
+
+ Status MaybeRewriteGraph(const FunctionDefLibrary& library,
+ std::unique_ptr<Graph>* graph) override {
+ return underlying_->MaybeRewriteGraph(library, graph);
+ }
+
+ Status FillContextMap(const Graph* graph,
+ DeviceContextMap* device_context_map) override {
+ return underlying_->FillContextMap(graph, device_context_map);
+ }
+
+ private:
+ RenamedDevice(Device* underlying, const DeviceAttributes& attributes,
+ bool owns_underlying);
+ Device* const underlying_;
+ const bool owns_underlying_;
+};
+
+} // namespace tensorflow
+
+#endif // THIRD_PARTY_TENSORFLOW_CORE_COMMON_RUNTIME_RENAMED_DEVICE_H_
diff --git a/tensorflow/core/common_runtime/simple_placer_test.cc b/tensorflow/core/common_runtime/simple_placer_test.cc
index bd84417b10..24f27af5f1 100644
--- a/tensorflow/core/common_runtime/simple_placer_test.cc
+++ b/tensorflow/core/common_runtime/simple_placer_test.cc
@@ -66,7 +66,7 @@ class DummyOp : public OpKernel {
class FakeDevice : public Device {
private:
explicit FakeDevice(const DeviceAttributes& device_attributes)
- : Device(nullptr, device_attributes, nullptr) {}
+ : Device(nullptr, device_attributes) {}
public:
Status Sync() override { return errors::Unimplemented("FakeDevice::Sync()"); }
diff --git a/tensorflow/core/common_runtime/threadpool_device.cc b/tensorflow/core/common_runtime/threadpool_device.cc
index 60348e885f..f5f8aab694 100644
--- a/tensorflow/core/common_runtime/threadpool_device.cc
+++ b/tensorflow/core/common_runtime/threadpool_device.cc
@@ -38,10 +38,8 @@ ThreadPoolDevice::ThreadPoolDevice(const SessionOptions& options,
const string& name, Bytes memory_limit,
const DeviceLocality& locality,
Allocator* allocator)
- : LocalDevice(options,
- Device::BuildDeviceAttributes(name, DEVICE_CPU, memory_limit,
- locality),
- allocator),
+ : LocalDevice(options, Device::BuildDeviceAttributes(
+ name, DEVICE_CPU, memory_limit, locality)),
allocator_(allocator) {}
ThreadPoolDevice::~ThreadPoolDevice() {}