diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-03-02 22:42:10 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-03-02 22:42:10 -0800 |
commit | 6af9ed0bf78a1fe6cbfe5e91d44d34da5b152f1b (patch) | |
tree | ad56885a4385cdf4e6569b76f9e424c2a3928000 /test/cpp/qps/driver.cc | |
parent | 32083bd771449fee853f4f5cb56ecd768ffec16a (diff) |
Rework QPS client/server
Now setup as a driver and N anonymous workers that may become clients or servers.
Will convert async soon.
Diffstat (limited to 'test/cpp/qps/driver.cc')
-rw-r--r-- | test/cpp/qps/driver.cc | 218 |
1 files changed, 113 insertions, 105 deletions
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index a54aad5631..c090d0377c 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -48,21 +48,9 @@ using std::list; using std::thread; using std::unique_ptr; using std::vector; -using grpc::string; -using grpc::ChannelArguments; -using grpc::ClientContext; -using grpc::ClientReaderWriter; -using grpc::CreateChannelDeprecated; -using grpc::Status; -using grpc::testing::ClientArgs; -using grpc::testing::ClientConfig; -using grpc::testing::ClientResult; -using grpc::testing::Worker; -using grpc::testing::ServerArgs; -using grpc::testing::ServerConfig; -using grpc::testing::ServerStatus; - -#if 0 + +namespace grpc { +namespace testing { static vector<string> get_hosts(const string& name) { char* env = gpr_getenv(name.c_str()); if (!env) return vector<string>(); @@ -70,119 +58,139 @@ static vector<string> get_hosts(const string& name) { vector<string> out; char* p = env; for (;;) { - char* comma = strchr(p, ','); - if (comma) { - out.emplace_back(p, comma); - p = comma + 1; - } else { - out.emplace_back(p); - gpr_free(env); - return out; - } + char* comma = strchr(p, ','); + if (comma) { + out.emplace_back(p, comma); + p = comma + 1; + } else { + out.emplace_back(p); + gpr_free(env); + return out; + } } } -void RunScenario(const ClientConfig& client_config, size_t num_clients, +void RunScenario(const ClientConfig& initial_client_config, size_t num_clients, const ServerConfig& server_config, size_t num_servers) { // ClientContext allocator (all are destroyed at scope exit) list<ClientContext> contexts; auto alloc_context = [&contexts]() { - contexts.emplace_back(); - return &contexts.back(); + contexts.emplace_back(); + return &contexts.back(); }; // Get client, server lists auto workers = get_hosts("QPS_WORKERS"); + ClientConfig client_config = initial_client_config; - GPR_ASSERT(clients.size() >= num_clients); - GPR_ASSERT(servers.size() >= num_servers); + // TODO(ctiller): support running multiple configurations, and binpack client/server pairs + // to available workers + GPR_ASSERT(workers.size() >= num_clients + num_servers); // Trim to just what we need - clients.resize(num_clients); - servers.resize(num_servers); + workers.resize(num_clients + num_servers); // Start servers - vector<unique_ptr<QpsServer::Stub>> server_stubs; - vector<unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>>> server_streams; - vector<string> server_targets; - for (const auto& target : servers) { - server_stubs.push_back(QpsServer::NewStub(CreateChannelDeprecated(target, ChannelArguments()))); - auto* stub = server_stubs.back().get(); - ServerArgs args; - *args.mutable_config() = server_config; - server_streams.push_back(stub->RunServer(alloc_context())); - auto* stream = server_streams.back().get(); - if (!stream->Write(args)) { - gpr_log(GPR_ERROR, "Failed starting server"); - return; - } - ServerStatus init_status; - if (!stream->Read(&init_status)) { - gpr_log(GPR_ERROR, "Failed starting server"); - return; - } - char* host; - char* driver_port; - char* cli_target; - gpr_split_host_port(target.c_str(), &host, &driver_port); - gpr_join_host_port(&cli_target, host, init_status.port()); - server_targets.push_back(cli_target); - gpr_free(host); - gpr_free(driver_port); - gpr_free(cli_target); + struct ServerData { + unique_ptr<Worker::Stub> stub; + unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; + }; + vector<ServerData> servers; + for (size_t i = 0; i < num_servers; i++) { + ServerData sd; + sd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i], ChannelArguments()))); + ServerArgs args; + *args.mutable_setup() = server_config; + sd.stream = std::move(sd.stub->RunServer(alloc_context())); + GPR_ASSERT(sd.stream->Write(args)); + ServerStatus init_status; + GPR_ASSERT(sd.stream->Read(&init_status)); + char* host; + char* driver_port; + char* cli_target; + gpr_split_host_port(workers[i].c_str(), &host, &driver_port); + gpr_join_host_port(&cli_target, host, init_status.port()); + client_config.add_server_targets(cli_target); + gpr_free(host); + gpr_free(driver_port); + gpr_free(cli_target); + + servers.push_back(std::move(sd)); } // Start clients - class Client { - public: - Client(ClientContext* ctx, const string& target, const ClientArgs& args) - : thread_([ctx, target, args, this]() { - auto stub = QpsClient::NewStub(CreateChannelDeprecated(target, ChannelArguments())); - status_ = stub->RunTest(ctx, args, &result_); - }) {} - - ~Client() { join(); } - - void join() { if (!joined_) { thread_.join(); joined_ = true; } } - - const Status& status() const { return status_; } - const ClientResult& result() const { return result_; } - - private: - bool joined_ = false; - Status status_; - ClientResult result_; - thread thread_; + struct ClientData { + unique_ptr<Worker::Stub> stub; + unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream; }; - list<Client> running_clients; - size_t svr_idx = 0; - for (const auto& target : clients) { - ClientArgs args; - *args.mutable_config() = client_config; - for (size_t i = 0; i < num_servers; i++) { - args.add_server_targets(server_targets[svr_idx]); - svr_idx = (svr_idx + 1) % num_servers; - } - - running_clients.emplace_back(alloc_context(), target, args); + vector<ClientData> clients; + for (size_t i = 0; i < num_clients; i++) { + ClientData cd; + cd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i + num_servers], ChannelArguments()))); + ClientArgs args; + *args.mutable_setup() = client_config; + cd.stream = std::move(cd.stub->RunTest(alloc_context())); + GPR_ASSERT(cd.stream->Write(args)); + ClientStatus init_status; + GPR_ASSERT(cd.stream->Read(&init_status)); + + clients.push_back(std::move(cd)); } - // Finish clients - for (auto& client : running_clients) { - client.join(); - if (!client.status().IsOk()) { - gpr_log(GPR_ERROR, "Client failed"); - return; - } + // Let everything warmup + gpr_log(GPR_INFO, "Warming up"); + gpr_timespec start = gpr_now(); + gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(5))); + + // Start a run + gpr_log(GPR_INFO, "Starting"); + ServerArgs server_mark; + server_mark.mutable_mark(); + ClientArgs client_mark; + client_mark.mutable_mark(); + for (auto& server : servers) { + GPR_ASSERT(server.stream->Write(server_mark)); + } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Write(client_mark)); + } + ServerStatus server_status; + ClientStatus client_status; + for (auto& server : servers) { + GPR_ASSERT(server.stream->Read(&server_status)); } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Read(&client_status)); + } + + // Wait some time + gpr_log(GPR_INFO, "Running"); + gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(15))); - // Finish servers - for (auto& stream : server_streams) { - ServerStatus final_status; - ServerStatus dummy; - if (!stream->WritesDone() || !stream->Read(&final_status) || stream->Read(&dummy) || !stream->Finish().IsOk()) { - gpr_log(GPR_ERROR, "Server protocol error"); - } + // Finish a run + gpr_log(GPR_INFO, "Finishing"); + for (auto& server : servers) { + GPR_ASSERT(server.stream->Write(server_mark)); + } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Write(client_mark)); + } + for (auto& server : servers) { + GPR_ASSERT(server.stream->Read(&server_status)); } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Read(&client_status)); + } + + for (auto& client : clients) { + GPR_ASSERT(client.stream->WritesDone()); + GPR_ASSERT(client.stream->Finish().IsOk()); + } + for (auto& server : servers) { + GPR_ASSERT(server.stream->WritesDone()); + GPR_ASSERT(server.stream->Finish().IsOk()); + } +} + +} } -#endif
\ No newline at end of file |