aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2016-02-01 13:00:19 -0800
committerGravatar Vijay Pai <vpai@google.com>2016-02-01 13:00:19 -0800
commit7d45cdb60b1f6f9cd8750f405d74362600b82c16 (patch)
tree2c0f775effed45216f30e697fc3c5b55f93414bc /test
parentc5eec2be89bb201f84b7278da5522a205dc3620b (diff)
Enable properly working core limits on clients and servers,
and determine these dynamically if only one or the other is specified but both are running on the same host
Diffstat (limited to 'test')
-rw-r--r--test/cpp/qps/driver.cc126
-rw-r--r--test/cpp/qps/qps_driver.cc36
-rw-r--r--test/cpp/qps/qps_worker.cc7
3 files changed, 126 insertions, 43 deletions
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 490156aec2..f3b92c8082 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -34,6 +34,7 @@
#include <deque>
#include <list>
#include <thread>
+#include <unordered_map>
#include <vector>
#include <grpc++/channel.h>
@@ -59,7 +60,42 @@ using std::vector;
namespace grpc {
namespace testing {
-static deque<string> get_hosts(const string& name) {
+static std::string get_host(const std::string &worker) {
+ char *host;
+ char *port;
+
+ gpr_split_host_port(worker.c_str(), &host, &port);
+ string s(host);
+
+ gpr_free(host);
+ gpr_free(port);
+ return s;
+}
+
+static std::unordered_map<string,std::deque<int>>
+ get_hosts_and_cores(const deque<string>& workers) {
+ std::unordered_map<string,std::deque<int>> hosts;
+ for (auto it = workers.begin(); it != workers.end(); it++) {
+ string host = get_host(*it);
+ if (hosts.find(host) == hosts.end()) {
+ auto stub = WorkerService::NewStub(
+ CreateChannel(*it, InsecureChannelCredentials()));
+ grpc::ClientContext ctx;
+ CoreRequest dummy;
+ CoreResponse cores;
+ grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
+ assert(s.ok());
+ std::deque<int> dq;
+ for (int i=0; i<cores.cores(); i++) {
+ dq.push_back(i);
+ }
+ hosts[host] = dq;
+ }
+ }
+ return hosts;
+}
+
+static deque<string> get_workers(const string& name) {
char* env = gpr_getenv(name.c_str());
if (!env) return deque<string>();
@@ -105,7 +141,7 @@ struct ClientData {
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
- const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
+ const ServerConfig& initial_server_config, size_t num_servers, int warmup_seconds,
int benchmark_seconds, int spawn_local_worker_count) {
// ClientContext allocations (all are destroyed at scope exit)
list<ClientContext> contexts;
@@ -113,10 +149,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
// To be added to the result, containing the final configuration used for
// client and config (including host, etc.)
ClientConfig result_client_config;
- ServerConfig result_server_config;
+ ServerConfig result_server_config = initial_server_config;
// Get client, server lists
- auto workers = get_hosts("QPS_WORKERS");
+ auto workers = get_workers("QPS_WORKERS");
ClientConfig client_config = initial_client_config;
// Spawn some local workers if desired
@@ -143,6 +179,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
}
+ // Setup the hosts and core counts
+ auto hosts_cores = get_hosts_and_cores(workers);
+
// if num_clients is set to <=0, do dynamic sizing: all workers
// except for servers are clients
if (num_clients <= 0) {
@@ -172,18 +211,49 @@ std::unique_ptr<ScenarioResult> RunScenario(
i);
servers[i].stub = WorkerService::NewStub(
CreateChannel(workers[i], InsecureChannelCredentials()));
+
+ ServerConfig server_config = initial_server_config;
+ char* host;
+ char* driver_port;
+ char* cli_target;
+ gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
+ string host_str(host);
+ int server_core_limit = initial_server_config.core_limit();
+ int client_core_limit = initial_client_config.core_limit();
+
+ if (server_core_limit == 0 && client_core_limit > 0) {
+ // In this case, limit the server cores if it matches the
+ // same host as one or more clients
+ const auto& dq = hosts_cores[host_str];
+ bool match = false;
+ int limit = dq.size();
+ for (size_t cli = 0; cli < num_clients; cli++) {
+ if (host_str == get_host(workers[cli+num_servers])) {
+ limit -= client_core_limit;
+ match = true;
+ }
+ }
+ if (match) {
+ GPR_ASSERT(limit > 0);
+ server_core_limit = limit;
+ }
+ }
+ if (server_core_limit > 0) {
+ auto& dq = hosts_cores[host_str];
+ GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
+ for (int core=0; core < server_core_limit; core++) {
+ server_config.add_core_list(dq.front());
+ dq.pop_front();
+ }
+ }
+
ServerArgs args;
- result_server_config = server_config;
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
GPR_ASSERT(servers[i].stream->Write(args));
ServerStatus init_status;
GPR_ASSERT(servers[i].stream->Read(&init_status));
- char* host;
- char* driver_port;
- char* cli_target;
- gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
gpr_join_host_port(&cli_target, host, init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(host);
@@ -191,19 +261,49 @@ std::unique_ptr<ScenarioResult> RunScenario(
gpr_free(cli_target);
}
+ // Targets are all set by now
+ result_client_config = client_config;
// Start clients
using runsc::ClientData;
// clients is array rather than std::vector to avoid gcc-4.4 issues
// where class contained in std::vector must have a copy constructor
auto* clients = new ClientData[num_clients];
for (size_t i = 0; i < num_clients; i++) {
+ const auto& worker = workers[i + num_servers];
gpr_log(GPR_INFO, "Starting client on %s (worker #%d)",
- workers[i + num_servers].c_str(), i + num_servers);
+ worker.c_str(), i + num_servers);
clients[i].stub = WorkerService::NewStub(
- CreateChannel(workers[i + num_servers], InsecureChannelCredentials()));
+ CreateChannel(worker, InsecureChannelCredentials()));
+ ClientConfig per_client_config = client_config;
+
+ int server_core_limit = initial_server_config.core_limit();
+ int client_core_limit = initial_client_config.core_limit();
+ if ((server_core_limit > 0) || (client_core_limit > 0)) {
+ auto& dq = hosts_cores[get_host(worker)];
+ if (client_core_limit == 0) {
+ // limit client cores if it matches a server host
+ bool match = false;
+ int limit = dq.size();
+ for (size_t srv = 0; srv < num_servers; srv++) {
+ if (get_host(worker) == get_host(workers[srv])) {
+ match = true;
+ }
+ }
+ if (match) {
+ client_core_limit = limit;
+ }
+ }
+ if (client_core_limit > 0) {
+ GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit));
+ for (int core=0; core < client_core_limit; core++) {
+ per_client_config.add_core_list(dq.front());
+ dq.pop_front();
+ }
+ }
+ }
+
ClientArgs args;
- result_client_config = client_config;
- *args.mutable_setup() = client_config;
+ *args.mutable_setup() = per_client_config;
clients[i].stream =
clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline));
GPR_ASSERT(clients[i].stream->Write(args));
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index 6462050b6c..ffc8a83fc5 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -51,8 +51,7 @@ DEFINE_int32(local_workers, 0, "Number of local workers to start");
// Server config
DEFINE_int32(async_server_threads, 1, "Number of threads for async servers");
DEFINE_string(server_type, "SYNC_SERVER", "Server type");
-// TODO (vpai): Automatically generate the core list to avoid breakage
-DEFINE_string(server_core_list, "", "Comma-separated list of cores for server");
+DEFINE_int32(server_core_limit, -1, "Limit on server cores to use");
// Client config
DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING");
@@ -75,8 +74,7 @@ DEFINE_double(determ_load, -1.0, "Deterministic offered load (qps)");
DEFINE_double(pareto_base, -1.0, "Pareto base interarrival time (us)");
DEFINE_double(pareto_alpha, -1.0, "Pareto alpha value");
-// TODO (vpai): Automatically generate the core list to avoid breakage
-DEFINE_string(client_core_list, "", "Comma-separated list of cores for client");
+DEFINE_int32(client_core_limit, -1, "Limit on client cores to use");
DEFINE_bool(secure_test, false, "Run a secure test");
@@ -91,22 +89,6 @@ using grpc::testing::SecurityParams;
namespace grpc {
namespace testing {
-static std::vector<int> IntParse(const std::string& s) {
- size_t pos = 0;
- std::vector<int> res;
- while (pos < s.size()) {
- size_t comma = s.find(',', pos);
- if (comma == std::string::npos) {
- res.push_back(std::stoi(s.substr(pos)));
- break;
- } else {
- res.push_back(std::stoi(s.substr(pos, comma - pos), nullptr));
- pos = comma + 1;
- }
- }
- return res;
-}
-
static void QpsDriver() {
RpcType rpc_type;
GPR_ASSERT(RpcType_Parse(FLAGS_rpc_type, &rpc_type));
@@ -170,22 +152,16 @@ static void QpsDriver() {
client_config.mutable_histogram_params()->set_max_possible(
Histogram::default_max_possible());
- if (FLAGS_client_core_list.size() > 0) {
- auto v = IntParse(FLAGS_client_core_list);
- for (size_t i = 0; i < v.size(); i++) {
- client_config.add_core_list(v[i]);
- }
+ if (FLAGS_client_core_limit > 0) {
+ client_config.set_core_limit(FLAGS_client_core_limit);
}
ServerConfig server_config;
server_config.set_server_type(server_type);
server_config.set_async_server_threads(FLAGS_async_server_threads);
- if (FLAGS_server_core_list.size() > 0) {
- auto v = IntParse(FLAGS_server_core_list);
- for (size_t i = 0; i < v.size(); i++) {
- server_config.add_core_list(v[i]);
- }
+ if (FLAGS_server_core_limit > 0) {
+ server_config.set_core_limit(FLAGS_server_core_limit);
}
if (FLAGS_secure_test) {
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 6289c1a843..d0adbb1a54 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -47,6 +47,7 @@
#include <grpc++/server_builder.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/histogram.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
@@ -133,6 +134,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
return ret;
}
+ Status CoreCount(ServerContext *ctx, const CoreRequest*,
+ CoreResponse* resp) GRPC_OVERRIDE {
+ resp->set_cores(gpr_cpu_num_cores());
+ return Status::OK;
+ }
+
private:
// Protect against multiple clients using this worker at once.
class InstanceGuard {