aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/grappler/clusters/single_machine.h
blob: f2773376e41e6da819e8bc1135474517ddd72a6c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_GRAPPLER_CLUSTERS_SINGLE_MACHINE_H_
#define TENSORFLOW_GRAPPLER_CLUSTERS_SINGLE_MACHINE_H_

#include "tensorflow/cc/training/coordinator.h"
#include "tensorflow/core/grappler/clusters/cluster.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/public/session.h"

namespace tensorflow {
namespace grappler {

// Create a simple cluster that makes available to grappler a subset of the
// nodes available on a single local computer.
class SingleMachine : public Cluster {
 public:
  SingleMachine(int timeout_s, int num_cpu_cores, int num_gpus);
  ~SingleMachine() override;

  Status Provision() override;
  Status Initialize(const GrapplerItem& item) override;
  Status Run(const GraphDef& item,
             const std::vector<std::pair<string, Tensor>>& feed,
             const std::vector<string>& fetch, RunMetadata* metadata) override;

 private:
  Status RunWithTimeout(const std::vector<std::pair<string, Tensor>>& feed,
                        const std::vector<string>& fetch,
                        RunMetadata* run_metadata);
  Status RunWithTimeout(const std::vector<std::pair<string, Tensor>>& feed,
                        const std::vector<string>& fetch,
                        RunMetadata* run_metadata, int64 timeout_s);
  Status ResetSession();
  Status CloseSession(bool use_timeout);
  void MergeCosts(CostGraphDef* graph_costs, const CostGraphDef& init_costs,
                  const CostGraphDef& queue_costs);

  const int num_gpus_;
  std::unique_ptr<Session> session_;
  std::vector<QueueRunnerDef> queue_runner_defs_;
  string last_graph_id_;
  mutex last_graph_mu_;
  const GraphDef* last_graph_ GUARDED_BY(last_graph_mu_) = nullptr;
  std::vector<string> init_ops_;
  int64 expected_init_time_s_;
  std::unique_ptr<Coordinator> coordinator_;
  std::unique_ptr<thread::ThreadPool> thread_pool_;

  RunMetadata init_metadata_;

  mutex close_mu_;
  bool closing_ GUARDED_BY(close_mu_);
};

}  // end namespace grappler
}  // end namespace tensorflow

#endif  // TENSORFLOW_GRAPPLER_CLUSTERS_SINGLE_MACHINE_H_