aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2016-05-06 15:50:15 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2016-05-06 15:50:15 -0700
commit26a1785cfc5a2118b5fbb4c2e7e89edd6a241a61 (patch)
tree851f2b92ccd86254bbb0d973462932a1d555fab6 /test/cpp
parent7d099a5c907d9ab164416ea875ae07a3074adedb (diff)
parentc916c1ce17fe273dff881e1c87e17ad991c2078a (diff)
Resolve conflicts
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc338
-rw-r--r--test/cpp/qps/driver.cc3
2 files changed, 217 insertions, 124 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 0de6c74c47..e0649bd694 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -51,6 +51,7 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/string_ref_helper.h"
+#include "test/cpp/util/test_credentials_provider.h"
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/ev_posix.h"
@@ -58,6 +59,7 @@
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
+using grpc::testing::kTlsCredentialsType;
using std::chrono::system_clock;
GPR_TLS_DECL(g_is_async_end2end_test);
@@ -219,20 +221,37 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
}
};
-class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
+class TestScenario {
+ public:
+ TestScenario(bool non_block, const grpc::string& creds_type,
+ const grpc::string& content)
+ : disable_blocking(non_block),
+ credentials_type(creds_type),
+ message_content(content) {}
+ void Log() const {
+ gpr_log(GPR_INFO,
+ "Scenario: disable_blocking %d, credentials %s, message size %d",
+ disable_blocking, credentials_type.c_str(), message_content.size());
+ }
+ bool disable_blocking;
+ const grpc::string credentials_type;
+ const grpc::string message_content;
+};
+
+class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
- AsyncEnd2endTest() {}
+ AsyncEnd2endTest() { GetParam().Log(); }
void SetUp() GRPC_OVERRIDE {
- poll_overrider_.reset(new PollingOverrider(!GetParam()));
+ poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking));
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
ServerBuilder builder;
- builder.AddListeningPort(server_address_.str(),
- grpc::InsecureServerCredentials());
+ auto server_creds = GetServerCredentials(GetParam().credentials_type);
+ builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
@@ -258,8 +277,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
}
void ResetStub() {
+ ChannelArguments args;
+ auto channel_creds =
+ GetChannelCredentials(GetParam().credentials_type, &args);
std::shared_ptr<Channel> channel =
- CreateChannel(server_address_.str(), InsecureChannelCredentials());
+ CreateCustomChannel(server_address_.str(), channel_creds, args);
stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
@@ -275,22 +297,23 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -330,7 +353,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -338,23 +361,22 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10));
- Verifier(GetParam()).Verify(cq_.get(), time_now);
- Verifier(GetParam()).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Verify(cq_.get(), time_limit);
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam())
- .Expect(3, true)
- .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam())
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
@@ -375,41 +397,48 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ServerContext srv_ctx;
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(1, true)
+ .Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9));
- Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -428,39 +457,45 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ServerContext srv_ctx;
ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
- Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9));
- Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@@ -478,41 +513,48 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ServerContext srv_ctx;
ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9));
- Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@@ -531,7 +573,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
@@ -544,7 +586,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second,
@@ -557,11 +599,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
-
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -580,7 +622,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
@@ -589,15 +631,15 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second,
ToString(server_initial_metadata.find(meta1.first)->second));
@@ -607,10 +649,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -629,7 +672,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
@@ -638,20 +681,22 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
+ response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -675,7 +720,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2(
"key2-bin",
@@ -699,7 +744,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second,
@@ -711,9 +756,9 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second,
ToString(server_initial_metadata.find(meta3.first)->second));
@@ -725,11 +770,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(5));
+ response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -754,7 +801,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -762,15 +809,15 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_ctx.TryCancel();
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
}
@@ -789,7 +836,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -797,25 +844,29 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
- EXPECT_FALSE(srv_ctx.IsCancelled());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
+ EXPECT_FALSE(srv_ctx.IsCancelled());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
+ ChannelArguments args;
+ auto channel_creds =
+ GetChannelCredentials(GetParam().credentials_type, &args);
std::shared_ptr<Channel> channel =
- CreateChannel(server_address_.str(), InsecureChannelCredentials());
+ CreateCustomChannel(server_address_.str(), channel_creds, args);
std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub;
stub = grpc::testing::UnimplementedService::NewStub(channel);
EchoRequest send_request;
@@ -823,12 +874,12 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
Status recv_status;
ClientContext cli_ctx;
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
EXPECT_EQ("", recv_status.error_message());
@@ -875,23 +926,25 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the 'RequestStream' call on client
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_request.set_message("Ping " + std::to_string(tag_idx));
cli_stream->Write(send_request, tag(tag_idx));
- Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(tag_idx, true)
+ .Verify(cq_.get());
}
cli_stream->WritesDone(tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
bool expected_server_cq_result = true;
bool ignore_cq_result = false;
@@ -899,7 +952,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know
@@ -909,7 +962,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
- auto verif = Verifier(GetParam());
+ auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@@ -967,13 +1020,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server sends the final message and cancelled status (but the RPC is
// already cancelled at this point. So we expect the operation to fail)
srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
- Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
// TODO(sreek): The expectation here should be true. This is a bug (github
// issue #4972)
- Verifier(GetParam()).Expect(10, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, false).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@@ -1007,13 +1060,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the 'ResponseStream' call on the client
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
bool expected_cq_result = true;
@@ -1022,7 +1075,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
@@ -1032,7 +1085,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
- auto verif = Verifier(GetParam());
+ auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@@ -1092,7 +1145,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client attemts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
cli_stream->Read(&recv_response, tag(tag_idx));
- Verifier(GetParam())
+ Verifier(GetParam().disable_blocking)
.Expect(tag_idx, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
}
@@ -1103,11 +1156,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server finishes the stream (but the RPC is already cancelled)
srv_stream.Finish(Status::CANCELLED, tag(9));
- Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@@ -1142,19 +1195,19 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the call from the client side
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends the first and the only message
send_request.set_message("Ping");
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
bool expected_cq_result = true;
bool ignore_cq_result = false;
@@ -1162,7 +1215,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
@@ -1172,7 +1225,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
- auto verif = Verifier(GetParam());
+ auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@@ -1272,10 +1325,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// know that cq results are supposed to return false on server.
srv_stream.Finish(Status::CANCELLED, tag(9));
- Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@@ -1317,11 +1370,48 @@ TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
}
+std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
+ bool test_secure,
+ int test_big_limit) {
+ std::vector<TestScenario> scenarios;
+ std::vector<grpc::string> credentials_types;
+ std::vector<grpc::string> messages;
+
+ credentials_types.push_back(kInsecureCredentialsType);
+ auto sec_list = GetSecureCredentialsTypeList();
+ for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
+ credentials_types.push_back(*sec);
+ }
+
+ messages.push_back("Hello");
+ for (int sz = 1; sz < test_big_limit; sz *= 2) {
+ grpc::string big_msg;
+ for (int i = 0; i < sz * 1024; i++) {
+ char c = 'a' + (i % 26);
+ big_msg += c;
+ }
+ messages.push_back(big_msg);
+ }
+
+ for (auto cred = credentials_types.begin(); cred != credentials_types.end();
+ ++cred) {
+ for (auto msg = messages.begin(); msg != messages.end(); msg++) {
+ scenarios.push_back(TestScenario(false, *cred, *msg));
+ if (test_disable_blocking) {
+ scenarios.push_back(TestScenario(true, *cred, *msg));
+ }
+ }
+ }
+ return scenarios;
+}
+
INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
- ::testing::Values(false, true));
+ ::testing::ValuesIn(CreateTestScenarios(true, true,
+ 1024)));
INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
AsyncEnd2endServerTryCancelTest,
- ::testing::Values(false));
+ ::testing::ValuesIn(CreateTestScenarios(false, false,
+ 0)));
} // namespace
} // namespace testing
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 2583ceb819..04b2b453f9 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -83,6 +83,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
auto stub = WorkerService::NewStub(
CreateChannel(*it, InsecureChannelCredentials()));
grpc::ClientContext ctx;
+ ctx.set_fail_fast(false);
CoreRequest dummy;
CoreResponse cores;
grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
@@ -166,6 +167,7 @@ namespace runsc {
static ClientContext* AllocContext(list<ClientContext>* contexts) {
contexts->emplace_back();
auto context = &contexts->back();
+ context->set_fail_fast(false);
return context;
}
@@ -435,6 +437,7 @@ void RunQuit() {
CreateChannel(workers[i], InsecureChannelCredentials()));
Void dummy;
grpc::ClientContext ctx;
+ ctx.set_fail_fast(false);
GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
}
}