From eea8cf0fe3a836b78e9ba122a01f6f1552ad8402 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 23 Mar 2017 16:19:00 -0700 Subject: Add QPS tests for one-sided streaming --- test/cpp/qps/client_sync.cc | 220 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 184 insertions(+), 36 deletions(-) (limited to 'test/cpp/qps/client_sync.cc') diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index f8ce2cccbe..9075033bd4 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -137,7 +137,8 @@ class SynchronousUnaryClient final : public SynchronousClient { } }; -class SynchronousStreamingClient final : public SynchronousClient { +template +class SynchronousStreamingClient : public SynchronousClient { public: SynchronousStreamingClient(const ClientConfig& config) : SynchronousClient(config), @@ -145,30 +146,69 @@ class SynchronousStreamingClient final : public SynchronousClient { stream_(num_threads_), messages_per_stream_(config.messages_per_stream()), messages_issued_(num_threads_) { + StartThreads(num_threads_); + } + virtual ~SynchronousStreamingClient() { + std::vector cleanup_threads; + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + // forcibly cancel the streams, then finish + context_[i].TryCancel(); + (*stream)->Finish(); + // don't log any error message on !ok since this was canceled + } + }); + } + for (auto& th : cleanup_threads) { + th.join(); + } + } + + protected: + std::vector context_; + std::vector> stream_; + const int messages_per_stream_; + std::vector messages_issued_; + + void FinishStream(HistogramEntry* entry, size_t thread_idx) { + Status s = stream_[thread_idx]->Finish(); + // don't set the value since the stream is failed and shouldn't be timed + entry->set_status(s.error_code()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, + s.error_message().c_str()); + } + context_[thread_idx].~ClientContext(); + new (&context_[thread_idx]) ClientContext(); + } +}; + +class SynchronousStreamingPingPongClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter> { + public: + SynchronousStreamingPingPongClient(const ClientConfig& config) + : SynchronousStreamingClient(config) { for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; } - StartThreads(num_threads_); } - ~SynchronousStreamingClient() { + ~SynchronousStreamingPingPongClient() { std::vector cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { cleanup_threads.emplace_back([this, i]() { auto stream = &stream_[i]; if (*stream) { (*stream)->WritesDone(); - Status s = (*stream)->Finish(); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i, - s.error_message().c_str()); - } } }); } - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads[i].join(); + for (auto& th : cleanup_threads) { + th.join(); } } @@ -176,7 +216,7 @@ class SynchronousStreamingClient final : public SynchronousClient { if (!WaitToIssue(thread_idx)) { return true; } - GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); + GPR_TIMER_SCOPE("SynchronousStreamingPingPongClient::ThreadFunc", 0); double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { @@ -192,40 +232,148 @@ class SynchronousStreamingClient final : public SynchronousClient { } } stream_[thread_idx]->WritesDone(); - Status s = stream_[thread_idx]->Finish(); - // don't set the value since this is either a failure (shouldn't be timed) - // or a stream-end (already has been timed) - entry->set_status(s.error_code()); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, - s.error_message().c_str()); - } + FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - context_[thread_idx].~ClientContext(); - new (&context_[thread_idx]) ClientContext(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; return true; } +}; + +class SynchronousStreamingFromClientClient final + : public SynchronousStreamingClient> { + public: + SynchronousStreamingFromClientClient(const ClientConfig& config) + : SynchronousStreamingClient(config), last_issue_(num_threads_) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + last_issue_[thread_idx] = UsageTimer::Now(); + } + } + ~SynchronousStreamingFromClientClient() { + std::vector cleanup_threads; + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + (*stream)->WritesDone(); + } + }); + } + for (auto& th : cleanup_threads) { + th.join(); + } + } + + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + // Figure out how to make histogram sensible if this is rate-paced + if (!WaitToIssue(thread_idx)) { + return true; + } + GPR_TIMER_SCOPE("SynchronousStreamingFromClientClient::ThreadFunc", 0); + if (stream_[thread_idx]->Write(request_)) { + double now = UsageTimer::Now(); + entry->set_value((now - last_issue_[thread_idx]) * 1e9); + last_issue_[thread_idx] = now; + return true; + } + stream_[thread_idx]->WritesDone(); + FinishStream(entry, thread_idx); + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + return true; + } private: - // These are both conceptually std::vector but cannot be for old compilers - // that expect contained classes to support copy constructors - std::vector context_; - std::vector< - std::unique_ptr>> - stream_; - const int messages_per_stream_; - std::vector messages_issued_; + std::vector last_issue_; }; -std::unique_ptr CreateSynchronousUnaryClient( - const ClientConfig& config) { - return std::unique_ptr(new SynchronousUnaryClient(config)); -} -std::unique_ptr CreateSynchronousStreamingClient( - const ClientConfig& config) { - return std::unique_ptr(new SynchronousStreamingClient(config)); +class SynchronousStreamingFromServerClient final + : public SynchronousStreamingClient> { + public: + SynchronousStreamingFromServerClient(const ClientConfig& config) + : SynchronousStreamingClient(config), last_recv_(num_threads_) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + last_recv_[thread_idx] = UsageTimer::Now(); + } + } + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); + if (stream_[thread_idx]->Read(&responses_[thread_idx])) { + double now = UsageTimer::Now(); + entry->set_value((now - last_recv_[thread_idx]) * 1e9); + last_recv_[thread_idx] = now; + return true; + } + FinishStream(entry, thread_idx); + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + return true; + } + + private: + std::vector last_recv_; +}; + +class SynchronousStreamingBothWaysClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter> { + public: + SynchronousStreamingBothWaysClient(const ClientConfig& config) + : SynchronousStreamingClient(config) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } + } + ~SynchronousStreamingBothWaysClient() { + std::vector cleanup_threads; + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + (*stream)->WritesDone(); + } + }); + } + for (auto& th : cleanup_threads) { + th.join(); + } + } + + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + // TODO (vjpai): Do this + return true; + } +}; + +std::unique_ptr CreateSynchronousClient(const ClientConfig& config) { + switch (config.rpc_type()) { + case UNARY: + return std::unique_ptr(new SynchronousUnaryClient(config)); + case STREAMING: + return std::unique_ptr( + new SynchronousStreamingPingPongClient(config)); + case STREAMING_FROM_CLIENT: + return std::unique_ptr( + new SynchronousStreamingFromClientClient(config)); + case STREAMING_FROM_SERVER: + return std::unique_ptr( + new SynchronousStreamingFromServerClient(config)); + case STREAMING_BOTH_WAYS: + return std::unique_ptr( + new SynchronousStreamingBothWaysClient(config)); + default: + assert(false); + return nullptr; + } } } // namespace testing -- cgit v1.2.3