diff options
author | Craig Tiller <ctiller@google.com> | 2017-04-21 15:43:27 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-04-21 15:43:27 -0700 |
commit | ee880fd373e7d08e9dbd2e4560b5ba2cca094753 (patch) | |
tree | 97f51547b45e80d7a1f8b671f873ad865f2d5673 /test/cpp | |
parent | e1523e95c102a3eec48fef34350bca206c0a6546 (diff) | |
parent | 1a0b4cf5eef9542ccb9ad7263a17938bf882bd0f (diff) |
Merge github.com:grpc/grpc into count_now
Diffstat (limited to 'test/cpp')
57 files changed, 3372 insertions, 309 deletions
diff --git a/test/cpp/common/BUILD b/test/cpp/common/BUILD index 0e2db00f0a..48ad583981 100644 --- a/test/cpp/common/BUILD +++ b/test/cpp/common/BUILD @@ -34,3 +34,27 @@ cc_test( srcs = ["alarm_cpp_test.cc"], deps = ["//:grpc++", "//external:gtest", "//test/core/util:gpr_test_util"], ) + +cc_test( + name = "auth_property_iterator_test", + srcs = ["auth_property_iterator_test.cc"], + deps = ["//:grpc++", "//external:gtest", "//test/core/util:gpr_test_util", "//test/cpp/util:test_util"], +) + +cc_test( + name = "channel_arguments_test", + srcs = ["channel_arguments_test.cc"], + deps = ["//:grpc++", "//external:gtest", "//test/core/util:gpr_test_util"], +) + +cc_test( + name = "channel_filter_test", + srcs = ["channel_filter_test.cc"], + deps = ["//:grpc++", "//external:gtest", "//test/core/util:gpr_test_util"], +) + +cc_test( + name = "secure_auth_context_test", + srcs = ["secure_auth_context_test.cc"], + deps = ["//:grpc++", "//external:gtest", "//test/core/util:gpr_test_util", "//test/cpp/util:test_util"], +) diff --git a/test/cpp/common/channel_arguments_test.cc b/test/cpp/common/channel_arguments_test.cc index 190d32ce06..8e7b56cbd6 100644 --- a/test/cpp/common/channel_arguments_test.cc +++ b/test/cpp/common/channel_arguments_test.cc @@ -230,13 +230,6 @@ TEST_F(ChannelArgumentsTest, SetSocketMutator) { EXPECT_TRUE(HasArg(arg1)); // arg0 is replaced by arg1 EXPECT_FALSE(HasArg(arg0)); - - // arg0 is destroyed by grpc_socket_mutator_to_arg(mutator1) - { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - arg1.value.pointer.vtable->destroy(&exec_ctx, arg1.value.pointer.p); - grpc_exec_ctx_finish(&exec_ctx); - } } TEST_F(ChannelArgumentsTest, SetUserAgentPrefix) { @@ -250,6 +243,22 @@ TEST_F(ChannelArgumentsTest, SetUserAgentPrefix) { channel_args_.SetUserAgentPrefix(prefix); EXPECT_TRUE(HasArg(arg0)); + + // Test if the user agent string is copied correctly + ChannelArguments new_channel_args(channel_args_); + grpc_channel_args args; + SetChannelArgs(new_channel_args, &args); + bool found = false; + for (size_t i = 0; i < args.num_args; i++) { + const grpc_arg& arg = args.args[i]; + if (arg.type == GRPC_ARG_STRING && + grpc::string(arg.key) == GRPC_ARG_PRIMARY_USER_AGENT_STRING) { + EXPECT_FALSE(found); + EXPECT_EQ(0, strcmp(arg.value.string, arg0.value.string)); + found = true; + } + } + EXPECT_TRUE(found); } } // namespace testing diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 32e8a41795..0b5215ef8e 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -484,6 +484,81 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { EXPECT_TRUE(recv_status.ok()); } +// Two pings and a final pong. +TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + // tag:1 never comes up since no op is performed + std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( + stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); + + service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->Write(send_request, tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + + EXPECT_EQ(send_request.message(), recv_request.message()); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(5)); + srv_stream.Read(&recv_request, tag(6)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(7)); + Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.Finish(send_response, Status::OK, tag(8)); + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking) + .Expect(8, true) + .Expect(9, true) + .Verify(cq_.get()); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // One ping, two pongs. TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { ResetStub(); @@ -540,6 +615,112 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { EXPECT_TRUE(recv_status.ok()); } +// One ping, two pongs. Using WriteAndFinish API +TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(3)); + cli_stream->Read(&recv_response, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5)); + cli_stream->Read(&recv_response, tag(6)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Read(&recv_response, tag(7)); + Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + +// One ping, two pongs. Using WriteLast API +TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(3)); + cli_stream->Read(&recv_response, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.WriteLast(send_response, WriteOptions(), tag(5)); + cli_stream->Read(&recv_response, tag(6)); + srv_stream.Finish(Status::OK, tag(7)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Expect(7, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Read(&recv_response, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + // One ping, one pong. TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { ResetStub(); @@ -599,6 +780,144 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { EXPECT_TRUE(recv_status.ok()); } +// One ping, one pong. Using server:WriteAndFinish api +TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(5)); + Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6)); + cli_stream->Read(&recv_response, tag(7)); + Verifier(GetParam().disable_blocking) + .Expect(6, true) + .Expect(7, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Finish(&recv_status, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + +// One ping, one pong. Using server:WriteLast api +TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(5)); + Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.WriteLast(send_response, WriteOptions(), tag(6)); + srv_stream.Finish(Status::OK, tag(7)); + cli_stream->Read(&recv_response, tag(8)); + Verifier(GetParam().disable_blocking) + .Expect(6, true) + .Expect(7, true) + .Expect(8, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + // Metadata tests TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { ResetStub(); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index df78557c43..df71777e4b 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -702,6 +702,21 @@ TEST_P(End2endTest, RequestStreamOneRequest) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.set_initial_metadata_corked(true); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + stream->WriteLast(request, WriteOptions()); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, RequestStreamTwoRequests) { ResetStub(); EchoRequest request; @@ -718,6 +733,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.set_initial_metadata_corked(true); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request)); + stream->WriteLast(request, WriteOptions()); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, ResponseStream) { ResetStub(); EchoRequest request; @@ -738,6 +769,27 @@ TEST_P(End2endTest, ResponseStream) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.AddMetadata(kServerUseCoalescingApi, "1"); + + auto stream = stub_->ResponseStream(&context, request); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "1"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "2"); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, BidiStream) { ResetStub(); EchoRequest request; @@ -770,6 +822,39 @@ TEST_P(End2endTest, BidiStream) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, BidiStreamWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.AddMetadata(kServerFinishAfterNReads, "3"); + context.set_initial_metadata_corked(true); + grpc::string msg("hello"); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "1"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "2"); + stream->WriteLast(request, WriteOptions()); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. TEST_P(End2endTest, DiffPackageServices) { @@ -1045,6 +1130,39 @@ TEST_P(End2endTest, BinaryTrailerTest) { EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second))); } +TEST_P(End2endTest, ExpectErrorTest) { + ResetStub(); + + std::vector<ErrorStatus> expected_status; + expected_status.emplace_back(); + expected_status.back().set_code(13); // INTERNAL + expected_status.back().set_error_message("text error message"); + expected_status.back().set_binary_error_details("text error details"); + expected_status.emplace_back(); + expected_status.back().set_code(13); // INTERNAL + expected_status.back().set_error_message("text error message"); + expected_status.back().set_binary_error_details( + "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB"); + + for (auto iter = expected_status.begin(); iter != expected_status.end(); + ++iter) { + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("Hello"); + auto* error = request.mutable_param()->mutable_expected_error(); + error->set_code(iter->code()); + error->set_error_message(iter->error_message()); + error->set_binary_error_details(iter->binary_error_details()); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(iter->code(), s.error_code()); + EXPECT_EQ(iter->error_message(), s.error_message()); + EXPECT_EQ(iter->binary_error_details(), s.error_details()); + } +} + ////////////////////////////////////////////////////////////////////////// // Test with and without a proxy. class ProxyEnd2endTest : public End2endTest { diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index bd384f68b4..2f873eeaa8 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -122,8 +122,9 @@ class ChannelDataImpl : public ChannelData { class CallDataImpl : public CallData { public: - void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - TransportStreamOp* op) override { + void StartTransportStreamOpBatch(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + TransportStreamOpBatch* op) override { // Incrementing the counter could be done from Init(), but we want // to test that the individual methods are actually called correctly. if (op->recv_initial_metadata() != nullptr) IncrementCallCounter(); diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index d6664da5a0..fdb2732e50 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -89,7 +89,7 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> final return true; } - bool Write(const EchoRequest& msg, const WriteOptions& options) override { + bool Write(const EchoRequest& msg, WriteOptions options) override { gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str()); last_message_ = msg.message(); return true; diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 59d36e9cb5..b473dd1f52 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -92,6 +92,11 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, gpr_log(GPR_ERROR, "The request should not reach application handler."); GPR_ASSERT(0); } + if (request->has_param() && request->param().has_expected_error()) { + const auto& error = request->param().expected_error(); + return Status(static_cast<StatusCode>(error.code()), error.error_message(), + error.binary_error_details()); + } int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); if (server_try_cancel > DO_NOT_CANCEL) { @@ -246,6 +251,9 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + int server_coalescing_api = GetIntValueFromMetadata( + kServerUseCoalescingApi, context->client_metadata(), 0); + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { ServerTryCancel(context); return Status::CANCELLED; @@ -260,7 +268,11 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, for (int i = 0; i < kNumResponseStreamsMsgs; i++) { response.set_message(request->message() + grpc::to_string(i)); - writer->Write(response); + if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) { + writer->WriteLast(response, WriteOptions()); + } else { + writer->Write(response); + } } if (server_try_cancel_thd != nullptr) { @@ -305,10 +317,21 @@ Status TestServiceImpl::BidiStream( new std::thread(&TestServiceImpl::ServerTryCancel, this, context); } + // kServerFinishAfterNReads suggests after how many reads, the server should + // write the last message and send status (coalesced using WriteLast) + int server_write_last = GetIntValueFromMetadata( + kServerFinishAfterNReads, context->client_metadata(), 0); + + int read_counts = 0; while (stream->Read(&request)) { + read_counts++; gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); response.set_message(request.message()); - stream->Write(response); + if (read_counts == server_write_last) { + stream->WriteLast(response, WriteOptions()); + } else { + stream->Write(response); + } } if (server_try_cancel_thd != nullptr) { diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 88e0be7bca..b1f02f93f6 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -48,6 +48,8 @@ const int kNumResponseStreamsMsgs = 3; const char* const kServerCancelAfterReads = "cancel_after_reads"; const char* const kServerTryCancelRequest = "server_try_cancel"; const char* const kDebugInfoTrailerKey = "debug-info-bin"; +const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; +const char* const kServerUseCoalescingApi = "server_use_coalescing_api"; typedef enum { DO_NOT_CANCEL = 0, diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc index 82ccf436f8..0b569f1415 100644 --- a/test/cpp/grpclb/grpclb_api_test.cc +++ b/test/cpp/grpclb/grpclb_api_test.cc @@ -34,7 +34,7 @@ #include <grpc++/impl/codegen/config.h> #include <gtest/gtest.h> -#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/proto/grpc/lb/v1/load_balancer.pb.h" // C++ version @@ -106,11 +106,13 @@ TEST_F(GrpclbTest, ParseResponseServerList) { auto* server = serverlist->add_servers(); server->set_ip_address(Ip4ToPackedString("127.0.0.1")); server->set_port(12345); - server->set_drop_request(true); + server->set_drop_for_rate_limiting(true); + server->set_drop_for_load_balancing(false); server = response.mutable_server_list()->add_servers(); server->set_ip_address(Ip4ToPackedString("10.0.0.1")); server->set_port(54321); - server->set_drop_request(false); + server->set_drop_for_rate_limiting(false); + server->set_drop_for_load_balancing(true); auto* expiration_interval = serverlist->mutable_expiration_interval(); expiration_interval->set_seconds(888); expiration_interval->set_nanos(999); @@ -125,12 +127,14 @@ TEST_F(GrpclbTest, ParseResponseServerList) { EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address), "127.0.0.1"); EXPECT_EQ(c_serverlist->servers[0]->port, 12345); - EXPECT_TRUE(c_serverlist->servers[0]->drop_request); + EXPECT_TRUE(c_serverlist->servers[0]->drop_for_rate_limiting); + EXPECT_FALSE(c_serverlist->servers[0]->drop_for_load_balancing); EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address); EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1"); EXPECT_EQ(c_serverlist->servers[1]->port, 54321); - EXPECT_FALSE(c_serverlist->servers[1]->drop_request); + EXPECT_FALSE(c_serverlist->servers[1]->drop_for_rate_limiting); + EXPECT_TRUE(c_serverlist->servers[1]->drop_for_load_balancing); EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds); EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888); diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 89ed9249ad..a002c7f77d 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -51,7 +51,7 @@ #include <grpc++/impl/codegen/config.h> extern "C" { -#include "src/core/ext/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -310,7 +310,7 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, gpr_log(GPR_INFO, "LB Server[%s](%s) after tag 204. All done. LB server out", sf->servers_hostport, sf->balancer_name); - grpc_call_destroy(s); + grpc_call_unref(s); cq_verifier_destroy(cqv); @@ -457,7 +457,7 @@ static void start_backend_server(server_fixture *sf) { gpr_log(GPR_INFO, "Server[%s] DONE. After servicing %d calls", sf->servers_hostport, sf->num_calls_serviced); - grpc_call_destroy(s); + grpc_call_unref(s); cq_verifier_destroy(cqv); grpc_metadata_array_destroy(&request_metadata_recv); grpc_call_details_destroy(&call_details); @@ -557,7 +557,7 @@ static void perform_request(client_fixture *cf) { peer = grpc_call_get_peer(c); gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer); - grpc_call_destroy(c); + grpc_call_unref(c); cq_verify_empty_timeout(cqv, 1 /* seconds */); cq_verifier_destroy(cqv); @@ -573,34 +573,51 @@ static void perform_request(client_fixture *cf) { static void setup_client(const server_fixture *lb_server, const server_fixture *backends, client_fixture *cf) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - char *lb_uri; - // The grpclb LB policy will be automatically selected by virtue of - // the fact that the returned addresses are balancer addresses. - gpr_asprintf(&lb_uri, "test:///%s?lb_enabled=1&balancer_names=%s", - lb_server->servers_hostport, lb_server->balancer_name); - - grpc_arg expected_target_arg; - expected_target_arg.type = GRPC_ARG_STRING; - expected_target_arg.key = - const_cast<char *>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS); char *expected_target_names = NULL; const char *backends_name = lb_server->servers_hostport; gpr_asprintf(&expected_target_names, "%s;%s", backends_name, BALANCERS_NAME); - expected_target_arg.value.string = const_cast<char *>(expected_target_names); + grpc_fake_resolver_response_generator *response_generator = + grpc_fake_resolver_response_generator_create(); + + grpc_lb_addresses *addresses = grpc_lb_addresses_create(1, NULL); + char *lb_uri_str; + gpr_asprintf(&lb_uri_str, "ipv4:%s", lb_server->servers_hostport); + grpc_uri *lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true); + GPR_ASSERT(lb_uri != NULL); + grpc_lb_addresses_set_address_from_uri(addresses, 0, lb_uri, true, + lb_server->balancer_name, NULL); + grpc_uri_destroy(lb_uri); + gpr_free(lb_uri_str); + + gpr_asprintf(&cf->server_uri, "test:///%s", lb_server->servers_hostport); + const grpc_arg fake_addresses = + grpc_lb_addresses_create_channel_arg(addresses); + grpc_channel_args *fake_result = + grpc_channel_args_copy_and_add(NULL, &fake_addresses, 1); + grpc_lb_addresses_destroy(&exec_ctx, addresses); + + const grpc_arg new_args[] = { + grpc_fake_transport_expected_targets_arg(expected_target_names), + grpc_fake_resolver_response_generator_arg(response_generator)}; + grpc_channel_args *args = - grpc_channel_args_copy_and_add(NULL, &expected_target_arg, 1); + grpc_channel_args_copy_and_add(NULL, new_args, GPR_ARRAY_SIZE(new_args)); gpr_free(expected_target_names); - cf->cq = grpc_completion_queue_create(NULL); - cf->server_uri = lb_uri; + cf->cq = grpc_completion_queue_create_for_next(NULL); grpc_channel_credentials *fake_creds = grpc_fake_transport_security_credentials_create(); cf->client = grpc_secure_channel_create(fake_creds, cf->server_uri, args, NULL); + grpc_fake_resolver_response_generator_set_response( + &exec_ctx, response_generator, fake_result); + grpc_channel_args_destroy(&exec_ctx, fake_result); grpc_channel_credentials_unref(&exec_ctx, fake_creds); grpc_channel_args_destroy(&exec_ctx, args); + grpc_fake_resolver_response_generator_unref(response_generator); + grpc_exec_ctx_finish(&exec_ctx); } static void teardown_client(client_fixture *cf) { @@ -616,7 +633,7 @@ static void teardown_client(client_fixture *cf) { static void setup_server(const char *host, server_fixture *sf) { int assigned_port; - sf->cq = grpc_completion_queue_create(NULL); + sf->cq = grpc_completion_queue_create_for_next(NULL); const char *colon_idx = strchr(host, ':'); if (colon_idx) { const char *port_str = colon_idx + 1; @@ -643,10 +660,15 @@ static void teardown_server(server_fixture *sf) { if (!sf->server) return; gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport); - grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - sf->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + + grpc_completion_queue *shutdown_cq = + grpc_completion_queue_create_for_pluck(NULL); + grpc_server_shutdown_and_notify(sf->server, shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); grpc_server_destroy(sf->server); gpr_thd_join(sf->tid); @@ -782,8 +804,8 @@ TEST(GrpclbTest, InvalidAddressInServerlist) {} int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_test_init(argc, argv); grpc_fake_resolver_init(); + grpc_test_init(argc, argv); grpc_init(); const auto result = RUN_ALL_TESTS(); grpc_shutdown(); diff --git a/test/cpp/interop/BUILD b/test/cpp/interop/BUILD new file mode 100644 index 0000000000..1a3e8d916f --- /dev/null +++ b/test/cpp/interop/BUILD @@ -0,0 +1,90 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +licenses(["notice"]) # 3-clause BSD + +cc_library( + name = "server_helper_lib", + srcs = [ + "server_helper.cc", + ], + hdrs = [ + "server_helper.h", + ], + deps = [ + "//test/cpp/util:test_util", + "//external:gflags", + ], +) + +cc_binary( + name = "interop_server", + srcs = [ + "interop_server.cc", + "interop_server_bootstrap.cc", + ], + deps = [ + ":server_helper_lib", + "//:grpc++", + "//src/proto/grpc/testing:empty_proto", + "//src/proto/grpc/testing:messages_proto", + "//src/proto/grpc/testing:test_proto", + "//test/cpp/util:test_config", + ], +) + +cc_library( + name = "client_helper_lib", + srcs = [ + "client_helper.cc", + "interop_client.cc", + ], + hdrs = [ + "client_helper.h", + "interop_client.h", + ], + deps = [ + "//test/cpp/util:test_util", + "//src/proto/grpc/testing:empty_proto", + "//src/proto/grpc/testing:messages_proto", + "//src/proto/grpc/testing:test_proto", + "//test/core/security:oauth2_utils", + "//test/cpp/util:test_config", + ], +) + +cc_binary( + name = "interop_client", + srcs = [ + "client.cc", + ], + deps = [ + ":client_helper_lib", + ], +) diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 5688ab7971..6f1d910304 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -99,6 +99,7 @@ DEFINE_bool(do_not_abort_on_transient_failures, false, using grpc::testing::CreateChannelForTestCase; using grpc::testing::GetServiceAccountJsonKey; +using grpc::testing::UpdateActions; int main(int argc, char** argv) { grpc::testing::InitTest(&argc, &argv, true); @@ -162,8 +163,10 @@ int main(int argc, char** argv) { std::bind(&grpc::testing::InteropClient::DoUnimplementedMethod, &client); actions["unimplemented_service"] = std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client); - // actions["cacheable_unary"] = - // std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client); + actions["cacheable_unary"] = + std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client); + + UpdateActions(&actions); if (FLAGS_test_case == "all") { for (const auto& action : actions) { diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc index d3192ad0c9..784cd2826d 100644 --- a/test/cpp/interop/client_helper.cc +++ b/test/cpp/interop/client_helper.cc @@ -89,6 +89,9 @@ grpc::string GetOauth2AccessToken() { return access_token; } +void UpdateActions( + std::unordered_map<grpc::string, std::function<bool()>>* actions) {} + std::shared_ptr<Channel> CreateChannelForTestCase( const grpc::string& test_case) { GPR_ASSERT(FLAGS_server_port); diff --git a/test/cpp/interop/client_helper.h b/test/cpp/interop/client_helper.h index 622b96e4fb..387530a21c 100644 --- a/test/cpp/interop/client_helper.h +++ b/test/cpp/interop/client_helper.h @@ -35,6 +35,7 @@ #define GRPC_TEST_CPP_INTEROP_CLIENT_HELPER_H #include <memory> +#include <unordered_map> #include <grpc++/channel.h> @@ -47,6 +48,9 @@ grpc::string GetServiceAccountJsonKey(); grpc::string GetOauth2AccessToken(); +void UpdateActions( + std::unordered_map<grpc::string, std::function<bool()>>* actions); + std::shared_ptr<Channel> CreateChannelForTestCase( const grpc::string& test_case); diff --git a/test/cpp/interop/http2_client.cc b/test/cpp/interop/http2_client.cc index 01c07823cf..2109e34616 100644 --- a/test/cpp/interop/http2_client.cc +++ b/test/cpp/interop/http2_client.cc @@ -41,7 +41,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/transport/byte_stream.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/interop/http2_client.h" @@ -108,7 +108,7 @@ bool Http2Client::DoRstAfterData() { SimpleResponse response; AssertStatusCode(SendUnaryCall(&response), grpc::StatusCode::INTERNAL); - GPR_ASSERT(response.has_payload()); // data should be received + // There is no guarantee that data would be received. gpr_log(GPR_DEBUG, "Done testing reset stream after data"); return true; diff --git a/test/cpp/interop/http2_client.h b/test/cpp/interop/http2_client.h index 12df5d26bc..e57d695205 100644 --- a/test/cpp/interop/http2_client.h +++ b/test/cpp/interop/http2_client.h @@ -38,7 +38,7 @@ #include <grpc++/channel.h> #include <grpc/grpc.h> -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" namespace grpc { diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index b7f2723c39..0e79c5e4b4 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -46,8 +46,8 @@ #include <grpc/support/useful.h> #include "src/core/lib/transport/byte_stream.h" -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/empty.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/interop/client_helper.h" #include "test/cpp/interop/interop_client.h" @@ -918,6 +918,26 @@ bool InteropClient::DoCacheableUnary() { // second response is a cached copy of the first response GPR_ASSERT(response2.payload().body() == response1.payload().body()); + // Request 3 + // Modify the request body so it will not get a cache hit + ts = gpr_now(GPR_CLOCK_PRECISE); + timestamp = std::to_string((long long unsigned)ts.tv_nsec); + SimpleRequest request1; + request1.mutable_payload()->set_body(timestamp.c_str(), timestamp.size()); + ClientContext context3; + SimpleResponse response3; + context3.set_cacheable(true); + context3.AddMetadata("x-user-ip", "1.2.3.4"); + Status s3 = + serviceStub_.Get()->CacheableUnaryCall(&context3, request1, &response3); + if (!AssertStatusOk(s3)) { + return false; + } + gpr_log(GPR_DEBUG, "response 3 payload: %s", + response3.payload().body().c_str()); + + // Check that the response is different from the previous response. + GPR_ASSERT(response3.payload().body() != response1.payload().body()); return true; } diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 74f4db6b78..efcb7d2860 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -38,7 +38,7 @@ #include <grpc++/channel.h> #include <grpc/grpc.h> -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" namespace grpc { diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index 5a810b45ef..1cbca17928 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -48,8 +48,8 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/byte_stream.h" -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/empty.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/interop/server_helper.h" #include "test/cpp/util/test_config.h" diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc index 1c2f606637..01d985068d 100644 --- a/test/cpp/interop/reconnect_interop_client.cc +++ b/test/cpp/interop/reconnect_interop_client.cc @@ -40,8 +40,8 @@ #include <grpc++/support/channel_arguments.h> #include <grpc/grpc.h> #include <grpc/support/log.h> -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/empty.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/util/test_config.h" diff --git a/test/cpp/interop/reconnect_interop_server.cc b/test/cpp/interop/reconnect_interop_server.cc index 634d0a90fc..8d1b884af9 100644 --- a/test/cpp/interop/reconnect_interop_server.cc +++ b/test/cpp/interop/reconnect_interop_server.cc @@ -47,8 +47,8 @@ #include <grpc/grpc.h> #include <grpc/support/log.h> -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/empty.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/core/util/reconnect_server.h" #include "test/cpp/util/test_config.h" diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD new file mode 100644 index 0000000000..cae3fa1a14 --- /dev/null +++ b/test/cpp/microbenchmarks/BUILD @@ -0,0 +1,108 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +licenses(["notice"]) # 3-clause BSD + +cc_test( + name = "noop-benchmark", + srcs = ["noop-benchmark.cc"], + linkopts = ["-pthread"], + deps = ["//external:benchmark"], +) + +cc_library( + name = "helpers", + srcs = ["helpers.cc"], + hdrs = [ + "fullstack_context_mutators.h", + "fullstack_fixtures.h", + "helpers.h", + ], + linkopts = ["-pthread"], + deps = [ + "//:grpc++", + "//external:benchmark", + "//src/proto/grpc/testing:echo_proto", + "//test/core/util:grpc_test_util", + ], +) + +cc_test( + name = "bm_closure", + srcs = ["bm_closure.cc"], + deps = [":helpers"], +) + +cc_test( + name = "bm_cq", + srcs = ["bm_cq.cc"], + deps = [":helpers"], +) + +cc_test( + name = "bm_cq_multiple_threads", + srcs = ["bm_cq_multiple_threads.cc"], + deps = [":helpers"], +) + +cc_test( + name = "bm_error", + srcs = ["bm_error.cc"], + deps = [":helpers"], +) + +cc_test( + name = "bm_fullstack_streaming_ping_pong", + srcs = ["bm_fullstack_streaming_ping_pong.cc"], + deps = [":helpers"], +) + +cc_test( + name = "bm_fullstack_streaming_pump", + srcs = ["bm_fullstack_streaming_pump.cc"], + deps = [":helpers"], +) + +cc_test( + name = "bm_fullstack_trickle", + srcs = ["bm_fullstack_trickle.cc"], + deps = [":helpers"], +) + +cc_test( + name = "bm_fullstack_unary_ping_pong", + srcs = ["bm_fullstack_unary_ping_pong.cc"], + deps = [":helpers"], +) + +cc_test( + name = "bm_metadata", + srcs = ["bm_metadata.cc"], + deps = [":helpers"], +) diff --git a/test/cpp/microbenchmarks/bm_arena.cc b/test/cpp/microbenchmarks/bm_arena.cc new file mode 100644 index 0000000000..770c0b6d47 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_arena.cc @@ -0,0 +1,76 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Benchmark arenas */ + +extern "C" { +#include "src/core/lib/support/arena.h" +} +#include "test/cpp/microbenchmarks/helpers.h" +#include "third_party/benchmark/include/benchmark/benchmark.h" + +static void BM_Arena_NoOp(benchmark::State& state) { + while (state.KeepRunning()) { + gpr_arena_destroy(gpr_arena_create(state.range(0))); + } +} +BENCHMARK(BM_Arena_NoOp)->Range(1, 1024 * 1024); + +static void BM_Arena_ManyAlloc(benchmark::State& state) { + gpr_arena* a = gpr_arena_create(state.range(0)); + const size_t realloc_after = + 1024 * 1024 * 1024 / ((state.range(1) + 15) & 0xffffff0u); + while (state.KeepRunning()) { + gpr_arena_alloc(a, state.range(1)); + // periodically recreate arena to avoid OOM + if (state.iterations() % realloc_after == 0) { + gpr_arena_destroy(a); + a = gpr_arena_create(state.range(0)); + } + } + gpr_arena_destroy(a); +} +BENCHMARK(BM_Arena_ManyAlloc)->Ranges({{1, 1024 * 1024}, {1, 32 * 1024}}); + +static void BM_Arena_Batch(benchmark::State& state) { + while (state.KeepRunning()) { + gpr_arena* a = gpr_arena_create(state.range(0)); + for (int i = 0; i < state.range(1); i++) { + gpr_arena_alloc(a, state.range(2)); + } + gpr_arena_destroy(a); + } +} +BENCHMARK(BM_Arena_Batch)->Ranges({{1, 64 * 1024}, {1, 64}, {1, 1024}}); + +BENCHMARK_MAIN(); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 146f8e6bad..c91219e98c 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -34,6 +34,7 @@ /* This benchmark exists to ensure that the benchmark integration is * working */ +#include <benchmark/benchmark.h> #include <string.h> #include <sstream> @@ -44,22 +45,23 @@ #include <grpc/support/string_util.h> extern "C" { -#include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/load_reporting/load_reporting_filter.h" +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/deadline/deadline_filter.h" +#include "src/core/ext/filters/http/client/http_client_filter.h" +#include "src/core/ext/filters/http/message_compress/message_compress_filter.h" +#include "src/core/ext/filters/http/server/http_server_filter.h" +#include "src/core/ext/filters/load_reporting/load_reporting_filter.h" +#include "src/core/ext/filters/message_size/message_size_filter.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/channel/compress_filter.h" #include "src/core/lib/channel/connected_channel.h" -#include "src/core/lib/channel/deadline_filter.h" -#include "src/core/lib/channel/http_client_filter.h" -#include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/channel/message_size_filter.h" +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/transport_impl.h" } #include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/helpers.h" -#include "third_party/benchmark/include/benchmark/benchmark.h" auto &force_library_initialization = Library::get(); @@ -87,6 +89,9 @@ BENCHMARK(BM_Zalloc) ->Arg(6144) ->Arg(7168); +//////////////////////////////////////////////////////////////////////////////// +// Benchmarks creating full stacks + class BaseChannelFixture { public: BaseChannelFixture(grpc_channel *channel) : channel_(channel) {} @@ -116,12 +121,12 @@ template <class Fixture> static void BM_CallCreateDestroy(benchmark::State &state) { TrackCounters track_counters; Fixture fixture; - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = grpc_completion_queue_create_for_next(NULL); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); void *method_hdl = grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); while (state.KeepRunning()) { - grpc_call_destroy(grpc_channel_create_registered_call( + grpc_call_unref(grpc_channel_create_registered_call( fixture.channel(), NULL, GRPC_PROPAGATE_DEFAULTS, cq, method_hdl, deadline, NULL)); } @@ -132,6 +137,9 @@ static void BM_CallCreateDestroy(benchmark::State &state) { BENCHMARK_TEMPLATE(BM_CallCreateDestroy, InsecureChannel); BENCHMARK_TEMPLATE(BM_CallCreateDestroy, LameChannel); +//////////////////////////////////////////////////////////////////////////////// +// Benchmarks isolating individual filters + static void *tag(int i) { return reinterpret_cast<void *>(static_cast<intptr_t>(i)); } @@ -147,6 +155,7 @@ static void BM_LameChannelCallCreateCpp(benchmark::State &state) { grpc::testing::EchoResponse recv_response; grpc::Status recv_status; while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); grpc::ClientContext cli_ctx; auto reader = stub->AsyncEcho(&cli_ctx, send_request, &cq); reader->Finish(&recv_response, &recv_status, tag(0)); @@ -159,6 +168,165 @@ static void BM_LameChannelCallCreateCpp(benchmark::State &state) { } BENCHMARK(BM_LameChannelCallCreateCpp); +static void do_nothing(void *ignored) {} + +static void BM_LameChannelCallCreateCore(benchmark::State &state) { + TrackCounters track_counters; + + grpc_channel *channel; + grpc_completion_queue *cq; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_byte_buffer *response_payload_recv = NULL; + grpc_status_code status; + grpc_slice details; + grpc::testing::EchoRequest send_request; + grpc_slice send_request_slice = + grpc_slice_new(&send_request, sizeof(send_request), do_nothing); + + channel = grpc_lame_client_channel_create( + "localhost:1234", GRPC_STATUS_UNAUTHENTICATED, "blah"); + cq = grpc_completion_queue_create_for_next(NULL); + void *rc = grpc_channel_register_call( + channel, "/grpc.testing.EchoTestService/Echo", NULL, NULL); + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + grpc_call *call = grpc_channel_create_registered_call( + channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq, rc, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_byte_buffer *request_payload_send = + grpc_raw_byte_buffer_create(&send_request_slice, 1); + + // Fill in call ops + grpc_op ops[6]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload_send; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = + &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op++; + + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, + (size_t)(op - ops), + (void *)1, NULL)); + grpc_event ev = grpc_completion_queue_next( + cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type != GRPC_QUEUE_SHUTDOWN); + GPR_ASSERT(ev.success != 0); + grpc_call_unref(call); + grpc_byte_buffer_destroy(request_payload_send); + grpc_byte_buffer_destroy(response_payload_recv); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + } + grpc_channel_destroy(channel); + grpc_completion_queue_destroy(cq); + grpc_slice_unref(send_request_slice); + track_counters.Finish(state); +} +BENCHMARK(BM_LameChannelCallCreateCore); + +static void BM_LameChannelCallCreateCoreSeparateBatch(benchmark::State &state) { + TrackCounters track_counters; + + grpc_channel *channel; + grpc_completion_queue *cq; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_byte_buffer *response_payload_recv = NULL; + grpc_status_code status; + grpc_slice details; + grpc::testing::EchoRequest send_request; + grpc_slice send_request_slice = + grpc_slice_new(&send_request, sizeof(send_request), do_nothing); + + channel = grpc_lame_client_channel_create( + "localhost:1234", GRPC_STATUS_UNAUTHENTICATED, "blah"); + cq = grpc_completion_queue_create_for_next(NULL); + void *rc = grpc_channel_register_call( + channel, "/grpc.testing.EchoTestService/Echo", NULL, NULL); + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + grpc_call *call = grpc_channel_create_registered_call( + channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq, rc, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_byte_buffer *request_payload_send = + grpc_raw_byte_buffer_create(&send_request_slice, 1); + + // Fill in call ops + grpc_op ops[3]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload_send; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, + (size_t)(op - ops), + (void *)0, NULL)); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = + &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op++; + + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, + (size_t)(op - ops), + (void *)1, NULL)); + grpc_event ev = grpc_completion_queue_next( + cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type != GRPC_QUEUE_SHUTDOWN); + GPR_ASSERT(ev.success == 0); + ev = grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), + NULL); + GPR_ASSERT(ev.type != GRPC_QUEUE_SHUTDOWN); + GPR_ASSERT(ev.success != 0); + grpc_call_unref(call); + grpc_byte_buffer_destroy(request_payload_send); + grpc_byte_buffer_destroy(response_payload_recv); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + } + grpc_channel_destroy(channel); + grpc_completion_queue_destroy(cq); + grpc_slice_unref(send_request_slice); + track_counters.Finish(state); +} +BENCHMARK(BM_LameChannelCallCreateCoreSeparateBatch); + static void FilterDestroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_free(arg); @@ -216,7 +384,7 @@ namespace dummy_filter { static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) {} + grpc_transport_stream_op_batch *op) {} static void StartTransportOp(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -234,7 +402,7 @@ static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) {} + grpc_closure *then_sched_closure) {} grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { @@ -277,7 +445,7 @@ const char *name; /* implementation of grpc_transport_init_stream */ int InitStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { return 0; } @@ -291,7 +459,7 @@ void SetPollsetSet(grpc_exec_ctx *exec_ctx, grpc_transport *self, /* implementation of grpc_transport_perform_stream_op */ void PerformStreamOp(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, grpc_transport_stream_op *op) { + grpc_stream *stream, grpc_transport_stream_op_batch *op) { grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE); } @@ -301,7 +469,7 @@ void PerformOp(grpc_exec_ctx *exec_ctx, grpc_transport *self, /* implementation of grpc_transport_destroy_stream */ void DestroyStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, void *and_free_memory) {} + grpc_stream *stream, grpc_closure *then_sched_closure) {} /* implementation of grpc_transport_destroy */ void Destroy(grpc_exec_ctx *exec_ctx, grpc_transport *self) {} @@ -341,13 +509,15 @@ class SendEmptyMetadata { memset(&op_, 0, sizeof(op_)); op_.on_complete = grpc_closure_init(&closure_, DoNothing, nullptr, grpc_schedule_on_exec_ctx); + op_.send_initial_metadata = true; + op_.payload = &op_payload_; } class Op { public: Op(grpc_exec_ctx *exec_ctx, SendEmptyMetadata *p, grpc_call_stack *s) { grpc_metadata_batch_init(&batch_); - p->op_.send_initial_metadata = &batch_; + p->op_payload_.send_initial_metadata.send_initial_metadata = &batch_; } void Finish(grpc_exec_ctx *exec_ctx) { grpc_metadata_batch_destroy(exec_ctx, &batch_); @@ -361,7 +531,8 @@ class SendEmptyMetadata { const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); const gpr_timespec start_time_ = gpr_now(GPR_CLOCK_MONOTONIC); const grpc_slice method_ = grpc_slice_from_static_string("/foo/bar"); - grpc_transport_stream_op op_; + grpc_transport_stream_op_batch op_; + grpc_transport_stream_op_batch_payload op_payload_; grpc_closure closure_; }; @@ -396,7 +567,7 @@ static void BM_IsolatedFilter(benchmark::State &state) { grpc_channel_stack *channel_stack = static_cast<grpc_channel_stack *>(gpr_zalloc(channel_size)); GPR_ASSERT(GRPC_LOG_IF_ERROR( - "call_stack_init", + "channel_stack_init", grpc_channel_stack_init(&exec_ctx, 1, FilterDestroy, channel_stack, &filters[0], filters.size(), &channel_args, fixture.flags & REQUIRES_TRANSPORT @@ -411,15 +582,30 @@ static void BM_IsolatedFilter(benchmark::State &state) { grpc_slice method = grpc_slice_from_static_string("/foo/bar"); grpc_call_final_info final_info; TestOp test_op_data; + grpc_call_element_args call_args; + call_args.call_stack = call_stack; + call_args.server_transport_data = NULL; + call_args.context = NULL; + call_args.path = method; + call_args.start_time = start_time; + call_args.deadline = deadline; + const int kArenaSize = 4096; + call_args.arena = gpr_arena_create(kArenaSize); while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); GRPC_ERROR_UNREF(grpc_call_stack_init(&exec_ctx, channel_stack, 1, - DoNothing, NULL, NULL, NULL, method, - start_time, deadline, call_stack)); + DoNothing, NULL, &call_args)); typename TestOp::Op op(&exec_ctx, &test_op_data, call_stack); grpc_call_stack_destroy(&exec_ctx, call_stack, &final_info, NULL); op.Finish(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx); + // recreate arena every 64k iterations to avoid oom + if (0 == (state.iterations() & 0xffff)) { + gpr_arena_destroy(call_args.arena); + call_args.arena = gpr_arena_create(kArenaSize); + } } + gpr_arena_destroy(call_args.arena); grpc_channel_stack_destroy(&exec_ctx, channel_stack); grpc_exec_ctx_finish(&exec_ctx); gpr_free(channel_stack); @@ -436,7 +622,7 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, DummyFilter, NoOp); BENCHMARK_TEMPLATE(BM_IsolatedFilter, DummyFilter, SendEmptyMetadata); typedef Fixture<&grpc_client_channel_filter, 0> ClientChannelFilter; BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientChannelFilter, NoOp); -typedef Fixture<&grpc_compress_filter, CHECKS_NOT_LAST> CompressFilter; +typedef Fixture<&grpc_message_compress_filter, CHECKS_NOT_LAST> CompressFilter; BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, NoOp); BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, SendEmptyMetadata); typedef Fixture<&grpc_client_deadline_filter, CHECKS_NOT_LAST> @@ -462,4 +648,211 @@ typedef Fixture<&grpc_load_reporting_filter, CHECKS_NOT_LAST> BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, NoOp); BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata); +//////////////////////////////////////////////////////////////////////////////// +// Benchmarks isolating grpc_call + +namespace isolated_call_filter { + +static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + if (op->recv_initial_metadata) { + grpc_closure_sched( + exec_ctx, + op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); + } + if (op->recv_message) { + grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); + } + grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE); +} + +static void StartTransportOp(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + if (op->disconnect_with_error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(op->disconnect_with_error); + } + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); +} + +static grpc_error *InitCallElem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_element_args *args) { + return GRPC_ERROR_NONE; +} + +static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_polling_entity *pollent) {} + +static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const grpc_call_final_info *final_info, + grpc_closure *then_sched_closure) { + grpc_closure_sched(exec_ctx, then_sched_closure, GRPC_ERROR_NONE); +} + +grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_channel_element_args *args) { + return GRPC_ERROR_NONE; +} + +void DestroyChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} + +char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + return gpr_strdup("peer"); +} + +void GetChannelInfo(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + const grpc_channel_info *channel_info) {} + +static const grpc_channel_filter isolated_call_filter = { + StartTransportStreamOp, + StartTransportOp, + 0, + InitCallElem, + SetPollsetOrPollsetSet, + DestroyCallElem, + 0, + InitChannelElem, + DestroyChannelElem, + GetPeer, + GetChannelInfo, + "isolated_call_filter"}; +} // namespace isolated_call_filter + +class IsolatedCallFixture : public TrackCounters { + public: + IsolatedCallFixture() { + grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); + grpc_channel_stack_builder_set_name(builder, "dummy"); + grpc_channel_stack_builder_set_target(builder, "dummy_target"); + GPR_ASSERT(grpc_channel_stack_builder_append_filter( + builder, &isolated_call_filter::isolated_call_filter, NULL, NULL)); + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + channel_ = grpc_channel_create_with_builder(&exec_ctx, builder, + GRPC_CLIENT_CHANNEL); + grpc_exec_ctx_finish(&exec_ctx); + } + cq_ = grpc_completion_queue_create_for_next(NULL); + } + + void Finish(benchmark::State &state) { + grpc_completion_queue_destroy(cq_); + grpc_channel_destroy(channel_); + TrackCounters::Finish(state); + } + + grpc_channel *channel() const { return channel_; } + grpc_completion_queue *cq() const { return cq_; } + + private: + grpc_completion_queue *cq_; + grpc_channel *channel_; +}; + +static void BM_IsolatedCall_NoOp(benchmark::State &state) { + IsolatedCallFixture fixture; + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + void *method_hdl = + grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + grpc_call_unref(grpc_channel_create_registered_call( + fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(), + method_hdl, deadline, NULL)); + } + fixture.Finish(state); +} +BENCHMARK(BM_IsolatedCall_NoOp); + +static void BM_IsolatedCall_Unary(benchmark::State &state) { + IsolatedCallFixture fixture; + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + void *method_hdl = + grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); + grpc_slice slice = grpc_slice_from_static_string("hello world"); + grpc_byte_buffer *send_message = grpc_raw_byte_buffer_create(&slice, 1); + grpc_byte_buffer *recv_message = NULL; + grpc_status_code status_code; + grpc_slice status_details = grpc_empty_slice(); + grpc_metadata_array recv_initial_metadata; + grpc_metadata_array_init(&recv_initial_metadata); + grpc_metadata_array recv_trailing_metadata; + grpc_metadata_array_init(&recv_trailing_metadata); + grpc_op ops[6]; + memset(ops, 0, sizeof(ops)); + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[1].op = GRPC_OP_SEND_MESSAGE; + ops[1].data.send_message.send_message = send_message; + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata.recv_initial_metadata = + &recv_initial_metadata; + ops[4].op = GRPC_OP_RECV_MESSAGE; + ops[4].data.recv_message.recv_message = &recv_message; + ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[5].data.recv_status_on_client.status = &status_code; + ops[5].data.recv_status_on_client.status_details = &status_details; + ops[5].data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata; + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + grpc_call *call = grpc_channel_create_registered_call( + fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(), + method_hdl, deadline, NULL); + grpc_call_start_batch(call, ops, 6, tag(1), NULL); + grpc_completion_queue_next(fixture.cq(), + gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); + grpc_call_unref(call); + } + fixture.Finish(state); + grpc_metadata_array_destroy(&recv_initial_metadata); + grpc_metadata_array_destroy(&recv_trailing_metadata); + grpc_byte_buffer_destroy(send_message); +} +BENCHMARK(BM_IsolatedCall_Unary); + +static void BM_IsolatedCall_StreamingSend(benchmark::State &state) { + IsolatedCallFixture fixture; + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + void *method_hdl = + grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); + grpc_slice slice = grpc_slice_from_static_string("hello world"); + grpc_byte_buffer *send_message = grpc_raw_byte_buffer_create(&slice, 1); + grpc_metadata_array recv_initial_metadata; + grpc_metadata_array_init(&recv_initial_metadata); + grpc_metadata_array recv_trailing_metadata; + grpc_metadata_array_init(&recv_trailing_metadata); + grpc_op ops[2]; + memset(ops, 0, sizeof(ops)); + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[1].data.recv_initial_metadata.recv_initial_metadata = + &recv_initial_metadata; + grpc_call *call = grpc_channel_create_registered_call( + fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(), + method_hdl, deadline, NULL); + grpc_call_start_batch(call, ops, 2, tag(1), NULL); + grpc_completion_queue_next(fixture.cq(), gpr_inf_future(GPR_CLOCK_MONOTONIC), + NULL); + memset(ops, 0, sizeof(ops)); + ops[0].op = GRPC_OP_SEND_MESSAGE; + ops[0].data.send_message.send_message = send_message; + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + grpc_call_start_batch(call, ops, 1, tag(2), NULL); + grpc_completion_queue_next(fixture.cq(), + gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); + } + grpc_call_unref(call); + fixture.Finish(state); + grpc_metadata_array_destroy(&recv_initial_metadata); + grpc_metadata_array_destroy(&recv_trailing_metadata); + grpc_byte_buffer_destroy(send_message); +} +BENCHMARK(BM_IsolatedCall_StreamingSend); + BENCHMARK_MAIN(); diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index 563db758f7..581440682a 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -33,6 +33,7 @@ /* Microbenchmarks around CHTTP2 HPACK operations */ +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <string.h> #include <sstream> @@ -40,6 +41,7 @@ extern "C" { #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/transport/static_metadata.h" } #include "test/cpp/microbenchmarks/helpers.h" @@ -69,6 +71,7 @@ template <class Fixture> static void BM_HpackEncoderEncodeHeader(benchmark::State &state) { TrackCounters track_counters; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + static bool logged_representative_output = false; grpc_metadata_batch b; grpc_metadata_batch_init(&b); @@ -87,8 +90,22 @@ static void BM_HpackEncoderEncodeHeader(benchmark::State &state) { grpc_slice_buffer outbuf; grpc_slice_buffer_init(&outbuf); while (state.KeepRunning()) { - grpc_chttp2_encode_header(&exec_ctx, &c, (uint32_t)state.iterations(), &b, - state.range(0), state.range(1), &stats, &outbuf); + grpc_encode_header_options hopt = { + static_cast<uint32_t>(state.iterations()), + state.range(0) != 0, + Fixture::kEnableTrueBinary, + (size_t)state.range(1), + &stats, + }; + grpc_chttp2_encode_header(&exec_ctx, &c, &b, &hopt, &outbuf); + if (!logged_representative_output && state.iterations() > 3) { + logged_representative_output = true; + for (size_t i = 0; i < outbuf.count; i++) { + char *s = grpc_dump_slice(outbuf.slices[i], GPR_DUMP_HEX); + gpr_log(GPR_DEBUG, "%" PRIdPTR ": %s", i, s); + gpr_free(s); + } + } grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &outbuf); grpc_exec_ctx_flush(&exec_ctx); } @@ -110,6 +127,7 @@ namespace hpack_encoder_fixtures { class EmptyBatch { public: + static constexpr bool kEnableTrueBinary = false; static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { return {}; } @@ -117,6 +135,7 @@ class EmptyBatch { class SingleStaticElem { public: + static constexpr bool kEnableTrueBinary = false; static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { return {GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE}; } @@ -124,6 +143,7 @@ class SingleStaticElem { class SingleInternedElem { public: + static constexpr bool kEnableTrueBinary = false; static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { return {grpc_mdelem_from_slices( exec_ctx, grpc_slice_intern(grpc_slice_from_static_string("abc")), @@ -131,8 +151,32 @@ class SingleInternedElem { } }; +template <int kLength, bool kTrueBinary> +class SingleInternedBinaryElem { + public: + static constexpr bool kEnableTrueBinary = kTrueBinary; + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { + grpc_slice bytes = MakeBytes(); + std::vector<grpc_mdelem> out = {grpc_mdelem_from_slices( + exec_ctx, grpc_slice_intern(grpc_slice_from_static_string("abc-bin")), + grpc_slice_intern(bytes))}; + grpc_slice_unref(bytes); + return out; + } + + private: + static grpc_slice MakeBytes() { + std::vector<char> v; + for (int i = 0; i < kLength; i++) { + v.push_back(static_cast<char>(rand())); + } + return grpc_slice_from_copied_buffer(v.data(), v.size()); + } +}; + class SingleInternedKeyElem { public: + static constexpr bool kEnableTrueBinary = false; static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { return {grpc_mdelem_from_slices( exec_ctx, grpc_slice_intern(grpc_slice_from_static_string("abc")), @@ -142,6 +186,7 @@ class SingleInternedKeyElem { class SingleNonInternedElem { public: + static constexpr bool kEnableTrueBinary = false; static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { return {grpc_mdelem_from_slices(exec_ctx, grpc_slice_from_static_string("abc"), @@ -149,8 +194,28 @@ class SingleNonInternedElem { } }; +template <int kLength, bool kTrueBinary> +class SingleNonInternedBinaryElem { + public: + static constexpr bool kEnableTrueBinary = kTrueBinary; + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { + return {grpc_mdelem_from_slices( + exec_ctx, grpc_slice_from_static_string("abc-bin"), MakeBytes())}; + } + + private: + static grpc_slice MakeBytes() { + std::vector<char> v; + for (int i = 0; i < kLength; i++) { + v.push_back(static_cast<char>(rand())); + } + return grpc_slice_from_copied_buffer(v.data(), v.size()); + } +}; + class RepresentativeClientInitialMetadata { public: + static constexpr bool kEnableTrueBinary = true; static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { return { GRPC_MDELEM_SCHEME_HTTP, GRPC_MDELEM_METHOD_POST, @@ -172,6 +237,7 @@ class RepresentativeClientInitialMetadata { class RepresentativeServerInitialMetadata { public: + static constexpr bool kEnableTrueBinary = true; static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { return {GRPC_MDELEM_STATUS_200, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC, @@ -181,6 +247,7 @@ class RepresentativeServerInitialMetadata { class RepresentativeServerTrailingMetadata { public: + static constexpr bool kEnableTrueBinary = true; static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { return {GRPC_MDELEM_GRPC_STATUS_0}; } @@ -195,8 +262,68 @@ BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleInternedKeyElem) ->Args({0, 16384}); BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleInternedElem) ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<1, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<3, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<10, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<31, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<100, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<1, true>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<3, true>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<10, true>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<31, true>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleInternedBinaryElem<100, true>) + ->Args({0, 16384}); BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleNonInternedElem) ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<1, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<3, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<10, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<31, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<100, false>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<1, true>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<3, true>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<10, true>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<31, true>) + ->Args({0, 16384}); +BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, + SingleNonInternedBinaryElem<100, true>) + ->Args({0, 16384}); // test with a tiny frame size, to highlight continuation costs BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleNonInternedElem) ->Args({0, 1}); @@ -255,6 +382,8 @@ static void BM_HpackParserParseHeader(benchmark::State &state) { } grpc_exec_ctx_flush(&exec_ctx); } + for (auto slice : init_slices) grpc_slice_unref(slice); + for (auto slice : benchmark_slices) grpc_slice_unref(slice); grpc_chttp2_hpack_parser_destroy(&exec_ctx, &p); grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); @@ -262,7 +391,7 @@ static void BM_HpackParserParseHeader(benchmark::State &state) { namespace hpack_parser_fixtures { -static grpc_slice MakeSlice(std::initializer_list<uint8_t> bytes) { +static grpc_slice MakeSlice(std::vector<uint8_t> bytes) { grpc_slice s = grpc_slice_malloc(bytes.size()); uint8_t *p = GRPC_SLICE_START_PTR(s); for (auto b : bytes) { @@ -346,6 +475,88 @@ class NonIndexedElem { } }; +template <int kLength, bool kTrueBinary> +class NonIndexedBinaryElem; + +template <int kLength> +class NonIndexedBinaryElem<kLength, true> { + public: + static std::vector<grpc_slice> GetInitSlices() { return {}; } + static std::vector<grpc_slice> GetBenchmarkSlices() { + std::vector<uint8_t> v = { + 0x00, 0x07, 'a', 'b', 'c', + '-', 'b', 'i', 'n', static_cast<uint8_t>(kLength + 1), + 0}; + for (int i = 0; i < kLength; i++) { + v.push_back(static_cast<uint8_t>(i)); + } + return {MakeSlice(v)}; + } +}; + +template <> +class NonIndexedBinaryElem<1, false> { + public: + static std::vector<grpc_slice> GetInitSlices() { return {}; } + static std::vector<grpc_slice> GetBenchmarkSlices() { + return {MakeSlice( + {0x00, 0x07, 'a', 'b', 'c', '-', 'b', 'i', 'n', 0x82, 0xf7, 0xb3})}; + } +}; + +template <> +class NonIndexedBinaryElem<3, false> { + public: + static std::vector<grpc_slice> GetInitSlices() { return {}; } + static std::vector<grpc_slice> GetBenchmarkSlices() { + return {MakeSlice({0x00, 0x07, 'a', 'b', 'c', '-', 'b', 'i', 'n', 0x84, + 0x7f, 0x4e, 0x29, 0x3f})}; + } +}; + +template <> +class NonIndexedBinaryElem<10, false> { + public: + static std::vector<grpc_slice> GetInitSlices() { return {}; } + static std::vector<grpc_slice> GetBenchmarkSlices() { + return {MakeSlice({0x00, 0x07, 'a', 'b', 'c', '-', 'b', + 'i', 'n', 0x8b, 0x71, 0x0c, 0xa5, 0x81, + 0x73, 0x7b, 0x47, 0x13, 0xe9, 0xf7, 0xe3})}; + } +}; + +template <> +class NonIndexedBinaryElem<31, false> { + public: + static std::vector<grpc_slice> GetInitSlices() { return {}; } + static std::vector<grpc_slice> GetBenchmarkSlices() { + return {MakeSlice({0x00, 0x07, 'a', 'b', 'c', '-', 'b', 'i', 'n', + 0xa3, 0x92, 0x43, 0x7f, 0xbe, 0x7c, 0xea, 0x6f, 0xf3, + 0x3d, 0xa7, 0xa7, 0x67, 0xfb, 0xe2, 0x82, 0xf7, 0xf2, + 0x8f, 0x1f, 0x9d, 0xdf, 0xf1, 0x7e, 0xb3, 0xef, 0xb2, + 0x8f, 0x53, 0x77, 0xce, 0x0c, 0x13, 0xe3, 0xfd, 0x87})}; + } +}; + +template <> +class NonIndexedBinaryElem<100, false> { + public: + static std::vector<grpc_slice> GetInitSlices() { return {}; } + static std::vector<grpc_slice> GetBenchmarkSlices() { + return {MakeSlice( + {0x00, 0x07, 'a', 'b', 'c', '-', 'b', 'i', 'n', 0xeb, 0x1d, 0x4d, + 0xe8, 0x96, 0x8c, 0x14, 0x20, 0x06, 0xc1, 0xc3, 0xdf, 0x6e, 0x1f, 0xef, + 0xde, 0x2f, 0xde, 0xb7, 0xf2, 0xfe, 0x6d, 0xd4, 0xe4, 0x7d, 0xf5, 0x55, + 0x46, 0x52, 0x3d, 0x91, 0xf2, 0xd4, 0x6f, 0xca, 0x34, 0xcd, 0xd9, 0x39, + 0xbd, 0x03, 0x27, 0xe3, 0x9c, 0x74, 0xcc, 0x17, 0x34, 0xed, 0xa6, 0x6a, + 0x77, 0x73, 0x10, 0xcd, 0x8e, 0x4e, 0x5c, 0x7c, 0x72, 0x39, 0xd8, 0xe6, + 0x78, 0x6b, 0xdb, 0xa5, 0xb7, 0xab, 0xe7, 0x46, 0xae, 0x21, 0xab, 0x7f, + 0x01, 0x89, 0x13, 0xd7, 0xca, 0x17, 0x6e, 0xcb, 0xd6, 0x79, 0x71, 0x68, + 0xbf, 0x8a, 0x3f, 0x32, 0xe8, 0xba, 0xf5, 0xbe, 0xb3, 0xbc, 0xde, 0x28, + 0xc7, 0xcf, 0x62, 0x7a, 0x58, 0x2c, 0xcf, 0x4d, 0xe3})}; + } +}; + class RepresentativeClientInitialMetadata { public: static std::vector<grpc_slice> GetInitSlices() { @@ -437,6 +648,16 @@ BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, RepresentativeClientInitialMetadata); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc new file mode 100644 index 0000000000..8c5413b5fd --- /dev/null +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -0,0 +1,625 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Microbenchmarks around CHTTP2 transport operations */ + +#include <grpc++/support/channel_arguments.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <string.h> +#include <memory> +#include <queue> +#include <sstream> +extern "C" { +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/transport/static_metadata.h" +} +#include "test/cpp/microbenchmarks/helpers.h" +#include "third_party/benchmark/include/benchmark/benchmark.h" + +auto &force_library_initialization = Library::get(); + +//////////////////////////////////////////////////////////////////////////////// +// Helper classes +// + +class DummyEndpoint : public grpc_endpoint { + public: + DummyEndpoint() { + static const grpc_endpoint_vtable my_vtable = {read, + write, + get_workqueue, + add_to_pollset, + add_to_pollset_set, + shutdown, + destroy, + get_resource_user, + get_peer, + get_fd}; + grpc_endpoint::vtable = &my_vtable; + ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint"); + } + + void PushInput(grpc_exec_ctx *exec_ctx, grpc_slice slice) { + if (read_cb_ == nullptr) { + GPR_ASSERT(!have_slice_); + buffered_slice_ = slice; + have_slice_ = true; + return; + } + grpc_slice_buffer_add(slices_, slice); + grpc_closure_sched(exec_ctx, read_cb_, GRPC_ERROR_NONE); + read_cb_ = nullptr; + } + + private: + grpc_resource_user *ru_; + grpc_closure *read_cb_ = nullptr; + grpc_slice_buffer *slices_ = nullptr; + bool have_slice_ = false; + grpc_slice buffered_slice_; + + void QueueRead(grpc_exec_ctx *exec_ctx, grpc_slice_buffer *slices, + grpc_closure *cb) { + GPR_ASSERT(read_cb_ == nullptr); + if (have_slice_) { + have_slice_ = false; + grpc_slice_buffer_add(slices, buffered_slice_); + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE); + return; + } + read_cb_ = cb; + slices_ = slices; + } + + static void read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_slice_buffer *slices, grpc_closure *cb) { + static_cast<DummyEndpoint *>(ep)->QueueRead(exec_ctx, slices, cb); + } + + static void write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE); + } + + static grpc_workqueue *get_workqueue(grpc_endpoint *ep) { return NULL; } + + static void add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset *pollset) {} + + static void add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + + static void shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { + grpc_resource_user_shutdown(exec_ctx, + static_cast<DummyEndpoint *>(ep)->ru_); + grpc_closure_sched(exec_ctx, static_cast<DummyEndpoint *>(ep)->read_cb_, + why); + } + + static void destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + grpc_resource_user_unref(exec_ctx, static_cast<DummyEndpoint *>(ep)->ru_); + delete static_cast<DummyEndpoint *>(ep); + } + + static grpc_resource_user *get_resource_user(grpc_endpoint *ep) { + return static_cast<DummyEndpoint *>(ep)->ru_; + } + static char *get_peer(grpc_endpoint *ep) { return gpr_strdup("test"); } + static int get_fd(grpc_endpoint *ep) { return 0; } +}; + +class Fixture { + public: + Fixture(const grpc::ChannelArguments &args, bool client) { + grpc_channel_args c_args = args.c_channel_args(); + ep_ = new DummyEndpoint; + t_ = grpc_create_chttp2_transport(exec_ctx(), &c_args, ep_, client); + grpc_chttp2_transport_start_reading(exec_ctx(), t_, NULL); + FlushExecCtx(); + } + + void FlushExecCtx() { grpc_exec_ctx_flush(&exec_ctx_); } + + ~Fixture() { + grpc_transport_destroy(&exec_ctx_, t_); + grpc_exec_ctx_finish(&exec_ctx_); + } + + grpc_chttp2_transport *chttp2_transport() { + return reinterpret_cast<grpc_chttp2_transport *>(t_); + } + grpc_transport *transport() { return t_; } + grpc_exec_ctx *exec_ctx() { return &exec_ctx_; } + + void PushInput(grpc_slice slice) { ep_->PushInput(exec_ctx(), slice); } + + private: + DummyEndpoint *ep_; + grpc_exec_ctx exec_ctx_ = GRPC_EXEC_CTX_INIT; + grpc_transport *t_; +}; + +static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} + +class Stream { + public: + Stream(Fixture *f) : f_(f) { + GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream"); + stream_size_ = grpc_transport_stream_size(f->transport()); + stream_ = gpr_malloc(stream_size_); + arena_ = gpr_arena_create(4096); + } + + ~Stream() { + gpr_free(stream_); + gpr_arena_destroy(arena_); + } + + void Init(benchmark::State &state) { + memset(stream_, 0, stream_size_); + if ((state.iterations() & 0xffff) == 0) { + gpr_arena_destroy(arena_); + arena_ = gpr_arena_create(4096); + } + grpc_transport_init_stream(f_->exec_ctx(), f_->transport(), + static_cast<grpc_stream *>(stream_), &refcount_, + NULL, arena_); + } + + void DestroyThen(grpc_closure *closure) { + grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(), + static_cast<grpc_stream *>(stream_), closure); + } + + void Op(grpc_transport_stream_op_batch *op) { + grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(), + static_cast<grpc_stream *>(stream_), op); + } + + grpc_chttp2_stream *chttp2_stream() { + return static_cast<grpc_chttp2_stream *>(stream_); + } + + private: + Fixture *f_; + grpc_stream_refcount refcount_; + gpr_arena *arena_; + size_t stream_size_; + void *stream_; +}; + +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template <class F> +std::unique_ptr<Closure> MakeClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public Closure { + C(const F &f, grpc_closure_scheduler *sched) : f_(f) { + grpc_closure_init(this, Execute, this, sched); + } + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + } + }; + return std::unique_ptr<Closure>(new C(f, sched)); +} + +template <class F> +grpc_closure *MakeOnceClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public grpc_closure { + C(const F &f) : f_(f) {} + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + delete static_cast<C *>(arg); + } + }; + auto *c = new C{f}; + return grpc_closure_init(c, C::Execute, c, sched); +} + +//////////////////////////////////////////////////////////////////////////////// +// Benchmarks +// + +static void BM_StreamCreateDestroy(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + std::unique_ptr<Closure> next = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + s.Init(state); + s.DestroyThen(next.get()); + }); + grpc_closure_run(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); + f.FlushExecCtx(); + track_counters.Finish(state); +} +BENCHMARK(BM_StreamCreateDestroy); + +class RepresentativeClientInitialMetadata { + public: + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { + return { + GRPC_MDELEM_SCHEME_HTTP, GRPC_MDELEM_METHOD_POST, + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_PATH, + grpc_slice_intern(grpc_slice_from_static_string("/foo/bar"))), + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_AUTHORITY, + grpc_slice_intern(grpc_slice_from_static_string( + "foo.test.google.fr:1234"))), + GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE_COMMA_GZIP, + GRPC_MDELEM_TE_TRAILERS, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC, + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_USER_AGENT, + grpc_slice_intern(grpc_slice_from_static_string( + "grpc-c/3.0.0-dev (linux; chttp2; green)")))}; + } +}; + +template <class Metadata> +static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload op_payload; + std::unique_ptr<Closure> start; + std::unique_ptr<Closure> done; + + auto reset_op = [&]() { + memset(&op, 0, sizeof(op)); + op.payload = &op_payload; + }; + + grpc_metadata_batch b; + grpc_metadata_batch_init(&b); + b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + std::vector<grpc_mdelem> elems = Metadata::GetElems(f.exec_ctx()); + std::vector<grpc_linked_mdelem> storage(elems.size()); + for (size_t i = 0; i < elems.size(); i++) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); + } + + f.FlushExecCtx(); + start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + s.Init(state); + reset_op(); + op.on_complete = done.get(); + op.send_initial_metadata = true; + op.payload->send_initial_metadata.send_initial_metadata = &b; + s.Op(&op); + }); + done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + reset_op(); + op.cancel_stream = true; + op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); + s.DestroyThen(start.get()); + }); + grpc_closure_sched(f.exec_ctx(), start.get(), GRPC_ERROR_NONE); + f.FlushExecCtx(); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + track_counters.Finish(state); +} +BENCHMARK_TEMPLATE(BM_StreamCreateSendInitialMetadataDestroy, + RepresentativeClientInitialMetadata); + +static void BM_TransportEmptyOp(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + s.Init(state); + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload op_payload; + auto reset_op = [&]() { + memset(&op, 0, sizeof(op)); + op.payload = &op_payload; + }; + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + reset_op(); + op.on_complete = c.get(); + s.Op(&op); + }); + grpc_closure_sched(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); + f.FlushExecCtx(); + s.DestroyThen( + MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + f.FlushExecCtx(); + track_counters.Finish(state); +} +BENCHMARK(BM_TransportEmptyOp); + +static void BM_TransportStreamSend(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + s.Init(state); + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload op_payload; + auto reset_op = [&]() { + memset(&op, 0, sizeof(op)); + op.payload = &op_payload; + }; + grpc_slice_buffer_stream send_stream; + grpc_slice_buffer send_buffer; + grpc_slice_buffer_init(&send_buffer); + grpc_slice_buffer_add(&send_buffer, gpr_slice_malloc(state.range(0))); + memset(GRPC_SLICE_START_PTR(send_buffer.slices[0]), 0, + GRPC_SLICE_LENGTH(send_buffer.slices[0])); + + grpc_metadata_batch b; + grpc_metadata_batch_init(&b); + b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + std::vector<grpc_mdelem> elems = + RepresentativeClientInitialMetadata::GetElems(f.exec_ctx()); + std::vector<grpc_linked_mdelem> storage(elems.size()); + for (size_t i = 0; i < elems.size(); i++) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); + } + + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + // force outgoing window to be yuge + s.chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024; + f.chttp2_transport()->outgoing_window = 1024 * 1024 * 1024; + grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); + reset_op(); + op.on_complete = c.get(); + op.send_message = true; + op.payload->send_message.send_message = &send_stream.base; + s.Op(&op); + }); + + reset_op(); + op.send_initial_metadata = true; + op.payload->send_initial_metadata.send_initial_metadata = &b; + op.on_complete = c.get(); + s.Op(&op); + + f.FlushExecCtx(); + reset_op(); + op.cancel_stream = true; + op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); + s.DestroyThen( + MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + f.FlushExecCtx(); + track_counters.Finish(state); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + grpc_slice_buffer_destroy(&send_buffer); +} +BENCHMARK(BM_TransportStreamSend)->Range(0, 128 * 1024 * 1024); + +#define SLICE_FROM_BUFFER(s) grpc_slice_from_static_buffer(s, sizeof(s) - 1) + +static grpc_slice CreateIncomingDataSlice(size_t length, size_t frame_size) { + std::queue<char> unframed; + + unframed.push(static_cast<uint8_t>(0)); + unframed.push(static_cast<uint8_t>(length >> 24)); + unframed.push(static_cast<uint8_t>(length >> 16)); + unframed.push(static_cast<uint8_t>(length >> 8)); + unframed.push(static_cast<uint8_t>(length)); + for (size_t i = 0; i < length; i++) { + unframed.push('a'); + } + + std::vector<char> framed; + while (unframed.size() > frame_size) { + // frame size + framed.push_back(static_cast<uint8_t>(frame_size >> 16)); + framed.push_back(static_cast<uint8_t>(frame_size >> 8)); + framed.push_back(static_cast<uint8_t>(frame_size)); + // data frame + framed.push_back(0); + // no flags + framed.push_back(0); + // stream id + framed.push_back(0); + framed.push_back(0); + framed.push_back(0); + framed.push_back(1); + // frame data + for (size_t i = 0; i < frame_size; i++) { + framed.push_back(unframed.front()); + unframed.pop(); + } + } + + // frame size + framed.push_back(static_cast<uint8_t>(unframed.size() >> 16)); + framed.push_back(static_cast<uint8_t>(unframed.size() >> 8)); + framed.push_back(static_cast<uint8_t>(unframed.size())); + // data frame + framed.push_back(0); + // no flags + framed.push_back(0); + // stream id + framed.push_back(0); + framed.push_back(0); + framed.push_back(0); + framed.push_back(1); + while (!unframed.empty()) { + framed.push_back(unframed.front()); + unframed.pop(); + } + + return grpc_slice_from_copied_buffer(framed.data(), framed.size()); +} + +static void BM_TransportStreamRecv(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + s.Init(state); + grpc_transport_stream_op_batch_payload op_payload; + grpc_transport_stream_op_batch op; + grpc_byte_stream *recv_stream; + grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384); + + auto reset_op = [&]() { + memset(&op, 0, sizeof(op)); + op.payload = &op_payload; + }; + + grpc_metadata_batch b; + grpc_metadata_batch_init(&b); + grpc_metadata_batch b_recv; + grpc_metadata_batch_init(&b_recv); + b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + std::vector<grpc_mdelem> elems = + RepresentativeClientInitialMetadata::GetElems(f.exec_ctx()); + std::vector<grpc_linked_mdelem> storage(elems.size()); + for (size_t i = 0; i < elems.size(); i++) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); + } + + std::unique_ptr<Closure> do_nothing = + MakeClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}); + + uint32_t received; + + std::unique_ptr<Closure> drain_start; + std::unique_ptr<Closure> drain; + std::unique_ptr<Closure> drain_continue; + grpc_slice recv_slice; + + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + // force outgoing window to be yuge + s.chttp2_stream()->incoming_window_delta = 1024 * 1024 * 1024; + f.chttp2_transport()->incoming_window = 1024 * 1024 * 1024; + received = 0; + reset_op(); + op.on_complete = do_nothing.get(); + op.recv_message = true; + op.payload->recv_message.recv_message = &recv_stream; + op.payload->recv_message.recv_message_ready = drain_start.get(); + s.Op(&op); + f.PushInput(grpc_slice_ref(incoming_data)); + }); + + drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (recv_stream == NULL) { + GPR_ASSERT(!state.KeepRunning()); + return; + } + grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE); + }); + + drain = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + do { + if (received == recv_stream->length) { + grpc_byte_stream_destroy(exec_ctx, recv_stream); + grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE); + return; + } + } while (grpc_byte_stream_next(exec_ctx, recv_stream, + recv_stream->length - received, + drain_continue.get()) && + GRPC_ERROR_NONE == + grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice) && + (received += GRPC_SLICE_LENGTH(recv_slice), + grpc_slice_unref_internal(exec_ctx, recv_slice), true)); + }); + + drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice); + received += GRPC_SLICE_LENGTH(recv_slice); + grpc_slice_unref_internal(exec_ctx, recv_slice); + grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE); + }); + + reset_op(); + op.send_initial_metadata = true; + op.payload->send_initial_metadata.send_initial_metadata = &b; + op.recv_initial_metadata = true; + op.payload->recv_initial_metadata.recv_initial_metadata = &b_recv; + op.payload->recv_initial_metadata.recv_initial_metadata_ready = + do_nothing.get(); + op.on_complete = c.get(); + s.Op(&op); + f.PushInput(SLICE_FROM_BUFFER( + "\x00\x00\x00\x04\x00\x00\x00\x00\x00" + // Generated using: + // tools/codegen/core/gen_header_frame.py < + // test/cpp/microbenchmarks/representative_server_initial_metadata.headers + "\x00\x00X\x01\x04\x00\x00\x00\x01" + "\x10\x07:status\x03" + "200" + "\x10\x0c" + "content-type\x10" + "application/grpc" + "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip")); + + f.FlushExecCtx(); + reset_op(); + op.cancel_stream = true; + op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); + s.DestroyThen( + MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + f.FlushExecCtx(); + track_counters.Finish(state); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + grpc_metadata_batch_destroy(f.exec_ctx(), &b_recv); + grpc_slice_unref(incoming_data); +} +BENCHMARK(BM_TransportStreamRecv)->Range(0, 128 * 1024 * 1024); + +BENCHMARK_MAIN(); diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index 28a385b6c1..d52fe4ee30 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -33,7 +33,9 @@ /* Test various closure related operations */ +#include <benchmark/benchmark.h> #include <grpc/grpc.h> +#include <sstream> extern "C" { #include "src/core/lib/iomgr/closure.h" @@ -43,7 +45,6 @@ extern "C" { } #include "test/cpp/microbenchmarks/helpers.h" -#include "third_party/benchmark/include/benchmark/benchmark.h" auto& force_library_initialization = Library::get(); diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 91e6a85101..8b26bf977c 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -34,12 +34,11 @@ /* This benchmark exists to ensure that the benchmark integration is * working */ +#include <benchmark/benchmark.h> #include <grpc++/completion_queue.h> #include <grpc++/impl/grpc_library.h> #include <grpc/grpc.h> - #include "test/cpp/microbenchmarks/helpers.h" -#include "third_party/benchmark/include/benchmark/benchmark.h" extern "C" { #include "src/core/lib/surface/completion_queue.h" @@ -59,10 +58,24 @@ static void BM_CreateDestroyCpp(benchmark::State& state) { } BENCHMARK(BM_CreateDestroyCpp); +/* Create cq using a different constructor */ +static void BM_CreateDestroyCpp2(benchmark::State& state) { + TrackCounters track_counters; + while (state.KeepRunning()) { + grpc_completion_queue* core_cq = + grpc_completion_queue_create_for_next(NULL); + CompletionQueue cq(core_cq); + } + track_counters.Finish(state); +} +BENCHMARK(BM_CreateDestroyCpp2); + static void BM_CreateDestroyCore(benchmark::State& state) { TrackCounters track_counters; while (state.KeepRunning()) { - grpc_completion_queue_destroy(grpc_completion_queue_create(NULL)); + // TODO: sreek Templatize this benchmark and pass completion type and + // polling type as parameters + grpc_completion_queue_destroy(grpc_completion_queue_create_for_next(NULL)); } track_counters.Finish(state); } @@ -98,7 +111,8 @@ BENCHMARK(BM_Pass1Cpp); static void BM_Pass1Core(benchmark::State& state) { TrackCounters track_counters; - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); + // TODO: sreek Templatize this benchmark and pass polling_type as a param + grpc_completion_queue* cq = grpc_completion_queue_create_for_next(NULL); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_cq_completion completion; @@ -116,7 +130,8 @@ BENCHMARK(BM_Pass1Core); static void BM_Pluck1Core(benchmark::State& state) { TrackCounters track_counters; - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); + // TODO: sreek Templatize this benchmark and pass polling_type as a param + grpc_completion_queue* cq = grpc_completion_queue_create_for_pluck(NULL); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_cq_completion completion; @@ -134,7 +149,8 @@ BENCHMARK(BM_Pluck1Core); static void BM_EmptyCore(benchmark::State& state) { TrackCounters track_counters; - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); + // TODO: sreek Templatize this benchmark and pass polling_type as a param + grpc_completion_queue* cq = grpc_completion_queue_create_for_next(NULL); gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_completion_queue_next(cq, deadline, NULL); diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc new file mode 100644 index 0000000000..9d7f65d292 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -0,0 +1,160 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <benchmark/benchmark.h> +#include <string.h> +#include <atomic> + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include "test/cpp/microbenchmarks/helpers.h" + +extern "C" { +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/port.h" +#include "src/core/lib/surface/completion_queue.h" +} + +struct grpc_pollset { + gpr_mu mu; +}; + +namespace grpc { +namespace testing { + +static void* g_tag = (void*)(intptr_t)10; // Some random number +static grpc_completion_queue* g_cq; +static grpc_event_engine_vtable g_vtable; + +static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, + grpc_closure* closure) { + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); +} + +static void pollset_init(grpc_pollset* ps, gpr_mu** mu) { + gpr_mu_init(&ps->mu); + *mu = &ps->mu; +} + +static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); } + +static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) { + return GRPC_ERROR_NONE; +} + +/* Callback when the tag is dequeued from the completion queue. Does nothing */ +static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg, + grpc_cq_completion* cq_completion) { + gpr_free(cq_completion); +} + +/* Queues a completion tag. ZERO polling overhead */ +static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, + grpc_pollset_worker** worker, gpr_timespec now, + gpr_timespec deadline) { + gpr_mu_unlock(&ps->mu); + grpc_cq_begin_op(g_cq, g_tag); + grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL, + (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion))); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&ps->mu); + return GRPC_ERROR_NONE; +} + +static void init_engine_vtable() { + memset(&g_vtable, 0, sizeof(g_vtable)); + + g_vtable.pollset_size = sizeof(grpc_pollset); + g_vtable.pollset_init = pollset_init; + g_vtable.pollset_shutdown = pollset_shutdown; + g_vtable.pollset_destroy = pollset_destroy; + g_vtable.pollset_work = pollset_work; + g_vtable.pollset_kick = pollset_kick; +} + +static void setup() { + grpc_init(); + init_engine_vtable(); + grpc_set_event_engine_test_only(&g_vtable); + + g_cq = grpc_completion_queue_create_for_next(NULL); +} + +static void teardown() { + grpc_completion_queue_shutdown(g_cq); + grpc_completion_queue_destroy(g_cq); +} + +/* A few notes about Multi-threaded benchmarks: + + Setup: + The benchmark framework ensures that none of the threads proceed beyond the + state.KeepRunning() call unless all the threads have called state.keepRunning + atleast once. So it is safe to do the initialization in one of the threads + before state.KeepRunning() is called. + + Teardown: + The benchmark framework also ensures that no thread is running the benchmark + code (i.e the code between two successive calls of state.KeepRunning()) if + state.KeepRunning() returns false. So it is safe to do the teardown in one + of the threads after state.keepRunning() returns false. +*/ +static void BM_Cq_Throughput(benchmark::State& state) { + TrackCounters track_counters; + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + + if (state.thread_index == 0) { + setup(); + } + + while (state.KeepRunning()) { + GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, NULL).type == + GRPC_OP_COMPLETE); + } + + state.SetItemsProcessed(state.iterations()); + + if (state.thread_index == 0) { + teardown(); + } + + track_counters.Finish(state); +} + +BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime(); + +} // namespace testing +} // namespace grpc + +BENCHMARK_MAIN(); diff --git a/test/cpp/microbenchmarks/bm_error.cc b/test/cpp/microbenchmarks/bm_error.cc index c4f6aa19d5..ea9777bbe6 100644 --- a/test/cpp/microbenchmarks/bm_error.cc +++ b/test/cpp/microbenchmarks/bm_error.cc @@ -33,6 +33,7 @@ /* Test various operations on grpc_error */ +#include <benchmark/benchmark.h> #include <memory> extern "C" { @@ -41,7 +42,6 @@ extern "C" { } #include "test/cpp/microbenchmarks/helpers.h" -#include "third_party/benchmark/include/benchmark/benchmark.h" auto& force_library_initialization = Library::get(); @@ -51,21 +51,30 @@ class ErrorDeleter { }; typedef std::unique_ptr<grpc_error, ErrorDeleter> ErrorPtr; -static void BM_ErrorCreate(benchmark::State& state) { +static void BM_ErrorCreateFromStatic(benchmark::State& state) { TrackCounters track_counters; while (state.KeepRunning()) { - GRPC_ERROR_UNREF(GRPC_ERROR_CREATE("Error")); + GRPC_ERROR_UNREF(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error")); } track_counters.Finish(state); } -BENCHMARK(BM_ErrorCreate); +BENCHMARK(BM_ErrorCreateFromStatic); + +static void BM_ErrorCreateFromCopied(benchmark::State& state) { + TrackCounters track_counters; + while (state.KeepRunning()) { + GRPC_ERROR_UNREF(GRPC_ERROR_CREATE_FROM_COPIED_STRING("Error not inline")); + } + track_counters.Finish(state); +} +BENCHMARK(BM_ErrorCreateFromCopied); static void BM_ErrorCreateAndSetStatus(benchmark::State& state) { TrackCounters track_counters; while (state.KeepRunning()) { - GRPC_ERROR_UNREF(grpc_error_set_int(GRPC_ERROR_CREATE("Error"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_ABORTED)); + GRPC_ERROR_UNREF( + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_ABORTED)); } track_counters.Finish(state); } @@ -75,9 +84,10 @@ static void BM_ErrorCreateAndSetIntAndStr(benchmark::State& state) { TrackCounters track_counters; while (state.KeepRunning()) { GRPC_ERROR_UNREF(grpc_error_set_str( - grpc_error_set_int(GRPC_ERROR_CREATE("GOAWAY received"), - GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)0), - GRPC_ERROR_STR_RAW_BYTES, "raw bytes")); + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"), + GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)0), + GRPC_ERROR_STR_RAW_BYTES, grpc_slice_from_static_string("raw bytes"))); } track_counters.Finish(state); } @@ -85,7 +95,7 @@ BENCHMARK(BM_ErrorCreateAndSetIntAndStr); static void BM_ErrorCreateAndSetIntLoop(benchmark::State& state) { TrackCounters track_counters; - grpc_error* error = GRPC_ERROR_CREATE("Error"); + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"); int n = 0; while (state.KeepRunning()) { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, n++); @@ -97,10 +107,11 @@ BENCHMARK(BM_ErrorCreateAndSetIntLoop); static void BM_ErrorCreateAndSetStrLoop(benchmark::State& state) { TrackCounters track_counters; - grpc_error* error = GRPC_ERROR_CREATE("Error"); + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"); const char* str = "hello"; while (state.KeepRunning()) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, str); + error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_from_static_string(str)); } GRPC_ERROR_UNREF(error); track_counters.Finish(state); @@ -109,7 +120,7 @@ BENCHMARK(BM_ErrorCreateAndSetStrLoop); static void BM_ErrorRefUnref(benchmark::State& state) { TrackCounters track_counters; - grpc_error* error = GRPC_ERROR_CREATE("Error"); + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"); while (state.KeepRunning()) { GRPC_ERROR_UNREF(GRPC_ERROR_REF(error)); } @@ -138,8 +149,8 @@ BENCHMARK(BM_ErrorGetIntFromNoError); static void BM_ErrorGetMissingInt(benchmark::State& state) { TrackCounters track_counters; - ErrorPtr error( - grpc_error_set_int(GRPC_ERROR_CREATE("Error"), GRPC_ERROR_INT_INDEX, 1)); + ErrorPtr error(grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_INDEX, 1)); while (state.KeepRunning()) { intptr_t value; grpc_error_get_int(error.get(), GRPC_ERROR_INT_OFFSET, &value); @@ -150,8 +161,8 @@ BENCHMARK(BM_ErrorGetMissingInt); static void BM_ErrorGetPresentInt(benchmark::State& state) { TrackCounters track_counters; - ErrorPtr error( - grpc_error_set_int(GRPC_ERROR_CREATE("Error"), GRPC_ERROR_INT_OFFSET, 1)); + ErrorPtr error(grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_OFFSET, 1)); while (state.KeepRunning()) { intptr_t value; grpc_error_get_int(error.get(), GRPC_ERROR_INT_OFFSET, &value); @@ -186,7 +197,7 @@ class SimpleError { private: const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); - ErrorPtr error_{GRPC_ERROR_CREATE("Error")}; + ErrorPtr error_{GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error")}; }; class ErrorWithGrpcStatus { @@ -196,9 +207,9 @@ class ErrorWithGrpcStatus { private: const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); - ErrorPtr error_{grpc_error_set_int(GRPC_ERROR_CREATE("Error"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNIMPLEMENTED)}; + ErrorPtr error_{grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNIMPLEMENTED)}; }; class ErrorWithHttpError { @@ -208,9 +219,9 @@ class ErrorWithHttpError { private: const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); - ErrorPtr error_{grpc_error_set_int(GRPC_ERROR_CREATE("Error"), - GRPC_ERROR_INT_HTTP2_ERROR, - GRPC_HTTP2_COMPRESSION_ERROR)}; + ErrorPtr error_{grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_HTTP2_ERROR, + GRPC_HTTP2_COMPRESSION_ERROR)}; }; class ErrorWithNestedGrpcStatus { @@ -220,11 +231,12 @@ class ErrorWithNestedGrpcStatus { private: const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); - ErrorPtr nested_error_{grpc_error_set_int(GRPC_ERROR_CREATE("Error"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNIMPLEMENTED)}; + ErrorPtr nested_error_{grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNIMPLEMENTED)}; grpc_error* nested_errors_[1] = {nested_error_.get()}; - ErrorPtr error_{GRPC_ERROR_CREATE_REFERENCING("Error", nested_errors_, 1)}; + ErrorPtr error_{GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Error", nested_errors_, 1)}; }; template <class Fixture> @@ -253,8 +265,8 @@ static void BM_ErrorGetStatus(benchmark::State& state) { Fixture fixture; while (state.KeepRunning()) { grpc_status_code status; - const char* msg; - grpc_error_get_status(fixture.error(), fixture.deadline(), &status, &msg, + grpc_slice slice; + grpc_error_get_status(fixture.error(), fixture.deadline(), &status, &slice, NULL); } track_counters.Finish(state); diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc index dc0e7d769a..fd2210c474 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc @@ -33,14 +33,13 @@ /* Benchmark gRPC end2end in various configurations */ +#include <benchmark/benchmark.h> #include <sstream> - #include "src/core/lib/profiling/timers.h" #include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" -#include "third_party/benchmark/include/benchmark/benchmark.h" namespace grpc { namespace testing { @@ -54,86 +53,141 @@ auto& force_library_initialization = Library::get(); static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } -template <class Fixture> -static void BM_PumpStreamClientToServer(benchmark::State& state) { +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of +// messages in each call) in a loop on a single channel +// +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +// Second parameter (i.e state.range(1)): Number of ping pong messages. +// Note: One ping-pong means two messages (one from client to server and +// the other from server to client): +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPong(benchmark::State& state) { + const int msg_size = state.range(0); + const int max_ping_pongs = state.range(1); + EchoTestService::AsyncService service; std::unique_ptr<Fixture> fixture(new Fixture(&service)); { + EchoResponse send_response; + EchoResponse recv_response; EchoRequest send_request; EchoRequest recv_request; - if (state.range(0) > 0) { - send_request.set_message(std::string(state.range(0), 'a')); + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); } - Status recv_status; - ServerContext svr_ctx; - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); + std::unique_ptr<EchoTestService::Stub> stub( EchoTestService::NewStub(fixture->channel())); - ClientContext cli_ctx; - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); - void* t; - bool ok; - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - response_rw.Read(&recv_request, tag(0)); + while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - request_rw->Write(send_request, tag(1)); - while (true) { + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + // Establish async stream between client side and server side + void* t; + bool ok; + int need_tags = (1 << 0) | (1 << 1); + while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - response_rw.Read(&recv_request, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + // Send 'max_ping_pongs' number of ping pong messages + int ping_pong_cnt = 0; + while (ping_pong_cnt < max_ping_pongs) { + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); } + + ping_pong_cnt++; } - } - request_rw->WritesDone(tag(1)); - need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); + + request_rw->WritesDone(tag(0)); + response_rw.Finish(Status::OK, tag(1)); + + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); } } + fixture->Finish(state); fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); + state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); } -template <class Fixture> -static void BM_PumpStreamServerToClient(benchmark::State& state) { +// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPongMsgs(benchmark::State& state) { + const int msg_size = state.range(0); + EchoTestService::AsyncService service; std::unique_ptr<Fixture> fixture(new Fixture(&service)); { EchoResponse send_response; EchoResponse recv_response; - if (state.range(0) > 0) { - send_response.set_message(std::string(state.range(0), 'a')); + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); } - Status recv_status; + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), fixture->cq(), tag(0)); - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); + + // Establish async stream between client side and server side void* t; bool ok; + int need_tags = (1 << 0) | (1 << 1); while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); GPR_ASSERT(ok); @@ -141,56 +195,286 @@ static void BM_PumpStreamServerToClient(benchmark::State& state) { GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } - request_rw->Read(&recv_response, tag(0)); + while (state.KeepRunning()) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); - response_rw.Write(send_response, tag(1)); - while (true) { + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - request_rw->Read(&recv_response, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); } } + + request_rw->WritesDone(tag(0)); response_rw.Finish(Status::OK, tag(1)); - need_tags = (1 << 0) | (1 << 1); + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); int i = (int)(intptr_t)t; GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } + + GPR_ASSERT(recv_status.ok()); } + + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(msg_size * state.iterations() * 2); +} + +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of +// messages in each call) in a loop on a single channel. Different from +// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast, +// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving +// sendmsg syscalls for streaming by coalescing 1. initial metadata with first +// message; 2. final streaming message with trailing metadata. +// +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +// Second parameter (i.e state.range(1)): Number of ping pong messages. +// Note: One ping-pong means two messages (one from client to server and +// the other from server to client): +// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish +// API and WriteLast API for server. +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { + const int msg_size = state.range(0); + const int max_ping_pongs = state.range(1); + // This options is used to test out server API: WriteLast and WriteAndFinish + // respectively, since we can not use both of them on server side at the same + // time. Value 1 means we are testing out the WriteAndFinish API, and + // otherwise we are testing out the WriteLast API. + const int write_and_finish = state.range(2); + + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); + } + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + + while (state.KeepRunning()) { + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + cli_ctx.set_initial_metadata_corked(true); + // tag:1 here will never comes up, since we are not performing any op due + // to initial metadata coalescing. + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + void* t; + bool ok; + int need_tags; + + // Send 'max_ping_pongs' number of ping pong messages + int ping_pong_cnt = 0; + while (ping_pong_cnt < max_ping_pongs) { + if (ping_pong_cnt == max_ping_pongs - 1) { + request_rw->WriteLast(send_request, WriteOptions(), tag(2)); + } else { + request_rw->Write(send_request, tag(2)); // Start client send + } + + need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5); + + if (ping_pong_cnt == 0) { + // wait for the server call structure (call_hook, etc.) to be + // initialized (async stream between client side and server side + // established). It is necessary when client init metadata is + // coalesced + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + while ((int)(intptr_t)t != 0) { + // In some cases tag:2 comes before tag:0 (write tag comes out + // first), this while loop is to make sure get tag:0. + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + } + } + + response_rw.Read(&recv_request, tag(3)); // Start server recv + request_rw->Read(&recv_response, tag(4)); // Start client recv + + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 3) { + if (ping_pong_cnt == max_ping_pongs - 1) { + if (write_and_finish == 1) { + response_rw.WriteAndFinish(send_response, WriteOptions(), + Status::OK, tag(5)); + } else { + response_rw.WriteLast(send_response, WriteOptions(), tag(5)); + // WriteLast buffers the write, so neither server write op nor + // client read op will finish inside the while loop. + need_tags &= ~(1 << 4); + need_tags &= ~(1 << 5); + } + } else { + response_rw.Write(send_response, tag(5)); + } + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + ping_pong_cnt++; + } + + if (max_ping_pongs == 0) { + need_tags = (1 << 6) | (1 << 7) | (1 << 8); + } else { + if (write_and_finish == 1) { + need_tags = (1 << 8); + } else { + // server's buffered write and the client's read of the buffered write + // tags should come up. + need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8); + } + } + + // No message write or initial metadata write happened yet. + if (max_ping_pongs == 0) { + request_rw->WritesDone(tag(6)); + // wait for server call data structure(call_hook, etc.) to be + // initialized, since initial metadata is corked. + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + while ((int)(intptr_t)t != 0) { + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + } + response_rw.Finish(Status::OK, tag(7)); + } else { + if (write_and_finish != 1) { + response_rw.Finish(Status::OK, tag(7)); + } + } + + Status recv_status; + request_rw->Finish(&recv_status, tag(8)); + + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); + } + } + fixture->Finish(state); fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); + state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); } /******************************************************************************* * CONFIGURATIONS */ -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, SockPair) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, InProcessCHTTP2) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, TCP) +// Generate Args for StreamingPingPong benchmarks. Currently generates args for +// only "small streams" (i.e streams with 0, 1 or 2 messages) +static void StreamingPingPongArgs(benchmark::internal::Benchmark* b) { + int msg_size = 0; + + b->Args({0, 0}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here) + + for (msg_size = 0; msg_size <= 128 * 1024 * 1024; + msg_size == 0 ? msg_size++ : msg_size *= 8) { + b->Args({msg_size, 1}); + b->Args({msg_size, 2}); + } +} + +BENCHMARK_TEMPLATE(BM_StreamingPingPong, InProcessCHTTP2, NoOpMutator, + NoOpMutator) + ->Apply(StreamingPingPongArgs); +BENCHMARK_TEMPLATE(BM_StreamingPingPong, TCP, NoOpMutator, NoOpMutator) + ->Apply(StreamingPingPongArgs); + +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, InProcessCHTTP2, NoOpMutator, + NoOpMutator) ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, UDS) +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator) ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair) + +BENCHMARK_TEMPLATE(BM_StreamingPingPong, MinInProcessCHTTP2, NoOpMutator, + NoOpMutator) + ->Apply(StreamingPingPongArgs); +BENCHMARK_TEMPLATE(BM_StreamingPingPong, MinTCP, NoOpMutator, NoOpMutator) + ->Apply(StreamingPingPongArgs); + +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, MinInProcessCHTTP2, NoOpMutator, + NoOpMutator) ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2) +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, MinTCP, NoOpMutator, NoOpMutator) ->Range(0, 128 * 1024 * 1024); +// Generate Args for StreamingPingPongWithCoalescingApi benchmarks. Currently +// generates args for only "small streams" (i.e streams with 0, 1 or 2 messages) +static void StreamingPingPongWithCoalescingApiArgs( + benchmark::internal::Benchmark* b) { + int msg_size = 0; + + b->Args( + {0, 0, 0}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here) + b->Args( + {0, 0, 1}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here) + + for (msg_size = 0; msg_size <= 128 * 1024 * 1024; + msg_size == 0 ? msg_size++ : msg_size *= 8) { + b->Args({msg_size, 1, 0}); + b->Args({msg_size, 2, 0}); + b->Args({msg_size, 1, 1}); + b->Args({msg_size, 2, 1}); + } +} + +BENCHMARK_TEMPLATE(BM_StreamingPingPongWithCoalescingApi, InProcessCHTTP2, + NoOpMutator, NoOpMutator) + ->Apply(StreamingPingPongWithCoalescingApiArgs); +BENCHMARK_TEMPLATE(BM_StreamingPingPongWithCoalescingApi, MinInProcessCHTTP2, + NoOpMutator, NoOpMutator) + ->Apply(StreamingPingPongWithCoalescingApiArgs); + } // namespace testing } // namespace grpc diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc index dc0e7d769a..47705d3031 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc @@ -33,14 +33,13 @@ /* Benchmark gRPC end2end in various configurations */ +#include <benchmark/benchmark.h> #include <sstream> - #include "src/core/lib/profiling/timers.h" #include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" -#include "third_party/benchmark/include/benchmark/benchmark.h" namespace grpc { namespace testing { @@ -190,6 +189,14 @@ BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair) ->Range(0, 128 * 1024 * 1024); BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2) ->Range(0, 128 * 1024 * 1024); +BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, MinTCP)->Arg(0); +BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, MinUDS)->Arg(0); +BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, MinSockPair)->Arg(0); +BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, MinInProcessCHTTP2)->Arg(0); +BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, MinTCP)->Arg(0); +BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, MinUDS)->Arg(0); +BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, MinSockPair)->Arg(0); +BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, MinInProcessCHTTP2)->Arg(0); } // namespace testing } // namespace grpc diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 5011f06368..a5cfeb4f95 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -33,12 +33,12 @@ /* Benchmark gRPC end2end in various configurations */ +#include <benchmark/benchmark.h> #include "src/core/lib/profiling/timers.h" #include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" -#include "third_party/benchmark/include/benchmark/benchmark.h" extern "C" { #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" @@ -53,7 +53,8 @@ static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } class TrickledCHTTP2 : public EndpointPairFixture { public: TrickledCHTTP2(Service* service, size_t megabits_per_second) - : EndpointPairFixture(service, MakeEndpoints(megabits_per_second)) {} + : EndpointPairFixture(service, MakeEndpoints(megabits_per_second), + FixtureConfiguration()) {} void AddToLabel(std::ostream& out, benchmark::State& state) { out << " writes/iter:" diff --git a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc index e51d272b10..7524751fbc 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc @@ -33,14 +33,13 @@ /* Benchmark gRPC end2end in various configurations */ +#include <benchmark/benchmark.h> #include <sstream> - #include "src/core/lib/profiling/timers.h" #include "src/cpp/client/create_channel_internal.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" -#include "third_party/benchmark/include/benchmark/benchmark.h" namespace grpc { namespace testing { @@ -142,12 +141,21 @@ static void SweepSizesArgs(benchmark::internal::Benchmark* b) { BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCP, NoOpMutator, NoOpMutator) ->Apply(SweepSizesArgs); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, MinTCP, NoOpMutator, NoOpMutator) + ->Apply(SweepSizesArgs); BENCHMARK_TEMPLATE(BM_UnaryPingPong, UDS, NoOpMutator, NoOpMutator) ->Args({0, 0}); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, MinUDS, NoOpMutator, NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, SockPair, NoOpMutator, NoOpMutator) ->Args({0, 0}); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, MinSockPair, NoOpMutator, NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, NoOpMutator) ->Apply(SweepSizesArgs); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, MinInProcessCHTTP2, NoOpMutator, + NoOpMutator) + ->Apply(SweepSizesArgs); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, Client_AddMetadata<RandomBinaryMetadata<10>, 1>, NoOpMutator) ->Args({0, 0}); diff --git a/test/cpp/microbenchmarks/bm_metadata.cc b/test/cpp/microbenchmarks/bm_metadata.cc index 34874b57f5..7029f369ad 100644 --- a/test/cpp/microbenchmarks/bm_metadata.cc +++ b/test/cpp/microbenchmarks/bm_metadata.cc @@ -33,17 +33,15 @@ /* Test out various metadata handling primitives */ +#include <benchmark/benchmark.h> #include <grpc/grpc.h> extern "C" { -#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" -#include "src/core/lib/transport/transport.h" } #include "test/cpp/microbenchmarks/helpers.h" -#include "third_party/benchmark/include/benchmark/benchmark.h" auto& force_library_initialization = Library::get(); @@ -65,19 +63,6 @@ static void BM_SliceFromCopied(benchmark::State& state) { } BENCHMARK(BM_SliceFromCopied); -static void BM_SliceFromStreamOwnedBuffer(benchmark::State& state) { - grpc_stream_refcount r; - GRPC_STREAM_REF_INIT(&r, 1, NULL, NULL, "test"); - char buffer[64]; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - while (state.KeepRunning()) { - grpc_slice_unref_internal(&exec_ctx, grpc_slice_from_stream_owned_buffer( - &r, buffer, sizeof(buffer))); - } - grpc_exec_ctx_finish(&exec_ctx); -} -BENCHMARK(BM_SliceFromStreamOwnedBuffer); - static void BM_SliceIntern(benchmark::State& state) { TrackCounters track_counters; gpr_slice slice = grpc_slice_from_static_string("abc"); diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index dc29701059..f129ede26a 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -61,29 +61,33 @@ extern "C" { namespace grpc { namespace testing { -static void ApplyCommonServerBuilderConfig(ServerBuilder* b) { - b->SetMaxReceiveMessageSize(INT_MAX); - b->SetMaxSendMessageSize(INT_MAX); -} +class FixtureConfiguration { + public: + virtual void ApplyCommonChannelArguments(ChannelArguments* c) const { + c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); + c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); + } -static void ApplyCommonChannelArguments(ChannelArguments* c) { - c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); - c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); -} + virtual void ApplyCommonServerBuilderConfig(ServerBuilder* b) const { + b->SetMaxReceiveMessageSize(INT_MAX); + b->SetMaxSendMessageSize(INT_MAX); + } +}; class BaseFixture : public TrackCounters {}; class FullstackFixture : public BaseFixture { public: - FullstackFixture(Service* service, const grpc::string& address) { + FullstackFixture(Service* service, const FixtureConfiguration& config, + const grpc::string& address) { ServerBuilder b; b.AddListeningPort(address, InsecureServerCredentials()); cq_ = b.AddCompletionQueue(true); b.RegisterService(service); - ApplyCommonServerBuilderConfig(&b); + config.ApplyCommonServerBuilderConfig(&b); server_ = b.BuildAndStart(); ChannelArguments args; - ApplyCommonChannelArguments(&args); + config.ApplyCommonChannelArguments(&args); channel_ = CreateCustomChannel(address, InsecureChannelCredentials(), args); } @@ -107,7 +111,9 @@ class FullstackFixture : public BaseFixture { class TCP : public FullstackFixture { public: - TCP(Service* service) : FullstackFixture(service, MakeAddress()) {} + TCP(Service* service, const FixtureConfiguration& fixture_configuration = + FixtureConfiguration()) + : FullstackFixture(service, fixture_configuration, MakeAddress()) {} private: static grpc::string MakeAddress() { @@ -120,7 +126,9 @@ class TCP : public FullstackFixture { class UDS : public FullstackFixture { public: - UDS(Service* service) : FullstackFixture(service, MakeAddress()) {} + UDS(Service* service, const FixtureConfiguration& fixture_configuration = + FixtureConfiguration()) + : FullstackFixture(service, fixture_configuration, MakeAddress()) {} private: static grpc::string MakeAddress() { @@ -134,12 +142,13 @@ class UDS : public FullstackFixture { class EndpointPairFixture : public BaseFixture { public: - EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) + EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints, + const FixtureConfiguration& fixture_configuration) : endpoint_pair_(endpoints) { ServerBuilder b; cq_ = b.AddCompletionQueue(true); b.RegisterService(service); - ApplyCommonServerBuilderConfig(&b); + fixture_configuration.ApplyCommonServerBuilderConfig(&b); server_ = b.BuildAndStart(); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -169,7 +178,7 @@ class EndpointPairFixture : public BaseFixture { { ChannelArguments args; args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); - ApplyCommonChannelArguments(&args); + fixture_configuration.ApplyCommonChannelArguments(&args); grpc_channel_args c_args = args.c_channel_args(); client_transport_ = @@ -211,15 +220,19 @@ class EndpointPairFixture : public BaseFixture { class SockPair : public EndpointPairFixture { public: - SockPair(Service* service) - : EndpointPairFixture(service, grpc_iomgr_create_endpoint_pair( - "test", Library::get().rq(), 8192)) {} + SockPair(Service* service, const FixtureConfiguration& fixture_configuration = + FixtureConfiguration()) + : EndpointPairFixture(service, + grpc_iomgr_create_endpoint_pair("test", NULL), + fixture_configuration) {} }; class InProcessCHTTP2 : public EndpointPairFixture { public: - InProcessCHTTP2(Service* service) - : EndpointPairFixture(service, MakeEndpoints()) {} + InProcessCHTTP2(Service* service, + const FixtureConfiguration& fixture_configuration = + FixtureConfiguration()) + : EndpointPairFixture(service, MakeEndpoints(), fixture_configuration) {} void AddToLabel(std::ostream& out, benchmark::State& state) { EndpointPairFixture::AddToLabel(out, state); @@ -238,6 +251,32 @@ class InProcessCHTTP2 : public EndpointPairFixture { } }; +//////////////////////////////////////////////////////////////////////////////// +// Minimal stack fixtures + +class MinStackConfiguration : public FixtureConfiguration { + void ApplyCommonChannelArguments(ChannelArguments* a) const override { + a->SetInt(GRPC_ARG_MINIMAL_STACK, 1); + FixtureConfiguration::ApplyCommonChannelArguments(a); + } + + void ApplyCommonServerBuilderConfig(ServerBuilder* b) const override { + b->AddChannelArgument(GRPC_ARG_MINIMAL_STACK, 1); + FixtureConfiguration::ApplyCommonServerBuilderConfig(b); + } +}; + +template <class Base> +class MinStackize : public Base { + public: + MinStackize(Service* service) : Base(service, MinStackConfiguration()) {} +}; + +typedef MinStackize<TCP> MinTCP; +typedef MinStackize<UDS> MinUDS; +typedef MinStackize<SockPair> MinSockPair; +typedef MinStackize<InProcessCHTTP2> MinInProcessCHTTP2; + } // namespace testing } // namespace grpc diff --git a/test/cpp/microbenchmarks/helpers.h b/test/cpp/microbenchmarks/helpers.h index af401dd7bb..7360a1c9f2 100644 --- a/test/cpp/microbenchmarks/helpers.h +++ b/test/cpp/microbenchmarks/helpers.h @@ -41,8 +41,8 @@ extern "C" { #include "test/core/util/memory_counters.h" } +#include <benchmark/benchmark.h> #include <grpc++/impl/grpc_library.h> -#include "third_party/benchmark/include/benchmark/benchmark.h" class Library { public: diff --git a/test/cpp/microbenchmarks/noop-benchmark.cc b/test/cpp/microbenchmarks/noop-benchmark.cc index 99fa6d5f6e..7372ad04f2 100644 --- a/test/cpp/microbenchmarks/noop-benchmark.cc +++ b/test/cpp/microbenchmarks/noop-benchmark.cc @@ -34,7 +34,7 @@ /* This benchmark exists to ensure that the benchmark integration is * working */ -#include "third_party/benchmark/include/benchmark/benchmark.h" +#include <benchmark/benchmark.h> static void BM_NoOp(benchmark::State& state) { while (state.KeepRunning()) { diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD new file mode 100644 index 0000000000..6492b63ec3 --- /dev/null +++ b/test/cpp/qps/BUILD @@ -0,0 +1,194 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +licenses(["notice"]) # 3-clause BSD + +cc_library( + name = "parse_json", + srcs = ["parse_json.cc"], + hdrs = ["parse_json.h"], + deps = ["//:grpc++"], +) + +cc_library( + name = "qps_worker_impl", + srcs = [ + "client_async.cc", + "client_sync.cc", + "qps_worker.cc", + "server_async.cc", + "server_sync.cc", + ], + hdrs = [ + "client.h", + "qps_worker.h", + "server.h", + ], + deps = [ + ":histogram", + ":interarrival", + ":usage_timer", + "//:grpc", + "//:grpc++", + "//external:gtest", + "//src/proto/grpc/testing:control_proto", + "//src/proto/grpc/testing:payloads_proto", + "//src/proto/grpc/testing:services_proto", + "//test/core/end2end:ssl_test_data", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) + +cc_library( + name = "driver_impl", + srcs = [ + "driver.cc", + "report.cc", + ], + hdrs = [ + "driver.h", + "report.h", + ], + deps = [ + ":histogram", + ":parse_json", + ":qps_worker_impl", + "//:grpc++", + "//src/proto/grpc/testing:control_proto", + "//src/proto/grpc/testing:messages_proto", + "//src/proto/grpc/testing:services_proto", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ], +) + +cc_library( + name = "benchmark_config", + srcs = [ + "benchmark_config.cc", + ], + hdrs = [ + "benchmark_config.h", + ], + deps = [ + ":driver_impl", + ":histogram", + "//:grpc++", + "//external:gflags", + "//src/proto/grpc/testing:control_proto", + ], +) + +cc_library( + name = "histogram", + hdrs = [ + "histogram.h", + "stats.h", + ], + deps = ["//:gpr"], +) + +cc_library( + name = "interarrival", + hdrs = ["interarrival.h"], + deps = ["//:grpc++"], +) + +cc_binary( + name = "json_run_localhost", + srcs = ["json_run_localhost.cc"], + deps = [ + "//:gpr", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) + +cc_test( + name = "qps_interarrival_test", + srcs = ["qps_interarrival_test.cc"], + deps = [ + ":histogram", + ":interarrival", + ], +) + +cc_binary( + name = "qps_json_driver", + srcs = ["qps_json_driver.cc"], + deps = [ + ":benchmark_config", + ":driver_impl", + "//:grpc++", + "//external:gflags", + ], +) + +cc_test( + name = "qps_openloop_test", + srcs = ["qps_openloop_test.cc"], + deps = [ + ":benchmark_config", + ":driver_impl", + ":qps_worker_impl", + ], +) + +cc_test( + name = "secure_sync_unary_ping_pong_test", + srcs = ["secure_sync_unary_ping_pong_test.cc"], + deps = [ + ":benchmark_config", + ":driver_impl", + "//:grpc++", + ], +) + +cc_library( + name = "usage_timer", + srcs = ["usage_timer.cc"], + hdrs = ["usage_timer.h"], + deps = ["//:gpr"], +) + +cc_binary( + name = "qps_worker", + srcs = ["worker.cc"], + deps = [ + ":qps_worker_impl", + "//:grpc++", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_config", + "//test/cpp/util:test_util", + ], +) diff --git a/test/cpp/util/benchmark_config.cc b/test/cpp/qps/benchmark_config.cc index 6fc864069e..d33f3e9ae1 100644 --- a/test/cpp/util/benchmark_config.cc +++ b/test/cpp/qps/benchmark_config.cc @@ -31,8 +31,11 @@ * */ -#include "test/cpp/util/benchmark_config.h" +#include "test/cpp/qps/benchmark_config.h" #include <gflags/gflags.h> +#include <grpc++/create_channel.h> +#include <grpc++/security/credentials.h> +#include <grpc/support/log.h> DEFINE_bool(enable_log_reporter, true, "Enable reporting of benchmark results through GprLog"); @@ -51,6 +54,11 @@ DEFINE_string(server_address, "localhost:50052", DEFINE_string(tag, "", "Optional tag for the test"); +DEFINE_string(rpc_reporter_server_address, "", + "Server address for rpc reporter to send results to"); + +DEFINE_bool(enable_rpc_reporter, false, "Enable use of RPC reporter"); + // In some distros, gflags is in the namespace google, and in some others, // in gflags. This hack is enabling us to find both. namespace google {} @@ -75,6 +83,13 @@ static std::shared_ptr<Reporter> InitBenchmarkReporters() { composite_reporter->add(std::unique_ptr<Reporter>( new JsonReporter("JsonReporter", FLAGS_scenario_result_file))); } + if (FLAGS_enable_rpc_reporter) { + GPR_ASSERT(!FLAGS_rpc_reporter_server_address.empty()); + composite_reporter->add(std::unique_ptr<Reporter>(new RpcReporter( + "RpcReporter", + grpc::CreateChannel(FLAGS_rpc_reporter_server_address, + grpc::InsecureChannelCredentials())))); + } return std::shared_ptr<Reporter>(composite_reporter); } diff --git a/test/cpp/util/benchmark_config.h b/test/cpp/qps/benchmark_config.h index 6b308a15ff..6b308a15ff 100644 --- a/test/cpp/util/benchmark_config.h +++ b/test/cpp/qps/benchmark_config.h diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index baa9304cc2..25a19a5a74 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -46,7 +46,7 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> -#include "src/proto/grpc/testing/payloads.grpc.pb.h" +#include "src/proto/grpc/testing/payloads.pb.h" #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/histogram.h" diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 396d308e2a..29a79e7343 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -63,13 +63,13 @@ class ClientRpcContext { virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, HistogramEntry* entry) = 0; - virtual ClientRpcContext* StartNewClone() = 0; + virtual void StartNewClone(CompletionQueue* cq) = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } static ClientRpcContext* detag(void* t) { return reinterpret_cast<ClientRpcContext*>(t); } - virtual void Start(CompletionQueue* cq) = 0; + virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0; }; template <class RequestType, class ResponseType> @@ -94,22 +94,17 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { next_issue_(next_issue), start_req_(start_req) {} ~ClientRpcContextUnaryImpl() override {} - void Start(CompletionQueue* cq) override { - cq_ = cq; - if (!next_issue_) { // ready to issue - RunNextState(true, nullptr); - } else { // wait for the issue time - alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); - } + void Start(CompletionQueue* cq, const ClientConfig& config) override { + StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { switch (next_state_) { case State::READY: start_ = UsageTimer::Now(); response_reader_ = start_req_(stub_, &context_, req_, cq_); + next_state_ = State::RESP_DONE; response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); - next_state_ = State::RESP_DONE; return true; case State::RESP_DONE: if (status_.ok()) { @@ -123,9 +118,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { return false; } } - ClientRpcContext* StartNewClone() override { - return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_, - callback_); + void StartNewClone(CompletionQueue* cq) override { + auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, + start_req_, callback_); + clone->StartInternal(cq); } private: @@ -147,6 +143,15 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { double start_; std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> response_reader_; + + void StartInternal(CompletionQueue* cq) { + cq_ = cq; + if (!next_issue_) { // ready to issue + RunNextState(true, nullptr); + } else { // wait for the issue time + alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + } + } }; typedef std::forward_list<ClientRpcContext*> context_list; @@ -185,7 +190,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { auto* cq = cli_cqs_[t].get(); auto ctx = setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_); - ctx->Start(cq); + ctx->Start(cq, config); } t = (t + 1) % cli_cqs_.size(); } @@ -248,8 +253,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { } else if (!ctx->RunNextState(ok, entry)) { // The RPC and callback are done, so clone the ctx // and kickstart the new one - auto clone = ctx->StartNewClone(); - clone->Start(cli_cqs_[thread_idx].get()); + ctx->StartNewClone(cli_cqs_[thread_idx].get()); // delete the old version delete ctx; } @@ -330,10 +334,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { next_issue_(next_issue), start_req_(start_req) {} ~ClientRpcContextStreamingImpl() override {} - void Start(CompletionQueue* cq) override { - cq_ = cq; - stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); - next_state_ = State::STREAM_IDLE; + void Start(CompletionQueue* cq, const ClientConfig& config) override { + StartInternal(cq, config.messages_per_stream()); } bool RunNextState(bool ok, HistogramEntry* entry) override { while (true) { @@ -346,9 +348,9 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { } break; // loop around, don't return case State::WAIT: + next_state_ = State::READY_TO_WRITE; alarm_.reset( new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); - next_state_ = State::READY_TO_WRITE; return true; case State::READY_TO_WRITE: if (!ok) { @@ -369,17 +371,32 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { case State::READ_DONE: entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); + if ((messages_per_stream_ != 0) && + (++messages_issued_ >= messages_per_stream_)) { + next_state_ = State::WRITES_DONE_DONE; + stream_->WritesDone(ClientRpcContext::tag(this)); + return true; + } next_state_ = State::STREAM_IDLE; break; // loop around + case State::WRITES_DONE_DONE: + next_state_ = State::FINISH_DONE; + stream_->Finish(&status_, ClientRpcContext::tag(this)); + return true; + case State::FINISH_DONE: + next_state_ = State::INVALID; + return false; + break; default: GPR_ASSERT(false); return false; } } } - ClientRpcContext* StartNewClone() override { - return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_, - start_req_, callback_); + void StartNewClone(CompletionQueue* cq) override { + auto* clone = new ClientRpcContextStreamingImpl(stub_, req_, next_issue_, + start_req_, callback_); + clone->StartInternal(cq, messages_per_stream_); } private: @@ -395,7 +412,9 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { WAIT, READY_TO_WRITE, WRITE_DONE, - READ_DONE + READ_DONE, + WRITES_DONE_DONE, + FINISH_DONE }; State next_state_; std::function<void(grpc::Status, ResponseType*)> callback_; @@ -408,6 +427,18 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { double start_; std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> stream_; + + // Allow a limit on number of messages in a stream + int messages_per_stream_; + int messages_issued_; + + void StartInternal(CompletionQueue* cq, int messages_per_stream) { + cq_ = cq; + next_state_ = State::STREAM_IDLE; + stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); + messages_per_stream_ = messages_per_stream; + messages_issued_ = 0; + } }; class AsyncStreamingClient final @@ -459,13 +490,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { next_issue_(next_issue), start_req_(start_req) {} ~ClientRpcContextGenericStreamingImpl() override {} - void Start(CompletionQueue* cq) override { - cq_ = cq; - const grpc::string kMethodName( - "/grpc.testing.BenchmarkService/StreamingCall"); - stream_ = start_req_(stub_, &context_, kMethodName, cq, - ClientRpcContext::tag(this)); - next_state_ = State::STREAM_IDLE; + void Start(CompletionQueue* cq, const ClientConfig& config) override { + StartInternal(cq, config.messages_per_stream()); } bool RunNextState(bool ok, HistogramEntry* entry) override { while (true) { @@ -478,9 +504,9 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { } break; // loop around, don't return case State::WAIT: + next_state_ = State::READY_TO_WRITE; alarm_.reset( new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); - next_state_ = State::READY_TO_WRITE; return true; case State::READY_TO_WRITE: if (!ok) { @@ -501,17 +527,32 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { case State::READ_DONE: entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); + if ((messages_per_stream_ != 0) && + (++messages_issued_ >= messages_per_stream_)) { + next_state_ = State::WRITES_DONE_DONE; + stream_->WritesDone(ClientRpcContext::tag(this)); + return true; + } next_state_ = State::STREAM_IDLE; break; // loop around + case State::WRITES_DONE_DONE: + next_state_ = State::FINISH_DONE; + stream_->Finish(&status_, ClientRpcContext::tag(this)); + return true; + case State::FINISH_DONE: + next_state_ = State::INVALID; + return false; + break; default: GPR_ASSERT(false); return false; } } } - ClientRpcContext* StartNewClone() override { - return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_, - start_req_, callback_); + void StartNewClone(CompletionQueue* cq) override { + auto* clone = new ClientRpcContextGenericStreamingImpl( + stub_, req_, next_issue_, start_req_, callback_); + clone->StartInternal(cq, messages_per_stream_); } private: @@ -527,7 +568,9 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { WAIT, READY_TO_WRITE, WRITE_DONE, - READ_DONE + READ_DONE, + WRITES_DONE_DONE, + FINISH_DONE }; State next_state_; std::function<void(grpc::Status, ByteBuffer*)> callback_; @@ -539,6 +582,21 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { grpc::Status status_; double start_; std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_; + + // Allow a limit on number of messages in a stream + int messages_per_stream_; + int messages_issued_; + + void StartInternal(CompletionQueue* cq, int messages_per_stream) { + cq_ = cq; + const grpc::string kMethodName( + "/grpc.testing.BenchmarkService/StreamingCall"); + next_state_ = State::STREAM_IDLE; + stream_ = start_req_(stub_, &context_, kMethodName, cq, + ClientRpcContext::tag(this)); + messages_per_stream_ = messages_per_stream; + messages_issued_ = 0; + } }; static std::unique_ptr<grpc::GenericStub> GenericStubCreator( diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index a944c45496..f8ce2cccbe 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -142,24 +142,33 @@ class SynchronousStreamingClient final : public SynchronousClient { SynchronousStreamingClient(const ClientConfig& config) : SynchronousClient(config), context_(num_threads_), - stream_(num_threads_) { + stream_(num_threads_), + messages_per_stream_(config.messages_per_stream()), + messages_issued_(num_threads_) { for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + messages_issued_[thread_idx] = 0; } StartThreads(num_threads_); } ~SynchronousStreamingClient() { + std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - Status s = (*stream)->Finish(); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i, - s.error_message().c_str()); + cleanup_threads.emplace_back([this, i]() { + auto stream = &stream_[i]; + if (*stream) { + (*stream)->WritesDone(); + Status s = (*stream)->Finish(); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i, + s.error_message().c_str()); + } } - } + }); + } + for (size_t i = 0; i < num_threads_; i++) { + cleanup_threads[i].join(); } } @@ -173,11 +182,19 @@ class SynchronousStreamingClient final : public SynchronousClient { stream_[thread_idx]->Read(&responses_[thread_idx])) { entry->set_value((UsageTimer::Now() - start) * 1e9); // don't set the status since there isn't one yet - return true; + if ((messages_per_stream_ != 0) && + (++messages_issued_[thread_idx] < messages_per_stream_)) { + return true; + } else if (messages_per_stream_ == 0) { + return true; + } else { + // Fall through to the below resetting code after finish + } } stream_[thread_idx]->WritesDone(); Status s = stream_[thread_idx]->Finish(); - // don't set the value since the stream is failed and shouldn't be timed + // don't set the value since this is either a failure (shouldn't be timed) + // or a stream-end (already has been timed) entry->set_status(s.error_code()); if (!s.ok()) { gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, @@ -187,6 +204,7 @@ class SynchronousStreamingClient final : public SynchronousClient { context_[thread_idx].~ClientContext(); new (&context_[thread_idx]) ClientContext(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + messages_issued_[thread_idx] = 0; return true; } @@ -197,6 +215,8 @@ class SynchronousStreamingClient final : public SynchronousClient { std::vector< std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_; + const int messages_per_stream_; + std::vector<int> messages_issued_; }; std::unique_ptr<Client> CreateSynchronousUnaryClient( diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index e72d30a4ef..dd32a16c87 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -36,7 +36,7 @@ #include <memory> -#include "src/proto/grpc/testing/control.grpc.pb.h" +#include "src/proto/grpc/testing/control.pb.h" #include "test/cpp/qps/histogram.h" namespace grpc { diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h index acb415f0a1..470a394301 100644 --- a/test/cpp/qps/histogram.h +++ b/test/cpp/qps/histogram.h @@ -35,7 +35,7 @@ #define TEST_QPS_HISTOGRAM_H #include <grpc/support/histogram.h> -#include "src/proto/grpc/testing/stats.grpc.pb.h" +#include "src/proto/grpc/testing/stats.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc index ddaaa7ca75..a906137474 100644 --- a/test/cpp/qps/qps_json_driver.cc +++ b/test/cpp/qps/qps_json_driver.cc @@ -31,6 +31,7 @@ * */ +#include <iostream> #include <memory> #include <set> @@ -39,10 +40,10 @@ #include <gflags/gflags.h> #include <grpc/support/log.h> +#include "test/cpp/qps/benchmark_config.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/parse_json.h" #include "test/cpp/qps/report.h" -#include "test/cpp/util/benchmark_config.h" DEFINE_string(scenarios_file, "", "JSON file containing an array of Scenario objects"); diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc index 70e2709ac0..28b396739f 100644 --- a/test/cpp/qps/qps_openloop_test.cc +++ b/test/cpp/qps/qps_openloop_test.cc @@ -36,9 +36,9 @@ #include <grpc/support/log.h> #include "test/core/util/test_config.h" +#include "test/cpp/qps/benchmark_config.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/report.h" -#include "test/cpp/util/benchmark_config.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc index f94ea0cb49..7c4e2cfd3e 100644 --- a/test/cpp/qps/qps_test.cc +++ b/test/cpp/qps/qps_test.cc @@ -35,9 +35,9 @@ #include <grpc/support/log.h> +#include "test/cpp/qps/benchmark_config.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/report.h" -#include "test/cpp/util/benchmark_config.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index 7f84816421..a9130bf5d4 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -40,6 +40,9 @@ #include "test/cpp/qps/parse_json.h" #include "test/cpp/qps/stats.h" +#include <grpc++/client_context.h> +#include "src/proto/grpc/testing/services.grpc.pb.h" + namespace grpc { namespace testing { @@ -142,5 +145,37 @@ void JsonReporter::ReportCpuUsage(const ScenarioResult& result) { // NOP - all reporting is handled by ReportQPS. } +void RpcReporter::ReportQPS(const ScenarioResult& result) { + grpc::ClientContext context; + grpc::Status status; + Void dummy; + + gpr_log(GPR_INFO, "RPC reporter sending scenario result to server"); + status = stub_->ReportScenario(&context, result, &dummy); + + if (status.ok()) { + gpr_log(GPR_INFO, "RpcReporter report RPC success!"); + } else { + gpr_log(GPR_ERROR, "RpcReporter report RPC: code: %d. message: %s", + status.error_code(), status.error_message().c_str()); + } +} + +void RpcReporter::ReportQPSPerCore(const ScenarioResult& result) { + // NOP - all reporting is handled by ReportQPS. +} + +void RpcReporter::ReportLatency(const ScenarioResult& result) { + // NOP - all reporting is handled by ReportQPS. +} + +void RpcReporter::ReportTimes(const ScenarioResult& result) { + // NOP - all reporting is handled by ReportQPS. +} + +void RpcReporter::ReportCpuUsage(const ScenarioResult& result) { + // NOP - all reporting is handled by ReportQPS. +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h index faf87ff060..1749be98c6 100644 --- a/test/cpp/qps/report.h +++ b/test/cpp/qps/report.h @@ -42,6 +42,9 @@ #include "test/cpp/qps/driver.h" +#include <grpc++/channel.h> +#include "src/proto/grpc/testing/services.grpc.pb.h" + namespace grpc { namespace testing { @@ -124,6 +127,21 @@ class JsonReporter : public Reporter { const string report_file_; }; +class RpcReporter : public Reporter { + public: + RpcReporter(const string& name, std::shared_ptr<grpc::Channel> channel) + : Reporter(name), stub_(ReportQpsScenarioService::NewStub(channel)) {} + + private: + void ReportQPS(const ScenarioResult& result) override; + void ReportQPSPerCore(const ScenarioResult& result) override; + void ReportLatency(const ScenarioResult& result) override; + void ReportTimes(const ScenarioResult& result) override; + void ReportCpuUsage(const ScenarioResult& result) override; + + std::unique_ptr<ReportQpsScenarioService::Stub> stub_; +}; + } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc index d0c47d102a..509d9f89c3 100644 --- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc +++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc @@ -35,9 +35,9 @@ #include <grpc/support/log.h> +#include "test/cpp/qps/benchmark_config.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/report.h" -#include "test/cpp/util/benchmark_config.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 821d5935be..8fbf37a095 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -38,8 +38,8 @@ #include <grpc/support/cpu.h> #include <vector> -#include "src/proto/grpc/testing/control.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/control.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" #include "test/cpp/qps/usage_timer.h" diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index b3a06aeaf5..b499b82091 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -103,24 +103,25 @@ class AsyncQpsServerTest final : public grpc::testing::Server { server_ = builder.BuildAndStart(); - using namespace std::placeholders; - auto process_rpc_bound = - std::bind(process_rpc, config.payload_config(), _1, _2); + std::bind(process_rpc, config.payload_config(), std::placeholders::_1, + std::placeholders::_2); for (int i = 0; i < 15000; i++) { for (int j = 0; j < num_threads; j++) { if (request_unary_function) { - auto request_unary = - std::bind(request_unary_function, &async_service_, _1, _2, _3, - srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + auto request_unary = std::bind( + request_unary_function, &async_service_, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(), + srv_cqs_[j].get(), std::placeholders::_4); contexts_.emplace_back( new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); } if (request_streaming_function) { - auto request_streaming = - std::bind(request_streaming_function, &async_service_, _1, _2, - srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + auto request_streaming = std::bind( + request_streaming_function, &async_service_, + std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(), + srv_cqs_[j].get(), std::placeholders::_3); contexts_.emplace_back(new ServerRpcContextStreamingImpl( request_streaming, process_rpc_bound)); } @@ -234,18 +235,17 @@ class AsyncQpsServerTest final : public grpc::testing::Server { return false; } - ResponseType response; - // Call the RPC processing function - grpc::Status status = invoke_method_(&req_, &response); + grpc::Status status = invoke_method_(&req_, &response_); // Have the response writer work and invoke on_finish when done next_state_ = &ServerRpcContextUnaryImpl::finisher; - response_writer_.Finish(response, status, AsyncQpsServerTest::tag(this)); + response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this)); return true; } std::unique_ptr<ServerContextType> srv_ctx_; RequestType req_; + ResponseType response_; bool (ServerRpcContextUnaryImpl::*next_state_)(bool); std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> @@ -297,11 +297,10 @@ class AsyncQpsServerTest final : public grpc::testing::Server { bool read_done(bool ok) { if (ok) { // invoke the method - ResponseType response; // Call the RPC processing function - grpc::Status status = invoke_method_(&req_, &response); + grpc::Status status = invoke_method_(&req_, &response_); // initiate the write - stream_.Write(response, AsyncQpsServerTest::tag(this)); + stream_.Write(response_, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::write_done; } else { // client has sent writes done // finish the stream @@ -325,6 +324,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::unique_ptr<ServerContextType> srv_ctx_; RequestType req_; + ResponseType response_; bool (ServerRpcContextStreamingImpl::*next_state_)(bool); std::function<void( ServerContextType *, diff --git a/test/cpp/util/BUILD b/test/cpp/util/BUILD index dc90a4e172..c83f89eb90 100644 --- a/test/cpp/util/BUILD +++ b/test/cpp/util/BUILD @@ -29,6 +29,15 @@ licenses(["notice"]) # 3-clause BSD +# The following builds a shared-object to confirm that grpc++_unsecure +# builds properly. Build-only is sufficient here +cc_binary( + name = "testso.so", + srcs = [], + deps = ["//:grpc++_unsecure"], + linkshared = 1, +) + cc_library( name = "test_config", srcs = [ @@ -62,7 +71,6 @@ cc_library( cc_library( name = "test_util", srcs = [ - # "test/cpp/end2end/test_service_impl.cc", "byte_buffer_proto_helper.cc", "create_test_channel.cc", "string_ref_helper.cc", @@ -83,3 +91,18 @@ cc_library( "//test/core/util:gpr_test_util", ], ) + +cc_test( + name = "error_details_test", + srcs = [ + "error_details_test.cc", + ], + deps = [ + "//:grpc++_error_details", + "//external:gtest", + "//src/proto/grpc/testing:echo_messages_proto", + ], +) + + + diff --git a/test/cpp/util/error_details_test.cc b/test/cpp/util/error_details_test.cc new file mode 100644 index 0000000000..d01fd3b087 --- /dev/null +++ b/test/cpp/util/error_details_test.cc @@ -0,0 +1,120 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/support/error_details.h> +#include <gtest/gtest.h> + +#include "src/proto/grpc/status/status.pb.h" +#include "src/proto/grpc/testing/echo_messages.pb.h" + +namespace grpc { +namespace { + +TEST(ExtractTest, Success) { + google::rpc::Status expected; + expected.set_code(13); // INTERNAL + expected.set_message("I am an error message"); + testing::EchoRequest expected_details; + expected_details.set_message(grpc::string(100, '\0')); + expected.add_details()->PackFrom(expected_details); + + google::rpc::Status to; + grpc::string error_details = expected.SerializeAsString(); + Status from(static_cast<StatusCode>(expected.code()), expected.message(), + error_details); + EXPECT_TRUE(ExtractErrorDetails(from, &to).ok()); + EXPECT_EQ(expected.code(), to.code()); + EXPECT_EQ(expected.message(), to.message()); + EXPECT_EQ(1, to.details_size()); + testing::EchoRequest details; + to.details(0).UnpackTo(&details); + EXPECT_EQ(expected_details.message(), details.message()); +} + +TEST(ExtractTest, NullInput) { + EXPECT_EQ(StatusCode::FAILED_PRECONDITION, + ExtractErrorDetails(Status(), nullptr).error_code()); +} + +TEST(ExtractTest, Unparsable) { + grpc::string error_details("I am not a status object"); + Status from(StatusCode::INTERNAL, "", error_details); + google::rpc::Status to; + EXPECT_EQ(StatusCode::INVALID_ARGUMENT, + ExtractErrorDetails(from, &to).error_code()); +} + +TEST(SetTest, Success) { + google::rpc::Status expected; + expected.set_code(13); // INTERNAL + expected.set_message("I am an error message"); + testing::EchoRequest expected_details; + expected_details.set_message(grpc::string(100, '\0')); + expected.add_details()->PackFrom(expected_details); + + Status to; + Status s = SetErrorDetails(expected, &to); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(expected.code(), to.error_code()); + EXPECT_EQ(expected.message(), to.error_message()); + EXPECT_EQ(expected.SerializeAsString(), to.error_details()); +} + +TEST(SetTest, NullInput) { + EXPECT_EQ(StatusCode::FAILED_PRECONDITION, + SetErrorDetails(google::rpc::Status(), nullptr).error_code()); +} + +TEST(SetTest, OutOfScopeErrorCode) { + google::rpc::Status expected; + expected.set_code(20); // Out of scope (DATA_LOSS is 15). + expected.set_message("I am an error message"); + testing::EchoRequest expected_details; + expected_details.set_message(grpc::string(100, '\0')); + expected.add_details()->PackFrom(expected_details); + + Status to; + Status s = SetErrorDetails(expected, &to); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(StatusCode::UNKNOWN, to.error_code()); + EXPECT_EQ(expected.message(), to.error_message()); + EXPECT_EQ(expected.SerializeAsString(), to.error_details()); +} + +} // namespace +} // namespace grpc + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc index 856cd32c3c..c7acb7c631 100644 --- a/test/cpp/util/grpc_tool.cc +++ b/test/cpp/util/grpc_tool.cc @@ -321,6 +321,7 @@ bool GrpcTool::ListServices(int argc, const char** argv, std::vector<grpc::string> service_list; if (!desc_db.GetServices(&service_list)) { + fprintf(stderr, "Received an error when querying services endpoint.\n"); return false; } |