aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/driver.cc
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-03-02 22:42:10 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-03-02 22:42:10 -0800
commit6af9ed0bf78a1fe6cbfe5e91d44d34da5b152f1b (patch)
treead56885a4385cdf4e6569b76f9e424c2a3928000 /test/cpp/qps/driver.cc
parent32083bd771449fee853f4f5cb56ecd768ffec16a (diff)
Rework QPS client/server
Now setup as a driver and N anonymous workers that may become clients or servers. Will convert async soon.
Diffstat (limited to 'test/cpp/qps/driver.cc')
-rw-r--r--test/cpp/qps/driver.cc218
1 files changed, 113 insertions, 105 deletions
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index a54aad5631..c090d0377c 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -48,21 +48,9 @@ using std::list;
using std::thread;
using std::unique_ptr;
using std::vector;
-using grpc::string;
-using grpc::ChannelArguments;
-using grpc::ClientContext;
-using grpc::ClientReaderWriter;
-using grpc::CreateChannelDeprecated;
-using grpc::Status;
-using grpc::testing::ClientArgs;
-using grpc::testing::ClientConfig;
-using grpc::testing::ClientResult;
-using grpc::testing::Worker;
-using grpc::testing::ServerArgs;
-using grpc::testing::ServerConfig;
-using grpc::testing::ServerStatus;
-
-#if 0
+
+namespace grpc {
+namespace testing {
static vector<string> get_hosts(const string& name) {
char* env = gpr_getenv(name.c_str());
if (!env) return vector<string>();
@@ -70,119 +58,139 @@ static vector<string> get_hosts(const string& name) {
vector<string> out;
char* p = env;
for (;;) {
- char* comma = strchr(p, ',');
- if (comma) {
- out.emplace_back(p, comma);
- p = comma + 1;
- } else {
- out.emplace_back(p);
- gpr_free(env);
- return out;
- }
+ char* comma = strchr(p, ',');
+ if (comma) {
+ out.emplace_back(p, comma);
+ p = comma + 1;
+ } else {
+ out.emplace_back(p);
+ gpr_free(env);
+ return out;
+ }
}
}
-void RunScenario(const ClientConfig& client_config, size_t num_clients,
+void RunScenario(const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& server_config, size_t num_servers) {
// ClientContext allocator (all are destroyed at scope exit)
list<ClientContext> contexts;
auto alloc_context = [&contexts]() {
- contexts.emplace_back();
- return &contexts.back();
+ contexts.emplace_back();
+ return &contexts.back();
};
// Get client, server lists
auto workers = get_hosts("QPS_WORKERS");
+ ClientConfig client_config = initial_client_config;
- GPR_ASSERT(clients.size() >= num_clients);
- GPR_ASSERT(servers.size() >= num_servers);
+ // TODO(ctiller): support running multiple configurations, and binpack client/server pairs
+ // to available workers
+ GPR_ASSERT(workers.size() >= num_clients + num_servers);
// Trim to just what we need
- clients.resize(num_clients);
- servers.resize(num_servers);
+ workers.resize(num_clients + num_servers);
// Start servers
- vector<unique_ptr<QpsServer::Stub>> server_stubs;
- vector<unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>>> server_streams;
- vector<string> server_targets;
- for (const auto& target : servers) {
- server_stubs.push_back(QpsServer::NewStub(CreateChannelDeprecated(target, ChannelArguments())));
- auto* stub = server_stubs.back().get();
- ServerArgs args;
- *args.mutable_config() = server_config;
- server_streams.push_back(stub->RunServer(alloc_context()));
- auto* stream = server_streams.back().get();
- if (!stream->Write(args)) {
- gpr_log(GPR_ERROR, "Failed starting server");
- return;
- }
- ServerStatus init_status;
- if (!stream->Read(&init_status)) {
- gpr_log(GPR_ERROR, "Failed starting server");
- return;
- }
- char* host;
- char* driver_port;
- char* cli_target;
- gpr_split_host_port(target.c_str(), &host, &driver_port);
- gpr_join_host_port(&cli_target, host, init_status.port());
- server_targets.push_back(cli_target);
- gpr_free(host);
- gpr_free(driver_port);
- gpr_free(cli_target);
+ struct ServerData {
+ unique_ptr<Worker::Stub> stub;
+ unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
+ };
+ vector<ServerData> servers;
+ for (size_t i = 0; i < num_servers; i++) {
+ ServerData sd;
+ sd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i], ChannelArguments())));
+ ServerArgs args;
+ *args.mutable_setup() = server_config;
+ sd.stream = std::move(sd.stub->RunServer(alloc_context()));
+ GPR_ASSERT(sd.stream->Write(args));
+ ServerStatus init_status;
+ GPR_ASSERT(sd.stream->Read(&init_status));
+ char* host;
+ char* driver_port;
+ char* cli_target;
+ gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
+ gpr_join_host_port(&cli_target, host, init_status.port());
+ client_config.add_server_targets(cli_target);
+ gpr_free(host);
+ gpr_free(driver_port);
+ gpr_free(cli_target);
+
+ servers.push_back(std::move(sd));
}
// Start clients
- class Client {
- public:
- Client(ClientContext* ctx, const string& target, const ClientArgs& args)
- : thread_([ctx, target, args, this]() {
- auto stub = QpsClient::NewStub(CreateChannelDeprecated(target, ChannelArguments()));
- status_ = stub->RunTest(ctx, args, &result_);
- }) {}
-
- ~Client() { join(); }
-
- void join() { if (!joined_) { thread_.join(); joined_ = true; } }
-
- const Status& status() const { return status_; }
- const ClientResult& result() const { return result_; }
-
- private:
- bool joined_ = false;
- Status status_;
- ClientResult result_;
- thread thread_;
+ struct ClientData {
+ unique_ptr<Worker::Stub> stub;
+ unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
- list<Client> running_clients;
- size_t svr_idx = 0;
- for (const auto& target : clients) {
- ClientArgs args;
- *args.mutable_config() = client_config;
- for (size_t i = 0; i < num_servers; i++) {
- args.add_server_targets(server_targets[svr_idx]);
- svr_idx = (svr_idx + 1) % num_servers;
- }
-
- running_clients.emplace_back(alloc_context(), target, args);
+ vector<ClientData> clients;
+ for (size_t i = 0; i < num_clients; i++) {
+ ClientData cd;
+ cd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i + num_servers], ChannelArguments())));
+ ClientArgs args;
+ *args.mutable_setup() = client_config;
+ cd.stream = std::move(cd.stub->RunTest(alloc_context()));
+ GPR_ASSERT(cd.stream->Write(args));
+ ClientStatus init_status;
+ GPR_ASSERT(cd.stream->Read(&init_status));
+
+ clients.push_back(std::move(cd));
}
- // Finish clients
- for (auto& client : running_clients) {
- client.join();
- if (!client.status().IsOk()) {
- gpr_log(GPR_ERROR, "Client failed");
- return;
- }
+ // Let everything warmup
+ gpr_log(GPR_INFO, "Warming up");
+ gpr_timespec start = gpr_now();
+ gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(5)));
+
+ // Start a run
+ gpr_log(GPR_INFO, "Starting");
+ ServerArgs server_mark;
+ server_mark.mutable_mark();
+ ClientArgs client_mark;
+ client_mark.mutable_mark();
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->Write(server_mark));
+ }
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->Write(client_mark));
+ }
+ ServerStatus server_status;
+ ClientStatus client_status;
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->Read(&server_status));
}
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->Read(&client_status));
+ }
+
+ // Wait some time
+ gpr_log(GPR_INFO, "Running");
+ gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(15)));
- // Finish servers
- for (auto& stream : server_streams) {
- ServerStatus final_status;
- ServerStatus dummy;
- if (!stream->WritesDone() || !stream->Read(&final_status) || stream->Read(&dummy) || !stream->Finish().IsOk()) {
- gpr_log(GPR_ERROR, "Server protocol error");
- }
+ // Finish a run
+ gpr_log(GPR_INFO, "Finishing");
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->Write(server_mark));
+ }
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->Write(client_mark));
+ }
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->Read(&server_status));
}
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->Read(&client_status));
+ }
+
+ for (auto& client : clients) {
+ GPR_ASSERT(client.stream->WritesDone());
+ GPR_ASSERT(client.stream->Finish().IsOk());
+ }
+ for (auto& server : servers) {
+ GPR_ASSERT(server.stream->WritesDone());
+ GPR_ASSERT(server.stream->Finish().IsOk());
+ }
+}
+
+}
}
-#endif \ No newline at end of file