aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/driver.cc
diff options
context:
space:
mode:
authorGravatar murgatroid99 <michael.lumish@gmail.com>2017-01-17 12:02:50 -0800
committerGravatar murgatroid99 <michael.lumish@gmail.com>2017-01-17 12:02:50 -0800
commit58233dbc1a7893756d33ecfc1ec7ffe6607c3168 (patch)
tree8e966022d92babe3fef8b956e78839b425d0c996 /test/cpp/qps/driver.cc
parentc781f6451887ee5b47623c511ba337b08532f583 (diff)
parent1e2775861fbdbaa6f00553b5eaf708e46efc10a6 (diff)
Merge branch 'master' into makefile_ruby_version_fixes
Diffstat (limited to 'test/cpp/qps/driver.cc')
-rw-r--r--test/cpp/qps/driver.cc128
1 files changed, 79 insertions, 49 deletions
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index ea0b38e8ad..93ef32db77 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -44,6 +44,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/env.h"
@@ -99,23 +100,36 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
return hosts;
}
-static deque<string> get_workers(const string& name) {
- char* env = gpr_getenv(name.c_str());
- if (!env) return deque<string>();
-
+static deque<string> get_workers(const string& env_name) {
+ char* env = gpr_getenv(env_name.c_str());
+ if (!env) {
+ env = gpr_strdup("");
+ }
deque<string> out;
char* p = env;
- for (;;) {
- char* comma = strchr(p, ',');
- if (comma) {
- out.emplace_back(p, comma);
- p = comma + 1;
- } else {
- out.emplace_back(p);
- gpr_free(env);
- return out;
+ if (strlen(env) != 0) {
+ for (;;) {
+ char* comma = strchr(p, ',');
+ if (comma) {
+ out.emplace_back(p, comma);
+ p = comma + 1;
+ } else {
+ out.emplace_back(p);
+ break;
+ }
}
}
+ if (out.size() == 0) {
+ gpr_log(GPR_ERROR,
+ "Environment variable \"%s\" does not contain a list of QPS "
+ "workers to use. Set it to a comma-separated list of "
+ "hostname:port pairs, starting with hosts that should act as "
+ "servers. E.g. export "
+ "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"",
+ env_name.c_str(), env_name.c_str());
+ }
+ gpr_free(env);
+ return out;
}
// helpers for postprocess_scenario_result
@@ -195,7 +209,8 @@ static void postprocess_scenario_result(ScenarioResult* result) {
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& initial_server_config, size_t num_servers,
- int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) {
+ int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
+ const char* qps_server_target_override, bool configure_core_lists) {
// Log everything from the driver
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
@@ -240,9 +255,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
workers.push_back(addr);
}
}
-
- // Setup the hosts and core counts
- auto hosts_cores = get_hosts_and_cores(workers);
+ GPR_ASSERT(workers.size() != 0);
// if num_clients is set to <=0, do dynamic sizing: all workers
// except for servers are clients
@@ -264,6 +277,11 @@ std::unique_ptr<ScenarioResult> RunScenario(
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
std::vector<ServerData> servers(num_servers);
+ std::unordered_map<string, std::deque<int>> hosts_cores;
+
+ if (configure_core_lists) {
+ hosts_cores = get_hosts_and_cores(workers);
+ }
for (size_t i = 0; i < num_servers; i++) {
gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
workers[i].c_str(), i);
@@ -271,37 +289,36 @@ std::unique_ptr<ScenarioResult> RunScenario(
CreateChannel(workers[i], InsecureChannelCredentials()));
ServerConfig server_config = initial_server_config;
- char* host;
- char* driver_port;
- char* cli_target;
- gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
- string host_str(host);
int server_core_limit = initial_server_config.core_limit();
int client_core_limit = initial_client_config.core_limit();
- if (server_core_limit == 0 && client_core_limit > 0) {
- // In this case, limit the server cores if it matches the
- // same host as one or more clients
- const auto& dq = hosts_cores.at(host_str);
- bool match = false;
- int limit = dq.size();
- for (size_t cli = 0; cli < num_clients; cli++) {
- if (host_str == get_host(workers[cli + num_servers])) {
- limit -= client_core_limit;
- match = true;
+ if (configure_core_lists) {
+ string host_str(get_host(workers[i]));
+ if (server_core_limit == 0 && client_core_limit > 0) {
+ // In this case, limit the server cores if it matches the
+ // same host as one or more clients
+ const auto& dq = hosts_cores.at(host_str);
+ bool match = false;
+ int limit = dq.size();
+ for (size_t cli = 0; cli < num_clients; cli++) {
+ if (host_str == get_host(workers[cli + num_servers])) {
+ limit -= client_core_limit;
+ match = true;
+ }
+ }
+ if (match) {
+ GPR_ASSERT(limit > 0);
+ server_core_limit = limit;
}
}
- if (match) {
- GPR_ASSERT(limit > 0);
- server_core_limit = limit;
- }
- }
- if (server_core_limit > 0) {
- auto& dq = hosts_cores.at(host_str);
- GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
- for (int core = 0; core < server_core_limit; core++) {
- server_config.add_core_list(dq.front());
- dq.pop_front();
+ if (server_core_limit > 0) {
+ auto& dq = hosts_cores.at(host_str);
+ GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
+ gpr_log(GPR_INFO, "Setting server core_list");
+ for (int core = 0; core < server_core_limit; core++) {
+ server_config.add_core_list(dq.front());
+ dq.pop_front();
+ }
}
}
@@ -315,11 +332,19 @@ std::unique_ptr<ScenarioResult> RunScenario(
if (!servers[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
}
- gpr_join_host_port(&cli_target, host, init_status.port());
- client_config.add_server_targets(cli_target);
- gpr_free(host);
- gpr_free(driver_port);
- gpr_free(cli_target);
+ if (qps_server_target_override != NULL &&
+ strlen(qps_server_target_override) > 0) {
+ // overriding the qps server target only works if there is 1 server
+ GPR_ASSERT(num_servers == 1);
+ client_config.add_server_targets(qps_server_target_override);
+ } else {
+ std::string host;
+ char* cli_target;
+ host = get_host(workers[i]);
+ gpr_join_host_port(&cli_target, host.c_str(), init_status.port());
+ client_config.add_server_targets(cli_target);
+ gpr_free(cli_target);
+ }
}
// Targets are all set by now
@@ -341,7 +366,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
int server_core_limit = initial_server_config.core_limit();
int client_core_limit = initial_client_config.core_limit();
- if ((server_core_limit > 0) || (client_core_limit > 0)) {
+ if (configure_core_lists &&
+ ((server_core_limit > 0) || (client_core_limit > 0))) {
auto& dq = hosts_cores.at(get_host(worker));
if (client_core_limit == 0) {
// limit client cores if it matches a server host
@@ -359,6 +385,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
if (client_core_limit > 0) {
GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit));
+ gpr_log(GPR_INFO, "Setting client core_list");
for (int core = 0; core < client_core_limit; core++) {
per_client_config.add_core_list(dq.front());
dq.pop_front();
@@ -548,6 +575,9 @@ bool RunQuit() {
// Get client, server lists
bool result = true;
auto workers = get_workers("QPS_WORKERS");
+ if (workers.size() == 0) {
+ return false;
+ }
for (size_t i = 0; i < workers.size(); i++) {
auto stub = WorkerService::NewStub(
CreateChannel(workers[i], InsecureChannelCredentials()));