aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/microbenchmarks
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-20 09:36:17 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-03-20 09:36:17 -0700
commit444c223c177a053e1462a8781895906f5381fa07 (patch)
tree977a27493c92b9c157b14c77cf0b4acab35c252f /test/cpp/microbenchmarks
parent83190a1e1b463bb263ebce38ab23cc147dd341c9 (diff)
parent40a947ef93aeddca3b606613f628d4d09f094e77 (diff)
Merge github.com:grpc/grpc into new_transport_op
Diffstat (limited to 'test/cpp/microbenchmarks')
-rw-r--r--test/cpp/microbenchmarks/bm_call_create.cc26
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc229
2 files changed, 176 insertions, 79 deletions
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index 90cd529cf9..fd9033bcef 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -232,7 +232,7 @@ static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx,
static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *and_free_memory) {}
+ grpc_closure *then_sched_closure) {}
grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_channel_element_args *args) {
@@ -275,7 +275,7 @@ const char *name;
/* implementation of grpc_transport_init_stream */
int InitStream(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
return 0;
}
@@ -299,7 +299,7 @@ void PerformOp(grpc_exec_ctx *exec_ctx, grpc_transport *self,
/* implementation of grpc_transport_destroy_stream */
void DestroyStream(grpc_exec_ctx *exec_ctx, grpc_transport *self,
- grpc_stream *stream, void *and_free_memory) {}
+ grpc_stream *stream, grpc_closure *then_sched_closure) {}
/* implementation of grpc_transport_destroy */
void Destroy(grpc_exec_ctx *exec_ctx, grpc_transport *self) {}
@@ -397,7 +397,7 @@ static void BM_IsolatedFilter(benchmark::State &state) {
grpc_channel_stack *channel_stack =
static_cast<grpc_channel_stack *>(gpr_zalloc(channel_size));
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "call_stack_init",
+ "channel_stack_init",
grpc_channel_stack_init(&exec_ctx, 1, FilterDestroy, channel_stack,
&filters[0], filters.size(), &channel_args,
fixture.flags & REQUIRES_TRANSPORT
@@ -412,15 +412,29 @@ static void BM_IsolatedFilter(benchmark::State &state) {
grpc_slice method = grpc_slice_from_static_string("/foo/bar");
grpc_call_final_info final_info;
TestOp test_op_data;
+ grpc_call_element_args call_args;
+ call_args.call_stack = call_stack;
+ call_args.server_transport_data = NULL;
+ call_args.context = NULL;
+ call_args.path = method;
+ call_args.start_time = start_time;
+ call_args.deadline = deadline;
+ const int kArenaSize = 4096;
+ call_args.arena = gpr_arena_create(kArenaSize);
while (state.KeepRunning()) {
GRPC_ERROR_UNREF(grpc_call_stack_init(&exec_ctx, channel_stack, 1,
- DoNothing, NULL, NULL, NULL, method,
- start_time, deadline, call_stack));
+ DoNothing, NULL, &call_args));
typename TestOp::Op op(&exec_ctx, &test_op_data, call_stack);
grpc_call_stack_destroy(&exec_ctx, call_stack, &final_info, NULL);
op.Finish(&exec_ctx);
grpc_exec_ctx_flush(&exec_ctx);
+ // recreate arena every 64k iterations to avoid oom
+ if (0 == (state.iterations() & 0xffff)) {
+ gpr_arena_destroy(call_args.arena);
+ call_args.arena = gpr_arena_create(kArenaSize);
+ }
}
+ gpr_arena_destroy(call_args.arena);
grpc_channel_stack_destroy(&exec_ctx, channel_stack);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(channel_stack);
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
index dc0e7d769a..513974fde1 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
@@ -54,86 +54,144 @@ auto& force_library_initialization = Library::get();
static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
-template <class Fixture>
-static void BM_PumpStreamClientToServer(benchmark::State& state) {
+// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
+// messages in each call) in a loop on a single channel
+//
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+// Second parameter (i.e state.range(1)): Number of ping pong messages.
+// Note: One ping-pong means two messages (one from client to server and
+// the other from server to client):
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPong(benchmark::State& state) {
+ TrackCounters track_counters;
+ const int msg_size = state.range(0);
+ const int max_ping_pongs = state.range(1);
+
EchoTestService::AsyncService service;
std::unique_ptr<Fixture> fixture(new Fixture(&service));
{
+ EchoResponse send_response;
+ EchoResponse recv_response;
EchoRequest send_request;
EchoRequest recv_request;
- if (state.range(0) > 0) {
- send_request.set_message(std::string(state.range(0), 'a'));
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
}
- Status recv_status;
- ServerContext svr_ctx;
- ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
- service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
- fixture->cq(), tag(0));
+
std::unique_ptr<EchoTestService::Stub> stub(
EchoTestService::NewStub(fixture->channel()));
- ClientContext cli_ctx;
- auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
- int need_tags = (1 << 0) | (1 << 1);
- void* t;
- bool ok;
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- GPR_ASSERT(ok);
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
- }
- response_rw.Read(&recv_request, tag(0));
+
while (state.KeepRunning()) {
- GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- request_rw->Write(send_request, tag(1));
- while (true) {
+ ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+ // Establish async stream between client side and server side
+ void* t;
+ bool ok;
+ int need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- if (t == tag(0)) {
- response_rw.Read(&recv_request, tag(0));
- } else if (t == tag(1)) {
- break;
- } else {
- GPR_ASSERT(false);
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ // Send 'max_ping_pongs' number of ping pong messages
+ int ping_pong_cnt = 0;
+ while (ping_pong_cnt < max_ping_pongs) {
+ request_rw->Write(send_request, tag(0)); // Start client send
+ response_rw.Read(&recv_request, tag(1)); // Start server recv
+ request_rw->Read(&recv_response, tag(2)); // Start client recv
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 1) {
+ response_rw.Write(send_response, tag(3));
+ }
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
}
+
+ ping_pong_cnt++;
}
- }
- request_rw->WritesDone(tag(1));
- need_tags = (1 << 0) | (1 << 1);
- while (need_tags) {
- GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- int i = (int)(intptr_t)t;
- GPR_ASSERT(need_tags & (1 << i));
- need_tags &= ~(1 << i);
+
+ request_rw->WritesDone(tag(0));
+ response_rw.Finish(Status::OK, tag(1));
+
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(2));
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ GPR_ASSERT(recv_status.ok());
}
}
+
fixture->Finish(state);
fixture.reset();
- state.SetBytesProcessed(state.range(0) * state.iterations());
+ state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
+ track_counters.Finish(state);
}
-template <class Fixture>
-static void BM_PumpStreamServerToClient(benchmark::State& state) {
+// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPongMsgs(benchmark::State& state) {
+ TrackCounters track_counters;
+ const int msg_size = state.range(0);
+
EchoTestService::AsyncService service;
std::unique_ptr<Fixture> fixture(new Fixture(&service));
{
EchoResponse send_response;
EchoResponse recv_response;
- if (state.range(0) > 0) {
- send_response.set_message(std::string(state.range(0), 'a'));
+ EchoRequest send_request;
+ EchoRequest recv_request;
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
}
- Status recv_status;
+
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+
ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
fixture->cq(), tag(0));
- std::unique_ptr<EchoTestService::Stub> stub(
- EchoTestService::NewStub(fixture->channel()));
+
ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
- int need_tags = (1 << 0) | (1 << 1);
+
+ // Establish async stream between client side and server side
void* t;
bool ok;
+ int need_tags = (1 << 0) | (1 << 1);
while (need_tags) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
GPR_ASSERT(ok);
@@ -141,54 +199,79 @@ static void BM_PumpStreamServerToClient(benchmark::State& state) {
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
- request_rw->Read(&recv_response, tag(0));
+
while (state.KeepRunning()) {
GPR_TIMER_SCOPE("BenchmarkCycle", 0);
- response_rw.Write(send_response, tag(1));
- while (true) {
+ request_rw->Write(send_request, tag(0)); // Start client send
+ response_rw.Read(&recv_request, tag(1)); // Start server recv
+ request_rw->Read(&recv_response, tag(2)); // Start client recv
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
+ while (need_tags) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
- if (t == tag(0)) {
- request_rw->Read(&recv_response, tag(0));
- } else if (t == tag(1)) {
- break;
- } else {
- GPR_ASSERT(false);
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 1) {
+ response_rw.Write(send_response, tag(3));
}
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
}
}
+
+ request_rw->WritesDone(tag(0));
response_rw.Finish(Status::OK, tag(1));
- need_tags = (1 << 0) | (1 << 1);
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(2));
+
+ need_tags = (1 << 0) | (1 << 1) | (1 << 2);
while (need_tags) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
+
+ GPR_ASSERT(recv_status.ok());
}
+
fixture->Finish(state);
fixture.reset();
- state.SetBytesProcessed(state.range(0) * state.iterations());
+ state.SetBytesProcessed(msg_size * state.iterations() * 2);
+ track_counters.Finish(state);
}
/*******************************************************************************
* CONFIGURATIONS
*/
-BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP)
- ->Range(0, 128 * 1024 * 1024);
-BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS)
- ->Range(0, 128 * 1024 * 1024);
-BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, SockPair)
- ->Range(0, 128 * 1024 * 1024);
-BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, InProcessCHTTP2)
- ->Range(0, 128 * 1024 * 1024);
-BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, TCP)
- ->Range(0, 128 * 1024 * 1024);
-BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, UDS)
- ->Range(0, 128 * 1024 * 1024);
-BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair)
+// Generate Args for StreamingPingPong benchmarks. Currently generates args for
+// only "small streams" (i.e streams with 0, 1 or 2 messages)
+static void StreamingPingPongArgs(benchmark::internal::Benchmark* b) {
+ int msg_size = 0;
+
+ b->Args({0, 0}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here)
+
+ for (msg_size = 0; msg_size <= 128 * 1024 * 1024;
+ msg_size == 0 ? msg_size++ : msg_size *= 8) {
+ b->Args({msg_size, 1});
+ b->Args({msg_size, 2});
+ }
+}
+
+BENCHMARK_TEMPLATE(BM_StreamingPingPong, InProcessCHTTP2, NoOpMutator,
+ NoOpMutator)
+ ->Apply(StreamingPingPongArgs);
+BENCHMARK_TEMPLATE(BM_StreamingPingPong, TCP, NoOpMutator, NoOpMutator)
+ ->Apply(StreamingPingPongArgs);
+
+BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, InProcessCHTTP2, NoOpMutator,
+ NoOpMutator)
->Range(0, 128 * 1024 * 1024);
-BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2)
+BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator)
->Range(0, 128 * 1024 * 1024);
} // namespace testing