diff options
author | Craig Tiller <ctiller@google.com> | 2017-03-20 09:36:17 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-03-20 09:36:17 -0700 |
commit | 444c223c177a053e1462a8781895906f5381fa07 (patch) | |
tree | 977a27493c92b9c157b14c77cf0b4acab35c252f /test/cpp/microbenchmarks | |
parent | 83190a1e1b463bb263ebce38ab23cc147dd341c9 (diff) | |
parent | 40a947ef93aeddca3b606613f628d4d09f094e77 (diff) |
Merge github.com:grpc/grpc into new_transport_op
Diffstat (limited to 'test/cpp/microbenchmarks')
-rw-r--r-- | test/cpp/microbenchmarks/bm_call_create.cc | 26 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc | 229 |
2 files changed, 176 insertions, 79 deletions
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 90cd529cf9..fd9033bcef 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -232,7 +232,7 @@ static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) {} + grpc_closure *then_sched_closure) {} grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { @@ -275,7 +275,7 @@ const char *name; /* implementation of grpc_transport_init_stream */ int InitStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { return 0; } @@ -299,7 +299,7 @@ void PerformOp(grpc_exec_ctx *exec_ctx, grpc_transport *self, /* implementation of grpc_transport_destroy_stream */ void DestroyStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, void *and_free_memory) {} + grpc_stream *stream, grpc_closure *then_sched_closure) {} /* implementation of grpc_transport_destroy */ void Destroy(grpc_exec_ctx *exec_ctx, grpc_transport *self) {} @@ -397,7 +397,7 @@ static void BM_IsolatedFilter(benchmark::State &state) { grpc_channel_stack *channel_stack = static_cast<grpc_channel_stack *>(gpr_zalloc(channel_size)); GPR_ASSERT(GRPC_LOG_IF_ERROR( - "call_stack_init", + "channel_stack_init", grpc_channel_stack_init(&exec_ctx, 1, FilterDestroy, channel_stack, &filters[0], filters.size(), &channel_args, fixture.flags & REQUIRES_TRANSPORT @@ -412,15 +412,29 @@ static void BM_IsolatedFilter(benchmark::State &state) { grpc_slice method = grpc_slice_from_static_string("/foo/bar"); grpc_call_final_info final_info; TestOp test_op_data; + grpc_call_element_args call_args; + call_args.call_stack = call_stack; + call_args.server_transport_data = NULL; + call_args.context = NULL; + call_args.path = method; + call_args.start_time = start_time; + call_args.deadline = deadline; + const int kArenaSize = 4096; + call_args.arena = gpr_arena_create(kArenaSize); while (state.KeepRunning()) { GRPC_ERROR_UNREF(grpc_call_stack_init(&exec_ctx, channel_stack, 1, - DoNothing, NULL, NULL, NULL, method, - start_time, deadline, call_stack)); + DoNothing, NULL, &call_args)); typename TestOp::Op op(&exec_ctx, &test_op_data, call_stack); grpc_call_stack_destroy(&exec_ctx, call_stack, &final_info, NULL); op.Finish(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx); + // recreate arena every 64k iterations to avoid oom + if (0 == (state.iterations() & 0xffff)) { + gpr_arena_destroy(call_args.arena); + call_args.arena = gpr_arena_create(kArenaSize); + } } + gpr_arena_destroy(call_args.arena); grpc_channel_stack_destroy(&exec_ctx, channel_stack); grpc_exec_ctx_finish(&exec_ctx); gpr_free(channel_stack); diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc index dc0e7d769a..513974fde1 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc @@ -54,86 +54,144 @@ auto& force_library_initialization = Library::get(); static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } -template <class Fixture> -static void BM_PumpStreamClientToServer(benchmark::State& state) { +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of +// messages in each call) in a loop on a single channel +// +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +// Second parameter (i.e state.range(1)): Number of ping pong messages. +// Note: One ping-pong means two messages (one from client to server and +// the other from server to client): +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPong(benchmark::State& state) { + TrackCounters track_counters; + const int msg_size = state.range(0); + const int max_ping_pongs = state.range(1); + EchoTestService::AsyncService service; std::unique_ptr<Fixture> fixture(new Fixture(&service)); { + EchoResponse send_response; + EchoResponse recv_response; EchoRequest send_request; EchoRequest recv_request; - if (state.range(0) > 0) { - send_request.set_message(std::string(state.range(0), 'a')); + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); } - Status recv_status; - ServerContext svr_ctx; - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); + std::unique_ptr<EchoTestService::Stub> stub( EchoTestService::NewStub(fixture->channel())); - ClientContext cli_ctx; - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); - void* t; - bool ok; - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - response_rw.Read(&recv_request, tag(0)); + while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - request_rw->Write(send_request, tag(1)); - while (true) { + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + // Establish async stream between client side and server side + void* t; + bool ok; + int need_tags = (1 << 0) | (1 << 1); + while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - response_rw.Read(&recv_request, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + // Send 'max_ping_pongs' number of ping pong messages + int ping_pong_cnt = 0; + while (ping_pong_cnt < max_ping_pongs) { + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); } + + ping_pong_cnt++; } - } - request_rw->WritesDone(tag(1)); - need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); + + request_rw->WritesDone(tag(0)); + response_rw.Finish(Status::OK, tag(1)); + + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); } } + fixture->Finish(state); fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); + state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); + track_counters.Finish(state); } -template <class Fixture> -static void BM_PumpStreamServerToClient(benchmark::State& state) { +// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPongMsgs(benchmark::State& state) { + TrackCounters track_counters; + const int msg_size = state.range(0); + EchoTestService::AsyncService service; std::unique_ptr<Fixture> fixture(new Fixture(&service)); { EchoResponse send_response; EchoResponse recv_response; - if (state.range(0) > 0) { - send_response.set_message(std::string(state.range(0), 'a')); + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); } - Status recv_status; + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), fixture->cq(), tag(0)); - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); + + // Establish async stream between client side and server side void* t; bool ok; + int need_tags = (1 << 0) | (1 << 1); while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); GPR_ASSERT(ok); @@ -141,54 +199,79 @@ static void BM_PumpStreamServerToClient(benchmark::State& state) { GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } - request_rw->Read(&recv_response, tag(0)); + while (state.KeepRunning()) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); - response_rw.Write(send_response, tag(1)); - while (true) { + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - request_rw->Read(&recv_response, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); } } + + request_rw->WritesDone(tag(0)); response_rw.Finish(Status::OK, tag(1)); - need_tags = (1 << 0) | (1 << 1); + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); int i = (int)(intptr_t)t; GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } + + GPR_ASSERT(recv_status.ok()); } + fixture->Finish(state); fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); + state.SetBytesProcessed(msg_size * state.iterations() * 2); + track_counters.Finish(state); } /******************************************************************************* * CONFIGURATIONS */ -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, SockPair) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, InProcessCHTTP2) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, TCP) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, UDS) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair) +// Generate Args for StreamingPingPong benchmarks. Currently generates args for +// only "small streams" (i.e streams with 0, 1 or 2 messages) +static void StreamingPingPongArgs(benchmark::internal::Benchmark* b) { + int msg_size = 0; + + b->Args({0, 0}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here) + + for (msg_size = 0; msg_size <= 128 * 1024 * 1024; + msg_size == 0 ? msg_size++ : msg_size *= 8) { + b->Args({msg_size, 1}); + b->Args({msg_size, 2}); + } +} + +BENCHMARK_TEMPLATE(BM_StreamingPingPong, InProcessCHTTP2, NoOpMutator, + NoOpMutator) + ->Apply(StreamingPingPongArgs); +BENCHMARK_TEMPLATE(BM_StreamingPingPong, TCP, NoOpMutator, NoOpMutator) + ->Apply(StreamingPingPongArgs); + +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, InProcessCHTTP2, NoOpMutator, + NoOpMutator) ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2) +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator) ->Range(0, 128 * 1024 * 1024); } // namespace testing |