diff options
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r-- | test/cpp/qps/client_sync.cc | 71 |
1 files changed, 57 insertions, 14 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 7bb7231c6f..947618121b 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -48,31 +48,42 @@ #include <grpc/support/host_port.h> #include <gflags/gflags.h> #include <grpc++/client_context.h> -#include <grpc++/status.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> -#include "test/core/util/grpc_profiler.h" +#include <grpc++/status.h> +#include <grpc++/stream.h> +#include <gtest/gtest.h> #include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/client.h" -#include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/qpstest.grpc.pb.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/timer.h" namespace grpc { namespace testing { -class SynchronousClient GRPC_FINAL : public Client { +class SynchronousClient : public Client { public: SynchronousClient(const ClientConfig& config) : Client(config) { - size_t num_threads = - config.outstanding_rpcs_per_channel() * config.client_channels(); - responses_.resize(num_threads); - StartThreads(num_threads); + num_threads_ = + config.outstanding_rpcs_per_channel() * config.client_channels(); + responses_.resize(num_threads_); } - ~SynchronousClient() { EndThreads(); } + virtual ~SynchronousClient() { EndThreads(); } + + protected: + size_t num_threads_; + std::vector<SimpleResponse> responses_; +}; - void ThreadFunc(Histogram* histogram, size_t thread_idx) { +class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { + public: + SynchronousUnaryClient(const ClientConfig& config): + SynchronousClient(config) {StartThreads(num_threads_);} + ~SynchronousUnaryClient() {} + + void ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = Timer::Now(); grpc::ClientContext context; @@ -80,13 +91,45 @@ class SynchronousClient GRPC_FINAL : public Client { stub->UnaryCall(&context, request_, &responses_[thread_idx]); histogram->Add((Timer::Now() - start) * 1e9); } +}; - private: - std::vector<SimpleResponse> responses_; +class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { + public: + SynchronousStreamingClient(const ClientConfig& config): + SynchronousClient(config) { + for (size_t thread_idx=0;thread_idx<num_threads_;thread_idx++){ + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_ = stub->StreamingCall(&context_); + } + StartThreads(num_threads_); + } + ~SynchronousStreamingClient() { + if (stream_) { + SimpleResponse response; + stream_->WritesDone(); + EXPECT_TRUE(stream_->Finish().IsOk()); + } + } + + void ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + double start = Timer::Now(); + EXPECT_TRUE(stream_->Write(request_)); + EXPECT_TRUE(stream_->Read(&responses_[thread_idx])); + histogram->Add((Timer::Now() - start) * 1e9); + } + private: + grpc::ClientContext context_; + std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, + SimpleResponse>> stream_; }; -std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) { - return std::unique_ptr<Client>(new SynchronousClient(config)); +std::unique_ptr<Client> +CreateSynchronousUnaryClient(const ClientConfig& config) { + return std::unique_ptr<Client>(new SynchronousUnaryClient(config)); +} +std::unique_ptr<Client> +CreateSynchronousStreamingClient(const ClientConfig& config) { + return std::unique_ptr<Client>(new SynchronousStreamingClient(config)); } } // namespace testing |