aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/core/end2end/fuzzers/api_fuzzer_corpus/crash-59a56fa18034a104fb9f16cd58071b6ff93b8756bin0 -> 268 bytes
-rw-r--r--test/core/end2end/fuzzers/api_fuzzer_corpus/poc-c726ee220e980ed6ad17809fd9efe2844ee61555ac08e4f88afd8901cc2dd53abin0 -> 229 bytes
-rw-r--r--test/core/end2end/tests/keepalive_timeout.c20
-rw-r--r--test/core/iomgr/resource_quota_test.c52
-rw-r--r--test/cpp/end2end/async_end2end_test.cc319
-rw-r--r--test/cpp/end2end/end2end_test.cc85
-rw-r--r--test/cpp/end2end/mock_test.cc2
-rw-r--r--test/cpp/end2end/test_service_impl.cc22
-rw-r--r--test/cpp/end2end/test_service_impl.h2
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc191
10 files changed, 683 insertions, 10 deletions
diff --git a/test/core/end2end/fuzzers/api_fuzzer_corpus/crash-59a56fa18034a104fb9f16cd58071b6ff93b8756 b/test/core/end2end/fuzzers/api_fuzzer_corpus/crash-59a56fa18034a104fb9f16cd58071b6ff93b8756
new file mode 100644
index 0000000000..1460bc9fbf
--- /dev/null
+++ b/test/core/end2end/fuzzers/api_fuzzer_corpus/crash-59a56fa18034a104fb9f16cd58071b6ff93b8756
Binary files differ
diff --git a/test/core/end2end/fuzzers/api_fuzzer_corpus/poc-c726ee220e980ed6ad17809fd9efe2844ee61555ac08e4f88afd8901cc2dd53a b/test/core/end2end/fuzzers/api_fuzzer_corpus/poc-c726ee220e980ed6ad17809fd9efe2844ee61555ac08e4f88afd8901cc2dd53a
new file mode 100644
index 0000000000..01428693cf
--- /dev/null
+++ b/test/core/end2end/fuzzers/api_fuzzer_corpus/poc-c726ee220e980ed6ad17809fd9efe2844ee61555ac08e4f88afd8901cc2dd53a
Binary files differ
diff --git a/test/core/end2end/tests/keepalive_timeout.c b/test/core/end2end/tests/keepalive_timeout.c
index 4296be3619..44b6e12abc 100644
--- a/test/core/end2end/tests/keepalive_timeout.c
+++ b/test/core/end2end/tests/keepalive_timeout.c
@@ -41,6 +41,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/transport/chttp2/transport/frame_ping.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/env.h"
@@ -109,13 +110,15 @@ static void test_keepalive_timeout(grpc_end2end_test_config config) {
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
- grpc_arg keepalive_args[2];
- keepalive_args[0].type = GRPC_ARG_INTEGER;
- keepalive_args[0].key = GRPC_ARG_HTTP2_KEEPALIVE_TIME;
- keepalive_args[0].value.integer = 2;
- keepalive_args[1].type = GRPC_ARG_INTEGER;
- keepalive_args[1].key = GRPC_ARG_HTTP2_KEEPALIVE_TIMEOUT;
- keepalive_args[1].value.integer = 0;
+ grpc_arg keepalive_args[] = {{.type = GRPC_ARG_INTEGER,
+ .key = GRPC_ARG_CLIENT_KEEPALIVE_TIME_S,
+ .value.integer = 2},
+ {.type = GRPC_ARG_INTEGER,
+ .key = GRPC_ARG_CLIENT_KEEPALIVE_TIMEOUT_S,
+ .value.integer = 0},
+ {.type = GRPC_ARG_INTEGER,
+ .key = GRPC_ARG_HTTP2_BDP_PROBE,
+ .value.integer = 1}};
grpc_channel_args *client_args = NULL;
client_args = grpc_channel_args_copy_and_add(client_args, keepalive_args, 2);
@@ -134,6 +137,9 @@ static void test_keepalive_timeout(grpc_end2end_test_config config) {
grpc_call_error error;
grpc_slice details;
+ /* Disable ping ack to trigger the keepalive timeout */
+ grpc_set_disable_ping_ack(true);
+
c = grpc_channel_create_call(
f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/foo"),
diff --git a/test/core/iomgr/resource_quota_test.c b/test/core/iomgr/resource_quota_test.c
index a5b28f210d..ebce8b9da6 100644
--- a/test/core/iomgr/resource_quota_test.c
+++ b/test/core/iomgr/resource_quota_test.c
@@ -682,6 +682,56 @@ static void test_one_slice_deleted_late(void) {
}
}
+static void test_resize_to_zero(void) {
+ gpr_log(GPR_INFO, "** test_resize_to_zero **");
+ grpc_resource_quota *q = grpc_resource_quota_create("test_resize_to_zero");
+ grpc_resource_quota_resize(q, 0);
+ grpc_resource_quota_unref(q);
+}
+
+static void test_negative_rq_free_pool(void) {
+ gpr_log(GPR_INFO, "** test_negative_rq_free_pool **");
+ grpc_resource_quota *q =
+ grpc_resource_quota_create("test_negative_rq_free_pool");
+ grpc_resource_quota_resize(q, 1024);
+
+ grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
+
+ grpc_resource_user_slice_allocator alloc;
+ int num_allocs = 0;
+ grpc_resource_user_slice_allocator_init(&alloc, usr, inc_int_cb, &num_allocs);
+
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+
+ {
+ const int start_allocs = num_allocs;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc_slices(&exec_ctx, &alloc, 1024, 1, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(num_allocs == start_allocs + 1);
+ }
+
+ grpc_resource_quota_resize(q, 512);
+
+ double eps = 0.0001;
+ GPR_ASSERT(grpc_resource_quota_get_memory_pressure(q) < 1 + eps);
+ GPR_ASSERT(grpc_resource_quota_get_memory_pressure(q) > 1 - eps);
+
+ {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_unref(&exec_ctx, usr);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+
+ grpc_resource_quota_unref(q);
+ {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+}
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
@@ -705,6 +755,8 @@ int main(int argc, char **argv) {
test_reclaimers_can_be_posted_repeatedly();
test_one_slice();
test_one_slice_deleted_late();
+ test_resize_to_zero();
+ test_negative_rq_free_pool();
grpc_shutdown();
return 0;
}
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 32e8a41795..0b5215ef8e 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -484,6 +484,81 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+// Two pings and a final pong.
+TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ // tag:1 never comes up since no op is performed
+ std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
+ stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
+
+ service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->Write(send_request, tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
+ srv_stream.Read(&recv_request, tag(6));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(7));
+ Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Finish(send_response, Status::OK, tag(8));
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking)
+ .Expect(8, true)
+ .Expect(9, true)
+ .Verify(cq_.get());
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+}
+
// One ping, two pongs.
TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ResetStub();
@@ -540,6 +615,112 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+// One ping, two pongs. Using WriteAndFinish API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+ stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(3));
+ cli_stream->Read(&recv_response, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
+ cli_stream->Read(&recv_response, tag(6));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Read(&recv_response, tag(7));
+ Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get());
+
+ cli_stream->Finish(&recv_status, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, two pongs. Using WriteLast API
+TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
+ stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
+
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(3));
+ cli_stream->Read(&recv_response, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
+ cli_stream->Read(&recv_response, tag(6));
+ srv_stream.Finish(Status::OK, tag(7));
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Read(&recv_response, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
// One ping, one pong.
TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ResetStub();
@@ -599,6 +780,144 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
EXPECT_TRUE(recv_status.ok());
}
+// One ping, one pong. Using server:WriteAndFinish api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+ cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(5));
+ Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
+ cli_stream->Read(&recv_response, tag(7));
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Finish(&recv_status, tag(8));
+ Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
+// One ping, one pong. Using server:WriteLast api
+TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ cli_ctx.set_initial_metadata_corked(true);
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
+ cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
+
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
+
+ cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
+
+ // 65536(64KB) is the default flow control window size. Should change this
+ // number when default flow control window size changes. For the write of
+ // send_request larger than the flow control window size, tag:3 will not come
+ // up until server read is initiated. For write of send_request smaller than
+ // the flow control window size, the request can take the free ride with
+ // initial metadata due to coalescing, thus write tag:3 will come up here.
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(3, true)
+ .Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ }
+
+ srv_stream.Read(&recv_request, tag(4));
+
+ if (GetParam().message_content.length() < 65536) {
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ } else {
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
+ }
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ srv_stream.Read(&recv_request, tag(5));
+ Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
+ srv_stream.Finish(Status::OK, tag(7));
+ cli_stream->Read(&recv_response, tag(8));
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, true)
+ .Expect(7, true)
+ .Expect(8, true)
+ .Verify(cq_.get());
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->Finish(&recv_status, tag(9));
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
+
+ EXPECT_TRUE(recv_status.ok());
+}
+
// Metadata tests
TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ResetStub();
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index df78557c43..d3a83b188f 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -702,6 +702,21 @@ TEST_P(End2endTest, RequestStreamOneRequest) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ context.set_initial_metadata_corked(true);
+ auto stream = stub_->RequestStream(&context, &response);
+ request.set_message("hello");
+ stream->WriteLast(request, WriteOptions());
+ Status s = stream->Finish();
+ EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, RequestStreamTwoRequests) {
ResetStub();
EchoRequest request;
@@ -718,6 +733,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ context.set_initial_metadata_corked(true);
+ auto stream = stub_->RequestStream(&context, &response);
+ request.set_message("hello");
+ EXPECT_TRUE(stream->Write(request));
+ stream->WriteLast(request, WriteOptions());
+ Status s = stream->Finish();
+ EXPECT_EQ(response.message(), "hellohello");
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, ResponseStream) {
ResetStub();
EchoRequest request;
@@ -738,6 +769,27 @@ TEST_P(End2endTest, ResponseStream) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("hello");
+ context.AddMetadata(kServerUseCoalescingApi, "1");
+
+ auto stream = stub_->ResponseStream(&context, request);
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "0");
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "1");
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message() + "2");
+ EXPECT_FALSE(stream->Read(&response));
+
+ Status s = stream->Finish();
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, BidiStream) {
ResetStub();
EchoRequest request;
@@ -770,6 +822,39 @@ TEST_P(End2endTest, BidiStream) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ context.AddMetadata(kServerFinishAfterNReads, "3");
+ context.set_initial_metadata_corked(true);
+ grpc::string msg("hello");
+
+ auto stream = stub_->BidiStream(&context);
+
+ request.set_message(msg + "0");
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ request.set_message(msg + "1");
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ request.set_message(msg + "2");
+ stream->WriteLast(request, WriteOptions());
+ EXPECT_TRUE(stream->Read(&response));
+ EXPECT_EQ(response.message(), request.message());
+
+ EXPECT_FALSE(stream->Read(&response));
+ EXPECT_FALSE(stream->Read(&response));
+
+ Status s = stream->Finish();
+ EXPECT_TRUE(s.ok());
+}
+
// Talk to the two services with the same name but different package names.
// The two stubs are created on the same channel.
TEST_P(End2endTest, DiffPackageServices) {
diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc
index d6664da5a0..fdb2732e50 100644
--- a/test/cpp/end2end/mock_test.cc
+++ b/test/cpp/end2end/mock_test.cc
@@ -89,7 +89,7 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> final
return true;
}
- bool Write(const EchoRequest& msg, const WriteOptions& options) override {
+ bool Write(const EchoRequest& msg, WriteOptions options) override {
gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str());
last_message_ = msg.message();
return true;
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 59d36e9cb5..11729c425c 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -246,6 +246,9 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ int server_coalescing_api = GetIntValueFromMetadata(
+ kServerUseCoalescingApi, context->client_metadata(), 0);
+
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
@@ -260,7 +263,11 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
response.set_message(request->message() + grpc::to_string(i));
- writer->Write(response);
+ if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) {
+ writer->WriteLast(response, WriteOptions());
+ } else {
+ writer->Write(response);
+ }
}
if (server_try_cancel_thd != nullptr) {
@@ -305,10 +312,21 @@ Status TestServiceImpl::BidiStream(
new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
}
+ // kServerFinishAfterNReads suggests after how many reads, the server should
+ // write the last message and send status (coalesced using WriteLast)
+ int server_write_last = GetIntValueFromMetadata(
+ kServerFinishAfterNReads, context->client_metadata(), 0);
+
+ int read_counts = 0;
while (stream->Read(&request)) {
+ read_counts++;
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
- stream->Write(response);
+ if (read_counts == server_write_last) {
+ stream->WriteLast(response, WriteOptions());
+ } else {
+ stream->Write(response);
+ }
}
if (server_try_cancel_thd != nullptr) {
diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h
index 88e0be7bca..b1f02f93f6 100644
--- a/test/cpp/end2end/test_service_impl.h
+++ b/test/cpp/end2end/test_service_impl.h
@@ -48,6 +48,8 @@ const int kNumResponseStreamsMsgs = 3;
const char* const kServerCancelAfterReads = "cancel_after_reads";
const char* const kServerTryCancelRequest = "server_try_cancel";
const char* const kDebugInfoTrailerKey = "debug-info-bin";
+const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
+const char* const kServerUseCoalescingApi = "server_use_coalescing_api";
typedef enum {
DO_NOT_CANCEL = 0,
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
index 00e37f7912..42a5381e27 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
@@ -240,6 +240,173 @@ static void BM_StreamingPingPongMsgs(benchmark::State& state) {
state.SetBytesProcessed(msg_size * state.iterations() * 2);
}
+// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
+// messages in each call) in a loop on a single channel. Different from
+// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
+// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
+// sendmsg syscalls for streaming by coalescing 1. initial metadata with first
+// message; 2. final streaming message with trailing metadata.
+//
+// First parmeter (i.e state.range(0)): Message size (in bytes) to use
+// Second parameter (i.e state.range(1)): Number of ping pong messages.
+// Note: One ping-pong means two messages (one from client to server and
+// the other from server to client):
+// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
+// API and WriteLast API for server.
+template <class Fixture, class ClientContextMutator, class ServerContextMutator>
+static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
+ const int msg_size = state.range(0);
+ const int max_ping_pongs = state.range(1);
+ // This options is used to test out server API: WriteLast and WriteAndFinish
+ // respectively, since we can not use both of them on server side at the same
+ // time. Value 1 means we are testing out the WriteAndFinish API, and
+ // otherwise we are testing out the WriteLast API.
+ const int write_and_finish = state.range(2);
+
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ EchoRequest send_request;
+ EchoRequest recv_request;
+
+ if (msg_size > 0) {
+ send_request.set_message(std::string(msg_size, 'a'));
+ send_response.set_message(std::string(msg_size, 'b'));
+ }
+
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+
+ while (state.KeepRunning()) {
+ ServerContext svr_ctx;
+ ServerContextMutator svr_ctx_mut(&svr_ctx);
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+
+ ClientContext cli_ctx;
+ ClientContextMutator cli_ctx_mut(&cli_ctx);
+ cli_ctx.set_initial_metadata_corked(true);
+ // tag:1 here will never comes up, since we are not performing any op due
+ // to initial metadata coalescing.
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+
+ void* t;
+ bool ok;
+ int need_tags;
+
+ // Send 'max_ping_pongs' number of ping pong messages
+ int ping_pong_cnt = 0;
+ while (ping_pong_cnt < max_ping_pongs) {
+ if (ping_pong_cnt == max_ping_pongs - 1) {
+ request_rw->WriteLast(send_request, WriteOptions(), tag(2));
+ } else {
+ request_rw->Write(send_request, tag(2)); // Start client send
+ }
+
+ need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
+
+ if (ping_pong_cnt == 0) {
+ // wait for the server call structure (call_hook, etc.) to be
+ // initialized (async stream between client side and server side
+ // established). It is necessary when client init metadata is
+ // coalesced
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ while ((int)(intptr_t)t != 0) {
+ // In some cases tag:2 comes before tag:0 (write tag comes out
+ // first), this while loop is to make sure get tag:0.
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ }
+ }
+
+ response_rw.Read(&recv_request, tag(3)); // Start server recv
+ request_rw->Read(&recv_response, tag(4)); // Start client recv
+
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+
+ // If server recv is complete, start the server send operation
+ if (i == 3) {
+ if (ping_pong_cnt == max_ping_pongs - 1) {
+ if (write_and_finish == 1) {
+ response_rw.WriteAndFinish(send_response, WriteOptions(),
+ Status::OK, tag(5));
+ } else {
+ response_rw.WriteLast(send_response, WriteOptions(), tag(5));
+ // WriteLast buffers the write, so neither server write op nor
+ // client read op will finish inside the while loop.
+ need_tags &= ~(1 << 4);
+ need_tags &= ~(1 << 5);
+ }
+ } else {
+ response_rw.Write(send_response, tag(5));
+ }
+ }
+
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ ping_pong_cnt++;
+ }
+
+ if (max_ping_pongs == 0) {
+ need_tags = (1 << 6) | (1 << 7) | (1 << 8);
+ } else {
+ if (write_and_finish == 1) {
+ need_tags = (1 << 8);
+ } else {
+ // server's buffered write and the client's read of the buffered write
+ // tags should come up.
+ need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8);
+ }
+ }
+
+ // No message write or initial metadata write happened yet.
+ if (max_ping_pongs == 0) {
+ request_rw->WritesDone(tag(6));
+ // wait for server call data structure(call_hook, etc.) to be
+ // initialized, since initial metadata is corked.
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ while ((int)(intptr_t)t != 0) {
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ }
+ response_rw.Finish(Status::OK, tag(7));
+ } else {
+ if (write_and_finish != 1) {
+ response_rw.Finish(Status::OK, tag(7));
+ }
+ }
+
+ Status recv_status;
+ request_rw->Finish(&recv_status, tag(8));
+
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+
+ GPR_ASSERT(recv_status.ok());
+ }
+ }
+
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
+}
+
/*******************************************************************************
* CONFIGURATIONS
*/
@@ -270,6 +437,30 @@ BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, InProcessCHTTP2, NoOpMutator,
BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator)
->Range(0, 128 * 1024 * 1024);
+// Generate Args for StreamingPingPongWithCoalescingApi benchmarks. Currently
+// generates args for only "small streams" (i.e streams with 0, 1 or 2 messages)
+static void StreamingPingPongWithCoalescingApiArgs(
+ benchmark::internal::Benchmark* b) {
+ int msg_size = 0;
+
+ b->Args(
+ {0, 0, 0}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here)
+ b->Args(
+ {0, 0, 1}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here)
+
+ for (msg_size = 0; msg_size <= 128 * 1024 * 1024;
+ msg_size == 0 ? msg_size++ : msg_size *= 8) {
+ b->Args({msg_size, 1, 0});
+ b->Args({msg_size, 2, 0});
+ b->Args({msg_size, 1, 1});
+ b->Args({msg_size, 2, 1});
+ }
+}
+
+BENCHMARK_TEMPLATE(BM_StreamingPingPongWithCoalescingApi, InProcessCHTTP2,
+ NoOpMutator, NoOpMutator)
+ ->Apply(StreamingPingPongWithCoalescingApiArgs);
+
} // namespace testing
} // namespace grpc