diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/core/transport/bdp_estimator_test.c | 15 | ||||
-rw-r--r-- | test/core/util/trickle_endpoint.c | 27 | ||||
-rw-r--r-- | test/core/util/trickle_endpoint.h | 2 | ||||
-rw-r--r-- | test/cpp/codegen/compiler_test_golden | 6 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/BUILD | 2 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 271 | ||||
-rw-r--r-- | test/cpp/performance/writes_per_rpc_test.cc | 4 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 7 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 296 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 220 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 12 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 190 | ||||
-rw-r--r-- | test/cpp/qps/server_sync.cc | 111 |
13 files changed, 1041 insertions, 122 deletions
diff --git a/test/core/transport/bdp_estimator_test.c b/test/core/transport/bdp_estimator_test.c index f55a3ca643..a6f1a55363 100644 --- a/test/core/transport/bdp_estimator_test.c +++ b/test/core/transport/bdp_estimator_test.c @@ -33,6 +33,7 @@ #include "src/core/lib/transport/bdp_estimator.h" +#include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -64,6 +65,8 @@ static void add_samples(grpc_bdp_estimator *estimator, int64_t *samples, GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]) == false); } + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(1, GPR_TIMESPAN))); grpc_bdp_estimator_complete_ping(estimator); } @@ -123,24 +126,25 @@ static void test_get_estimate_random_values(size_t n) { gpr_log(GPR_INFO, "test_get_estimate_random_values(%" PRIdPTR ")", n); grpc_bdp_estimator est; grpc_bdp_estimator_init(&est, "test"); - int min = INT_MAX; - int max = 65535; // Windows rand() has limited range, make sure the ASSERT - // passes + const int kMaxSample = 65535; + int min = kMaxSample; + int max = 0; for (size_t i = 0; i < n; i++) { - int sample = rand(); + int sample = rand() % (kMaxSample + 1); if (sample < min) min = sample; if (sample > max) max = sample; add_sample(&est, sample); if (i >= 3) { gpr_log(GPR_DEBUG, "est:%" PRId64 " min:%d max:%d", get_estimate(&est), min, max); - GPR_ASSERT(get_estimate(&est) <= 2 * next_pow_2(max)); + GPR_ASSERT(get_estimate(&est) <= GPR_MAX(65536, 2 * next_pow_2(max))); } } } int main(int argc, char **argv) { grpc_test_init(argc, argv); + grpc_init(); test_noop(); test_get_estimate_no_samples(); test_get_estimate_1_sample(); @@ -149,5 +153,6 @@ int main(int argc, char **argv) { for (size_t i = 3; i < 1000; i = i * 3 / 2) { test_get_estimate_random_values(i); } + grpc_shutdown(); return 0; } diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 58ac59711b..69386a0718 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -44,6 +44,8 @@ #include <grpc/support/useful.h> #include "src/core/lib/slice/slice_internal.h" +#define WRITE_BUFFER_SIZE (2 * 1024 * 1024) + typedef struct { grpc_endpoint base; double bytes_per_second; @@ -55,6 +57,7 @@ typedef struct { grpc_slice_buffer writing_buffer; grpc_error *error; bool writing; + grpc_closure *write_cb; } trickle_endpoint; static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -63,10 +66,20 @@ static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb); } +static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, + trickle_endpoint *te) { + if (te->write_cb != NULL && (te->error != GRPC_ERROR_NONE || + te->write_buffer.length <= WRITE_BUFFER_SIZE)) { + grpc_closure_sched(exec_ctx, te->write_cb, GRPC_ERROR_REF(te->error)); + te->write_cb = NULL; + } +} + static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; gpr_mu_lock(&te->mu); + GPR_ASSERT(te->write_cb == NULL); if (te->write_buffer.length == 0) { te->last_write = gpr_now(GPR_CLOCK_MONOTONIC); } @@ -74,7 +87,8 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_buffer_add(&te->write_buffer, grpc_slice_copy(slices->slices[i])); } - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error)); + te->write_cb = cb; + maybe_call_write_cb_locked(exec_ctx, te); gpr_mu_unlock(&te->mu); } @@ -102,6 +116,7 @@ static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (te->error == GRPC_ERROR_NONE) { te->error = GRPC_ERROR_REF(why); } + maybe_call_write_cb_locked(exec_ctx, te); gpr_mu_unlock(&te->mu); grpc_endpoint_shutdown(exec_ctx, te->wrapped, why); } @@ -157,6 +172,7 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, te->base.vtable = &vtable; te->wrapped = wrap; te->bytes_per_second = bytes_per_second; + te->write_cb = NULL; gpr_mu_init(&te->mu); grpc_slice_buffer_init(&te->write_buffer); grpc_slice_buffer_init(&te->writing_buffer); @@ -187,9 +203,18 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint_write( exec_ctx, te->wrapped, &te->writing_buffer, grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); + maybe_call_write_cb_locked(exec_ctx, te); } } size_t backlog = te->write_buffer.length; gpr_mu_unlock(&te->mu); return backlog; } + +size_t grpc_trickle_get_backlog(grpc_endpoint *ep) { + trickle_endpoint *te = (trickle_endpoint *)ep; + gpr_mu_lock(&te->mu); + size_t backlog = te->write_buffer.length; + gpr_mu_unlock(&te->mu); + return backlog; +} diff --git a/test/core/util/trickle_endpoint.h b/test/core/util/trickle_endpoint.h index 7e8d9d91e3..e513774eb4 100644 --- a/test/core/util/trickle_endpoint.h +++ b/test/core/util/trickle_endpoint.h @@ -43,4 +43,6 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint); +size_t grpc_trickle_get_backlog(grpc_endpoint *endpoint); + #endif diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 8e3ae32a49..eb12d371ea 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -69,6 +69,9 @@ namespace testing { // ServiceA leading comment 1 class ServiceA final { public: + static constexpr char const* service_full_name() { + return "grpc.testing.ServiceA"; + } class StubInterface { public: virtual ~StubInterface() {} @@ -373,6 +376,9 @@ class ServiceA final { // ServiceB leading comment 1 class ServiceB final { public: + static constexpr char const* service_full_name() { + return "grpc.testing.ServiceB"; + } class StubInterface { public: virtual ~StubInterface() {} diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index cae3fa1a14..208ac6d794 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -92,7 +92,7 @@ cc_test( cc_test( name = "bm_fullstack_trickle", srcs = ["bm_fullstack_trickle.cc"], - deps = [":helpers"], + deps = [":helpers", "//external:gflags"], ) cc_test( diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index a5cfeb4f95..6b9fa8b38d 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -34,6 +34,8 @@ /* Benchmark gRPC end2end in various configurations */ #include <benchmark/benchmark.h> +#include <gflags/gflags.h> +#include <fstream> #include "src/core/lib/profiling/timers.h" #include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -45,16 +47,58 @@ extern "C" { #include "test/core/util/trickle_endpoint.h" } +DEFINE_bool(log, false, "Log state to CSV files"); +DEFINE_int32( + warmup_megabytes, 1, + "Number of megabytes to pump before collecting flow control stats"); +DEFINE_int32( + warmup_iterations, 100, + "Number of iterations to run before collecting flow control stats"); +DEFINE_int32(warmup_max_time_seconds, 10, + "Maximum number of seconds to run warmup loop"); + namespace grpc { namespace testing { static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } +template <class A0> +static void write_csv(std::ostream* out, A0&& a0) { + if (!out) return; + (*out) << a0 << "\n"; +} + +template <class A0, class... Arg> +static void write_csv(std::ostream* out, A0&& a0, Arg&&... arg) { + if (!out) return; + (*out) << a0 << ","; + write_csv(out, std::forward<Arg>(arg)...); +} + class TrickledCHTTP2 : public EndpointPairFixture { public: - TrickledCHTTP2(Service* service, size_t megabits_per_second) - : EndpointPairFixture(service, MakeEndpoints(megabits_per_second), - FixtureConfiguration()) {} + TrickledCHTTP2(Service* service, bool streaming, size_t req_size, + size_t resp_size, size_t kilobits_per_second) + : EndpointPairFixture(service, MakeEndpoints(kilobits_per_second), + FixtureConfiguration()) { + if (FLAGS_log) { + std::ostringstream fn; + fn << "trickle." << (streaming ? "streaming" : "unary") << "." << req_size + << "." << resp_size << "." << kilobits_per_second << ".csv"; + log_.reset(new std::ofstream(fn.str().c_str())); + write_csv(log_.get(), "t", "iteration", "client_backlog", + "server_backlog", "client_t_stall", "client_s_stall", + "server_t_stall", "server_s_stall", "client_t_outgoing", + "server_t_outgoing", "client_t_incoming", "server_t_incoming", + "client_s_outgoing_delta", "server_s_outgoing_delta", + "client_s_incoming_delta", "server_s_incoming_delta", + "client_s_announce_window", "server_s_announce_window", + "client_peer_iws", "client_local_iws", "client_sent_iws", + "client_acked_iws", "server_peer_iws", "server_local_iws", + "server_sent_iws", "server_acked_iws", "client_queued_bytes", + "server_queued_bytes"); + } + } void AddToLabel(std::ostream& out, benchmark::State& state) { out << " writes/iter:" @@ -75,7 +119,58 @@ class TrickledCHTTP2 : public EndpointPairFixture { (double)state.iterations()); } - void Step() { + void Log(int64_t iteration) { + auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_); + grpc_chttp2_transport* client = + reinterpret_cast<grpc_chttp2_transport*>(client_transport_); + grpc_chttp2_transport* server = + reinterpret_cast<grpc_chttp2_transport*>(server_transport_); + grpc_chttp2_stream* client_stream = + client->stream_map.count == 1 + ? static_cast<grpc_chttp2_stream*>(client->stream_map.values[0]) + : nullptr; + grpc_chttp2_stream* server_stream = + server->stream_map.count == 1 + ? static_cast<grpc_chttp2_stream*>(server->stream_map.values[0]) + : nullptr; + write_csv( + log_.get(), static_cast<double>(now.tv_sec) + + 1e-9 * static_cast<double>(now.tv_nsec), + iteration, grpc_trickle_get_backlog(endpoint_pair_.client), + grpc_trickle_get_backlog(endpoint_pair_.server), + client->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, + client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, + server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, + server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, + client->outgoing_window, server->outgoing_window, + client->incoming_window, server->incoming_window, + client_stream ? client_stream->outgoing_window_delta : -1, + server_stream ? server_stream->outgoing_window_delta : -1, + client_stream ? client_stream->incoming_window_delta : -1, + server_stream ? server_stream->incoming_window_delta : -1, + client_stream ? client_stream->announce_window : -1, + server_stream ? server_stream->announce_window : -1, + client->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + client->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + client->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + client->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + server->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + server->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + server->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + server->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + client_stream ? client_stream->flow_controlled_buffer.length : 0, + server_stream ? server_stream->flow_controlled_buffer.length : 0); + } + + void Step(bool update_stats) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; size_t client_backlog = grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.client); @@ -83,10 +178,12 @@ class TrickledCHTTP2 : public EndpointPairFixture { grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.server); grpc_exec_ctx_finish(&exec_ctx); - UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_, - client_backlog); - UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_, - server_backlog); + if (update_stats) { + UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_, + client_backlog); + UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_, + server_backlog); + } } private: @@ -97,6 +194,8 @@ class TrickledCHTTP2 : public EndpointPairFixture { }; Stats client_stats_; Stats server_stats_; + std::unique_ptr<std::ofstream> log_; + gpr_timespec start_ = gpr_now(GPR_CLOCK_MONOTONIC); grpc_endpoint_pair MakeEndpoints(size_t kilobits) { grpc_endpoint_pair p; @@ -123,13 +222,15 @@ class TrickledCHTTP2 : public EndpointPairFixture { // force library initialization auto& force_library_initialization = Library::get(); -static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) { +static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok, + int64_t iteration) { while (true) { + fixture->Log(iteration); switch (fixture->cq()->AsyncNext( t, ok, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_micros(100, GPR_TIMESPAN)))) { case CompletionQueue::TIMEOUT: - fixture->Step(); + fixture->Step(iteration != -1); break; case CompletionQueue::SHUTDOWN: GPR_ASSERT(false); @@ -142,8 +243,9 @@ static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) { static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { EchoTestService::AsyncService service; - std::unique_ptr<TrickledCHTTP2> fixture( - new TrickledCHTTP2(&service, state.range(1))); + std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2( + &service, true, state.range(0) /* req_size */, + state.range(0) /* resp_size */, state.range(1) /* bw in kbit/s */)); { EchoResponse send_response; EchoResponse recv_response; @@ -163,18 +265,19 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { void* t; bool ok; while (need_tags) { - TrickleCQNext(fixture.get(), &t, &ok); + TrickleCQNext(fixture.get(), &t, &ok, -1); GPR_ASSERT(ok); int i = (int)(intptr_t)t; GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } request_rw->Read(&recv_response, tag(0)); - while (state.KeepRunning()) { + auto inner_loop = [&](bool in_warmup) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); response_rw.Write(send_response, tag(1)); while (true) { - TrickleCQNext(fixture.get(), &t, &ok); + TrickleCQNext(fixture.get(), &t, &ok, + in_warmup ? -1 : state.iterations()); if (t == tag(0)) { request_rw->Read(&recv_response, tag(0)); } else if (t == tag(1)) { @@ -183,11 +286,26 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { GPR_ASSERT(false); } } + }; + gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC); + for (int i = 0; + i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 * + 1024 / (14 + state.range(0))); + i++) { + inner_loop(true); + if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start), + gpr_time_from_seconds(FLAGS_warmup_max_time_seconds, + GPR_TIMESPAN)) > 0) { + break; + } + } + while (state.KeepRunning()) { + inner_loop(false); } response_rw.Finish(Status::OK, tag(1)); need_tags = (1 << 0) | (1 << 1); while (need_tags) { - TrickleCQNext(fixture.get(), &t, &ok); + TrickleCQNext(fixture.get(), &t, &ok, -1); int i = (int)(intptr_t)t; GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); @@ -198,23 +316,126 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { state.SetBytesProcessed(state.range(0) * state.iterations()); } -/******************************************************************************* - * CONFIGURATIONS - */ - -static void TrickleArgs(benchmark::internal::Benchmark* b) { +static void StreamingTrickleArgs(benchmark::internal::Benchmark* b) { for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) { - for (int j = 1; j <= 128 * 1024 * 1024; j *= 8) { + for (int j = 64; j <= 128 * 1024 * 1024; j *= 8) { double expected_time = static_cast<double>(14 + i) / (125.0 * static_cast<double>(j)); - if (expected_time > 0.01) continue; + if (expected_time > 2.0) continue; b->Args({i, j}); } } } +BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(StreamingTrickleArgs); + +static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) { + EchoTestService::AsyncService service; + std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2( + &service, true, state.range(0) /* req_size */, + state.range(1) /* resp_size */, state.range(2) /* bw in kbit/s */)); + EchoRequest send_request; + EchoResponse send_response; + EchoResponse recv_response; + if (state.range(0) > 0) { + send_request.set_message(std::string(state.range(0), 'a')); + } + if (state.range(1) > 0) { + send_response.set_message(std::string(state.range(1), 'a')); + } + Status recv_status; + struct ServerEnv { + ServerContext ctx; + EchoRequest recv_request; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer; + ServerEnv() : response_writer(&ctx) {} + }; + uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; + ServerEnv* server_env[2] = { + reinterpret_cast<ServerEnv*>(server_env_buffer), + reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))}; + new (server_env[0]) ServerEnv; + new (server_env[1]) ServerEnv; + service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, + &server_env[0]->response_writer, fixture->cq(), + fixture->cq(), tag(0)); + service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, + &server_env[1]->response_writer, fixture->cq(), + fixture->cq(), tag(1)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + auto inner_loop = [&](bool in_warmup) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + recv_response.Clear(); + ClientContext cli_ctx; + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); + void* t; + bool ok; + TrickleCQNext(fixture.get(), &t, &ok, state.iterations()); + GPR_ASSERT(ok); + GPR_ASSERT(t == tag(0) || t == tag(1)); + intptr_t slot = reinterpret_cast<intptr_t>(t); + ServerEnv* senv = server_env[slot]; + senv->response_writer.Finish(send_response, Status::OK, tag(3)); + response_reader->Finish(&recv_response, &recv_status, tag(4)); + for (int i = (1 << 3) | (1 << 4); i != 0;) { + TrickleCQNext(fixture.get(), &t, &ok, state.iterations()); + GPR_ASSERT(ok); + int tagnum = (int)reinterpret_cast<intptr_t>(t); + GPR_ASSERT(i & (1 << tagnum)); + i -= 1 << tagnum; + } + GPR_ASSERT(recv_status.ok()); + + senv->~ServerEnv(); + senv = new (senv) ServerEnv(); + service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, + fixture->cq(), fixture->cq(), tag(slot)); + }; + gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC); + for (int i = 0; + i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 * + 1024 / (14 + state.range(0))); + i++) { + inner_loop(true); + if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start), + gpr_time_from_seconds(FLAGS_warmup_max_time_seconds, + GPR_TIMESPAN)) > 0) { + break; + } + } + while (state.KeepRunning()) { + inner_loop(false); + } + fixture->Finish(state); + fixture.reset(); + server_env[0]->~ServerEnv(); + server_env[1]->~ServerEnv(); + state.SetBytesProcessed(state.range(0) * state.iterations() + + state.range(1) * state.iterations()); +} -BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(TrickleArgs); +static void UnaryTrickleArgs(benchmark::internal::Benchmark* b) { + const int cli_1024k = 1024 * 1024; + const int cli_32M = 32 * 1024 * 1024; + const int svr_256k = 256 * 1024; + const int svr_4M = 4 * 1024 * 1024; + const int svr_64M = 64 * 1024 * 1024; + for (int bw = 64; bw <= 128 * 1024 * 1024; bw *= 16) { + b->Args({bw, cli_1024k, svr_256k}); + b->Args({bw, cli_1024k, svr_4M}); + b->Args({bw, cli_1024k, svr_64M}); + b->Args({bw, cli_32M, svr_256k}); + b->Args({bw, cli_32M, svr_4M}); + b->Args({bw, cli_32M, svr_64M}); + } +} +BENCHMARK(BM_PumpUnbalancedUnary_Trickle)->Apply(UnaryTrickleArgs); } } -BENCHMARK_MAIN(); +int main(int argc, char** argv) { + ::benchmark::Initialize(&argc, argv); + ::google::ParseCommandLineFlags(&argc, &argv, false); + ::benchmark::RunSpecifiedBenchmarks(); +} diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc index 7a914c1547..12d8268330 100644 --- a/test/cpp/performance/writes_per_rpc_test.cc +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -254,8 +254,8 @@ TEST(WritesPerRpcTest, UnaryPingPong) { EXPECT_LT(UnaryPingPong(0, 0), 2.05); EXPECT_LT(UnaryPingPong(1, 0), 2.05); EXPECT_LT(UnaryPingPong(0, 1), 2.05); - EXPECT_LT(UnaryPingPong(4096, 0), 2.2); - EXPECT_LT(UnaryPingPong(0, 4096), 2.2); + EXPECT_LT(UnaryPingPong(4096, 0), 2.5); + EXPECT_LT(UnaryPingPong(0, 4096), 2.5); } } // namespace testing diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 25a19a5a74..c3197eb622 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -443,11 +443,8 @@ class ClientImpl : public Client { create_stub_; }; -std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); -std::unique_ptr<Client> CreateSynchronousStreamingClient( - const ClientConfig& args); -std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args); -std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args); +std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args); +std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args); std::unique_ptr<Client> CreateGenericAsyncStreamingClient( const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 29a79e7343..01856f714a 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -313,9 +313,9 @@ class AsyncUnaryClient final }; template <class RequestType, class ResponseType> -class ClientRpcContextStreamingImpl : public ClientRpcContext { +class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { public: - ClientRpcContextStreamingImpl( + ClientRpcContextStreamingPingPongImpl( BenchmarkService::Stub* stub, const RequestType& req, std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr< @@ -333,7 +333,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { callback_(on_done), next_issue_(next_issue), start_req_(start_req) {} - ~ClientRpcContextStreamingImpl() override {} + ~ClientRpcContextStreamingPingPongImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { StartInternal(cq, config.messages_per_stream()); } @@ -394,8 +394,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { } } void StartNewClone(CompletionQueue* cq) override { - auto* clone = new ClientRpcContextStreamingImpl(stub_, req_, next_issue_, - start_req_, callback_); + auto* clone = new ClientRpcContextStreamingPingPongImpl( + stub_, req_, next_issue_, start_req_, callback_); clone->StartInternal(cq, messages_per_stream_); } @@ -434,23 +434,23 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { void StartInternal(CompletionQueue* cq, int messages_per_stream) { cq_ = cq; - next_state_ = State::STREAM_IDLE; - stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); messages_per_stream_ = messages_per_stream; messages_issued_ = 0; + next_state_ = State::STREAM_IDLE; + stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); } }; -class AsyncStreamingClient final +class AsyncStreamingPingPongClient final : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: - explicit AsyncStreamingClient(const ClientConfig& config) + explicit AsyncStreamingPingPongClient(const ClientConfig& config) : AsyncClient<BenchmarkService::Stub, SimpleRequest>( config, SetupCtx, BenchmarkStubCreator) { StartThreads(num_async_threads_); } - ~AsyncStreamingClient() override {} + ~AsyncStreamingPingPongClient() override {} private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} @@ -464,9 +464,250 @@ class AsyncStreamingClient final static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, std::function<gpr_timespec()> next_issue, const SimpleRequest& req) { - return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - stub, req, next_issue, AsyncStreamingClient::StartReq, - AsyncStreamingClient::CheckDone); + return new ClientRpcContextStreamingPingPongImpl<SimpleRequest, + SimpleResponse>( + stub, req, next_issue, AsyncStreamingPingPongClient::StartReq, + AsyncStreamingPingPongClient::CheckDone); + } +}; + +template <class RequestType, class ResponseType> +class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { + public: + ClientRpcContextStreamingFromClientImpl( + BenchmarkService::Stub* stub, const RequestType& req, + std::function<gpr_timespec()> next_issue, + std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>( + BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*, + CompletionQueue*, void*)> + start_req, + std::function<void(grpc::Status, ResponseType*)> on_done) + : context_(), + stub_(stub), + cq_(nullptr), + req_(req), + response_(), + next_state_(State::INVALID), + callback_(on_done), + next_issue_(next_issue), + start_req_(start_req) {} + ~ClientRpcContextStreamingFromClientImpl() override {} + void Start(CompletionQueue* cq, const ClientConfig& config) override { + StartInternal(cq); + } + bool RunNextState(bool ok, HistogramEntry* entry) override { + while (true) { + switch (next_state_) { + case State::STREAM_IDLE: + if (!next_issue_) { // ready to issue + next_state_ = State::READY_TO_WRITE; + } else { + next_state_ = State::WAIT; + } + break; // loop around, don't return + case State::WAIT: + alarm_.reset( + new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + next_state_ = State::READY_TO_WRITE; + return true; + case State::READY_TO_WRITE: + if (!ok) { + return false; + } + start_ = UsageTimer::Now(); + next_state_ = State::WRITE_DONE; + stream_->Write(req_, ClientRpcContext::tag(this)); + return true; + case State::WRITE_DONE: + if (!ok) { + return false; + } + entry->set_value((UsageTimer::Now() - start_) * 1e9); + next_state_ = State::STREAM_IDLE; + break; // loop around + default: + GPR_ASSERT(false); + return false; + } + } + } + void StartNewClone(CompletionQueue* cq) override { + auto* clone = new ClientRpcContextStreamingFromClientImpl( + stub_, req_, next_issue_, start_req_, callback_); + clone->StartInternal(cq); + } + + private: + grpc::ClientContext context_; + BenchmarkService::Stub* stub_; + CompletionQueue* cq_; + std::unique_ptr<Alarm> alarm_; + RequestType req_; + ResponseType response_; + enum State { + INVALID, + STREAM_IDLE, + WAIT, + READY_TO_WRITE, + WRITE_DONE, + }; + State next_state_; + std::function<void(grpc::Status, ResponseType*)> callback_; + std::function<gpr_timespec()> next_issue_; + std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>( + BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*, + CompletionQueue*, void*)> + start_req_; + grpc::Status status_; + double start_; + std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_; + + void StartInternal(CompletionQueue* cq) { + cq_ = cq; + stream_ = start_req_(stub_, &context_, &response_, cq, + ClientRpcContext::tag(this)); + next_state_ = State::STREAM_IDLE; + } +}; + +class AsyncStreamingFromClientClient final + : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { + public: + explicit AsyncStreamingFromClientClient(const ClientConfig& config) + : AsyncClient<BenchmarkService::Stub, SimpleRequest>( + config, SetupCtx, BenchmarkStubCreator) { + StartThreads(num_async_threads_); + } + + ~AsyncStreamingFromClientClient() override {} + + private: + static void CheckDone(grpc::Status s, SimpleResponse* response) {} + static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> StartReq( + BenchmarkService::Stub* stub, grpc::ClientContext* ctx, + SimpleResponse* resp, CompletionQueue* cq, void* tag) { + auto stream = stub->AsyncStreamingFromClient(ctx, resp, cq, tag); + return stream; + }; + static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, + std::function<gpr_timespec()> next_issue, + const SimpleRequest& req) { + return new ClientRpcContextStreamingFromClientImpl<SimpleRequest, + SimpleResponse>( + stub, req, next_issue, AsyncStreamingFromClientClient::StartReq, + AsyncStreamingFromClientClient::CheckDone); + } +}; + +template <class RequestType, class ResponseType> +class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { + public: + ClientRpcContextStreamingFromServerImpl( + BenchmarkService::Stub* stub, const RequestType& req, + std::function<gpr_timespec()> next_issue, + std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( + BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, + CompletionQueue*, void*)> + start_req, + std::function<void(grpc::Status, ResponseType*)> on_done) + : context_(), + stub_(stub), + cq_(nullptr), + req_(req), + response_(), + next_state_(State::INVALID), + callback_(on_done), + next_issue_(next_issue), + start_req_(start_req) {} + ~ClientRpcContextStreamingFromServerImpl() override {} + void Start(CompletionQueue* cq, const ClientConfig& config) override { + StartInternal(cq); + } + bool RunNextState(bool ok, HistogramEntry* entry) override { + while (true) { + switch (next_state_) { + case State::STREAM_IDLE: + if (!ok) { + return false; + } + start_ = UsageTimer::Now(); + next_state_ = State::READ_DONE; + stream_->Read(&response_, ClientRpcContext::tag(this)); + return true; + case State::READ_DONE: + if (!ok) { + return false; + } + entry->set_value((UsageTimer::Now() - start_) * 1e9); + callback_(status_, &response_); + next_state_ = State::STREAM_IDLE; + break; // loop around + default: + GPR_ASSERT(false); + return false; + } + } + } + void StartNewClone(CompletionQueue* cq) override { + auto* clone = new ClientRpcContextStreamingFromServerImpl( + stub_, req_, next_issue_, start_req_, callback_); + clone->StartInternal(cq); + } + + private: + grpc::ClientContext context_; + BenchmarkService::Stub* stub_; + CompletionQueue* cq_; + std::unique_ptr<Alarm> alarm_; + RequestType req_; + ResponseType response_; + enum State { INVALID, STREAM_IDLE, READ_DONE }; + State next_state_; + std::function<void(grpc::Status, ResponseType*)> callback_; + std::function<gpr_timespec()> next_issue_; + std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( + BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, + CompletionQueue*, void*)> + start_req_; + grpc::Status status_; + double start_; + std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_; + + void StartInternal(CompletionQueue* cq) { + // TODO(vjpai): Add support to rate-pace this + cq_ = cq; + next_state_ = State::STREAM_IDLE; + stream_ = + start_req_(stub_, &context_, req_, cq, ClientRpcContext::tag(this)); + } +}; + +class AsyncStreamingFromServerClient final + : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { + public: + explicit AsyncStreamingFromServerClient(const ClientConfig& config) + : AsyncClient<BenchmarkService::Stub, SimpleRequest>( + config, SetupCtx, BenchmarkStubCreator) { + StartThreads(num_async_threads_); + } + + ~AsyncStreamingFromServerClient() override {} + + private: + static void CheckDone(grpc::Status s, SimpleResponse* response) {} + static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> StartReq( + BenchmarkService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& req, CompletionQueue* cq, void* tag) { + auto stream = stub->AsyncStreamingFromServer(ctx, req, cq, tag); + return stream; + }; + static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, + std::function<gpr_timespec()> next_issue, + const SimpleRequest& req) { + return new ClientRpcContextStreamingFromServerImpl<SimpleRequest, + SimpleResponse>( + stub, req, next_issue, AsyncStreamingFromServerClient::StartReq, + AsyncStreamingFromServerClient::CheckDone); } }; @@ -591,11 +832,11 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { cq_ = cq; const grpc::string kMethodName( "/grpc.testing.BenchmarkService/StreamingCall"); + messages_per_stream_ = messages_per_stream; + messages_issued_ = 0; next_state_ = State::STREAM_IDLE; stream_ = start_req_(stub_, &context_, kMethodName, cq, ClientRpcContext::tag(this)); - messages_per_stream_ = messages_per_stream; - messages_issued_ = 0; } }; @@ -632,11 +873,26 @@ class GenericAsyncStreamingClient final } }; -std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) { - return std::unique_ptr<Client>(new AsyncUnaryClient(args)); -} -std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) { - return std::unique_ptr<Client>(new AsyncStreamingClient(args)); +std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config) { + switch (config.rpc_type()) { + case UNARY: + return std::unique_ptr<Client>(new AsyncUnaryClient(config)); + case STREAMING: + return std::unique_ptr<Client>(new AsyncStreamingPingPongClient(config)); + case STREAMING_FROM_CLIENT: + return std::unique_ptr<Client>( + new AsyncStreamingFromClientClient(config)); + case STREAMING_FROM_SERVER: + return std::unique_ptr<Client>( + new AsyncStreamingFromServerClient(config)); + case STREAMING_BOTH_WAYS: + // TODO(vjpai): Implement this + assert(false); + return nullptr; + default: + assert(false); + return nullptr; + } } std::unique_ptr<Client> CreateGenericAsyncStreamingClient( const ClientConfig& args) { diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index f8ce2cccbe..9075033bd4 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -137,7 +137,8 @@ class SynchronousUnaryClient final : public SynchronousClient { } }; -class SynchronousStreamingClient final : public SynchronousClient { +template <class StreamType> +class SynchronousStreamingClient : public SynchronousClient { public: SynchronousStreamingClient(const ClientConfig& config) : SynchronousClient(config), @@ -145,30 +146,69 @@ class SynchronousStreamingClient final : public SynchronousClient { stream_(num_threads_), messages_per_stream_(config.messages_per_stream()), messages_issued_(num_threads_) { + StartThreads(num_threads_); + } + virtual ~SynchronousStreamingClient() { + std::vector<std::thread> cleanup_threads; + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + // forcibly cancel the streams, then finish + context_[i].TryCancel(); + (*stream)->Finish(); + // don't log any error message on !ok since this was canceled + } + }); + } + for (auto& th : cleanup_threads) { + th.join(); + } + } + + protected: + std::vector<grpc::ClientContext> context_; + std::vector<std::unique_ptr<StreamType>> stream_; + const int messages_per_stream_; + std::vector<int> messages_issued_; + + void FinishStream(HistogramEntry* entry, size_t thread_idx) { + Status s = stream_[thread_idx]->Finish(); + // don't set the value since the stream is failed and shouldn't be timed + entry->set_status(s.error_code()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, + s.error_message().c_str()); + } + context_[thread_idx].~ClientContext(); + new (&context_[thread_idx]) ClientContext(); + } +}; + +class SynchronousStreamingPingPongClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { + public: + SynchronousStreamingPingPongClient(const ClientConfig& config) + : SynchronousStreamingClient(config) { for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; } - StartThreads(num_threads_); } - ~SynchronousStreamingClient() { + ~SynchronousStreamingPingPongClient() { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { cleanup_threads.emplace_back([this, i]() { auto stream = &stream_[i]; if (*stream) { (*stream)->WritesDone(); - Status s = (*stream)->Finish(); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i, - s.error_message().c_str()); - } } }); } - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads[i].join(); + for (auto& th : cleanup_threads) { + th.join(); } } @@ -176,7 +216,7 @@ class SynchronousStreamingClient final : public SynchronousClient { if (!WaitToIssue(thread_idx)) { return true; } - GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); + GPR_TIMER_SCOPE("SynchronousStreamingPingPongClient::ThreadFunc", 0); double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { @@ -192,40 +232,148 @@ class SynchronousStreamingClient final : public SynchronousClient { } } stream_[thread_idx]->WritesDone(); - Status s = stream_[thread_idx]->Finish(); - // don't set the value since this is either a failure (shouldn't be timed) - // or a stream-end (already has been timed) - entry->set_status(s.error_code()); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, - s.error_message().c_str()); - } + FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - context_[thread_idx].~ClientContext(); - new (&context_[thread_idx]) ClientContext(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; return true; } +}; + +class SynchronousStreamingFromClientClient final + : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> { + public: + SynchronousStreamingFromClientClient(const ClientConfig& config) + : SynchronousStreamingClient(config), last_issue_(num_threads_) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + last_issue_[thread_idx] = UsageTimer::Now(); + } + } + ~SynchronousStreamingFromClientClient() { + std::vector<std::thread> cleanup_threads; + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + (*stream)->WritesDone(); + } + }); + } + for (auto& th : cleanup_threads) { + th.join(); + } + } + + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + // Figure out how to make histogram sensible if this is rate-paced + if (!WaitToIssue(thread_idx)) { + return true; + } + GPR_TIMER_SCOPE("SynchronousStreamingFromClientClient::ThreadFunc", 0); + if (stream_[thread_idx]->Write(request_)) { + double now = UsageTimer::Now(); + entry->set_value((now - last_issue_[thread_idx]) * 1e9); + last_issue_[thread_idx] = now; + return true; + } + stream_[thread_idx]->WritesDone(); + FinishStream(entry, thread_idx); + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + return true; + } private: - // These are both conceptually std::vector but cannot be for old compilers - // that expect contained classes to support copy constructors - std::vector<grpc::ClientContext> context_; - std::vector< - std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> - stream_; - const int messages_per_stream_; - std::vector<int> messages_issued_; + std::vector<double> last_issue_; }; -std::unique_ptr<Client> CreateSynchronousUnaryClient( - const ClientConfig& config) { - return std::unique_ptr<Client>(new SynchronousUnaryClient(config)); -} -std::unique_ptr<Client> CreateSynchronousStreamingClient( - const ClientConfig& config) { - return std::unique_ptr<Client>(new SynchronousStreamingClient(config)); +class SynchronousStreamingFromServerClient final + : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> { + public: + SynchronousStreamingFromServerClient(const ClientConfig& config) + : SynchronousStreamingClient(config), last_recv_(num_threads_) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + last_recv_[thread_idx] = UsageTimer::Now(); + } + } + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); + if (stream_[thread_idx]->Read(&responses_[thread_idx])) { + double now = UsageTimer::Now(); + entry->set_value((now - last_recv_[thread_idx]) * 1e9); + last_recv_[thread_idx] = now; + return true; + } + FinishStream(entry, thread_idx); + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + return true; + } + + private: + std::vector<double> last_recv_; +}; + +class SynchronousStreamingBothWaysClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { + public: + SynchronousStreamingBothWaysClient(const ClientConfig& config) + : SynchronousStreamingClient(config) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } + } + ~SynchronousStreamingBothWaysClient() { + std::vector<std::thread> cleanup_threads; + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + (*stream)->WritesDone(); + } + }); + } + for (auto& th : cleanup_threads) { + th.join(); + } + } + + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + // TODO (vjpai): Do this + return true; + } +}; + +std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) { + switch (config.rpc_type()) { + case UNARY: + return std::unique_ptr<Client>(new SynchronousUnaryClient(config)); + case STREAMING: + return std::unique_ptr<Client>( + new SynchronousStreamingPingPongClient(config)); + case STREAMING_FROM_CLIENT: + return std::unique_ptr<Client>( + new SynchronousStreamingFromClientClient(config)); + case STREAMING_FROM_SERVER: + return std::unique_ptr<Client>( + new SynchronousStreamingFromServerClient(config)); + case STREAMING_BOTH_WAYS: + return std::unique_ptr<Client>( + new SynchronousStreamingBothWaysClient(config)); + default: + assert(false); + return nullptr; + } } } // namespace testing diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index d437920e68..92408974bd 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -68,15 +68,11 @@ static std::unique_ptr<Client> CreateClient(const ClientConfig& config) { switch (config.client_type()) { case ClientType::SYNC_CLIENT: - return (config.rpc_type() == RpcType::UNARY) - ? CreateSynchronousUnaryClient(config) - : CreateSynchronousStreamingClient(config); + return CreateSynchronousClient(config); case ClientType::ASYNC_CLIENT: - return (config.rpc_type() == RpcType::UNARY) - ? CreateAsyncUnaryClient(config) - : (config.payload_config().has_bytebuf_params() - ? CreateGenericAsyncStreamingClient(config) - : CreateAsyncStreamingClient(config)); + return config.payload_config().has_bytebuf_params() + ? CreateGenericAsyncStreamingClient(config) + : CreateAsyncClient(config); default: abort(); } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index b499b82091..84f1579c2f 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -71,6 +71,18 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerAsyncReaderWriter<ResponseType, RequestType> *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, + std::function<void(ServiceType *, ServerContextType *, + ServerAsyncReader<ResponseType, RequestType> *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_streaming_from_client_function, + std::function<void(ServiceType *, ServerContextType *, RequestType *, + ServerAsyncWriter<ResponseType> *, CompletionQueue *, + ServerCompletionQueue *, void *)> + request_streaming_from_server_function, + std::function<void(ServiceType *, ServerContextType *, + ServerAsyncReaderWriter<ResponseType, RequestType> *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_streaming_both_ways_function, std::function<grpc::Status(const PayloadConfig &, const RequestType *, ResponseType *)> process_rpc) @@ -107,7 +119,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::bind(process_rpc, config.payload_config(), std::placeholders::_1, std::placeholders::_2); - for (int i = 0; i < 15000; i++) { + for (int i = 0; i < 5000; i++) { for (int j = 0; j < num_threads; j++) { if (request_unary_function) { auto request_unary = std::bind( @@ -125,6 +137,26 @@ class AsyncQpsServerTest final : public grpc::testing::Server { contexts_.emplace_back(new ServerRpcContextStreamingImpl( request_streaming, process_rpc_bound)); } + if (request_streaming_from_client_function) { + auto request_streaming_from_client = std::bind( + request_streaming_from_client_function, &async_service_, + std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(), + srv_cqs_[j].get(), std::placeholders::_3); + contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl( + request_streaming_from_client, process_rpc_bound)); + } + if (request_streaming_from_server_function) { + auto request_streaming_from_server = + std::bind(request_streaming_from_server_function, &async_service_, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, srv_cqs_[j].get(), + srv_cqs_[j].get(), std::placeholders::_4); + contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl( + request_streaming_from_server, process_rpc_bound)); + } + if (request_streaming_both_ways_function) { + // TODO(vjpai): Add this code + } } } @@ -289,8 +321,8 @@ class AsyncQpsServerTest final : public grpc::testing::Server { if (!ok) { return false; } - stream_.Read(&req_, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::read_done; + stream_.Read(&req_, AsyncQpsServerTest::tag(this)); return true; } @@ -300,23 +332,23 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Call the RPC processing function grpc::Status status = invoke_method_(&req_, &response_); // initiate the write - stream_.Write(response_, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::write_done; + stream_.Write(response_, AsyncQpsServerTest::tag(this)); } else { // client has sent writes done // finish the stream - stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::finish_done; + stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); } return true; } bool write_done(bool ok) { // now go back and get another streaming read! if (ok) { - stream_.Read(&req_, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::read_done; + stream_.Read(&req_, AsyncQpsServerTest::tag(this)); } else { - stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::finish_done; + stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); } return true; } @@ -335,6 +367,146 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_; }; + class ServerRpcContextStreamingFromClientImpl final + : public ServerRpcContext { + public: + ServerRpcContextStreamingFromClientImpl( + std::function<void(ServerContextType *, + grpc::ServerAsyncReader<ResponseType, RequestType> *, + void *)> + request_method, + std::function<grpc::Status(const RequestType *, ResponseType *)> + invoke_method) + : srv_ctx_(new ServerContextType), + next_state_(&ServerRpcContextStreamingFromClientImpl::request_done), + request_method_(request_method), + invoke_method_(invoke_method), + stream_(srv_ctx_.get()) { + request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this)); + } + ~ServerRpcContextStreamingFromClientImpl() override {} + bool RunNextState(bool ok) override { return (this->*next_state_)(ok); } + void Reset() override { + srv_ctx_.reset(new ServerContextType); + req_ = RequestType(); + stream_ = + grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get()); + + // Then request the method + next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done; + request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this)); + } + + private: + bool request_done(bool ok) { + if (!ok) { + return false; + } + next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done; + stream_.Read(&req_, AsyncQpsServerTest::tag(this)); + return true; + } + + bool read_done(bool ok) { + if (ok) { + // In this case, just do another read + // next_state_ is unchanged + stream_.Read(&req_, AsyncQpsServerTest::tag(this)); + return true; + } else { // client has sent writes done + // invoke the method + // Call the RPC processing function + grpc::Status status = invoke_method_(&req_, &response_); + // finish the stream + next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done; + stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this)); + } + return true; + } + bool finish_done(bool ok) { return false; /* reset the context */ } + + std::unique_ptr<ServerContextType> srv_ctx_; + RequestType req_; + ResponseType response_; + bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool); + std::function<void(ServerContextType *, + grpc::ServerAsyncReader<ResponseType, RequestType> *, + void *)> + request_method_; + std::function<grpc::Status(const RequestType *, ResponseType *)> + invoke_method_; + grpc::ServerAsyncReader<ResponseType, RequestType> stream_; + }; + + class ServerRpcContextStreamingFromServerImpl final + : public ServerRpcContext { + public: + ServerRpcContextStreamingFromServerImpl( + std::function<void(ServerContextType *, RequestType *, + grpc::ServerAsyncWriter<ResponseType> *, void *)> + request_method, + std::function<grpc::Status(const RequestType *, ResponseType *)> + invoke_method) + : srv_ctx_(new ServerContextType), + next_state_(&ServerRpcContextStreamingFromServerImpl::request_done), + request_method_(request_method), + invoke_method_(invoke_method), + stream_(srv_ctx_.get()) { + request_method_(srv_ctx_.get(), &req_, &stream_, + AsyncQpsServerTest::tag(this)); + } + ~ServerRpcContextStreamingFromServerImpl() override {} + bool RunNextState(bool ok) override { return (this->*next_state_)(ok); } + void Reset() override { + srv_ctx_.reset(new ServerContextType); + req_ = RequestType(); + stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get()); + + // Then request the method + next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done; + request_method_(srv_ctx_.get(), &req_, &stream_, + AsyncQpsServerTest::tag(this)); + } + + private: + bool request_done(bool ok) { + if (!ok) { + return false; + } + // invoke the method + // Call the RPC processing function + grpc::Status status = invoke_method_(&req_, &response_); + + next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done; + stream_.Write(response_, AsyncQpsServerTest::tag(this)); + return true; + } + + bool write_done(bool ok) { + if (ok) { + // Do another write! + // next_state_ is unchanged + stream_.Write(response_, AsyncQpsServerTest::tag(this)); + } else { // must be done so let's finish + next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done; + stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); + } + return true; + } + bool finish_done(bool ok) { return false; /* reset the context */ } + + std::unique_ptr<ServerContextType> srv_ctx_; + RequestType req_; + ResponseType response_; + bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool); + std::function<void(ServerContextType *, RequestType *, + grpc::ServerAsyncWriter<ResponseType> *, void *)> + request_method_; + std::function<grpc::Status(const RequestType *, ResponseType *)> + invoke_method_; + grpc::ServerAsyncWriter<ResponseType> stream_; + }; + std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; @@ -390,6 +562,9 @@ std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) { config, RegisterBenchmarkService, &BenchmarkService::AsyncService::RequestUnaryCall, &BenchmarkService::AsyncService::RequestStreamingCall, + &BenchmarkService::AsyncService::RequestStreamingFromClient, + &BenchmarkService::AsyncService::RequestStreamingFromServer, + &BenchmarkService::AsyncService::RequestStreamingBothWays, ProcessSimpleRPC)); } std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) { @@ -397,7 +572,8 @@ std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) { new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService, grpc::GenericServerContext>( config, RegisterGenericService, nullptr, - &grpc::AsyncGenericService::RequestCall, ProcessGenericRPC)); + &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr, + ProcessGenericRPC)); } } // namespace testing diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index f79284d225..f04465e261 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -31,6 +31,9 @@ * */ +#include <atomic> +#include <thread> + #include <grpc++/resource_quota.h> #include <grpc++/security/server_credentials.h> #include <grpc++/server.h> @@ -52,12 +55,9 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service { public: Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) override { - if (request->response_size() > 0) { - if (!Server::SetPayload(request->response_type(), - request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); - } + auto s = SetResponse(request, response); + if (!s.ok()) { + return s; } return Status::OK; } @@ -67,12 +67,9 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service { SimpleRequest request; while (stream->Read(&request)) { SimpleResponse response; - if (request.response_size() > 0) { - if (!Server::SetPayload(request.response_type(), - request.response_size(), - response.mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); - } + auto s = SetResponse(&request, &response); + if (!s.ok()) { + return s; } if (!stream->Write(response)) { return Status(StatusCode::INTERNAL, "Server couldn't respond"); @@ -80,6 +77,96 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service { } return Status::OK; } + Status StreamingFromClient(ServerContext* context, + ServerReader<SimpleRequest>* stream, + SimpleResponse* response) override { + auto s = ClientPull(context, stream, response); + if (!s.ok()) { + return s; + } + return Status::OK; + } + Status StreamingFromServer(ServerContext* context, + const SimpleRequest* request, + ServerWriter<SimpleResponse>* stream) override { + SimpleResponse response; + auto s = SetResponse(request, &response); + if (!s.ok()) { + return s; + } + return ServerPush(context, stream, response, nullptr); + } + Status StreamingBothWays( + ServerContext* context, + ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override { + // Read the first client message to setup server response + SimpleRequest request; + if (!stream->Read(&request)) { + return Status::OK; + } + SimpleResponse response; + auto s = SetResponse(&request, &response); + if (!s.ok()) { + return s; + } + std::atomic_bool done; + Status sp; + std::thread t([context, stream, &response, &done, &sp]() { + sp = ServerPush(context, stream, response, [&done]() { + return done.load(std::memory_order_relaxed); + }); + }); + SimpleResponse dummy; + auto cp = ClientPull(context, stream, &dummy); + done.store(true, std::memory_order_relaxed); // can be lazy + t.join(); + if (!cp.ok()) { + return cp; + } + if (!sp.ok()) { + return sp; + } + return Status::OK; + } + + private: + static Status ClientPull(ServerContext* context, + ReaderInterface<SimpleRequest>* stream, + SimpleResponse* response) { + SimpleRequest request; + while (stream->Read(&request)) { + } + if (request.response_size() > 0) { + if (!Server::SetPayload(request.response_type(), request.response_size(), + response->mutable_payload())) { + return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + } + } + return Status::OK; + } + static Status ServerPush(ServerContext* context, + WriterInterface<SimpleResponse>* stream, + const SimpleResponse& response, + std::function<bool()> done) { + while ((done == nullptr) || !done()) { + // TODO(vjpai): Add potential for rate-pacing on this + if (!stream->Write(response)) { + return Status(StatusCode::INTERNAL, "Server couldn't push"); + } + } + return Status::OK; + } + static Status SetResponse(const SimpleRequest* request, + SimpleResponse* response) { + if (request->response_size() > 0) { + if (!Server::SetPayload(request->response_type(), + request->response_size(), + response->mutable_payload())) { + return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + } + } + return Status::OK; + } }; class SynchronousServer final : public grpc::testing::Server { |