diff options
author | Vijay Pai <vpai@google.com> | 2017-03-23 16:19:00 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2017-05-02 09:26:12 -0700 |
commit | eea8cf0fe3a836b78e9ba122a01f6f1552ad8402 (patch) | |
tree | 5e417bb6bc0a3c872281cb6ba6f81b8a829ace99 /test/cpp/qps/client_sync.cc | |
parent | 6626a86923aaebae86aeeba151c99ed3089d02fc (diff) |
Add QPS tests for one-sided streaming
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r-- | test/cpp/qps/client_sync.cc | 220 |
1 files changed, 184 insertions, 36 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index f8ce2cccbe..9075033bd4 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -137,7 +137,8 @@ class SynchronousUnaryClient final : public SynchronousClient { } }; -class SynchronousStreamingClient final : public SynchronousClient { +template <class StreamType> +class SynchronousStreamingClient : public SynchronousClient { public: SynchronousStreamingClient(const ClientConfig& config) : SynchronousClient(config), @@ -145,30 +146,69 @@ class SynchronousStreamingClient final : public SynchronousClient { stream_(num_threads_), messages_per_stream_(config.messages_per_stream()), messages_issued_(num_threads_) { + StartThreads(num_threads_); + } + virtual ~SynchronousStreamingClient() { + std::vector<std::thread> cleanup_threads; + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + // forcibly cancel the streams, then finish + context_[i].TryCancel(); + (*stream)->Finish(); + // don't log any error message on !ok since this was canceled + } + }); + } + for (auto& th : cleanup_threads) { + th.join(); + } + } + + protected: + std::vector<grpc::ClientContext> context_; + std::vector<std::unique_ptr<StreamType>> stream_; + const int messages_per_stream_; + std::vector<int> messages_issued_; + + void FinishStream(HistogramEntry* entry, size_t thread_idx) { + Status s = stream_[thread_idx]->Finish(); + // don't set the value since the stream is failed and shouldn't be timed + entry->set_status(s.error_code()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, + s.error_message().c_str()); + } + context_[thread_idx].~ClientContext(); + new (&context_[thread_idx]) ClientContext(); + } +}; + +class SynchronousStreamingPingPongClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { + public: + SynchronousStreamingPingPongClient(const ClientConfig& config) + : SynchronousStreamingClient(config) { for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; } - StartThreads(num_threads_); } - ~SynchronousStreamingClient() { + ~SynchronousStreamingPingPongClient() { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { cleanup_threads.emplace_back([this, i]() { auto stream = &stream_[i]; if (*stream) { (*stream)->WritesDone(); - Status s = (*stream)->Finish(); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i, - s.error_message().c_str()); - } } }); } - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads[i].join(); + for (auto& th : cleanup_threads) { + th.join(); } } @@ -176,7 +216,7 @@ class SynchronousStreamingClient final : public SynchronousClient { if (!WaitToIssue(thread_idx)) { return true; } - GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); + GPR_TIMER_SCOPE("SynchronousStreamingPingPongClient::ThreadFunc", 0); double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { @@ -192,40 +232,148 @@ class SynchronousStreamingClient final : public SynchronousClient { } } stream_[thread_idx]->WritesDone(); - Status s = stream_[thread_idx]->Finish(); - // don't set the value since this is either a failure (shouldn't be timed) - // or a stream-end (already has been timed) - entry->set_status(s.error_code()); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, - s.error_message().c_str()); - } + FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - context_[thread_idx].~ClientContext(); - new (&context_[thread_idx]) ClientContext(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; return true; } +}; + +class SynchronousStreamingFromClientClient final + : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> { + public: + SynchronousStreamingFromClientClient(const ClientConfig& config) + : SynchronousStreamingClient(config), last_issue_(num_threads_) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + last_issue_[thread_idx] = UsageTimer::Now(); + } + } + ~SynchronousStreamingFromClientClient() { + std::vector<std::thread> cleanup_threads; + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + (*stream)->WritesDone(); + } + }); + } + for (auto& th : cleanup_threads) { + th.join(); + } + } + + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + // Figure out how to make histogram sensible if this is rate-paced + if (!WaitToIssue(thread_idx)) { + return true; + } + GPR_TIMER_SCOPE("SynchronousStreamingFromClientClient::ThreadFunc", 0); + if (stream_[thread_idx]->Write(request_)) { + double now = UsageTimer::Now(); + entry->set_value((now - last_issue_[thread_idx]) * 1e9); + last_issue_[thread_idx] = now; + return true; + } + stream_[thread_idx]->WritesDone(); + FinishStream(entry, thread_idx); + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + return true; + } private: - // These are both conceptually std::vector but cannot be for old compilers - // that expect contained classes to support copy constructors - std::vector<grpc::ClientContext> context_; - std::vector< - std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> - stream_; - const int messages_per_stream_; - std::vector<int> messages_issued_; + std::vector<double> last_issue_; }; -std::unique_ptr<Client> CreateSynchronousUnaryClient( - const ClientConfig& config) { - return std::unique_ptr<Client>(new SynchronousUnaryClient(config)); -} -std::unique_ptr<Client> CreateSynchronousStreamingClient( - const ClientConfig& config) { - return std::unique_ptr<Client>(new SynchronousStreamingClient(config)); +class SynchronousStreamingFromServerClient final + : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> { + public: + SynchronousStreamingFromServerClient(const ClientConfig& config) + : SynchronousStreamingClient(config), last_recv_(num_threads_) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + last_recv_[thread_idx] = UsageTimer::Now(); + } + } + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); + if (stream_[thread_idx]->Read(&responses_[thread_idx])) { + double now = UsageTimer::Now(); + entry->set_value((now - last_recv_[thread_idx]) * 1e9); + last_recv_[thread_idx] = now; + return true; + } + FinishStream(entry, thread_idx); + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + return true; + } + + private: + std::vector<double> last_recv_; +}; + +class SynchronousStreamingBothWaysClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { + public: + SynchronousStreamingBothWaysClient(const ClientConfig& config) + : SynchronousStreamingClient(config) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } + } + ~SynchronousStreamingBothWaysClient() { + std::vector<std::thread> cleanup_threads; + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + (*stream)->WritesDone(); + } + }); + } + for (auto& th : cleanup_threads) { + th.join(); + } + } + + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + // TODO (vjpai): Do this + return true; + } +}; + +std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) { + switch (config.rpc_type()) { + case UNARY: + return std::unique_ptr<Client>(new SynchronousUnaryClient(config)); + case STREAMING: + return std::unique_ptr<Client>( + new SynchronousStreamingPingPongClient(config)); + case STREAMING_FROM_CLIENT: + return std::unique_ptr<Client>( + new SynchronousStreamingFromClientClient(config)); + case STREAMING_FROM_SERVER: + return std::unique_ptr<Client>( + new SynchronousStreamingFromServerClient(config)); + case STREAMING_BOTH_WAYS: + return std::unique_ptr<Client>( + new SynchronousStreamingBothWaysClient(config)); + default: + assert(false); + return nullptr; + } } } // namespace testing |