diff options
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r-- | test/cpp/qps/server_async.cc | 86 |
1 files changed, 51 insertions, 35 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4a82f98199..1c1a5636a9 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -70,18 +70,21 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerAsyncReaderWriter<ResponseType, RequestType> *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_both_ways_function, - std::function<grpc::Status(const PayloadConfig &, const RequestType *, + std::function<grpc::Status(const PayloadConfig &, RequestType *, ResponseType *)> process_rpc) : Server(config) { - char *server_address = NULL; - - gpr_join_host_port(&server_address, "::", port()); - ServerBuilder builder; - builder.AddListeningPort(server_address, - Server::CreateServerCredentials(config)); - gpr_free(server_address); + + auto port_num = port(); + // Negative port number means inproc server, so no listen port needed + if (port_num >= 0) { + char *server_address = NULL; + gpr_join_host_port(&server_address, "::", port_num); + builder.AddListeningPort(server_address, + Server::CreateServerCredentials(config)); + gpr_free(server_address); + } register_service(&builder, &async_service_); @@ -183,6 +186,11 @@ class AsyncQpsServerTest final : public grpc::testing::Server { return count; } + std::shared_ptr<Channel> InProcessChannel( + const ChannelArguments &args) override { + return server_->InProcessChannel(args); + } + private: void ShutdownThreadFunc() { // TODO (vpai): Remove this deadline and allow Shutdown to finish properly @@ -194,23 +202,31 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { - ServerRpcContext *ctx = detag(got_tag); + if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ServerRpcContext *ctx; + std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex; + do { + ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { + mu_ptr->unlock(); return; } - std::lock_guard<ServerRpcContext> l2(*ctx); - const bool still_going = ctx->RunNextState(ok); - // if this RPC context is done, refresh it - if (!still_going) { - ctx->Reset(); - } - } - return; + } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, mu_ptr]() { + ctx->lock(); + if (!ctx->RunNextState(ok)) { + ctx->Reset(); + } + ctx->unlock(); + mu_ptr->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } class ServerRpcContext { @@ -238,7 +254,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextUnaryImpl::invoker), @@ -284,8 +300,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; }; @@ -296,7 +311,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingImpl::request_done), @@ -364,8 +379,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_; }; @@ -377,7 +391,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReader<ResponseType, RequestType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingFromClientImpl::request_done), @@ -435,8 +449,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReader<ResponseType, RequestType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncReader<ResponseType, RequestType> stream_; }; @@ -447,7 +460,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter<ResponseType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingFromServerImpl::request_done), @@ -504,8 +517,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter<ResponseType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncWriter<ResponseType> stream_; }; @@ -534,8 +546,7 @@ static void RegisterGenericService(ServerBuilder *builder, builder->RegisterAsyncGenericService(service); } -static Status ProcessSimpleRPC(const PayloadConfig &, - const SimpleRequest *request, +static Status ProcessSimpleRPC(const PayloadConfig &, SimpleRequest *request, SimpleResponse *response) { if (request->response_size() > 0) { if (!Server::SetPayload(request->response_type(), request->response_size(), @@ -543,12 +554,17 @@ static Status ProcessSimpleRPC(const PayloadConfig &, return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } } + // We are done using the request. Clear it to reduce working memory. + // This proves to reduce cache misses in large message size cases. + request->Clear(); return Status::OK; } static Status ProcessGenericRPC(const PayloadConfig &payload_config, - const ByteBuffer *request, - ByteBuffer *response) { + ByteBuffer *request, ByteBuffer *response) { + // We are done using the request. Clear it to reduce working memory. + // This proves to reduce cache misses in large message size cases. + request->Clear(); int resp_size = payload_config.bytebuf_params().resp_size(); std::unique_ptr<char[]> buf(new char[resp_size]); Slice slice(buf.get(), resp_size); |