From ea330175404d901b952975b5ce8730822ffb39f8 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 24 Jan 2018 09:12:01 -0800 Subject: Add option to use client side coalescing API in qps test --- test/cpp/qps/client_async.cc | 38 +++++++++++++++++++++++++++++++++----- test/cpp/qps/client_sync.cc | 1 + 2 files changed, 34 insertions(+), 5 deletions(-) (limited to 'test/cpp/qps') diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 7cf9d3ea7e..e3fba36a7a 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -82,6 +82,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextUnaryImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported. StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { @@ -349,10 +350,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - prepare_req_(prepare_req) {} + prepare_req_(prepare_req), + coalesce_(false) {} ~ClientRpcContextStreamingPingPongImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { - StartInternal(cq, config.messages_per_stream()); + StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api()); } bool RunNextState(bool ok, HistogramEntry* entry) override { while (true) { @@ -375,7 +377,12 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { } start_ = UsageTimer::Now(); next_state_ = State::WRITE_DONE; - stream_->Write(req_, ClientRpcContext::tag(this)); + if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) { + stream_->WriteLast(req_, WriteOptions(), + ClientRpcContext::tag(this)); + } else { + stream_->Write(req_, ClientRpcContext::tag(this)); + } return true; case State::WRITE_DONE: if (!ok) { @@ -391,6 +398,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { if ((messages_per_stream_ != 0) && (++messages_issued_ >= messages_per_stream_)) { next_state_ = State::WRITES_DONE_DONE; + if (coalesce_) { + // WritesDone should have been called on the last Write. + // loop around to call Finish. + break; + } stream_->WritesDone(ClientRpcContext::tag(this)); return true; } @@ -413,7 +425,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingPingPongImpl( stub_, req_, next_issue_, prepare_req_, callback_); - clone->StartInternal(cq, messages_per_stream_); + clone->StartInternal(cq, messages_per_stream_, coalesce_); } void TryCancel() override { context_.TryCancel(); } @@ -449,14 +461,27 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { // Allow a limit on number of messages in a stream int messages_per_stream_; int messages_issued_; + // Whether to use coalescing API. + bool coalesce_; - void StartInternal(CompletionQueue* cq, int messages_per_stream) { + void StartInternal(CompletionQueue* cq, int messages_per_stream, + bool coalesce) { cq_ = cq; messages_per_stream_ = messages_per_stream; messages_issued_ = 0; + coalesce_ = coalesce; + if (coalesce_) { + GPR_ASSERT(messages_per_stream_ != 0); + context_.set_initial_metadata_corked(true); + } stream_ = prepare_req_(stub_, &context_, cq); next_state_ = State::STREAM_IDLE; stream_->StartCall(ClientRpcContext::tag(this)); + if (coalesce_) { + // When the intial metadata is corked, the tag will not come back and we + // need to manually drive the state machine. + RunNextState(true, nullptr); + } } }; @@ -512,6 +537,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromClientImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported yet. StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { @@ -641,6 +667,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromServerImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { @@ -753,6 +780,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextGenericStreamingImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported yet. StartInternal(cq, config.messages_per_stream()); } bool RunNextState(bool ok, HistogramEntry* entry) override { diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 82a3f0042d..a2ddbeb508 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -402,6 +402,7 @@ class SynchronousStreamingBothWaysClient final }; std::unique_ptr CreateSynchronousClient(const ClientConfig& config) { + GPR_ASSERT(!config.use_coalesce_api()); // not supported yet. switch (config.rpc_type()) { case UNARY: return std::unique_ptr(new SynchronousUnaryClient(config)); -- cgit v1.2.3