diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/core/client_channel/lb_policies_test.cc | 4 | ||||
-rw-r--r-- | test/core/transport/status_conversion_test.cc | 8 | ||||
-rw-r--r-- | test/cpp/codegen/compiler_test_golden | 17 | ||||
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 22 | ||||
-rw-r--r-- | test/cpp/end2end/client_lb_end2end_test.cc | 4 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 22 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_cq.cc | 2 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 21 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/helpers.cc | 14 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 28 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 37 |
11 files changed, 110 insertions, 69 deletions
diff --git a/test/core/client_channel/lb_policies_test.cc b/test/core/client_channel/lb_policies_test.cc index 4379af11da..1f0d310380 100644 --- a/test/core/client_channel/lb_policies_test.cc +++ b/test/core/client_channel/lb_policies_test.cc @@ -53,8 +53,8 @@ typedef struct request_sequences { size_t n; /* number of iterations */ int *connections; /* indexed by the interation number, value is the index of the server it connected to or -1 if none */ - int *connectivity_states; /* indexed by the interation number, value is the - client connectivity state */ + /* indexed by the interation number, value is the client connectivity state */ + grpc_connectivity_state *connectivity_states; } request_sequences; typedef void (*verifier_fn)(const servers_fixture *, grpc_channel *, diff --git a/test/core/transport/status_conversion_test.cc b/test/core/transport/status_conversion_test.cc index de8fa4458a..02dad86693 100644 --- a/test/core/transport/status_conversion_test.cc +++ b/test/core/transport/status_conversion_test.cc @@ -38,6 +38,7 @@ int main(int argc, char **argv) { int i; grpc_test_init(argc, argv); + grpc_init(); GRPC_STATUS_TO_HTTP2_ERROR(GRPC_STATUS_OK, GRPC_HTTP2_NO_ERROR); GRPC_STATUS_TO_HTTP2_ERROR(GRPC_STATUS_CANCELLED, GRPC_HTTP2_CANCEL); @@ -129,6 +130,11 @@ int main(int argc, char **argv) { GRPC_STATUS_INTERNAL); HTTP2_ERROR_TO_GRPC_STATUS(GRPC_HTTP2_REFUSED_STREAM, after_deadline, GRPC_STATUS_UNAVAILABLE); + // We only have millisecond granularity in our timing code. This sleeps for 5 + // millis to ensure that the status conversion code will pick up the fact + // that the deadline has expired. + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(5, GPR_TIMESPAN))); HTTP2_ERROR_TO_GRPC_STATUS(GRPC_HTTP2_CANCEL, after_deadline, GRPC_STATUS_DEADLINE_EXCEEDED); HTTP2_ERROR_TO_GRPC_STATUS(GRPC_HTTP2_COMPRESSION_ERROR, after_deadline, @@ -158,5 +164,7 @@ int main(int argc, char **argv) { grpc_http2_status_to_grpc_status(i); } + grpc_shutdown(); + return 0; } diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 3d664e8825..026a94112a 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -39,7 +39,6 @@ namespace grpc { class CompletionQueue; class Channel; -class RpcService; class ServerCompletionQueue; class ServerContext; } // namespace grpc @@ -169,10 +168,10 @@ class ServiceA final { ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override; - const ::grpc::RpcMethod rpcmethod_MethodA1_; - const ::grpc::RpcMethod rpcmethod_MethodA2_; - const ::grpc::RpcMethod rpcmethod_MethodA3_; - const ::grpc::RpcMethod rpcmethod_MethodA4_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA2_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA3_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA4_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -352,7 +351,7 @@ class ServiceA final { public: WithStreamedUnaryMethod_MethodA1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodA1() override { BaseClassMustBeDerivedFromService(this); @@ -373,7 +372,7 @@ class ServiceA final { public: WithSplitStreamingMethod_MethodA3() { ::grpc::Service::MarkMethodStreamed(2, - new ::grpc::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); } ~WithSplitStreamingMethod_MethodA3() override { BaseClassMustBeDerivedFromService(this); @@ -427,7 +426,7 @@ class ServiceB final { std::shared_ptr< ::grpc::ChannelInterface> channel_; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; - const ::grpc::RpcMethod rpcmethod_MethodB1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodB1_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -484,7 +483,7 @@ class ServiceB final { public: WithStreamedUnaryMethod_MethodB1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodB1() override { BaseClassMustBeDerivedFromService(this); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index b7634d0438..af3bdb25ac 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -28,12 +28,14 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/tls.h> #include "src/core/lib/iomgr/port.h" +#include "src/core/lib/support/env.h" #include "src/proto/grpc/health/v1/health.grpc.pb.h" #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -459,6 +461,15 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) { if (GetParam().inproc) { return; } + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200"); + int poller_slowdown_factor = 1; + // It needs 2 pollset_works to reconnect the channel with polling engine + // "poll" + char* s = gpr_getenv("GRPC_POLL_STRATEGY"); + if (s != NULL && 0 == strcmp(s, "poll")) { + poller_slowdown_factor = 2; + } + gpr_free(s); ResetStub(); SendRpc(1); server_->Shutdown(); @@ -468,10 +479,13 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) { while (cq_->Next(&ignored_tag, &ignored_ok)) ; BuildAndStartServer(); - // It needs more than kConnectivityCheckIntervalMsec time to reconnect the - // channel. - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(1600, GPR_TIMESPAN))); + // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to + // reconnect the channel. + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis( + 300 * poller_slowdown_factor * grpc_test_slowdown_factor(), + GPR_TIMESPAN))); SendRpc(1); } diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 83bbe45523..60cdf08287 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -303,7 +303,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { ports.clear(); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET none *******"); - grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + grpc_connectivity_state channel_state; do { channel_state = channel_->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); @@ -479,7 +479,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { // An empty update will result in the channel going into TRANSIENT_FAILURE. ports.clear(); SetNextResolution(ports); - grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + grpc_connectivity_state channel_state; do { channel_state = channel_->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 1aa547d4e3..82ca39466e 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -30,11 +30,13 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/support/env.h" #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -704,13 +706,25 @@ TEST_P(End2endTest, ReconnectChannel) { if (GetParam().inproc) { return; } + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200"); + int poller_slowdown_factor = 1; + // It needs 2 pollset_works to reconnect the channel with polling engine + // "poll" + char* s = gpr_getenv("GRPC_POLL_STRATEGY"); + if (s != NULL && 0 == strcmp(s, "poll")) { + poller_slowdown_factor = 2; + } + gpr_free(s); ResetStub(); SendRpc(stub_.get(), 1, false); RestartServer(std::shared_ptr<AuthMetadataProcessor>()); - // It needs more than kConnectivityCheckIntervalMsec time to reconnect the - // channel. - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(1600, GPR_TIMESPAN))); + // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to + // reconnect the channel. + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis( + 300 * poller_slowdown_factor * grpc_test_slowdown_factor(), + GPR_TIMESPAN))); SendRpc(stub_.get(), 1, false); } diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 68252d8bc3..dac702b08b 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -68,7 +68,7 @@ BENCHMARK(BM_CreateDestroyCore); static void DoneWithCompletionOnStack(grpc_exec_ctx* exec_ctx, void* arg, grpc_cq_completion* completion) {} -class DummyTag final : public CompletionQueueTag { +class DummyTag final : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) override { return true; } }; diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 389b8c90ab..25d243a104 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -21,6 +21,7 @@ #include <benchmark/benchmark.h> #include <gflags/gflags.h> #include <fstream> + #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/iomgr/timer_manager.h" @@ -142,17 +143,17 @@ class TrickledCHTTP2 : public EndpointPairFixture { client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, - client->flow_control->remote_window(), - server->flow_control->remote_window(), - client->flow_control->announced_window(), - server->flow_control->announced_window(), - client_stream ? client_stream->flow_control->remote_window_delta() : -1, - server_stream ? server_stream->flow_control->remote_window_delta() : -1, - client_stream ? client_stream->flow_control->local_window_delta() : -1, - server_stream ? server_stream->flow_control->local_window_delta() : -1, - client_stream ? client_stream->flow_control->announced_window_delta() + client->flow_control->remote_window_, + server->flow_control->remote_window_, + client->flow_control->announced_window_, + server->flow_control->announced_window_, + client_stream ? client_stream->flow_control->remote_window_delta_ : -1, + server_stream ? server_stream->flow_control->remote_window_delta_ : -1, + client_stream ? client_stream->flow_control->local_window_delta_ : -1, + server_stream ? server_stream->flow_control->local_window_delta_ : -1, + client_stream ? client_stream->flow_control->announced_window_delta_ : -1, - server_stream ? server_stream->flow_control->announced_window_delta() + server_stream ? server_stream->flow_control->announced_window_delta_ : -1, client->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], diff --git a/test/cpp/microbenchmarks/helpers.cc b/test/cpp/microbenchmarks/helpers.cc index 6802a0aa99..782f12e99a 100644 --- a/test/cpp/microbenchmarks/helpers.cc +++ b/test/cpp/microbenchmarks/helpers.cc @@ -16,6 +16,8 @@ * */ +#include <string.h> + #include "test/cpp/microbenchmarks/helpers.h" void TrackCounters::Finish(benchmark::State &state) { @@ -45,10 +47,14 @@ void TrackCounters::AddToLabel(std::ostream &out, benchmark::State &state) { << "/iter:" << ((double)stats.counters[i] / (double)state.iterations()); } for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { - out << " " << grpc_stats_histogram_name[i] << "-median:" - << grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 50.0) - << " " << grpc_stats_histogram_name[i] << "-99p:" - << grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 99.0); + std::ostringstream median_ss; + median_ss << grpc_stats_histogram_name[i] << "-median"; + state.counters[median_ss.str()] = benchmark::Counter( + grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 50.0)); + std::ostringstream tail_ss; + tail_ss << grpc_stats_histogram_name[i] << "-99p"; + state.counters[tail_ss.str()] = benchmark::Counter( + grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 99.0)); } #ifdef GPR_LOW_LEVEL_COUNTERS grpc_memory_counters counters_at_end = grpc_memory_counters_snapshot(); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index b5c7208664..a541f94fa5 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -245,9 +245,20 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { return; } - ClientRpcContext* ctx; + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; - do { + shutdown_mu->lock(); + while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, entry_ptr, shutdown_mu]() { + if (!ctx->RunNextState(ok, entry_ptr)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + delete ctx; + } + shutdown_mu->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) { t->UpdateHistogram(entry_ptr); // Got a regular event, so process it ctx = ClientRpcContext::detag(got_tag); @@ -265,18 +276,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { shutdown_mu->unlock(); return; } - } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( - [&, ctx, ok, entry_ptr, shutdown_mu]() { - bool next_ok = ok; - if (!ctx->RunNextState(next_ok, entry_ptr)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - delete ctx; - } - shutdown_mu->unlock(); - }, - &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); + } } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4576be5bb3..1c1a5636a9 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -70,7 +70,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerAsyncReaderWriter<ResponseType, RequestType> *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_both_ways_function, - std::function<grpc::Status(const PayloadConfig &, const RequestType *, + std::function<grpc::Status(const PayloadConfig &, RequestType *, ResponseType *)> process_rpc) : Server(config) { @@ -206,13 +206,12 @@ class AsyncQpsServerTest final : public grpc::testing::Server { return; } ServerRpcContext *ctx; - std::mutex *mu_ptr; + std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex; do { ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - mu_ptr = &shutdown_state_[thread_idx]->mutex; mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { mu_ptr->unlock(); @@ -255,7 +254,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextUnaryImpl::invoker), @@ -301,8 +300,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; }; @@ -313,7 +311,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingImpl::request_done), @@ -381,8 +379,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_; }; @@ -394,7 +391,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReader<ResponseType, RequestType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingFromClientImpl::request_done), @@ -452,8 +449,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReader<ResponseType, RequestType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncReader<ResponseType, RequestType> stream_; }; @@ -464,7 +460,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter<ResponseType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingFromServerImpl::request_done), @@ -521,8 +517,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter<ResponseType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncWriter<ResponseType> stream_; }; @@ -551,8 +546,7 @@ static void RegisterGenericService(ServerBuilder *builder, builder->RegisterAsyncGenericService(service); } -static Status ProcessSimpleRPC(const PayloadConfig &, - const SimpleRequest *request, +static Status ProcessSimpleRPC(const PayloadConfig &, SimpleRequest *request, SimpleResponse *response) { if (request->response_size() > 0) { if (!Server::SetPayload(request->response_type(), request->response_size(), @@ -560,12 +554,17 @@ static Status ProcessSimpleRPC(const PayloadConfig &, return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } } + // We are done using the request. Clear it to reduce working memory. + // This proves to reduce cache misses in large message size cases. + request->Clear(); return Status::OK; } static Status ProcessGenericRPC(const PayloadConfig &payload_config, - const ByteBuffer *request, - ByteBuffer *response) { + ByteBuffer *request, ByteBuffer *response) { + // We are done using the request. Clear it to reduce working memory. + // This proves to reduce cache misses in large message size cases. + request->Clear(); int resp_size = payload_config.bytebuf_params().resp_size(); std::unique_ptr<char[]> buf(new char[resp_size]); Slice slice(buf.get(), resp_size); |