diff options
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 319 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 118 | ||||
-rw-r--r-- | test/cpp/end2end/filter_end2end_test.cc | 5 | ||||
-rw-r--r-- | test/cpp/end2end/mock_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 27 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.h | 2 |
6 files changed, 468 insertions, 5 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 32e8a41795..0b5215ef8e 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -484,6 +484,81 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { EXPECT_TRUE(recv_status.ok()); } +// Two pings and a final pong. +TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + // tag:1 never comes up since no op is performed + std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( + stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); + + service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->Write(send_request, tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + + EXPECT_EQ(send_request.message(), recv_request.message()); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(5)); + srv_stream.Read(&recv_request, tag(6)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(7)); + Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.Finish(send_response, Status::OK, tag(8)); + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking) + .Expect(8, true) + .Expect(9, true) + .Verify(cq_.get()); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // One ping, two pongs. TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { ResetStub(); @@ -540,6 +615,112 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { EXPECT_TRUE(recv_status.ok()); } +// One ping, two pongs. Using WriteAndFinish API +TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(3)); + cli_stream->Read(&recv_response, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5)); + cli_stream->Read(&recv_response, tag(6)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Read(&recv_response, tag(7)); + Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + +// One ping, two pongs. Using WriteLast API +TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(3)); + cli_stream->Read(&recv_response, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.WriteLast(send_response, WriteOptions(), tag(5)); + cli_stream->Read(&recv_response, tag(6)); + srv_stream.Finish(Status::OK, tag(7)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Expect(7, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Read(&recv_response, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + // One ping, one pong. TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { ResetStub(); @@ -599,6 +780,144 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { EXPECT_TRUE(recv_status.ok()); } +// One ping, one pong. Using server:WriteAndFinish api +TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(5)); + Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6)); + cli_stream->Read(&recv_response, tag(7)); + Verifier(GetParam().disable_blocking) + .Expect(6, true) + .Expect(7, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Finish(&recv_status, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + +// One ping, one pong. Using server:WriteLast api +TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(5)); + Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.WriteLast(send_response, WriteOptions(), tag(6)); + srv_stream.Finish(Status::OK, tag(7)); + cli_stream->Read(&recv_response, tag(8)); + Verifier(GetParam().disable_blocking) + .Expect(6, true) + .Expect(7, true) + .Expect(8, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + // Metadata tests TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { ResetStub(); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index df78557c43..df71777e4b 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -702,6 +702,21 @@ TEST_P(End2endTest, RequestStreamOneRequest) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.set_initial_metadata_corked(true); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + stream->WriteLast(request, WriteOptions()); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, RequestStreamTwoRequests) { ResetStub(); EchoRequest request; @@ -718,6 +733,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.set_initial_metadata_corked(true); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request)); + stream->WriteLast(request, WriteOptions()); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, ResponseStream) { ResetStub(); EchoRequest request; @@ -738,6 +769,27 @@ TEST_P(End2endTest, ResponseStream) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.AddMetadata(kServerUseCoalescingApi, "1"); + + auto stream = stub_->ResponseStream(&context, request); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "1"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "2"); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, BidiStream) { ResetStub(); EchoRequest request; @@ -770,6 +822,39 @@ TEST_P(End2endTest, BidiStream) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, BidiStreamWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.AddMetadata(kServerFinishAfterNReads, "3"); + context.set_initial_metadata_corked(true); + grpc::string msg("hello"); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "1"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "2"); + stream->WriteLast(request, WriteOptions()); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. TEST_P(End2endTest, DiffPackageServices) { @@ -1045,6 +1130,39 @@ TEST_P(End2endTest, BinaryTrailerTest) { EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second))); } +TEST_P(End2endTest, ExpectErrorTest) { + ResetStub(); + + std::vector<ErrorStatus> expected_status; + expected_status.emplace_back(); + expected_status.back().set_code(13); // INTERNAL + expected_status.back().set_error_message("text error message"); + expected_status.back().set_binary_error_details("text error details"); + expected_status.emplace_back(); + expected_status.back().set_code(13); // INTERNAL + expected_status.back().set_error_message("text error message"); + expected_status.back().set_binary_error_details( + "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB"); + + for (auto iter = expected_status.begin(); iter != expected_status.end(); + ++iter) { + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("Hello"); + auto* error = request.mutable_param()->mutable_expected_error(); + error->set_code(iter->code()); + error->set_error_message(iter->error_message()); + error->set_binary_error_details(iter->binary_error_details()); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(iter->code(), s.error_code()); + EXPECT_EQ(iter->error_message(), s.error_message()); + EXPECT_EQ(iter->binary_error_details(), s.error_details()); + } +} + ////////////////////////////////////////////////////////////////////////// // Test with and without a proxy. class ProxyEnd2endTest : public End2endTest { diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index bd384f68b4..2f873eeaa8 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -122,8 +122,9 @@ class ChannelDataImpl : public ChannelData { class CallDataImpl : public CallData { public: - void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - TransportStreamOp* op) override { + void StartTransportStreamOpBatch(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + TransportStreamOpBatch* op) override { // Incrementing the counter could be done from Init(), but we want // to test that the individual methods are actually called correctly. if (op->recv_initial_metadata() != nullptr) IncrementCallCounter(); diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index d6664da5a0..fdb2732e50 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -89,7 +89,7 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> final return true; } - bool Write(const EchoRequest& msg, const WriteOptions& options) override { + bool Write(const EchoRequest& msg, WriteOptions options) override { gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str()); last_message_ = msg.message(); return true; diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 59d36e9cb5..b473dd1f52 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -92,6 +92,11 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, gpr_log(GPR_ERROR, "The request should not reach application handler."); GPR_ASSERT(0); } + if (request->has_param() && request->param().has_expected_error()) { + const auto& error = request->param().expected_error(); + return Status(static_cast<StatusCode>(error.code()), error.error_message(), + error.binary_error_details()); + } int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); if (server_try_cancel > DO_NOT_CANCEL) { @@ -246,6 +251,9 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + int server_coalescing_api = GetIntValueFromMetadata( + kServerUseCoalescingApi, context->client_metadata(), 0); + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { ServerTryCancel(context); return Status::CANCELLED; @@ -260,7 +268,11 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, for (int i = 0; i < kNumResponseStreamsMsgs; i++) { response.set_message(request->message() + grpc::to_string(i)); - writer->Write(response); + if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) { + writer->WriteLast(response, WriteOptions()); + } else { + writer->Write(response); + } } if (server_try_cancel_thd != nullptr) { @@ -305,10 +317,21 @@ Status TestServiceImpl::BidiStream( new std::thread(&TestServiceImpl::ServerTryCancel, this, context); } + // kServerFinishAfterNReads suggests after how many reads, the server should + // write the last message and send status (coalesced using WriteLast) + int server_write_last = GetIntValueFromMetadata( + kServerFinishAfterNReads, context->client_metadata(), 0); + + int read_counts = 0; while (stream->Read(&request)) { + read_counts++; gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); response.set_message(request.message()); - stream->Write(response); + if (read_counts == server_write_last) { + stream->WriteLast(response, WriteOptions()); + } else { + stream->Write(response); + } } if (server_try_cancel_thd != nullptr) { diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 88e0be7bca..b1f02f93f6 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -48,6 +48,8 @@ const int kNumResponseStreamsMsgs = 3; const char* const kServerCancelAfterReads = "cancel_after_reads"; const char* const kServerTryCancelRequest = "server_try_cancel"; const char* const kDebugInfoTrailerKey = "debug-info-bin"; +const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; +const char* const kServerUseCoalescingApi = "server_use_coalescing_api"; typedef enum { DO_NOT_CANCEL = 0, |