aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server_async.cc
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2017-02-22 15:30:16 -0800
committerGravatar Vijay Pai <vpai@google.com>2017-05-23 09:20:56 -0700
commit4b07aab51363243d3c3a04876a91dd4c68581a89 (patch)
treeb01e0b9e140ae73cda67f8ed089020b813677ab4 /test/cpp/qps/server_async.cc
parent275bc932d294e6f23de0d3922046840d0244c15d (diff)
Support multiple threads per cq sharing, add tests
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r--test/cpp/qps/server_async.cc18
1 files changed, 15 insertions, 3 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 3403ffd326..a8aab5082e 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -31,6 +31,7 @@
*
*/
+#include <algorithm>
#include <forward_list>
#include <functional>
#include <memory>
@@ -104,9 +105,14 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
}
- for (int i = 0; i < num_threads; i++) {
+ int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
+ int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
+ for (int i = 0; i < num_cqs; i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
+ for (int i = 0; i < num_threads; i++) {
+ cq_.emplace_back(i % srv_cqs_.size());
+ }
if (config.resource_quota_size() > 0) {
builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest")
@@ -120,7 +126,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::placeholders::_2);
for (int i = 0; i < 5000; i++) {
- for (int j = 0; j < num_threads; j++) {
+ for (int j = 0; j < num_cqs; j++) {
if (request_unary_function) {
auto request_unary = std::bind(
request_unary_function, &async_service_, std::placeholders::_1,
@@ -205,7 +211,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
@@ -214,6 +220,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
if (shutdown_state_[thread_idx]->shutdown) {
return;
}
+ std::lock_guard<ServerRpcContext> l2(*ctx);
const bool still_going = ctx->RunNextState(ok);
// if this RPC context is done, refresh it
if (!still_going) {
@@ -226,9 +233,13 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
class ServerRpcContext {
public:
ServerRpcContext() {}
+ void lock() { mu_.lock(); }
+ void unlock() { mu_.unlock(); }
virtual ~ServerRpcContext(){};
virtual bool RunNextState(bool) = 0; // next state, return false if done
virtual void Reset() = 0; // start this back at a clean state
+ private:
+ std::mutex mu_;
};
static void *tag(ServerRpcContext *func) {
return reinterpret_cast<void *>(func);
@@ -518,6 +529,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
+ std::vector<int> cq_;
ServiceType async_service_;
std::vector<std::unique_ptr<ServerRpcContext>> contexts_;