From 2d0f36c84b50cce549ad76307cda56c4506f1a49 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 23 Feb 2015 23:16:17 -0800 Subject: Driver to client/server kind of works --- test/cpp/qps/driver.cc | 125 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 122 insertions(+), 3 deletions(-) (limited to 'test/cpp/qps/driver.cc') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index e7fc46b9e8..098860610c 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -35,9 +35,33 @@ #include "src/core/support/env.h" #include #include +#include +#include +#include +#include +#include +#include +#include +#include +using std::list; +using std::thread; +using std::unique_ptr; using std::vector; using grpc::string; +using grpc::ChannelArguments; +using grpc::ClientContext; +using grpc::ClientReaderWriter; +using grpc::CreateChannelDeprecated; +using grpc::Status; +using grpc::testing::ClientArgs; +using grpc::testing::ClientConfig; +using grpc::testing::ClientResult; +using grpc::testing::QpsClient; +using grpc::testing::QpsServer; +using grpc::testing::ServerArgs; +using grpc::testing::ServerConfig; +using grpc::testing::ServerStatus; static vector get_hosts(const string& name) { char* env = gpr_getenv(name.c_str()); @@ -58,12 +82,107 @@ static vector get_hosts(const string& name) { } } -void RunScenario(const grpc::testing::ClientArgs& client_args, size_t num_clients, - const grpc::testing::ServerArgs& server_args, - size_t num_servers) { +void RunScenario(const ClientConfig& client_config, size_t num_clients, + const ServerConfig& server_config, size_t num_servers) { + // ClientContext allocator (all are destroyed at scope exit) + list contexts; + auto alloc_context = [&contexts]() { + contexts.emplace_back(); + return &contexts.back(); + }; + + // Get client, server lists auto clients = get_hosts("QPS_CLIENTS"); auto servers = get_hosts("QPS_SERVERS"); GPR_ASSERT(clients.size() >= num_clients); GPR_ASSERT(servers.size() >= num_servers); + + // Trim to just what we need + clients.resize(num_clients); + servers.resize(num_servers); + + // Start servers + vector> server_stubs; + vector>> server_streams; + vector server_targets; + for (const auto& target : servers) { + server_stubs.push_back(QpsServer::NewStub(CreateChannelDeprecated(target, ChannelArguments()))); + auto* stub = server_stubs.back().get(); + ServerArgs args; + *args.mutable_config() = server_config; + server_streams.push_back(stub->RunServer(alloc_context())); + auto* stream = server_streams.back().get(); + if (!stream->Write(args)) { + gpr_log(GPR_ERROR, "Failed starting server"); + return; + } + ServerStatus init_status; + if (!stream->Read(&init_status)) { + gpr_log(GPR_ERROR, "Failed starting server"); + return; + } + char* host; + char* driver_port; + char* cli_target; + gpr_split_host_port(target.c_str(), &host, &driver_port); + gpr_join_host_port(&cli_target, host, init_status.port()); + server_targets.push_back(cli_target); + gpr_free(host); + gpr_free(driver_port); + gpr_free(cli_target); + } + + // Start clients + class Client { + public: + Client(ClientContext* ctx, const string& target, const ClientArgs& args) + : thread_([ctx, target, args, this]() { + auto stub = QpsClient::NewStub(CreateChannelDeprecated(target, ChannelArguments())); + status_ = stub->RunTest(ctx, args, &result_); + }) {} + + ~Client() { join(); } + + void join() { if (!joined_) { thread_.join(); joined_ = true; } } + + const Status& status() const { return status_; } + const ClientResult& result() const { return result_; } + + private: + bool joined_ = false; + Status status_; + ClientResult result_; + thread thread_; + }; + list running_clients; + size_t svr_idx = 0; + for (const auto& target : clients) { + ClientArgs args; + *args.mutable_config() = client_config; + for (size_t i = 0; i < num_servers; i++) { + args.add_server_targets(server_targets[svr_idx]); + svr_idx = (svr_idx + 1) % num_servers; + } + + running_clients.emplace_back(alloc_context(), target, args); + } + + // Finish clients + for (auto& client : running_clients) { + client.join(); + if (!client.status().IsOk()) { + gpr_log(GPR_ERROR, "Client failed"); + return; + } + } + + // Finish servers + for (auto& stream : server_streams) { + ServerStatus final_status; + ServerStatus dummy; + if (!stream->WritesDone() || !stream->Read(&final_status) || stream->Read(&dummy) || !stream->Finish().IsOk()) { + gpr_log(GPR_ERROR, "Server protocol error"); + } + } } -- cgit v1.2.3