diff options
author | Ken Payson <kpayson@google.com> | 2017-10-20 10:32:30 -0700 |
---|---|---|
committer | Ken Payson <kpayson@google.com> | 2017-10-25 09:13:30 -0700 |
commit | 42bd87e376913939850bfa78a3c7f96ce83af11e (patch) | |
tree | be63963cc76a3f293c1bcb8ca1b57b56d21a8e4e /test/cpp/end2end | |
parent | 0d1150855d5c812d649111a4675ad0c444dafdc4 (diff) |
Adds gRPC Experimental CQ DoThenAsyncNext lambda API
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 111 |
1 files changed, 110 insertions, 1 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 2a33e8ae11..b7634d0438 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -99,7 +99,7 @@ class PollingOverrider { class Verifier { public: - explicit Verifier(bool spin) : spin_(spin) {} + explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {} // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { return ExpectUnless(i, expect_ok, false); @@ -142,6 +142,18 @@ class Verifier { return detag(got_tag); } + template <typename T> + CompletionQueue::NextStatus DoOnceThenAsyncNext( + CompletionQueue* cq, void** got_tag, bool* ok, T deadline, + std::function<void(void)> lambda) { + if (lambda_run_) { + return cq->AsyncNext(got_tag, ok, deadline); + } else { + lambda_run_ = true; + return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline); + } + } + // Verify keeps calling Next until all currently set // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } @@ -154,6 +166,7 @@ class Verifier { Next(cq, ignore_ok); } } + // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { @@ -193,6 +206,47 @@ class Verifier { } } + // This version of Verify stops after a certain deadline, and uses the + // DoThenAsyncNext API + // to call the lambda + void Verify(CompletionQueue* cq, + std::chrono::system_clock::time_point deadline, + std::function<void(void)> lambda) { + if (expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + while (std::chrono::system_clock::now() < deadline) { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + while (!expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + GPR_ASSERT(std::chrono::system_clock::now() < deadline); + auto r = DoOnceThenAsyncNext( + cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::GOT_EVENT); + } + GotTag(got_tag, ok, false); + } + } + } + private: void GotTag(void* got_tag, bool ok, bool ignore_ok) { auto it = expectations_.find(got_tag); @@ -226,6 +280,7 @@ class Verifier { std::map<void*, bool> expectations_; std::map<void*, MaybeExpect> maybe_expectations_; bool spin_; + bool lambda_run_; }; bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) { @@ -490,6 +545,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { EXPECT_TRUE(recv_status.ok()); } +// Test a simple RPC using the async version of Next +TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + + std::chrono::system_clock::time_point time_now( + std::chrono::system_clock::now()); + std::chrono::system_clock::time_point time_limit( + std::chrono::system_clock::now() + std::chrono::seconds(10)); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + + auto resp_writer_ptr = &response_writer; + auto lambda_2 = [&, this, resp_writer_ptr]() { + gpr_log(GPR_ERROR, "CALLED"); + service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(), + cq_.get(), tag(2)); + }; + + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Verify(cq_.get(), time_limit, lambda_2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + auto recv_resp_ptr = &recv_response; + auto status_ptr = &recv_status; + send_response.set_message(recv_request.message()); + auto lambda_3 = [&, this, resp_writer_ptr, send_response]() { + resp_writer_ptr->Finish(send_response, Status::OK, tag(3)); + }; + response_reader->Finish(recv_resp_ptr, status_ptr, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get(), std::chrono::system_clock::time_point::max(), + lambda_3); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // Two pings and a final pong. TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ResetStub(); |