aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server_async.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r--test/cpp/qps/server_async.cc86
1 files changed, 51 insertions, 35 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 4a82f98199..1c1a5636a9 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -70,18 +70,21 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerAsyncReaderWriter<ResponseType, RequestType> *,
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_both_ways_function,
- std::function<grpc::Status(const PayloadConfig &, const RequestType *,
+ std::function<grpc::Status(const PayloadConfig &, RequestType *,
ResponseType *)>
process_rpc)
: Server(config) {
- char *server_address = NULL;
-
- gpr_join_host_port(&server_address, "::", port());
-
ServerBuilder builder;
- builder.AddListeningPort(server_address,
- Server::CreateServerCredentials(config));
- gpr_free(server_address);
+
+ auto port_num = port();
+ // Negative port number means inproc server, so no listen port needed
+ if (port_num >= 0) {
+ char *server_address = NULL;
+ gpr_join_host_port(&server_address, "::", port_num);
+ builder.AddListeningPort(server_address,
+ Server::CreateServerCredentials(config));
+ gpr_free(server_address);
+ }
register_service(&builder, &async_service_);
@@ -183,6 +186,11 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
return count;
}
+ std::shared_ptr<Channel> InProcessChannel(
+ const ChannelArguments &args) override {
+ return server_->InProcessChannel(args);
+ }
+
private:
void ShutdownThreadFunc() {
// TODO (vpai): Remove this deadline and allow Shutdown to finish properly
@@ -194,23 +202,31 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
- ServerRpcContext *ctx = detag(got_tag);
+ if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ return;
+ }
+ ServerRpcContext *ctx;
+ std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex;
+ do {
+ ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ mu_ptr->lock();
if (shutdown_state_[thread_idx]->shutdown) {
+ mu_ptr->unlock();
return;
}
- std::lock_guard<ServerRpcContext> l2(*ctx);
- const bool still_going = ctx->RunNextState(ok);
- // if this RPC context is done, refresh it
- if (!still_going) {
- ctx->Reset();
- }
- }
- return;
+ } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, mu_ptr]() {
+ ctx->lock();
+ if (!ctx->RunNextState(ok)) {
+ ctx->Reset();
+ }
+ ctx->unlock();
+ mu_ptr->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
}
class ServerRpcContext {
@@ -238,7 +254,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncResponseWriter<ResponseType> *,
void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextUnaryImpl::invoker),
@@ -284,8 +300,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncResponseWriter<ResponseType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
@@ -296,7 +311,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerContextType *,
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextStreamingImpl::request_done),
@@ -364,8 +379,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerContextType *,
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
};
@@ -377,7 +391,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncReader<ResponseType, RequestType> *,
void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
@@ -435,8 +449,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncReader<ResponseType, RequestType> *,
void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
};
@@ -447,7 +460,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncWriter<ResponseType> *, void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
@@ -504,8 +517,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncWriter<ResponseType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncWriter<ResponseType> stream_;
};
@@ -534,8 +546,7 @@ static void RegisterGenericService(ServerBuilder *builder,
builder->RegisterAsyncGenericService(service);
}
-static Status ProcessSimpleRPC(const PayloadConfig &,
- const SimpleRequest *request,
+static Status ProcessSimpleRPC(const PayloadConfig &, SimpleRequest *request,
SimpleResponse *response) {
if (request->response_size() > 0) {
if (!Server::SetPayload(request->response_type(), request->response_size(),
@@ -543,12 +554,17 @@ static Status ProcessSimpleRPC(const PayloadConfig &,
return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
}
}
+ // We are done using the request. Clear it to reduce working memory.
+ // This proves to reduce cache misses in large message size cases.
+ request->Clear();
return Status::OK;
}
static Status ProcessGenericRPC(const PayloadConfig &payload_config,
- const ByteBuffer *request,
- ByteBuffer *response) {
+ ByteBuffer *request, ByteBuffer *response) {
+ // We are done using the request. Clear it to reduce working memory.
+ // This proves to reduce cache misses in large message size cases.
+ request->Clear();
int resp_size = payload_config.bytebuf_params().resp_size();
std::unique_ptr<char[]> buf(new char[resp_size]);
Slice slice(buf.get(), resp_size);