aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client.h
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r--test/cpp/qps/client.h164
1 files changed, 109 insertions, 55 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index ee0049578d..15cfd7a2d7 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,6 +37,9 @@
#include <condition_variable>
#include <mutex>
+#include <grpc++/support/byte_buffer.h>
+#include <grpc++/support/slice.h>
+
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
@@ -66,37 +69,64 @@ namespace testing {
typedef std::chrono::high_resolution_clock grpc_time_source;
typedef std::chrono::time_point<grpc_time_source> grpc_time;
-class Client {
+template <class RequestType>
+class ClientRequestCreator {
public:
- explicit Client(const ClientConfig& config)
- : channels_(config.client_channels()),
- timer_(new Timer),
- interarrival_timer_() {
- for (int i = 0; i < config.client_channels(); i++) {
- channels_[i].init(config.server_targets(i % config.server_targets_size()),
- config);
- }
- if (config.payload_config().has_bytebuf_params()) {
- GPR_ASSERT(false); // not yet implemented
- } else if (config.payload_config().has_simple_params()) {
- request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
- request_.set_response_size(
- config.payload_config().simple_params().resp_size());
- request_.mutable_payload()->set_type(
+ ClientRequestCreator(RequestType* req, const PayloadConfig&) {
+ // this template must be specialized
+ // fail with an assertion rather than a compile-time
+ // check since these only happen at the beginning anyway
+ GPR_ASSERT(false);
+ }
+};
+
+template <>
+class ClientRequestCreator<SimpleRequest> {
+ public:
+ ClientRequestCreator(SimpleRequest* req,
+ const PayloadConfig& payload_config) {
+ if (payload_config.has_bytebuf_params()) {
+ GPR_ASSERT(false); // not appropriate for this specialization
+ } else if (payload_config.has_simple_params()) {
+ req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
+ req->set_response_size(payload_config.simple_params().resp_size());
+ req->mutable_payload()->set_type(
grpc::testing::PayloadType::COMPRESSABLE);
- int size = config.payload_config().simple_params().req_size();
+ int size = payload_config.simple_params().req_size();
std::unique_ptr<char[]> body(new char[size]);
- request_.mutable_payload()->set_body(body.get(), size);
- } else if (config.payload_config().has_complex_params()) {
- GPR_ASSERT(false); // not yet implemented
+ req->mutable_payload()->set_body(body.get(), size);
+ } else if (payload_config.has_complex_params()) {
+ GPR_ASSERT(false); // not appropriate for this specialization
} else {
// default should be simple proto without payloads
- request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
- request_.set_response_size(0);
- request_.mutable_payload()->set_type(
+ req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
+ req->set_response_size(0);
+ req->mutable_payload()->set_type(
grpc::testing::PayloadType::COMPRESSABLE);
}
}
+};
+
+template <>
+class ClientRequestCreator<ByteBuffer> {
+ public:
+ ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
+ if (payload_config.has_bytebuf_params()) {
+ std::unique_ptr<char> buf(
+ new char[payload_config.bytebuf_params().req_size()]);
+ gpr_slice s = gpr_slice_from_copied_buffer(
+ buf.get(), payload_config.bytebuf_params().req_size());
+ Slice slice(s, Slice::STEAL_REF);
+ *req = ByteBuffer(&slice, 1);
+ } else {
+ GPR_ASSERT(false); // not appropriate for this specialization
+ }
+ }
+};
+
+class Client {
+ public:
+ Client() : timer_(new Timer), interarrival_timer_() {}
virtual ~Client() {}
ClientStats Mark(bool reset) {
@@ -134,37 +164,8 @@ class Client {
}
protected:
- SimpleRequest request_;
bool closed_loop_;
- class ClientChannelInfo {
- public:
- ClientChannelInfo() {}
- ClientChannelInfo(const ClientChannelInfo& i) {
- // The copy constructor is to satisfy old compilers
- // that need it for using std::vector . It is only ever
- // used for empty entries
- GPR_ASSERT(!i.channel_ && !i.stub_);
- }
- void init(const grpc::string& target, const ClientConfig& config) {
- // We have to use a 2-phase init like this with a default
- // constructor followed by an initializer function to make
- // old compilers happy with using this in std::vector
- channel_ = CreateTestChannel(
- target, config.security_params().server_host_override(),
- config.has_security_params(),
- !config.security_params().use_test_ca());
- stub_ = BenchmarkService::NewStub(channel_);
- }
- Channel* get_channel() { return channel_.get(); }
- BenchmarkService::Stub* get_stub() { return stub_.get(); }
-
- private:
- std::shared_ptr<Channel> channel_;
- std::unique_ptr<BenchmarkService::Stub> stub_;
- };
- std::vector<ClientChannelInfo> channels_;
-
void StartThreads(size_t num_threads) {
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
@@ -295,8 +296,6 @@ class Client {
}
}
- BenchmarkService::Stub* stub_;
- ClientConfig config_;
std::mutex mu_;
std::condition_variable cv_;
bool done_;
@@ -314,11 +313,66 @@ class Client {
std::vector<grpc_time> next_time_;
};
+template <class StubType, class RequestType>
+class ClientImpl : public Client {
+ public:
+ ClientImpl(const ClientConfig& config,
+ std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
+ create_stub)
+ : channels_(config.client_channels()), create_stub_(create_stub) {
+ for (int i = 0; i < config.client_channels(); i++) {
+ channels_[i].init(config.server_targets(i % config.server_targets_size()),
+ config, create_stub_);
+ }
+
+ ClientRequestCreator<RequestType> create_req(&request_,
+ config.payload_config());
+ }
+ virtual ~ClientImpl() {}
+
+ protected:
+ RequestType request_;
+
+ class ClientChannelInfo {
+ public:
+ ClientChannelInfo() {}
+ ClientChannelInfo(const ClientChannelInfo& i) {
+ // The copy constructor is to satisfy old compilers
+ // that need it for using std::vector . It is only ever
+ // used for empty entries
+ GPR_ASSERT(!i.channel_ && !i.stub_);
+ }
+ void init(const grpc::string& target, const ClientConfig& config,
+ std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
+ create_stub) {
+ // We have to use a 2-phase init like this with a default
+ // constructor followed by an initializer function to make
+ // old compilers happy with using this in std::vector
+ channel_ = CreateTestChannel(
+ target, config.security_params().server_host_override(),
+ config.has_security_params(),
+ !config.security_params().use_test_ca());
+ stub_ = create_stub(channel_);
+ }
+ Channel* get_channel() { return channel_.get(); }
+ StubType* get_stub() { return stub_.get(); }
+
+ private:
+ std::shared_ptr<Channel> channel_;
+ std::unique_ptr<StubType> stub_;
+ };
+ std::vector<ClientChannelInfo> channels_;
+ std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
+ create_stub_;
+};
+
std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
std::unique_ptr<Client> CreateSynchronousStreamingClient(
const ClientConfig& args);
std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
+std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
+ const ClientConfig& args);
} // namespace testing
} // namespace grpc