diff options
author | murgatroid99 <michael.lumish@gmail.com> | 2017-01-17 12:02:50 -0800 |
---|---|---|
committer | murgatroid99 <michael.lumish@gmail.com> | 2017-01-17 12:02:50 -0800 |
commit | 58233dbc1a7893756d33ecfc1ec7ffe6607c3168 (patch) | |
tree | 8e966022d92babe3fef8b956e78839b425d0c996 /test/cpp/qps/driver.cc | |
parent | c781f6451887ee5b47623c511ba337b08532f583 (diff) | |
parent | 1e2775861fbdbaa6f00553b5eaf708e46efc10a6 (diff) |
Merge branch 'master' into makefile_ruby_version_fixes
Diffstat (limited to 'test/cpp/qps/driver.cc')
-rw-r--r-- | test/cpp/qps/driver.cc | 128 |
1 files changed, 79 insertions, 49 deletions
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index ea0b38e8ad..93ef32db77 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -44,6 +44,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/env.h" @@ -99,23 +100,36 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores( return hosts; } -static deque<string> get_workers(const string& name) { - char* env = gpr_getenv(name.c_str()); - if (!env) return deque<string>(); - +static deque<string> get_workers(const string& env_name) { + char* env = gpr_getenv(env_name.c_str()); + if (!env) { + env = gpr_strdup(""); + } deque<string> out; char* p = env; - for (;;) { - char* comma = strchr(p, ','); - if (comma) { - out.emplace_back(p, comma); - p = comma + 1; - } else { - out.emplace_back(p); - gpr_free(env); - return out; + if (strlen(env) != 0) { + for (;;) { + char* comma = strchr(p, ','); + if (comma) { + out.emplace_back(p, comma); + p = comma + 1; + } else { + out.emplace_back(p); + break; + } } } + if (out.size() == 0) { + gpr_log(GPR_ERROR, + "Environment variable \"%s\" does not contain a list of QPS " + "workers to use. Set it to a comma-separated list of " + "hostname:port pairs, starting with hosts that should act as " + "servers. E.g. export " + "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"", + env_name.c_str(), env_name.c_str()); + } + gpr_free(env); + return out; } // helpers for postprocess_scenario_result @@ -195,7 +209,8 @@ static void postprocess_scenario_result(ScenarioResult* result) { std::unique_ptr<ScenarioResult> RunScenario( const ClientConfig& initial_client_config, size_t num_clients, const ServerConfig& initial_server_config, size_t num_servers, - int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) { + int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, + const char* qps_server_target_override, bool configure_core_lists) { // Log everything from the driver gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); @@ -240,9 +255,7 @@ std::unique_ptr<ScenarioResult> RunScenario( workers.push_back(addr); } } - - // Setup the hosts and core counts - auto hosts_cores = get_hosts_and_cores(workers); + GPR_ASSERT(workers.size() != 0); // if num_clients is set to <=0, do dynamic sizing: all workers // except for servers are clients @@ -264,6 +277,11 @@ std::unique_ptr<ScenarioResult> RunScenario( unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; }; std::vector<ServerData> servers(num_servers); + std::unordered_map<string, std::deque<int>> hosts_cores; + + if (configure_core_lists) { + hosts_cores = get_hosts_and_cores(workers); + } for (size_t i = 0; i < num_servers; i++) { gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")", workers[i].c_str(), i); @@ -271,37 +289,36 @@ std::unique_ptr<ScenarioResult> RunScenario( CreateChannel(workers[i], InsecureChannelCredentials())); ServerConfig server_config = initial_server_config; - char* host; - char* driver_port; - char* cli_target; - gpr_split_host_port(workers[i].c_str(), &host, &driver_port); - string host_str(host); int server_core_limit = initial_server_config.core_limit(); int client_core_limit = initial_client_config.core_limit(); - if (server_core_limit == 0 && client_core_limit > 0) { - // In this case, limit the server cores if it matches the - // same host as one or more clients - const auto& dq = hosts_cores.at(host_str); - bool match = false; - int limit = dq.size(); - for (size_t cli = 0; cli < num_clients; cli++) { - if (host_str == get_host(workers[cli + num_servers])) { - limit -= client_core_limit; - match = true; + if (configure_core_lists) { + string host_str(get_host(workers[i])); + if (server_core_limit == 0 && client_core_limit > 0) { + // In this case, limit the server cores if it matches the + // same host as one or more clients + const auto& dq = hosts_cores.at(host_str); + bool match = false; + int limit = dq.size(); + for (size_t cli = 0; cli < num_clients; cli++) { + if (host_str == get_host(workers[cli + num_servers])) { + limit -= client_core_limit; + match = true; + } + } + if (match) { + GPR_ASSERT(limit > 0); + server_core_limit = limit; } } - if (match) { - GPR_ASSERT(limit > 0); - server_core_limit = limit; - } - } - if (server_core_limit > 0) { - auto& dq = hosts_cores.at(host_str); - GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit)); - for (int core = 0; core < server_core_limit; core++) { - server_config.add_core_list(dq.front()); - dq.pop_front(); + if (server_core_limit > 0) { + auto& dq = hosts_cores.at(host_str); + GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit)); + gpr_log(GPR_INFO, "Setting server core_list"); + for (int core = 0; core < server_core_limit; core++) { + server_config.add_core_list(dq.front()); + dq.pop_front(); + } } } @@ -315,11 +332,19 @@ std::unique_ptr<ScenarioResult> RunScenario( if (!servers[i].stream->Read(&init_status)) { gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i); } - gpr_join_host_port(&cli_target, host, init_status.port()); - client_config.add_server_targets(cli_target); - gpr_free(host); - gpr_free(driver_port); - gpr_free(cli_target); + if (qps_server_target_override != NULL && + strlen(qps_server_target_override) > 0) { + // overriding the qps server target only works if there is 1 server + GPR_ASSERT(num_servers == 1); + client_config.add_server_targets(qps_server_target_override); + } else { + std::string host; + char* cli_target; + host = get_host(workers[i]); + gpr_join_host_port(&cli_target, host.c_str(), init_status.port()); + client_config.add_server_targets(cli_target); + gpr_free(cli_target); + } } // Targets are all set by now @@ -341,7 +366,8 @@ std::unique_ptr<ScenarioResult> RunScenario( int server_core_limit = initial_server_config.core_limit(); int client_core_limit = initial_client_config.core_limit(); - if ((server_core_limit > 0) || (client_core_limit > 0)) { + if (configure_core_lists && + ((server_core_limit > 0) || (client_core_limit > 0))) { auto& dq = hosts_cores.at(get_host(worker)); if (client_core_limit == 0) { // limit client cores if it matches a server host @@ -359,6 +385,7 @@ std::unique_ptr<ScenarioResult> RunScenario( } if (client_core_limit > 0) { GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit)); + gpr_log(GPR_INFO, "Setting client core_list"); for (int core = 0; core < client_core_limit; core++) { per_client_config.add_core_list(dq.front()); dq.pop_front(); @@ -548,6 +575,9 @@ bool RunQuit() { // Get client, server lists bool result = true; auto workers = get_workers("QPS_WORKERS"); + if (workers.size() == 0) { + return false; + } for (size_t i = 0; i < workers.size(); i++) { auto stub = WorkerService::NewStub( CreateChannel(workers[i], InsecureChannelCredentials())); |