diff options
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r-- | test/cpp/qps/client.h | 164 |
1 files changed, 109 insertions, 55 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index ee0049578d..15cfd7a2d7 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,6 +37,9 @@ #include <condition_variable> #include <mutex> +#include <grpc++/support/byte_buffer.h> +#include <grpc++/support/slice.h> + #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" @@ -66,37 +69,64 @@ namespace testing { typedef std::chrono::high_resolution_clock grpc_time_source; typedef std::chrono::time_point<grpc_time_source> grpc_time; -class Client { +template <class RequestType> +class ClientRequestCreator { public: - explicit Client(const ClientConfig& config) - : channels_(config.client_channels()), - timer_(new Timer), - interarrival_timer_() { - for (int i = 0; i < config.client_channels(); i++) { - channels_[i].init(config.server_targets(i % config.server_targets_size()), - config); - } - if (config.payload_config().has_bytebuf_params()) { - GPR_ASSERT(false); // not yet implemented - } else if (config.payload_config().has_simple_params()) { - request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request_.set_response_size( - config.payload_config().simple_params().resp_size()); - request_.mutable_payload()->set_type( + ClientRequestCreator(RequestType* req, const PayloadConfig&) { + // this template must be specialized + // fail with an assertion rather than a compile-time + // check since these only happen at the beginning anyway + GPR_ASSERT(false); + } +}; + +template <> +class ClientRequestCreator<SimpleRequest> { + public: + ClientRequestCreator(SimpleRequest* req, + const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + GPR_ASSERT(false); // not appropriate for this specialization + } else if (payload_config.has_simple_params()) { + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(payload_config.simple_params().resp_size()); + req->mutable_payload()->set_type( grpc::testing::PayloadType::COMPRESSABLE); - int size = config.payload_config().simple_params().req_size(); + int size = payload_config.simple_params().req_size(); std::unique_ptr<char[]> body(new char[size]); - request_.mutable_payload()->set_body(body.get(), size); - } else if (config.payload_config().has_complex_params()) { - GPR_ASSERT(false); // not yet implemented + req->mutable_payload()->set_body(body.get(), size); + } else if (payload_config.has_complex_params()) { + GPR_ASSERT(false); // not appropriate for this specialization } else { // default should be simple proto without payloads - request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request_.set_response_size(0); - request_.mutable_payload()->set_type( + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(0); + req->mutable_payload()->set_type( grpc::testing::PayloadType::COMPRESSABLE); } } +}; + +template <> +class ClientRequestCreator<ByteBuffer> { + public: + ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + std::unique_ptr<char> buf( + new char[payload_config.bytebuf_params().req_size()]); + gpr_slice s = gpr_slice_from_copied_buffer( + buf.get(), payload_config.bytebuf_params().req_size()); + Slice slice(s, Slice::STEAL_REF); + *req = ByteBuffer(&slice, 1); + } else { + GPR_ASSERT(false); // not appropriate for this specialization + } + } +}; + +class Client { + public: + Client() : timer_(new Timer), interarrival_timer_() {} virtual ~Client() {} ClientStats Mark(bool reset) { @@ -134,37 +164,8 @@ class Client { } protected: - SimpleRequest request_; bool closed_loop_; - class ClientChannelInfo { - public: - ClientChannelInfo() {} - ClientChannelInfo(const ClientChannelInfo& i) { - // The copy constructor is to satisfy old compilers - // that need it for using std::vector . It is only ever - // used for empty entries - GPR_ASSERT(!i.channel_ && !i.stub_); - } - void init(const grpc::string& target, const ClientConfig& config) { - // We have to use a 2-phase init like this with a default - // constructor followed by an initializer function to make - // old compilers happy with using this in std::vector - channel_ = CreateTestChannel( - target, config.security_params().server_host_override(), - config.has_security_params(), - !config.security_params().use_test_ca()); - stub_ = BenchmarkService::NewStub(channel_); - } - Channel* get_channel() { return channel_.get(); } - BenchmarkService::Stub* get_stub() { return stub_.get(); } - - private: - std::shared_ptr<Channel> channel_; - std::unique_ptr<BenchmarkService::Stub> stub_; - }; - std::vector<ClientChannelInfo> channels_; - void StartThreads(size_t num_threads) { for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); @@ -295,8 +296,6 @@ class Client { } } - BenchmarkService::Stub* stub_; - ClientConfig config_; std::mutex mu_; std::condition_variable cv_; bool done_; @@ -314,11 +313,66 @@ class Client { std::vector<grpc_time> next_time_; }; +template <class StubType, class RequestType> +class ClientImpl : public Client { + public: + ClientImpl(const ClientConfig& config, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) + : channels_(config.client_channels()), create_stub_(create_stub) { + for (int i = 0; i < config.client_channels(); i++) { + channels_[i].init(config.server_targets(i % config.server_targets_size()), + config, create_stub_); + } + + ClientRequestCreator<RequestType> create_req(&request_, + config.payload_config()); + } + virtual ~ClientImpl() {} + + protected: + RequestType request_; + + class ClientChannelInfo { + public: + ClientChannelInfo() {} + ClientChannelInfo(const ClientChannelInfo& i) { + // The copy constructor is to satisfy old compilers + // that need it for using std::vector . It is only ever + // used for empty entries + GPR_ASSERT(!i.channel_ && !i.stub_); + } + void init(const grpc::string& target, const ClientConfig& config, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) { + // We have to use a 2-phase init like this with a default + // constructor followed by an initializer function to make + // old compilers happy with using this in std::vector + channel_ = CreateTestChannel( + target, config.security_params().server_host_override(), + config.has_security_params(), + !config.security_params().use_test_ca()); + stub_ = create_stub(channel_); + } + Channel* get_channel() { return channel_.get(); } + StubType* get_stub() { return stub_.get(); } + + private: + std::shared_ptr<Channel> channel_; + std::unique_ptr<StubType> stub_; + }; + std::vector<ClientChannelInfo> channels_; + std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)> + create_stub_; +}; + std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateSynchronousStreamingClient( const ClientConfig& args); std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args); +std::unique_ptr<Client> CreateGenericAsyncStreamingClient( + const ClientConfig& args); } // namespace testing } // namespace grpc |