diff options
author | Vijay Pai <vpai@google.com> | 2017-02-22 15:30:16 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2017-05-23 09:20:56 -0700 |
commit | 4b07aab51363243d3c3a04876a91dd4c68581a89 (patch) | |
tree | b01e0b9e140ae73cda67f8ed089020b813677ab4 /test/cpp/qps/server_async.cc | |
parent | 275bc932d294e6f23de0d3922046840d0244c15d (diff) |
Support multiple threads per cq sharing, add tests
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r-- | test/cpp/qps/server_async.cc | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 3403ffd326..a8aab5082e 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -31,6 +31,7 @@ * */ +#include <algorithm> #include <forward_list> #include <functional> #include <memory> @@ -104,9 +105,14 @@ class AsyncQpsServerTest final : public grpc::testing::Server { gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads); } - for (int i = 0; i < num_threads; i++) { + int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified + int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator + for (int i = 0; i < num_cqs; i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } + for (int i = 0; i < num_threads; i++) { + cq_.emplace_back(i % srv_cqs_.size()); + } if (config.resource_quota_size() > 0) { builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest") @@ -120,7 +126,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::placeholders::_2); for (int i = 0; i < 5000; i++) { - for (int j = 0; j < num_threads; j++) { + for (int j = 0; j < num_cqs; j++) { if (request_unary_function) { auto request_unary = std::bind( request_unary_function, &async_service_, std::placeholders::_1, @@ -205,7 +211,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) { + while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that @@ -214,6 +220,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { if (shutdown_state_[thread_idx]->shutdown) { return; } + std::lock_guard<ServerRpcContext> l2(*ctx); const bool still_going = ctx->RunNextState(ok); // if this RPC context is done, refresh it if (!still_going) { @@ -226,9 +233,13 @@ class AsyncQpsServerTest final : public grpc::testing::Server { class ServerRpcContext { public: ServerRpcContext() {} + void lock() { mu_.lock(); } + void unlock() { mu_.unlock(); } virtual ~ServerRpcContext(){}; virtual bool RunNextState(bool) = 0; // next state, return false if done virtual void Reset() = 0; // start this back at a clean state + private: + std::mutex mu_; }; static void *tag(ServerRpcContext *func) { return reinterpret_cast<void *>(func); @@ -518,6 +529,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; + std::vector<int> cq_; ServiceType async_service_; std::vector<std::unique_ptr<ServerRpcContext>> contexts_; |