aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc111
1 files changed, 110 insertions, 1 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 2a33e8ae11..b7634d0438 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -99,7 +99,7 @@ class PollingOverrider {
class Verifier {
public:
- explicit Verifier(bool spin) : spin_(spin) {}
+ explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {}
// Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) {
return ExpectUnless(i, expect_ok, false);
@@ -142,6 +142,18 @@ class Verifier {
return detag(got_tag);
}
+ template <typename T>
+ CompletionQueue::NextStatus DoOnceThenAsyncNext(
+ CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
+ std::function<void(void)> lambda) {
+ if (lambda_run_) {
+ return cq->AsyncNext(got_tag, ok, deadline);
+ } else {
+ lambda_run_ = true;
+ return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
+ }
+ }
+
// Verify keeps calling Next until all currently set
// expected tags are complete
void Verify(CompletionQueue* cq) { Verify(cq, false); }
@@ -154,6 +166,7 @@ class Verifier {
Next(cq, ignore_ok);
}
}
+
// This version of Verify stops after a certain deadline
void Verify(CompletionQueue* cq,
std::chrono::system_clock::time_point deadline) {
@@ -193,6 +206,47 @@ class Verifier {
}
}
+ // This version of Verify stops after a certain deadline, and uses the
+ // DoThenAsyncNext API
+ // to call the lambda
+ void Verify(CompletionQueue* cq,
+ std::chrono::system_clock::time_point deadline,
+ std::function<void(void)> lambda) {
+ if (expectations_.empty()) {
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ while (std::chrono::system_clock::now() < deadline) {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::TIMEOUT);
+ }
+ } else {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::TIMEOUT);
+ }
+ } else {
+ while (!expectations_.empty()) {
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ for (;;) {
+ GPR_ASSERT(std::chrono::system_clock::now() < deadline);
+ auto r = DoOnceThenAsyncNext(
+ cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda);
+ if (r == CompletionQueue::TIMEOUT) continue;
+ if (r == CompletionQueue::GOT_EVENT) break;
+ gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+ abort();
+ }
+ } else {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::GOT_EVENT);
+ }
+ GotTag(got_tag, ok, false);
+ }
+ }
+ }
+
private:
void GotTag(void* got_tag, bool ok, bool ignore_ok) {
auto it = expectations_.find(got_tag);
@@ -226,6 +280,7 @@ class Verifier {
std::map<void*, bool> expectations_;
std::map<void*, MaybeExpect> maybe_expectations_;
bool spin_;
+ bool lambda_run_;
};
bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
@@ -490,6 +545,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
EXPECT_TRUE(recv_status.ok());
}
+// Test a simple RPC using the async version of Next
+TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+
+ std::chrono::system_clock::time_point time_now(
+ std::chrono::system_clock::now());
+ std::chrono::system_clock::time_point time_limit(
+ std::chrono::system_clock::now() + std::chrono::seconds(10));
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+
+ auto resp_writer_ptr = &response_writer;
+ auto lambda_2 = [&, this, resp_writer_ptr]() {
+ gpr_log(GPR_ERROR, "CALLED");
+ service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
+ cq_.get(), tag(2));
+ };
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Verify(cq_.get(), time_limit, lambda_2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ auto recv_resp_ptr = &recv_response;
+ auto status_ptr = &recv_status;
+ send_response.set_message(recv_request.message());
+ auto lambda_3 = [&, this, resp_writer_ptr, send_response]() {
+ resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
+ };
+ response_reader->Finish(recv_resp_ptr, status_ptr, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get(), std::chrono::system_clock::time_point::max(),
+ lambda_3);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+}
+
// Two pings and a final pong.
TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();