diff options
author | 2015-02-23 23:16:17 -0800 | |
---|---|---|
committer | 2015-02-23 23:16:17 -0800 | |
commit | 2d0f36c84b50cce549ad76307cda56c4506f1a49 (patch) | |
tree | b21e20dccce32a6cba620cee60a3d21966c1ba5d /test/cpp/qps/driver.cc | |
parent | 47445219b3fdae7bce25228ace491cf76b2b205c (diff) |
Driver to client/server kind of works
Diffstat (limited to 'test/cpp/qps/driver.cc')
-rw-r--r-- | test/cpp/qps/driver.cc | 125 |
1 files changed, 122 insertions, 3 deletions
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index e7fc46b9e8..098860610c 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -35,9 +35,33 @@ #include "src/core/support/env.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/host_port.h> +#include <grpc++/channel_arguments.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/stream.h> +#include <list> +#include <thread> +#include <vector> +using std::list; +using std::thread; +using std::unique_ptr; using std::vector; using grpc::string; +using grpc::ChannelArguments; +using grpc::ClientContext; +using grpc::ClientReaderWriter; +using grpc::CreateChannelDeprecated; +using grpc::Status; +using grpc::testing::ClientArgs; +using grpc::testing::ClientConfig; +using grpc::testing::ClientResult; +using grpc::testing::QpsClient; +using grpc::testing::QpsServer; +using grpc::testing::ServerArgs; +using grpc::testing::ServerConfig; +using grpc::testing::ServerStatus; static vector<string> get_hosts(const string& name) { char* env = gpr_getenv(name.c_str()); @@ -58,12 +82,107 @@ static vector<string> get_hosts(const string& name) { } } -void RunScenario(const grpc::testing::ClientArgs& client_args, size_t num_clients, - const grpc::testing::ServerArgs& server_args, - size_t num_servers) { +void RunScenario(const ClientConfig& client_config, size_t num_clients, + const ServerConfig& server_config, size_t num_servers) { + // ClientContext allocator (all are destroyed at scope exit) + list<ClientContext> contexts; + auto alloc_context = [&contexts]() { + contexts.emplace_back(); + return &contexts.back(); + }; + + // Get client, server lists auto clients = get_hosts("QPS_CLIENTS"); auto servers = get_hosts("QPS_SERVERS"); GPR_ASSERT(clients.size() >= num_clients); GPR_ASSERT(servers.size() >= num_servers); + + // Trim to just what we need + clients.resize(num_clients); + servers.resize(num_servers); + + // Start servers + vector<unique_ptr<QpsServer::Stub>> server_stubs; + vector<unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>>> server_streams; + vector<string> server_targets; + for (const auto& target : servers) { + server_stubs.push_back(QpsServer::NewStub(CreateChannelDeprecated(target, ChannelArguments()))); + auto* stub = server_stubs.back().get(); + ServerArgs args; + *args.mutable_config() = server_config; + server_streams.push_back(stub->RunServer(alloc_context())); + auto* stream = server_streams.back().get(); + if (!stream->Write(args)) { + gpr_log(GPR_ERROR, "Failed starting server"); + return; + } + ServerStatus init_status; + if (!stream->Read(&init_status)) { + gpr_log(GPR_ERROR, "Failed starting server"); + return; + } + char* host; + char* driver_port; + char* cli_target; + gpr_split_host_port(target.c_str(), &host, &driver_port); + gpr_join_host_port(&cli_target, host, init_status.port()); + server_targets.push_back(cli_target); + gpr_free(host); + gpr_free(driver_port); + gpr_free(cli_target); + } + + // Start clients + class Client { + public: + Client(ClientContext* ctx, const string& target, const ClientArgs& args) + : thread_([ctx, target, args, this]() { + auto stub = QpsClient::NewStub(CreateChannelDeprecated(target, ChannelArguments())); + status_ = stub->RunTest(ctx, args, &result_); + }) {} + + ~Client() { join(); } + + void join() { if (!joined_) { thread_.join(); joined_ = true; } } + + const Status& status() const { return status_; } + const ClientResult& result() const { return result_; } + + private: + bool joined_ = false; + Status status_; + ClientResult result_; + thread thread_; + }; + list<Client> running_clients; + size_t svr_idx = 0; + for (const auto& target : clients) { + ClientArgs args; + *args.mutable_config() = client_config; + for (size_t i = 0; i < num_servers; i++) { + args.add_server_targets(server_targets[svr_idx]); + svr_idx = (svr_idx + 1) % num_servers; + } + + running_clients.emplace_back(alloc_context(), target, args); + } + + // Finish clients + for (auto& client : running_clients) { + client.join(); + if (!client.status().IsOk()) { + gpr_log(GPR_ERROR, "Client failed"); + return; + } + } + + // Finish servers + for (auto& stream : server_streams) { + ServerStatus final_status; + ServerStatus dummy; + if (!stream->WritesDone() || !stream->Read(&final_status) || stream->Read(&dummy) || !stream->Finish().IsOk()) { + gpr_log(GPR_ERROR, "Server protocol error"); + } + } } |