aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2015-12-30 11:56:19 -0800
committerGravatar Vijay Pai <vpai@google.com>2015-12-30 11:56:19 -0800
commite4886680752e9181e0d848c3046e4a8d1eddffa3 (patch)
treeaa8c3e87443f33d328f0658c720c5c7ddc02cc13 /test/cpp/qps
parent994f8f76e77d15d7e54f05ec7794d624cdc5b158 (diff)
WIP
Diffstat (limited to 'test/cpp/qps')
-rw-r--r--test/cpp/qps/client.h121
-rw-r--r--test/cpp/qps/client_async.cc23
-rw-r--r--test/cpp/qps/client_sync.cc7
3 files changed, 86 insertions, 65 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index b11a83b570..9a2894687d 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -37,6 +37,9 @@
#include <condition_variable>
#include <mutex>
+#include <grpc++/support/byte_buffer.h>
+#include <grpc++/support/slice.h>
+
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
@@ -74,7 +77,7 @@ void CreateRequest(RequestType *req, const PayloadConfig&) {
// check since these only happen at the beginning anyway
GPR_ASSERT(false);
}
-
+
template <>
void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& payload_config) {
if (payload_config.has_bytebuf_params()) {
@@ -96,11 +99,15 @@ void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& paylo
}
}
template <>
-void CreateRequest<ByteBuffer>(ByteBuffer *req, const PayloadConfig& payload_config) {
+void CreateRequest<ByteBuffer>(ByteBuffer *req,
+ const PayloadConfig& payload_config) {
if (payload_config.has_bytebuf_params()) {
- if (payload_config.req_size() > 0) {
- std::unique_ptr<char> buf(new char[payload_config.req_size()]);
- gpr_slice_from_copied_buffer(buf.get(), payload_config.req_size());
+ if (payload_config.bytebuf_params().req_size() > 0) {
+ std::unique_ptr<char>
+ buf(new char[payload_config.bytebuf_params().req_size()]);
+ gpr_slice s =
+ gpr_slice_from_copied_buffer(buf.get(),
+ payload_config.bytebuf_params().req_size());
Slice slice(s, Slice::STEAL_REF);
std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1));
req->MoveFrom(bbuf.get());
@@ -110,24 +117,11 @@ void CreateRequest<ByteBuffer>(ByteBuffer *req, const PayloadConfig& payload_con
}
}
}
-
-template <class StubType, class RequestType>
+
class Client {
public:
- Client(const ClientConfig& config,
- std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub)
- : channels_(config.client_channels()),
- create_stub_(create_stub),
- timer_(new Timer),
- interarrival_timer_() {
- for (int i = 0; i < config.client_channels(); i++) {
- channels_[i].init(config.server_targets(i % config.server_targets_size()),
- config);
- }
-
- ClientRequestCreation::CreateRequest<RequestType>(&request_, config.payload_config());
- }
- virtual ~Client() {}
+ Client() : timer_(new Timer), interarrival_timer_() {}
+ virtual ~Client();
ClientStats Mark(bool reset) {
Histogram latencies;
@@ -162,40 +156,9 @@ class Client {
stats.set_time_user(timer_result.user);
return stats;
}
-
protected:
- RequestType request_;
bool closed_loop_;
- class ClientChannelInfo {
- public:
- ClientChannelInfo() {}
- ClientChannelInfo(const ClientChannelInfo& i) {
- // The copy constructor is to satisfy old compilers
- // that need it for using std::vector . It is only ever
- // used for empty entries
- GPR_ASSERT(!i.channel_ && !i.stub_);
- }
- void init(const grpc::string& target, const ClientConfig& config) {
- // We have to use a 2-phase init like this with a default
- // constructor followed by an initializer function to make
- // old compilers happy with using this in std::vector
- channel_ = CreateTestChannel(
- target, config.security_params().server_host_override(),
- config.has_security_params(),
- !config.security_params().use_test_ca());
- stub_ = create_stub_(channel_);
- }
- Channel* get_channel() { return channel_.get(); }
- StubType* get_stub() { return stub_.get(); }
-
- private:
- std::shared_ptr<Channel> channel_;
- std::unique_ptr<StubType> stub_;
- };
- std::vector<ClientChannelInfo> channels_;
- std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub_;
-
void StartThreads(size_t num_threads) {
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
@@ -264,7 +227,6 @@ class Client {
return true;
}
}
-
private:
class Thread {
public:
@@ -326,8 +288,6 @@ class Client {
}
}
- BenchmarkService::Stub* stub_;
- ClientConfig config_;
std::mutex mu_;
std::condition_variable cv_;
bool done_;
@@ -337,7 +297,7 @@ class Client {
size_t idx_;
std::thread impl_;
};
-
+
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<Timer> timer_;
@@ -345,6 +305,55 @@ class Client {
std::vector<grpc_time> next_time_;
};
+template <class StubType, class RequestType>
+class ClientImpl : public Client {
+ public:
+ ClientImpl(const ClientConfig& config,
+ std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub)
+ : channels_(config.client_channels()),
+ create_stub_(create_stub) {
+ for (int i = 0; i < config.client_channels(); i++) {
+ channels_[i].init(config.server_targets(i % config.server_targets_size()),
+ config);
+ }
+
+ ClientRequestCreation::CreateRequest<RequestType>(&request_, config.payload_config());
+ }
+ virtual ~ClientImpl() {}
+
+ protected:
+ RequestType request_;
+
+ class ClientChannelInfo {
+ public:
+ ClientChannelInfo() {}
+ ClientChannelInfo(const ClientChannelInfo& i) {
+ // The copy constructor is to satisfy old compilers
+ // that need it for using std::vector . It is only ever
+ // used for empty entries
+ GPR_ASSERT(!i.channel_ && !i.stub_);
+ }
+ void init(const grpc::string& target, const ClientConfig& config) {
+ // We have to use a 2-phase init like this with a default
+ // constructor followed by an initializer function to make
+ // old compilers happy with using this in std::vector
+ channel_ = CreateTestChannel(
+ target, config.security_params().server_host_override(),
+ config.has_security_params(),
+ !config.security_params().use_test_ca());
+ stub_ = create_stub_(channel_);
+ }
+ Channel* get_channel() { return channel_.get(); }
+ StubType* get_stub() { return stub_.get(); }
+
+ private:
+ std::shared_ptr<Channel> channel_;
+ std::unique_ptr<StubType> stub_;
+ };
+ std::vector<ClientChannelInfo> channels_;
+ std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub_;
+};
+
std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
std::unique_ptr<Client> CreateSynchronousStreamingClient(
const ClientConfig& args);
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index c05774c410..fdfe1a567a 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -47,6 +47,7 @@
#include <grpc/support/log.h>
#include <gflags/gflags.h>
#include <grpc++/client_context.h>
+#include <grpc++/generic/generic_stub.h>
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/client.h"
@@ -148,13 +149,20 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
typedef std::forward_list<ClientRpcContext*> context_list;
template <class StubType, class RequestType>
-class AsyncClient : public Client<StubType, RequestType> {
+class AsyncClient : public ClientImpl<StubType, RequestType> {
+ // Specify which protected members we are using since there is no
+ // member name resolution until the template types are fully resolved
public:
+ using Client::SetupLoadTest;
+ using Client::NextIssueTime;
+ using Client::closed_loop_;
+ using ClientImpl<StubType,RequestType>::channels_;
+ using ClientImpl<StubType,RequestType>::request_;
AsyncClient(
const ClientConfig& config,
std::function<ClientRpcContext*(int, StubType*, const RequestType&)> setup_ctx,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub)
- : Client(config, create_stub),
+ : ClientImpl<StubType,RequestType>(config, create_stub),
channel_lock_(new std::mutex[config.client_channels()]),
contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
@@ -344,7 +352,8 @@ class AsyncClient : public Client<StubType, RequestType> {
int pref_channel_inc_;
};
-class AsyncUnaryClient GRPC_FINAL : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
+class AsyncUnaryClient GRPC_FINAL :
+ public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
public:
explicit AsyncUnaryClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx, BenchmarkService::NewStub) {
@@ -559,10 +568,10 @@ class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient<grpc::GenericS
};
static ClientRpcContext* SetupCtx(int channel_id,
grpc::GenericStub* stub,
- const SimpleRequest& req) {
- return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- channel_id, stub, req, AsyncStreamingClient::StartReq,
- AsyncStreamingClient::CheckDone);
+ const ByteBuffer& req) {
+ return new ClientGenericRpcContextStreamingImpl(
+ channel_id, stub, req, GenericAsyncStreamingClient::StartReq,
+ GenericAsyncStreamingClient::CheckDone);
}
};
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 10d680860a..409fc26972 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -64,9 +64,12 @@
namespace grpc {
namespace testing {
-class SynchronousClient : public Client {
+class SynchronousClient :
+ public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
public:
- SynchronousClient(const ClientConfig& config) : Client(config) {
+ SynchronousClient(const ClientConfig& config) :
+ ClientImpl<BenchmarkService::Stub,
+ SimpleRequest>(config, BenchmarkService::NewStub) {
num_threads_ =
config.outstanding_rpcs_per_channel() * config.client_channels();
responses_.resize(num_threads_);