aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r--test/cpp/end2end/async_end2end_test.cc319
-rw-r--r--test/cpp/end2end/end2end_test.cc118
-rw-r--r--test/cpp/end2end/filter_end2end_test.cc5
-rw-r--r--test/cpp/end2end/mock_test.cc2
-rw-r--r--test/cpp/end2end/test_service_impl.cc27
-rw-r--r--test/cpp/end2end/test_service_impl.h2
6 files changed, 468 insertions, 5 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 32e8a41795..0b5215ef8e 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -484,6 +484,81 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+// Two pings and a final pong.
+TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ // tag:1 never comes up since no op is performed
+ std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
+ stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
+
+ service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->Write(send_request, tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
+ srv_stream.Read(&recv_request, tag(6));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(7));
+ Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Finish(send_response, Status::OK, tag(8));
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking)
+ .Expect(8, true)
+ .Expect(9, true)
+ .Verify(cq_.get());
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+}
+
// One ping, two pongs.
TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ResetStub();
@@ -540,6 +615,112 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+// One ping, two pongs. Using WriteAndFinish API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+ stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(3));
+ cli_stream->Read(&recv_response, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
+ cli_stream->Read(&recv_response, tag(6));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Read(&recv_response, tag(7));
+ Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+ cli_stream->Finish(&recv_status, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, two pongs. Using WriteLast API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+ stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(3));
+ cli_stream->Read(&recv_response, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
+ cli_stream->Read(&recv_response, tag(6));
+ srv_stream.Finish(Status::OK, tag(7));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Read(&recv_response, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
// One ping, one pong.
TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ResetStub();
@@ -599,6 +780,144 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+// One ping, one pong. Using server:WriteAndFinish api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+ cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(5));
+ Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
+ cli_stream->Read(&recv_response, tag(7));
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Finish(&recv_status, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, one pong. Using server:WriteLast api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+ cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(5));
+ Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
+ srv_stream.Finish(Status::OK, tag(7));
+ cli_stream->Read(&recv_response, tag(8));
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Expect(8, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
// Metadata tests
TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ResetStub();
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index df78557c43..df71777e4b 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -702,6 +702,21 @@ TEST_P(End2endTest, RequestStreamOneRequest) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ context.set_initial_metadata_corked(true);
+ auto stream = stub_->RequestStream(&context, &response);
+ request.set_message("hello");
+ stream->WriteLast(request, WriteOptions());
+ Status s = stream->Finish();
+ EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, RequestStreamTwoRequests) {
ResetStub();
EchoRequest request;
@@ -718,6 +733,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ context.set_initial_metadata_corked(true);
+ auto stream = stub_->RequestStream(&context, &response);
+ request.set_message("hello");
+ EXPECT_TRUE(stream->Write(request));
+ stream->WriteLast(request, WriteOptions());
+ Status s = stream->Finish();
+ EXPECT_EQ(response.message(), "hellohello");
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, ResponseStream) {
ResetStub();
EchoRequest request;
@@ -738,6 +769,27 @@ TEST_P(End2endTest, ResponseStream) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("hello");
+ context.AddMetadata(kServerUseCoalescingApi, "1");
+
+ auto stream = stub_->ResponseStream(&context, request);
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "0");
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "1");
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "2");
+ EXPECT_FALSE(stream->Read(&response));
+
+ Status s = stream->Finish();
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, BidiStream) {
ResetStub();
EchoRequest request;
@@ -770,6 +822,39 @@ TEST_P(End2endTest, BidiStream) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ context.AddMetadata(kServerFinishAfterNReads, "3");
+ context.set_initial_metadata_corked(true);
+ grpc::string msg("hello");
+
+ auto stream = stub_->BidiStream(&context);
+
+ request.set_message(msg + "0");
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ request.set_message(msg + "1");
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ request.set_message(msg + "2");
+ stream->WriteLast(request, WriteOptions());
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ EXPECT_FALSE(stream->Read(&response));
+ EXPECT_FALSE(stream->Read(&response));
+
+ Status s = stream->Finish();
+ EXPECT_TRUE(s.ok());
+}
+
// Talk to the two services with the same name but different package names.
// The two stubs are created on the same channel.
TEST_P(End2endTest, DiffPackageServices) {
@@ -1045,6 +1130,39 @@ TEST_P(End2endTest, BinaryTrailerTest) {
EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
}
+TEST_P(End2endTest, ExpectErrorTest) {
+ ResetStub();
+
+ std::vector<ErrorStatus> expected_status;
+ expected_status.emplace_back();
+ expected_status.back().set_code(13); // INTERNAL
+ expected_status.back().set_error_message("text error message");
+ expected_status.back().set_binary_error_details("text error details");
+ expected_status.emplace_back();
+ expected_status.back().set_code(13); // INTERNAL
+ expected_status.back().set_error_message("text error message");
+ expected_status.back().set_binary_error_details(
+ "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB");
+
+ for (auto iter = expected_status.begin(); iter != expected_status.end();
+ ++iter) {
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("Hello");
+ auto* error = request.mutable_param()->mutable_expected_error();
+ error->set_code(iter->code());
+ error->set_error_message(iter->error_message());
+ error->set_binary_error_details(iter->binary_error_details());
+
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(iter->code(), s.error_code());
+ EXPECT_EQ(iter->error_message(), s.error_message());
+ EXPECT_EQ(iter->binary_error_details(), s.error_details());
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
// Test with and without a proxy.
class ProxyEnd2endTest : public End2endTest {
diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc
index bd384f68b4..2f873eeaa8 100644
--- a/test/cpp/end2end/filter_end2end_test.cc
+++ b/test/cpp/end2end/filter_end2end_test.cc
@@ -122,8 +122,9 @@ class ChannelDataImpl : public ChannelData {
class CallDataImpl : public CallData {
public:
- void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- TransportStreamOp* op) override {
+ void StartTransportStreamOpBatch(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ TransportStreamOpBatch* op) override {
// Incrementing the counter could be done from Init(), but we want
// to test that the individual methods are actually called correctly.
if (op->recv_initial_metadata() != nullptr) IncrementCallCounter();
diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc
index d6664da5a0..fdb2732e50 100644
--- a/test/cpp/end2end/mock_test.cc
+++ b/test/cpp/end2end/mock_test.cc
@@ -89,7 +89,7 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> final
return true;
}
- bool Write(const EchoRequest& msg, const WriteOptions& options) override {
+ bool Write(const EchoRequest& msg, WriteOptions options) override {
gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str());
last_message_ = msg.message();
return true;
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 59d36e9cb5..b473dd1f52 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -92,6 +92,11 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
gpr_log(GPR_ERROR, "The request should not reach application handler.");
GPR_ASSERT(0);
}
+ if (request->has_param() && request->param().has_expected_error()) {
+ const auto& error = request->param().expected_error();
+ return Status(static_cast<StatusCode>(error.code()), error.error_message(),
+ error.binary_error_details());
+ }
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel > DO_NOT_CANCEL) {
@@ -246,6 +251,9 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ int server_coalescing_api = GetIntValueFromMetadata(
+ kServerUseCoalescingApi, context->client_metadata(), 0);
+
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
@@ -260,7 +268,11 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
response.set_message(request->message() + grpc::to_string(i));
- writer->Write(response);
+ if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) {
+ writer->WriteLast(response, WriteOptions());
+ } else {
+ writer->Write(response);
+ }
}
if (server_try_cancel_thd != nullptr) {
@@ -305,10 +317,21 @@ Status TestServiceImpl::BidiStream(
new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
}
+ // kServerFinishAfterNReads suggests after how many reads, the server should
+ // write the last message and send status (coalesced using WriteLast)
+ int server_write_last = GetIntValueFromMetadata(
+ kServerFinishAfterNReads, context->client_metadata(), 0);
+
+ int read_counts = 0;
while (stream->Read(&request)) {
+ read_counts++;
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
- stream->Write(response);
+ if (read_counts == server_write_last) {
+ stream->WriteLast(response, WriteOptions());
+ } else {
+ stream->Write(response);
+ }
}
if (server_try_cancel_thd != nullptr) {
diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h
index 88e0be7bca..b1f02f93f6 100644
--- a/test/cpp/end2end/test_service_impl.h
+++ b/test/cpp/end2end/test_service_impl.h
@@ -48,6 +48,8 @@ const int kNumResponseStreamsMsgs = 3;
const char* const kServerCancelAfterReads = "cancel_after_reads";
const char* const kServerTryCancelRequest = "server_try_cancel";
const char* const kDebugInfoTrailerKey = "debug-info-bin";
+const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
+const char* const kServerUseCoalescingApi = "server_use_coalescing_api";
typedef enum {
DO_NOT_CANCEL = 0,