Diffstat (limited to 'test/cpp/qps/client_async.cc')
 test/cpp/qps/client_async.cc | 103 ++++++++++++++++++++++++++--------------
 1 file changed, 68 insertions(+), 35 deletions(-)
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 1507d1e3d6..5d9cb4bd0c 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -31,7 +31,6 @@
  *
  */
 
-#include <cassert>
 #include <forward_list>
 #include <functional>
 #include <list>
@@ -48,7 +47,6 @@
 #include <grpc++/generic/generic_stub.h>
 #include <grpc/grpc.h>
 #include <grpc/support/cpu.h>
-#include <grpc/support/histogram.h>
 #include <grpc/support/log.h>
 
 #include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -64,7 +62,7 @@ class ClientRpcContext {
   ClientRpcContext() {}
   virtual ~ClientRpcContext() {}
   // next state, return false if done. Collect stats when appropriate
-  virtual bool RunNextState(bool, Histogram* hist) = 0;
+  virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
   virtual ClientRpcContext* StartNewClone() = 0;
   static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
   static ClientRpcContext* detag(void* t) {
@@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
       alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
     }
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     switch (next_state_) {
       case State::READY:
         start_ = UsageTimer::Now();
@@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
         next_state_ = State::RESP_DONE;
         return true;
       case State::RESP_DONE:
-        hist->Add((UsageTimer::Now() - start_) * 1e9);
+        entry->set_value((UsageTimer::Now() - start_) * 1e9);
         callback_(status_, &response_);
         next_state_ = State::INVALID;
         return false;
@@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     for (int i = 0; i < num_async_threads_; i++) {
       cli_cqs_.emplace_back(new CompletionQueue);
       next_issuers_.emplace_back(NextIssuer(i));
+      shutdown_state_.emplace_back(new PerThreadShutdownState());
     }
 
     using namespace std::placeholders;
@@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
   }
   virtual ~AsyncClient() {
     for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
-      (*cq)->Shutdown();
       void* got_tag;
       bool ok;
       while ((*cq)->Next(&got_tag, &ok)) {
@@ -201,32 +199,16 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     }
   }
 
-  bool ThreadFunc(Histogram* histogram,
-                  size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
-    void* got_tag;
-    bool ok;
-
-    if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
-      // Got a regular event, so process it
-      ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
-      if (!ctx->RunNextState(ok, histogram)) {
-        // The RPC and callback are done, so clone the ctx
-        // and kickstart the new one
-        auto clone = ctx->StartNewClone();
-        clone->Start(cli_cqs_[thread_idx].get());
-        // delete the old version
-        delete ctx;
-      }
-      return true;
-    } else {  // queue is shutting down
-      return false;
-    }
-  }
-
  protected:
   const int num_async_threads_;
 
  private:
+  struct PerThreadShutdownState {
+    mutable std::mutex mutex;
+    bool shutdown;
+    PerThreadShutdownState() : shutdown(false) {}
+  };
+
   int NumThreads(const ClientConfig& config) {
     int num_threads = config.async_client_threads();
     if (num_threads <= 0) {  // Use dynamic sizing
@@ -235,9 +217,60 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     }
     return num_threads;
   }
+  void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
+    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+      std::lock_guard<std::mutex> lock((*ss)->mutex);
+      (*ss)->shutdown = true;
+    }
+    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+      (*cq)->Shutdown();
+    }
+    this->EndThreads();  // this needed for resolution
+  }
+
+  bool ThreadFunc(HistogramEntry* entry,
+                  size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
+    void* got_tag;
+    bool ok;
+
+    switch (cli_cqs_[thread_idx]->AsyncNext(
+        &got_tag, &ok,
+        std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
+      case CompletionQueue::GOT_EVENT: {
+        // Got a regular event, so process it
+        ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+        // Proceed while holding a lock to make sure that
+        // this thread isn't supposed to shut down
+        std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+        if (shutdown_state_[thread_idx]->shutdown) {
+          return true;
+        } else if (!ctx->RunNextState(ok, entry)) {
+          // The RPC and callback are done, so clone the ctx
+          // and kickstart the new one
+          auto clone = ctx->StartNewClone();
+          clone->Start(cli_cqs_[thread_idx].get());
+          // delete the old version
+          delete ctx;
+        }
+        return true;
+      }
+      case CompletionQueue::TIMEOUT: {
+        std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+        if (shutdown_state_[thread_idx]->shutdown) {
+          return true;
+        }
+        return true;
+      }
+      case CompletionQueue::SHUTDOWN:  // queue is shutting down, so we must
+                                       // be done
+        return true;
+    }
+    GPR_UNREACHABLE_CODE(return true);
+  }
 
   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
   std::vector<std::function<gpr_timespec()>> next_issuers_;
+  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
 };
 
 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@@ -253,7 +286,7 @@ class AsyncUnaryClient GRPC_FINAL
                      config, SetupCtx, BenchmarkStubCreator) {
     StartThreads(num_async_threads_);
   }
-  ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+  ~AsyncUnaryClient() GRPC_OVERRIDE {}
 
  private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -298,7 +331,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
     stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
     next_state_ = State::STREAM_IDLE;
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     while (true) {
       switch (next_state_) {
         case State::STREAM_IDLE:
@@ -330,7 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
           return true;
           break;
         case State::READ_DONE:
-          hist->Add((UsageTimer::Now() - start_) * 1e9);
+          entry->set_value((UsageTimer::Now() - start_) * 1e9);
           callback_(status_, &response_);
           next_state_ = State::STREAM_IDLE;
           break;  // loop around
@@ -382,7 +415,7 @@ class AsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }
 
-  ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+  ~AsyncStreamingClient() GRPC_OVERRIDE {}
 
  private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -430,7 +463,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
                           ClientRpcContext::tag(this));
     next_state_ = State::STREAM_IDLE;
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     while (true) {
       switch (next_state_) {
         case State::STREAM_IDLE:
@@ -462,7 +495,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
          return true;
           break;
         case State::READ_DONE:
-          hist->Add((UsageTimer::Now() - start_) * 1e9);
+          entry->set_value((UsageTimer::Now() - start_) * 1e9);
           callback_(status_, &response_);
           next_state_ = State::STREAM_IDLE;
           break;  // loop around
@@ -518,7 +551,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }
 
-  ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+  ~GenericAsyncStreamingClient() GRPC_OVERRIDE {}
 
  private:
   static void CheckDone(grpc::Status s, ByteBuffer* response) {}
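The core of this patch is the shutdown protocol: each worker thread owns a mutex-guarded shutdown flag, ThreadFunc polls its completion queue with AsyncNext() and a 10 ms deadline instead of blocking in Next(), and DestroyMultithreading() flips every flag before shutting the queues down and joining the threads via EndThreads(). Below is a minimal standalone sketch of that pattern, with a plain condition-variable event queue standing in for grpc::CompletionQueue; the PollingWorkers and Post names are illustrative only and not part of the patch.

// Sketch of the per-thread shutdown pattern introduced above, assuming a
// simple event queue in place of grpc::CompletionQueue.
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

struct PerThreadShutdownState {
  std::mutex mutex;
  bool shutdown = false;
};

class PollingWorkers {
 public:
  explicit PollingWorkers(int n) {
    for (int i = 0; i < n; i++) {
      shutdown_state_.emplace_back(new PerThreadShutdownState());
    }
    for (int i = 0; i < n; i++) {
      // Each thread loops on ThreadFunc until it reports shutdown,
      // mirroring the driver loop that calls AsyncClient::ThreadFunc.
      threads_.emplace_back([this, i] {
        while (ThreadFunc(i)) {
        }
      });
    }
  }

  ~PollingWorkers() {
    // Equivalent of DestroyMultithreading: set every per-thread flag under
    // its own lock, wake the queue, then join (EndThreads in the patch).
    for (auto& ss : shutdown_state_) {
      std::lock_guard<std::mutex> lock(ss->mutex);
      ss->shutdown = true;
    }
    cv_.notify_all();
    for (auto& t : threads_) t.join();
  }

  void Post(int event) {
    {
      std::lock_guard<std::mutex> lock(queue_mu_);
      events_.push(event);
    }
    cv_.notify_one();
  }

 private:
  bool ThreadFunc(int idx) {
    // Poll with a short deadline rather than blocking indefinitely, so a
    // shutdown request is noticed within one timeout period.
    std::unique_lock<std::mutex> qlock(queue_mu_);
    if (cv_.wait_for(qlock, std::chrono::milliseconds(10),
                     [this] { return !events_.empty(); })) {
      events_.pop();  // "GOT_EVENT": drain and process the event
    }
    qlock.unlock();
    // Check this thread's flag under its lock, as in the GOT_EVENT and
    // TIMEOUT cases above: keep running only while not shut down.
    std::lock_guard<std::mutex> l(shutdown_state_[idx]->mutex);
    return !shutdown_state_[idx]->shutdown;
  }

  std::mutex queue_mu_;
  std::condition_variable cv_;
  std::queue<int> events_;
  std::vector<std::thread> threads_;
  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};

int main() {
  PollingWorkers workers(2);
  for (int i = 0; i < 100; i++) workers.Post(i);
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  return 0;  // ~PollingWorkers shuts the threads down cleanly
}

The Histogram-to-HistogramEntry change follows the same reasoning: each ThreadFunc iteration now reports at most one latency sample through a per-call entry, leaving the merge into the shared histogram to the caller, which keeps the shared Histogram off the RPC hot path and out of the shutdown-sensitive code.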