diff options
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/qps/client.h | 62 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 149 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 12 | ||||
-rw-r--r-- | test/cpp/qps/interarrival.h | 149 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 41 | ||||
-rw-r--r-- | test/cpp/qps/qps_interarrival_test.cc | 77 | ||||
-rw-r--r-- | test/cpp/qps/qpstest.proto | 37 |
7 files changed, 488 insertions, 39 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index dc3a9f2ac5..2b227ec909 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -35,6 +35,7 @@ #define TEST_QPS_CLIENT_H #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" #include "test/cpp/qps/qpstest.grpc.pb.h" @@ -46,7 +47,8 @@ namespace testing { class Client { public: - explicit Client(const ClientConfig& config) : timer_(new Timer) { + explicit Client(const ClientConfig& config) : timer_(new Timer), + interarrival_timer_() { for (int i = 0; i < config.client_channels(); i++) { channels_.push_back(ClientChannelInfo( config.server_targets(i % config.server_targets_size()), config)); @@ -81,6 +83,7 @@ class Client { protected: SimpleRequest request_; + bool closed_loop_; class ClientChannelInfo { public: @@ -105,7 +108,60 @@ class Client { void EndThreads() { threads_.clear(); } virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + + void SetupLoadTest(const ClientConfig& config, size_t num_threads) { + // Set up the load distribution based on the number of threads + if (config.load_type() == CLOSED_LOOP) { + closed_loop_ = true; + } + else { + closed_loop_ = false; + + std::unique_ptr<RandomDist> random_dist; + auto& load = config.load_params(); + switch (config.load_type()) { + case POISSON: + random_dist.reset + (new ExpDist(load.poisson().offered_load()/num_threads)); + break; + case UNIFORM: + random_dist.reset + (new UniformDist(load.uniform().interarrival_lo()*num_threads, + load.uniform().interarrival_hi()*num_threads)); + break; + case DETERMINISTIC: + random_dist.reset + (new DetDist(num_threads/load.determ().offered_load())); + break; + case PARETO: + random_dist.reset + (new ParetoDist(load.pareto().interarrival_base()*num_threads, + load.pareto().alpha())); + break; + default: + GPR_ASSERT(false); + break; + } + interarrival_timer_.init(*random_dist, num_threads); + for (size_t i = 0; i<num_threads; i++) { + next_time_.push_back(std::chrono::high_resolution_clock::now() + + interarrival_timer_(i)); + } + } + } + template<class Timepoint> + bool NextIssueTime(int thread_idx, Timepoint *time_delay) { + if (closed_loop_) { + return false; + } + else { + *time_delay = next_time_[thread_idx]; + next_time_[thread_idx] += interarrival_timer_(thread_idx); + return true; + } + } + private: class Thread { public: @@ -168,6 +224,10 @@ class Client { std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<Timer> timer_; + + InterarrivalTimer interarrival_timer_; + std::vector<std::chrono::time_point + <std::chrono::high_resolution_clock>> next_time_; }; std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 00bbd8a8a0..2d23192767 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -55,6 +55,10 @@ namespace grpc { namespace testing { +typedef std::chrono::high_resolution_clock grpc_time_source; +typedef std::chrono::time_point<grpc_time_source> grpc_time; +typedef std::forward_list<grpc_time> deadline_list; + class ClientRpcContext { public: ClientRpcContext() {} @@ -66,6 +70,12 @@ class ClientRpcContext { static ClientRpcContext* detag(void* t) { return reinterpret_cast<ClientRpcContext*>(t); } + + deadline_list::iterator deadline_posn() const {return deadline_posn_;} + void set_deadline_posn(deadline_list::iterator&& it) {deadline_posn_ = it;} + virtual void Start() = 0; + private: + deadline_list::iterator deadline_posn_; }; template <class RequestType, class ResponseType> @@ -84,9 +94,11 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { response_(), next_state_(&ClientRpcContextUnaryImpl::RespDone), callback_(on_done), - start_req_(start_req), - start_(Timer::Now()), - response_reader_(start_req(stub_, &context_, req_)) { + start_req_(start_req) { + } + void Start() GRPC_OVERRIDE { + start_ = Timer::Now(); + response_reader_.reset(start_req(stub_, &context_, req_)); response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); } ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} @@ -128,19 +140,41 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { class AsyncClient : public Client { public: explicit AsyncClient(const ClientConfig& config, - std::function<void(CompletionQueue*, TestService::Stub*, - const SimpleRequest&)> setup_ctx) - : Client(config) { + std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*, + const SimpleRequest&)> setup_ctx) : + Client(config) { for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); + if (!closed_loop_) { + rpc_deadlines_.emplace_back(); + next_channel_.push_back(i % channel_count_); + issue_allowed_.push_back(true); + + grpc_time next_issue; + NextIssueTime(i, &next_issue); + next_issue_.push_back(next_issue); + } } + if (!closed_loop_) { + for (auto channel = channels_.begin(); channel != channels_.end(); + channel++) { + channel_rpc_lock_.emplace_back(); + rpcs_outstanding_.push_back(0); + } + } + int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { for (auto channel = channels_.begin(); channel != channels_.end(); - channel++) { - auto* cq = cli_cqs_[t].get(); - t = (t + 1) % cli_cqs_.size(); - setup_ctx(cq, channel->get_stub(), request_); + channel++) { + auto* cq = cli_cqs_[t].get(); + t = (t + 1) % cli_cqs_.size(); + ClientRpcContext *ctx = setup_ctx(cq, channel->get_stub(), request_); + if (closed_loop_) { + // only relevant for closed_loop unary, but harmless for + // closed_loop streaming + ctx->Start(); + } } } } @@ -159,30 +193,70 @@ class AsyncClient : public Client { size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - switch (cli_cqs_[thread_idx]->AsyncNext( - &got_tag, &ok, - std::chrono::system_clock::now() + std::chrono::seconds(1))) { - case CompletionQueue::SHUTDOWN: - return false; + grpc_time deadline, short_deadline; + if (closed_loop_) { + deadline = grpc_time_source::now() + std::chrono::seconds(1); + short_deadline = deadline; + } else { + deadline = *(rpc_deadlines_[thread_idx].begin()); + short_deadline = issue_allowed_[thread_idx] ? + next_issue_[thread_idx] : deadline; + } + switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { + case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::TIMEOUT: - return true; + got_event = false; + break; case CompletionQueue::GOT_EVENT: - break; + got_event = true; + break; } - - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then delete it - ctx->RunNextState(ok, histogram); - ctx->StartNewClone(); - delete ctx; - } - - return true; + if (grpc_time_source::now() > deadline) { + // we have missed some 1-second deadline, which is too much gpr_log(GPR_INFO, "Missed an RPC deadline, giving up"); + return false; + } + if (got_event) { + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState(ok, histogram) == false) { + // call the callback and then delete it + rpc_deadlines_[thread_idx].erase_after(ctx->deadline_posn()); + ctx->RunNextState(ok, histogram); + ctx->StartNewClone(); + delete ctx; + } + issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been + } + if (issue_allowed_[thread_idx] && + grpc_time_source::now() >= next_issue_[thread_idx]) { + // Attempt to issue + bool issued = false; + for (int num_attempts = 0; num_attempts < channel_count_ && !issued; + num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) { + std::lock_guard g(channel_rpc_lock_[next_channel_[thread_idx]]); + if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) { + // do the work to issue + rpcs_outstanding[next_channel_[thread_idx]]++; + issued = true; + } + } + if (!issued) + issue_allowed = false; + } + return true; } private: std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; + + std::vector<deadline_list> rpc_deadlines_; // per thread deadlines + std::vector<int> next_channel_; // per thread round-robin channel ctr + std::vector<bool> issue_allowed_; // may this thread attempt to issue + std::vector<grpc_time> next_issue_; // when should it issue? + + std::vector<std::mutex> channel_rpc_lock_; + std::vector<int> rpcs_outstanding_; // per-channel vector + int max_outstanding_per_channel_; + int channel_count_; }; class AsyncUnaryClient GRPC_FINAL : public AsyncClient { @@ -192,18 +266,18 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { StartThreads(config.async_client_threads()); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } - - private: - static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, +private: + static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub, const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, const SimpleRequest& request) { return stub->AsyncUnaryCall(ctx, request, cq); }; - new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - stub, req, start_req, check_done); + return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + stub, req, start_req, check_done); } + }; template <class RequestType, class ResponseType> @@ -231,7 +305,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { void StartNewClone() GRPC_OVERRIDE { new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); } - + void Start() GRPC_OVERRIDE {} private: bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); } bool StartWrite(bool ok) { @@ -278,17 +352,16 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } - - private: - static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, - const SimpleRequest& req) { +private: + static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, void* tag) { auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( + return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( stub, req, start_req, check_done); } }; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index c28dc91321..98297d3abb 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -32,6 +32,7 @@ */ #include <cassert> +#include <chrono> #include <memory> #include <mutex> #include <string> @@ -57,6 +58,7 @@ #include "test/cpp/qps/client.h" #include "test/cpp/qps/qpstest.grpc.pb.h" #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" namespace grpc { @@ -68,11 +70,19 @@ class SynchronousClient : public Client { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); + SetupLoadTest(config, num_threads_); } virtual ~SynchronousClient(){}; protected: + void WaitToIssue(int thread_idx) { + std::chrono::time_point<std::chrono::high_resolution_clock> next_time; + if (NextIssueTime(thread_idx, &next_time)) { + std::this_thread::sleep_until(next_time); + } + } + size_t num_threads_; std::vector<SimpleResponse> responses_; }; @@ -86,6 +96,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ~SynchronousUnaryClient() { EndThreads(); } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + WaitToIssue(thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = Timer::Now(); grpc::ClientContext context; @@ -119,6 +130,7 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + WaitToIssue(thread_idx); double start = Timer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h new file mode 100644 index 0000000000..98f4def1f2 --- /dev/null +++ b/test/cpp/qps/interarrival.h @@ -0,0 +1,149 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_INTERARRIVAL_H +#define TEST_QPS_INTERARRIVAL_H + +#include <chrono> +#include <cmath> +#include <random> + +#include <grpc++/config.h> + +namespace grpc { +namespace testing { + +// First create classes that define a random distribution +// Note that this code does not include C++-specific random distribution +// features supported in std::random. Although this would make this code easier, +// this code is required to serve as the template code for other language +// stacks. Thus, this code only uses a uniform distribution of doubles [0,1) +// and then provides the distribution functions itself. + +class RandomDist { + public: + RandomDist() {} + virtual ~RandomDist() = 0; + // Argument to operator() is a uniform double in the range [0,1) + virtual double operator() (double uni) const = 0; +}; + +inline RandomDist::~RandomDist() {} + +class UniformDist GRPC_FINAL: public RandomDist { +public: + UniformDist(double lo, double hi): lo_(lo), range_(hi-lo) {} + ~UniformDist() GRPC_OVERRIDE {} + double operator() (double uni) const GRPC_OVERRIDE {return uni*range_+lo_;} +private: + double lo_; + double range_; +}; + +class ExpDist GRPC_FINAL : public RandomDist { +public: + explicit ExpDist(double lambda): lambda_recip_(1.0/lambda) {} + ~ExpDist() GRPC_OVERRIDE {} + double operator() (double uni) const GRPC_OVERRIDE { + // Note: Use 1.0-uni above to avoid NaN if uni is 0 + return lambda_recip_ * (-log(1.0-uni)); + } +private: + double lambda_recip_; +}; + +class DetDist GRPC_FINAL : public RandomDist { +public: + explicit DetDist(double val): val_(val) {} + ~DetDist() GRPC_OVERRIDE {} + double operator() (double uni) const GRPC_OVERRIDE {return val_;} +private: + double val_; +}; + +class ParetoDist GRPC_FINAL : public RandomDist { +public: + ParetoDist(double base, double alpha): base_(base), alpha_recip_(1.0/alpha) {} + ~ParetoDist() GRPC_OVERRIDE {} + double operator() (double uni) const GRPC_OVERRIDE { + // Note: Use 1.0-uni above to avoid div by zero if uni is 0 + return base_ / pow(1.0-uni, alpha_recip_); + } +private: + double base_; + double alpha_recip_; +}; + +// A class library for generating pseudo-random interarrival times +// in an efficient re-entrant way. The random table is built at construction +// time, and each call must include the thread id of the invoker + +using qps_random_engine = std::default_random_engine; + +class InterarrivalTimer { +public: + InterarrivalTimer() {} + InterarrivalTimer(const RandomDist& r, int threads, int entries=1000000) { + init(r, threads, entries); + } + void init(const RandomDist& r, int threads, int entries=1000000) { + qps_random_engine gen; + std::uniform_real_distribution<double> uniform(0.0,1.0); + for (int i=0; i<entries; i++) { + random_table_.push_back( + std::chrono::nanoseconds( + static_cast<int64_t>(1e9*r(uniform(gen))))); + } + // Now set up the thread positions + for (int i=0; i<threads; i++) { + thread_posns_.push_back(random_table_.begin() + (entries * i)/threads); + } + } + virtual ~InterarrivalTimer() {}; + + std::chrono::nanoseconds operator() (int thread_num) { + auto ret = *(thread_posns_[thread_num]++); + if (thread_posns_[thread_num] == random_table_.end()) + thread_posns_[thread_num] = random_table_.begin(); + return ret; + } + private: + typedef std::vector<std::chrono::nanoseconds> time_table; + std::vector<time_table::const_iterator> thread_posns_; + time_table random_table_; +}; + +} +} + +#endif diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 281e2e8119..acc3098839 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -63,11 +63,15 @@ DEFINE_int32(client_channels, 1, "Number of client channels"); DEFINE_int32(payload_size, 1, "Payload size"); DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type"); DEFINE_int32(async_client_threads, 1, "Async client threads"); +DEFINE_string(load_type, "CLOSED_LOOP", "Load type"); +DEFINE_double(load_param_1, 0.0, "Load parameter 1"); +DEFINE_double(load_param_2, 0.0, "Load parameter 2"); using grpc::testing::ClientConfig; using grpc::testing::ServerConfig; using grpc::testing::ClientType; using grpc::testing::ServerType; +using grpc::testing::LoadType; using grpc::testing::RpcType; using grpc::testing::ResourceUsage; @@ -80,11 +84,14 @@ static void QpsDriver() { ClientType client_type; ServerType server_type; + LoadType load_type; GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type)); GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type)); + GPR_ASSERT(LoadType_Parse(FLAGS_load_type, &load_type)); ClientConfig client_config; client_config.set_client_type(client_type); + client_config.set_load_type(load_type); client_config.set_enable_ssl(FLAGS_enable_ssl); client_config.set_outstanding_rpcs_per_channel( FLAGS_outstanding_rpcs_per_channel); @@ -93,6 +100,40 @@ static void QpsDriver() { client_config.set_async_client_threads(FLAGS_async_client_threads); client_config.set_rpc_type(rpc_type); + // set up the load parameters + switch (load_type) { + case grpc::testing::CLOSED_LOOP: + break; + case grpc::testing::POISSON: { + auto poisson = client_config.mutable_load_params()->mutable_poisson(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + poisson->set_offered_load(FLAGS_load_param_1); + break; + } + case grpc::testing::UNIFORM: { + auto uniform = client_config.mutable_load_params()->mutable_uniform(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + GPR_ASSERT(FLAGS_load_param_2 != 0.0); + uniform->set_interarrival_lo(FLAGS_load_param_1 / 1e6); + uniform->set_interarrival_hi(FLAGS_load_param_2 / 1e6); + break; + } + case grpc::testing::DETERMINISTIC: { + auto determ = client_config.mutable_load_params()->mutable_determ(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + determ->set_offered_load(FLAGS_load_param_1); + break; + } + case grpc::testing::PARETO: { + auto pareto = client_config.mutable_load_params()->mutable_pareto(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + GPR_ASSERT(FLAGS_load_param_2 != 0.0); + pareto->set_interarrival_base(FLAGS_load_param_1 / 1e6); + pareto->set_alpha(FLAGS_load_param_2); + break; + } + } + ServerConfig server_config; server_config.set_server_type(server_type); server_config.set_threads(FLAGS_server_threads); diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc new file mode 100644 index 0000000000..14af4c6506 --- /dev/null +++ b/test/cpp/qps/qps_interarrival_test.cc @@ -0,0 +1,77 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/qps/interarrival.h" +#include <chrono> +#include <iostream> + +// Use the C histogram rather than C++ to avoid depending on proto +#include <grpc/support/histogram.h> +#include <grpc++/config.h> + +using grpc::testing::ExpDist; +using grpc::testing::InterarrivalTimer; + +void RunTest(InterarrivalTimer&& timer, std::string title) { + gpr_histogram *h(gpr_histogram_create(0.01,60e9)); + + for (int i=0; i<10000000; i++) { + for (int j=0; j<5; j++) { + gpr_histogram_add(h, timer(j).count()); + } + } + + std::cout << title << " Distribution" << std::endl; + std::cout << "Value, Percentile" << std::endl; + for (double pct = 0.0; pct < 100.0; pct += 1.0) { + std::cout << gpr_histogram_percentile(h, pct) << "," << pct << std::endl; + } + + gpr_histogram_destroy(h); +} + +using grpc::testing::ExpDist; +using grpc::testing::DetDist; +using grpc::testing::UniformDist; +using grpc::testing::ParetoDist; + +int main(int argc, char **argv) { + RunTest(InterarrivalTimer(ExpDist(10.0), 5), std::string("Exponential(10)")); + RunTest(InterarrivalTimer(DetDist(5.0), 5), std::string("Det(5)")); + RunTest(InterarrivalTimer(UniformDist(0.0,10.0), 5), + std::string("Uniform(1,10)")); + RunTest(InterarrivalTimer(ParetoDist(1.0,1.0), 5), + std::string("Pareto(1,1)")); + + return 0; +} diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index 122a7df1ac..b8661f3f4f 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -92,6 +92,41 @@ enum RpcType { STREAMING = 2; } +enum LoadType { + CLOSED_LOOP = 1; + POISSON = 2; + UNIFORM = 3; + DETERMINISTIC = 4; + PARETO = 5; +} + +message PoissonParams { + optional double offered_load = 1; +} + +message UniformParams { + optional double interarrival_lo = 1; + optional double interarrival_hi = 2; +} + +message DeterministicParams { + optional double offered_load = 1; +} + +message ParetoParams { + optional double interarrival_base = 1; + optional double alpha = 2; +} + +message LoadParams { + oneof load { + PoissonParams poisson = 1; + UniformParams uniform = 2; + DeterministicParams determ = 3; + ParetoParams pareto = 4; + }; +} + message ClientConfig { repeated string server_targets = 1; required ClientType client_type = 2; @@ -103,6 +138,8 @@ message ClientConfig { optional int32 async_client_threads = 7; optional RpcType rpc_type = 8 [default=UNARY]; optional string host = 9; + optional LoadType load_type = 10 [default=CLOSED_LOOP]; + optional LoadParams load_params = 11; } // Request current stats |