aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/driver.cc
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-23 23:16:17 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-23 23:16:17 -0800
commit2d0f36c84b50cce549ad76307cda56c4506f1a49 (patch)
treeb21e20dccce32a6cba620cee60a3d21966c1ba5d /test/cpp/qps/driver.cc
parent47445219b3fdae7bce25228ace491cf76b2b205c (diff)
Driver to client/server kind of works
Diffstat (limited to 'test/cpp/qps/driver.cc')
-rw-r--r--test/cpp/qps/driver.cc125
1 files changed, 122 insertions, 3 deletions
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index e7fc46b9e8..098860610c 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -35,9 +35,33 @@
#include "src/core/support/env.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/host_port.h>
+#include <grpc++/channel_arguments.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/stream.h>
+#include <list>
+#include <thread>
+#include <vector>
+using std::list;
+using std::thread;
+using std::unique_ptr;
using std::vector;
using grpc::string;
+using grpc::ChannelArguments;
+using grpc::ClientContext;
+using grpc::ClientReaderWriter;
+using grpc::CreateChannelDeprecated;
+using grpc::Status;
+using grpc::testing::ClientArgs;
+using grpc::testing::ClientConfig;
+using grpc::testing::ClientResult;
+using grpc::testing::QpsClient;
+using grpc::testing::QpsServer;
+using grpc::testing::ServerArgs;
+using grpc::testing::ServerConfig;
+using grpc::testing::ServerStatus;
static vector<string> get_hosts(const string& name) {
char* env = gpr_getenv(name.c_str());
@@ -58,12 +82,107 @@ static vector<string> get_hosts(const string& name) {
}
}
-void RunScenario(const grpc::testing::ClientArgs& client_args, size_t num_clients,
- const grpc::testing::ServerArgs& server_args,
- size_t num_servers) {
+void RunScenario(const ClientConfig& client_config, size_t num_clients,
+ const ServerConfig& server_config, size_t num_servers) {
+ // ClientContext allocator (all are destroyed at scope exit)
+ list<ClientContext> contexts;
+ auto alloc_context = [&contexts]() {
+ contexts.emplace_back();
+ return &contexts.back();
+ };
+
+ // Get client, server lists
auto clients = get_hosts("QPS_CLIENTS");
auto servers = get_hosts("QPS_SERVERS");
GPR_ASSERT(clients.size() >= num_clients);
GPR_ASSERT(servers.size() >= num_servers);
+
+ // Trim to just what we need
+ clients.resize(num_clients);
+ servers.resize(num_servers);
+
+ // Start servers
+ vector<unique_ptr<QpsServer::Stub>> server_stubs;
+ vector<unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>>> server_streams;
+ vector<string> server_targets;
+ for (const auto& target : servers) {
+ server_stubs.push_back(QpsServer::NewStub(CreateChannelDeprecated(target, ChannelArguments())));
+ auto* stub = server_stubs.back().get();
+ ServerArgs args;
+ *args.mutable_config() = server_config;
+ server_streams.push_back(stub->RunServer(alloc_context()));
+ auto* stream = server_streams.back().get();
+ if (!stream->Write(args)) {
+ gpr_log(GPR_ERROR, "Failed starting server");
+ return;
+ }
+ ServerStatus init_status;
+ if (!stream->Read(&init_status)) {
+ gpr_log(GPR_ERROR, "Failed starting server");
+ return;
+ }
+ char* host;
+ char* driver_port;
+ char* cli_target;
+ gpr_split_host_port(target.c_str(), &host, &driver_port);
+ gpr_join_host_port(&cli_target, host, init_status.port());
+ server_targets.push_back(cli_target);
+ gpr_free(host);
+ gpr_free(driver_port);
+ gpr_free(cli_target);
+ }
+
+ // Start clients
+ class Client {
+ public:
+ Client(ClientContext* ctx, const string& target, const ClientArgs& args)
+ : thread_([ctx, target, args, this]() {
+ auto stub = QpsClient::NewStub(CreateChannelDeprecated(target, ChannelArguments()));
+ status_ = stub->RunTest(ctx, args, &result_);
+ }) {}
+
+ ~Client() { join(); }
+
+ void join() { if (!joined_) { thread_.join(); joined_ = true; } }
+
+ const Status& status() const { return status_; }
+ const ClientResult& result() const { return result_; }
+
+ private:
+ bool joined_ = false;
+ Status status_;
+ ClientResult result_;
+ thread thread_;
+ };
+ list<Client> running_clients;
+ size_t svr_idx = 0;
+ for (const auto& target : clients) {
+ ClientArgs args;
+ *args.mutable_config() = client_config;
+ for (size_t i = 0; i < num_servers; i++) {
+ args.add_server_targets(server_targets[svr_idx]);
+ svr_idx = (svr_idx + 1) % num_servers;
+ }
+
+ running_clients.emplace_back(alloc_context(), target, args);
+ }
+
+ // Finish clients
+ for (auto& client : running_clients) {
+ client.join();
+ if (!client.status().IsOk()) {
+ gpr_log(GPR_ERROR, "Client failed");
+ return;
+ }
+ }
+
+ // Finish servers
+ for (auto& stream : server_streams) {
+ ServerStatus final_status;
+ ServerStatus dummy;
+ if (!stream->WritesDone() || !stream->Read(&final_status) || stream->Read(&dummy) || !stream->Finish().IsOk()) {
+ gpr_log(GPR_ERROR, "Server protocol error");
+ }
+ }
}