aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/generic_end2end_test.cc21
-rw-r--r--test/cpp/microbenchmarks/BUILD2
-rw-r--r--test/cpp/microbenchmarks/bm_call_create.cc3
-rw-r--r--test/cpp/microbenchmarks/bm_cq_multiple_threads.cc4
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc11
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_trickle.cc271
-rw-r--r--test/cpp/microbenchmarks/bm_pollset.cc36
-rw-r--r--test/cpp/microbenchmarks/helpers.cc4
-rw-r--r--test/cpp/performance/writes_per_rpc_test.cc4
-rw-r--r--test/cpp/qps/client_async.cc48
10 files changed, 337 insertions, 67 deletions
diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc
index 25c221bb2b..2c9f5e38e6 100644
--- a/test/cpp/end2end/generic_end2end_test.cc
+++ b/test/cpp/end2end/generic_end2end_test.cc
@@ -115,6 +115,10 @@ class GenericEnd2endTest : public ::testing::Test {
void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
void SendRpc(int num_rpcs) {
+ SendRpc(num_rpcs, false, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+
+ void SendRpc(int num_rpcs, bool check_deadline, gpr_timespec deadline) {
const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
for (int i = 0; i < num_rpcs; i++) {
EchoRequest send_request;
@@ -129,6 +133,11 @@ class GenericEnd2endTest : public ::testing::Test {
// The string needs to be long enough to test heap-based slice.
send_request.set_message("Hello world. Hello world. Hello world.");
+
+ if (check_deadline) {
+ cli_ctx.set_deadline(deadline);
+ }
+
std::unique_ptr<GenericClientAsyncReaderWriter> call =
generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));
client_ok(1);
@@ -147,6 +156,12 @@ class GenericEnd2endTest : public ::testing::Test {
verify_ok(srv_cq_.get(), 4, true);
EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
EXPECT_EQ(kMethodName, srv_ctx.method());
+
+ if (check_deadline) {
+ EXPECT_TRUE(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
+ gpr_time_from_millis(100, GPR_TIMESPAN)));
+ }
+
ByteBuffer recv_buffer;
stream.Read(&recv_buffer, tag(5));
server_ok(5);
@@ -262,6 +277,12 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+TEST_F(GenericEnd2endTest, Deadline) {
+ ResetStub();
+ SendRpc(1, true, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(10, GPR_TIMESPAN)));
+}
+
} // namespace
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD
index cae3fa1a14..208ac6d794 100644
--- a/test/cpp/microbenchmarks/BUILD
+++ b/test/cpp/microbenchmarks/BUILD
@@ -92,7 +92,7 @@ cc_test(
cc_test(
name = "bm_fullstack_trickle",
srcs = ["bm_fullstack_trickle.cc"],
- deps = [":helpers"],
+ deps = [":helpers", "//external:gflags"],
)
cc_test(
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index c91219e98c..67e7c02535 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -563,7 +563,8 @@ static void BM_IsolatedFilter(benchmark::State &state) {
}
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- size_t channel_size = grpc_channel_stack_size(&filters[0], filters.size());
+ size_t channel_size = grpc_channel_stack_size(
+ filters.size() == 0 ? NULL : &filters[0], filters.size());
grpc_channel_stack *channel_stack =
static_cast<grpc_channel_stack *>(gpr_zalloc(channel_size));
GPR_ASSERT(GRPC_LOG_IF_ERROR(
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index 9d7f65d292..0d267da723 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -67,7 +67,9 @@ static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
*mu = &ps->mu;
}
-static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
+static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* ps) {
+ gpr_mu_destroy(&ps->mu);
+}
static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) {
return GRPC_ERROR_NONE;
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
index 47705d3031..01ff39121e 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
@@ -105,6 +105,17 @@ static void BM_PumpStreamClientToServer(benchmark::State& state) {
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
+ response_rw.Finish(Status::OK, tag(0));
+ Status final_status;
+ request_rw->Finish(&final_status, tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ GPR_ASSERT(final_status.ok());
}
fixture->Finish(state);
fixture.reset();
diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
index a5cfeb4f95..6b9fa8b38d 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
@@ -34,6 +34,8 @@
/* Benchmark gRPC end2end in various configurations */
#include <benchmark/benchmark.h>
+#include <gflags/gflags.h>
+#include <fstream>
#include "src/core/lib/profiling/timers.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@@ -45,16 +47,58 @@ extern "C" {
#include "test/core/util/trickle_endpoint.h"
}
+DEFINE_bool(log, false, "Log state to CSV files");
+DEFINE_int32(
+ warmup_megabytes, 1,
+ "Number of megabytes to pump before collecting flow control stats");
+DEFINE_int32(
+ warmup_iterations, 100,
+ "Number of iterations to run before collecting flow control stats");
+DEFINE_int32(warmup_max_time_seconds, 10,
+ "Maximum number of seconds to run warmup loop");
+
namespace grpc {
namespace testing {
static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
+template <class A0>
+static void write_csv(std::ostream* out, A0&& a0) {
+ if (!out) return;
+ (*out) << a0 << "\n";
+}
+
+template <class A0, class... Arg>
+static void write_csv(std::ostream* out, A0&& a0, Arg&&... arg) {
+ if (!out) return;
+ (*out) << a0 << ",";
+ write_csv(out, std::forward<Arg>(arg)...);
+}
+
class TrickledCHTTP2 : public EndpointPairFixture {
public:
- TrickledCHTTP2(Service* service, size_t megabits_per_second)
- : EndpointPairFixture(service, MakeEndpoints(megabits_per_second),
- FixtureConfiguration()) {}
+ TrickledCHTTP2(Service* service, bool streaming, size_t req_size,
+ size_t resp_size, size_t kilobits_per_second)
+ : EndpointPairFixture(service, MakeEndpoints(kilobits_per_second),
+ FixtureConfiguration()) {
+ if (FLAGS_log) {
+ std::ostringstream fn;
+ fn << "trickle." << (streaming ? "streaming" : "unary") << "." << req_size
+ << "." << resp_size << "." << kilobits_per_second << ".csv";
+ log_.reset(new std::ofstream(fn.str().c_str()));
+ write_csv(log_.get(), "t", "iteration", "client_backlog",
+ "server_backlog", "client_t_stall", "client_s_stall",
+ "server_t_stall", "server_s_stall", "client_t_outgoing",
+ "server_t_outgoing", "client_t_incoming", "server_t_incoming",
+ "client_s_outgoing_delta", "server_s_outgoing_delta",
+ "client_s_incoming_delta", "server_s_incoming_delta",
+ "client_s_announce_window", "server_s_announce_window",
+ "client_peer_iws", "client_local_iws", "client_sent_iws",
+ "client_acked_iws", "server_peer_iws", "server_local_iws",
+ "server_sent_iws", "server_acked_iws", "client_queued_bytes",
+ "server_queued_bytes");
+ }
+ }
void AddToLabel(std::ostream& out, benchmark::State& state) {
out << " writes/iter:"
@@ -75,7 +119,58 @@ class TrickledCHTTP2 : public EndpointPairFixture {
(double)state.iterations());
}
- void Step() {
+ void Log(int64_t iteration) {
+ auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_);
+ grpc_chttp2_transport* client =
+ reinterpret_cast<grpc_chttp2_transport*>(client_transport_);
+ grpc_chttp2_transport* server =
+ reinterpret_cast<grpc_chttp2_transport*>(server_transport_);
+ grpc_chttp2_stream* client_stream =
+ client->stream_map.count == 1
+ ? static_cast<grpc_chttp2_stream*>(client->stream_map.values[0])
+ : nullptr;
+ grpc_chttp2_stream* server_stream =
+ server->stream_map.count == 1
+ ? static_cast<grpc_chttp2_stream*>(server->stream_map.values[0])
+ : nullptr;
+ write_csv(
+ log_.get(), static_cast<double>(now.tv_sec) +
+ 1e-9 * static_cast<double>(now.tv_nsec),
+ iteration, grpc_trickle_get_backlog(endpoint_pair_.client),
+ grpc_trickle_get_backlog(endpoint_pair_.server),
+ client->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
+ client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
+ server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
+ server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
+ client->outgoing_window, server->outgoing_window,
+ client->incoming_window, server->incoming_window,
+ client_stream ? client_stream->outgoing_window_delta : -1,
+ server_stream ? server_stream->outgoing_window_delta : -1,
+ client_stream ? client_stream->incoming_window_delta : -1,
+ server_stream ? server_stream->incoming_window_delta : -1,
+ client_stream ? client_stream->announce_window : -1,
+ server_stream ? server_stream->announce_window : -1,
+ client->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ client->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ client->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ client->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ server->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ server->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ server->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ server->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ client_stream ? client_stream->flow_controlled_buffer.length : 0,
+ server_stream ? server_stream->flow_controlled_buffer.length : 0);
+ }
+
+ void Step(bool update_stats) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
size_t client_backlog =
grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.client);
@@ -83,10 +178,12 @@ class TrickledCHTTP2 : public EndpointPairFixture {
grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.server);
grpc_exec_ctx_finish(&exec_ctx);
- UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_,
- client_backlog);
- UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_,
- server_backlog);
+ if (update_stats) {
+ UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_,
+ client_backlog);
+ UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_,
+ server_backlog);
+ }
}
private:
@@ -97,6 +194,8 @@ class TrickledCHTTP2 : public EndpointPairFixture {
};
Stats client_stats_;
Stats server_stats_;
+ std::unique_ptr<std::ofstream> log_;
+ gpr_timespec start_ = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_endpoint_pair MakeEndpoints(size_t kilobits) {
grpc_endpoint_pair p;
@@ -123,13 +222,15 @@ class TrickledCHTTP2 : public EndpointPairFixture {
// force library initialization
auto& force_library_initialization = Library::get();
-static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) {
+static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok,
+ int64_t iteration) {
while (true) {
+ fixture->Log(iteration);
switch (fixture->cq()->AsyncNext(
t, ok, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(100, GPR_TIMESPAN)))) {
case CompletionQueue::TIMEOUT:
- fixture->Step();
+ fixture->Step(iteration != -1);
break;
case CompletionQueue::SHUTDOWN:
GPR_ASSERT(false);
@@ -142,8 +243,9 @@ static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) {
static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
EchoTestService::AsyncService service;
- std::unique_ptr<TrickledCHTTP2> fixture(
- new TrickledCHTTP2(&service, state.range(1)));
+ std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2(
+ &service, true, state.range(0) /* req_size */,
+ state.range(0) /* resp_size */, state.range(1) /* bw in kbit/s */));
{
EchoResponse send_response;
EchoResponse recv_response;
@@ -163,18 +265,19 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
void* t;
bool ok;
while (need_tags) {
- TrickleCQNext(fixture.get(), &t, &ok);
+ TrickleCQNext(fixture.get(), &t, &ok, -1);
GPR_ASSERT(ok);
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
request_rw->Read(&recv_response, tag(0));
- while (state.KeepRunning()) {
+ auto inner_loop = [&](bool in_warmup) {
GPR_TIMER_SCOPE("BenchmarkCycle", 0);
response_rw.Write(send_response, tag(1));
while (true) {
- TrickleCQNext(fixture.get(), &t, &ok);
+ TrickleCQNext(fixture.get(), &t, &ok,
+ in_warmup ? -1 : state.iterations());
if (t == tag(0)) {
request_rw->Read(&recv_response, tag(0));
} else if (t == tag(1)) {
@@ -183,11 +286,26 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
GPR_ASSERT(false);
}
}
+ };
+ gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC);
+ for (int i = 0;
+ i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 *
+ 1024 / (14 + state.range(0)));
+ i++) {
+ inner_loop(true);
+ if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start),
+ gpr_time_from_seconds(FLAGS_warmup_max_time_seconds,
+ GPR_TIMESPAN)) > 0) {
+ break;
+ }
+ }
+ while (state.KeepRunning()) {
+ inner_loop(false);
}
response_rw.Finish(Status::OK, tag(1));
need_tags = (1 << 0) | (1 << 1);
while (need_tags) {
- TrickleCQNext(fixture.get(), &t, &ok);
+ TrickleCQNext(fixture.get(), &t, &ok, -1);
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
@@ -198,23 +316,126 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
state.SetBytesProcessed(state.range(0) * state.iterations());
}
-/*******************************************************************************
- * CONFIGURATIONS
- */
-
-static void TrickleArgs(benchmark::internal::Benchmark* b) {
+static void StreamingTrickleArgs(benchmark::internal::Benchmark* b) {
for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) {
- for (int j = 1; j <= 128 * 1024 * 1024; j *= 8) {
+ for (int j = 64; j <= 128 * 1024 * 1024; j *= 8) {
double expected_time =
static_cast<double>(14 + i) / (125.0 * static_cast<double>(j));
- if (expected_time > 0.01) continue;
+ if (expected_time > 2.0) continue;
b->Args({i, j});
}
}
}
+BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(StreamingTrickleArgs);
+
+static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2(
+ &service, true, state.range(0) /* req_size */,
+ state.range(1) /* resp_size */, state.range(2) /* bw in kbit/s */));
+ EchoRequest send_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ if (state.range(0) > 0) {
+ send_request.set_message(std::string(state.range(0), 'a'));
+ }
+ if (state.range(1) > 0) {
+ send_response.set_message(std::string(state.range(1), 'a'));
+ }
+ Status recv_status;
+ struct ServerEnv {
+ ServerContext ctx;
+ EchoRequest recv_request;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer;
+ ServerEnv() : response_writer(&ctx) {}
+ };
+ uint8_t server_env_buffer[2 * sizeof(ServerEnv)];
+ ServerEnv* server_env[2] = {
+ reinterpret_cast<ServerEnv*>(server_env_buffer),
+ reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))};
+ new (server_env[0]) ServerEnv;
+ new (server_env[1]) ServerEnv;
+ service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request,
+ &server_env[0]->response_writer, fixture->cq(),
+ fixture->cq(), tag(0));
+ service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request,
+ &server_env[1]->response_writer, fixture->cq(),
+ fixture->cq(), tag(1));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ auto inner_loop = [&](bool in_warmup) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ recv_response.Clear();
+ ClientContext cli_ctx;
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
+ void* t;
+ bool ok;
+ TrickleCQNext(fixture.get(), &t, &ok, state.iterations());
+ GPR_ASSERT(ok);
+ GPR_ASSERT(t == tag(0) || t == tag(1));
+ intptr_t slot = reinterpret_cast<intptr_t>(t);
+ ServerEnv* senv = server_env[slot];
+ senv->response_writer.Finish(send_response, Status::OK, tag(3));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
+ for (int i = (1 << 3) | (1 << 4); i != 0;) {
+ TrickleCQNext(fixture.get(), &t, &ok, state.iterations());
+ GPR_ASSERT(ok);
+ int tagnum = (int)reinterpret_cast<intptr_t>(t);
+ GPR_ASSERT(i & (1 << tagnum));
+ i -= 1 << tagnum;
+ }
+ GPR_ASSERT(recv_status.ok());
+
+ senv->~ServerEnv();
+ senv = new (senv) ServerEnv();
+ service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
+ fixture->cq(), fixture->cq(), tag(slot));
+ };
+ gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC);
+ for (int i = 0;
+ i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 *
+ 1024 / (14 + state.range(0)));
+ i++) {
+ inner_loop(true);
+ if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start),
+ gpr_time_from_seconds(FLAGS_warmup_max_time_seconds,
+ GPR_TIMESPAN)) > 0) {
+ break;
+ }
+ }
+ while (state.KeepRunning()) {
+ inner_loop(false);
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ server_env[0]->~ServerEnv();
+ server_env[1]->~ServerEnv();
+ state.SetBytesProcessed(state.range(0) * state.iterations() +
+ state.range(1) * state.iterations());
+}
-BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(TrickleArgs);
+static void UnaryTrickleArgs(benchmark::internal::Benchmark* b) {
+ const int cli_1024k = 1024 * 1024;
+ const int cli_32M = 32 * 1024 * 1024;
+ const int svr_256k = 256 * 1024;
+ const int svr_4M = 4 * 1024 * 1024;
+ const int svr_64M = 64 * 1024 * 1024;
+ for (int bw = 64; bw <= 128 * 1024 * 1024; bw *= 16) {
+ b->Args({bw, cli_1024k, svr_256k});
+ b->Args({bw, cli_1024k, svr_4M});
+ b->Args({bw, cli_1024k, svr_64M});
+ b->Args({bw, cli_32M, svr_256k});
+ b->Args({bw, cli_32M, svr_4M});
+ b->Args({bw, cli_32M, svr_64M});
+ }
+}
+BENCHMARK(BM_PumpUnbalancedUnary_Trickle)->Apply(UnaryTrickleArgs);
}
}
-BENCHMARK_MAIN();
+int main(int argc, char** argv) {
+ ::benchmark::Initialize(&argc, argv);
+ ::google::ParseCommandLineFlags(&argc, &argv, false);
+ ::benchmark::RunSpecifiedBenchmarks();
+}
diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc
index 0f3d3cef66..f5e8d13881 100644
--- a/test/cpp/microbenchmarks/bm_pollset.cc
+++ b/test/cpp/microbenchmarks/bm_pollset.cc
@@ -59,7 +59,7 @@ extern "C" {
auto& force_library_initialization = Library::get();
static void shutdown_ps(grpc_exec_ctx* exec_ctx, void* ps, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(ps));
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(ps));
}
static void BM_CreateDestroyPollset(benchmark::State& state) {
@@ -136,8 +136,7 @@ static void BM_PollEmptyPollset(benchmark::State& state) {
gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(mu);
while (state.KeepRunning()) {
- grpc_pollset_worker* worker;
- GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline));
+ GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
grpc_closure shutdown_ps_closure;
grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
@@ -150,6 +149,34 @@ static void BM_PollEmptyPollset(benchmark::State& state) {
}
BENCHMARK(BM_PollEmptyPollset);
+static void BM_PollAddFd(benchmark::State& state) {
+ TrackCounters track_counters;
+ size_t ps_sz = grpc_pollset_size();
+ grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz));
+ gpr_mu* mu;
+ grpc_pollset_init(ps, &mu);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_wakeup_fd wakeup_fd;
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&wakeup_fd)));
+ grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx");
+ while (state.KeepRunning()) {
+ grpc_pollset_add_fd(&exec_ctx, ps, fd);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, "xxx");
+ grpc_closure shutdown_ps_closure;
+ grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
+ grpc_schedule_on_exec_ctx);
+ gpr_mu_lock(mu);
+ grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure);
+ gpr_mu_unlock(mu);
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(ps);
+ track_counters.Finish(state);
+}
+BENCHMARK(BM_PollAddFd);
+
class Closure : public grpc_closure {
public:
virtual ~Closure() {}
@@ -233,8 +260,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
grpc_fd_notify_on_read(&exec_ctx, wakeup, continue_closure);
gpr_mu_lock(mu);
while (!done) {
- grpc_pollset_worker* worker;
- GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline));
+ GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done");
wakeup_fd.read_fd = 0;
diff --git a/test/cpp/microbenchmarks/helpers.cc b/test/cpp/microbenchmarks/helpers.cc
index 6550742453..73ab9e4a1a 100644
--- a/test/cpp/microbenchmarks/helpers.cc
+++ b/test/cpp/microbenchmarks/helpers.cc
@@ -36,11 +36,11 @@
void TrackCounters::Finish(benchmark::State &state) {
std::ostringstream out;
AddToLabel(out, state);
- auto label = out.str();
+ std::string label = out.str();
if (label.length() && label[0] == ' ') {
label = label.substr(1);
}
- state.SetLabel(label);
+ state.SetLabel(label.c_str());
}
void TrackCounters::AddToLabel(std::ostream &out, benchmark::State &state) {
diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc
index 7a914c1547..12d8268330 100644
--- a/test/cpp/performance/writes_per_rpc_test.cc
+++ b/test/cpp/performance/writes_per_rpc_test.cc
@@ -254,8 +254,8 @@ TEST(WritesPerRpcTest, UnaryPingPong) {
EXPECT_LT(UnaryPingPong(0, 0), 2.05);
EXPECT_LT(UnaryPingPong(1, 0), 2.05);
EXPECT_LT(UnaryPingPong(0, 1), 2.05);
- EXPECT_LT(UnaryPingPong(4096, 0), 2.2);
- EXPECT_LT(UnaryPingPong(0, 4096), 2.2);
+ EXPECT_LT(UnaryPingPong(4096, 0), 2.5);
+ EXPECT_LT(UnaryPingPong(0, 4096), 2.5);
}
} // namespace testing
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 01856f714a..82c3356f02 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -238,39 +238,27 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
void* got_tag;
bool ok;
- switch (cli_cqs_[thread_idx]->AsyncNext(
- &got_tag, &ok,
- std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
- case CompletionQueue::GOT_EVENT: {
- // Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- // Proceed while holding a lock to make sure that
- // this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
- if (shutdown_state_[thread_idx]->shutdown) {
- delete ctx;
- return true;
- } else if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[thread_idx].get());
- // delete the old version
- delete ctx;
- }
+ if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ // Got a regular event, so process it
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ delete ctx;
return true;
+ } else if (!ctx->RunNextState(ok, entry)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[thread_idx].get());
+ // delete the old version
+ delete ctx;
}
- case CompletionQueue::TIMEOUT: {
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
- if (shutdown_state_[thread_idx]->shutdown) {
- return true;
- }
- return true;
- }
- case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be
- // done
- return true;
+ return true;
+ } else {
+ // queue is shutting down, so we must be done
+ return true;
}
- GPR_UNREACHABLE_CODE(return true);
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;