diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-03-02 22:42:10 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-03-02 22:42:10 -0800 |
commit | 6af9ed0bf78a1fe6cbfe5e91d44d34da5b152f1b (patch) | |
tree | ad56885a4385cdf4e6569b76f9e424c2a3928000 /test/cpp/qps | |
parent | 32083bd771449fee853f4f5cb56ecd768ffec16a (diff) |
Rework QPS client/server
Now setup as a driver and N anonymous workers that may become clients or servers.
Will convert async soon.
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/client.cc | 272 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 2 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 218 | ||||
-rw-r--r-- | test/cpp/qps/driver.h | 4 | ||||
-rw-r--r-- | test/cpp/qps/histogram.h | 63 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 18 | ||||
-rw-r--r-- | test/cpp/qps/qpstest.proto | 26 | ||||
-rw-r--r-- | test/cpp/qps/server.cc | 132 | ||||
-rw-r--r-- | test/cpp/qps/server.h | 2 | ||||
-rwxr-xr-x | test/cpp/qps/single_run_localhost.sh | 18 | ||||
-rw-r--r-- | test/cpp/qps/timer.cc | 6 | ||||
-rw-r--r-- | test/cpp/qps/timer.h | 2 | ||||
-rw-r--r-- | test/cpp/qps/worker.cc | 31 |
13 files changed, 387 insertions, 407 deletions
diff --git a/test/cpp/qps/client.cc b/test/cpp/qps/client.cc index 7c9763a332..827c1ec09b 100644 --- a/test/cpp/qps/client.cc +++ b/test/cpp/qps/client.cc @@ -53,53 +53,112 @@ #include <grpc++/server_builder.h> #include "test/core/util/grpc_profiler.h" #include "test/cpp/util/create_test_channel.h" +#include "test/cpp/qps/client.h" #include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/histogram.h" #include "test/cpp/qps/timer.h" -DEFINE_int32(driver_port, 0, "Client driver port."); - -using grpc::ChannelInterface; -using grpc::CreateTestChannel; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::Status; -using grpc::testing::ClientArgs; -using grpc::testing::ClientConfig; -using grpc::testing::ClientResult; -using grpc::testing::QpsClient; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google { } -namespace gflags { } -using namespace google; -using namespace gflags; - -static double now() { - gpr_timespec tv = gpr_now(); - return 1e9 * tv.tv_sec + tv.tv_nsec; -} +namespace grpc { +namespace testing { -static bool got_sigint = false; +class SynchronousClient GRPC_FINAL : public Client { + public: + SynchronousClient(const ClientConfig& config) : timer_(new Timer) { + for (int i = 0; i < config.client_channels(); i++) { + channels_.push_back(ClientChannelInfo(config.server_targets(i % config.server_targets_size()), config)); + auto* stub = channels_.back().get_stub(); + for (int j = 0; j < config.outstanding_rpcs_per_channel(); j++) { + threads_.emplace_back(new Thread(stub, config)); + } + } + } -static void sigint_handler(int x) { got_sigint = 1; } + ClientStats Mark() { + Histogram latencies; + std::vector<Histogram> to_merge(threads_.size()); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->BeginSwap(&to_merge[i]); + } + std::unique_ptr<Timer> timer(new Timer); + timer_.swap(timer); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->EndSwap(); + latencies.Merge(&to_merge[i]); + } + + auto timer_result = timer->Mark(); + + ClientStats stats; + auto* l = stats.mutable_latencies(); + l->set_l_50(latencies.Percentile(50)); + l->set_l_90(latencies.Percentile(90)); + l->set_l_99(latencies.Percentile(99)); + l->set_l_999(latencies.Percentile(99.9)); + stats.set_num_rpcs(latencies.Count()); + stats.set_time_elapsed(timer_result.wall); + stats.set_time_system(timer_result.system); + stats.set_time_user(timer_result.user); + return stats; + } -ClientResult RunTest(const ClientArgs& args) { - const auto& config = args.config(); + private: + class Thread { + public: + Thread(TestService::Stub* stub, const ClientConfig& config) : stub_(stub), config_(config), done_(false), new_(nullptr), impl_([this]() { + SimpleRequest request; + SimpleResponse response; + request.set_response_type( + grpc::testing::PayloadType::COMPRESSABLE); + request.set_response_size(config_.payload_size()); + for (;;) { + { + std::lock_guard<std::mutex> g(mu_); + if (done_) return; + if (new_) { + new_->Swap(&histogram_); + new_ = nullptr; + cv_.notify_one(); + } + } + double start = Timer::Now(); + grpc::ClientContext context; + grpc::Status s = + stub_->UnaryCall(&context, request, &response); + histogram_.Add((Timer::Now() - start) * 1e9); + } + }) {} + + ~Thread() { + { + std::lock_guard<std::mutex> g(mu_); + done_ = true; + } + impl_.join(); + } + + void BeginSwap(Histogram* n) { + std::lock_guard<std::mutex> g(mu_); + new_ = n; + } + + void EndSwap() { + std::unique_lock<std::mutex> g(mu_); + cv_.wait(g, [this]() { return new_ == nullptr; }); + } - gpr_log(GPR_INFO, - "QPS test with parameters\n" - "enable_ssl = %d\n" - "client_channels = %d\n" - "client_threads = %d\n" - "num_rpcs = %d\n" - "payload_size = %d\n", - config.enable_ssl(), config.client_channels(), config.client_threads(), config.num_rpcs(), - config.payload_size()); + private: + Thread(const Thread&); + Thread& operator=(const Thread&); + + TestService::Stub* stub_; + ClientConfig config_; + std::mutex mu_; + std::condition_variable cv_; + bool done_; + Histogram *new_; + Histogram histogram_; + std::thread impl_; + }; class ClientChannelInfo { public: @@ -113,133 +172,14 @@ ClientResult RunTest(const ClientArgs& args) { std::shared_ptr<ChannelInterface> channel_; std::unique_ptr<TestService::Stub> stub_; }; - - std::vector<ClientChannelInfo> channels; - for (int i = 0; i < config.client_channels(); i++) { - channels.push_back(ClientChannelInfo(args.server_targets(i % args.server_targets_size()), config)); - } - - std::vector<std::thread> threads; // Will add threads when ready to execute - std::vector< ::gpr_histogram *> thread_stats(config.client_threads()); - - grpc::ClientContext context_stats_begin; - - grpc_profiler_start("qps_client.prof"); - - Timer timer; - - for (int i = 0; i < config.client_threads(); i++) { - gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); - GPR_ASSERT(hist != NULL); - thread_stats[i] = hist; - - threads.push_back( - std::thread([hist, config, &channels](int channel_num) { - SimpleRequest request; - SimpleResponse response; - request.set_response_type( - grpc::testing::PayloadType::COMPRESSABLE); - request.set_response_size(config.payload_size()); - - for (int j = 0; j < config.num_rpcs(); j++) { - TestService::Stub *stub = - channels[channel_num].get_stub(); - double start = now(); - grpc::ClientContext context; - grpc::Status s = - stub->UnaryCall(&context, request, &response); - gpr_histogram_add(hist, now() - start); - - GPR_ASSERT((s.code() == grpc::StatusCode::OK) && - (response.payload().type() == - grpc::testing::PayloadType::COMPRESSABLE) && - (response.payload().body().length() == - static_cast<size_t>(config.payload_size()))); - - // Now do runtime round-robin assignment of the next - // channel number - channel_num += config.client_threads(); - channel_num %= config.client_channels(); - } - }, - i % config.client_channels())); - } - - for (auto &t : threads) { - t.join(); - } - - auto timer_result = timer.Mark(); - - grpc_profiler_stop(); - - gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); - GPR_ASSERT(hist != NULL); - - for (int i = 0; i < config.client_threads(); i++) { - gpr_histogram *h = thread_stats[i]; - gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f", - i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90), - gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99), - gpr_histogram_percentile(h, 99.9)); - gpr_histogram_merge(hist, h); - gpr_histogram_destroy(h); - } - - ClientResult result; - auto* latencies = result.mutable_latencies(); - latencies->set_l_50(gpr_histogram_percentile(hist, 50)); - latencies->set_l_90(gpr_histogram_percentile(hist, 90)); - latencies->set_l_99(gpr_histogram_percentile(hist, 99)); - latencies->set_l_999(gpr_histogram_percentile(hist, 99.9)); - result.set_num_rpcs(config.client_threads() * config.num_rpcs()); - result.set_time_elapsed(timer_result.wall); - result.set_time_system(timer_result.system); - result.set_time_user(timer_result.user); - - gpr_histogram_destroy(hist); - - return result; -} - -class ClientImpl final : public QpsClient::Service { - public: - Status RunTest(ServerContext* ctx, const ClientArgs* args, ClientResult* result) override { - *result = ::RunTest(*args); - return Status::OK; - } - - private: - std::mutex client_mu_; + std::vector<ClientChannelInfo> channels_; + std::vector<std::unique_ptr<Thread>> threads_; + std::unique_ptr<Timer> timer_; }; -static void RunServer() { - char* server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_driver_port); - - ClientImpl service; - - ServerBuilder builder; - builder.AddPort(server_address); - builder.RegisterService(&service); - - gpr_free(server_address); - - auto server = builder.BuildAndStart(); - - while (!got_sigint) { - std::this_thread::sleep_for(std::chrono::seconds(5)); - } +std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) { + return std::unique_ptr<Client>(new SynchronousClient(config)); } -int main(int argc, char **argv) { - signal(SIGINT, sigint_handler); - - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - - RunServer(); - - grpc_shutdown(); - return 0; -} +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index ab34970ebd..97701d3d18 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -42,6 +42,8 @@ namespace testing { class Client { public: virtual ~Client() {} + + virtual ClientStats Mark() = 0; }; std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args); diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index a54aad5631..c090d0377c 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -48,21 +48,9 @@ using std::list; using std::thread; using std::unique_ptr; using std::vector; -using grpc::string; -using grpc::ChannelArguments; -using grpc::ClientContext; -using grpc::ClientReaderWriter; -using grpc::CreateChannelDeprecated; -using grpc::Status; -using grpc::testing::ClientArgs; -using grpc::testing::ClientConfig; -using grpc::testing::ClientResult; -using grpc::testing::Worker; -using grpc::testing::ServerArgs; -using grpc::testing::ServerConfig; -using grpc::testing::ServerStatus; - -#if 0 + +namespace grpc { +namespace testing { static vector<string> get_hosts(const string& name) { char* env = gpr_getenv(name.c_str()); if (!env) return vector<string>(); @@ -70,119 +58,139 @@ static vector<string> get_hosts(const string& name) { vector<string> out; char* p = env; for (;;) { - char* comma = strchr(p, ','); - if (comma) { - out.emplace_back(p, comma); - p = comma + 1; - } else { - out.emplace_back(p); - gpr_free(env); - return out; - } + char* comma = strchr(p, ','); + if (comma) { + out.emplace_back(p, comma); + p = comma + 1; + } else { + out.emplace_back(p); + gpr_free(env); + return out; + } } } -void RunScenario(const ClientConfig& client_config, size_t num_clients, +void RunScenario(const ClientConfig& initial_client_config, size_t num_clients, const ServerConfig& server_config, size_t num_servers) { // ClientContext allocator (all are destroyed at scope exit) list<ClientContext> contexts; auto alloc_context = [&contexts]() { - contexts.emplace_back(); - return &contexts.back(); + contexts.emplace_back(); + return &contexts.back(); }; // Get client, server lists auto workers = get_hosts("QPS_WORKERS"); + ClientConfig client_config = initial_client_config; - GPR_ASSERT(clients.size() >= num_clients); - GPR_ASSERT(servers.size() >= num_servers); + // TODO(ctiller): support running multiple configurations, and binpack client/server pairs + // to available workers + GPR_ASSERT(workers.size() >= num_clients + num_servers); // Trim to just what we need - clients.resize(num_clients); - servers.resize(num_servers); + workers.resize(num_clients + num_servers); // Start servers - vector<unique_ptr<QpsServer::Stub>> server_stubs; - vector<unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>>> server_streams; - vector<string> server_targets; - for (const auto& target : servers) { - server_stubs.push_back(QpsServer::NewStub(CreateChannelDeprecated(target, ChannelArguments()))); - auto* stub = server_stubs.back().get(); - ServerArgs args; - *args.mutable_config() = server_config; - server_streams.push_back(stub->RunServer(alloc_context())); - auto* stream = server_streams.back().get(); - if (!stream->Write(args)) { - gpr_log(GPR_ERROR, "Failed starting server"); - return; - } - ServerStatus init_status; - if (!stream->Read(&init_status)) { - gpr_log(GPR_ERROR, "Failed starting server"); - return; - } - char* host; - char* driver_port; - char* cli_target; - gpr_split_host_port(target.c_str(), &host, &driver_port); - gpr_join_host_port(&cli_target, host, init_status.port()); - server_targets.push_back(cli_target); - gpr_free(host); - gpr_free(driver_port); - gpr_free(cli_target); + struct ServerData { + unique_ptr<Worker::Stub> stub; + unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; + }; + vector<ServerData> servers; + for (size_t i = 0; i < num_servers; i++) { + ServerData sd; + sd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i], ChannelArguments()))); + ServerArgs args; + *args.mutable_setup() = server_config; + sd.stream = std::move(sd.stub->RunServer(alloc_context())); + GPR_ASSERT(sd.stream->Write(args)); + ServerStatus init_status; + GPR_ASSERT(sd.stream->Read(&init_status)); + char* host; + char* driver_port; + char* cli_target; + gpr_split_host_port(workers[i].c_str(), &host, &driver_port); + gpr_join_host_port(&cli_target, host, init_status.port()); + client_config.add_server_targets(cli_target); + gpr_free(host); + gpr_free(driver_port); + gpr_free(cli_target); + + servers.push_back(std::move(sd)); } // Start clients - class Client { - public: - Client(ClientContext* ctx, const string& target, const ClientArgs& args) - : thread_([ctx, target, args, this]() { - auto stub = QpsClient::NewStub(CreateChannelDeprecated(target, ChannelArguments())); - status_ = stub->RunTest(ctx, args, &result_); - }) {} - - ~Client() { join(); } - - void join() { if (!joined_) { thread_.join(); joined_ = true; } } - - const Status& status() const { return status_; } - const ClientResult& result() const { return result_; } - - private: - bool joined_ = false; - Status status_; - ClientResult result_; - thread thread_; + struct ClientData { + unique_ptr<Worker::Stub> stub; + unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream; }; - list<Client> running_clients; - size_t svr_idx = 0; - for (const auto& target : clients) { - ClientArgs args; - *args.mutable_config() = client_config; - for (size_t i = 0; i < num_servers; i++) { - args.add_server_targets(server_targets[svr_idx]); - svr_idx = (svr_idx + 1) % num_servers; - } - - running_clients.emplace_back(alloc_context(), target, args); + vector<ClientData> clients; + for (size_t i = 0; i < num_clients; i++) { + ClientData cd; + cd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i + num_servers], ChannelArguments()))); + ClientArgs args; + *args.mutable_setup() = client_config; + cd.stream = std::move(cd.stub->RunTest(alloc_context())); + GPR_ASSERT(cd.stream->Write(args)); + ClientStatus init_status; + GPR_ASSERT(cd.stream->Read(&init_status)); + + clients.push_back(std::move(cd)); } - // Finish clients - for (auto& client : running_clients) { - client.join(); - if (!client.status().IsOk()) { - gpr_log(GPR_ERROR, "Client failed"); - return; - } + // Let everything warmup + gpr_log(GPR_INFO, "Warming up"); + gpr_timespec start = gpr_now(); + gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(5))); + + // Start a run + gpr_log(GPR_INFO, "Starting"); + ServerArgs server_mark; + server_mark.mutable_mark(); + ClientArgs client_mark; + client_mark.mutable_mark(); + for (auto& server : servers) { + GPR_ASSERT(server.stream->Write(server_mark)); + } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Write(client_mark)); + } + ServerStatus server_status; + ClientStatus client_status; + for (auto& server : servers) { + GPR_ASSERT(server.stream->Read(&server_status)); } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Read(&client_status)); + } + + // Wait some time + gpr_log(GPR_INFO, "Running"); + gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(15))); - // Finish servers - for (auto& stream : server_streams) { - ServerStatus final_status; - ServerStatus dummy; - if (!stream->WritesDone() || !stream->Read(&final_status) || stream->Read(&dummy) || !stream->Finish().IsOk()) { - gpr_log(GPR_ERROR, "Server protocol error"); - } + // Finish a run + gpr_log(GPR_INFO, "Finishing"); + for (auto& server : servers) { + GPR_ASSERT(server.stream->Write(server_mark)); + } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Write(client_mark)); + } + for (auto& server : servers) { + GPR_ASSERT(server.stream->Read(&server_status)); } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Read(&client_status)); + } + + for (auto& client : clients) { + GPR_ASSERT(client.stream->WritesDone()); + GPR_ASSERT(client.stream->Finish().IsOk()); + } + for (auto& server : servers) { + GPR_ASSERT(server.stream->WritesDone()); + GPR_ASSERT(server.stream->Finish().IsOk()); + } +} + +} } -#endif
\ No newline at end of file diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 8ac9d2f0a3..5c548bb848 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -36,7 +36,11 @@ #include "test/cpp/qps/qpstest.pb.h" +namespace grpc { +namespace testing { void RunScenario(const grpc::testing::ClientConfig& client_config, size_t num_clients, const grpc::testing::ServerConfig& server_config, size_t num_servers); +} +} #endif diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h new file mode 100644 index 0000000000..b8282a48c2 --- /dev/null +++ b/test/cpp/qps/histogram.h @@ -0,0 +1,63 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_HISTOGRAM_H +#define TEST_QPS_HISTOGRAM_H + +#include <grpc/support/histogram.h> + +namespace grpc { +namespace testing { + +class Histogram { + public: + Histogram() : impl_(gpr_histogram_create(0.01, 60e9)) {} + ~Histogram() { gpr_histogram_destroy(impl_); } + + void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); } + void Add(double value) { gpr_histogram_add(impl_, value); } + double Percentile(double pctile) { return gpr_histogram_percentile(impl_, pctile); } + double Count() { return gpr_histogram_count(impl_); } + void Swap(Histogram* other) { std::swap(impl_, other->impl_); } + + private: + Histogram(const Histogram&); + Histogram& operator=(const Histogram&); + + gpr_histogram* impl_; +}; + +} +} + +#endif /* TEST_QPS_HISTOGRAM_H */ diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 88b9d373a1..cbe0b857b0 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -32,6 +32,7 @@ */ #include <gflags/gflags.h> +#include <grpc/support/log.h> #include "test/cpp/qps/driver.h" @@ -43,15 +44,18 @@ DEFINE_bool(enable_ssl, false, "Use SSL"); // Server config DEFINE_int32(server_threads, 1, "Number of server threads"); +DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type"); // Client config -DEFINE_int32(client_threads, 1, "Number of client threads"); +DEFINE_int32(outstanding_rpcs_per_channel, 1, "Number of outstanding rpcs per channel"); DEFINE_int32(client_channels, 1, "Number of client channels"); -DEFINE_int32(num_rpcs, 10000, "Number of rpcs per client thread"); DEFINE_int32(payload_size, 1, "Payload size"); +DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type"); using grpc::testing::ClientConfig; using grpc::testing::ServerConfig; +using grpc::testing::ClientType; +using grpc::testing::ServerType; // In some distros, gflags is in the namespace google, and in some others, // in gflags. This hack is enabling us to find both. @@ -64,14 +68,20 @@ int main(int argc, char **argv) { grpc_init(); ParseCommandLineFlags(&argc, &argv, true); + ClientType client_type; + ServerType server_type; + GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type)); + GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type)); + ClientConfig client_config; + client_config.set_client_type(client_type); client_config.set_enable_ssl(FLAGS_enable_ssl); - client_config.set_client_threads(FLAGS_client_threads); + client_config.set_outstanding_rpcs_per_channel(FLAGS_outstanding_rpcs_per_channel); client_config.set_client_channels(FLAGS_client_channels); - client_config.set_num_rpcs(FLAGS_num_rpcs); client_config.set_payload_size(FLAGS_payload_size); ServerConfig server_config; + server_config.set_server_type(server_type); server_config.set_threads(FLAGS_server_threads); server_config.set_enable_ssl(FLAGS_enable_ssl); diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index 52d7957944..39d44cd0ed 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -89,30 +89,21 @@ message ClientConfig { repeated string server_targets = 1; required ClientType client_type = 2; required bool enable_ssl = 3; - required int32 client_threads = 4; - // We have a configurable number of channels for sending RPCs. - // RPCs are sent round-robin on the available channels by the - // various threads. Interesting cases are 1 global channel or - // 1 per-thread channel, but we can support any number. - // The channels are assigned round-robin on an RPC by RPC basis - // rather than just at initialization time in order to also measure the - // impact of cache thrashing caused by channel changes. This is an issue - // if you are not in one of the above "interesting cases" + required int32 outstanding_rpcs_per_channel = 4; required int32 client_channels = 5; - required int32 num_rpcs = 6; - required int32 payload_size = 7; + required int32 payload_size = 6; } -message ClientStart {} +message Mark {} message ClientArgs { oneof argtype { ClientConfig setup = 1; - ClientStart start = 2; + Mark mark = 2; } } -message ClientResult { +message ClientStats { required Latencies latencies = 1; required int32 num_rpcs = 2; required double time_elapsed = 3; @@ -121,7 +112,7 @@ message ClientResult { } message ClientStatus { - optional ClientResult result = 1; + optional ClientStats stats = 1; } message ServerConfig { @@ -131,7 +122,10 @@ message ServerConfig { } message ServerArgs { - required ServerConfig config = 1; + oneof argtype { + ServerConfig setup = 1; + Mark mark = 2; + } } message ServerStatus { diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc index 75425c9eb8..ebe06c1e4a 100644 --- a/test/cpp/qps/server.cc +++ b/test/cpp/qps/server.cc @@ -48,42 +48,14 @@ #include "src/cpp/server/thread_pool.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/server.h" #include "test/cpp/qps/timer.h" #include <grpc/grpc.h> #include <grpc/support/log.h> -DEFINE_int32(port, 0, "Server port."); -DEFINE_int32(driver_port, 0, "Server driver port."); - -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::ServerReaderWriter; -using grpc::ThreadPool; -using grpc::testing::Payload; -using grpc::testing::PayloadType; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; -using grpc::testing::QpsServer; -using grpc::testing::ServerArgs; -using grpc::testing::ServerStats; -using grpc::testing::ServerStatus; -using grpc::Status; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google { } -namespace gflags { } -using namespace google; -using namespace gflags; - -static bool got_sigint = false; - -static void sigint_handler(int x) { got_sigint = 1; } +namespace grpc { +namespace testing { static bool SetPayload(PayloadType type, int size, Payload* payload) { PayloadType response_type = type; @@ -97,8 +69,6 @@ static bool SetPayload(PayloadType type, int size, Payload* payload) { return true; } -namespace { - class TestServiceImpl GRPC_FINAL : public TestService::Service { public: Status UnaryCall(ServerContext* context, const SimpleRequest* request, @@ -113,88 +83,46 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service { } }; -} // namespace - -class ServerImpl : public QpsServer::Service { +class SynchronousServer GRPC_FINAL : public grpc::testing::Server { public: - Status RunServer(ServerContext* ctx, ServerReaderWriter<ServerStatus, ServerArgs>* stream) { - ServerArgs args; - if (!stream->Read(&args)) return Status::OK; + SynchronousServer(const ServerConfig& config, int port) : thread_pool_(config.threads()), impl_(MakeImpl(port)), timer_(new Timer) {} - std::lock_guard<std::mutex> lock(server_mu_); + ServerStats Mark() GRPC_OVERRIDE { + std::unique_ptr<Timer> timer(new Timer); + timer.swap(timer_); - char* server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_port); + auto timer_result = timer->Mark(); - TestServiceImpl service; + ServerStats stats; + stats.set_time_elapsed(timer_result.wall); + stats.set_time_system(timer_result.system); + stats.set_time_user(timer_result.user); + return stats; + } + private: + std::unique_ptr<grpc::Server> MakeImpl(int port) { ServerBuilder builder; - builder.AddPort(server_address); - builder.RegisterService(&service); - - std::unique_ptr<ThreadPool> pool(new ThreadPool(args.config().threads())); - builder.SetThreadPool(pool.get()); - - auto server = builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server listening on %s\n", server_address); + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", port); + builder.AddPort(server_address); gpr_free(server_address); - ServerStatus status; - status.set_port(FLAGS_port); - if (!stream->Write(status)) return Status(grpc::UNKNOWN); - - grpc_profiler_start("qps_server.prof"); - Timer timer; - - if (stream->Read(&args)) { - gpr_log(GPR_ERROR, "Got a server request, but not expecting one"); - return Status(grpc::UNKNOWN); - } - - auto timer_result = timer.Mark(); - grpc_profiler_stop(); + builder.RegisterService(&service_); - auto* stats = status.mutable_stats(); - stats->set_time_elapsed(timer_result.wall); - stats->set_time_system(timer_result.system); - stats->set_time_user(timer_result.user); - stream->Write(status); - return Status::OK; + return builder.BuildAndStart(); } - private: - std::mutex server_mu_; + TestServiceImpl service_; + ThreadPool thread_pool_; + std::unique_ptr<grpc::Server> impl_; + std::unique_ptr<Timer> timer_; }; -static void RunServer() { - char* server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_driver_port); - - ServerImpl service; - - ServerBuilder builder; - builder.AddPort(server_address); - builder.RegisterService(&service); - - gpr_free(server_address); - - auto server = builder.BuildAndStart(); - - while (!got_sigint) { - sleep(5); - } +std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(const ServerConfig& config, int port) { + return std::unique_ptr<Server>(new SynchronousServer(config, port)); } -int main(int argc, char** argv) { - signal(SIGINT, sigint_handler); - - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - - GPR_ASSERT(FLAGS_port != 0); - RunServer(); - - grpc_shutdown(); - return 0; -} +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 25c15e4d0c..35d1aed19f 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -42,6 +42,8 @@ namespace testing { class Server { public: virtual ~Server() {} + + virtual ServerStats Mark() = 0; }; std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config, int port); diff --git a/test/cpp/qps/single_run_localhost.sh b/test/cpp/qps/single_run_localhost.sh index 310a0824bb..2f60b4e49d 100755 --- a/test/cpp/qps/single_run_localhost.sh +++ b/test/cpp/qps/single_run_localhost.sh @@ -6,25 +6,23 @@ set -ex cd $(dirname $0)/../../.. -killall qps_server qps_client || true +killall qps_worker || true config=opt NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()'` -make CONFIG=$config qps_client qps_server qps_driver -j$NUMCPUS +make CONFIG=$config qps_worker qps_driver -j$NUMCPUS -bins/$config/qps_server -driver_port 10000 -port 10002 & -SERVER_PID=$! -bins/$config/qps_client -driver_port 10001 & -CLIENT_PID=$! +bins/$config/qps_worker -driver_port 10000 -server_port 10001 & +PID1=$! +bins/$config/qps_worker -driver_port 10010 -server_port 10011 & +PID2=$! -export QPS_SERVERS=localhost:10000 -export QPS_CLIENTS=localhost:10001 +export QPS_WORKERS="localhost:10000,localhost:10010" bins/$config/qps_driver $* -kill -2 $CLIENT_PID -kill -2 $SERVER_PID +kill -2 $PID1 $PID2 wait diff --git a/test/cpp/qps/timer.cc b/test/cpp/qps/timer.cc index 5a5be97071..3c1342041c 100644 --- a/test/cpp/qps/timer.cc +++ b/test/cpp/qps/timer.cc @@ -35,9 +35,15 @@ #include <sys/time.h> #include <sys/resource.h> +#include <grpc/support/time.h> Timer::Timer() : start_(Sample()) {} +double Timer::Now() { + auto ts = gpr_now(); + return ts.tv_sec + 1e-9 * ts.tv_nsec; +} + static double time_double(struct timeval* tv) { return tv->tv_sec + 1e-6 * tv->tv_usec; } diff --git a/test/cpp/qps/timer.h b/test/cpp/qps/timer.h index 8a229cbd30..1c9a006b93 100644 --- a/test/cpp/qps/timer.h +++ b/test/cpp/qps/timer.h @@ -46,6 +46,8 @@ class Timer { Result Mark(); + static double Now(); + private: static Result Sample(); diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index 749420bd69..741dfc000d 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -78,7 +78,7 @@ namespace testing { std::unique_ptr<Client> CreateClient(const ClientConfig& config) { switch (config.client_type()) { case ClientType::SYNCHRONOUS_CLIENT: return CreateSynchronousClient(config); - case ClientType::ASYNC_CLIENT: return CreateAsyncClient(config); + case ClientType::ASYNC_CLIENT: abort(); //return CreateAsyncClient(config); } abort(); } @@ -86,7 +86,7 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) { std::unique_ptr<Server> CreateServer(const ServerConfig& config) { switch (config.server_type()) { case ServerType::SYNCHRONOUS_SERVER: return CreateSynchronousServer(config, FLAGS_server_port); - case ServerType::ASYNC_SERVER: return CreateAsyncServer(config, FLAGS_server_port); + case ServerType::ASYNC_SERVER: abort(); //return CreateAsyncServer(config, FLAGS_server_port); } abort(); } @@ -112,6 +112,17 @@ class WorkerImpl final : public Worker::Service { if (!client) { return Status(INVALID_ARGUMENT); } + ClientStatus status; + if (!stream->Write(status)) { + return Status(UNKNOWN); + } + while (stream->Read(&args)) { + if (!args.has_mark()) { + return Status(INVALID_ARGUMENT); + } + *status.mutable_stats() = client->Mark(); + stream->Write(status); + } return Status::OK; } @@ -126,13 +137,25 @@ class WorkerImpl final : public Worker::Service { if (!stream->Read(&args)) { return Status(INVALID_ARGUMENT); } - if (!args.has_config()) { + if (!args.has_setup()) { return Status(INVALID_ARGUMENT); } - auto server = CreateServer(args.config()); + auto server = CreateServer(args.setup()); if (!server) { return Status(INVALID_ARGUMENT); } + ServerStatus status; + status.set_port(FLAGS_server_port); + if (!stream->Write(status)) { + return Status(UNKNOWN); + } + while (stream->Read(&args)) { + if (!args.has_mark()) { + return Status(INVALID_ARGUMENT); + } + *status.mutable_stats() = server->Mark(); + stream->Write(status); + } return Status::OK; } |