diff options
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/common/alarm_cpp_test.cc (renamed from test/cpp/common/alarm_test.cc) | 29 | ||||
-rw-r--r-- | test/cpp/end2end/hybrid_end2end_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/end2end/thread_stress_test.cc | 143 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 13 | ||||
-rw-r--r-- | test/cpp/qps/driver.h | 3 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 7 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 24 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.h | 7 | ||||
-rw-r--r-- | test/cpp/qps/worker.cc | 2 | ||||
-rw-r--r-- | test/cpp/util/time_test.cc | 4 |
10 files changed, 191 insertions, 43 deletions
diff --git a/test/cpp/common/alarm_test.cc b/test/cpp/common/alarm_cpp_test.cc index 09df6852a5..4745ef14ec 100644 --- a/test/cpp/common/alarm_test.cc +++ b/test/cpp/common/alarm_cpp_test.cc @@ -35,58 +35,47 @@ #include <grpc++/completion_queue.h> #include <gtest/gtest.h> -#include <grpc++/completion_queue.h> #include "test/core/util/test_config.h" namespace grpc { namespace { -class TestTag : public CompletionQueueTag { - public: - TestTag() : tag_(0) {} - TestTag(intptr_t tag) : tag_(tag) {} - bool FinalizeResult(void** tag, bool* status) { return true; } - intptr_t tag() { return tag_; } - - private: - intptr_t tag_; -}; - TEST(AlarmTest, RegularExpiry) { CompletionQueue cq; - TestTag input_tag(1618033); - Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), &input_tag); + void* junk = reinterpret_cast<void*>(1618033); + Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), junk); - TestTag* output_tag; + void* output_tag; bool ok; const CompletionQueue::NextStatus status = cq.AsyncNext( (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2)); EXPECT_EQ(status, CompletionQueue::GOT_EVENT); EXPECT_TRUE(ok); - EXPECT_EQ(output_tag->tag(), input_tag.tag()); + EXPECT_EQ(junk, output_tag); } TEST(AlarmTest, Cancellation) { CompletionQueue cq; - TestTag input_tag(1618033); - Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), &input_tag); + void* junk = reinterpret_cast<void*>(1618033); + Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), junk); alarm.Cancel(); - TestTag* output_tag; + void* output_tag; bool ok; const CompletionQueue::NextStatus status = cq.AsyncNext( (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); EXPECT_EQ(status, CompletionQueue::GOT_EVENT); EXPECT_FALSE(ok); - EXPECT_EQ(output_tag->tag(), input_tag.tag()); + EXPECT_EQ(junk, output_tag); } } // namespace } // namespace grpc int main(int argc, char** argv) { + grpc_test_init(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index f8405627f9..c72e20628f 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test { } // Create a separate cq for each potential handler. for (int i = 0; i < 5; i++) { - cqs_.push_back(std::move(builder.AddCompletionQueue())); + cqs_.push_back(builder.AddCompletionQueue()); } server_ = builder.BuildAndStart(); } diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index 4e8860e843..e246c0b0e2 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -45,6 +45,7 @@ #include <grpc/support/time.h> #include <gtest/gtest.h> +#include "src/core/surface/api_trace.h" #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -54,6 +55,11 @@ using grpc::testing::EchoRequest; using grpc::testing::EchoResponse; using std::chrono::system_clock; +const int kNumThreads = 100; // Number of threads +const int kNumAsyncSendThreads = 2; +const int kNumAsyncReceiveThreads = 50; +const int kNumRpcs = 1000; // Number of RPCs per thread + namespace grpc { namespace testing { @@ -84,7 +90,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { MaybeEchoDeadline(context, request, response); if (request->has_param() && request->param().client_cancel_after_us()) { { - std::unique_lock<std::mutex> lock(mu_); + unique_lock<mutex> lock(mu_); signal_client_ = true; } while (!context->IsCancelled()) { @@ -149,13 +155,13 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { } bool signal_client() { - std::unique_lock<std::mutex> lock(mu_); + unique_lock<mutex> lock(mu_); return signal_client_; } private: bool signal_client_; - std::mutex mu_; + mutex mu_; }; class TestServiceImplDupPkg @@ -168,11 +174,10 @@ class TestServiceImplDupPkg } }; -class End2endTest : public ::testing::Test { - protected: - End2endTest() : kMaxMessageSize_(8192) {} - - void SetUp() GRPC_OVERRIDE { +class CommonStressTest { + public: + CommonStressTest() : kMaxMessageSize_(8192) {} + void SetUp() { int port = grpc_pick_unused_port_or_die(); server_address_ << "localhost:" << port; // Setup server @@ -185,15 +190,15 @@ class End2endTest : public ::testing::Test { builder.RegisterService(&dup_pkg_service_); server_ = builder.BuildAndStart(); } - - void TearDown() GRPC_OVERRIDE { server_->Shutdown(); } - + void TearDown() { server_->Shutdown(); } void ResetStub() { std::shared_ptr<Channel> channel = CreateChannel(server_address_.str(), InsecureChannelCredentials()); stub_ = grpc::testing::EchoTestService::NewStub(channel); } + grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); } + private: std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<Server> server_; std::ostringstream server_address_; @@ -202,6 +207,16 @@ class End2endTest : public ::testing::Test { TestServiceImplDupPkg dup_pkg_service_; }; +class End2endTest : public ::testing::Test { + protected: + End2endTest() {} + void SetUp() GRPC_OVERRIDE { common_.SetUp(); } + void TearDown() GRPC_OVERRIDE { common_.TearDown(); } + void ResetStub() { common_.ResetStub(); } + + CommonStressTest common_; +}; + static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { EchoRequest request; EchoResponse response; @@ -216,17 +231,115 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { } TEST_F(End2endTest, ThreadStress) { - ResetStub(); + common_.ResetStub(); std::vector<std::thread*> threads; - for (int i = 0; i < 100; ++i) { - threads.push_back(new std::thread(SendRpc, stub_.get(), 1000)); + for (int i = 0; i < kNumThreads; ++i) { + threads.push_back(new std::thread(SendRpc, common_.GetStub(), kNumRpcs)); } - for (int i = 0; i < 100; ++i) { + for (int i = 0; i < kNumThreads; ++i) { threads[i]->join(); delete threads[i]; } } +class AsyncClientEnd2endTest : public ::testing::Test { + protected: + AsyncClientEnd2endTest() : rpcs_outstanding_(0) {} + + void SetUp() GRPC_OVERRIDE { common_.SetUp(); } + void TearDown() GRPC_OVERRIDE { + void* ignored_tag; + bool ignored_ok; + while (cq_.Next(&ignored_tag, &ignored_ok)) + ; + common_.TearDown(); + } + + void Wait() { + unique_lock<mutex> l(mu_); + while (rpcs_outstanding_ != 0) { + cv_.wait(l); + } + + cq_.Shutdown(); + } + + struct AsyncClientCall { + EchoResponse response; + ClientContext context; + Status status; + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader; + }; + + void AsyncSendRpc(int num_rpcs) { + for (int i = 0; i < num_rpcs; ++i) { + AsyncClientCall* call = new AsyncClientCall; + EchoRequest request; + request.set_message("Hello: " + std::to_string(i)); + call->response_reader = + common_.GetStub()->AsyncEcho(&call->context, request, &cq_); + call->response_reader->Finish(&call->response, &call->status, + (void*)call); + + unique_lock<mutex> l(mu_); + rpcs_outstanding_++; + } + } + + void AsyncCompleteRpc() { + while (true) { + void* got_tag; + bool ok = false; + if (!cq_.Next(&got_tag, &ok)) break; + AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag); + if (!ok) { + gpr_log(GPR_DEBUG, "Error: %d", call->status.error_code()); + } + delete call; + + bool notify; + { + unique_lock<mutex> l(mu_); + rpcs_outstanding_--; + notify = (rpcs_outstanding_ == 0); + } + if (notify) { + cv_.notify_all(); + } + } + } + + CommonStressTest common_; + CompletionQueue cq_; + mutex mu_; + condition_variable cv_; + int rpcs_outstanding_; +}; + +TEST_F(AsyncClientEnd2endTest, ThreadStress) { + common_.ResetStub(); + std::vector<std::thread*> send_threads, completion_threads; + for (int i = 0; i < kNumAsyncReceiveThreads; ++i) { + completion_threads.push_back(new std::thread( + &AsyncClientEnd2endTest_ThreadStress_Test::AsyncCompleteRpc, this)); + } + for (int i = 0; i < kNumAsyncSendThreads; ++i) { + send_threads.push_back( + new std::thread(&AsyncClientEnd2endTest_ThreadStress_Test::AsyncSendRpc, + this, kNumRpcs)); + } + for (int i = 0; i < kNumAsyncSendThreads; ++i) { + send_threads[i]->join(); + delete send_threads[i]; + } + + Wait(); + for (int i = 0; i < kNumAsyncReceiveThreads; ++i) { + completion_threads[i]->join(); + delete completion_threads[i]; + } +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index c70b0303b8..80f6ada409 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -384,5 +384,18 @@ std::unique_ptr<ScenarioResult> RunScenario( delete[] servers; return result; } + +void RunQuit() { + // Get client, server lists + auto workers = get_workers("QPS_WORKERS"); + for (size_t i = 0; i < workers.size(); i++) { + auto stub = WorkerService::NewStub( + CreateChannel(workers[i], InsecureChannelCredentials())); + Void dummy; + grpc::ClientContext ctx; + GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok()); + } +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 2a7cf805e5..3af61f7391 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -70,6 +70,7 @@ std::unique_ptr<ScenarioResult> RunScenario( const grpc::testing::ServerConfig& server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count); +void RunQuit(); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index ffc8a83fc5..69fb4d75e8 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -78,6 +78,8 @@ DEFINE_int32(client_core_limit, -1, "Limit on client cores to use"); DEFINE_bool(secure_test, false, "Run a secure test"); +DEFINE_bool(quit, false, "Quit the workers"); + using grpc::testing::ClientConfig; using grpc::testing::ServerConfig; using grpc::testing::ClientType; @@ -90,6 +92,11 @@ namespace grpc { namespace testing { static void QpsDriver() { + if (FLAGS_quit) { + RunQuit(); + return; + } + RpcType rpc_type; GPR_ASSERT(RpcType_Parse(FLAGS_rpc_type, &rpc_type)); diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 7e9e05f7ec..9442017ddf 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -103,8 +103,8 @@ static std::unique_ptr<Server> CreateServer(const ServerConfig& config) { class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { public: - explicit WorkerServiceImpl(int server_port) - : acquired_(false), server_port_(server_port) {} + WorkerServiceImpl(int server_port, QpsWorker* worker) + : acquired_(false), server_port_(server_port), worker_(worker) {} Status RunClient(ServerContext* ctx, ServerReaderWriter<ClientStatus, ClientArgs>* stream) @@ -140,6 +140,16 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { return Status::OK; } + Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(StatusCode::RESOURCE_EXHAUSTED, ""); + } + + worker_->MarkDone(); + return Status::OK; + } + private: // Protect against multiple clients using this worker at once. class InstanceGuard { @@ -250,10 +260,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { std::mutex mu_; bool acquired_; int server_port_; + QpsWorker* worker_; }; QpsWorker::QpsWorker(int driver_port, int server_port) { - impl_.reset(new WorkerServiceImpl(server_port)); + impl_.reset(new WorkerServiceImpl(server_port, this)); + gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0)); char* server_address = NULL; gpr_join_host_port(&server_address, "::", driver_port); @@ -269,5 +281,11 @@ QpsWorker::QpsWorker(int driver_port, int server_port) { QpsWorker::~QpsWorker() {} +bool QpsWorker::Done() const { + return (gpr_atm_acq_load(&done_) != static_cast<gpr_atm>(0)); +} +void QpsWorker::MarkDone() { + gpr_atm_rel_store(&done_, static_cast<gpr_atm>(1)); +} } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h index 27de69fa65..624c182100 100644 --- a/test/cpp/qps/qps_worker.h +++ b/test/cpp/qps/qps_worker.h @@ -36,6 +36,8 @@ #include <memory> +#include <grpc/support/atm.h> + namespace grpc { class Server; @@ -49,9 +51,14 @@ class QpsWorker { explicit QpsWorker(int driver_port, int server_port = 0); ~QpsWorker(); + bool Done() const; + void MarkDone(); + private: std::unique_ptr<WorkerServiceImpl> impl_; std::unique_ptr<Server> server_; + + gpr_atm done_; }; } // namespace testing diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index a1e73e9abe..f42cfe3255 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -56,7 +56,7 @@ namespace testing { static void RunServer() { QpsWorker worker(FLAGS_driver_port, FLAGS_server_port); - while (!got_sigint) { + while (!got_sigint && !worker.Done()) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN))); } diff --git a/test/cpp/util/time_test.cc b/test/cpp/util/time_test.cc index 1e501dfd28..48c6ce7697 100644 --- a/test/cpp/util/time_test.cc +++ b/test/cpp/util/time_test.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -45,7 +45,7 @@ namespace { class TimeTest : public ::testing::Test {}; TEST_F(TimeTest, AbsolutePointTest) { - long us = 10000000L; + int64_t us = 10000000L; gpr_timespec ts = gpr_time_from_micros(us, GPR_TIMESPAN); ts.clock_type = GPR_CLOCK_REALTIME; system_clock::time_point tp{microseconds(us)}; |