diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/qps/client.h | 115 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 101 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 25 |
3 files changed, 140 insertions, 101 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 9a2894687d..1a0a53d23b 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -69,59 +69,68 @@ namespace testing { typedef std::chrono::high_resolution_clock grpc_time_source; typedef std::chrono::time_point<grpc_time_source> grpc_time; -namespace ClientRequestCreation { template <class RequestType> -void CreateRequest(RequestType *req, const PayloadConfig&) { - // this template must be specialized - // fail with an assertion rather than a compile-time - // check since these only happen at the beginning anyway - GPR_ASSERT(false); -} +class ClientRequestCreator { + public: + ClientRequestCreator(RequestType* req, const PayloadConfig&) { + // this template must be specialized + // fail with an assertion rather than a compile-time + // check since these only happen at the beginning anyway + GPR_ASSERT(false); + } +}; template <> -void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& payload_config) { - if (payload_config.has_bytebuf_params()) { - GPR_ASSERT(false); // not appropriate for this specialization - } else if (payload_config.has_simple_params()) { - req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - req->set_response_size(payload_config.simple_params().resp_size()); - req->mutable_payload()->set_type(grpc::testing::PayloadType::COMPRESSABLE); - int size = payload_config.simple_params().req_size(); - std::unique_ptr<char[]> body(new char[size]); - req->mutable_payload()->set_body(body.get(), size); - } else if (payload_config.has_complex_params()) { - GPR_ASSERT(false); // not appropriate for this specialization - } else { - // default should be simple proto without payloads - req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - req->set_response_size(0); - req->mutable_payload()->set_type(grpc::testing::PayloadType::COMPRESSABLE); +class ClientRequestCreator<SimpleRequest> { + public: + ClientRequestCreator(SimpleRequest* req, + const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + GPR_ASSERT(false); // not appropriate for this specialization + } else if (payload_config.has_simple_params()) { + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(payload_config.simple_params().resp_size()); + req->mutable_payload()->set_type( + grpc::testing::PayloadType::COMPRESSABLE); + int size = payload_config.simple_params().req_size(); + std::unique_ptr<char[]> body(new char[size]); + req->mutable_payload()->set_body(body.get(), size); + } else if (payload_config.has_complex_params()) { + GPR_ASSERT(false); // not appropriate for this specialization + } else { + // default should be simple proto without payloads + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(0); + req->mutable_payload()->set_type( + grpc::testing::PayloadType::COMPRESSABLE); + } } -} +}; + template <> -void CreateRequest<ByteBuffer>(ByteBuffer *req, - const PayloadConfig& payload_config) { - if (payload_config.has_bytebuf_params()) { - if (payload_config.bytebuf_params().req_size() > 0) { - std::unique_ptr<char> - buf(new char[payload_config.bytebuf_params().req_size()]); - gpr_slice s = - gpr_slice_from_copied_buffer(buf.get(), - payload_config.bytebuf_params().req_size()); - Slice slice(s, Slice::STEAL_REF); - std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1)); - req->MoveFrom(bbuf.get()); - } else { - GPR_ASSERT(false); // not appropriate for this specialization +class ClientRequestCreator<ByteBuffer> { + public: + ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + if (payload_config.bytebuf_params().req_size() > 0) { + std::unique_ptr<char> buf( + new char[payload_config.bytebuf_params().req_size()]); + gpr_slice s = gpr_slice_from_copied_buffer( + buf.get(), payload_config.bytebuf_params().req_size()); + Slice slice(s, Slice::STEAL_REF); + std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1)); + req->MoveFrom(bbuf.get()); + } else { + GPR_ASSERT(false); // not appropriate for this specialization + } } } -} -} +}; class Client { public: Client() : timer_(new Timer), interarrival_timer_() {} - virtual ~Client(); + virtual ~Client() {} ClientStats Mark(bool reset) { Histogram latencies; @@ -156,6 +165,7 @@ class Client { stats.set_time_user(timer_result.user); return stats; } + protected: bool closed_loop_; @@ -227,6 +237,7 @@ class Client { return true; } } + private: class Thread { public: @@ -309,15 +320,16 @@ template <class StubType, class RequestType> class ClientImpl : public Client { public: ClientImpl(const ClientConfig& config, - std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) - : channels_(config.client_channels()), - create_stub_(create_stub) { + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) + : channels_(config.client_channels()), create_stub_(create_stub) { for (int i = 0; i < config.client_channels(); i++) { channels_[i].init(config.server_targets(i % config.server_targets_size()), - config); + config, create_stub_); } - ClientRequestCreation::CreateRequest<RequestType>(&request_, config.payload_config()); + ClientRequestCreator<RequestType> create_req(&request_, + config.payload_config()); } virtual ~ClientImpl() {} @@ -333,7 +345,9 @@ class ClientImpl : public Client { // used for empty entries GPR_ASSERT(!i.channel_ && !i.stub_); } - void init(const grpc::string& target, const ClientConfig& config) { + void init(const grpc::string& target, const ClientConfig& config, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) { // We have to use a 2-phase init like this with a default // constructor followed by an initializer function to make // old compilers happy with using this in std::vector @@ -341,7 +355,7 @@ class ClientImpl : public Client { target, config.security_params().server_host_override(), config.has_security_params(), !config.security_params().use_test_ca()); - stub_ = create_stub_(channel_); + stub_ = create_stub(channel_); } Channel* get_channel() { return channel_.get(); } StubType* get_stub() { return stub_.get(); } @@ -351,7 +365,8 @@ class ClientImpl : public Client { std::unique_ptr<StubType> stub_; }; std::vector<ClientChannelInfo> channels_; - std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub_; + std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)> + create_stub_; }; std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index fdfe1a567a..087ea75bf4 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -37,20 +37,20 @@ #include <list> #include <memory> #include <mutex> +#include <sstream> #include <string> #include <thread> #include <vector> -#include <sstream> -#include <grpc/grpc.h> -#include <grpc/support/histogram.h> -#include <grpc/support/log.h> #include <gflags/gflags.h> #include <grpc++/client_context.h> #include <grpc++/generic/generic_stub.h> +#include <grpc/grpc.h> +#include <grpc/support/histogram.h> +#include <grpc/support/log.h> -#include "test/cpp/qps/timer.h" #include "test/cpp/qps/client.h" +#include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" #include "test/proto/benchmarks/services.grpc.pb.h" @@ -93,7 +93,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req, + CompletionQueue*)> + start_req, std::function<void(grpc::Status, ResponseType*)> on_done) : ClientRpcContext(channel_id), context_(), @@ -139,7 +140,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function<void(grpc::Status, ResponseType*)> callback_; std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req_; + CompletionQueue*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> @@ -156,13 +158,15 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { using Client::SetupLoadTest; using Client::NextIssueTime; using Client::closed_loop_; - using ClientImpl<StubType,RequestType>::channels_; - using ClientImpl<StubType,RequestType>::request_; + using ClientImpl<StubType, RequestType>::channels_; + using ClientImpl<StubType, RequestType>::request_; AsyncClient( const ClientConfig& config, - std::function<ClientRpcContext*(int, StubType*, const RequestType&)> setup_ctx, - std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) - : ClientImpl<StubType,RequestType>(config, create_stub), + std::function<ClientRpcContext*(int, StubType*, const RequestType&)> + setup_ctx, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) + : ClientImpl<StubType, RequestType>(config, create_stub), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), @@ -352,11 +356,16 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { int pref_channel_inc_; }; -class AsyncUnaryClient GRPC_FINAL : - public AsyncClient<BenchmarkService::Stub, SimpleRequest> { +static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( + std::shared_ptr<Channel> ch) { + return BenchmarkService::NewStub(ch); +} + +class AsyncUnaryClient GRPC_FINAL + : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: explicit AsyncUnaryClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx, BenchmarkService::NewStub) { + : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { StartThreads(config.async_client_threads()); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } @@ -385,7 +394,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function<std::unique_ptr< grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req, + void*)> + start_req, std::function<void(grpc::Status, ResponseType*)> on_done) : ClientRpcContext(channel_id), context_(), @@ -437,20 +447,21 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { ResponseType response_; bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*); std::function<void(grpc::Status, ResponseType*)> callback_; - std::function< - std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req_; + std::function<std::unique_ptr< + grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> stream_; }; -class AsyncStreamingClient GRPC_FINAL : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { +class AsyncStreamingClient GRPC_FINAL + : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: explicit AsyncStreamingClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx, BenchmarkService::NewStub) { + : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); @@ -481,10 +492,10 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { public: ClientGenericRpcContextStreamingImpl( int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, - std::function<std::unique_ptr< - grpc::GenericClientAsyncReaderWriter>( - grpc::GenericStub*, grpc::ClientContext*, const grpc::string& method_name, - CompletionQueue*, void*)> start_req, + std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( + grpc::GenericStub*, grpc::ClientContext*, + const grpc::string& method_name, CompletionQueue*, void*)> + start_req, std::function<void(grpc::Status, ByteBuffer*)> on_done) : ClientRpcContext(channel_id), context_(), @@ -501,11 +512,13 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { } ClientRpcContext* StartNewClone() GRPC_OVERRIDE { return new ClientGenericRpcContextStreamingImpl(channel_id_, stub_, req_, - start_req_, callback_); + start_req_, callback_); } void Start(CompletionQueue* cq) GRPC_OVERRIDE { - const grpc::string kMethodName("/grpc.testing.BenchmarkService/StreamingCall"); - stream_ = start_req_(stub_, &context_, kMethodName, cq, ClientRpcContext::tag(this)); + const grpc::string kMethodName( + "/grpc.testing.BenchmarkService/StreamingCall"); + stream_ = start_req_(stub_, &context_, kMethodName, cq, + ClientRpcContext::tag(this)); } private: @@ -537,19 +550,25 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { ByteBuffer response_; bool (ClientGenericRpcContextStreamingImpl::*next_state_)(bool, Histogram*); std::function<void(grpc::Status, ByteBuffer*)> callback_; - std::function< - std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( - grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, CompletionQueue*, - void*)> start_req_; + std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( + grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, + CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_; }; -class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient<grpc::GenericStub, ByteBuffer> { +static std::unique_ptr<grpc::GenericStub> GenericStubCreator( + std::shared_ptr<Channel> ch) { + return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch)); +} + +class GenericAsyncStreamingClient GRPC_FINAL + : public AsyncClient<grpc::GenericStub, ByteBuffer> { public: explicit GenericAsyncStreamingClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx, grpc::GenericStub) { + : AsyncClient(config, SetupCtx, GenericStubCreator) { // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); @@ -560,14 +579,13 @@ class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient<grpc::GenericS private: static void CheckDone(grpc::Status s, ByteBuffer* response) {} - static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> - StartReq(grpc::GenericStub* stub, grpc::ClientContext* ctx, - const grpc::string& method_name, CompletionQueue* cq, void* tag) { + static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq( + grpc::GenericStub* stub, grpc::ClientContext* ctx, + const grpc::string& method_name, CompletionQueue* cq, void* tag) { auto stream = stub->Call(ctx, method_name, cq, tag); return stream; }; - static ClientRpcContext* SetupCtx(int channel_id, - grpc::GenericStub* stub, + static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub, const ByteBuffer& req) { return new ClientGenericRpcContextStreamingImpl( channel_id, stub, req, GenericAsyncStreamingClient::StartReq, @@ -581,7 +599,8 @@ std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) { std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) { return std::unique_ptr<Client>(new AsyncStreamingClient(args)); } -std::unique_ptr<Client> CreateGenericAsyncStreamingClient(const ClientConfig& args) { +std::unique_ptr<Client> CreateGenericAsyncStreamingClient( + const ClientConfig& args) { return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args)); } diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 409fc26972..c27ca7a623 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -35,28 +35,28 @@ #include <chrono> #include <memory> #include <mutex> +#include <sstream> #include <string> #include <thread> #include <vector> -#include <sstream> #include <gflags/gflags.h> +#include <grpc++/client_context.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/histogram.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/time.h> -#include <grpc++/client_context.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> #include <gtest/gtest.h> -#include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" +#include "test/cpp/util/create_test_channel.h" #include "test/proto/benchmarks/services.grpc.pb.h" #include "src/core/profiling/timers.h" @@ -64,12 +64,17 @@ namespace grpc { namespace testing { -class SynchronousClient : - public ClientImpl<BenchmarkService::Stub, SimpleRequest> { +static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( + std::shared_ptr<Channel> ch) { + return BenchmarkService::NewStub(ch); +} + +class SynchronousClient + : public ClientImpl<BenchmarkService::Stub, SimpleRequest> { public: - SynchronousClient(const ClientConfig& config) : - ClientImpl<BenchmarkService::Stub, - SimpleRequest>(config, BenchmarkService::NewStub) { + SynchronousClient(const ClientConfig& config) + : ClientImpl<BenchmarkService::Stub, SimpleRequest>( + config, BenchmarkStubCreator) { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); |