diff options
Diffstat (limited to 'tensorflow/core/common_runtime/gpu/gpu_stream_util.cc')
-rw-r--r-- | tensorflow/core/common_runtime/gpu/gpu_stream_util.cc | 97 |
1 file changed, 97 insertions, 0 deletions
diff --git a/tensorflow/core/common_runtime/gpu/gpu_stream_util.cc b/tensorflow/core/common_runtime/gpu/gpu_stream_util.cc new file mode 100644 index 0000000000..ca86c7fa06 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_stream_util.cc @@ -0,0 +1,97 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_stream_util.h" + +#include <set> +#include <string> +#include <unordered_set> +#include <vector> + +#include "tensorflow/core/graph/algorithm.h" +#include "tensorflow/core/lib/core/errors.h" +#include "tensorflow/core/lib/strings/strcat.h" + +namespace tensorflow { +namespace gpu_stream_util { + +Status AssignStreams(const Graph* graph, const AssignStreamsOpts& opts, + std::unordered_map<int, int>* node_to_stream_id) { + VLOG(1) << "AssignStreams"; + Status status; + + // Sanity check arguments. + if (graph == nullptr) + status.Update(errors::InvalidArgument("Bad graph argument supplied.")); + if (node_to_stream_id == nullptr) { + status.Update( + errors::InvalidArgument("Bad node_to_stream_id argument supplied.")); + } + if ((opts.max_streams < 1) || (opts.send_stream >= opts.max_streams) || + (opts.recv_stream >= opts.max_streams) || + (opts.const_stream >= opts.max_streams) || + (opts.compute_stream >= opts.max_streams)) { + status.Update(errors::InvalidArgument("Bad graph argument supplied.")); + } + TF_RETURN_IF_ERROR(status); + + // Topologically sort the nodes. 
+ std::vector<Node*> order; + GetReversePostOrder(*graph, &order); + if (VLOG_IS_ON(2)) { + for (Node* n : order) { + const int node_id = n->id(); + VLOG(2) << "Node " << node_id << " " << n->type_string() << " " + << n->name() << " " << n->in_edges().size() << " inputs"; + for (const Edge* e : n->in_edges()) { + VLOG(2) << " Edge from " << e->src()->id() << " " << e->src()->name() + << " fanout " << e->src()->out_edges().size(); + } + } + } + // We perform stream assigmnent assuming a large number of + // stream IDs and then map these down to the required number of streams + // using simple round-robin. + // Stream Assignment strategy: + // 1. Nodes with zero inputs are always be executed on a + // fresh stream. + // 2. Try to execute a node on the same stream as one of its + // inputs to avoid inter-stream dependencies. + // 3. If any input comes from a node with a large fanout then + // perhaps an indication that it is shared between parallel + // streams of work. We choose a new stream here so that all consumers + // of the tensor are likely to run in parallel. + int highest_stream_id = -1; + for (Node* n : order) { + VLOG(3) << "Inspecting node " << n->DebugString(); + const int node_id = n->id(); + const string& op = n->type_string(); + + // Determine a suitable stream to use. + int stream_id = highest_stream_id + 1; + for (const Edge* e : n->in_edges()) { + const int fanout = e->src()->out_edges().size(); + if (fanout == 1) { + stream_id = (*node_to_stream_id)[e->src()->id()]; + break; + } + } + // Override stream for specific op types. 
+ if (op == "_Send") { + if (opts.send_stream >= 0) stream_id = opts.send_stream; + } else if (op == "_Recv") { + if (opts.recv_stream >= 0) stream_id = opts.recv_stream; + } else if (op == "Const") { + if (opts.const_stream >= 0) stream_id = opts.const_stream; + } else { + if (opts.compute_stream >= 0) stream_id = opts.compute_stream; + } + + (*node_to_stream_id)[node_id] = stream_id % opts.max_streams; + highest_stream_id = std::max(stream_id, highest_stream_id); + } + VLOG(1) << "Identified " << highest_stream_id << " candidate streams for " + << order.size() << " nodes."; + + return Status::OK(); +} + +} // namespace gpu_stream_util +} // namespace tensorflow |