diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-10-30 16:28:56 -0700 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2017-10-30 16:28:56 -0700 |
commit | 268685bcbd1a3af4239f4cd0f623b5f6d033b524 (patch) | |
tree | 361f6a43eb4ab9d040929ab4327e16bee0cbfa88 /test/cpp/qps | |
parent | f8a6c8297c6e9e9024ff169291fb0485862c190f (diff) | |
parent | 94a52266c9570b5658764e16832f8eef246641f1 (diff) |
Merge branch 'master' into testc++ize
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/client_async.cc | 28 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 37 |
2 files changed, 32 insertions, 33 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index b5c7208664..a541f94fa5 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -245,9 +245,20 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { return; } - ClientRpcContext* ctx; + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; - do { + shutdown_mu->lock(); + while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, entry_ptr, shutdown_mu]() { + if (!ctx->RunNextState(ok, entry_ptr)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + delete ctx; + } + shutdown_mu->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) { t->UpdateHistogram(entry_ptr); // Got a regular event, so process it ctx = ClientRpcContext::detag(got_tag); @@ -265,18 +276,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { shutdown_mu->unlock(); return; } - } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( - [&, ctx, ok, entry_ptr, shutdown_mu]() { - bool next_ok = ok; - if (!ctx->RunNextState(next_ok, entry_ptr)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - delete ctx; - } - shutdown_mu->unlock(); - }, - &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); + } } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4576be5bb3..1c1a5636a9 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -70,7 +70,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerAsyncReaderWriter<ResponseType, RequestType> *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_both_ways_function, - std::function<grpc::Status(const PayloadConfig &, const RequestType *, + std::function<grpc::Status(const PayloadConfig &, RequestType *, ResponseType *)> process_rpc) : Server(config) { @@ -206,13 +206,12 @@ class AsyncQpsServerTest final : public grpc::testing::Server { return; } ServerRpcContext *ctx; - std::mutex *mu_ptr; + std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex; do { ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - mu_ptr = &shutdown_state_[thread_idx]->mutex; mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { mu_ptr->unlock(); @@ -255,7 +254,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextUnaryImpl::invoker), @@ -301,8 +300,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; }; @@ -313,7 +311,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingImpl::request_done), @@ -381,8 +379,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_; }; @@ -394,7 +391,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReader<ResponseType, RequestType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingFromClientImpl::request_done), @@ -452,8 +449,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReader<ResponseType, RequestType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncReader<ResponseType, RequestType> stream_; }; @@ -464,7 +460,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter<ResponseType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingFromServerImpl::request_done), @@ -521,8 +517,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter<ResponseType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncWriter<ResponseType> stream_; }; @@ -551,8 +546,7 @@ static void RegisterGenericService(ServerBuilder *builder, builder->RegisterAsyncGenericService(service); } -static Status ProcessSimpleRPC(const PayloadConfig &, - const SimpleRequest *request, +static Status ProcessSimpleRPC(const PayloadConfig &, SimpleRequest *request, SimpleResponse *response) { if (request->response_size() > 0) { if (!Server::SetPayload(request->response_type(), request->response_size(), @@ -560,12 +554,17 @@ static Status ProcessSimpleRPC(const PayloadConfig &, return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } } + // We are done using the request. Clear it to reduce working memory. + // This proves to reduce cache misses in large message size cases. + request->Clear(); return Status::OK; } static Status ProcessGenericRPC(const PayloadConfig &payload_config, - const ByteBuffer *request, - ByteBuffer *response) { + ByteBuffer *request, ByteBuffer *response) { + // We are done using the request. Clear it to reduce working memory. + // This proves to reduce cache misses in large message size cases. + request->Clear(); int resp_size = payload_config.bytebuf_params().resp_size(); std::unique_ptr<char[]> buf(new char[resp_size]); Slice slice(buf.get(), resp_size); |