aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/contrib/nccl/kernels/nccl_manager_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/contrib/nccl/kernels/nccl_manager_test.cc')
-rw-r--r--tensorflow/contrib/nccl/kernels/nccl_manager_test.cc285
1 files changed, 285 insertions, 0 deletions
diff --git a/tensorflow/contrib/nccl/kernels/nccl_manager_test.cc b/tensorflow/contrib/nccl/kernels/nccl_manager_test.cc
new file mode 100644
index 0000000000..b53cb82440
--- /dev/null
+++ b/tensorflow/contrib/nccl/kernels/nccl_manager_test.cc
@@ -0,0 +1,285 @@
+/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifdef GOOGLE_CUDA
+
+#include <algorithm>
+#include <vector>
+
+#include "tensorflow/contrib/nccl/kernels/nccl_manager.h"
+#include "tensorflow/core/common_runtime/device_factory.h"
+#include "tensorflow/core/common_runtime/gpu/gpu_device.h"
+#include "tensorflow/core/framework/tensor_testutil.h"
+#include "tensorflow/core/lib/core/status_test_util.h"
+#include "tensorflow/core/platform/test.h"
+
+namespace tensorflow {
+
+static std::vector<BaseGPUDevice*> GetGPUDevices() {
+ std::vector<Device*> devices;
+ SessionOptions session_options;
+ session_options.env = Env::Default();
+ Status s = DeviceFactory::GetFactory(DEVICE_GPU)
+ ->AddDevices(session_options, "", &devices);
+ TF_CHECK_OK(s);
+ std::vector<BaseGPUDevice*> gpus;
+ for (Device* d : devices) {
+ if (d->device_type() == "GPU") {
+ gpus.push_back(static_cast<BaseGPUDevice*>(d));
+ } else {
+ delete d;
+ }
+ }
+ return gpus;
+}
+
+class NcclManagerTest : public ::testing::Test {
+ protected:
+ static void SetUpTestCase() {
+ setenv("NCCL_DEBUG", "INFO", 1 /* replace */);
+ devices = new std::vector<BaseGPUDevice*>(GetGPUDevices());
+ CHECK(!devices->empty());
+ LOG(ERROR) << "Running test with " << devices->size() << " gpus";
+ }
+ static void TearDownTestCase() {
+ for (auto device : *devices) delete device;
+ delete devices;
+ }
+
+ static Allocator* gpu_allocator(BaseGPUDevice* device) {
+ return device->GetStepAllocator(AllocatorAttributes(),
+ nullptr /* step_resource_manager */);
+ }
+
+ static std::vector<BaseGPUDevice*>* devices;
+
+ template <typename Scalar>
+ perftools::gputools::DeviceMemory<Scalar> AsDeviceMemory(
+ const Scalar* cuda_memory) {
+ perftools::gputools::DeviceMemoryBase wrapped(
+ const_cast<Scalar*>(cuda_memory));
+ perftools::gputools::DeviceMemory<Scalar> typed(wrapped);
+ return typed;
+ }
+
+ // A single all-reduce to apply.
+ struct TestCase {
+ string key;
+ std::vector<Tensor> ins;
+ std::vector<Tensor> outs;
+ Tensor expected;
+
+ mutex mu;
+ Status final_status;
+ int num_completed = 0;
+ };
+
+ TestCase* MakeTestCase(int num_ranks, ncclRedOp_t reduction_op,
+ TensorShape shape, float value_offset) {
+ TestCase* test_case = new TestCase();
+ test_case->expected = Tensor(DT_FLOAT, shape);
+ if (reduction_op == ncclProd) {
+ test::FillFn<float>(&test_case->expected, [](int) { return 1; });
+ } else if (reduction_op == ncclSum) {
+ test::FillFn<float>(&test_case->expected, [](int) { return 0; });
+ } else if (reduction_op == ncclMax) {
+ test::FillFn<float>(&test_case->expected, [](int) {
+ return -1 * std::numeric_limits<float>::max();
+ });
+ } else if (reduction_op == ncclMin) {
+ test::FillFn<float>(&test_case->expected, [](int) {
+ return std::numeric_limits<float>::max();
+ });
+ } else {
+ LOG(FATAL) << "Invalid reduction_op " << reduction_op;
+ }
+
+ int mult = 1;
+ for (int i = 0; i < num_ranks; ++i) {
+ auto* device = devices->at(i % devices->size());
+ auto* stream = device->tensorflow_gpu_device_info()->stream;
+
+ Tensor in_cpu(DT_FLOAT, shape);
+ test::FillFn<float>(&in_cpu, [mult, value_offset](int index) {
+ return value_offset + (index + 1) * mult;
+ });
+ for (int j = 0; j < shape.num_elements(); ++j) {
+ auto in_val = in_cpu.flat<float>()(j);
+ auto out_expr = test_case->expected.flat<float>();
+ if (reduction_op == ncclProd) {
+ out_expr(j) *= in_val;
+ } else if (reduction_op == ncclSum) {
+ out_expr(j) += in_val;
+ } else if (reduction_op == ncclMax) {
+ if (in_val > out_expr(j)) {
+ out_expr(j) = in_val;
+ }
+ } else if (reduction_op == ncclMin) {
+ if (in_val < out_expr(j)) {
+ out_expr(j) = in_val;
+ }
+ }
+ }
+
+ mult *= 10;
+ test_case->ins.emplace_back(gpu_allocator(device), DT_FLOAT, shape);
+ test_case->outs.emplace_back(gpu_allocator(device), DT_FLOAT, shape);
+
+ const Tensor& in_gpu = test_case->ins.back();
+ auto in_gpu_mem = AsDeviceMemory(in_gpu.flat<float>().data());
+ stream->ThenMemcpy(&in_gpu_mem, in_cpu.flat<float>().data(),
+ in_cpu.TotalBytes());
+ }
+ return test_case;
+ }
+
+ NcclManager::DoneCallback CreateDoneCallback(TestCase* test_case) {
+ return [this, test_case](Status s) {
+ mutex_lock l(test_case->mu);
+ ++test_case->num_completed;
+ test_case->final_status.Update(s);
+ };
+ }
+
+ void VerifyResults(const string& case_label, TestCase* test_case) {
+ // Wait for the done callback to be called.
+ {
+ test_case->mu.lock();
+ while (test_case->num_completed != test_case->outs.size()) {
+ test_case->mu.unlock();
+ Env::Default()->SleepForMicroseconds(10);
+ test_case->mu.lock();
+ }
+ test_case->mu.unlock();
+ }
+ // Copy memory to host and verify.
+ for (int i = 0; i < test_case->outs.size(); ++i) {
+ auto* device = devices->at(i % devices->size());
+ auto* stream = device->tensorflow_gpu_device_info()->stream;
+ const Tensor& out_gpu = test_case->outs[i];
+ Tensor out_cpu(DT_FLOAT, out_gpu.shape());
+ auto out_gpu_mem = AsDeviceMemory(out_gpu.flat<float>().data());
+ stream->ThenMemcpy(out_cpu.flat<float>().data(), out_gpu_mem,
+ out_cpu.TotalBytes());
+ stream->BlockHostUntilDone();
+ test::ExpectTensorEqual<float>(test_case->expected, out_cpu);
+ }
+ }
+};
+std::vector<BaseGPUDevice*>* NcclManagerTest::devices = nullptr;
+
+// Test basic sum reduction.
+TEST_F(NcclManagerTest, BasicSumReduction) {
+ const int num_ranks = 3;
+
+ for (int op = 0; op < 4; ++op) {
+ ncclRedOp_t reduction_op = static_cast<ncclRedOp_t>(op);
+ std::unique_ptr<TestCase> test_case(
+ MakeTestCase(num_ranks, reduction_op, TensorShape({2, 3}), 0));
+ for (int device_num = 0; device_num < num_ranks; ++device_num) {
+ auto* device = devices->at(device_num % devices->size());
+ auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
+ auto* stream = device->tensorflow_gpu_device_info()->stream;
+ NcclManager::instance()->AddToAllReduce(
+ num_ranks, "allreduce", reduction_op, device->executor(), event_mgr,
+ stream, &test_case->ins[device_num], &test_case->outs[device_num],
+ CreateDoneCallback(test_case.get()));
+ }
+
+ LOG(ERROR) << "Verifying results";
+ VerifyResults("test_case", test_case.get());
+ }
+}
+
+// Same as the Basic test, but with multiple threads launching parts of many
+// reductions.
+//
+// Testing the multi-rank execution is currently reduced as it can hang when run
+// with num_ranks > devices->size(), for some GPUs (e.g. K20m).
+// To test the higher settings, increase num_ranks,
+// num_collectives_per_iteration and time_limit_micros.
+TEST_F(NcclManagerTest, MultipleCallers) {
+ const int num_ranks = 1; // 2;
+ const int num_collectives_per_iteration = 1; // 1000;
+ const int num_threads = 3;
+ const int time_limit_micros = 1; // 60 * 30 * 1000 * 1000;
+
+ int64 start = Env::Default()->NowMicros();
+ srand(Env::Default()->NowMicros());
+
+ for (;;) {
+ std::vector<std::pair<int, int>> case_and_device_num;
+ std::vector<std::unique_ptr<TestCase>> test_cases;
+ for (int i = 0; i < num_collectives_per_iteration; ++i) {
+ test_cases.emplace_back(
+ MakeTestCase(num_ranks, ncclSum,
+ TensorShape({100, i % 5 + 1, i % 3 + 1}), i + 0.1 * i));
+ for (int j = 0; j < num_ranks; ++j) {
+ case_and_device_num.emplace_back(i, j);
+ }
+ }
+
+ for (int i = 0; i < num_ranks; ++i) {
+ auto* device = devices->at(i % devices->size());
+ auto* stream = device->tensorflow_gpu_device_info()->stream;
+ stream->BlockHostUntilDone();
+ }
+
+ std::random_shuffle(case_and_device_num.begin(), case_and_device_num.end());
+
+ mutex mu; // guards case_and_device_num.
+ std::unique_ptr<thread::ThreadPool> pool(
+ new thread::ThreadPool(Env::Default(), "test", num_threads));
+ const int to_schedule = case_and_device_num.size();
+ for (int i = 0; i < to_schedule; ++i) {
+ auto fn = [&]() {
+ int device_num;
+ int test_num;
+ {
+ mutex_lock l(mu);
+ test_num = case_and_device_num.back().first;
+ device_num = case_and_device_num.back().second;
+ case_and_device_num.pop_back();
+ }
+ auto* device = devices->at(device_num % devices->size());
+ auto* event_mgr = device->tensorflow_gpu_device_info()->event_mgr;
+ auto* stream = device->tensorflow_gpu_device_info()->stream;
+ TestCase* test_case = test_cases[test_num].get();
+ NcclManager::instance()->AddToAllReduce(
+ num_ranks, strings::StrCat("allreduce", test_num), ncclSum,
+ device->executor(), event_mgr, stream, &test_case->ins[device_num],
+ &test_case->outs[device_num], CreateDoneCallback(test_case));
+ };
+ pool->Schedule(fn);
+ }
+ pool.reset(); // wait for all work to be scheduled.
+
+ LOG(ERROR) << "Verifying results for " << num_collectives_per_iteration
+ << " collectives";
+ for (int i = 0; i < test_cases.size(); ++i) {
+ VerifyResults(strings::StrCat("collective", i), test_cases[i].get());
+ }
+
+ int64 delta = Env::Default()->NowMicros() - start;
+ if (delta > time_limit_micros) {
+ LOG(ERROR) << "Ran for " << delta << " quitting";
+ break;
+ }
+ }
+}
+
+} // namespace tensorflow
+
+#endif // GOOGLE_CUDA