diff options
author | Vijay Pai <vpai@google.com> | 2015-03-04 22:54:07 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2015-03-04 22:54:07 -0800 |
commit | 0bc6805c6de9c2786da0f379d00f8b787a88e92d (patch) | |
tree | 798bc5d81463076f2c0e216160edecec529d9b18 /test | |
parent | e7a523e910d9fa346887c7620c7ad3e57021748c (diff) | |
parent | 43ef582e42b2e5d3b5052523d9dee1fb2ee0ae33 (diff) |
Merge pull request #837 from ctiller/qps_driver
QPS driver
Diffstat (limited to 'test')
-rw-r--r-- | test/core/util/grpc_profiler.c | 2 | ||||
-rw-r--r-- | test/cpp/qps/client.cc | 252 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 173 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 306 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 93 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 210 | ||||
-rw-r--r-- | test/cpp/qps/driver.h | 61 | ||||
-rw-r--r-- | test/cpp/qps/histogram.h | 85 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 132 | ||||
-rw-r--r-- | test/cpp/qps/qpstest.proto | 96 | ||||
-rw-r--r-- | test/cpp/qps/server.cc | 171 | ||||
-rw-r--r-- | test/cpp/qps/server.h | 84 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 146 | ||||
-rw-r--r-- | test/cpp/qps/server_sync.cc | 107 | ||||
-rwxr-xr-x | test/cpp/qps/single_run_localhost.sh | 28 | ||||
-rw-r--r-- | test/cpp/qps/stats.h | 60 | ||||
-rw-r--r-- | test/cpp/qps/timer.cc | 71 | ||||
-rw-r--r-- | test/cpp/qps/timer.h | 57 | ||||
-rw-r--r-- | test/cpp/qps/worker.cc | 235 |
19 files changed, 1576 insertions, 793 deletions
diff --git a/test/core/util/grpc_profiler.c b/test/core/util/grpc_profiler.c index 35b9361c70..d5b6cfeef1 100644 --- a/test/core/util/grpc_profiler.c +++ b/test/core/util/grpc_profiler.c @@ -44,7 +44,7 @@ void grpc_profiler_stop() { ProfilerStop(); } void grpc_profiler_start(const char *filename) { gpr_log(GPR_DEBUG, - "You do not have google-perftools installed, profiling is disabled"); + "You do not have google-perftools installed, profiling is disabled [for %s]", filename); gpr_log(GPR_DEBUG, "To install on ubuntu: sudo apt-get install google-perftools " "libgoogle-perftools-dev"); diff --git a/test/cpp/qps/client.cc b/test/cpp/qps/client.cc deleted file mode 100644 index 11c39eb4f5..0000000000 --- a/test/cpp/qps/client.cc +++ /dev/null @@ -1,252 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <cassert> -#include <memory> -#include <string> -#include <thread> -#include <vector> -#include <sstream> - -#include <grpc/grpc.h> -#include <grpc/support/histogram.h> -#include <grpc/support/log.h> -#include <gflags/gflags.h> -#include <grpc++/client_context.h> -#include <grpc++/status.h> -#include "test/core/util/grpc_profiler.h" -#include "test/cpp/util/create_test_channel.h" -#include "test/cpp/qps/qpstest.pb.h" - -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(server_port, 0, "Server port."); -DEFINE_string(server_host, "127.0.0.1", "Server host."); -DEFINE_int32(client_threads, 4, "Number of client threads."); - -// We have a configurable number of channels for sending RPCs. -// RPCs are sent round-robin on the available channels by the -// various threads. Interesting cases are 1 global channel or -// 1 per-thread channel, but we can support any number. -// The channels are assigned round-robin on an RPC by RPC basis -// rather than just at initialization time in order to also measure the -// impact of cache thrashing caused by channel changes. This is an issue -// if you are not in one of the above "interesting cases" -DEFINE_int32(client_channels, 4, "Number of client channels."); - -DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread."); -DEFINE_int32(payload_size, 1, "Payload size in bytes"); - -// Alternatively, specify parameters for test as a workload so that multiple -// tests are initiated back-to-back. This is convenient for keeping a borg -// allocation consistent. This is a space-separated list of -// [threads channels num_rpcs payload_size ]* -DEFINE_string(workload, "", "Workload parameters"); - -using grpc::ChannelInterface; -using grpc::CreateTestChannel; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google { } -namespace gflags { } -using namespace google; -using namespace gflags; - -static double now() { - gpr_timespec tv = gpr_now(); - return 1e9 * tv.tv_sec + tv.tv_nsec; -} - -void RunTest(const int client_threads, const int client_channels, - const int num_rpcs, const int payload_size) { - gpr_log(GPR_INFO, - "QPS test with parameters\n" - "enable_ssl = %d\n" - "client_channels = %d\n" - "client_threads = %d\n" - "num_rpcs = %d\n" - "payload_size = %d\n" - "server_host:server_port = %s:%d\n\n", - FLAGS_enable_ssl, client_channels, client_threads, num_rpcs, - payload_size, FLAGS_server_host.c_str(), FLAGS_server_port); - - std::ostringstream oss; - oss << FLAGS_server_host << ":" << FLAGS_server_port; - - class ClientChannelInfo { - public: - explicit ClientChannelInfo(const grpc::string &server) - : channel_(CreateTestChannel(server, FLAGS_enable_ssl)), - stub_(TestService::NewStub(channel_)) {} - ChannelInterface *get_channel() { return channel_.get(); } - TestService::Stub *get_stub() { return stub_.get(); } - - private: - std::shared_ptr<ChannelInterface> channel_; - std::unique_ptr<TestService::Stub> stub_; - }; - - std::vector<ClientChannelInfo> channels; - for (int i = 0; i < client_channels; i++) { - channels.push_back(ClientChannelInfo(oss.str())); - } - - std::vector<std::thread> threads; // Will add threads when ready to execute - std::vector< ::gpr_histogram *> thread_stats(client_threads); - - TestService::Stub *stub_stats = channels[0].get_stub(); - grpc::ClientContext context_stats_begin; - StatsRequest stats_request; - ServerStats server_stats_begin; - stats_request.set_test_num(0); - grpc::Status status_beg = stub_stats->CollectServerStats( - &context_stats_begin, stats_request, &server_stats_begin); - - grpc_profiler_start("qps_client.prof"); - - for (int i = 0; i < client_threads; i++) { - gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); - GPR_ASSERT(hist != NULL); - thread_stats[i] = hist; - - threads.push_back( - std::thread([hist, client_threads, client_channels, num_rpcs, - payload_size, &channels](int channel_num) { - SimpleRequest request; - SimpleResponse response; - request.set_response_type( - grpc::testing::PayloadType::COMPRESSABLE); - request.set_response_size(payload_size); - - for (int j = 0; j < num_rpcs; j++) { - TestService::Stub *stub = - channels[channel_num].get_stub(); - double start = now(); - grpc::ClientContext context; - grpc::Status s = - stub->UnaryCall(&context, request, &response); - gpr_histogram_add(hist, now() - start); - - GPR_ASSERT((s.code() == grpc::StatusCode::OK) && - (response.payload().type() == - grpc::testing::PayloadType::COMPRESSABLE) && - (response.payload().body().length() == - static_cast<size_t>(payload_size))); - - // Now do runtime round-robin assignment of the next - // channel number - channel_num += client_threads; - channel_num %= client_channels; - } - }, - i % client_channels)); - } - - gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); - GPR_ASSERT(hist != NULL); - for (auto &t : threads) { - t.join(); - } - - grpc_profiler_stop(); - - for (int i = 0; i < client_threads; i++) { - gpr_histogram *h = thread_stats[i]; - gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f", - i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90), - gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99), - gpr_histogram_percentile(h, 99.9)); - gpr_histogram_merge(hist, h); - gpr_histogram_destroy(h); - } - - gpr_log( - GPR_INFO, - "latency across %d threads with %d channels and %d payload " - "(50/90/95/99/99.9): %f / %f / %f / %f / %f", - client_threads, client_channels, payload_size, - gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90), - gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99), - gpr_histogram_percentile(hist, 99.9)); - gpr_histogram_destroy(hist); - - grpc::ClientContext context_stats_end; - ServerStats server_stats_end; - grpc::Status status_end = stub_stats->CollectServerStats( - &context_stats_end, stats_request, &server_stats_end); - - double elapsed = server_stats_end.time_now() - server_stats_begin.time_now(); - int total_rpcs = client_threads * num_rpcs; - double utime = server_stats_end.time_user() - server_stats_begin.time_user(); - double stime = - server_stats_end.time_system() - server_stats_begin.time_system(); - gpr_log(GPR_INFO, - "Elapsed time: %.3f\n" - "RPC Count: %d\n" - "QPS: %.3f\n" - "System time: %.3f\n" - "User time: %.3f\n" - "Resource usage: %.1f%%\n", - elapsed, total_rpcs, total_rpcs / elapsed, stime, utime, - (stime + utime) / elapsed * 100.0); -} - -int main(int argc, char **argv) { - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - - GPR_ASSERT(FLAGS_server_port); - - if (FLAGS_workload.length() == 0) { - RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs, - FLAGS_payload_size); - } else { - std::istringstream workload(FLAGS_workload); - int client_threads, client_channels, num_rpcs, payload_size; - workload >> client_threads; - while (!workload.eof()) { - workload >> client_channels >> num_rpcs >> payload_size; - RunTest(client_threads, client_channels, num_rpcs, payload_size); - workload >> client_threads; - } - gpr_log(GPR_INFO, "Done with specified workload."); - } - - grpc_shutdown(); - return 0; -} diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h new file mode 100644 index 0000000000..221fb30fc5 --- /dev/null +++ b/test/cpp/qps/client.h @@ -0,0 +1,173 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_CLIENT_H +#define TEST_QPS_CLIENT_H + +#include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/qpstest.pb.h" + +#include <condition_variable> +#include <mutex> + +namespace grpc { +namespace testing { + +class Client { + public: + explicit Client(const ClientConfig& config) : timer_(new Timer) { + for (int i = 0; i < config.client_channels(); i++) { + channels_.push_back(ClientChannelInfo( + config.server_targets(i % config.server_targets_size()), config)); + } + request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + request_.set_response_size(config.payload_size()); + } + virtual ~Client() {} + + ClientStats Mark() { + Histogram latencies; + std::vector<Histogram> to_merge(threads_.size()); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->BeginSwap(&to_merge[i]); + } + std::unique_ptr<Timer> timer(new Timer); + timer_.swap(timer); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->EndSwap(); + latencies.Merge(&to_merge[i]); + } + + auto timer_result = timer->Mark(); + + ClientStats stats; + latencies.FillProto(stats.mutable_latencies()); + stats.set_time_elapsed(timer_result.wall); + stats.set_time_system(timer_result.system); + stats.set_time_user(timer_result.user); + return stats; + } + + protected: + SimpleRequest request_; + + class ClientChannelInfo { + public: + ClientChannelInfo(const grpc::string& target, const ClientConfig& config) + : channel_(CreateTestChannel(target, config.enable_ssl())), + stub_(TestService::NewStub(channel_)) {} + ChannelInterface* get_channel() { return channel_.get(); } + TestService::Stub* get_stub() { return stub_.get(); } + + private: + std::shared_ptr<ChannelInterface> channel_; + std::unique_ptr<TestService::Stub> stub_; + }; + std::vector<ClientChannelInfo> channels_; + + void StartThreads(size_t num_threads) { + for (size_t i = 0; i < num_threads; i++) { + threads_.emplace_back(new Thread(this, i)); + } + } + + void EndThreads() { threads_.clear(); } + + virtual void ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + + private: + class Thread { + public: + Thread(Client* client, size_t idx) + : done_(false), + new_(nullptr), + impl_([this, idx, client]() { + for (;;) { + // run the loop body + client->ThreadFunc(&histogram_, idx); + // lock, see if we're done + std::lock_guard<std::mutex> g(mu_); + if (done_) return; + // also check if we're marking, and swap out the histogram if so + if (new_) { + new_->Swap(&histogram_); + new_ = nullptr; + cv_.notify_one(); + } + } + }) {} + + ~Thread() { + { + std::lock_guard<std::mutex> g(mu_); + done_ = true; + } + impl_.join(); + } + + void BeginSwap(Histogram* n) { + std::lock_guard<std::mutex> g(mu_); + new_ = n; + } + + void EndSwap() { + std::unique_lock<std::mutex> g(mu_); + cv_.wait(g, [this]() { return new_ == nullptr; }); + } + + private: + Thread(const Thread&); + Thread& operator=(const Thread&); + + TestService::Stub* stub_; + ClientConfig config_; + std::mutex mu_; + std::condition_variable cv_; + bool done_; + Histogram* new_; + Histogram histogram_; + std::thread impl_; + }; + + std::vector<std::unique_ptr<Thread>> threads_; + std::unique_ptr<Timer> timer_; +}; + +std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args); +std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args); + +} // namespace testing +} // namespace grpc + +#endif diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9ea9cfe8b9..5eb9ff6521 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -49,86 +49,53 @@ #include "test/core/util/grpc_profiler.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/client.h" -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(server_port, 0, "Server port."); -DEFINE_string(server_host, "127.0.0.1", "Server host."); -DEFINE_int32(client_threads, 4, "Number of client threads."); - -// We have a configurable number of channels for sending RPCs. -// RPCs are sent round-robin on the available channels by the -// various threads. Interesting cases are 1 global channel or -// 1 per-thread channel, but we can support any number. -// The channels are assigned round-robin on an RPC by RPC basis -// rather than just at initialization time in order to also measure the -// impact of cache thrashing caused by channel changes. This is an issue -// if you are not in one of the above "interesting cases" -DEFINE_int32(client_channels, 4, "Number of client channels."); - -DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread."); -DEFINE_int32(payload_size, 1, "Payload size in bytes"); - -// Alternatively, specify parameters for test as a workload so that multiple -// tests are initiated back-to-back. This is convenient for keeping a borg -// allocation consistent. This is a space-separated list of -// [threads channels num_rpcs payload_size ]* -DEFINE_string(workload, "", "Workload parameters"); - -using grpc::ChannelInterface; -using grpc::CreateTestChannel; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google {} -namespace gflags {} -using namespace google; -using namespace gflags; - -static double now() { - gpr_timespec tv = gpr_now(); - return 1e9 * tv.tv_sec + tv.tv_nsec; -} +namespace grpc { +namespace testing { class ClientRpcContext { public: ClientRpcContext() {} virtual ~ClientRpcContext() {} virtual bool RunNextState() = 0; // do next state, return false if steps done + virtual void StartNewClone() = 0; static void *tag(ClientRpcContext *c) { return reinterpret_cast<void *>(c); } static ClientRpcContext *detag(void *t) { return reinterpret_cast<ClientRpcContext *>(t); } - virtual void report_stats(gpr_histogram *hist) = 0; + virtual void report_stats(Histogram *hist) = 0; }; + template <class RequestType, class ResponseType> class ClientRpcContextUnaryImpl : public ClientRpcContext { public: ClientRpcContextUnaryImpl( - TestService::Stub *stub, - const RequestType &req, + TestService::Stub *stub, const RequestType &req, std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub *, grpc::ClientContext *, const RequestType &, - void *)> start_req, + TestService::Stub *, grpc::ClientContext *, const RequestType &, + void *)> start_req, std::function<void(grpc::Status, ResponseType *)> on_done) : context_(), - stub_(stub), + stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::ReqSent), callback_(on_done), - start_(now()), + start_req_(start_req), + start_(Timer::Now()), response_reader_( - start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} + start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} bool RunNextState() GRPC_OVERRIDE { return (this->*next_state_)(); } - void report_stats(gpr_histogram *hist) GRPC_OVERRIDE { - gpr_histogram_add(hist, now() - start_); + void report_stats(Histogram *hist) GRPC_OVERRIDE { + hist->Add((Timer::Now() - start_) * 1e9); + } + + void StartNewClone() { + new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); } private: @@ -151,191 +118,84 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { ResponseType response_; bool (ClientRpcContextUnaryImpl::*next_state_)(); std::function<void(grpc::Status, ResponseType *)> callback_; + std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( + TestService::Stub *, grpc::ClientContext *, const RequestType &, void *)> + start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> response_reader_; }; -static void RunTest(const int client_threads, const int client_channels, - const int num_rpcs, const int payload_size) { - gpr_log(GPR_INFO, - "QPS test with parameters\n" - "enable_ssl = %d\n" - "client_channels = %d\n" - "client_threads = %d\n" - "num_rpcs = %d\n" - "payload_size = %d\n" - "server_host:server_port = %s:%d\n\n", - FLAGS_enable_ssl, client_channels, client_threads, num_rpcs, - payload_size, FLAGS_server_host.c_str(), FLAGS_server_port); - - std::ostringstream oss; - oss << FLAGS_server_host << ":" << FLAGS_server_port; - - class ClientChannelInfo { - public: - explicit ClientChannelInfo(const grpc::string &server) - : channel_(CreateTestChannel(server, FLAGS_enable_ssl)), - stub_(TestService::NewStub(channel_)) {} - ChannelInterface *get_channel() { return channel_.get(); } - TestService::Stub *get_stub() { return stub_.get(); } +class AsyncClient GRPC_FINAL : public Client { + public: + explicit AsyncClient(const ClientConfig &config) : Client(config) { + for (int i = 0; i < config.async_client_threads(); i++) { + cli_cqs_.emplace_back(new CompletionQueue); + } - private: - std::shared_ptr<ChannelInterface> channel_; - std::unique_ptr<TestService::Stub> stub_; - }; + auto payload_size = config.payload_size(); + auto check_done = [payload_size](grpc::Status s, SimpleResponse *response) { + GPR_ASSERT(s.IsOk() && (response->payload().type() == + grpc::testing::PayloadType::COMPRESSABLE) && + (response->payload().body().length() == + static_cast<size_t>(payload_size))); + }; + + int t = 0; + for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { + for (auto &channel : channels_) { + auto *cq = cli_cqs_[t].get(); + t = (t + 1) % cli_cqs_.size(); + auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, + const SimpleRequest &request, void *tag) { + return stub->AsyncUnaryCall(ctx, request, cq, tag); + }; + + TestService::Stub *stub = channel.get_stub(); + const SimpleRequest &request = request_; + new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + stub, request, start_req, check_done); + } + } - std::vector<ClientChannelInfo> channels; - for (int i = 0; i < client_channels; i++) { - channels.push_back(ClientChannelInfo(oss.str())); + StartThreads(config.async_client_threads()); } - std::vector<std::thread> threads; // Will add threads when ready to execute - std::vector< ::gpr_histogram *> thread_stats(client_threads); - - TestService::Stub *stub_stats = channels[0].get_stub(); - grpc::ClientContext context_stats_begin; - StatsRequest stats_request; - ServerStats server_stats_begin; - stats_request.set_test_num(0); - grpc::Status status_beg = stub_stats->CollectServerStats( - &context_stats_begin, stats_request, &server_stats_begin); - - grpc_profiler_start("qps_client_async.prof"); - - auto CheckDone = [=](grpc::Status s, SimpleResponse *response) { - GPR_ASSERT(s.IsOk() && (response->payload().type() == - grpc::testing::PayloadType::COMPRESSABLE) && - (response->payload().body().length() == - static_cast<size_t>(payload_size))); - }; + ~AsyncClient() GRPC_OVERRIDE { + EndThreads(); - for (int i = 0; i < client_threads; i++) { - gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); - GPR_ASSERT(hist != NULL); - thread_stats[i] = hist; - - threads.push_back(std::thread( - [hist, client_threads, client_channels, num_rpcs, payload_size, - &channels, &CheckDone](int channel_num) { - using namespace std::placeholders; - SimpleRequest request; - request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request.set_response_size(payload_size); - - grpc::CompletionQueue cli_cq; - auto start_req = std::bind(&TestService::Stub::AsyncUnaryCall, _1, - _2, _3, &cli_cq, _4); - - int rpcs_sent = 0; - while (rpcs_sent < num_rpcs) { - rpcs_sent++; - TestService::Stub *stub = channels[channel_num].get_stub(); - new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(stub, - request, start_req, CheckDone); - void *got_tag; - bool ok; - - // Need to call 2 next for every 1 RPC (1 for req done, 1 for resp - // done) - cli_cq.Next(&got_tag, &ok); - if (!ok) break; - ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState() == false) { - // call the callback and then delete it - ctx->report_stats(hist); - ctx->RunNextState(); - delete ctx; - } - cli_cq.Next(&got_tag, &ok); - if (!ok) break; - ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState() == false) { - // call the callback and then delete it - ctx->report_stats(hist); - ctx->RunNextState(); - delete ctx; - } - // Now do runtime round-robin assignment of the next - // channel number - channel_num += client_threads; - channel_num %= client_channels; - } - }, - i % client_channels)); - } - - gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); - GPR_ASSERT(hist != NULL); - for (auto &t : threads) { - t.join(); + for (auto &cq : cli_cqs_) { + cq->Shutdown(); + void *got_tag; + bool ok; + while (cq->Next(&got_tag, &ok)) { + delete ClientRpcContext::detag(got_tag); + } + } } - grpc_profiler_stop(); - - for (int i = 0; i < client_threads; i++) { - gpr_histogram *h = thread_stats[i]; - gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f", - i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90), - gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99), - gpr_histogram_percentile(h, 99.9)); - gpr_histogram_merge(hist, h); - gpr_histogram_destroy(h); + void ThreadFunc(Histogram *histogram, size_t thread_idx) { + void *got_tag; + bool ok; + cli_cqs_[thread_idx]->Next(&got_tag, &ok); + + ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState() == false) { + // call the callback and then delete it + ctx->report_stats(histogram); + ctx->RunNextState(); + ctx->StartNewClone(); + delete ctx; + } } - gpr_log( - GPR_INFO, - "latency across %d threads with %d channels and %d payload " - "(50/90/95/99/99.9): %f / %f / %f / %f / %f", - client_threads, client_channels, payload_size, - gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90), - gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99), - gpr_histogram_percentile(hist, 99.9)); - gpr_histogram_destroy(hist); - - grpc::ClientContext context_stats_end; - ServerStats server_stats_end; - grpc::Status status_end = stub_stats->CollectServerStats( - &context_stats_end, stats_request, &server_stats_end); + std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; +}; - double elapsed = server_stats_end.time_now() - server_stats_begin.time_now(); - int total_rpcs = client_threads * num_rpcs; - double utime = server_stats_end.time_user() - server_stats_begin.time_user(); - double stime = - server_stats_end.time_system() - server_stats_begin.time_system(); - gpr_log(GPR_INFO, - "Elapsed time: %.3f\n" - "RPC Count: %d\n" - "QPS: %.3f\n" - "System time: %.3f\n" - "User time: %.3f\n" - "Resource usage: %.1f%%\n", - elapsed, total_rpcs, total_rpcs / elapsed, stime, utime, - (stime + utime) / elapsed * 100.0); +std::unique_ptr<Client> CreateAsyncClient(const ClientConfig &args) { + return std::unique_ptr<Client>(new AsyncClient(args)); } -int main(int argc, char **argv) { - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - - GPR_ASSERT(FLAGS_server_port); - - if (FLAGS_workload.length() == 0) { - RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs, - FLAGS_payload_size); - } else { - std::istringstream workload(FLAGS_workload); - int client_threads, client_channels, num_rpcs, payload_size; - workload >> client_threads; - while (!workload.eof()) { - workload >> client_channels >> num_rpcs >> payload_size; - RunTest(client_threads, client_channels, num_rpcs, payload_size); - workload >> client_threads; - } - gpr_log(GPR_INFO, "Done with specified workload."); - } - - grpc_shutdown(); - return 0; -} +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc new file mode 100644 index 0000000000..7bb7231c6f --- /dev/null +++ b/test/cpp/qps/client_sync.cc @@ -0,0 +1,93 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <cassert> +#include <memory> +#include <mutex> +#include <string> +#include <thread> +#include <vector> +#include <sstream> + +#include <sys/signal.h> + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/histogram.h> +#include <grpc/support/log.h> +#include <grpc/support/host_port.h> +#include <gflags/gflags.h> +#include <grpc++/client_context.h> +#include <grpc++/status.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include "test/core/util/grpc_profiler.h" +#include "test/cpp/util/create_test_channel.h" +#include "test/cpp/qps/client.h" +#include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/timer.h" + +namespace grpc { +namespace testing { + +class SynchronousClient GRPC_FINAL : public Client { + public: + SynchronousClient(const ClientConfig& config) : Client(config) { + size_t num_threads = + config.outstanding_rpcs_per_channel() * config.client_channels(); + responses_.resize(num_threads); + StartThreads(num_threads); + } + + ~SynchronousClient() { EndThreads(); } + + void ThreadFunc(Histogram* histogram, size_t thread_idx) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + double start = Timer::Now(); + grpc::ClientContext context; + grpc::Status s = + stub->UnaryCall(&context, request_, &responses_[thread_idx]); + histogram->Add((Timer::Now() - start) * 1e9); + } + + private: + std::vector<SimpleResponse> responses_; +}; + +std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) { + return std::unique_ptr<Client>(new SynchronousClient(config)); +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc new file mode 100644 index 0000000000..6d5df799a2 --- /dev/null +++ b/test/cpp/qps/driver.cc @@ -0,0 +1,210 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/qps/driver.h" +#include "src/core/support/env.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/host_port.h> +#include <grpc++/channel_arguments.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/stream.h> +#include <list> +#include <thread> +#include <vector> +#include "test/cpp/qps/histogram.h" + +using std::list; +using std::thread; +using std::unique_ptr; +using std::vector; + +namespace grpc { +namespace testing { +static vector<string> get_hosts(const string& name) { + char* env = gpr_getenv(name.c_str()); + if (!env) return vector<string>(); + + vector<string> out; + char* p = env; + for (;;) { + char* comma = strchr(p, ','); + if (comma) { + out.emplace_back(p, comma); + p = comma + 1; + } else { + out.emplace_back(p); + gpr_free(env); + return out; + } + } +} + +ScenarioResult RunScenario(const ClientConfig& initial_client_config, + size_t num_clients, + const ServerConfig& server_config, + size_t num_servers) { + // ClientContext allocator (all are destroyed at scope exit) + list<ClientContext> contexts; + auto alloc_context = [&contexts]() { + contexts.emplace_back(); + return &contexts.back(); + }; + + // Get client, server lists + auto workers = get_hosts("QPS_WORKERS"); + ClientConfig client_config = initial_client_config; + + // TODO(ctiller): support running multiple configurations, and binpack + // client/server pairs + // to available workers + GPR_ASSERT(workers.size() >= num_clients + num_servers); + + // Trim to just what we need + workers.resize(num_clients + num_servers); + + // Start servers + struct ServerData { + unique_ptr<Worker::Stub> stub; + unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; + }; + vector<ServerData> servers; + for (size_t i = 0; i < num_servers; i++) { + ServerData sd; + sd.stub = std::move(Worker::NewStub( + CreateChannelDeprecated(workers[i], ChannelArguments()))); + ServerArgs args; + *args.mutable_setup() = server_config; + sd.stream = std::move(sd.stub->RunServer(alloc_context())); + GPR_ASSERT(sd.stream->Write(args)); + ServerStatus init_status; + GPR_ASSERT(sd.stream->Read(&init_status)); + char* host; + char* driver_port; + char* cli_target; + gpr_split_host_port(workers[i].c_str(), &host, &driver_port); + gpr_join_host_port(&cli_target, host, init_status.port()); + client_config.add_server_targets(cli_target); + gpr_free(host); + gpr_free(driver_port); + gpr_free(cli_target); + + servers.push_back(std::move(sd)); + } + + // Start clients + struct ClientData { + unique_ptr<Worker::Stub> stub; + unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream; + }; + vector<ClientData> clients; + for (size_t i = 0; i < num_clients; i++) { + ClientData cd; + cd.stub = std::move(Worker::NewStub( + CreateChannelDeprecated(workers[i + num_servers], ChannelArguments()))); + ClientArgs args; + *args.mutable_setup() = client_config; + cd.stream = std::move(cd.stub->RunTest(alloc_context())); + GPR_ASSERT(cd.stream->Write(args)); + ClientStatus init_status; + GPR_ASSERT(cd.stream->Read(&init_status)); + + clients.push_back(std::move(cd)); + } + + // Let everything warmup + gpr_log(GPR_INFO, "Warming up"); + gpr_timespec start = gpr_now(); + gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(5))); + + // Start a run + gpr_log(GPR_INFO, "Starting"); + ServerArgs server_mark; + server_mark.mutable_mark(); + ClientArgs client_mark; + client_mark.mutable_mark(); + for (auto& server : servers) { + GPR_ASSERT(server.stream->Write(server_mark)); + } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Write(client_mark)); + } + ServerStatus server_status; + ClientStatus client_status; + for (auto& server : servers) { + GPR_ASSERT(server.stream->Read(&server_status)); + } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Read(&client_status)); + } + + // Wait some time + gpr_log(GPR_INFO, "Running"); + gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(15))); + + // Finish a run + ScenarioResult result; + gpr_log(GPR_INFO, "Finishing"); + for (auto& server : servers) { + GPR_ASSERT(server.stream->Write(server_mark)); + } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Write(client_mark)); + } + for (auto& server : servers) { + GPR_ASSERT(server.stream->Read(&server_status)); + const auto& stats = server_status.stats(); + result.server_resources.push_back(ResourceUsage{ + stats.time_elapsed(), stats.time_user(), stats.time_system()}); + } + for (auto& client : clients) { + GPR_ASSERT(client.stream->Read(&client_status)); + const auto& stats = client_status.stats(); + result.latencies.MergeProto(stats.latencies()); + result.client_resources.push_back(ResourceUsage{ + stats.time_elapsed(), stats.time_user(), stats.time_system()}); + } + + for (auto& client : clients) { + GPR_ASSERT(client.stream->WritesDone()); + GPR_ASSERT(client.stream->Finish().IsOk()); + } + for (auto& server : servers) { + GPR_ASSERT(server.stream->WritesDone()); + GPR_ASSERT(server.stream->Finish().IsOk()); + } + return result; +} +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h new file mode 100644 index 0000000000..d87e80dc55 --- /dev/null +++ b/test/cpp/qps/driver.h @@ -0,0 +1,61 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_DRIVER_H +#define TEST_QPS_DRIVER_H + +#include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/qpstest.pb.h" + +namespace grpc { +namespace testing { +struct ResourceUsage { + double wall_time; + double user_time; + double system_time; +}; + +struct ScenarioResult { + Histogram latencies; + std::vector<ResourceUsage> client_resources; + std::vector<ResourceUsage> server_resources; +}; + +ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config, + size_t num_clients, + const grpc::testing::ServerConfig& server_config, + size_t num_servers); +} // namespace testing +} // namespace grpc + +#endif diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h new file mode 100644 index 0000000000..7ba00e94c3 --- /dev/null +++ b/test/cpp/qps/histogram.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_HISTOGRAM_H +#define TEST_QPS_HISTOGRAM_H + +#include <grpc/support/histogram.h> +#include "test/cpp/qps/qpstest.pb.h" + +namespace grpc { +namespace testing { + +class Histogram { + public: + Histogram() : impl_(gpr_histogram_create(0.01, 60e9)) {} + ~Histogram() { + if (impl_) gpr_histogram_destroy(impl_); + } + Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; } + + void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); } + void Add(double value) { gpr_histogram_add(impl_, value); } + double Percentile(double pctile) { + return gpr_histogram_percentile(impl_, pctile); + } + double Count() { return gpr_histogram_count(impl_); } + void Swap(Histogram* other) { std::swap(impl_, other->impl_); } + void FillProto(HistogramData* p) { + size_t n; + const auto* data = gpr_histogram_get_contents(impl_, &n); + for (size_t i = 0; i < n; i++) { + p->add_bucket(data[i]); + } + p->set_min_seen(gpr_histogram_minimum(impl_)); + p->set_max_seen(gpr_histogram_maximum(impl_)); + p->set_sum(gpr_histogram_sum(impl_)); + p->set_sum_of_squares(gpr_histogram_sum_of_squares(impl_)); + p->set_count(gpr_histogram_count(impl_)); + } + void MergeProto(const HistogramData& p) { + gpr_histogram_merge_contents(impl_, &*p.bucket().begin(), p.bucket_size(), + p.min_seen(), p.max_seen(), p.sum(), + p.sum_of_squares(), p.count()); + } + + private: + Histogram(const Histogram&); + Histogram& operator=(const Histogram&); + + gpr_histogram* impl_; +}; +} +} + +#endif /* TEST_QPS_HISTOGRAM_H */ diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc new file mode 100644 index 0000000000..bf51e7408e --- /dev/null +++ b/test/cpp/qps/qps_driver.cc @@ -0,0 +1,132 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <gflags/gflags.h> +#include <grpc/support/log.h> + +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/stats.h" + +DEFINE_int32(num_clients, 1, "Number of client binaries"); +DEFINE_int32(num_servers, 1, "Number of server binaries"); + +// Common config +DEFINE_bool(enable_ssl, false, "Use SSL"); + +// Server config +DEFINE_int32(server_threads, 1, "Number of server threads"); +DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type"); + +// Client config +DEFINE_int32(outstanding_rpcs_per_channel, 1, + "Number of outstanding rpcs per channel"); +DEFINE_int32(client_channels, 1, "Number of client channels"); +DEFINE_int32(payload_size, 1, "Payload size"); +DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type"); +DEFINE_int32(async_client_threads, 1, "Async client threads"); + +using grpc::testing::ClientConfig; +using grpc::testing::ServerConfig; +using grpc::testing::ClientType; +using grpc::testing::ServerType; +using grpc::testing::ResourceUsage; +using grpc::testing::sum; + +// In some distros, gflags is in the namespace google, and in some others, +// in gflags. This hack is enabling us to find both. +namespace google {} +namespace gflags {} +using namespace google; +using namespace gflags; + +int main(int argc, char **argv) { + grpc_init(); + ParseCommandLineFlags(&argc, &argv, true); + + ClientType client_type; + ServerType server_type; + GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type)); + GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type)); + + ClientConfig client_config; + client_config.set_client_type(client_type); + client_config.set_enable_ssl(FLAGS_enable_ssl); + client_config.set_outstanding_rpcs_per_channel( + FLAGS_outstanding_rpcs_per_channel); + client_config.set_client_channels(FLAGS_client_channels); + client_config.set_payload_size(FLAGS_payload_size); + client_config.set_async_client_threads(FLAGS_async_client_threads); + + ServerConfig server_config; + server_config.set_server_type(server_type); + server_config.set_threads(FLAGS_server_threads); + server_config.set_enable_ssl(FLAGS_enable_ssl); + + auto result = RunScenario(client_config, FLAGS_num_clients, server_config, + FLAGS_num_servers); + + gpr_log(GPR_INFO, "QPS: %.1f", + result.latencies.Count() / + average(result.client_resources, + [](ResourceUsage u) { return u.wall_time; })); + + gpr_log(GPR_INFO, "Latencies (50/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f us", + result.latencies.Percentile(50) / 1000, + result.latencies.Percentile(95) / 1000, + result.latencies.Percentile(99) / 1000, + result.latencies.Percentile(99.9) / 1000); + + gpr_log(GPR_INFO, "Server system time: %.2f%%", + 100.0 * sum(result.server_resources, + [](ResourceUsage u) { return u.system_time; }) / + sum(result.server_resources, + [](ResourceUsage u) { return u.wall_time; })); + gpr_log(GPR_INFO, "Server user time: %.2f%%", + 100.0 * sum(result.server_resources, + [](ResourceUsage u) { return u.user_time; }) / + sum(result.server_resources, + [](ResourceUsage u) { return u.wall_time; })); + gpr_log(GPR_INFO, "Client system time: %.2f%%", + 100.0 * sum(result.client_resources, + [](ResourceUsage u) { return u.system_time; }) / + sum(result.client_resources, + [](ResourceUsage u) { return u.wall_time; })); + gpr_log(GPR_INFO, "Client user time: %.2f%%", + 100.0 * sum(result.client_resources, + [](ResourceUsage u) { return u.user_time; }) / + sum(result.client_resources, + [](ResourceUsage u) { return u.wall_time; })); + + grpc_shutdown(); + return 0; +} diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index 68ec6149f5..6a7170bf58 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -51,17 +51,14 @@ message StatsRequest { } message ServerStats { - // wall clock time for timestamp - required double time_now = 1; + // wall clock time + required double time_elapsed = 1; // user time used by the server process and threads required double time_user = 2; // server time used by the server process and all threads required double time_system = 3; - - // RPC count so far - optional int32 num_rpcs = 4; } message Payload { @@ -71,31 +68,75 @@ message Payload { optional bytes body = 2; } -message Latencies { - required double l_50 = 1; - required double l_90 = 2; - required double l_99 = 3; - required double l_999 = 4; +message HistogramData { + repeated uint32 bucket = 1; + required double min_seen = 2; + required double max_seen = 3; + required double sum = 4; + required double sum_of_squares = 5; + required double count = 6; +} + +enum ClientType { + SYNCHRONOUS_CLIENT = 1; + ASYNC_CLIENT = 2; +} + +enum ServerType { + SYNCHRONOUS_SERVER = 1; + ASYNC_SERVER = 2; +} + +message ClientConfig { + repeated string server_targets = 1; + required ClientType client_type = 2; + required bool enable_ssl = 3; + required int32 outstanding_rpcs_per_channel = 4; + required int32 client_channels = 5; + required int32 payload_size = 6; + // only for async client: + optional int32 async_client_threads = 7; } -message StartArgs { - required string server_host = 1; - required int32 server_port = 2; - optional bool enable_ssl = 3 [default = false]; - optional int32 client_threads = 4 [default = 1]; - optional int32 client_channels = 5 [default = -1]; - optional int32 num_rpcs = 6 [default = 1]; - optional int32 payload_size = 7 [default = 1]; +// Request current stats +message Mark {} + +message ClientArgs { + oneof argtype { + ClientConfig setup = 1; + Mark mark = 2; + } } -message StartResult { - required Latencies latencies = 1; - required int32 num_rpcs = 2; +message ClientStats { + required HistogramData latencies = 1; required double time_elapsed = 3; required double time_user = 4; required double time_system = 5; } +message ClientStatus { + optional ClientStats stats = 1; +} + +message ServerConfig { + required ServerType server_type = 1; + required int32 threads = 2; + required bool enable_ssl = 3; +} + +message ServerArgs { + oneof argtype { + ServerConfig setup = 1; + Mark mark = 2; + } +} + +message ServerStatus { + optional ServerStats stats = 1; + required int32 port = 2; +} + message SimpleRequest { // Desired payload type in the response from the server. // If response_type is RANDOM, server randomly chooses one from other formats. @@ -153,12 +194,6 @@ message StreamingOutputCallResponse { } service TestService { - // Start test with specified workload - rpc StartTest(StartArgs) returns (Latencies); - - // Collect stats from server, ignore request content - rpc CollectServerStats(StatsRequest) returns (ServerStats); - // One request followed by one response. // The server returns the client payload as-is. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); @@ -186,3 +221,10 @@ service TestService { rpc HalfDuplexCall(stream StreamingOutputCallRequest) returns (stream StreamingOutputCallResponse); } + +service Worker { + // Start test with specified workload + rpc RunTest(stream ClientArgs) returns (stream ClientStatus); + // Start test with specified workload + rpc RunServer(stream ServerArgs) returns (stream ServerStatus); +} diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc deleted file mode 100644 index be27c12b30..0000000000 --- a/test/cpp/qps/server.cc +++ /dev/null @@ -1,171 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <sys/time.h> -#include <sys/resource.h> -#include <sys/signal.h> -#include <thread> - -#include <unistd.h> - -#include <gflags/gflags.h> -#include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> -#include <grpc++/config.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> -#include <grpc++/server_context.h> -#include <grpc++/status.h> -#include "src/cpp/server/thread_pool.h" -#include "test/core/util/grpc_profiler.h" -#include "test/cpp/qps/qpstest.pb.h" - -#include <grpc/grpc.h> -#include <grpc/support/log.h> - -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(port, 0, "Server port."); -DEFINE_int32(server_threads, 4, "Number of server threads."); - -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::ThreadPool; -using grpc::testing::Payload; -using grpc::testing::PayloadType; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; -using grpc::Status; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google { } -namespace gflags { } -using namespace google; -using namespace gflags; - -static bool got_sigint = false; - -static void sigint_handler(int x) { got_sigint = 1; } - -static double time_double(struct timeval* tv) { - return tv->tv_sec + 1e-6 * tv->tv_usec; -} - -static bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type = type; - // TODO(yangg): Support UNCOMPRESSABLE payload. - if (type != PayloadType::COMPRESSABLE) { - return false; - } - payload->set_type(response_type); - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - return true; -} - -namespace { - -class TestServiceImpl GRPC_FINAL : public TestService::Service { - public: - Status CollectServerStats(ServerContext* context, const StatsRequest*, - ServerStats* response) { - struct rusage usage; - struct timeval tv; - gettimeofday(&tv, NULL); - getrusage(RUSAGE_SELF, &usage); - response->set_time_now(time_double(&tv)); - response->set_time_user(time_double(&usage.ru_utime)); - response->set_time_system(time_double(&usage.ru_stime)); - return Status::OK; - } - Status UnaryCall(ServerContext* context, const SimpleRequest* request, - SimpleResponse* response) { - if (request->has_response_size() && request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); - } - } - return Status::OK; - } -}; - -} // namespace - -static void RunServer() { - char* server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_port); - - TestServiceImpl service; - - SimpleRequest request; - SimpleResponse response; - - ServerBuilder builder; - builder.AddPort(server_address); - builder.RegisterService(&service); - - std::unique_ptr<ThreadPool> pool(new ThreadPool(FLAGS_server_threads)); - builder.SetThreadPool(pool.get()); - - std::unique_ptr<Server> server(builder.BuildAndStart()); - gpr_log(GPR_INFO, "Server listening on %s\n", server_address); - - grpc_profiler_start("qps_server.prof"); - - while (!got_sigint) { - sleep(5); - } - - grpc_profiler_stop(); - - gpr_free(server_address); -} - -int main(int argc, char** argv) { - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - - signal(SIGINT, sigint_handler); - - GPR_ASSERT(FLAGS_port != 0); - GPR_ASSERT(!FLAGS_enable_ssl); - RunServer(); - - grpc_shutdown(); - return 0; -} diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h new file mode 100644 index 0000000000..ef71cb94d0 --- /dev/null +++ b/test/cpp/qps/server.h @@ -0,0 +1,84 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_SERVER_H +#define TEST_QPS_SERVER_H + +#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/qpstest.pb.h" + +namespace grpc { +namespace testing { + +class Server { + public: + Server() : timer_(new Timer) {} + virtual ~Server() {} + + ServerStats Mark() { + std::unique_ptr<Timer> timer(new Timer); + timer.swap(timer_); + + auto timer_result = timer->Mark(); + + ServerStats stats; + stats.set_time_elapsed(timer_result.wall); + stats.set_time_system(timer_result.system); + stats.set_time_user(timer_result.user); + return stats; + } + + static bool SetPayload(PayloadType type, int size, Payload* payload) { + PayloadType response_type = type; + // TODO(yangg): Support UNCOMPRESSABLE payload. + if (type != PayloadType::COMPRESSABLE) { + return false; + } + payload->set_type(response_type); + std::unique_ptr<char[]> body(new char[size]()); + payload->set_body(body.get(), size); + return true; + } + + private: + std::unique_ptr<Timer> timer_; +}; + +std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config, + int port); +std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port); + +} // namespace testing +} // namespace grpc + +#endif diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c006262fc3..64aca957e4 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -51,104 +51,38 @@ #include "src/cpp/server/thread_pool.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/server.h" #include <grpc/grpc.h> #include <grpc/support/log.h> -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(port, 0, "Server port."); -DEFINE_int32(server_threads, 4, "Number of server threads."); +namespace grpc { +namespace testing { -using grpc::CompletionQueue; -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::ThreadPool; -using grpc::testing::Payload; -using grpc::testing::PayloadType; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; -using grpc::Status; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google {} -namespace gflags {} -using namespace google; -using namespace gflags; - -static bool got_sigint = false; - -static void sigint_handler(int x) { got_sigint = 1; } - -static double time_double(struct timeval *tv) { - return tv->tv_sec + 1e-6 * tv->tv_usec; -} - -static bool SetPayload(PayloadType type, int size, Payload *payload) { - PayloadType response_type = type; - // TODO(yangg): Support UNCOMPRESSABLE payload. - if (type != PayloadType::COMPRESSABLE) { - return false; - } - payload->set_type(response_type); - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - return true; -} - -namespace { - -class AsyncQpsServerTest { +class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { + AsyncQpsServerTest(const ServerConfig &config, int port) + : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { char *server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_port); + gpr_join_host_port(&server_address, "::", port); ServerBuilder builder; builder.AddPort(server_address); + gpr_free(server_address); builder.RegisterAsyncService(&async_service_); server_ = builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server listening on %s\n", server_address); - gpr_free(server_address); using namespace std::placeholders; request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, _1, _2, _3, &srv_cq_, _4); - request_stats_ = - std::bind(&TestService::AsyncService::RequestCollectServerStats, - &async_service_, _1, _2, _3, &srv_cq_, _4); for (int i = 0; i < 100; i++) { contexts_.push_front( new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( request_unary_, UnaryCall)); - contexts_.push_front( - new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>( - request_stats_, CollectServerStats)); - } - } - ~AsyncQpsServerTest() { - server_->Shutdown(); - void *ignored_tag; - bool ignored_ok; - srv_cq_.Shutdown(); - while (srv_cq_.Next(&ignored_tag, &ignored_ok)) { } - while (!contexts_.empty()) { - delete contexts_.front(); - contexts_.pop_front(); - } - for (auto& thr: threads_) { - thr.join(); - } - } - void ServeRpcs(int num_threads) { - for (int i = 0; i < num_threads; i++) { + for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; @@ -166,8 +100,16 @@ class AsyncQpsServerTest { return; })); } - while (!got_sigint) { - std::this_thread::sleep_for(std::chrono::seconds(5)); + } + ~AsyncQpsServerTest() { + server_->Shutdown(); + srv_cq_.Shutdown(); + for (auto &thr : threads_) { + thr.join(); + } + while (!contexts_.empty()) { + delete contexts_.front(); + contexts_.pop_front(); } } @@ -176,8 +118,8 @@ class AsyncQpsServerTest { public: ServerRpcContext() {} virtual ~ServerRpcContext(){}; - virtual bool RunNextState() = 0;// do next state, return false if all done - virtual void Reset() = 0; // start this back at a clean state + virtual bool RunNextState() = 0; // do next state, return false if all done + virtual void Reset() = 0; // start this back at a clean state }; static void *tag(ServerRpcContext *func) { return reinterpret_cast<void *>(func); @@ -240,17 +182,6 @@ class AsyncQpsServerTest { grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; }; - static Status CollectServerStats(const StatsRequest *, - ServerStats *response) { - struct rusage usage; - struct timeval tv; - gettimeofday(&tv, NULL); - getrusage(RUSAGE_SELF, &usage); - response->set_time_now(time_double(&tv)); - response->set_time_user(time_double(&usage.ru_utime)); - response->set_time_system(time_double(&usage.ru_stime)); - return Status::OK; - } static Status UnaryCall(const SimpleRequest *request, SimpleResponse *response) { if (request->has_response_size() && request->response_size() > 0) { @@ -264,40 +195,17 @@ class AsyncQpsServerTest { CompletionQueue srv_cq_; TestService::AsyncService async_service_; std::vector<std::thread> threads_; - std::unique_ptr<Server> server_; + std::unique_ptr<grpc::Server> server_; std::function<void(ServerContext *, SimpleRequest *, grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)> request_unary_; - std::function<void(ServerContext *, StatsRequest *, - grpc::ServerAsyncResponseWriter<ServerStats> *, void *)> - request_stats_; std::forward_list<ServerRpcContext *> contexts_; }; -} // namespace - -static void RunServer() { - AsyncQpsServerTest server; - - grpc_profiler_start("qps_server_async.prof"); - - server.ServeRpcs(FLAGS_server_threads); - - grpc_profiler_stop(); +std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, + int port) { + return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port)); } -int main(int argc, char **argv) { - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - GPR_ASSERT(FLAGS_port != 0); - GPR_ASSERT(!FLAGS_enable_ssl); - - signal(SIGINT, sigint_handler); - - RunServer(); - - grpc_shutdown(); - google::protobuf::ShutdownProtobufLibrary(); - - return 0; -} +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc new file mode 100644 index 0000000000..beee688608 --- /dev/null +++ b/test/cpp/qps/server_sync.cc @@ -0,0 +1,107 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <sys/signal.h> +#include <thread> + +#include <unistd.h> + +#include <gflags/gflags.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc++/config.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc++/status.h> +#include <grpc++/stream.h> +#include "src/cpp/server/thread_pool.h" +#include "test/core/util/grpc_profiler.h" +#include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/server.h" +#include "test/cpp/qps/timer.h" + +#include <grpc/grpc.h> +#include <grpc/support/log.h> + +namespace grpc { +namespace testing { + +class TestServiceImpl GRPC_FINAL : public TestService::Service { + public: + Status UnaryCall(ServerContext* context, const SimpleRequest* request, + SimpleResponse* response) GRPC_OVERRIDE { + if (request->has_response_size() && request->response_size() > 0) { + if (!Server::SetPayload(request->response_type(), + request->response_size(), + response->mutable_payload())) { + return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + } + } + return Status::OK; + } +}; + +class SynchronousServer GRPC_FINAL : public grpc::testing::Server { + public: + SynchronousServer(const ServerConfig& config, int port) + : thread_pool_(config.threads()), impl_(MakeImpl(port)) {} + + private: + std::unique_ptr<grpc::Server> MakeImpl(int port) { + ServerBuilder builder; + + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", port); + builder.AddPort(server_address); + gpr_free(server_address); + + builder.RegisterService(&service_); + + builder.SetThreadPool(&thread_pool_); + + return builder.BuildAndStart(); + } + + TestServiceImpl service_; + ThreadPool thread_pool_; + std::unique_ptr<grpc::Server> impl_; +}; + +std::unique_ptr<grpc::testing::Server> CreateSynchronousServer( + const ServerConfig& config, int port) { + return std::unique_ptr<Server>(new SynchronousServer(config, port)); +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/single_run_localhost.sh b/test/cpp/qps/single_run_localhost.sh new file mode 100755 index 0000000000..2f60b4e49d --- /dev/null +++ b/test/cpp/qps/single_run_localhost.sh @@ -0,0 +1,28 @@ +#!/bin/sh + +# performs a single qps run with one client and one server + +set -ex + +cd $(dirname $0)/../../.. + +killall qps_worker || true + +config=opt + +NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()'` + +make CONFIG=$config qps_worker qps_driver -j$NUMCPUS + +bins/$config/qps_worker -driver_port 10000 -server_port 10001 & +PID1=$! +bins/$config/qps_worker -driver_port 10010 -server_port 10011 & +PID2=$! + +export QPS_WORKERS="localhost:10000,localhost:10010" + +bins/$config/qps_driver $* + +kill -2 $PID1 $PID2 +wait + diff --git a/test/cpp/qps/stats.h b/test/cpp/qps/stats.h new file mode 100644 index 0000000000..ca59390ad7 --- /dev/null +++ b/test/cpp/qps/stats.h @@ -0,0 +1,60 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_STATS_UTILS_H +#define TEST_QPS_STATS_UTILS_H + +#include "test/cpp/qps/histogram.h" +#include <string> + +namespace grpc { +namespace testing { + +template <class T, class F> +double sum(const T& container, F functor) { + double r = 0; + for (auto v : container) { + r += functor(v); + } + return r; +} + +template <class T, class F> +double average(const T& container, F functor) { + return sum(container, functor) / container.size(); +} + +} // namespace testing +} // namespace grpc + +#endif diff --git a/test/cpp/qps/timer.cc b/test/cpp/qps/timer.cc new file mode 100644 index 0000000000..3c1342041c --- /dev/null +++ b/test/cpp/qps/timer.cc @@ -0,0 +1,71 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/qps/timer.h" + +#include <sys/time.h> +#include <sys/resource.h> +#include <grpc/support/time.h> + +Timer::Timer() : start_(Sample()) {} + +double Timer::Now() { + auto ts = gpr_now(); + return ts.tv_sec + 1e-9 * ts.tv_nsec; +} + +static double time_double(struct timeval* tv) { + return tv->tv_sec + 1e-6 * tv->tv_usec; +} + +Timer::Result Timer::Sample() { + struct rusage usage; + struct timeval tv; + gettimeofday(&tv, nullptr); + getrusage(RUSAGE_SELF, &usage); + + Result r; + r.wall = time_double(&tv); + r.user = time_double(&usage.ru_utime); + r.system = time_double(&usage.ru_stime); + return r; +} + +Timer::Result Timer::Mark() { + Result s = Sample(); + Result r; + r.wall = s.wall - start_.wall; + r.user = s.user - start_.user; + r.system = s.system - start_.system; + return r; +} diff --git a/test/cpp/qps/timer.h b/test/cpp/qps/timer.h new file mode 100644 index 0000000000..30dbd7e7d5 --- /dev/null +++ b/test/cpp/qps/timer.h @@ -0,0 +1,57 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_TIMER_H +#define TEST_QPS_TIMER_H + +class Timer { + public: + Timer(); + + struct Result { + double wall; + double user; + double system; + }; + + Result Mark(); + + static double Now(); + + private: + static Result Sample(); + + const Result start_; +}; + +#endif // TEST_QPS_TIMER_H diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc new file mode 100644 index 0000000000..3a46a22665 --- /dev/null +++ b/test/cpp/qps/worker.cc @@ -0,0 +1,235 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <cassert> +#include <memory> +#include <mutex> +#include <string> +#include <thread> +#include <vector> +#include <sstream> + +#include <sys/signal.h> + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/histogram.h> +#include <grpc/support/log.h> +#include <grpc/support/host_port.h> +#include <gflags/gflags.h> +#include <grpc++/client_context.h> +#include <grpc++/status.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/stream.h> +#include "test/core/util/grpc_profiler.h" +#include "test/cpp/util/create_test_channel.h" +#include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/client.h" +#include "test/cpp/qps/server.h" + +DEFINE_int32(driver_port, 0, "Driver server port."); +DEFINE_int32(server_port, 0, "Spawned server port."); + +// In some distros, gflags is in the namespace google, and in some others, +// in gflags. This hack is enabling us to find both. +namespace google {} +namespace gflags {} +using namespace google; +using namespace gflags; + +static bool got_sigint = false; + +namespace grpc { +namespace testing { + +std::unique_ptr<Client> CreateClient(const ClientConfig& config) { + switch (config.client_type()) { + case ClientType::SYNCHRONOUS_CLIENT: + return CreateSynchronousClient(config); + case ClientType::ASYNC_CLIENT: + return CreateAsyncClient(config); + } + abort(); +} + +std::unique_ptr<Server> CreateServer(const ServerConfig& config) { + switch (config.server_type()) { + case ServerType::SYNCHRONOUS_SERVER: + return CreateSynchronousServer(config, FLAGS_server_port); + case ServerType::ASYNC_SERVER: + return CreateAsyncServer(config, FLAGS_server_port); + } + abort(); +} + +class WorkerImpl GRPC_FINAL : public Worker::Service { + public: + WorkerImpl() : acquired_(false) {} + + Status RunTest(ServerContext* ctx, + ServerReaderWriter<ClientStatus, ClientArgs>* stream) + GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(RESOURCE_EXHAUSTED); + } + + ClientArgs args; + if (!stream->Read(&args)) { + return Status(INVALID_ARGUMENT); + } + if (!args.has_setup()) { + return Status(INVALID_ARGUMENT); + } + auto client = CreateClient(args.setup()); + if (!client) { + return Status(INVALID_ARGUMENT); + } + ClientStatus status; + if (!stream->Write(status)) { + return Status(UNKNOWN); + } + while (stream->Read(&args)) { + if (!args.has_mark()) { + return Status(INVALID_ARGUMENT); + } + *status.mutable_stats() = client->Mark(); + stream->Write(status); + } + + return Status::OK; + } + + Status RunServer(ServerContext* ctx, + ServerReaderWriter<ServerStatus, ServerArgs>* stream) + GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(RESOURCE_EXHAUSTED); + } + + ServerArgs args; + if (!stream->Read(&args)) { + return Status(INVALID_ARGUMENT); + } + if (!args.has_setup()) { + return Status(INVALID_ARGUMENT); + } + auto server = CreateServer(args.setup()); + if (!server) { + return Status(INVALID_ARGUMENT); + } + ServerStatus status; + status.set_port(FLAGS_server_port); + if (!stream->Write(status)) { + return Status(UNKNOWN); + } + while (stream->Read(&args)) { + if (!args.has_mark()) { + return Status(INVALID_ARGUMENT); + } + *status.mutable_stats() = server->Mark(); + stream->Write(status); + } + + return Status::OK; + } + + private: + // Protect against multiple clients using this worker at once. + class InstanceGuard { + public: + InstanceGuard(WorkerImpl* impl) + : impl_(impl), acquired_(impl->TryAcquireInstance()) {} + ~InstanceGuard() { + if (acquired_) { + impl_->ReleaseInstance(); + } + } + + bool Acquired() const { return acquired_; } + + private: + WorkerImpl* const impl_; + const bool acquired_; + }; + + bool TryAcquireInstance() { + std::lock_guard<std::mutex> g(mu_); + if (acquired_) return false; + acquired_ = true; + return true; + } + + void ReleaseInstance() { + std::lock_guard<std::mutex> g(mu_); + GPR_ASSERT(acquired_); + acquired_ = false; + } + + std::mutex mu_; + bool acquired_; +}; + +static void RunServer() { + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", FLAGS_driver_port); + + WorkerImpl service; + + ServerBuilder builder; + builder.AddPort(server_address); + builder.RegisterService(&service); + + gpr_free(server_address); + + auto server = builder.BuildAndStart(); + + while (!got_sigint) { + std::this_thread::sleep_for(std::chrono::seconds(5)); + } +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_init(); + ParseCommandLineFlags(&argc, &argv, true); + + grpc::testing::RunServer(); + + grpc_shutdown(); + return 0; +} |