diff options
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r-- | test/cpp/qps/client_sync.cc | 212 |
1 files changed, 128 insertions, 84 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 9f20b148eb..82a3f0042d 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -60,21 +60,20 @@ class SynchronousClient SetupLoadTest(config, num_threads_); } - virtual ~SynchronousClient(){}; + virtual ~SynchronousClient() {} - virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool InitThreadFuncImpl(size_t thread_idx) = 0; virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; void ThreadFunc(size_t thread_idx, Thread* t) override { - InitThreadFuncImpl(thread_idx); + if (!InitThreadFuncImpl(thread_idx)) { + return; + } for (;;) { // run the loop body HistogramEntry entry; const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); t->UpdateHistogram(&entry); - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } if (!thread_still_ok || ThreadCompleted()) { return; } @@ -109,9 +108,6 @@ class SynchronousClient size_t num_threads_; std::vector<SimpleResponse> responses_; - - private: - void DestroyMultithreading() override final { EndThreads(); } }; class SynchronousUnaryClient final : public SynchronousClient { @@ -122,7 +118,7 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFuncImpl(size_t thread_idx) override {} + bool InitThreadFuncImpl(size_t thread_idx) override { return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { @@ -140,6 +136,9 @@ class SynchronousUnaryClient final : public SynchronousClient { entry->set_status(s.error_code()); return true; } + + private: + void DestroyMultithreading() override final { EndThreads(); } }; template <class StreamType> @@ -149,31 +148,30 @@ class SynchronousStreamingClient : public SynchronousClient { : SynchronousClient(config), context_(num_threads_), stream_(num_threads_), + stream_mu_(num_threads_), + shutdown_(num_threads_), messages_per_stream_(config.messages_per_stream()), messages_issued_(num_threads_) { StartThreads(num_threads_); } virtual ~SynchronousStreamingClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - // forcibly cancel the streams, then finish - context_[i].TryCancel(); - (*stream)->Finish().IgnoreError(); - // don't log any error message on !ok since this was canceled - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams([this](size_t thread_idx) { + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + }); } protected: std::vector<grpc::ClientContext> context_; std::vector<std::unique_ptr<StreamType>> stream_; + // stream_mu_ is only needed when changing an element of stream_ or context_ + std::vector<std::mutex> stream_mu_; + // use struct Bool rather than bool because vector<bool> is not concurrent + struct Bool { + bool val; + Bool() : val(false) {} + }; + std::vector<Bool> shutdown_; const int messages_per_stream_; std::vector<int> messages_issued_; @@ -182,27 +180,26 @@ class SynchronousStreamingClient : public SynchronousClient { // don't set the value since the stream is failed and shouldn't be timed entry->set_status(s.error_code()); if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, - s.error_message().c_str()); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", + thread_idx, s.error_message().c_str()); + } } + // Lock the stream_mu_ now because the client context could change + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); context_[thread_idx].~ClientContext(); new (&context_[thread_idx]) ClientContext(); } -}; -class SynchronousStreamingPingPongClient final - : public SynchronousStreamingClient< - grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { - public: - SynchronousStreamingPingPongClient(const ClientConfig& config) - : SynchronousStreamingClient(config) {} - ~SynchronousStreamingPingPongClient() { + void CleanupAllStreams(std::function<void(size_t)> cleaner) { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); + cleanup_threads.emplace_back([this, i, cleaner] { + std::lock_guard<std::mutex> l(stream_mu_[i]); + shutdown_[i].val = true; + if (stream_[i]) { + cleaner(i); } }); } @@ -211,10 +208,36 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + void DestroyMultithreading() override final { + CleanupAllStreams( + [this](size_t thread_idx) { context_[thread_idx].TryCancel(); }); + EndThreads(); + } +}; + +class SynchronousStreamingPingPongClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { + public: + SynchronousStreamingPingPongClient(const ClientConfig& config) + : SynchronousStreamingClient(config) {} + ~SynchronousStreamingPingPongClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } + + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + return false; + } messages_issued_[thread_idx] = 0; + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -239,7 +262,13 @@ class SynchronousStreamingPingPongClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } messages_issued_[thread_idx] = 0; return true; } @@ -251,25 +280,24 @@ class SynchronousStreamingFromClientClient final SynchronousStreamingFromClientClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_issue_(num_threads_) {} ~SynchronousStreamingFromClientClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + std::vector<double> last_issue_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + return false; + } last_issue_[thread_idx] = UsageTimer::Now(); + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -287,13 +315,16 @@ class SynchronousStreamingFromClientClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_issue_; }; class SynchronousStreamingFromServerClient final @@ -301,12 +332,24 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFuncImpl(size_t thread_idx) override { + ~SynchronousStreamingFromServerClient() {} + + private: + std::vector<double> last_recv_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + return false; + } last_recv_[thread_idx] = UsageTimer::Now(); + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { @@ -317,13 +360,16 @@ class SynchronousStreamingFromServerClient final } FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_recv_; }; class SynchronousStreamingBothWaysClient final @@ -333,24 +379,22 @@ class SynchronousStreamingBothWaysClient final SynchronousStreamingBothWaysClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} ~SynchronousStreamingBothWaysClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } else { + return false; + } + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; |