aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/gdr
diff options
context:
space:
mode:
authorGravatar A. Unique TensorFlower <gardener@tensorflow.org>2017-08-15 12:08:29 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-08-15 12:12:05 -0700
commit28ce1d163eeffe618a6972c5245be0e660d94e85 (patch)
tree27e873a692b8bff17f1d6222bbe5c3e9689763ae /tensorflow/contrib/gdr
parent03a33c08dddbea5c58f7e04f8ab7ecae886f20bb (diff)
Merge changes from github.
END_PUBLIC --- Commit 9f81374c3 authored by raymondxyang<zihao.yang@microsoft.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: Add option for build more python tests in Cmake (#11853) * Ignore Windows built project * Fix deprecated methods in tf.contrib.python * Fix regex match for Windows build in contrib.keras * Fix Regex match for Windows build in session_bundle * * Fix deprecated methods * Fix regex match for Windows * Fix compatibility issue with Python 3.x * Add missing ops into Windows build for test * Enabled more testcases for Windows build * Clean code and fix typo * Add conditional cmake mode for enabling more unit testcase * Add Cmake mode for major Contrib packages * Add supplementary info in RAEDME for new cmake option * * Update tf_tests after testing with TF 1.3 * Clean code and resolve conflicts * Fix unsafe regex matches and format code * Update exclude list after testing with latest master branch * Fix missing module --- Commit 98f0e1efe authored by Yong Tang<yong.tang.github@outlook.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: Dynamic ksize and strides with MaxPool (#11875) * Dynamic ksize with max_pool This fix tries to fix the issue raised in 4746 where ksize is static (attr) with max_pool. This fix changes ksize to input tensor so that it is dynamic now. This fix fixes 4746. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add dynamic ksize to MaxPoolGrad and MaxPoolGradGrad Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add test cases for max_pool_v2 Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Fix GPU Jenkins issue. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Enable MaxPoolV2 in GPU Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Hide MaxPoolV2 and other fixes. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> --- Commit 02d6bc185 authored by Bairen Yi<byronyi@users.noreply.github.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: remove useless variable (#12212) --- Commit ed6b0d905 authored by namrata-ibm<bhavenamrata@gmail.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: Adding support for s390x in calculation of cpu_frequency (#12201) --- Commit 627dfc9dd authored by Taehoon Lee<taehoonlee@snu.ac.kr> Committed by Taehoon Lee<taehoonlee@snu.ac.kr>: Fix typos --- Commit c0f9b0a91 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: In fast-math mode emit a tanh that has a faster min/max. PiperOrigin-RevId: 164943597 --- Commit 87605f3d6 authored by Kay Zhu<kayzhu@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: [TF:XLA] Use HloEvaluator for ComputeConstant, remove the need of a dedicated compute constant backend. PiperOrigin-RevId: 164940970 --- Commit 881de45c2 authored by Taehoon Lee<me@taehoonlee.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: Add bool type supports for GPU kernels (#11927) * Add bool type supports for GPU kernels * Add bool type test codes for GPU kernels --- Commit eeacdcdb1 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Add missing "CPU" suffix in registrations. PiperOrigin-RevId: 164939527 --- Commit de01be952 authored by namrata-ibm<bhavenamrata@gmail.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: Adding support for Big Endian in graph_constructor_test and wav_io (#12179) --- Commit 26719d29f authored by QingYing Chen<pkudysj@126.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: Implement CRF decode (Viterbi decode) for tensor (#12056) * Implement CRF decoding for tensors * add test code for tensor version's CRF decoding * made modifications according to pylint * add some comments for crf decode * remove useless code * add comments at the top comment of crf module and add more comments in crf_test * capitalize first char of first word in comments * replace crf_decode test code with a deterministic example --- Commit f9a81ca2f authored by Pete Warden<pete@petewarden.com> Committed by gunan<gunan@google.com>: Create CI build script for Raspberry Pi (#12190) * Create CI build script for Raspberry Pi * Moved location of Pi build script --- Commit e2a163a90 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Merge code from PR #11940 with internal changes from cl/164796436, and update Python tests to also run on GPU. PiperOrigin-RevId: 164929133 --- Commit 08bbfa187 authored by Taehoon Lee<me@taehoonlee.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: Fix typos (#12195) --- Commit ab96f41fb authored by Luke Iwanski<luke@codeplay.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: [OpenCL] Extends matmul_benchmark.py to cover SYCL (#11697) * [OpenCL] Extends matmul_benchmark.py to cover SYCL * Fixed typo * /gpu:0 -> /device:GPU:0 * Fixes control_flow_ops_py_test * /gpu: -> /device:GPU: * Fixes //tensorflow/python/profiler/internal:run_metadata_test * gpu: -> GPU: * Fixes tfprof_node * [OpenCL] Fixes device path to name with many colons (#123) The device path is constructed from a device name by replacing all colons with underscores. Some device names contain more than one colon, for example 'device:SYCL:0' which gives a path 'device_SYCL_0'. The previous code would not convert this back to the original device name, but rather to 'device:SYCL_0'. An alternative fix would be to convert all underscores to colons in the device name (i.e. remove the restriction inside `replace("_", ":", 1)`), however I'm not sure if there are any device names which contain underscores. * If no gpu device aviable fake one * gpu: -> device:GPU * Fixes profiler test * /gpu:x -> /device:GPU:x * Fixes debug_io_utils_test.cc test * Fixes device_name_utils_test.cc --- Commit 35e7a3665 authored by Yong Tang<yong.tang.github@outlook.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: Remove unneeded casting of int64 for reverse_sequence (#12192) This fix remove unneeded cast of int64 for reverse_sequence: ``` lengths = math_ops.to_int64(lengths) ``` as int32 has already been enabled for reverse_sequence. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> --- Commit 9fba8c185 authored by Anna R<annarev@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Add benchmark dashboard link to benchmarks doc. Also, I added a link and description for Benchmarks page to Community index page. PiperOrigin-RevId: 164924906 --- Commit bb6f32fa7 authored by Mark Heffernan<meheff@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Make HloAliasAnalysis updatable after changes to the HLO graph. As part of this change make HloAliasAnalysis a thinner layer which basically only holds a map from HloValue to HloBuffer and vice versa. PiperOrigin-RevId: 164923041 --- Commit 9103096c1 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by Thomas K?ppe<tkoeppe@google.com>: Merged commit includes the following changes: 164923041 by meheff: Make HloAliasAnalysis updatable after changes to the HLO graph. As part of this change make HloAliasAnalysis a thinner layer which basically only holds a map from HloValue to HloBuffer and vice versa. -- PiperOrigin-RevId: 164923041 --- Commit 822603aed authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Merging sibling fusion instruction using multi_output_fusion PiperOrigin-RevId: 164920220 --- Commit c035aa2a8 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Go: Update generated wrapper functions for TensorFlow ops. PiperOrigin-RevId: 164917891 --- Commit e1e81d9ba authored by Luke Iwanski<luke@codeplay.com> Committed by Rasmus Munk Larsen<rmlarsen@google.com>: [OpenCL] Fixes double memcpy bug (#151) (#12173) * [OpenCL] Fixes double memcpy bug (#151) As the debg CopyOp is called on a Tensor without type, we need to use the DataType enum to get type information, and use this to pass the type on to Eigen. This is a workaround Eigen's need to have a type when calling memcpy. If the Eigen memcpy can be provided without a type requirement, then the memcpy in sycl_util is unnecessary. * Acts on feedback from: #12173/files/32cb12a9001b672425867b5a3110fd98e737a20b#r132496277 --- Commit d9ca2d86d authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Internal change PiperOrigin-RevId: 164916465 --- Commit b8d13d218 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Remove more parts of DCASGD missed in the first pass. (47949b) PiperOrigin-RevId: 164914552 --- Commit 73b3d52c7 authored by Alexandre Passos<apassos@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: cmake fix PiperOrigin-RevId: 164911656 --- Commit 2173b5b0a authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Allow TFE_TensorHandleCopyToDevice to have the same device as src and destination. It will reuse the same underlying buffer in those cases. PiperOrigin-RevId: 164909906 --- Commit 13eb3b90e authored by Alexandre Passos<apassos@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Experimental C and Python APIs to invoke TensorFlow kernels on concrete values. PiperOrigin-RevId: 164902588 --- Commit 7dfabcc01 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Initialize ExecutionOptions in ComputeConstant to default values. PiperOrigin-RevId: 164894867 --- Commit c8897e9bc authored by Benoit Steiner<bsteiner@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Static required time computation PiperOrigin-RevId: 164894645 --- Commit 076158f9b authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Enable implicit->explicit conversion by default. PiperOrigin-RevId: 164890915 --- Commit 58c4a4cb1 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Bugfix: number of input channels is not necessarily in the last dimension, after introduction of data_format param. PiperOrigin-RevId: 164889729 --- Commit 8f9b1af8a authored by Igor Saprykin<isaprykin@google.com> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Recover MonitoredSession when the Coordinator is requested to stop with one of the _PREEMPTION_ERRORS. When SyncReplicasOptimizer is used, a preemption in the Coordinator may result in two cases: Case 1) the session gets silently marked as complete Case 2) the session gets stuck This CL aims to solve and verify solutions for both of these problems. Fix 1 changes the should_stop logic. Fix 2 changes the CoordinatedSession.run() logic. SyncReplicasOptimizer runs a separate set of threads using a Coordinator instance. Those threads do FIFOQueue.enqueue; the main thread does a blocking FIFOQueue.dequeue. `sync_token_q` FIFOQueue is on parameter-servers. When one of the PS instances gets preempted, an AbortedError causes the Coordinator to stop via request_stop(ex). That by itself changes the state of MonitoredSession.should_stop() to True (Fix 1). Results of the blocking Dequeue operation are sent to the chief worker via Recv. What happens next depends on the amount of tokens in `sync_token_q`. If there are enough for the next call to Dequeue to return, then the low-level "tf session run() call" returns. The next iteration of the `while not MonitoredSession.should_stop()` loop decides that the training is complete (Case 1). If there are not enough tokens in `sync_token_q`, then the blocking Dequeue is going to keep waiting for them. This results in the graph execution getting stuck and the whole session getting garbage collected after 10 minutes (Case 2). We decided to fix that by re-creating a session after it gets garbage collected (Fix 2). An alternative was to try to cancel the pending Dequeue operation, but it's not clear that it is the right thing to do and it is also not easy. PiperOrigin-RevId: 164888390 --- Commit 46e4de6e5 authored by A. Unique TensorFlower<gardener@tensorflow.org> Committed by TensorFlower Gardener<gardener@tensorflow.org>: Undo loop fusion changes for now as they seem to be altering a few results. END_PUBLIC RELNOTES: n/a BEGIN_PUBLIC BEGIN_PUBLIC Automated g4 rollback of changelist 164825735 PiperOrigin-RevId: 165340331
Diffstat (limited to 'tensorflow/contrib/gdr')
-rw-r--r--tensorflow/contrib/gdr/BUILD125
-rw-r--r--tensorflow/contrib/gdr/README.md122
-rw-r--r--tensorflow/contrib/gdr/gdr.proto13
-rw-r--r--tensorflow/contrib/gdr/gdr_memory_manager.cc682
-rw-r--r--tensorflow/contrib/gdr/gdr_memory_manager.h63
-rw-r--r--tensorflow/contrib/gdr/gdr_rendezvous_mgr.cc201
-rw-r--r--tensorflow/contrib/gdr/gdr_rendezvous_mgr.h42
-rw-r--r--tensorflow/contrib/gdr/gdr_server_lib.cc127
-rw-r--r--tensorflow/contrib/gdr/gdr_server_lib.h52
-rw-r--r--tensorflow/contrib/gdr/gdr_worker.cc146
-rw-r--r--tensorflow/contrib/gdr/gdr_worker.h45
11 files changed, 1618 insertions, 0 deletions
diff --git a/tensorflow/contrib/gdr/BUILD b/tensorflow/contrib/gdr/BUILD
new file mode 100644
index 0000000000..645e364d19
--- /dev/null
+++ b/tensorflow/contrib/gdr/BUILD
@@ -0,0 +1,125 @@
+# Description:
+# GPU Direct RDMA Out-of-Band Tensor transport for TensorFlow.
+
+package(default_visibility = [
+ "//tensorflow:__subpackages__",
+])
+
+licenses(["notice"]) # Apache 2.0
+
+exports_files(["LICENSE"])
+
+filegroup(
+ name = "all_files",
+ srcs = glob(
+ ["**/*"],
+ exclude = [
+ "**/METADATA",
+ "**/OWNERS",
+ ],
+ ),
+ visibility = ["//tensorflow:__subpackages__"],
+)
+
+filegroup(
+ name = "c_srcs",
+ data = glob([
+ "**/*.cc",
+ "**/*.h",
+ ]),
+)
+
+load(
+ "//tensorflow:tensorflow.bzl",
+ "tf_cuda_library",
+)
+
+# For platform specific build config
+load(
+ "//tensorflow/core:platform/default/build_config.bzl",
+ "tf_proto_library_cc",
+)
+
+tf_proto_library_cc(
+ name = "gdr_proto",
+ srcs = ["gdr.proto"],
+ cc_api_version = 2,
+ visibility = [
+ "//tensorflow:__subpackages__",
+ ],
+)
+
+tf_cuda_library(
+ name = "gdr_memory_manager",
+ srcs = ["gdr_memory_manager.cc"],
+ hdrs = ["gdr_memory_manager.h"],
+ linkopts = select({
+ "//tensorflow:with_gdr_support": [
+ "-libverbs",
+ "-lrdmacm",
+ ],
+ "//conditions:default": [],
+ }),
+ deps = [
+ ":gdr_proto_cc",
+ "//tensorflow/core:framework",
+ "//tensorflow/core:gpu_runtime",
+ "//tensorflow/core:lib",
+ "//tensorflow/core:lib_internal",
+ ],
+)
+
+tf_cuda_library(
+ name = "gdr_worker",
+ srcs = ["gdr_worker.cc"],
+ hdrs = ["gdr_worker.h"],
+ deps = [
+ ":gdr_memory_manager",
+ "//tensorflow/core:core_cpu_internal",
+ "//tensorflow/core:framework",
+ "//tensorflow/core:gpu_runtime",
+ "//tensorflow/core:lib",
+ "//tensorflow/core:lib_internal",
+ "//tensorflow/core/distributed_runtime:graph_mgr",
+ "//tensorflow/core/distributed_runtime:rendezvous_mgr_interface",
+ "//tensorflow/core/distributed_runtime:worker",
+ "//tensorflow/core/distributed_runtime:worker_cache",
+ "//tensorflow/core/distributed_runtime:worker_env",
+ "//tensorflow/core/distributed_runtime:worker_session",
+ "//tensorflow/core/distributed_runtime/rpc:grpc_call",
+ "//tensorflow/core/distributed_runtime/rpc:grpc_tensor_coding",
+ "//tensorflow/core/distributed_runtime/rpc:grpc_util",
+ "//tensorflow/core/distributed_runtime/rpc:grpc_worker_service",
+ ],
+)
+
+cc_library(
+ name = "gdr_rendezvous_mgr",
+ srcs = ["gdr_rendezvous_mgr.cc"],
+ hdrs = ["gdr_rendezvous_mgr.h"],
+ deps = [
+ ":gdr_memory_manager",
+ "//tensorflow/core:core_cpu_internal",
+ "//tensorflow/core:framework",
+ "//tensorflow/core:lib",
+ "//tensorflow/core/distributed_runtime:base_rendezvous_mgr",
+ "//tensorflow/core/distributed_runtime:worker_cache",
+ "//tensorflow/core/distributed_runtime:worker_env",
+ "//tensorflow/core/distributed_runtime:worker_interface",
+ ],
+)
+
+cc_library(
+ name = "gdr_server_lib",
+ srcs = ["gdr_server_lib.cc"],
+ hdrs = ["gdr_server_lib.h"],
+ linkstatic = 1, # Seems to be needed since alwayslink is broken in bazel
+ deps = [
+ ":gdr_memory_manager",
+ ":gdr_rendezvous_mgr",
+ ":gdr_worker",
+ "//tensorflow/core:lib_internal",
+ "//tensorflow/core/distributed_runtime/rpc:grpc_server_lib",
+ ],
+ alwayslink = 1,
+)
diff --git a/tensorflow/contrib/gdr/README.md b/tensorflow/contrib/gdr/README.md
new file mode 100644
index 0000000000..34ce60b360
--- /dev/null
+++ b/tensorflow/contrib/gdr/README.md
@@ -0,0 +1,122 @@
+Introduction
+===
+
+This is an implementation of GDR out-of-band transport for TensorFlow distributed runtime, complementary to current gRPC transport. It uses gRPC as control plane to setup rendezvous for each tensor transmission, and utilizes [GPU Direct RDMA](https://developer.nvidia.com/gpudirect) whenever possible to transmit tensors in remote GPU memory through network interface card (NIC), bypassing host memory and CPU entirely. It gracefully falls back to ordinary RDMA or even gRPC when GDR is not available.
+
+Design
+===
+
+The GDR out-of-band transport is designed to avoid any unnecessary memory copies, especially for large tensors (>100MB). That typically requires registration of tensor buffers to NIC in an ad-hoc manner, which is rather slow as described in the design trade-off of the verbs runtime. The verbs runtime thus chooses to manage its own NIC-registered buffers and copy the tensors from/to those buffers for every single tensor transfer.
+
+We show that, however, such design trade-off is not always relevant. In this patch, we manage both computation and communication buffers in a unified manner. By pre-registration of large buffers to NIC and allocating small tensors from the buffer pool using a BFC allocator, it is possible to avoid both ad-hoc buffer registration and memory copies all together.
+
+For the actual tensor transport, we rely on gRPC to transmit the [remote buffer information](gdr.proto). This greatly simplifies our design, and there are only 2 types of RDMA messages: a single READ to retrieve the tensor data (bypassing remote CPU), and another invalidate using WRITE with IMM to release the tensor buffer on the remote side. The remote side will only be polling the invalidate message and `Unref` the tensor buffers that read by its peer.
+
+Environment
+===
+
+To fully utilize GDR, the target environment has to meet 3 conditions:
+
+1. There is an RDMA capable device with corresponding [OFED package](https://www.openfabrics.org/index.php/overview.html) installed (detailed information is available from your [Infiniband/RoCE](http://www.mellanox.com/page/products_dyn?product_family=116)/[iWarp](http://www.chelsio.com/gpudirect-rdma/) vendor), which could be verified through `ibv_devinfo`, e.g.
+
+```
+$ ibv_devinfo
+hca_id: mlx4_0
+ transport: InfiniBand (0)
+ fw_ver: 2.40.7000
+ node_guid: 248a:0703:00f6:3370
+ sys_image_guid: 248a:0703:00f6:3370
+ vendor_id: 0x02c9
+ vendor_part_id: 4099
+ hw_ver: 0x1
+ board_id: MT_1090110023
+ phys_port_cnt: 2
+ Device ports:
+ port: 1
+ state: PORT_ACTIVE (4)
+ max_mtu: 4096 (5)
+ active_mtu: 1024 (3)
+ sm_lid: 0
+ port_lid: 0
+ port_lmc: 0x00
+ link_layer: Ethernet
+
+ port: 2
+ state: PORT_ACTIVE (4)
+ max_mtu: 4096 (5)
+ active_mtu: 1024 (3)
+ sm_lid: 0
+ port_lid: 0
+ port_lmc: 0x00
+ link_layer: Ethernet
+```
+
+2. There is a GDR capable GPU, i.e. of Fermi, Kepler or later architecture with [corresponding driver](http://docs.nvidia.com/cuda/gpudirect-rdma/index.html) installed. The PCI-e topology could be confirmed by `nvidia-smi topo -m`. For example, in the following topology, `GPU2` and `GPU3` are adjacent to `mlx4_0`, and tensors on these devices could benefit from GDR in current implementation.
+
+```
+$ nvidia-smi topo -m
+ GPU0 GPU1 GPU2 GPU3 mlx4_0 CPU Affinity
+GPU0 X PHB SOC SOC SOC 0-5
+GPU1 PHB X SOC SOC SOC 0-5
+GPU2 SOC SOC X PHB PHB 6-11
+GPU3 SOC SOC PHB X PHB 6-11
+mlx4_0 SOC SOC PHB PHB X
+
+Legend:
+
+ X = Self
+ SOC = Connection traversing PCIe as well as the SMP link between CPU sockets(e.g. QPI)
+ PHB = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
+ PXB = Connection traversing multiple PCIe switches (without traversing the PCIe Host Bridge)
+ PIX = Connection traversing a single PCIe switch
+ NV# = Connection traversing a bonded set of # NVLinks
+```
+
+3. The [`nv_peer_mem`](https://github.com/Mellanox/nv_peer_memory) kernel module is installed.
+
+How to build and run in GDR mode
+===
+
+To test it out on a GDR capable environment, choose to enable GDR in your configure script.
+
+```
+Do you wish to build TensorFlow with GDR support? [y/N]: y
+GDR support will be enabled for TensorFlow.
+```
+
+Change your `protocol` to `grpc+gdr` to enable GDR in your deployment.
+
+```
+server = tf.train.Server(cluster, job_name="local", task_index=0, protocol='grpc+gdr') # default protocol is 'grpc'
+```
+
+Currently the out-of-band transport service listens to the same IP and port address as specified in gRPC.
+
+A successful initialization looks like this:
+
+```
+2017-08-05 19:10:38.601718: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:0) -> (device: 0, name: Tesla K40m, pci bus id: 0000:02:00.0)
+2017-08-05 19:10:38.601728: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:1) -> (device: 1, name: Tesla K40m, pci bus id: 0000:03:00.0)
+2017-08-05 19:10:38.601736: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:2) -> (device: 2, name: Tesla K40m, pci bus id: 0000:82:00.0)
+2017-08-05 19:10:38.601742: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:3) -> (device: 3, name: Tesla K40m, pci bus id: 0000:83:00.0)
+2017-08-05 19:10:39.591026: I tensorflow/contrib/gdr/gdr_memory_manager.cc:235] RDMA server is listening on 10.40.2.200:5001
+2017-08-05 19:10:39.591071: I tensorflow/contrib/gdr/gdr_memory_manager.cc:285] Instrumenting CPU allocator cuda_host_bfc
+2017-08-05 19:10:39.591083: I tensorflow/contrib/gdr/gdr_memory_manager.cc:285] Instrumenting CPU allocator cpu_pool
+2017-08-05 19:10:39.591095: I tensorflow/contrib/gdr/gdr_memory_manager.cc:285] Instrumenting CPU allocator cpu_rdma_bfc
+2017-08-05 19:10:39.591278: I tensorflow/contrib/gdr/gdr_memory_manager.cc:78] NUMA node for device: mlx4_0 is 1
+2017-08-05 19:10:39.740253: I tensorflow/contrib/gdr/gdr_memory_manager.cc:296] Instrumenting GPU allocator with bus_id 2
+```
+
+The last line suggests that the GPUs with bus id 2 (mapped to pci bus id prefixed 0000:8) will benefit from GDR and host memory bypass, which is `/gpu:2` and `/gpu:3` in this case.
+
+Caveats
+===
+
+In current implementation, only tensors that reside in host memory or in GPU memory such that the GPU is adjacent to an RDMA capable NIC will use direct RDMA as its transport. When RDMA is available but not GDR, a temporary tensor copy on host memory will be used as RDMA source/destination (and copied from/to the target device). When there is no RDMA device present, it can even fallback to the original gRPC runtime. While it is theoretically possible to mix GDR enabled TF with non-GDR deployments in the same job, make sure the environment is properly setup so the GDR mode is enabled whenever possible (i.e. do not fall back to gRPC when it is not absolutely necessary).
+
+In the original design (as in the reference), tensor buffers are only registered to NIC when we could determine that the tensor will be either a source of Send or a sink of Recv across physical machine boundary. However, to implement the precise allocations, we need to change all the devices to possibly return a NIC compatible allocator. As GDR is currently in contrib, we would like to avoid the unnecessary code disruption to the TF core, so we allocate all tensors from NIC-registered buffers using a BFC allocator. This behaviour is similar to the effect of enabling the extra GPU option `force_gpu_compatible`, which allocate all host tensors in GPU-registered buffers no matter they will be transferred from/to GPUs or not.
+
+Reference
+===
+
+Bairen Yi, Jiacheng Xia, Li Chen, and Kai Chen. 2017. Towards Zero Copy Dataflows using RDMA. In Proceedings of SIGCOMM Posters and Demos'17, Los Angeles, CA, USA, August 22-24, 2017, 3 pages. https://doi.org/10.1145/3123878.3123907
diff --git a/tensorflow/contrib/gdr/gdr.proto b/tensorflow/contrib/gdr/gdr.proto
new file mode 100644
index 0000000000..c0b89245b1
--- /dev/null
+++ b/tensorflow/contrib/gdr/gdr.proto
@@ -0,0 +1,13 @@
+syntax = "proto3";
+
+package tensorflow;
+option cc_enable_arenas = true;
+
+message RemoteMemoryRegion {
+ string host = 1;
+ string port = 2;
+ uint64 addr = 3;
+ uint32 rkey = 4;
+ uint32 tensor_key = 5;
+ uint64 checksum = 6;
+}
diff --git a/tensorflow/contrib/gdr/gdr_memory_manager.cc b/tensorflow/contrib/gdr/gdr_memory_manager.cc
new file mode 100644
index 0000000000..c55989e3e5
--- /dev/null
+++ b/tensorflow/contrib/gdr/gdr_memory_manager.cc
@@ -0,0 +1,682 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifdef TENSORFLOW_USE_GDR
+
+#include "tensorflow/contrib/gdr/gdr_memory_manager.h"
+
+#include <atomic>
+#include <cerrno>
+#include <fstream>
+#include <list>
+#include <map>
+#include <set>
+
+#include <fcntl.h>
+#include <rdma/rdma_cma.h>
+#include <rdma/rdma_verbs.h>
+#include <sys/epoll.h>
+
+#include "tensorflow/contrib/gdr/gdr.pb.h"
+#include "tensorflow/core/common_runtime/bfc_allocator.h"
+#include "tensorflow/core/common_runtime/device.h"
+#include "tensorflow/core/common_runtime/dma_helper.h"
+#if GOOGLE_CUDA
+#include "tensorflow/core/common_runtime/gpu/gpu_util.h"
+#include "tensorflow/core/common_runtime/gpu/process_state.h"
+#endif // GOOGLE_CUDA
+#include "tensorflow/core/framework/allocator_registry.h"
+#include "tensorflow/core/lib/core/status.h"
+#include "tensorflow/core/platform/macros.h"
+#include "tensorflow/core/platform/mutex.h"
+
+namespace tensorflow {
+
+namespace {
+
+bool IsGDRAvailable() {
+#if defined(__APPLE__)
+ return false;
+#elif defined(PLATFORM_WINDOWS)
+ return false;
+#else
+ std::ifstream ifs("/proc/modules");
+ string line;
+ while (std::getline(ifs, line)) {
+ auto sep = line.find(' ');
+ CHECK_NE(sep, std::string::npos);
+ if (line.substr(0, sep) == "nv_peer_mem") {
+ return true;
+ }
+ }
+ return false;
+#endif
+}
+
+int TryToReadNumaNode(ibv_device* device) {
+#if defined(__APPLE__)
+ LOG(INFO) << "OS X does not support NUMA - returning NUMA node 0";
+ return 0;
+#elif defined(PLATFORM_WINDOWS)
+ // Windows support for NUMA is not currently implemented. Return node 0.
+ return 0;
+#else
+ VLOG(2) << "Trying to read NUMA node for device: " << device->name;
+ static const int kUnknownNumaNode = -1;
+
+ auto filename = string(device->ibdev_path) + "/device/numa_node";
+
+ std::ifstream ifs(filename.c_str());
+ string content;
+ CHECK(std::getline(ifs, content));
+
+ int32 value;
+ if (strings::safe_strto32(content, &value)) {
+ if (value < 0) {
+ LOG(INFO) << "Successful NUMA node read from SysFS had negative value ("
+ << value << "), but there must be at least one NUMA node"
+ ", so returning NUMA node zero";
+ return 0;
+ }
+ LOG(INFO) << "NUMA node for device: " << device->name << " is " << value;
+ return value;
+ }
+ return kUnknownNumaNode;
+#endif
+}
+
+void EndpointDeleter(rdma_cm_id* id) {
+ if (id) {
+ rdma_destroy_ep(id);
+ }
+}
+
+void MRDeleter(ibv_mr* mr) {
+ if (mr) {
+ rdma_dereg_mr(mr);
+ }
+}
+
+using RdmaEndpointPtr = std::unique_ptr<rdma_cm_id, decltype(&EndpointDeleter)>;
+
+using MemoryRegionPtr = std::unique_ptr<ibv_mr, decltype(&MRDeleter)>;
+
+class GdrMemoryManager : public RemoteMemoryManager {
+ public:
+ GdrMemoryManager(const string& host, const string& port);
+
+ virtual ~GdrMemoryManager();
+
+ virtual Status Init() override;
+
+ virtual void Run() override;
+
+ virtual void Stop() override;
+
+ virtual Status TransportOptionsFromTensor(
+ ::google::protobuf::Any* mutable_transport_options, const Tensor& tensor,
+ Device* device, DeviceContext* device_context, bool on_host) override;
+
+ virtual Status TensorFromTransportOptions(
+ Tensor* tensor, const ::google::protobuf::Any& transport_options,
+ Device* device, DeviceContext* device_context, bool on_host) override;
+
+ protected:
+ Status CreateEndpoint(const string& host, const string& port,
+ RdmaEndpointPtr& endpoint);
+
+ static bool Comparator(const void* ptr, const MemoryRegionPtr& other) {
+ return ptr < reinterpret_cast<char*>(other->addr) + other->length;
+ }
+
+ ibv_mr* FindMemoryRegion(void* addr, size_t length);
+
+ void InsertMemoryRegion(void* addr, size_t length);
+
+#if GOOGLE_CUDA
+ void InsertCUDAMemoryRegion(void* addr, size_t length);
+#endif
+
+ void EvictMemoryRegion(void* addr, size_t length);
+
+ private:
+ const string host_;
+ const string port_;
+ RdmaEndpointPtr listening_;
+ std::atomic<bool> stopped_;
+ int epfd_;
+
+ // Server side endpoints
+ // Accessed sequentially in Run() so not protected by lock
+ std::list<RdmaEndpointPtr> server_clients_;
+
+ using TensorKey = uint32_t;
+ std::atomic<TensorKey> next_key_;
+
+ // Server side on-the-fly tensor buffers
+ mutex server_mu_;
+ std::map<TensorKey, const TensorBuffer*> tensor_buffers_
+ GUARDED_BY(server_mu_);
+
+ // Client side endpoints
+ mutex client_mu_;
+ std::map<std::pair<string, string>, RdmaEndpointPtr> clients_
+ GUARDED_BY(cient_mu_);
+
+ // Managed memory regions
+ mutex alloc_mu_;
+ std::vector<MemoryRegionPtr> mrs_ GUARDED_BY(alloc_mu_);
+
+ TF_DISALLOW_COPY_AND_ASSIGN(GdrMemoryManager);
+};
+
+// TODO(byronyi): remove this class duplicated from the one in
+// common/runtime/gpu/pool_allocator.h when it is available in common_runtime
+class BasicCPUAllocator : public SubAllocator {
+ public:
+ ~BasicCPUAllocator() override {}
+
+ void* Alloc(size_t alignment, size_t num_bytes) override {
+ return port::AlignedMalloc(num_bytes, alignment);
+ }
+ void Free(void* ptr, size_t) override { port::AlignedFree(ptr); }
+};
+
+// TODO(byronyi): remove this class and its registration when the default
+// cpu_allocator() returns visitable allocator
+class BFCRdmaAllocator : public BFCAllocator {
+ public:
+ BFCRdmaAllocator()
+ : BFCAllocator(new BasicCPUAllocator(), 1LL << 36, true, "cpu_rdma_bfc") {
+ }
+};
+
+REGISTER_MEM_ALLOCATOR("BFCRdmaAllocator", 101, BFCRdmaAllocator);
+
+GdrMemoryManager::GdrMemoryManager(const string& host, const string& port)
+ : host_(host),
+ port_(port),
+ listening_(nullptr, EndpointDeleter),
+ stopped_(true),
+ next_key_(0) {}
+
+GdrMemoryManager::~GdrMemoryManager() { close(epfd_); }
+
+Status GdrMemoryManager::Init() {
+ epfd_ = epoll_create1(0);
+ if (epfd_ == -1) {
+ return errors::Unavailable(strerror(errno), ": ", "epoll_create");
+ }
+
+ rdma_addrinfo* addrinfo;
+ rdma_addrinfo hints = {};
+ hints.ai_port_space = RDMA_PS_TCP;
+ hints.ai_flags = RAI_PASSIVE;
+ if (rdma_getaddrinfo(const_cast<char*>(host_.c_str()),
+ const_cast<char*>(port_.c_str()), &hints, &addrinfo)) {
+ return errors::Unavailable(strerror(errno), ": ", "cannot resolve rdma://",
+ host_, ":", port_);
+ }
+
+ ibv_qp_init_attr init_attr = {};
+ init_attr.qp_type = IBV_QPT_RC;
+ init_attr.cap.max_recv_wr = 32;
+ init_attr.cap.max_send_wr = 1;
+ init_attr.cap.max_recv_sge = 1;
+ init_attr.cap.max_send_sge = 1;
+
+ // Create listening endpoint
+ rdma_cm_id* id;
+ if (rdma_create_ep(&id, addrinfo, nullptr, &init_attr)) {
+ return errors::Unavailable(strerror(errno), ": ", "cannot bind to rdma://",
+ host_, ":", port_);
+ }
+ listening_.reset(id);
+ rdma_freeaddrinfo(addrinfo);
+
+ // Listen without backlog
+ if (rdma_listen(listening_.get(), 0)) {
+ return errors::Unavailable(strerror(errno), ": ",
+ "cannot listen on rdma://", host_, ":", port_);
+ }
+ LOG(INFO) << "RDMA server is listening on " << host_ << ":" << port_;
+
+ if (listening_->verbs == nullptr) {
+ return errors::Unimplemented(
+ "Unsupported address ", host_, ":", port_,
+ " as it does not bind to a particular RDMA device");
+ }
+
+ int flags = fcntl(listening_->channel->fd, F_GETFL, 0);
+ if (fcntl(listening_->channel->fd, F_SETFL, flags | O_NONBLOCK)) {
+ return errors::Unavailable(strerror(errno), ": ",
+ "cannot set server to non-blocking mode");
+ }
+
+ epoll_event event = {};
+ event.events = EPOLLIN | EPOLLPRI;
+ event.data.ptr = listening_.get();
+ if (epoll_ctl(epfd_, EPOLL_CTL_ADD, listening_->channel->fd, &event)) {
+ return errors::Unavailable(strerror(errno), ": ",
+ "cannot add server to epoll");
+ }
+
+ Allocator* allocators[] = {
+#if GOOGLE_CUDA
+ ProcessState::singleton()->GetCUDAHostAllocator(0),
+ ProcessState::singleton()->GetCPUAllocator(0),
+#endif // GOOGLE_CUDA
+ cpu_allocator(),
+ };
+
+ using namespace std::placeholders;
+ VisitableAllocator::Visitor alloc_visitor =
+ std::bind(&GdrMemoryManager::InsertMemoryRegion, this, _1, _2);
+ VisitableAllocator::Visitor free_visitor =
+ std::bind(&GdrMemoryManager::EvictMemoryRegion, this, _1, _2);
+
+ std::set<Allocator*> instrumented_;
+
+ // Host memory allocators
+ for (Allocator* allocator : allocators) {
+ auto* visitable_allocator = dynamic_cast<VisitableAllocator*>(allocator);
+ CHECK(visitable_allocator) << "is not visitable for instrumentation"
+ << allocator->Name();
+ // Make sure we don't instrument the same allocator twice
+ if (instrumented_.find(allocator) == std::end(instrumented_)) {
+ visitable_allocator->AddAllocVisitor(alloc_visitor);
+ visitable_allocator->AddFreeVisitor(free_visitor);
+ instrumented_.insert(allocator);
+ LOG(INFO) << "Instrumenting CPU allocator " << allocator->Name();
+ }
+ }
+
+#if GOOGLE_CUDA
+ VisitableAllocator::Visitor cuda_alloc_visitor =
+ std::bind(&GdrMemoryManager::InsertMemoryRegion, this, _1, _2);
+ if (IsGDRAvailable()) {
+ // Note we don't free allocated GPU memory so there is no free visitor
+ int32_t bus_id = TryToReadNumaNode(listening_->verbs->device) + 1;
+ ProcessState::singleton()->AddGPUAllocVisitor(bus_id, cuda_alloc_visitor);
+ LOG(INFO) << "Instrumenting GPU allocator with bus_id " << bus_id;
+ }
+#endif // GOOGLE_CUDA
+
+ return Status::OK();
+}
+
+void GdrMemoryManager::Run() {
+ stopped_ = false;
+ while (!stopped_) {
+ epoll_event events[32];
+ int ret = epoll_wait(epfd_, events, 32, 1);
+ if (ret == -1) {
+ LOG(ERROR) << "epoll_wait: " << strerror(errno);
+ return;
+ }
+ for (int i = 0; i < ret; i++) {
+ rdma_cm_id* id = static_cast<rdma_cm_id*>(events[i].data.ptr);
+ if (id == listening_.get()) {
+ // Accept incoming connections
+ if (!rdma_get_request(listening_.get(), &id)) {
+ if (!rdma_accept(id, nullptr)) {
+ LOG(INFO) << "Accepted new RDMA connection";
+ if (ibv_req_notify_cq(id->recv_cq, 0)) {
+ LOG(ERROR) << strerror(errno) << ": ibv_req_notify_cq failed";
+ EndpointDeleter(id);
+ continue;
+ }
+ for (int i = 0; i < 32; i++) {
+ if (rdma_post_recvv(id, nullptr, nullptr, 0)) {
+ LOG(ERROR) << strerror(errno) << ": rdma_post_recvv failed";
+ EndpointDeleter(id);
+ continue;
+ }
+ }
+ int flags = fcntl(id->recv_cq_channel->fd, F_GETFL, 0);
+ if (fcntl(id->recv_cq_channel->fd, F_SETFL, flags | O_NONBLOCK)) {
+ LOG(ERROR) << strerror(errno)
+ << ": cannot set server_client to non-blocking mode";
+ EndpointDeleter(id);
+ continue;
+ }
+ epoll_event event = {};
+ event.events = EPOLLIN | EPOLLPRI;
+ event.data.ptr = id;
+ if (epoll_ctl(epfd_, EPOLL_CTL_ADD, id->recv_cq_channel->fd,
+ &event)) {
+ LOG(ERROR) << strerror(errno)
+ << ": cannot add server client to epoll";
+ EndpointDeleter(id);
+ continue;
+ }
+ server_clients_.push_back({id, EndpointDeleter});
+ }
+ }
+ } else {
+ // Polling work completions
+ ibv_cq* cq;
+ void* context;
+ if (!ibv_get_cq_event(id->recv_cq_channel, &cq, &context)) {
+ ibv_ack_cq_events(id->recv_cq, 1);
+ if (ibv_req_notify_cq(id->recv_cq, 0)) {
+ LOG(ERROR) << strerror(errno) << ": ibv_req_notify_cq failed";
+ continue;
+ }
+ ibv_wc wc[32];
+ int ret = ibv_poll_cq(id->recv_cq, 32, wc);
+ if (ret < 0) {
+ LOG(ERROR) << "ibv_poll_cq failed";
+ continue;
+ }
+ for (int i = 0; i < ret; i++) {
+ if (wc[i].opcode != IBV_WC_RECV_RDMA_WITH_IMM) {
+ LOG(ERROR) << "Received unknown operation " << wc[i].opcode;
+ }
+ if (wc[i].status != 0) {
+ LOG(ERROR) << ibv_wc_status_str(wc[i].status);
+ }
+ TensorKey tensor_key = ntohl(wc[i].imm_data);
+ {
+ mutex_lock l(server_mu_);
+ auto iter = tensor_buffers_.find(tensor_key);
+ if (iter == std::end(tensor_buffers_)) {
+ LOG(ERROR) << "Cannot find tensor buffer for tensor key "
+ << tensor_key;
+ } else {
+ const TensorBuffer* buffer = iter->second;
+ buffer->Unref();
+ tensor_buffers_.erase(iter);
+ }
+ }
+ if (rdma_post_recvv(id, nullptr, nullptr, 0)) {
+ perror("rdma_post_recvv");
+ LOG(ERROR) << "rdma_post_recvv failed";
+ continue;
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+void GdrMemoryManager::Stop() { stopped_ = true; }
+
+Status GdrMemoryManager::TransportOptionsFromTensor(
+ ::google::protobuf::Any* mutable_transport_options, const Tensor& tensor,
+ Device* device, DeviceContext* device_context, bool on_host) {
+ auto buffer = DMAHelper::buffer(&tensor);
+ void* addr = buffer->data();
+ size_t length = buffer->size();
+ if (length == 0) {
+ return errors::Unavailable("Cannot register tensor buffer of size 0");
+ }
+
+ ibv_mr* mr = FindMemoryRegion(addr, length);
+
+ Tensor host_copy;
+#if GOOGLE_CUDA
+ if (!on_host && mr != nullptr) {
+ TF_RETURN_IF_ERROR(GPUUtil::Sync(device));
+ } else if (!on_host) {
+ Allocator* alloc = ProcessState::singleton()->GetCUDAHostAllocator(0);
+ host_copy = Tensor(alloc, tensor.dtype(), tensor.shape());
+ Status s;
+ Notification n;
+ GPUUtil::CopyGPUTensorToCPU(device, device_context, &tensor, &host_copy,
+ [&s, &n](const Status& status) {
+ s.Update(status);
+ n.Notify();
+ });
+ n.WaitForNotification();
+ if (!s.ok()) {
+ return s;
+ }
+ buffer = DMAHelper::buffer(&host_copy);
+ addr = buffer->data();
+ length = buffer->size();
+ mr = FindMemoryRegion(addr, length);
+ }
+#endif
+
+ if (mr == nullptr) {
+ return errors::Unavailable("Cannot find pinned memory region");
+ }
+
+ buffer->Ref();
+ TensorKey tensor_key = next_key_++;
+ {
+ mutex_lock l(server_mu_);
+ tensor_buffers_.insert(std::make_pair(tensor_key, buffer));
+ }
+
+ uint64_t checksum = 0;
+ if (VLOG_IS_ON(2)) {
+#ifdef GOOGLE_CUDA
+ if (device->tensorflow_gpu_device_info() && (!on_host)) {
+ if (host_copy.NumElements() > 0) {
+ checksum = GPUUtil::Checksum(device, device_context, host_copy);
+ } else {
+ checksum = GPUUtil::Checksum(device, device_context, tensor);
+ }
+ } else {
+ checksum = GPUUtil::Checksum(tensor);
+ }
+#endif
+ }
+
+ RemoteMemoryRegion remote_mr;
+ remote_mr.set_host(host_);
+ remote_mr.set_port(port_);
+ remote_mr.set_addr(reinterpret_cast<uint64_t>(addr));
+ remote_mr.set_rkey(mr->rkey);
+ remote_mr.set_tensor_key(tensor_key);
+ remote_mr.set_checksum(checksum);
+ mutable_transport_options->PackFrom(remote_mr);
+
+ return Status::OK();
+}
+
+Status GdrMemoryManager::TensorFromTransportOptions(
+ Tensor* tensor, const ::google::protobuf::Any& transport_options,
+ Device* device, DeviceContext* device_context, bool on_host) {
+ RemoteMemoryRegion remote_mr;
+ if (!transport_options.UnpackTo(&remote_mr)) {
+ return errors::NotFound("No RDMA transport options found");
+ }
+
+ auto buffer = DMAHelper::buffer(tensor);
+ void* addr = buffer->data();
+ size_t length = buffer->size();
+ ibv_mr* mr = FindMemoryRegion(addr, length);
+
+ Tensor host_copy;
+#if GOOGLE_CUDA
+ if (!on_host && mr != nullptr) {
+ TF_RETURN_IF_ERROR(GPUUtil::Sync(device));
+ } else if (!on_host) {
+ Allocator* alloc = ProcessState::singleton()->GetCUDAHostAllocator(0);
+ host_copy = Tensor(alloc, tensor->dtype(), tensor->shape());
+ buffer = DMAHelper::buffer(&host_copy);
+ addr = buffer->data();
+ length = buffer->size();
+ mr = FindMemoryRegion(addr, length);
+ }
+#endif // GOOGLE_CUDA
+
+ if (mr == nullptr) {
+ return errors::Unavailable("Cannot find pinned memory region");
+ }
+
+ decltype(clients_)::iterator iter;
+ bool success;
+ {
+ mutex_lock l(client_mu_);
+ std::tie(iter, success) = clients_.insert(
+ std::make_pair(std::make_pair(remote_mr.host(), remote_mr.port()),
+ RdmaEndpointPtr(nullptr, EndpointDeleter)));
+ if (success || iter->second.get() == nullptr) {
+ TF_RETURN_IF_ERROR(
+ CreateEndpoint(remote_mr.host(), remote_mr.port(), iter->second));
+ }
+ }
+ rdma_cm_id* id = iter->second.get();
+
+ uint64_t start = Env::Default()->NowMicros();
+
+ if (rdma_post_read(id, nullptr, buffer->data(), buffer->size(), mr, 0,
+ remote_mr.addr(), remote_mr.rkey())) {
+ return errors::Unavailable(strerror(errno), ": ", "rdma_post_read failed");
+ }
+
+ ibv_send_wr wr = {};
+ wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+ wr.imm_data = htonl(remote_mr.tensor_key());
+ wr.send_flags = IBV_SEND_FENCE | IBV_SEND_SIGNALED;
+ ibv_send_wr* bad_wr;
+ if (ibv_post_send(id->qp, &wr, &bad_wr)) {
+ return errors::Unavailable(strerror(errno), ": ", "ibv_post_send failed");
+ }
+
+ ibv_wc wc = {};
+ int ret = rdma_get_send_comp(id, &wc);
+ if (ret < 0 || wc.status) {
+ return errors::Unavailable(ibv_wc_status_str(wc.status));
+ }
+
+#if GOOGLE_CUDA
+ if (host_copy.NumElements() > 0) {
+ Status s;
+ Notification n;
+ GPUUtil::CopyCPUTensorToGPU(&host_copy, device_context, device, tensor,
+ [&s, &n](const Status& status) {
+ s.Update(status);
+ n.Notify();
+ });
+ n.WaitForNotification();
+ if (!s.ok()) {
+ return s;
+ }
+ }
+#endif // GOOGLE_CUDA
+
+ uint64_t end = Env::Default()->NowMicros();
+
+ VLOG(2) << "RDMA from remote memory region " << remote_mr.rkey()
+ << " of size " << buffer->size() << " with tensor key "
+ << remote_mr.tensor_key() << " took " << (end - start) << " micros";
+
+ uint64_t checksum = 0;
+ if (VLOG_IS_ON(2)) {
+#ifdef GOOGLE_CUDA
+ if (device->tensorflow_gpu_device_info() && (!on_host)) {
+ if (host_copy.NumElements() > 0) {
+ checksum = GPUUtil::Checksum(device, device_context, host_copy);
+ } else {
+ checksum = GPUUtil::Checksum(device, device_context, *tensor);
+ }
+ } else {
+ checksum = GPUUtil::Checksum(*tensor);
+ }
+ CHECK(checksum == remote_mr.checksum()) << "Checksum mismatch: " << checksum
+ << "!=" << remote_mr.checksum();
+#endif
+ }
+ return Status::OK();
+}
+
+Status GdrMemoryManager::CreateEndpoint(const string& host, const string& port,
+ RdmaEndpointPtr& endpoint) {
+ rdma_addrinfo* addrinfo;
+ rdma_addrinfo hints = {};
+ hints.ai_port_space = RDMA_PS_TCP;
+ if (rdma_getaddrinfo(const_cast<char*>(host.c_str()),
+ const_cast<char*>(port.c_str()), &hints, &addrinfo)) {
+ return errors::InvalidArgument(
+ strerror(errno), ": ", "cannot connect to rdma://", host, ":", port);
+ }
+
+ ibv_qp_init_attr init_attr = {};
+ init_attr.qp_type = IBV_QPT_RC;
+ init_attr.cap.max_recv_wr = 1;
+ init_attr.cap.max_send_wr = 32;
+ init_attr.cap.max_recv_sge = 1;
+ init_attr.cap.max_send_sge = 1;
+
+ rdma_cm_id* id;
+ if (rdma_create_ep(&id, addrinfo, nullptr, &init_attr)) {
+ rdma_freeaddrinfo(addrinfo);
+ return errors::Unavailable(strerror(errno), ": ",
+ "cannot create endpoint to rdma://", host, ":",
+ port);
+ }
+ rdma_freeaddrinfo(addrinfo);
+
+ if (rdma_connect(id, nullptr)) {
+ rdma_destroy_ep(id);
+ return errors::Unavailable(strerror(errno), ": ",
+ "cannot connect to rdma://", host, ":", port);
+ }
+
+ LOG(INFO) << "RDMA endpoint connected to rdma://" << host << ":" << port;
+ endpoint = RdmaEndpointPtr(id, EndpointDeleter);
+ return Status::OK();
+}
+
+ibv_mr* GdrMemoryManager::FindMemoryRegion(void* addr, size_t length) {
+ if (length == 0) return nullptr;
+ mutex_lock l(alloc_mu_);
+ auto iter = std::upper_bound(mrs_.begin(), mrs_.end(), addr, &Comparator);
+ if (iter == std::end(mrs_) || iter->get()->addr > addr) {
+ return nullptr;
+ } else {
+ return iter->get();
+ }
+}
+
+void GdrMemoryManager::InsertMemoryRegion(void* addr, size_t length) {
+ if (length == 0) return;
+ ibv_mr* mr = rdma_reg_read(listening_.get(), addr, length);
+ if (mr != nullptr) {
+ mutex_lock l(alloc_mu_);
+ auto iter = std::upper_bound(mrs_.begin(), mrs_.end(), addr, &Comparator);
+ mrs_.insert(iter, {mr, &MRDeleter});
+ } else {
+ LOG(WARNING) << "Cannot register memory region";
+ }
+}
+
+void GdrMemoryManager::EvictMemoryRegion(void* addr, size_t length) {
+ if (length == 0) return;
+ mutex_lock l(alloc_mu_);
+ auto iter = std::upper_bound(mrs_.begin(), mrs_.end(), addr, &Comparator);
+ if (iter != std::end(mrs_) && iter->get()->addr == addr) {
+ mrs_.erase(iter);
+ } else {
+ LOG(WARNING) << "Failed to de-register memory region";
+ }
+}
+
+} // namespace
+
+RemoteMemoryManager* CreateRemoteMemoryManager(const string& host,
+ const string& port) {
+ return new GdrMemoryManager(host, port);
+}
+
+} // namespace tensorflow
+
+#endif // TENSORFLOW_USE_GDR
diff --git a/tensorflow/contrib/gdr/gdr_memory_manager.h b/tensorflow/contrib/gdr/gdr_memory_manager.h
new file mode 100644
index 0000000000..7e9fe01e97
--- /dev/null
+++ b/tensorflow/contrib/gdr/gdr_memory_manager.h
@@ -0,0 +1,63 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef GDR_MEMORY_MANAGER_H_
+#define GDR_MEMORY_MANAGER_H_
+
+#include "tensorflow/core/lib/core/status.h"
+
+namespace google {
+namespace protobuf {
+class Any;
+}
+}
+
+namespace tensorflow {
+
+class Device;
+class DeviceContext;
+class Tensor;
+
+// Abstract interface that handles out-of-band tensor transport.
+//
+// The transport options are encoded into a protocol buffer and transmitted via
+// some other communication channels like RPC.
+// See RecvTensorRequest in tensorflow/core/protobuf/worker.proto
+class RemoteMemoryManager {
+ public:
+ virtual ~RemoteMemoryManager() {}
+ virtual Status Init() = 0;
+ virtual void Run() = 0;
+ virtual void Stop() = 0;
+
+ // Encodes the tensor information to an arbitrary protocol buffer
+ // The protocol buffer needs to be transmitted via some other channel
+ virtual Status TransportOptionsFromTensor(
+ ::google::protobuf::Any* mutable_transport_options, const Tensor& tensor,
+ Device* device, DeviceContext* device_context, bool on_host) = 0;
+
+ // Retrieve the tensor from the encoded protocol buffer
+ // Note that the tensor has to be allocated, but not initialized
+ virtual Status TensorFromTransportOptions(
+ Tensor* tensor, const ::google::protobuf::Any& transport_options,
+ Device* device, DeviceContext* device_context, bool on_host) = 0;
+};
+
+RemoteMemoryManager* CreateRemoteMemoryManager(const string& host,
+ const string& port);
+
+} // namespace tensorflow
+
+#endif // GDR_MEMORY_MANAGER_H_
diff --git a/tensorflow/contrib/gdr/gdr_rendezvous_mgr.cc b/tensorflow/contrib/gdr/gdr_rendezvous_mgr.cc
new file mode 100644
index 0000000000..259ee8817d
--- /dev/null
+++ b/tensorflow/contrib/gdr/gdr_rendezvous_mgr.cc
@@ -0,0 +1,201 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/contrib/gdr/gdr_rendezvous_mgr.h"
+
+#include "google/protobuf/any.pb.h"
+#include "tensorflow/contrib/gdr/gdr_memory_manager.h"
+#include "tensorflow/core/common_runtime/device.h"
+#include "tensorflow/core/common_runtime/device_mgr.h"
+#include "tensorflow/core/common_runtime/process_util.h"
+#include "tensorflow/core/distributed_runtime/tensor_coding.h"
+#include "tensorflow/core/distributed_runtime/worker_cache.h"
+#include "tensorflow/core/distributed_runtime/worker_interface.h"
+#include "tensorflow/core/framework/types.h"
+#include "tensorflow/core/lib/core/errors.h"
+#include "tensorflow/core/lib/strings/numbers.h"
+#include "tensorflow/core/lib/strings/str_util.h"
+#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/macros.h"
+#include "tensorflow/core/platform/types.h"
+
+namespace tensorflow {
+
+namespace {
+
+class GdrRecvTensorCall : public BaseRecvTensorCall {
+ public:
+ GdrRecvTensorCall(WorkerInterface* wi, Device* dst_device,
+ RemoteMemoryManager* remote_memory_manager,
+ const Rendezvous::Args& recv_args, int64 step_id,
+ StringPiece key)
+ : wi_(wi),
+ dst_device_(dst_device),
+ remote_memory_manager_(remote_memory_manager),
+ recv_args_(recv_args) {
+ req_.set_step_id(step_id);
+ req_.set_rendezvous_key(key.data(), key.size());
+ }
+
+ ~GdrRecvTensorCall() override {}
+
+ void Start(std::function<void()> recv_done) override {
+ req_.set_dma_ok(true);
+ resp_.InitAlloc(dst_device_, recv_args_.alloc_attrs);
+ StatusCallback cb = [this, recv_done](const Status& s) {
+ bool dma_ok = resp_.metadata().has_transport_options();
+ if (s.ok() && tensor().TotalBytes() > 0 && (!is_dead()) && dma_ok) {
+ auto transport_options = resp_.metadata().transport_options();
+ const bool on_host =
+ (dst_device_->tensorflow_gpu_device_info() == nullptr) ||
+ recv_args_.alloc_attrs.on_host();
+ Status s = remote_memory_manager_->TensorFromTransportOptions(
+ const_cast<Tensor*>(&tensor()), transport_options, dst_device_,
+ recv_args_.device_context, on_host);
+ if (!s.ok()) {
+ mutex_lock l(mu_);
+ status_.Update(s);
+ LOG(ERROR)
+ << "Cannot find pinned memory region from allocator "
+ << dst_device_->GetAllocator(recv_args_.alloc_attrs)->Name();
+ }
+ }
+ if (!s.ok()) {
+ mutex_lock l(mu_);
+ status_.Update(s);
+ }
+ recv_done();
+ };
+ wi_->RecvTensorAsync(&opts_, &req_, &resp_, std::move(cb));
+ }
+
+ void StartAbort(const Status& s) override {
+ {
+ mutex_lock l(mu_);
+ status_.Update(s);
+ }
+ opts_.StartCancel();
+ }
+
+ Status status() const override {
+ mutex_lock l(mu_);
+ return status_;
+ }
+
+ const Tensor& tensor() const { return resp_.tensor(); }
+
+ bool is_dead() const { return resp_.metadata().is_dead(); }
+
+ Device* dst_device() const { return dst_device_; }
+
+ const Rendezvous::Args& recv_args() const { return recv_args_; }
+
+ private:
+ WorkerInterface* wi_;
+ Device* dst_device_;
+ RemoteMemoryManager* remote_memory_manager_;
+ CallOptions opts_;
+ RecvTensorRequest req_;
+ TensorResponse resp_;
+ Rendezvous::Args recv_args_;
+
+ mutable mutex mu_;
+ Status status_ GUARDED_BY(mu_);
+
+ TF_DISALLOW_COPY_AND_ASSIGN(GdrRecvTensorCall);
+};
+
+class GdrRemoteRendezvous : public BaseRemoteRendezvous {
+ public:
+ GdrRemoteRendezvous(const WorkerEnv* env, int64 step_id,
+ RemoteMemoryManager* remote_memory_manager)
+ : BaseRemoteRendezvous(env, step_id),
+ remote_memory_manager_(remote_memory_manager) {}
+
+ protected:
+ void RecvFromRemoteAsync(const Rendezvous::ParsedKey& parsed,
+ const Rendezvous::Args& recv_args,
+ DoneCallback done) override {
+ CHECK(is_initialized());
+
+ string src_worker;
+ string src_rel_device;
+ if (!DeviceNameUtils::SplitDeviceName(parsed.src_device, &src_worker,
+ &src_rel_device)) {
+ Status s = errors::Internal(parsed.src_device,
+ " is invalid remote source device.");
+ done(s, Args(), recv_args, Tensor{}, false);
+ return;
+ }
+
+ WorkerSession* sess = session();
+ WorkerInterface* rwi = sess->worker_cache->CreateWorker(src_worker);
+ if (rwi == nullptr) {
+ Status s = errors::Internal("No worker known as ", src_worker);
+ done(s, Args(), recv_args, Tensor{}, false);
+ return;
+ }
+
+ Device* dst_device;
+ Status s = sess->device_mgr->LookupDevice(parsed.dst_device, &dst_device);
+ if (!s.ok()) {
+ sess->worker_cache->ReleaseWorker(src_worker, rwi);
+ done(s, Args(), recv_args, Tensor{}, false);
+ return;
+ }
+
+ // Prepare a RecvTensor call that can handle being aborted.
+ GdrRecvTensorCall* call =
+ new GdrRecvTensorCall(rwi, dst_device, remote_memory_manager_,
+ recv_args, step_id_, parsed.FullKey());
+
+ // Record "call" in active_ so that it can be aborted cleanly.
+ RegisterCall(call);
+
+ // Start "call".
+ Ref();
+ call->Start([this, call, src_worker, rwi, done]() {
+ // Removes "call" from active_. Prevent StartAbort().
+ DeregisterCall(call);
+ // If StartAbort was called prior to DeregisterCall, then the
+ // current status should be bad.
+ Status s = call->status();
+ done(s, Args(), call->recv_args(), call->tensor(), call->is_dead());
+ session()->worker_cache->ReleaseWorker(src_worker, rwi);
+ delete call;
+ Unref();
+ });
+ }
+
+ private:
+ ~GdrRemoteRendezvous() override {}
+
+ RemoteMemoryManager* remote_memory_manager_;
+
+ TF_DISALLOW_COPY_AND_ASSIGN(GdrRemoteRendezvous);
+};
+
+} // namespace
+
+GdrRendezvousMgr::GdrRendezvousMgr(const WorkerEnv* env,
+ RemoteMemoryManager* remote_memory_manager)
+ : BaseRendezvousMgr(env), remote_memory_manager_(remote_memory_manager) {}
+
+BaseRemoteRendezvous* GdrRendezvousMgr::Create(int64 step_id,
+ const WorkerEnv* worker_env) {
+ return new GdrRemoteRendezvous(worker_env, step_id, remote_memory_manager_);
+}
+
+} // end namespace tensorflow
diff --git a/tensorflow/contrib/gdr/gdr_rendezvous_mgr.h b/tensorflow/contrib/gdr/gdr_rendezvous_mgr.h
new file mode 100644
index 0000000000..7fedd04f54
--- /dev/null
+++ b/tensorflow/contrib/gdr/gdr_rendezvous_mgr.h
@@ -0,0 +1,42 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef GDR_RENDEZVOUS_MGR_H_
+#define GDR_RENDEZVOUS_MGR_H_
+
+#include "tensorflow/contrib/gdr/gdr_memory_manager.h"
+#include "tensorflow/core/distributed_runtime/base_rendezvous_mgr.h"
+#include "tensorflow/core/distributed_runtime/worker_env.h"
+#include "tensorflow/core/platform/macros.h"
+
+namespace tensorflow {
+
+class GdrRendezvousMgr : public BaseRendezvousMgr {
+ public:
+ explicit GdrRendezvousMgr(const WorkerEnv* env,
+ RemoteMemoryManager* remote_memory_manager);
+
+ protected:
+ BaseRemoteRendezvous* Create(int64 step_id, const WorkerEnv* worker_env);
+
+ private:
+ RemoteMemoryManager* remote_memory_manager_; // Not owned
+
+ TF_DISALLOW_COPY_AND_ASSIGN(GdrRendezvousMgr);
+};
+
+} // end namespace tensorflow
+
+#endif // GDR_RENDEZVOUS_MGR_H_
diff --git a/tensorflow/contrib/gdr/gdr_server_lib.cc b/tensorflow/contrib/gdr/gdr_server_lib.cc
new file mode 100644
index 0000000000..ae6a612ecf
--- /dev/null
+++ b/tensorflow/contrib/gdr/gdr_server_lib.cc
@@ -0,0 +1,127 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/contrib/gdr/gdr_server_lib.h"
+#include "tensorflow/contrib/gdr/gdr_memory_manager.h"
+#include "tensorflow/contrib/gdr/gdr_rendezvous_mgr.h"
+#include "tensorflow/contrib/gdr/gdr_worker.h"
+
+#include "net/grpc/public/include/grpc/support/alloc.h"
+
+namespace tensorflow {
+
+GdrServer::GdrServer(const ServerDef& server_def, Env* env)
+ : GrpcServer(server_def, env) {
+ string host;
+ string port;
+ for (const auto& job : server_def.cluster().job()) {
+ if (job.name() == server_def.job_name()) {
+ auto iter = job.tasks().find(server_def.task_index());
+ if (iter != job.tasks().end()) {
+ const std::vector<string> hostname_port =
+ str_util::Split(iter->second, ':');
+ if (hostname_port.size() == 2) {
+ host = hostname_port[0];
+ port = hostname_port[1];
+ }
+ }
+ }
+ }
+ remote_memory_manager_ = std::unique_ptr<RemoteMemoryManager>(
+ CreateRemoteMemoryManager(host, port));
+}
+
+GdrServer::~GdrServer() {}
+
+Status GdrServer::Init() {
+ RendezvousMgrCreationFunction rendezvous_mgr_func =
+ [this](const WorkerEnv* env) {
+ return new GdrRendezvousMgr(env, remote_memory_manager_.get());
+ };
+ WorkerCreationFunction worker_func = [this](WorkerEnv* env) {
+ return std::unique_ptr<GdrWorker>(
+ new GdrWorker(env, remote_memory_manager_.get()));
+ };
+ TF_RETURN_IF_ERROR(
+ GrpcServer::Init(nullptr, rendezvous_mgr_func, worker_func));
+
+ return remote_memory_manager_->Init();
+}
+
+Status GdrServer::Start() {
+ {
+ mutex_lock l(mu_);
+ gdr_thread_.reset(worker_env()->env->StartThread(
+ ThreadOptions(), "TF_gdr_service",
+ [this] { remote_memory_manager_->Run(); }));
+ }
+ return GrpcServer::Start();
+}
+
+Status GdrServer::Stop() {
+ TF_RETURN_IF_ERROR(GrpcServer::Stop());
+ remote_memory_manager_->Stop();
+ return Status::OK();
+}
+
+Status GdrServer::Join() {
+ {
+ mutex_lock l(mu_);
+ gdr_thread_.reset();
+ }
+ return GrpcServer::Join();
+}
+
+/* static */
+Status GdrServer::Create(const ServerDef& server_def, Env* env,
+ std::unique_ptr<ServerInterface>* out_server) {
+ std::unique_ptr<GdrServer> ret(
+ new GdrServer(server_def, env == nullptr ? Env::Default() : env));
+ TF_RETURN_IF_ERROR(ret->Init());
+ *out_server = std::move(ret);
+ return Status::OK();
+}
+
+namespace {
+
+class GdrServerFactory : public ServerFactory {
+ public:
+ bool AcceptsOptions(const ServerDef& server_def) override {
+ return server_def.protocol() == "grpc+gdr";
+ }
+
+ Status NewServer(const ServerDef& server_def,
+ std::unique_ptr<ServerInterface>* out_server) override {
+ return GdrServer::Create(server_def, Env::Default(), out_server);
+ }
+};
+
+// Registers a `ServerFactory` for `GdrServer` instances.
+class GdrServerRegistrar {
+ public:
+ GdrServerRegistrar() {
+ gpr_allocation_functions alloc_fns;
+ memset(&alloc_fns, 0, sizeof(alloc_fns));
+ alloc_fns.malloc_fn = port::Malloc;
+ alloc_fns.realloc_fn = port::Realloc;
+ alloc_fns.free_fn = port::Free;
+ gpr_set_allocation_functions(alloc_fns);
+ ServerFactory::Register("GDR_SERVER", new GdrServerFactory());
+ }
+};
+static GdrServerRegistrar registrar;
+
+} // namespace
+} // namespace tensorflow
diff --git a/tensorflow/contrib/gdr/gdr_server_lib.h b/tensorflow/contrib/gdr/gdr_server_lib.h
new file mode 100644
index 0000000000..d6c40d429e
--- /dev/null
+++ b/tensorflow/contrib/gdr/gdr_server_lib.h
@@ -0,0 +1,52 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef GDR_SERVER_LIB_H_
+#define GDR_SERVER_LIB_H_
+
+#include "tensorflow/contrib/gdr/gdr_memory_manager.h"
+#include "tensorflow/core/distributed_runtime/rpc/grpc_server_lib.h"
+
+namespace tensorflow {
+
+class GdrServer : public GrpcServer {
+ protected:
+ GdrServer(const ServerDef& server_def, Env* env);
+
+ public:
+ static Status Create(const ServerDef& server_def, Env* env,
+ std::unique_ptr<ServerInterface>* out_server);
+
+ virtual ~GdrServer() override;
+
+ virtual Status Start() override;
+
+ virtual Status Stop() override;
+
+ virtual Status Join() override;
+
+ protected:
+ Status Init();
+
+ private:
+ mutex mu_;
+
+ std::unique_ptr<RemoteMemoryManager> remote_memory_manager_;
+ std::unique_ptr<Thread> gdr_thread_ GUARDED_BY(mu_);
+};
+
+} // namespace tensorflow
+
+#endif // GDR_SERVER_LIB_H_
diff --git a/tensorflow/contrib/gdr/gdr_worker.cc b/tensorflow/contrib/gdr/gdr_worker.cc
new file mode 100644
index 0000000000..0bff0aff6d
--- /dev/null
+++ b/tensorflow/contrib/gdr/gdr_worker.cc
@@ -0,0 +1,146 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/contrib/gdr/gdr_worker.h"
+
+#include "tensorflow/core/common_runtime/device.h"
+#include "tensorflow/core/common_runtime/device_mgr.h"
+#include "tensorflow/core/common_runtime/dma_helper.h"
+#if GOOGLE_CUDA
+#include "tensorflow/core/common_runtime/gpu/gpu_util.h"
+#endif // GOOGLE_CUDA
+#include "tensorflow/core/common_runtime/process_util.h"
+#include "tensorflow/core/common_runtime/step_stats_collector.h"
+#include "tensorflow/core/distributed_runtime/graph_mgr.h"
+#include "tensorflow/core/distributed_runtime/rendezvous_mgr_interface.h"
+#include "tensorflow/core/distributed_runtime/rpc/grpc_call.h"
+#include "tensorflow/core/distributed_runtime/rpc/grpc_tensor_coding.h"
+#include "tensorflow/core/distributed_runtime/rpc/grpc_util.h"
+#include "tensorflow/core/distributed_runtime/worker.h"
+#include "tensorflow/core/distributed_runtime/worker_cache.h"
+#include "tensorflow/core/distributed_runtime/worker_session.h"
+#include "tensorflow/core/framework/cancellation.h"
+#include "tensorflow/core/framework/tensor.h"
+#include "tensorflow/core/lib/core/errors.h"
+#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/tracing.h"
+
+namespace tensorflow {
+
+GdrWorker::GdrWorker(WorkerEnv* worker_env,
+ RemoteMemoryManager* remote_memory_manager)
+ : GrpcWorker(worker_env), remote_memory_manager_(remote_memory_manager) {}
+
+void GdrWorker::GrpcRecvTensorAsync(CallOptions* opts,
+ const RecvTensorRequest* request,
+ ::grpc::ByteBuffer* response,
+ StatusCallback done) {
+ const int64 step_id = request->step_id();
+ const string& key = request->rendezvous_key();
+ TRACEPRINTF("RecvTensor: %lld %s", step_id, key.c_str());
+ Rendezvous::ParsedKey parsed;
+ Status s = Rendezvous::ParseKey(key, &parsed);
+ Device* src_dev = nullptr;
+ if (s.ok()) {
+ s = PrepareRecvTensor(parsed, &src_dev);
+ }
+ if (!s.ok()) {
+ done(s);
+ return;
+ }
+
+ // Request the tensor associated with the rendezvous key. Any time
+ // while waiting for the tensor to be produced, up until the start
+ // of execution of the callback lambda body below, an RPC
+ // cancellation should abort the rendezvous.
+ opts->SetCancelCallback([this, step_id]() { AbortStep(step_id); });
+ const bool dma_ok = request->dma_ok();
+ env_->rendezvous_mgr->RecvLocalAsync(
+ step_id, parsed,
+ [this, opts, response, done, src_dev, dma_ok](
+ const Status& status, const Rendezvous::Args& send_args,
+ const Rendezvous::Args&, const Tensor& val, const bool is_dead) {
+ opts->ClearCancelCallback();
+ if (status.ok()) {
+ // DMA can only be used for Tensors that do not fall into
+ // the following three odd edge cases: 1) a zero-size
+ // buffer, 2) a dead tensor which has an uninit value, and
+ // 3) the tensor has the on_host allocation attribute,
+ // i.e. it's in CPU RAM *independent of its assigned
+ // device type*.
+ const bool on_host =
+ (src_dev->tensorflow_gpu_device_info() == nullptr) ||
+ send_args.alloc_attrs.on_host();
+ if (val.TotalBytes() > 0 && (!is_dead) &&
+ DMAHelper::CanUseDMA(&val) && dma_ok) {
+ // DMA cases.
+ RecvTensorResponse proto;
+ auto transport_options = proto.mutable_transport_options();
+ Status s = remote_memory_manager_->TransportOptionsFromTensor(
+ transport_options, val, src_dev, send_args.device_context,
+ on_host);
+ if (s.ok()) {
+ proto.set_is_dead(is_dead);
+ proto.set_send_start_micros(Env::Default()->NowMicros());
+ TensorProto* tensor_proto = proto.mutable_tensor();
+ tensor_proto->set_dtype(val.dtype());
+ val.shape().AsProto(tensor_proto->mutable_tensor_shape());
+ grpc::EncodeRecvTensorResponseToByteBuffer(proto, response);
+ done(Status::OK());
+ return;
+ } else {
+ done(s);
+ return;
+ }
+ } else {
+ // Non-DMA cases.
+ if (src_dev->tensorflow_gpu_device_info() && (!on_host)) {
+#if GOOGLE_CUDA
+ const DeviceContext* send_dev_context = send_args.device_context;
+ AllocatorAttributes alloc_attrs;
+ alloc_attrs.set_gpu_compatible(true);
+ alloc_attrs.set_on_host(true);
+ Allocator* alloc = src_dev->GetAllocator(alloc_attrs);
+ Tensor* copy = new Tensor(alloc, val.dtype(), val.shape());
+ CHECK(send_dev_context)
+ << "send dev name: " << src_dev->name()
+ << " gpu_info: " << src_dev->tensorflow_gpu_device_info();
+ // "val" is on a GPU. Uses GPUUtil to fill the response proto.
+ StatusCallback copy_ready = [response, done, copy,
+ is_dead](const Status& s) {
+ // The value is now ready to be returned on the wire.
+ grpc::EncodeTensorToByteBuffer(is_dead, *copy, response);
+ done(s);
+ delete copy;
+ };
+
+ GPUUtil::CopyGPUTensorToCPU(src_dev, send_dev_context, &val, copy,
+ copy_ready);
+#else
+ done(errors::Internal("No GPU device in process"));
+#endif // GOOGLE_CUDA
+ } else {
+ grpc::EncodeTensorToByteBuffer(is_dead, val, response);
+ done(Status::OK());
+ }
+ }
+ } else {
+ // !s.ok()
+ done(status);
+ }
+ });
+}
+
+} // namespace tensorflow
diff --git a/tensorflow/contrib/gdr/gdr_worker.h b/tensorflow/contrib/gdr/gdr_worker.h
new file mode 100644
index 0000000000..a30b7baaed
--- /dev/null
+++ b/tensorflow/contrib/gdr/gdr_worker.h
@@ -0,0 +1,45 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef GDR_WORKER_H_
+#define GDR_WORKER_H_
+
+#include "tensorflow/contrib/gdr/gdr_memory_manager.h"
+
+#include "tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h"
+
+namespace tensorflow {
+
+class GdrWorker : public GrpcWorker {
+ public:
+ GdrWorker(WorkerEnv* env, RemoteMemoryManager* remote_memory_manager);
+
+ // Serve the RecvTensorRequest but omit the tensor content and transmit it
+ // out-of-band using GPU Direct RDMA whenever possible.
+ // If it's not possible, it falls back to gRPC in-band tensor transport by
+ // encoding the tensor content into the grpc::ByteBuffer.
+ // The RecvTensorResponse will carry the necessary information for RDMA.
+ virtual void GrpcRecvTensorAsync(CallOptions* opts,
+ const RecvTensorRequest* request,
+ ::grpc::ByteBuffer* response,
+ StatusCallback done) override;
+
+ private:
+ RemoteMemoryManager* remote_memory_manager_; // Not owned
+};
+
+} // namespace tensorflow
+
+#endif // GDR_WORKER_H_