aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-10-30 16:28:56 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2017-10-30 16:28:56 -0700
commit268685bcbd1a3af4239f4cd0f623b5f6d033b524 (patch)
tree361f6a43eb4ab9d040929ab4327e16bee0cbfa88 /test/cpp/qps
parentf8a6c8297c6e9e9024ff169291fb0485862c190f (diff)
parent94a52266c9570b5658764e16832f8eef246641f1 (diff)
Merge branch 'master' into testc++ize
Diffstat (limited to 'test/cpp/qps')
-rw-r--r--test/cpp/qps/client_async.cc28
-rw-r--r--test/cpp/qps/server_async.cc37
2 files changed, 32 insertions, 33 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index b5c7208664..a541f94fa5 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -245,9 +245,20 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
return;
}
- ClientRpcContext* ctx;
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
- do {
+ shutdown_mu->lock();
+ while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, entry_ptr, shutdown_mu]() {
+ if (!ctx->RunNextState(ok, entry_ptr)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
t->UpdateHistogram(entry_ptr);
// Got a regular event, so process it
ctx = ClientRpcContext::detag(got_tag);
@@ -265,18 +276,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
shutdown_mu->unlock();
return;
}
- } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
- [&, ctx, ok, entry_ptr, shutdown_mu]() {
- bool next_ok = ok;
- if (!ctx->RunNextState(next_ok, entry_ptr)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
- delete ctx;
- }
- shutdown_mu->unlock();
- },
- &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
+ }
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 4576be5bb3..1c1a5636a9 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -70,7 +70,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerAsyncReaderWriter<ResponseType, RequestType> *,
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_both_ways_function,
- std::function<grpc::Status(const PayloadConfig &, const RequestType *,
+ std::function<grpc::Status(const PayloadConfig &, RequestType *,
ResponseType *)>
process_rpc)
: Server(config) {
@@ -206,13 +206,12 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
return;
}
ServerRpcContext *ctx;
- std::mutex *mu_ptr;
+ std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex;
do {
ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- mu_ptr = &shutdown_state_[thread_idx]->mutex;
mu_ptr->lock();
if (shutdown_state_[thread_idx]->shutdown) {
mu_ptr->unlock();
@@ -255,7 +254,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncResponseWriter<ResponseType> *,
void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextUnaryImpl::invoker),
@@ -301,8 +300,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncResponseWriter<ResponseType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
@@ -313,7 +311,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerContextType *,
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextStreamingImpl::request_done),
@@ -381,8 +379,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerContextType *,
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
};
@@ -394,7 +391,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncReader<ResponseType, RequestType> *,
void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
@@ -452,8 +449,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncReader<ResponseType, RequestType> *,
void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
};
@@ -464,7 +460,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncWriter<ResponseType> *, void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
@@ -521,8 +517,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncWriter<ResponseType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncWriter<ResponseType> stream_;
};
@@ -551,8 +546,7 @@ static void RegisterGenericService(ServerBuilder *builder,
builder->RegisterAsyncGenericService(service);
}
-static Status ProcessSimpleRPC(const PayloadConfig &,
- const SimpleRequest *request,
+static Status ProcessSimpleRPC(const PayloadConfig &, SimpleRequest *request,
SimpleResponse *response) {
if (request->response_size() > 0) {
if (!Server::SetPayload(request->response_type(), request->response_size(),
@@ -560,12 +554,17 @@ static Status ProcessSimpleRPC(const PayloadConfig &,
return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
}
}
+ // We are done using the request. Clear it to reduce working memory.
+ // This proves to reduce cache misses in large message size cases.
+ request->Clear();
return Status::OK;
}
static Status ProcessGenericRPC(const PayloadConfig &payload_config,
- const ByteBuffer *request,
- ByteBuffer *response) {
+ ByteBuffer *request, ByteBuffer *response) {
+ // We are done using the request. Clear it to reduce working memory.
+ // This proves to reduce cache misses in large message size cases.
+ request->Clear();
int resp_size = payload_config.bytebuf_params().resp_size();
std::unique_ptr<char[]> buf(new char[resp_size]);
Slice slice(buf.get(), resp_size);