diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-09 23:32:57 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-09 23:34:01 -0700 |
commit | 51f938f1692b1f4feceb70a0baf4fc7130f6ca8f (patch) | |
tree | ff6dc570ddf1deb1be75afbff06ea6c2b19ed08a | |
parent | 77ece807bb612df1c0a9537dd6c095969ca57f67 (diff) |
Make async server use one CQ per server thread
-rw-r--r-- | test/cpp/qps/server_async.cc | 55 |
1 files changed, 27 insertions, 28 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4b0678bb2c..210aef4fd6 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -73,31 +73,35 @@ class AsyncQpsServerTest : public Server { gpr_free(server_address); builder.RegisterAsyncService(&async_service_); - srv_cq_ = builder.AddCompletionQueue(); + for (int i = 0; i < config.threads(); i++) { + srv_cqs_.emplace_back(std::move(builder.AddCompletionQueue())); + } server_ = builder.BuildAndStart(); using namespace std::placeholders; - request_unary_ = - std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, - _1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4); - request_streaming_ = - std::bind(&TestService::AsyncService::RequestStreamingCall, - &async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3); - for (int i = 0; i < 100; i++) { - contexts_.push_front( - new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - request_unary_, ProcessRPC)); - contexts_.push_front( - new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - request_streaming_, ProcessRPC)); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < config.threads(); j++) { + auto request_unary = std::bind( + &TestService::AsyncService::RequestUnaryCall, &async_service_, _1, + _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + auto request_streaming = std::bind( + &TestService::AsyncService::RequestStreamingCall, &async_service_, + _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + contexts_.push_front( + new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + request_unary, ProcessRPC)); + contexts_.push_front( + new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( + request_streaming, ProcessRPC)); + } } for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cq_->Next(&got_tag, &ok)) { + while (srv_cqs_[i]->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke bool still_going = ctx->RunNextState(ok); @@ -125,11 +129,13 @@ class AsyncQpsServerTest : public Server { for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } - srv_cq_->Shutdown(); - bool ok; - void *got_tag; - while (srv_cq_->Next(&got_tag, &ok)) - ; + for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { + (*cq)->Shutdown(); + bool ok; + void *got_tag; + while ((*cq)->Next(&got_tag, &ok)) + ; + } while (!contexts_.empty()) { delete contexts_.front(); contexts_.pop_front(); @@ -306,15 +312,8 @@ class AsyncQpsServerTest : public Server { } std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; - std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_; + std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; TestService::AsyncService async_service_; - std::function<void(ServerContext *, SimpleRequest *, - grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)> - request_unary_; - std::function<void( - ServerContext *, - grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)> - request_streaming_; std::forward_list<ServerRpcContext *> contexts_; std::mutex shutdown_mutex_; |