aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreecha@users.noreply.github.com>2016-03-04 14:32:50 -0500
committerGravatar Sree Kuchibhotla <sreecha@users.noreply.github.com>2016-03-04 14:32:50 -0500
commitedd96e4926373644bac1c169431c213d361bed21 (patch)
treef471b62ef9ad25f06cb83b8f49b522c96a30ec27 /test/cpp/end2end
parent22e53cbac6e9c153b935a681811593b8e8e65857 (diff)
Revert "Properly integrate async API with server-side cancellations."
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc232
1 files changed, 56 insertions, 176 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index dc8c2bb6e5..9ca3bf98f8 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -68,7 +68,6 @@ namespace testing {
namespace {
void* tag(int i) { return (void*)(intptr_t)i; }
-int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
#ifdef GPR_POSIX_SOCKET
static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
@@ -107,50 +106,37 @@ class PollingOverrider {
class Verifier {
public:
explicit Verifier(bool spin) : spin_(spin) {}
- // Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) {
expectations_[tag(i)] = expect_ok;
return *this;
}
- // Next waits for 1 async tag to complete, checks its
- // expectations, and returns the tag
- int Next(CompletionQueue* cq, bool ignore_ok) {
- bool ok;
- void* got_tag;
- if (spin_) {
- for (;;) {
- auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
- if (r == CompletionQueue::TIMEOUT) continue;
- if (r == CompletionQueue::GOT_EVENT) break;
- gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
- abort();
- }
- } else {
- EXPECT_TRUE(cq->Next(&got_tag, &ok));
- }
- auto it = expectations_.find(got_tag);
- EXPECT_TRUE(it != expectations_.end());
- if (!ignore_ok) {
- EXPECT_EQ(it->second, ok);
- }
- expectations_.erase(it);
- return detag(got_tag);
- }
-
- // Verify keeps calling Next until all currently set
- // expected tags are complete
void Verify(CompletionQueue* cq) { Verify(cq, false); }
- // This version of Verify allows optionally ignoring the
- // outcome of the expectation
void Verify(CompletionQueue* cq, bool ignore_ok) {
GPR_ASSERT(!expectations_.empty());
while (!expectations_.empty()) {
- Next(cq, ignore_ok);
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ for (;;) {
+ auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
+ if (r == CompletionQueue::TIMEOUT) continue;
+ if (r == CompletionQueue::GOT_EVENT) break;
+ gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+ abort();
+ }
+ } else {
+ EXPECT_TRUE(cq->Next(&got_tag, &ok));
+ }
+ auto it = expectations_.find(got_tag);
+ EXPECT_TRUE(it != expectations_.end());
+ if (!ignore_ok) {
+ EXPECT_EQ(it->second, ok);
+ }
+ expectations_.erase(it);
}
}
- // This version of Verify stops after a certain deadline
void Verify(CompletionQueue* cq,
std::chrono::system_clock::time_point deadline) {
if (expectations_.empty()) {
@@ -807,8 +793,7 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
}
// This class is for testing scenarios where RPCs are cancelled on the server
-// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
-// API to check for cancellation
+// by calling ServerContext::TryCancel()
class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
protected:
typedef enum {
@@ -818,6 +803,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
CANCEL_AFTER_PROCESSING
} ServerTryCancelRequestPhase;
+ void ServerTryCancel(ServerContext* context) {
+ EXPECT_FALSE(context->IsCancelled());
+ context->TryCancel();
+ gpr_log(GPR_INFO, "Server called TryCancel()");
+ EXPECT_TRUE(context->IsCancelled());
+ }
+
// Helper for testing client-streaming RPCs which are cancelled on the server.
// Depending on the value of server_try_cancel parameter, this will test one
// of the following three scenarios:
@@ -851,7 +843,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
- srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@@ -867,12 +858,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool expected_server_cq_result = true;
bool ignore_cq_result = false;
- bool want_done_tag = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
- srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
- EXPECT_TRUE(srv_ctx.IsCancelled());
+ ServerTryCancel(&srv_ctx);
// Since cancellation is done before server reads any results, we know
// for sure that all cq results will return false from this point forward
@@ -880,39 +868,22 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
std::thread* server_try_cancel_thd = NULL;
-
- auto verif = Verifier(GetParam());
-
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
- server_try_cancel_thd =
- new std::thread(&ServerContext::TryCancel, &srv_ctx);
+ server_try_cancel_thd = new std::thread(
+ &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
// Server will cancel the RPC in a parallel thread while reading the
// requests from the client. Since the cancellation can happen at anytime,
// some of the cq results (i.e those until cancellation) might be true but
// its non deterministic. So better to ignore the cq results
ignore_cq_result = true;
- // Expect that we might possibly see the done tag that
- // indicates cancellation completion in this case
- want_done_tag = true;
- verif.Expect(11, true);
}
// Server reads 3 messages (tags 6, 7 and 8)
- // But if want_done_tag is true, we might also see tag 11
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
srv_stream.Read(&recv_request, tag(tag_idx));
- // Note that we'll add something to the verifier and verify that
- // something was seen, but it might be tag 11 and not what we
- // just added
- int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
- .Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
- }
+ Verifier(GetParam())
+ .Expect(tag_idx, expected_server_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
}
if (server_try_cancel_thd != NULL) {
@@ -921,15 +892,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
- srv_ctx.TryCancel();
- want_done_tag = true;
- verif.Expect(11, true);
- }
-
- if (want_done_tag) {
- verif.Verify(cq_.get());
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
+ ServerTryCancel(&srv_ctx);
}
// The RPC has been cancelled at this point for sure (i.e irrespective of
@@ -982,7 +945,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
- srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@@ -990,12 +952,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool expected_cq_result = true;
bool ignore_cq_result = false;
- bool want_done_tag = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
- srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
- EXPECT_TRUE(srv_ctx.IsCancelled());
+ ServerTryCancel(&srv_ctx);
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
@@ -1003,41 +962,24 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
std::thread* server_try_cancel_thd = NULL;
-
- auto verif = Verifier(GetParam());
-
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
- server_try_cancel_thd =
- new std::thread(&ServerContext::TryCancel, &srv_ctx);
+ server_try_cancel_thd = new std::thread(
+ &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
// Server will cancel the RPC in a parallel thread while writing responses
// to the client. Since the cancellation can happen at anytime, some of
// the cq results (i.e those until cancellation) might be true but it is
// non deterministic. So better to ignore the cq results
ignore_cq_result = true;
- // Expect that we might possibly see the done tag that
- // indicates cancellation completion in this case
- want_done_tag = true;
- verif.Expect(11, true);
}
// Server sends three messages (tags 3, 4 and 5)
- // But if want_done tag is true, we might also see tag 11
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_response.set_message("Pong " + std::to_string(tag_idx));
srv_stream.Write(send_response, tag(tag_idx));
- // Note that we'll add something to the verifier and verify that
- // something was seen, but it might be tag 11 and not what we
- // just added
- int got_tag = verif.Expect(tag_idx, expected_cq_result)
- .Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
- }
+ Verifier(GetParam())
+ .Expect(tag_idx, expected_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
}
if (server_try_cancel_thd != NULL) {
@@ -1046,21 +988,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
- srv_ctx.TryCancel();
- want_done_tag = true;
- verif.Expect(11, true);
+ ServerTryCancel(&srv_ctx);
// Client reads may fail bacause it is notified that the stream is
// cancelled.
ignore_cq_result = true;
}
- if (want_done_tag) {
- verif.Verify(cq_.get());
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- }
-
// Client attemts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
cli_stream->Read(&recv_response, tag(tag_idx));
@@ -1118,7 +1052,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
- srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@@ -1130,12 +1063,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool expected_cq_result = true;
bool ignore_cq_result = false;
- bool want_done_tag = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
- srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
- EXPECT_TRUE(srv_ctx.IsCancelled());
+ ServerTryCancel(&srv_ctx);
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
@@ -1143,84 +1073,42 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
std::thread* server_try_cancel_thd = NULL;
-
- auto verif = Verifier(GetParam());
-
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
- server_try_cancel_thd =
- new std::thread(&ServerContext::TryCancel, &srv_ctx);
+ server_try_cancel_thd = new std::thread(
+ &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
// Since server is going to cancel the RPC in a parallel thread, some of
// the cq results (i.e those until the cancellation) might be true. Since
// that number is non-deterministic, it is better to ignore the cq results
ignore_cq_result = true;
- // Expect that we might possibly see the done tag that
- // indicates cancellation completion in this case
- want_done_tag = true;
- verif.Expect(11, true);
}
- int got_tag;
srv_stream.Read(&recv_request, tag(4));
- verif.Expect(4, expected_cq_result);
- got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
- }
+ Verifier(GetParam())
+ .Expect(4, expected_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
send_response.set_message("Pong");
srv_stream.Write(send_response, tag(5));
- verif.Expect(5, expected_cq_result);
- got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
- }
+ Verifier(GetParam())
+ .Expect(5, expected_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
cli_stream->Read(&recv_response, tag(6));
- verif.Expect(6, expected_cq_result);
- got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
- }
+ Verifier(GetParam())
+ .Expect(6, expected_cq_result)
+ .Verify(cq_.get(), ignore_cq_result);
// This is expected to succeed in all cases
cli_stream->WritesDone(tag(7));
- verif.Expect(7, true);
- got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7);
- }
+ Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
// This is expected to fail in all cases i.e for all values of
// server_try_cancel. This is because at this point, either there are no
// more msgs from the client (because client called WritesDone) or the RPC
// is cancelled on the server
srv_stream.Read(&recv_request, tag(8));
- verif.Expect(8, false);
- got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
- }
+ Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
if (server_try_cancel_thd != NULL) {
server_try_cancel_thd->join();
@@ -1228,15 +1116,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
- srv_ctx.TryCancel();
- want_done_tag = true;
- verif.Expect(11, true);
- }
-
- if (want_done_tag) {
- verif.Verify(cq_.get());
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
+ ServerTryCancel(&srv_ctx);
}
// The RPC has been cancelled at this point for sure (i.e irrespective of