diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-08 10:32:00 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-08 10:32:00 -0700 |
commit | 52d646edddcb2bac053c73bff75ea9adff1d649c (patch) | |
tree | 941506b6e63182b47b706f0bbb4f0d7c3c5ca5ee /test/cpp/qps | |
parent | 250e96d941f0ddd22dec8ff1607137ab63c5dde9 (diff) | |
parent | b32c082906b9b52b89387761c3f7cb01638bcadc (diff) |
Merge github.com:grpc/grpc into cereal-is-bad-for-your-health
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/client.h | 82 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 258 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 12 | ||||
-rw-r--r-- | test/cpp/qps/interarrival.h | 178 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 41 | ||||
-rw-r--r-- | test/cpp/qps/qps_interarrival_test.cc | 76 | ||||
-rw-r--r-- | test/cpp/qps/qps_test_openloop.cc | 87 | ||||
-rw-r--r-- | test/cpp/qps/qpstest.proto | 54 |
8 files changed, 63 insertions, 725 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 28cd32a197..dc3a9f2ac5 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -35,7 +35,6 @@ #define TEST_QPS_CLIENT_H #include "test/cpp/qps/histogram.h" -#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" #include "test/cpp/qps/qpstest.grpc.pb.h" @@ -43,31 +42,11 @@ #include <mutex> namespace grpc { - -#if defined(__APPLE__) -// Specialize Timepoint for high res clock as we need that -template <> -class TimePoint<std::chrono::high_resolution_clock::time_point> { - public: - TimePoint(const std::chrono::high_resolution_clock::time_point& time) { - TimepointHR2Timespec(time, &time_); - } - gpr_timespec raw_time() const { return time_; } - - private: - gpr_timespec time_; -}; -#endif - namespace testing { -typedef std::chrono::high_resolution_clock grpc_time_source; -typedef std::chrono::time_point<grpc_time_source> grpc_time; - class Client { public: - explicit Client(const ClientConfig& config) - : timer_(new Timer), interarrival_timer_() { + explicit Client(const ClientConfig& config) : timer_(new Timer) { for (int i = 0; i < config.client_channels(); i++) { channels_.push_back(ClientChannelInfo( config.server_targets(i % config.server_targets_size()), config)); @@ -102,7 +81,6 @@ class Client { protected: SimpleRequest request_; - bool closed_loop_; class ClientChannelInfo { public: @@ -128,61 +106,6 @@ class Client { virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; - void SetupLoadTest(const ClientConfig& config, size_t num_threads) { - // Set up the load distribution based on the number of threads - if (config.load_type() == CLOSED_LOOP) { - closed_loop_ = true; - } else { - closed_loop_ = false; - - std::unique_ptr<RandomDist> random_dist; - const auto& load = config.load_params(); - switch (config.load_type()) { - case POISSON: - random_dist.reset( - new ExpDist(load.poisson().offered_load() / num_threads)); - break; - case UNIFORM: - random_dist.reset( - new UniformDist(load.uniform().interarrival_lo() * num_threads, - load.uniform().interarrival_hi() * num_threads)); - break; - case DETERMINISTIC: - random_dist.reset( - new DetDist(num_threads / load.determ().offered_load())); - break; - case PARETO: - random_dist.reset( - new ParetoDist(load.pareto().interarrival_base() * num_threads, - load.pareto().alpha())); - break; - default: - GPR_ASSERT(false); - break; - } - - interarrival_timer_.init(*random_dist, num_threads); - for (size_t i = 0; i < num_threads; i++) { - next_time_.push_back( - grpc_time_source::now() + - std::chrono::duration_cast<grpc_time_source::duration>( - interarrival_timer_(i))); - } - } - } - - bool NextIssueTime(int thread_idx, grpc_time* time_delay) { - if (closed_loop_) { - return false; - } else { - *time_delay = next_time_[thread_idx]; - next_time_[thread_idx] += - std::chrono::duration_cast<grpc_time_source::duration>( - interarrival_timer_(thread_idx)); - return true; - } - } - private: class Thread { public: @@ -245,9 +168,6 @@ class Client { std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<Timer> timer_; - - InterarrivalTimer interarrival_timer_; - std::vector<grpc_time> next_time_; }; std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index a238b60a9c..00bbd8a8a0 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -32,11 +32,8 @@ */ #include <cassert> -#include <forward_list> #include <functional> -#include <list> #include <memory> -#include <mutex> #include <string> #include <thread> #include <vector> @@ -58,55 +55,38 @@ namespace grpc { namespace testing { -typedef std::list<grpc_time> deadline_list; - class ClientRpcContext { public: - ClientRpcContext(int ch) : channel_id_(ch) {} + ClientRpcContext() {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, Histogram* hist) = 0; - virtual ClientRpcContext* StartNewClone() = 0; + virtual void StartNewClone() = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } static ClientRpcContext* detag(void* t) { return reinterpret_cast<ClientRpcContext*>(t); } - - deadline_list::iterator deadline_posn() const { return deadline_posn_; } - void set_deadline_posn(const deadline_list::iterator& it) { - deadline_posn_ = it; - } - virtual void Start(CompletionQueue* cq) = 0; - int channel_id() const { return channel_id_; } - - protected: - int channel_id_; - - private: - deadline_list::iterator deadline_posn_; }; template <class RequestType, class ResponseType> class ClientRpcContextUnaryImpl : public ClientRpcContext { public: ClientRpcContextUnaryImpl( - int channel_id, TestService::Stub* stub, const RequestType& req, + TestService::Stub* stub, const RequestType& req, std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req, + TestService::Stub*, grpc::ClientContext*, const RequestType&)> + start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : ClientRpcContext(channel_id), - context_(), + : context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::RespDone), callback_(on_done), - start_req_(start_req) {} - void Start(CompletionQueue* cq) GRPC_OVERRIDE { - start_ = Timer::Now(); - response_reader_ = start_req_(stub_, &context_, req_, cq); + start_req_(start_req), + start_(Timer::Now()), + response_reader_(start_req(stub_, &context_, req_)) { response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); } ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} @@ -118,9 +98,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { return ret; } - ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_, - callback_); + void StartNewClone() GRPC_OVERRIDE { + new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); } private: @@ -130,7 +109,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { } bool DoCallBack(bool) { callback_(status_, &response_); - return true; // we're done, this'll be ignored + return false; } grpc::ClientContext context_; TestService::Stub* stub_; @@ -139,54 +118,29 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { bool (ClientRpcContextUnaryImpl::*next_state_)(bool); std::function<void(grpc::Status, ResponseType*)> callback_; std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req_; + TestService::Stub*, grpc::ClientContext*, const RequestType&)> start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> response_reader_; }; -typedef std::forward_list<ClientRpcContext*> context_list; - class AsyncClient : public Client { public: - explicit AsyncClient( - const ClientConfig& config, - std::function<ClientRpcContext*(int, TestService::Stub*, - const SimpleRequest&)> setup_ctx) - : Client(config), - channel_lock_(config.client_channels()), - contexts_(config.client_channels()), - max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), - channel_count_(config.client_channels()), - pref_channel_inc_(config.async_client_threads()) { - SetupLoadTest(config, config.async_client_threads()); - + explicit AsyncClient(const ClientConfig& config, + std::function<void(CompletionQueue*, TestService::Stub*, + const SimpleRequest&)> setup_ctx) + : Client(config) { for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); - if (!closed_loop_) { - rpc_deadlines_.emplace_back(); - next_channel_.push_back(i % channel_count_); - issue_allowed_.push_back(true); - - grpc_time next_issue; - NextIssueTime(i, &next_issue); - next_issue_.push_back(next_issue); - } } - int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (int ch = 0; ch < channel_count_; ch++) { + for (auto channel = channels_.begin(); channel != channels_.end(); + channel++) { auto* cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); - auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_); - if (closed_loop_) { - ctx->Start(cq); - } else { - contexts_[ch].push_front(ctx); - } + setup_ctx(cq, channel->get_stub(), request_); } } } @@ -205,126 +159,30 @@ class AsyncClient : public Client { size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - grpc_time deadline, short_deadline; - if (closed_loop_) { - deadline = grpc_time_source::now() + std::chrono::seconds(1); - short_deadline = deadline; - } else { - if (rpc_deadlines_[thread_idx].empty()) { - deadline = grpc_time_source::now() + std::chrono::seconds(1); - } else { - deadline = *(rpc_deadlines_[thread_idx].begin()); - } - short_deadline = - issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline; - } - - bool got_event; - - switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { + switch (cli_cqs_[thread_idx]->AsyncNext( + &got_tag, &ok, + std::chrono::system_clock::now() + std::chrono::seconds(1))) { case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::TIMEOUT: - got_event = false; - break; + return true; case CompletionQueue::GOT_EVENT: - got_event = true; - break; - default: - GPR_ASSERT(false); break; } - if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) && - grpc_time_source::now() > deadline) { - // we have missed some 1-second deadline, which is too much - gpr_log(GPR_INFO, "Missed an RPC deadline, giving up"); - return false; - } - if (got_event) { - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then clone the ctx - ctx->RunNextState(ok, histogram); - ClientRpcContext* clone_ctx = ctx->StartNewClone(); - if (closed_loop_) { - clone_ctx->Start(cli_cqs_[thread_idx].get()); - } else { - // Remove the entry from the rpc deadlines list - rpc_deadlines_[thread_idx].erase(ctx->deadline_posn()); - // Put the clone_ctx in the list of idle contexts for this channel - // Under lock - int ch = clone_ctx->channel_id(); - std::lock_guard<std::mutex> g(channel_lock_[ch]); - contexts_[ch].push_front(clone_ctx); - } - // delete the old version - delete ctx; - } - if (!closed_loop_) - issue_allowed_[thread_idx] = - true; // may be ok now even if it hadn't been - } - if (!closed_loop_ && issue_allowed_[thread_idx] && - grpc_time_source::now() >= next_issue_[thread_idx]) { - // Attempt to issue - bool issued = false; - for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx]; - num_attempts < channel_count_ && !issued; num_attempts++) { - bool can_issue = false; - ClientRpcContext* ctx = nullptr; - { - std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]); - if (!contexts_[channel_attempt].empty()) { - // Get an idle context from the front of the list - ctx = *(contexts_[channel_attempt].begin()); - contexts_[channel_attempt].pop_front(); - can_issue = true; - } - } - if (can_issue) { - // do the work to issue - rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() + - std::chrono::seconds(1)); - auto it = rpc_deadlines_[thread_idx].end(); - --it; - ctx->set_deadline_posn(it); - ctx->Start(cli_cqs_[thread_idx].get()); - issued = true; - // If we did issue, then next time, try our thread's next - // preferred channel - next_channel_[thread_idx] += pref_channel_inc_; - if (next_channel_[thread_idx] >= channel_count_) - next_channel_[thread_idx] = (thread_idx % channel_count_); - } else { - // Do a modular increment of channel attempt if we couldn't issue - channel_attempt = (channel_attempt + 1) % channel_count_; - } - } - if (issued) { - // We issued one; see when we can issue the next - grpc_time next_issue; - NextIssueTime(thread_idx, &next_issue); - next_issue_[thread_idx] = next_issue; - } else { - issue_allowed_[thread_idx] = false; - } + + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState(ok, histogram) == false) { + // call the callback and then delete it + ctx->RunNextState(ok, histogram); + ctx->StartNewClone(); + delete ctx; } + return true; } private: std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; - - std::vector<deadline_list> rpc_deadlines_; // per thread deadlines - std::vector<int> next_channel_; // per thread round-robin channel ctr - std::vector<bool> issue_allowed_; // may this thread attempt to issue - std::vector<grpc_time> next_issue_; // when should it issue? - - std::vector<std::mutex> channel_lock_; - std::vector<context_list> contexts_; // per-channel list of idle contexts - int max_outstanding_per_channel_; - int channel_count_; - int pref_channel_inc_; }; class AsyncUnaryClient GRPC_FINAL : public AsyncClient { @@ -336,15 +194,15 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } private: - static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, - const SimpleRequest& req) { + static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& request, CompletionQueue* cq) { + auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& request) { return stub->AsyncUnaryCall(ctx, request, cq); }; - return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - channel_id, stub, req, start_req, check_done); + new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + stub, req, start_req, check_done); } }; @@ -352,30 +210,26 @@ template <class RequestType, class ResponseType> class ClientRpcContextStreamingImpl : public ClientRpcContext { public: ClientRpcContextStreamingImpl( - int channel_id, TestService::Stub* stub, const RequestType& req, - std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter< - RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*, - CompletionQueue*, void*)> start_req, + TestService::Stub* stub, const RequestType& req, + std::function<std::unique_ptr< + grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( + TestService::Stub*, grpc::ClientContext*, void*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : ClientRpcContext(channel_id), - context_(), + : context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextStreamingImpl::ReqSent), callback_(on_done), start_req_(start_req), - start_(Timer::Now()) {} + start_(Timer::Now()), + stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {} ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {} bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { return (this->*next_state_)(ok, hist); } - ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_, - start_req_, callback_); - } - void Start(CompletionQueue* cq) GRPC_OVERRIDE { - stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); + void StartNewClone() GRPC_OVERRIDE { + new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); } private: @@ -409,8 +263,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function<void(grpc::Status, ResponseType*)> callback_; std::function< std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> - start_req_; + TestService::Stub*, grpc::ClientContext*, void*)> start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> @@ -421,25 +274,22 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { public: explicit AsyncStreamingClient(const ClientConfig& config) : AsyncClient(config, SetupCtx) { - // async streaming currently only supported closed loop - GPR_ASSERT(config.load_type() == CLOSED_LOOP); - StartThreads(config.async_client_threads()); } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } private: - static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, - const SimpleRequest& req) { + static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, - CompletionQueue* cq, void* tag) { + auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, + void* tag) { auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - channel_id, stub, req, start_req, check_done); + new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( + stub, req, start_req, check_done); } }; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index d1682caf06..c28dc91321 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -32,7 +32,6 @@ */ #include <cassert> -#include <chrono> #include <memory> #include <mutex> #include <string> @@ -58,7 +57,6 @@ #include "test/cpp/qps/client.h" #include "test/cpp/qps/qpstest.grpc.pb.h" #include "test/cpp/qps/histogram.h" -#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" namespace grpc { @@ -70,19 +68,11 @@ class SynchronousClient : public Client { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); - SetupLoadTest(config, num_threads_); } virtual ~SynchronousClient(){}; protected: - void WaitToIssue(int thread_idx) { - grpc_time next_time; - if (NextIssueTime(thread_idx, &next_time)) { - std::this_thread::sleep_until(next_time); - } - } - size_t num_threads_; std::vector<SimpleResponse> responses_; }; @@ -96,7 +86,6 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ~SynchronousUnaryClient() { EndThreads(); } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { - WaitToIssue(thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = Timer::Now(); grpc::ClientContext context; @@ -130,7 +119,6 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { - WaitToIssue(thread_idx); double start = Timer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h deleted file mode 100644 index f90a17a894..0000000000 --- a/test/cpp/qps/interarrival.h +++ /dev/null @@ -1,178 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef TEST_QPS_INTERARRIVAL_H -#define TEST_QPS_INTERARRIVAL_H - -#include <chrono> -#include <cmath> -#include <random> - -#include <grpc++/config.h> - -namespace grpc { -namespace testing { - -// First create classes that define a random distribution -// Note that this code does not include C++-specific random distribution -// features supported in std::random. Although this would make this code easier, -// this code is required to serve as the template code for other language -// stacks. Thus, this code only uses a uniform distribution of doubles [0,1) -// and then provides the distribution functions itself. - -class RandomDist { - public: - RandomDist() {} - virtual ~RandomDist() = 0; - // Argument to operator() is a uniform double in the range [0,1) - virtual double operator()(double uni) const = 0; -}; - -inline RandomDist::~RandomDist() {} - -// ExpDist implements an exponential distribution, which is the -// interarrival distribution for a Poisson process. The parameter -// lambda is the mean rate of arrivals. This is the -// most useful distribution since it is actually additive and -// memoryless. It is a good representation of activity coming in from -// independent identical stationary sources. For more information, -// see http://en.wikipedia.org/wiki/Exponential_distribution - -class ExpDist GRPC_FINAL : public RandomDist { - public: - explicit ExpDist(double lambda) : lambda_recip_(1.0 / lambda) {} - ~ExpDist() GRPC_OVERRIDE {} - double operator()(double uni) const GRPC_OVERRIDE { - // Note: Use 1.0-uni above to avoid NaN if uni is 0 - return lambda_recip_ * (-log(1.0 - uni)); - } - - private: - double lambda_recip_; -}; - -// UniformDist implements a random distribution that has -// interarrival time uniformly spread between [lo,hi). The -// mean interarrival time is (lo+hi)/2. For more information, -// see http://en.wikipedia.org/wiki/Uniform_distribution_%28continuous%29 - -class UniformDist GRPC_FINAL : public RandomDist { - public: - UniformDist(double lo, double hi) : lo_(lo), range_(hi - lo) {} - ~UniformDist() GRPC_OVERRIDE {} - double operator()(double uni) const GRPC_OVERRIDE { - return uni * range_ + lo_; - } - - private: - double lo_; - double range_; -}; - -// DetDist provides a random distribution with interarrival time -// of val. Note that this is not additive, so using this on multiple -// flows of control (threads within the same client or separate -// clients) will not preserve any deterministic interarrival gap across -// requests. - -class DetDist GRPC_FINAL : public RandomDist { - public: - explicit DetDist(double val) : val_(val) {} - ~DetDist() GRPC_OVERRIDE {} - double operator()(double uni) const GRPC_OVERRIDE { return val_; } - - private: - double val_; -}; - -// ParetoDist provides a random distribution with interarrival time -// spread according to a Pareto (heavy-tailed) distribution. In this -// model, many interarrival times are close to the base, but a sufficient -// number will be high (up to infinity) as to disturb the mean. It is a -// good representation of the response times of data center jobs. See -// http://en.wikipedia.org/wiki/Pareto_distribution - -class ParetoDist GRPC_FINAL : public RandomDist { - public: - ParetoDist(double base, double alpha) - : base_(base), alpha_recip_(1.0 / alpha) {} - ~ParetoDist() GRPC_OVERRIDE {} - double operator()(double uni) const GRPC_OVERRIDE { - // Note: Use 1.0-uni above to avoid div by zero if uni is 0 - return base_ / pow(1.0 - uni, alpha_recip_); - } - - private: - double base_; - double alpha_recip_; -}; - -// A class library for generating pseudo-random interarrival times -// in an efficient re-entrant way. The random table is built at construction -// time, and each call must include the thread id of the invoker - -typedef std::default_random_engine qps_random_engine; - -class InterarrivalTimer { - public: - InterarrivalTimer() {} - void init(const RandomDist& r, int threads, int entries = 1000000) { - qps_random_engine gen; - std::uniform_real_distribution<double> uniform(0.0, 1.0); - for (int i = 0; i < entries; i++) { - random_table_.push_back(std::chrono::nanoseconds( - static_cast<int64_t>(1e9 * r(uniform(gen))))); - } - // Now set up the thread positions - for (int i = 0; i < threads; i++) { - thread_posns_.push_back(random_table_.begin() + (entries * i) / threads); - } - } - virtual ~InterarrivalTimer(){}; - - std::chrono::nanoseconds operator()(int thread_num) { - auto ret = *(thread_posns_[thread_num]++); - if (thread_posns_[thread_num] == random_table_.end()) - thread_posns_[thread_num] = random_table_.begin(); - return ret; - } - - private: - typedef std::vector<std::chrono::nanoseconds> time_table; - std::vector<time_table::const_iterator> thread_posns_; - time_table random_table_; -}; -} -} - -#endif diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index acc3098839..281e2e8119 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -63,15 +63,11 @@ DEFINE_int32(client_channels, 1, "Number of client channels"); DEFINE_int32(payload_size, 1, "Payload size"); DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type"); DEFINE_int32(async_client_threads, 1, "Async client threads"); -DEFINE_string(load_type, "CLOSED_LOOP", "Load type"); -DEFINE_double(load_param_1, 0.0, "Load parameter 1"); -DEFINE_double(load_param_2, 0.0, "Load parameter 2"); using grpc::testing::ClientConfig; using grpc::testing::ServerConfig; using grpc::testing::ClientType; using grpc::testing::ServerType; -using grpc::testing::LoadType; using grpc::testing::RpcType; using grpc::testing::ResourceUsage; @@ -84,14 +80,11 @@ static void QpsDriver() { ClientType client_type; ServerType server_type; - LoadType load_type; GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type)); GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type)); - GPR_ASSERT(LoadType_Parse(FLAGS_load_type, &load_type)); ClientConfig client_config; client_config.set_client_type(client_type); - client_config.set_load_type(load_type); client_config.set_enable_ssl(FLAGS_enable_ssl); client_config.set_outstanding_rpcs_per_channel( FLAGS_outstanding_rpcs_per_channel); @@ -100,40 +93,6 @@ static void QpsDriver() { client_config.set_async_client_threads(FLAGS_async_client_threads); client_config.set_rpc_type(rpc_type); - // set up the load parameters - switch (load_type) { - case grpc::testing::CLOSED_LOOP: - break; - case grpc::testing::POISSON: { - auto poisson = client_config.mutable_load_params()->mutable_poisson(); - GPR_ASSERT(FLAGS_load_param_1 != 0.0); - poisson->set_offered_load(FLAGS_load_param_1); - break; - } - case grpc::testing::UNIFORM: { - auto uniform = client_config.mutable_load_params()->mutable_uniform(); - GPR_ASSERT(FLAGS_load_param_1 != 0.0); - GPR_ASSERT(FLAGS_load_param_2 != 0.0); - uniform->set_interarrival_lo(FLAGS_load_param_1 / 1e6); - uniform->set_interarrival_hi(FLAGS_load_param_2 / 1e6); - break; - } - case grpc::testing::DETERMINISTIC: { - auto determ = client_config.mutable_load_params()->mutable_determ(); - GPR_ASSERT(FLAGS_load_param_1 != 0.0); - determ->set_offered_load(FLAGS_load_param_1); - break; - } - case grpc::testing::PARETO: { - auto pareto = client_config.mutable_load_params()->mutable_pareto(); - GPR_ASSERT(FLAGS_load_param_1 != 0.0); - GPR_ASSERT(FLAGS_load_param_2 != 0.0); - pareto->set_interarrival_base(FLAGS_load_param_1 / 1e6); - pareto->set_alpha(FLAGS_load_param_2); - break; - } - } - ServerConfig server_config; server_config.set_server_type(server_type); server_config.set_threads(FLAGS_server_threads); diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc deleted file mode 100644 index cecd1be03f..0000000000 --- a/test/cpp/qps/qps_interarrival_test.cc +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "test/cpp/qps/interarrival.h" -#include <chrono> -#include <iostream> - -// Use the C histogram rather than C++ to avoid depending on proto -#include <grpc/support/histogram.h> -#include <grpc++/config.h> - -using grpc::testing::RandomDist; -using grpc::testing::InterarrivalTimer; - -void RunTest(RandomDist&& r, int threads, std::string title) { - InterarrivalTimer timer; - timer.init(r, threads); - gpr_histogram *h(gpr_histogram_create(0.01, 60e9)); - - for (int i = 0; i < 10000000; i++) { - for (int j = 0; j < threads; j++) { - gpr_histogram_add(h, timer(j).count()); - } - } - - std::cout << title << " Distribution" << std::endl; - std::cout << "Value, Percentile" << std::endl; - for (double pct = 0.0; pct < 100.0; pct += 1.0) { - std::cout << gpr_histogram_percentile(h, pct) << "," << pct << std::endl; - } - - gpr_histogram_destroy(h); -} - -using grpc::testing::ExpDist; -using grpc::testing::DetDist; -using grpc::testing::UniformDist; -using grpc::testing::ParetoDist; - -int main(int argc, char **argv) { - RunTest(ExpDist(10.0), 5, std::string("Exponential(10)")); - RunTest(DetDist(5.0), 5, std::string("Det(5)")); - RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(1,10)")); - RunTest(ParetoDist(1.0, 1.0), 5, std::string("Pareto(1,1)")); - return 0; -} diff --git a/test/cpp/qps/qps_test_openloop.cc b/test/cpp/qps/qps_test_openloop.cc deleted file mode 100644 index 0f6d8e8530..0000000000 --- a/test/cpp/qps/qps_test_openloop.cc +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <set> - -#include <grpc/support/log.h> - -#include <signal.h> - -#include "test/cpp/qps/driver.h" -#include "test/cpp/qps/report.h" -#include "test/cpp/util/benchmark_config.h" - -namespace grpc { -namespace testing { - -static const int WARMUP = 5; -static const int BENCHMARK = 10; - -static void RunQPS() { - gpr_log(GPR_INFO, "Running QPS test, open-loop"); - - ClientConfig client_config; - client_config.set_client_type(ASYNC_CLIENT); - client_config.set_enable_ssl(false); - client_config.set_outstanding_rpcs_per_channel(1000); - client_config.set_client_channels(8); - client_config.set_payload_size(1); - client_config.set_async_client_threads(8); - client_config.set_rpc_type(UNARY); - client_config.set_load_type(POISSON); - client_config.mutable_load_params()-> - mutable_poisson()->set_offered_load(10000.0); - - ServerConfig server_config; - server_config.set_server_type(ASYNC_SERVER); - server_config.set_enable_ssl(false); - server_config.set_threads(4); - - const auto result = - RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); - - GetReporter()->ReportQPSPerCore(*result, server_config); - GetReporter()->ReportLatency(*result); -} - -} // namespace testing -} // namespace grpc - -int main(int argc, char** argv) { - grpc::testing::InitBenchmark(&argc, &argv, true); - - signal(SIGPIPE, SIG_IGN); - grpc::testing::RunQPS(); - - return 0; -} diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index d977c9b48b..122a7df1ac 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -36,7 +36,7 @@ package grpc.testing; enum PayloadType { // Compressable text format. - COMPRESSABLE = 1; + COMPRESSABLE= 1; // Uncompressable binary format. UNCOMPRESSABLE = 2; @@ -92,59 +92,21 @@ enum RpcType { STREAMING = 2; } -enum LoadType { - CLOSED_LOOP = 1; - POISSON = 2; - UNIFORM = 3; - DETERMINISTIC = 4; - PARETO = 5; -} - -message PoissonParams { - optional double offered_load = 1; -} - -message UniformParams { - optional double interarrival_lo = 1; - optional double interarrival_hi = 2; -} - -message DeterministicParams { - optional double offered_load = 1; -} - -message ParetoParams { - optional double interarrival_base = 1; - optional double alpha = 2; -} - -message LoadParams { - oneof load { - PoissonParams poisson = 1; - UniformParams uniform = 2; - DeterministicParams determ = 3; - ParetoParams pareto = 4; - }; -} - message ClientConfig { repeated string server_targets = 1; required ClientType client_type = 2; - optional bool enable_ssl = 3 [default = false]; + optional bool enable_ssl = 3 [default=false]; required int32 outstanding_rpcs_per_channel = 4; required int32 client_channels = 5; required int32 payload_size = 6; // only for async client: optional int32 async_client_threads = 7; - optional RpcType rpc_type = 8 [default = UNARY]; + optional RpcType rpc_type = 8 [default=UNARY]; optional string host = 9; - optional LoadType load_type = 10 [default = CLOSED_LOOP]; - optional LoadParams load_params = 11; } // Request current stats -message Mark { -} +message Mark {} message ClientArgs { oneof argtype { @@ -166,8 +128,8 @@ message ClientStatus { message ServerConfig { required ServerType server_type = 1; - optional int32 threads = 2 [default = 1]; - optional bool enable_ssl = 3 [default = false]; + optional int32 threads = 2 [default=1]; + optional bool enable_ssl = 3 [default=false]; optional string host = 4; } @@ -186,11 +148,11 @@ message ServerStatus { message SimpleRequest { // Desired payload type in the response from the server. // If response_type is RANDOM, server randomly chooses one from other formats. - optional PayloadType response_type = 1 [default = COMPRESSABLE]; + optional PayloadType response_type = 1 [default=COMPRESSABLE]; // Desired payload size in the response from the server. // If response_type is COMPRESSABLE, this denotes the size before compression. - optional int32 response_size = 2 [default = 0]; + optional int32 response_size = 2 [default=0]; // Optional input payload sent along with the request. optional Payload payload = 3; |