aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/driver.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/driver.cc')
-rw-r--r--test/cpp/qps/driver.cc154
1 files changed, 52 insertions, 102 deletions
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index ea0b38e8ad..74fe3662c1 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -44,6 +44,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/env.h"
@@ -75,47 +76,36 @@ static std::string get_host(const std::string& worker) {
return s;
}
-static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
- const deque<string>& workers) {
- std::unordered_map<string, std::deque<int>> hosts;
- for (auto it = workers.begin(); it != workers.end(); it++) {
- const string host = get_host(*it);
- if (hosts.find(host) == hosts.end()) {
- auto stub = WorkerService::NewStub(
- CreateChannel(*it, InsecureChannelCredentials()));
- grpc::ClientContext ctx;
- ctx.set_wait_for_ready(true);
- CoreRequest dummy;
- CoreResponse cores;
- grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
- GPR_ASSERT(s.ok());
- std::deque<int> dq;
- for (int i = 0; i < cores.cores(); i++) {
- dq.push_back(i);
- }
- hosts[host] = dq;
- }
+static deque<string> get_workers(const string& env_name) {
+ char* env = gpr_getenv(env_name.c_str());
+ if (!env) {
+ env = gpr_strdup("");
}
- return hosts;
-}
-
-static deque<string> get_workers(const string& name) {
- char* env = gpr_getenv(name.c_str());
- if (!env) return deque<string>();
-
deque<string> out;
char* p = env;
- for (;;) {
- char* comma = strchr(p, ',');
- if (comma) {
- out.emplace_back(p, comma);
- p = comma + 1;
- } else {
- out.emplace_back(p);
- gpr_free(env);
- return out;
+ if (strlen(env) != 0) {
+ for (;;) {
+ char* comma = strchr(p, ',');
+ if (comma) {
+ out.emplace_back(p, comma);
+ p = comma + 1;
+ } else {
+ out.emplace_back(p);
+ break;
+ }
}
}
+ if (out.size() == 0) {
+ gpr_log(GPR_ERROR,
+ "Environment variable \"%s\" does not contain a list of QPS "
+ "workers to use. Set it to a comma-separated list of "
+ "hostname:port pairs, starting with hosts that should act as "
+ "servers. E.g. export "
+ "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"",
+ env_name.c_str(), env_name.c_str());
+ }
+ gpr_free(env);
+ return out;
}
// helpers for postprocess_scenario_result
@@ -195,7 +185,8 @@ static void postprocess_scenario_result(ScenarioResult* result) {
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& initial_server_config, size_t num_servers,
- int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) {
+ int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
+ const char* qps_server_target_override) {
// Log everything from the driver
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
@@ -240,9 +231,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
workers.push_back(addr);
}
}
-
- // Setup the hosts and core counts
- auto hosts_cores = get_hosts_and_cores(workers);
+ GPR_ASSERT(workers.size() != 0);
// if num_clients is set to <=0, do dynamic sizing: all workers
// except for servers are clients
@@ -264,6 +253,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
std::vector<ServerData> servers(num_servers);
+ std::unordered_map<string, std::deque<int>> hosts_cores;
+
for (size_t i = 0; i < num_servers; i++) {
gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
workers[i].c_str(), i);
@@ -271,38 +262,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
CreateChannel(workers[i], InsecureChannelCredentials()));
ServerConfig server_config = initial_server_config;
- char* host;
- char* driver_port;
- char* cli_target;
- gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
- string host_str(host);
- int server_core_limit = initial_server_config.core_limit();
- int client_core_limit = initial_client_config.core_limit();
-
- if (server_core_limit == 0 && client_core_limit > 0) {
- // In this case, limit the server cores if it matches the
- // same host as one or more clients
- const auto& dq = hosts_cores.at(host_str);
- bool match = false;
- int limit = dq.size();
- for (size_t cli = 0; cli < num_clients; cli++) {
- if (host_str == get_host(workers[cli + num_servers])) {
- limit -= client_core_limit;
- match = true;
- }
- }
- if (match) {
- GPR_ASSERT(limit > 0);
- server_core_limit = limit;
- }
- }
- if (server_core_limit > 0) {
- auto& dq = hosts_cores.at(host_str);
- GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
- for (int core = 0; core < server_core_limit; core++) {
- server_config.add_core_list(dq.front());
- dq.pop_front();
- }
+ if (server_config.core_limit() != 0) {
+ gpr_log(GPR_ERROR,
+ "server config core limit is set but ignored by driver");
}
ServerArgs args;
@@ -315,11 +277,19 @@ std::unique_ptr<ScenarioResult> RunScenario(
if (!servers[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
}
- gpr_join_host_port(&cli_target, host, init_status.port());
- client_config.add_server_targets(cli_target);
- gpr_free(host);
- gpr_free(driver_port);
- gpr_free(cli_target);
+ if (qps_server_target_override != NULL &&
+ strlen(qps_server_target_override) > 0) {
+ // overriding the qps server target only works if there is 1 server
+ GPR_ASSERT(num_servers == 1);
+ client_config.add_server_targets(qps_server_target_override);
+ } else {
+ std::string host;
+ char* cli_target;
+ host = get_host(workers[i]);
+ gpr_join_host_port(&cli_target, host.c_str(), init_status.port());
+ client_config.add_server_targets(cli_target);
+ gpr_free(cli_target);
+ }
}
// Targets are all set by now
@@ -339,31 +309,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
CreateChannel(worker, InsecureChannelCredentials()));
ClientConfig per_client_config = client_config;
- int server_core_limit = initial_server_config.core_limit();
- int client_core_limit = initial_client_config.core_limit();
- if ((server_core_limit > 0) || (client_core_limit > 0)) {
- auto& dq = hosts_cores.at(get_host(worker));
- if (client_core_limit == 0) {
- // limit client cores if it matches a server host
- bool match = false;
- int limit = dq.size();
- for (size_t srv = 0; srv < num_servers; srv++) {
- if (get_host(worker) == get_host(workers[srv])) {
- match = true;
- }
- }
- if (match) {
- GPR_ASSERT(limit > 0);
- client_core_limit = limit;
- }
- }
- if (client_core_limit > 0) {
- GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit));
- for (int core = 0; core < client_core_limit; core++) {
- per_client_config.add_core_list(dq.front());
- dq.pop_front();
- }
- }
+ if (initial_client_config.core_limit() != 0) {
+ gpr_log(GPR_ERROR, "client config core limit set but ignored");
}
// Reduce channel count so that total channels specified is held regardless
@@ -548,6 +495,9 @@ bool RunQuit() {
// Get client, server lists
bool result = true;
auto workers = get_workers("QPS_WORKERS");
+ if (workers.size() == 0) {
+ return false;
+ }
for (size_t i = 0; i < workers.size(); i++) {
auto stub = WorkerService::NewStub(
CreateChannel(workers[i], InsecureChannelCredentials()));