aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc235
-rw-r--r--test/cpp/end2end/shutdown_test.cc159
-rw-r--r--test/cpp/interop/client_helper.cc30
-rw-r--r--test/cpp/interop/client_helper.h18
-rw-r--r--test/cpp/interop/interop_client.cc39
5 files changed, 350 insertions, 131 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index a30c841216..9a9cca0ba1 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -56,6 +56,10 @@
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
+#ifdef GPR_POSIX_SOCKET
+#include "src/core/iomgr/pollset_posix.h"
+#endif
+
using grpc::cpp::test::util::EchoRequest;
using grpc::cpp::test::util::EchoResponse;
using std::chrono::system_clock;
@@ -67,8 +71,41 @@ namespace {
void* tag(int i) { return (void*)(gpr_intptr)i; }
-class Verifier {
+#ifdef GPR_POSIX_SOCKET
+static int assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
+ int timeout) {
+ GPR_ASSERT(timeout == 0);
+ return poll(pfds, nfds, timeout);
+}
+
+class PollOverride {
+ public:
+ PollOverride(grpc_poll_function_type f) {
+ prev_ = grpc_poll_function;
+ grpc_poll_function = f;
+ }
+
+ ~PollOverride() { grpc_poll_function = prev_; }
+
+ private:
+ grpc_poll_function_type prev_;
+};
+
+class PollingCheckRegion : public PollOverride {
public:
+ explicit PollingCheckRegion(bool allow_blocking)
+ : PollOverride(allow_blocking ? poll : assert_non_blocking_poll) {}
+};
+#else
+class PollingCheckRegion {
+ public:
+ explicit PollingCheckRegion(bool allow_blocking) {}
+};
+#endif
+
+class Verifier : public PollingCheckRegion {
+ public:
+ explicit Verifier(bool spin) : PollingCheckRegion(!spin), spin_(spin) {}
Verifier& Expect(int i, bool expect_ok) {
expectations_[tag(i)] = expect_ok;
return *this;
@@ -78,7 +115,17 @@ class Verifier {
while (!expectations_.empty()) {
bool ok;
void* got_tag;
- EXPECT_TRUE(cq->Next(&got_tag, &ok));
+ if (spin_) {
+ for (;;) {
+ auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
+ if (r == CompletionQueue::TIMEOUT) continue;
+ if (r == CompletionQueue::GOT_EVENT) break;
+ gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+ abort();
+ }
+ } else {
+ EXPECT_TRUE(cq->Next(&got_tag, &ok));
+ }
auto it = expectations_.find(got_tag);
EXPECT_TRUE(it != expectations_.end());
EXPECT_EQ(it->second, ok);
@@ -90,14 +137,34 @@ class Verifier {
if (expectations_.empty()) {
bool ok;
void* got_tag;
- EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
- CompletionQueue::TIMEOUT);
+ if (spin_) {
+ while (std::chrono::system_clock::now() < deadline) {
+ EXPECT_EQ(
+ cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
+ CompletionQueue::TIMEOUT);
+ }
+ } else {
+ EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
+ CompletionQueue::TIMEOUT);
+ }
} else {
while (!expectations_.empty()) {
bool ok;
void* got_tag;
- EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
- CompletionQueue::GOT_EVENT);
+ if (spin_) {
+ for (;;) {
+ GPR_ASSERT(std::chrono::system_clock::now() < deadline);
+ auto r =
+ cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
+ if (r == CompletionQueue::TIMEOUT) continue;
+ if (r == CompletionQueue::GOT_EVENT) break;
+ gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+ abort();
+ }
+ } else {
+ EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
+ CompletionQueue::GOT_EVENT);
+ }
auto it = expectations_.find(got_tag);
EXPECT_TRUE(it != expectations_.end());
EXPECT_EQ(it->second, ok);
@@ -108,9 +175,10 @@ class Verifier {
private:
std::map<void*, bool> expectations_;
+ bool spin_;
};
-class AsyncEnd2endTest : public ::testing::Test {
+class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
protected:
AsyncEnd2endTest() {}
@@ -160,15 +228,15 @@ class AsyncEnd2endTest : public ::testing::Test {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier().Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -182,18 +250,18 @@ class AsyncEnd2endTest : public ::testing::Test {
std::ostringstream server_address_;
};
-TEST_F(AsyncEnd2endTest, SimpleRpc) {
+TEST_P(AsyncEnd2endTest, SimpleRpc) {
ResetStub();
SendRpc(1);
}
-TEST_F(AsyncEnd2endTest, SequentialRpcs) {
+TEST_P(AsyncEnd2endTest, SequentialRpcs) {
ResetStub();
SendRpc(10);
}
// Test a simple RPC using the async version of Next
-TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
+TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
ResetStub();
EchoRequest send_request;
@@ -214,30 +282,32 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10));
- Verifier().Verify(cq_.get(), time_now);
- Verifier().Verify(cq_.get(), time_now);
+ Verifier(GetParam()).Verify(cq_.get(), time_now);
+ Verifier(GetParam()).Verify(cq_.get(), time_now);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier().Expect(3, true).Verify(
- cq_.get(), std::chrono::system_clock::time_point::max());
+ Verifier(GetParam())
+ .Expect(3, true)
+ .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier().Expect(4, true).Verify(
- cq_.get(), std::chrono::system_clock::time_point::max());
+ Verifier(GetParam())
+ .Expect(4, true)
+ .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
// Two pings and a final pong.
-TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
+TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();
EchoRequest send_request;
@@ -256,41 +326,41 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5));
- Verifier().Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(6));
- Verifier().Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7));
- Verifier().Expect(7, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(8));
- Verifier().Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9));
- Verifier().Expect(9, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
- Verifier().Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
// One ping, two pongs.
-TEST_F(AsyncEnd2endTest, SimpleServerStreaming) {
+TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ResetStub();
EchoRequest send_request;
@@ -309,38 +379,38 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) {
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
- Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5));
- Verifier().Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(6));
- Verifier().Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7));
- Verifier().Expect(7, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(8));
- Verifier().Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9));
- Verifier().Expect(9, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
// One ping, one pong.
-TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) {
+TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ResetStub();
EchoRequest send_request;
@@ -359,40 +429,40 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) {
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5));
- Verifier().Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(6));
- Verifier().Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7));
- Verifier().Expect(7, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(8));
- Verifier().Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9));
- Verifier().Expect(9, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
- Verifier().Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
// Metadata tests
-TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
+TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ResetStub();
EchoRequest send_request;
@@ -416,7 +486,7 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier().Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
@@ -426,16 +496,16 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
-TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
+TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
ResetStub();
EchoRequest send_request;
@@ -457,15 +527,15 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier().Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
@@ -473,16 +543,16 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
- Verifier().Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier().Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
-TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
+TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
ResetStub();
EchoRequest send_request;
@@ -504,20 +574,20 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier().Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier().Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -526,7 +596,7 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
}
-TEST_F(AsyncEnd2endTest, MetadataRpc) {
+TEST_P(AsyncEnd2endTest, MetadataRpc) {
ResetStub();
EchoRequest send_request;
@@ -563,7 +633,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier().Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
@@ -573,9 +643,9 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
@@ -586,10 +656,10 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(5));
- Verifier().Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier().Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -599,7 +669,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
}
// Server uses AsyncNotifyWhenDone API to check for cancellation
-TEST_F(AsyncEnd2endTest, ServerCheckCancellation) {
+TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
ResetStub();
EchoRequest send_request;
@@ -620,21 +690,21 @@ TEST_F(AsyncEnd2endTest, ServerCheckCancellation) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier().Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_ctx.TryCancel();
- Verifier().Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier().Expect(4, false).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
}
// Server uses AsyncNotifyWhenDone API to check for normal finish
-TEST_F(AsyncEnd2endTest, ServerCheckDone) {
+TEST_P(AsyncEnd2endTest, ServerCheckDone) {
ResetStub();
EchoRequest send_request;
@@ -655,23 +725,23 @@ TEST_F(AsyncEnd2endTest, ServerCheckDone) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier().Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier().Expect(3, true).Verify(cq_.get());
- Verifier().Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_FALSE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier().Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
-TEST_F(AsyncEnd2endTest, UnimplementedRpc) {
+TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
std::shared_ptr<ChannelInterface> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
@@ -687,12 +757,15 @@ TEST_F(AsyncEnd2endTest, UnimplementedRpc) {
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier().Expect(4, false).Verify(cq_.get());
+ Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
EXPECT_EQ("", recv_status.error_message());
}
+INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
+ ::testing::Values(false, true));
+
} // namespace
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/end2end/shutdown_test.cc b/test/cpp/end2end/shutdown_test.cc
new file mode 100644
index 0000000000..fccbb13030
--- /dev/null
+++ b/test/cpp/end2end/shutdown_test.cc
@@ -0,0 +1,159 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/core/util/test_config.h"
+
+#include <thread>
+
+#include "test/core/util/port.h"
+#include "test/cpp/util/echo.grpc.pb.h"
+#include "src/core/support/env.h"
+#include <grpc++/channel_arguments.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/credentials.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc++/server_credentials.h>
+#include <grpc++/status.h>
+#include <gtest/gtest.h>
+#include <grpc/grpc.h>
+#include <grpc/support/sync.h>
+
+using grpc::cpp::test::util::EchoRequest;
+using grpc::cpp::test::util::EchoResponse;
+
+namespace grpc {
+namespace testing {
+
+class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
+ public:
+ explicit TestServiceImpl(gpr_event* ev) : ev_(ev) {}
+
+ Status Echo(ServerContext* context, const EchoRequest* request,
+ EchoResponse* response) GRPC_OVERRIDE {
+ gpr_event_set(ev_, (void*)1);
+ while (!context->IsCancelled()) {
+ }
+ return Status::OK;
+ }
+
+ private:
+ gpr_event* ev_;
+};
+
+class ShutdownTest : public ::testing::Test {
+ public:
+ ShutdownTest() : shutdown_(false), service_(&ev_) { gpr_event_init(&ev_); }
+
+ void SetUp() GRPC_OVERRIDE {
+ port_ = grpc_pick_unused_port_or_die();
+ server_ = SetUpServer(port_);
+ }
+
+ std::unique_ptr<Server> SetUpServer(const int port) {
+ grpc::string server_address = "localhost:" + to_string(port);
+
+ ServerBuilder builder;
+ builder.AddListeningPort(server_address, InsecureServerCredentials());
+ builder.RegisterService(&service_);
+ std::unique_ptr<Server> server = builder.BuildAndStart();
+ return server;
+ }
+
+ void TearDown() GRPC_OVERRIDE { GPR_ASSERT(shutdown_); }
+
+ void ResetStub() {
+ string target = "dns:localhost:" + to_string(port_);
+ channel_ = CreateChannel(target, InsecureCredentials(), ChannelArguments());
+ stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_));
+ }
+
+ string to_string(const int number) {
+ std::stringstream strs;
+ strs << number;
+ return strs.str();
+ }
+
+ void SendRequest() {
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("Hello");
+ ClientContext context;
+ GPR_ASSERT(!shutdown_);
+ Status s = stub_->Echo(&context, request, &response);
+ GPR_ASSERT(shutdown_);
+ }
+
+ protected:
+ std::shared_ptr<ChannelInterface> channel_;
+ std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
+ std::unique_ptr<Server> server_;
+ bool shutdown_;
+ int port_;
+ gpr_event ev_;
+ TestServiceImpl service_;
+};
+
+// Tests zookeeper state change between two RPCs
+// TODO(ctiller): leaked objects in this test
+TEST_F(ShutdownTest, ShutdownTest) {
+ ResetStub();
+
+ // send the request in a background thread
+ std::thread thr(std::bind(&ShutdownTest::SendRequest, this));
+
+ // wait for the server to get the event
+ gpr_event_wait(&ev_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+
+ shutdown_ = true;
+
+ // shutdown should trigger cancellation causing everything to shutdown
+ auto deadline =
+ std::chrono::system_clock::now() + std::chrono::microseconds(100);
+ server_->Shutdown(deadline);
+ EXPECT_GE(std::chrono::system_clock::now(), deadline);
+
+ thr.join();
+}
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc
index 65fdc63b43..da5627de95 100644
--- a/test/cpp/interop/client_helper.cc
+++ b/test/cpp/interop/client_helper.cc
@@ -52,7 +52,6 @@
#include "test/core/security/oauth2_utils.h"
#include "test/cpp/util/create_test_channel.h"
-#include "src/core/surface/call.h"
#include "src/cpp/client/secure_credentials.h"
DECLARE_bool(enable_ssl);
@@ -65,8 +64,6 @@ DECLARE_string(default_service_account);
DECLARE_string(service_account_key_file);
DECLARE_string(oauth_scope);
-using grpc::testing::CompressionType;
-
namespace grpc {
namespace testing {
@@ -143,32 +140,5 @@ std::shared_ptr<ChannelInterface> CreateChannelForTestCase(
}
}
-CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
- grpc_compression_algorithm algorithm) {
- switch (algorithm) {
- case GRPC_COMPRESS_NONE:
- return CompressionType::NONE;
- case GRPC_COMPRESS_GZIP:
- return CompressionType::GZIP;
- case GRPC_COMPRESS_DEFLATE:
- return CompressionType::DEFLATE;
- default:
- GPR_ASSERT(false);
- }
-}
-
-InteropClientContextInspector::InteropClientContextInspector(
- const ::grpc::ClientContext& context)
- : context_(context) {}
-
-grpc_compression_algorithm
-InteropClientContextInspector::GetCallCompressionAlgorithm() const {
- return grpc_call_get_compression_algorithm(context_.call_);
-}
-
-gpr_uint32 InteropClientContextInspector::GetMessageFlags() const {
- return grpc_call_get_message_flags(context_.call_);
-}
-
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/interop/client_helper.h b/test/cpp/interop/client_helper.h
index 28fca3266c..edc69e90ac 100644
--- a/test/cpp/interop/client_helper.h
+++ b/test/cpp/interop/client_helper.h
@@ -39,7 +39,7 @@
#include <grpc++/config.h>
#include <grpc++/channel_interface.h>
-#include "test/proto/messages.grpc.pb.h"
+#include "src/core/surface/call.h"
namespace grpc {
namespace testing {
@@ -51,17 +51,19 @@ grpc::string GetOauth2AccessToken();
std::shared_ptr<ChannelInterface> CreateChannelForTestCase(
const grpc::string& test_case);
-grpc::testing::CompressionType
-GetInteropCompressionTypeFromCompressionAlgorithm(
- grpc_compression_algorithm algorithm);
-
class InteropClientContextInspector {
public:
- InteropClientContextInspector(const ::grpc::ClientContext& context);
+ InteropClientContextInspector(const ::grpc::ClientContext& context)
+ : context_(context) {}
// Inspector methods, able to peek inside ClientContext, follow.
- grpc_compression_algorithm GetCallCompressionAlgorithm() const;
- gpr_uint32 GetMessageFlags() const;
+ grpc_compression_algorithm GetCallCompressionAlgorithm() const {
+ return grpc_call_get_compression_algorithm(context_.call_);
+ }
+
+ gpr_uint32 GetMessageFlags() const {
+ return grpc_call_get_message_flags(context_.call_);
+ }
private:
const ::grpc::ClientContext& context_;
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index ddf91aa5eb..5ed14d556a 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -41,6 +41,7 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/credentials.h>
@@ -67,6 +68,20 @@ const int kResponseMessageSize = 1030;
const int kReceiveDelayMilliSeconds = 20;
const int kLargeRequestSize = 271828;
const int kLargeResponseSize = 314159;
+
+CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
+ grpc_compression_algorithm algorithm) {
+ switch (algorithm) {
+ case GRPC_COMPRESS_NONE:
+ return CompressionType::NONE;
+ case GRPC_COMPRESS_GZIP:
+ return CompressionType::GZIP;
+ case GRPC_COMPRESS_DEFLATE:
+ return CompressionType::DEFLATE;
+ default:
+ GPR_ASSERT(false);
+ }
+}
} // namespace
InteropClient::InteropClient(std::shared_ptr<ChannelInterface> channel)
@@ -257,18 +272,18 @@ void InteropClient::DoLargeUnary() {
void InteropClient::DoLargeCompressedUnary() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
- for (const auto payload_type : payload_types) {
- for (const auto compression_type : compression_types) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
+ for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
- CompressionType_Name(compression_type).c_str(),
- PayloadType_Name(payload_type).c_str());
+ CompressionType_Name(compression_types[j]).c_str(),
+ PayloadType_Name(payload_types[i]).c_str());
gpr_log(GPR_INFO, "Sending a large compressed unary rpc %s.", log_suffix);
SimpleRequest request;
SimpleResponse response;
- request.set_response_type(payload_type);
- request.set_response_compression(compression_type);
+ request.set_response_type(payload_types[i]);
+ request.set_response_compression(compression_types[j]);
PerformLargeUnary(&request, &response);
gpr_log(GPR_INFO, "Large compressed unary done %s.", log_suffix);
gpr_free(log_suffix);
@@ -333,21 +348,21 @@ void InteropClient::DoResponseCompressedStreaming() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
- for (const auto payload_type : payload_types) {
- for (const auto compression_type : compression_types) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
+ for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
ClientContext context;
InteropClientContextInspector inspector(context);
StreamingOutputCallRequest request;
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
- CompressionType_Name(compression_type).c_str(),
- PayloadType_Name(payload_type).c_str());
+ CompressionType_Name(compression_types[j]).c_str(),
+ PayloadType_Name(payload_types[i]).c_str());
gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix);
- request.set_response_type(payload_type);
- request.set_response_compression(compression_type);
+ request.set_response_type(payload_types[i]);
+ request.set_response_compression(compression_types[j]);
for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
ResponseParameters* response_parameter =