diff options
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/BUILD | 5 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 89 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 36 | ||||
-rw-r--r-- | test/cpp/qps/driver.h | 9 | ||||
-rw-r--r-- | test/cpp/qps/inproc_sync_unary_ping_pong_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/qps_benchmark_script.bzl | 1 | ||||
-rw-r--r-- | test/cpp/qps/qps_json_driver.cc | 111 | ||||
-rw-r--r-- | test/cpp/qps/qps_openloop_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/secure_sync_unary_ping_pong_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/server_callback.cc | 46 |
10 files changed, 222 insertions, 81 deletions
diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD index 26f43284a6..8855a1c155 100644 --- a/test/cpp/qps/BUILD +++ b/test/cpp/qps/BUILD @@ -60,7 +60,6 @@ grpc_cc_library( "//src/proto/grpc/testing:payloads_proto", "//src/proto/grpc/testing:worker_service_proto", "//test/core/end2end:ssl_test_data", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_config", "//test/cpp/util:test_util", @@ -86,7 +85,6 @@ grpc_cc_library( "//src/proto/grpc/testing:messages_proto", "//src/proto/grpc/testing:report_qps_scenario_service_proto", "//src/proto/grpc/testing:worker_service_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -170,7 +168,7 @@ grpc_cc_test( grpc_cc_test( name = "qps_openloop_test", srcs = ["qps_openloop_test.cc"], - data = ["//third_party/toolchains:RBE_USE_MACHINE_TYPE_LARGE"], + exec_compatible_with = ["//third_party/toolchains/machine_size:large"], deps = [ ":benchmark_config", ":driver_impl", @@ -205,7 +203,6 @@ grpc_cc_binary( deps = [ ":qps_worker_impl", "//:grpc++", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_config", "//test/cpp/util:test_util", diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 668d941916..73f91eed2d 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -429,13 +429,7 @@ class ClientImpl : public Client { config.server_targets(i % config.server_targets_size()), config, create_stub_, i); } - std::vector<std::unique_ptr<std::thread>> connecting_threads; - for (auto& c : channels_) { - connecting_threads.emplace_back(c.WaitForReady()); - } - for (auto& t : connecting_threads) { - t->join(); - } + WaitForChannelsToConnect(); median_latency_collection_interval_seconds_ = config.median_latency_collection_interval_millis() / 1e3; ClientRequestCreator<RequestType> create_req(&request_, @@ -443,6 +437,61 @@ class ClientImpl : public Client { } virtual ~ClientImpl() {} + void WaitForChannelsToConnect() { + int connect_deadline_seconds = 10; + /* Allow optionally overriding connect_deadline in order + * to deal with benchmark environments in which the server + * can take a long time to become ready. */ + char* channel_connect_timeout_str = + gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT"); + if (channel_connect_timeout_str != nullptr && + strcmp(channel_connect_timeout_str, "") != 0) { + connect_deadline_seconds = atoi(channel_connect_timeout_str); + } + gpr_log(GPR_INFO, + "Waiting for up to %d seconds for all channels to connect", + connect_deadline_seconds); + gpr_free(channel_connect_timeout_str); + gpr_timespec connect_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN)); + CompletionQueue cq; + size_t num_remaining = 0; + for (auto& c : channels_) { + if (!c.is_inproc()) { + Channel* channel = c.get_channel(); + grpc_connectivity_state last_observed = channel->GetState(true); + if (last_observed == GRPC_CHANNEL_READY) { + gpr_log(GPR_INFO, "Channel %p connected!", channel); + } else { + num_remaining++; + channel->NotifyOnStateChange(last_observed, connect_deadline, &cq, + channel); + } + } + } + while (num_remaining > 0) { + bool ok = false; + void* tag = nullptr; + cq.Next(&tag, &ok); + Channel* channel = static_cast<Channel*>(tag); + if (!ok) { + gpr_log(GPR_ERROR, "Channel %p failed to connect within the deadline", + channel); + abort(); + } else { + grpc_connectivity_state last_observed = channel->GetState(true); + if (last_observed == GRPC_CHANNEL_READY) { + gpr_log(GPR_INFO, "Channel %p connected!", channel); + num_remaining--; + } else { + channel->NotifyOnStateChange(last_observed, connect_deadline, &cq, + channel); + } + } + } + } + protected: const int cores_; RequestType request_; @@ -485,31 +534,7 @@ class ClientImpl : public Client { } Channel* get_channel() { return channel_.get(); } StubType* get_stub() { return stub_.get(); } - - std::unique_ptr<std::thread> WaitForReady() { - return std::unique_ptr<std::thread>(new std::thread([this]() { - if (!is_inproc_) { - int connect_deadline = 10; - /* Allow optionally overriding connect_deadline in order - * to deal with benchmark environments in which the server - * can take a long time to become ready. */ - char* channel_connect_timeout_str = - gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT"); - if (channel_connect_timeout_str != nullptr && - strcmp(channel_connect_timeout_str, "") != 0) { - connect_deadline = atoi(channel_connect_timeout_str); - } - gpr_log(GPR_INFO, - "Waiting for up to %d seconds for the channel %p to connect", - connect_deadline, channel_.get()); - gpr_free(channel_connect_timeout_str); - GPR_ASSERT(channel_->WaitForConnected(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(connect_deadline, GPR_TIMESPAN)))); - gpr_log(GPR_INFO, "Channel %p connected!", channel_.get()); - } - })); - } + bool is_inproc() { return is_inproc_; } private: void set_channel_args(const ClientConfig& config, ChannelArguments* args) { diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 11cfb4aa05..181e11f12b 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -95,6 +95,17 @@ static deque<string> get_workers(const string& env_name) { return out; } +std::string GetCredType( + const std::string& worker_addr, + const std::map<std::string, std::string>& per_worker_credential_types, + const std::string& credential_type) { + auto it = per_worker_credential_types.find(worker_addr); + if (it != per_worker_credential_types.end()) { + return it->second; + } + return credential_type; +} + // helpers for postprocess_scenario_result static double WallTime(const ClientStats& s) { return s.time_elapsed(); } static double SystemTime(const ClientStats& s) { return s.time_system(); } @@ -198,8 +209,9 @@ std::unique_ptr<ScenarioResult> RunScenario( const ServerConfig& initial_server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, const grpc::string& qps_server_target_override, - const grpc::string& credential_type, bool run_inproc, - int32_t median_latency_collection_interval_millis) { + const grpc::string& credential_type, + const std::map<std::string, std::string>& per_worker_credential_types, + bool run_inproc, int32_t median_latency_collection_interval_millis) { if (run_inproc) { g_inproc_servers = new std::vector<grpc::testing::Server*>; } @@ -278,7 +290,9 @@ std::unique_ptr<ScenarioResult> RunScenario( if (!run_inproc) { servers[i].stub = WorkerService::NewStub(CreateChannel( workers[i], GetCredentialsProvider()->GetChannelCredentials( - credential_type, &channel_args))); + GetCredType(workers[i], per_worker_credential_types, + credential_type), + &channel_args))); } else { servers[i].stub = WorkerService::NewStub( local_workers[i]->InProcessChannel(channel_args)); @@ -335,9 +349,11 @@ std::unique_ptr<ScenarioResult> RunScenario( gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")", worker.c_str(), i + num_servers); if (!run_inproc) { - clients[i].stub = WorkerService::NewStub( - CreateChannel(worker, GetCredentialsProvider()->GetChannelCredentials( - credential_type, &channel_args))); + clients[i].stub = WorkerService::NewStub(CreateChannel( + worker, + GetCredentialsProvider()->GetChannelCredentials( + GetCredType(worker, per_worker_credential_types, credential_type), + &channel_args))); } else { clients[i].stub = WorkerService::NewStub( local_workers[i + num_servers]->InProcessChannel(channel_args)); @@ -529,7 +545,9 @@ std::unique_ptr<ScenarioResult> RunScenario( return result; } -bool RunQuit(const grpc::string& credential_type) { +bool RunQuit( + const grpc::string& credential_type, + const std::map<std::string, std::string>& per_worker_credential_types) { // Get client, server lists bool result = true; auto workers = get_workers("QPS_WORKERS"); @@ -541,7 +559,9 @@ bool RunQuit(const grpc::string& credential_type) { for (size_t i = 0; i < workers.size(); i++) { auto stub = WorkerService::NewStub(CreateChannel( workers[i], GetCredentialsProvider()->GetChannelCredentials( - credential_type, &channel_args))); + GetCredType(workers[i], per_worker_credential_types, + credential_type), + &channel_args))); Void dummy; grpc::ClientContext ctx; ctx.set_wait_for_ready(true); diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index cda89f7ddf..568871d2da 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -32,10 +32,13 @@ std::unique_ptr<ScenarioResult> RunScenario( const grpc::testing::ServerConfig& server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, const grpc::string& qps_server_target_override, - const grpc::string& credential_type, bool run_inproc, - int32_t median_latency_collection_interval_millis); + const grpc::string& credential_type, + const std::map<std::string, std::string>& per_worker_credential_types, + bool run_inproc, int32_t median_latency_collection_interval_millis); -bool RunQuit(const grpc::string& credential_type); +bool RunQuit( + const grpc::string& credential_type, + const std::map<std::string, std::string>& per_worker_credential_types); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc index 56d1730252..6257e42ebf 100644 --- a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc +++ b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc @@ -48,7 +48,7 @@ static void RunSynchronousUnaryPingPong() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "", - kInsecureCredentialsType, true, 0); + kInsecureCredentialsType, {}, true, 0); GetReporter()->ReportQPS(*result); GetReporter()->ReportLatency(*result); diff --git a/test/cpp/qps/qps_benchmark_script.bzl b/test/cpp/qps/qps_benchmark_script.bzl index b2b67d988c..855caa0d37 100644 --- a/test/cpp/qps/qps_benchmark_script.bzl +++ b/test/cpp/qps/qps_benchmark_script.bzl @@ -69,7 +69,6 @@ def json_run_localhost_batch(): ], deps = [ "//:gpr", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_config", "//test/cpp/util:test_util", diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc index eaa0dd992c..1d67e79b86 100644 --- a/test/cpp/qps/qps_json_driver.cc +++ b/test/cpp/qps/qps_json_driver.cc @@ -65,6 +65,16 @@ DEFINE_string(json_file_out, "", "File to write the JSON output to."); DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType, "Credential type for communication with workers"); +DEFINE_string( + per_worker_credential_types, "", + "A map of QPS worker addresses to credential types. When creating a " + "channel to a QPS worker's driver port, the qps_json_driver first checks " + "if the 'name:port' string is in the map, and it uses the corresponding " + "credential type if so. If the QPS worker's 'name:port' string is not " + "in the map, then the driver -> worker channel will be created with " + "the credentials specified in --credential_type. The value of this flag " + "is a semicolon-separated list of map entries, where each map entry is " + "a comma-separated pair."); DEFINE_bool(run_inproc, false, "Perform an in-process transport test"); DEFINE_int32( median_latency_collection_interval_millis, 0, @@ -75,16 +85,53 @@ DEFINE_int32( namespace grpc { namespace testing { -static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario, - bool* success) { +static std::map<std::string, std::string> +ConstructPerWorkerCredentialTypesMap() { + // Parse a list of the form: "addr1,cred_type1;addr2,cred_type2;..." into + // a map. + std::string remaining = FLAGS_per_worker_credential_types; + std::map<std::string, std::string> out; + while (!remaining.empty()) { + size_t next_semicolon = remaining.find(';'); + std::string next_entry = remaining.substr(0, next_semicolon); + if (next_semicolon == std::string::npos) { + remaining = ""; + } else { + remaining = remaining.substr(next_semicolon + 1, std::string::npos); + } + size_t comma = next_entry.find(','); + if (comma == std::string::npos) { + gpr_log(GPR_ERROR, + "Expectd --per_worker_credential_types to be a list " + "of the form: 'addr1,cred_type1;addr2,cred_type2;...' " + "into."); + abort(); + } + std::string addr = next_entry.substr(0, comma); + std::string cred_type = next_entry.substr(comma + 1, std::string::npos); + if (out.find(addr) != out.end()) { + gpr_log(GPR_ERROR, + "Found duplicate addr in per_worker_credential_types."); + abort(); + } + out[addr] = cred_type; + } + return out; +} + +static std::unique_ptr<ScenarioResult> RunAndReport( + const Scenario& scenario, + const std::map<std::string, std::string>& per_worker_credential_types, + bool* success) { std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n"; - auto result = RunScenario( - scenario.client_config(), scenario.num_clients(), - scenario.server_config(), scenario.num_servers(), - scenario.warmup_seconds(), scenario.benchmark_seconds(), - !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2, - FLAGS_qps_server_target_override, FLAGS_credential_type, FLAGS_run_inproc, - FLAGS_median_latency_collection_interval_millis); + auto result = + RunScenario(scenario.client_config(), scenario.num_clients(), + scenario.server_config(), scenario.num_servers(), + scenario.warmup_seconds(), scenario.benchmark_seconds(), + !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2, + FLAGS_qps_server_target_override, FLAGS_credential_type, + per_worker_credential_types, FLAGS_run_inproc, + FLAGS_median_latency_collection_interval_millis); // Amend the result with scenario config. Eventually we should adjust // RunScenario contract so we don't need to touch the result here. @@ -115,21 +162,26 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario, return result; } -static double GetCpuLoad(Scenario* scenario, double offered_load, - bool* success) { +static double GetCpuLoad( + Scenario* scenario, double offered_load, + const std::map<std::string, std::string>& per_worker_credential_types, + bool* success) { scenario->mutable_client_config() ->mutable_load_params() ->mutable_poisson() ->set_offered_load(offered_load); - auto result = RunAndReport(*scenario, success); + auto result = RunAndReport(*scenario, per_worker_credential_types, success); return result->summary().server_cpu_usage(); } -static double BinarySearch(Scenario* scenario, double targeted_cpu_load, - double low, double high, bool* success) { +static double BinarySearch( + Scenario* scenario, double targeted_cpu_load, double low, double high, + const std::map<std::string, std::string>& per_worker_credential_types, + bool* success) { while (low <= high * (1 - FLAGS_error_tolerance)) { double mid = low + (high - low) / 2; - double current_cpu_load = GetCpuLoad(scenario, mid, success); + double current_cpu_load = + GetCpuLoad(scenario, mid, per_worker_credential_types, success); gpr_log(GPR_DEBUG, "Binary Search: current_offered_load %.0f", mid); if (!*success) { gpr_log(GPR_ERROR, "Client/Server Failure"); @@ -145,12 +197,14 @@ static double BinarySearch(Scenario* scenario, double targeted_cpu_load, return low; } -static double SearchOfferedLoad(double initial_offered_load, - double targeted_cpu_load, Scenario* scenario, - bool* success) { +static double SearchOfferedLoad( + double initial_offered_load, double targeted_cpu_load, Scenario* scenario, + const std::map<std::string, std::string>& per_worker_credential_types, + bool* success) { std::cerr << "RUNNING SCENARIO: " << scenario->name() << "\n"; double current_offered_load = initial_offered_load; - double current_cpu_load = GetCpuLoad(scenario, current_offered_load, success); + double current_cpu_load = GetCpuLoad(scenario, current_offered_load, + per_worker_credential_types, success); if (current_cpu_load > targeted_cpu_load) { gpr_log(GPR_ERROR, "Initial offered load too high"); return -1; @@ -158,14 +212,15 @@ static double SearchOfferedLoad(double initial_offered_load, while (*success && (current_cpu_load < targeted_cpu_load)) { current_offered_load *= 2; - current_cpu_load = GetCpuLoad(scenario, current_offered_load, success); + current_cpu_load = GetCpuLoad(scenario, current_offered_load, + per_worker_credential_types, success); gpr_log(GPR_DEBUG, "Binary Search: current_offered_load %.0f", current_offered_load); } double targeted_offered_load = BinarySearch(scenario, targeted_cpu_load, current_offered_load / 2, - current_offered_load, success); + current_offered_load, per_worker_credential_types, success); return targeted_offered_load; } @@ -183,6 +238,7 @@ static bool QpsDriver() { abort(); } + auto per_worker_credential_types = ConstructPerWorkerCredentialTypesMap(); if (scfile) { // Read the json data from disk FILE* json_file = fopen(FLAGS_scenarios_file.c_str(), "r"); @@ -198,7 +254,7 @@ static bool QpsDriver() { } else if (scjson) { json = FLAGS_scenarios_json.c_str(); } else if (FLAGS_quit) { - return RunQuit(FLAGS_credential_type); + return RunQuit(FLAGS_credential_type, per_worker_credential_types); } // Parse into an array of scenarios @@ -212,15 +268,16 @@ static bool QpsDriver() { for (int i = 0; i < scenarios.scenarios_size(); i++) { if (FLAGS_search_param == "") { const Scenario& scenario = scenarios.scenarios(i); - RunAndReport(scenario, &success); + RunAndReport(scenario, per_worker_credential_types, &success); } else { if (FLAGS_search_param == "offered_load") { Scenario* scenario = scenarios.mutable_scenarios(i); - double targeted_offered_load = - SearchOfferedLoad(FLAGS_initial_search_value, - FLAGS_targeted_cpu_load, scenario, &success); + double targeted_offered_load = SearchOfferedLoad( + FLAGS_initial_search_value, FLAGS_targeted_cpu_load, scenario, + per_worker_credential_types, &success); gpr_log(GPR_INFO, "targeted_offered_load %f", targeted_offered_load); - GetCpuLoad(scenario, targeted_offered_load, &success); + GetCpuLoad(scenario, targeted_offered_load, per_worker_credential_types, + &success); } else { gpr_log(GPR_ERROR, "Unimplemented search param"); } diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc index 6044f4265a..68062e66f2 100644 --- a/test/cpp/qps/qps_openloop_test.cc +++ b/test/cpp/qps/qps_openloop_test.cc @@ -52,7 +52,7 @@ static void RunQPS() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "", - kInsecureCredentialsType, false, 0); + kInsecureCredentialsType, {}, false, 0); GetReporter()->ReportQPSPerCore(*result); GetReporter()->ReportLatency(*result); diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc index a559c82cc8..422bd617eb 100644 --- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc +++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc @@ -55,7 +55,7 @@ static void RunSynchronousUnaryPingPong() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "", - kInsecureCredentialsType, false, 0); + kInsecureCredentialsType, {}, false, 0); GetReporter()->ReportQPS(*result); GetReporter()->ReportLatency(*result); diff --git a/test/cpp/qps/server_callback.cc b/test/cpp/qps/server_callback.cc index 8bedd44739..4a346dd017 100644 --- a/test/cpp/qps/server_callback.cc +++ b/test/cpp/qps/server_callback.cc @@ -34,13 +34,53 @@ class BenchmarkCallbackServiceImpl final : public BenchmarkService::ExperimentalCallbackService { public: void UnaryCall( - ServerContext* context, const SimpleRequest* request, - SimpleResponse* response, - experimental::ServerCallbackRpcController* controller) override { + ServerContext* context, const ::grpc::testing::SimpleRequest* request, + ::grpc::testing::SimpleResponse* response, + ::grpc::experimental::ServerCallbackRpcController* controller) override { auto s = SetResponse(request, response); controller->Finish(s); } + ::grpc::experimental::ServerBidiReactor<::grpc::testing::SimpleRequest, + ::grpc::testing::SimpleResponse>* + StreamingCall() override { + class Reactor + : public ::grpc::experimental::ServerBidiReactor< + ::grpc::testing::SimpleRequest, ::grpc::testing::SimpleResponse> { + public: + Reactor() {} + void OnStarted(ServerContext* context) override { StartRead(&request_); } + + void OnReadDone(bool ok) override { + if (!ok) { + Finish(::grpc::Status::OK); + return; + } + auto s = SetResponse(&request_, &response_); + if (!s.ok()) { + Finish(s); + return; + } + StartWrite(&response_); + } + + void OnWriteDone(bool ok) override { + if (!ok) { + Finish(::grpc::Status::OK); + return; + } + StartRead(&request_); + } + + void OnDone() override { delete (this); } + + private: + SimpleRequest request_; + SimpleResponse response_; + }; + return new Reactor; + } + private: static Status SetResponse(const SimpleRequest* request, SimpleResponse* response) { |