diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-12-26 15:10:41 -0800 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2018-12-26 15:10:41 -0800 |
commit | 24e37e249a4db24ff2c886960e3a00311e2591dd (patch) | |
tree | 7bcb4a002432e7ef04054750b1efe8dfd9ba8ade /test/cpp/end2end | |
parent | 0911e489e3fe22e2ca5d7c927dac83358f2f05b7 (diff) | |
parent | fc7d0911a3a44d7bc926d3db99b7300a0c0f33dc (diff) |
Merge branch 'master' into failhijackedrecv
Diffstat (limited to 'test/cpp/end2end')
31 files changed, 1357 insertions, 306 deletions
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 4e3d841db0..762d2302af 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -36,6 +36,18 @@ grpc_cc_library( ) grpc_cc_library( + name = "test_health_check_service_impl", + testonly = True, + srcs = ["test_health_check_service_impl.cc"], + hdrs = ["test_health_check_service_impl.h"], + deps = [ + "//:grpc", + "//:grpc++", + "//src/proto/grpc/health/v1:health_proto", + ], +) + +grpc_cc_library( name = "interceptors_util", testonly = True, srcs = ["interceptors_util.cc"], @@ -63,7 +75,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -85,7 +96,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -106,7 +116,6 @@ grpc_cc_binary( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -125,7 +134,7 @@ grpc_cc_test( "//:grpc++", "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", - "//test/core/util:gpr_test_util", + "//src/proto/grpc/testing:simple_messages_proto", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -145,7 +154,6 @@ grpc_cc_test( "//:grpc++", "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -167,7 +175,6 @@ grpc_cc_library( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -188,7 +195,6 @@ grpc_cc_test( "//src/proto/grpc/channelz:channelz_proto", "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -206,7 +212,6 @@ grpc_cc_test( "//:grpc++", "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -231,7 +236,6 @@ grpc_cc_test( "//:grpc++", "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -250,7 +254,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -269,7 +272,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -282,6 +284,7 @@ grpc_cc_test( "gtest", ], deps = [ + ":test_health_check_service_impl", ":test_service_impl", "//:gpr", "//:grpc", @@ -290,7 +293,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -310,7 +312,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -330,7 +331,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -351,7 +351,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -369,7 +368,6 @@ grpc_cc_test( "//:grpc++", "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -389,7 +387,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -412,7 +409,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -434,7 +430,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:grpc++_proto_reflection_desc_db", "//test/cpp/util:test_util", @@ -455,7 +450,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -477,7 +471,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -498,7 +491,6 @@ grpc_cc_binary( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -518,7 +510,6 @@ grpc_cc_test( "//:grpc++", "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -551,7 +542,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -570,7 +560,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], @@ -589,7 +578,6 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", ], diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 6ecb957801..e09f54dcc3 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -1884,7 +1884,7 @@ int main(int argc, char** argv) { // Change the backup poll interval from 5s to 100ms to speed up the // ReconnectChannel test gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "100"); - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); int ret = RUN_ALL_TESTS(); return ret; diff --git a/test/cpp/end2end/channelz_service_test.cc b/test/cpp/end2end/channelz_service_test.cc index f04ffe4f2d..425334d972 100644 --- a/test/cpp/end2end/channelz_service_test.cc +++ b/test/cpp/end2end/channelz_service_test.cc @@ -54,6 +54,14 @@ using grpc::channelz::v1::GetSubchannelResponse; using grpc::channelz::v1::GetTopChannelsRequest; using grpc::channelz::v1::GetTopChannelsResponse; +// This code snippet can be used to print out any responses for +// visual debugging. +// +// +// string out_str; +// google::protobuf::TextFormat::PrintToString(resp, &out_str); +// std::cout << "resp: " << out_str << "\n"; + namespace grpc { namespace testing { namespace { @@ -164,6 +172,19 @@ class ChannelzServerTest : public ::testing::Test { echo_stub_ = grpc::testing::EchoTestService::NewStub(channel); } + std::unique_ptr<grpc::testing::EchoTestService::Stub> NewEchoStub() { + static int salt = 0; + string target = "dns:localhost:" + to_string(proxy_port_); + ChannelArguments args; + // disable channelz. We only want to focus on proxy to backend outbound. + args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 0); + // This ensures that gRPC will not do connection sharing. + args.SetInt("salt", salt++); + std::shared_ptr<Channel> channel = + CreateCustomChannel(target, InsecureChannelCredentials(), args); + return grpc::testing::EchoTestService::NewStub(channel); + } + void SendSuccessfulEcho(int channel_idx) { EchoRequest request; EchoResponse response; @@ -651,6 +672,67 @@ TEST_F(ChannelzServerTest, GetServerSocketsTest) { EXPECT_EQ(get_server_sockets_response.socket_ref_size(), 1); } +TEST_F(ChannelzServerTest, GetServerSocketsPaginationTest) { + ResetStubs(); + ConfigureProxy(1); + std::vector<std::unique_ptr<grpc::testing::EchoTestService::Stub>> stubs; + const int kNumServerSocketsCreated = 20; + for (int i = 0; i < kNumServerSocketsCreated; ++i) { + stubs.push_back(NewEchoStub()); + EchoRequest request; + EchoResponse response; + request.set_message("Hello channelz"); + request.mutable_param()->set_backend_channel_idx(0); + ClientContext context; + Status s = stubs.back()->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); + } + GetServersRequest get_server_request; + GetServersResponse get_server_response; + get_server_request.set_start_server_id(0); + ClientContext get_server_context; + Status s = channelz_stub_->GetServers(&get_server_context, get_server_request, + &get_server_response); + EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); + EXPECT_EQ(get_server_response.server_size(), 1); + // Make a request that gets all of the serversockets + { + GetServerSocketsRequest get_server_sockets_request; + GetServerSocketsResponse get_server_sockets_response; + get_server_sockets_request.set_server_id( + get_server_response.server(0).ref().server_id()); + get_server_sockets_request.set_start_socket_id(0); + ClientContext get_server_sockets_context; + s = channelz_stub_->GetServerSockets(&get_server_sockets_context, + get_server_sockets_request, + &get_server_sockets_response); + EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); + // We add one to account the the channelz stub that will end up creating + // a serversocket. + EXPECT_EQ(get_server_sockets_response.socket_ref_size(), + kNumServerSocketsCreated + 1); + EXPECT_TRUE(get_server_sockets_response.end()); + } + // Now we make a request that exercises pagination. + { + GetServerSocketsRequest get_server_sockets_request; + GetServerSocketsResponse get_server_sockets_response; + get_server_sockets_request.set_server_id( + get_server_response.server(0).ref().server_id()); + get_server_sockets_request.set_start_socket_id(0); + const int kMaxResults = 10; + get_server_sockets_request.set_max_results(kMaxResults); + ClientContext get_server_sockets_context; + s = channelz_stub_->GetServerSockets(&get_server_sockets_context, + get_server_sockets_request, + &get_server_sockets_response); + EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); + EXPECT_EQ(get_server_sockets_response.socket_ref_size(), kMaxResults); + EXPECT_FALSE(get_server_sockets_response.end()); + } +} + TEST_F(ChannelzServerTest, GetServerListenSocketsTest) { ResetStubs(); ConfigureProxy(1); @@ -677,7 +759,7 @@ TEST_F(ChannelzServerTest, GetServerListenSocketsTest) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 7ffc610ce2..a999321992 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -34,6 +34,7 @@ #include "test/core/util/test_config.h" #include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/util/byte_buffer_proto_helper.h" +#include "test/cpp/util/string_ref_helper.h" #include <gtest/gtest.h> @@ -100,11 +101,13 @@ class ClientCallbackEnd2endTest test_string += "Hello world. "; request.set_message(test_string); - + grpc::string val; if (with_binary_metadata) { + request.mutable_param()->set_echo_metadata(true); char bytes[8] = {'\0', '\1', '\2', '\3', '\4', '\5', '\6', static_cast<char>(i)}; - cli_ctx.AddMetadata("custom-bin", grpc::string(bytes, 8)); + val = grpc::string(bytes, 8); + cli_ctx.AddMetadata("custom-bin", val); } cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); @@ -114,10 +117,18 @@ class ClientCallbackEnd2endTest bool done = false; stub_->experimental_async()->Echo( &cli_ctx, &request, &response, - [&request, &response, &done, &mu, &cv](Status s) { + [&cli_ctx, &request, &response, &done, &mu, &cv, val, + with_binary_metadata](Status s) { GPR_ASSERT(s.ok()); EXPECT_EQ(request.message(), response.message()); + if (with_binary_metadata) { + EXPECT_EQ( + 1u, cli_ctx.GetServerTrailingMetadata().count("custom-bin")); + EXPECT_EQ(val, ToString(cli_ctx.GetServerTrailingMetadata() + .find("custom-bin") + ->second)); + } std::lock_guard<std::mutex> l(mu); done = true; cv.notify_one(); @@ -171,6 +182,67 @@ class ClientCallbackEnd2endTest } } + void SendGenericEchoAsBidi(int num_rpcs, int reuses) { + const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo"); + grpc::string test_string(""); + for (int i = 0; i < num_rpcs; i++) { + test_string += "Hello world. "; + class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer, + ByteBuffer> { + public: + Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name, + const grpc::string& test_str, int reuses) + : reuses_remaining_(reuses) { + activate_ = [this, test, method_name, test_str] { + if (reuses_remaining_ > 0) { + cli_ctx_.reset(new ClientContext); + reuses_remaining_--; + test->generic_stub_->experimental().PrepareBidiStreamingCall( + cli_ctx_.get(), method_name, this); + request_.set_message(test_str); + send_buf_ = SerializeToByteBuffer(&request_); + StartWrite(send_buf_.get()); + StartRead(&recv_buf_); + StartCall(); + } else { + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + }; + activate_(); + } + void OnWriteDone(bool ok) override { StartWritesDone(); } + void OnReadDone(bool ok) override { + EchoResponse response; + EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response)); + EXPECT_EQ(request_.message(), response.message()); + }; + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + activate_(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + EchoRequest request_; + std::unique_ptr<ByteBuffer> send_buf_; + ByteBuffer recv_buf_; + std::unique_ptr<ClientContext> cli_ctx_; + int reuses_remaining_; + std::function<void()> activate_; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } rpc{this, kMethodName, test_string, reuses}; + + rpc.Await(); + } + } bool is_server_started_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; @@ -190,6 +262,37 @@ TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { SendRpcs(10, false); } +TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) { + ResetStub(); + SimpleRequest request; + SimpleResponse response; + ClientContext cli_ctx; + + cli_ctx.AddMetadata(kCheckClientInitialMetadataKey, + kCheckClientInitialMetadataVal); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->CheckClientInitialMetadata( + &cli_ctx, &request, &response, [&done, &mu, &cv](Status s) { + GPR_ASSERT(s.ok()); + + std::lock_guard<std::mutex> l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock<std::mutex> l(mu); + while (!done) { + cv.wait(l); + } +} + +TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) { + ResetStub(); + SendRpcs(1, true); +} + TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { ResetStub(); SendRpcs(10, true); @@ -200,6 +303,16 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) { SendRpcsGeneric(10, false); } +TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) { + ResetStub(); + SendGenericEchoAsBidi(10, 1); +} + +TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) { + ResetStub(); + SendGenericEchoAsBidi(10, 10); +} + #if GRPC_ALLOW_EXCEPTIONS TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) { ResetStub(); @@ -256,6 +369,156 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { } } +TEST_P(ClientCallbackEnd2endTest, RequestStream) { + ResetStub(); + class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> { + public: + explicit Client(grpc::testing::EchoTestService::Stub* stub) { + context_.set_initial_metadata_corked(true); + stub->experimental_async()->RequestStream(&context_, &response_, this); + StartCall(); + request_.set_message("Hello server."); + StartWrite(&request_); + } + void OnWriteDone(bool ok) override { + writes_left_--; + if (writes_left_ > 1) { + StartWrite(&request_); + } else if (writes_left_ == 1) { + StartWriteLast(&request_, WriteOptions()); + } + } + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server."); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + int writes_left_{3}; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } test{stub_.get()}; + + test.Await(); +} + +TEST_P(ClientCallbackEnd2endTest, ResponseStream) { + ResetStub(); + class Client : public grpc::experimental::ClientReadReactor<EchoResponse> { + public: + explicit Client(grpc::testing::EchoTestService::Stub* stub) { + request_.set_message("Hello client "); + stub->experimental_async()->ResponseStream(&context_, &request_, this); + StartCall(); + StartRead(&response_); + } + void OnReadDone(bool ok) override { + if (!ok) { + EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend); + } else { + EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); + EXPECT_EQ(response_.message(), + request_.message() + grpc::to_string(reads_complete_)); + reads_complete_++; + StartRead(&response_); + } + } + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + int reads_complete_{0}; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } test{stub_.get()}; + + test.Await(); +} + +TEST_P(ClientCallbackEnd2endTest, BidiStream) { + ResetStub(); + class Client : public grpc::experimental::ClientBidiReactor<EchoRequest, + EchoResponse> { + public: + explicit Client(grpc::testing::EchoTestService::Stub* stub) { + request_.set_message("Hello fren "); + stub->experimental_async()->BidiStream(&context_, this); + StartCall(); + StartRead(&response_); + StartWrite(&request_); + } + void OnReadDone(bool ok) override { + if (!ok) { + EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend); + } else { + EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); + EXPECT_EQ(response_.message(), request_.message()); + reads_complete_++; + StartRead(&response_); + } + } + void OnWriteDone(bool ok) override { + EXPECT_TRUE(ok); + if (++writes_complete_ == kServerDefaultResponseStreamsToSend) { + StartWritesDone(); + } else { + StartWrite(&request_); + } + } + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + int reads_complete_{0}; + int writes_complete_{0}; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } test{stub_.get()}; + + test.Await(); +} + TestScenario scenarios[] = {TestScenario{false}, TestScenario{true}}; INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, @@ -266,7 +529,7 @@ INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/client_crash_test.cc b/test/cpp/end2end/client_crash_test.cc index 2a06f44c60..992f3c488f 100644 --- a/test/cpp/end2end/client_crash_test.cc +++ b/test/cpp/end2end/client_crash_test.cc @@ -135,7 +135,7 @@ int main(int argc, char** argv) { g_root = "."; } - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); // Order seems to matter on these tests: run three times to eliminate that for (int i = 0; i < 3; i++) { diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc index 4d4760a652..d96191737e 100644 --- a/test/cpp/end2end/client_interceptors_end2end_test.cc +++ b/test/cpp/end2end/client_interceptors_end2end_test.cc @@ -23,11 +23,11 @@ #include <grpcpp/client_context.h> #include <grpcpp/create_channel.h> #include <grpcpp/generic/generic_stub.h> -#include <grpcpp/impl/codegen/client_interceptor.h> #include <grpcpp/impl/codegen/proto_utils.h> #include <grpcpp/server.h> #include <grpcpp/server_builder.h> #include <grpcpp/server_context.h> +#include <grpcpp/support/client_interceptor.h> #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -50,6 +50,7 @@ class HijackingInterceptor : public experimental::Interceptor { info_ = info; // Make sure it is the right method EXPECT_EQ(strcmp("/grpc.testing.EchoTestService/Echo", info->method()), 0); + EXPECT_EQ(info->type(), experimental::ClientRpcInfo::Type::UNARY); } virtual void Intercept(experimental::InterceptorBatchMethods* methods) { @@ -329,7 +330,7 @@ class ServerStreamingRpcHijackingInterceptor experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) { // Only the last message will be a failure EXPECT_FALSE(got_failed_message_); - got_failed_message_ = !methods->GetRecvMessageStatus(); + got_failed_message_ = methods->GetRecvMessage() == nullptr; } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_RECV_STATUS)) { @@ -458,15 +459,13 @@ class ClientInterceptorsEnd2endTest : public ::testing::Test { TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLoggingTest) { ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); - creators->push_back(std::unique_ptr<LoggingInterceptorFactory>( + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; + creators.push_back(std::unique_ptr<LoggingInterceptorFactory>( new LoggingInterceptorFactory())); // Add 20 dummy interceptors for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = experimental::CreateCustomChannelWithInterceptors( @@ -479,20 +478,19 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLoggingTest) { TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorHijackingTest) { ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; // Add 20 dummy interceptors before hijacking interceptor + creators.reserve(20); for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } - creators->push_back(std::unique_ptr<HijackingInterceptorFactory>( + creators.push_back(std::unique_ptr<HijackingInterceptorFactory>( new HijackingInterceptorFactory())); // Add 20 dummy interceptors after hijacking interceptor for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = experimental::CreateCustomChannelWithInterceptors( @@ -505,13 +503,11 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorHijackingTest) { TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLogThenHijackTest) { ChannelArguments args; - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); - creators->push_back(std::unique_ptr<LoggingInterceptorFactory>( + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; + creators.push_back(std::unique_ptr<LoggingInterceptorFactory>( new LoggingInterceptorFactory())); - creators->push_back(std::unique_ptr<HijackingInterceptorFactory>( + creators.push_back(std::unique_ptr<HijackingInterceptorFactory>( new HijackingInterceptorFactory())); auto channel = experimental::CreateCustomChannelWithInterceptors( server_address_, InsecureChannelCredentials(), args, std::move(creators)); @@ -523,21 +519,20 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorHijackingMakesAnotherCallTest) { ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; // Add 5 dummy interceptors before hijacking interceptor + creators.reserve(5); for (auto i = 0; i < 5; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } - creators->push_back( + creators.push_back( std::unique_ptr<experimental::ClientInterceptorFactoryInterface>( new HijackingInterceptorMakesAnotherCallFactory())); // Add 7 dummy interceptors after hijacking interceptor for (auto i = 0; i < 7; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = server_->experimental().InProcessChannelWithInterceptors( @@ -553,15 +548,13 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLoggingTestWithCallback) { ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); - creators->push_back(std::unique_ptr<LoggingInterceptorFactory>( + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; + creators.push_back(std::unique_ptr<LoggingInterceptorFactory>( new LoggingInterceptorFactory())); // Add 20 dummy interceptors for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = server_->experimental().InProcessChannelWithInterceptors( @@ -571,6 +564,28 @@ TEST_F(ClientInterceptorsEnd2endTest, EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); } +TEST_F(ClientInterceptorsEnd2endTest, + ClientInterceptorFactoryAllowsNullptrReturn) { + ChannelArguments args; + DummyInterceptor::Reset(); + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; + creators.push_back(std::unique_ptr<LoggingInterceptorFactory>( + new LoggingInterceptorFactory())); + // Add 20 dummy interceptors and 20 null interceptors + for (auto i = 0; i < 20; i++) { + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( + new DummyInterceptorFactory())); + creators.push_back( + std::unique_ptr<NullInterceptorFactory>(new NullInterceptorFactory())); + } + auto channel = server_->experimental().InProcessChannelWithInterceptors( + args, std::move(creators)); + MakeCallbackCall(channel); + // Make sure all 20 dummy interceptors were run + EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); +} + class ClientInterceptorsStreamingEnd2endTest : public ::testing::Test { protected: ClientInterceptorsStreamingEnd2endTest() { @@ -593,15 +608,13 @@ class ClientInterceptorsStreamingEnd2endTest : public ::testing::Test { TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientStreamingTest) { ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); - creators->push_back(std::unique_ptr<LoggingInterceptorFactory>( + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; + creators.push_back(std::unique_ptr<LoggingInterceptorFactory>( new LoggingInterceptorFactory())); // Add 20 dummy interceptors for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = experimental::CreateCustomChannelWithInterceptors( @@ -614,15 +627,13 @@ TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientStreamingTest) { TEST_F(ClientInterceptorsStreamingEnd2endTest, ServerStreamingTest) { ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); - creators->push_back(std::unique_ptr<LoggingInterceptorFactory>( + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; + creators.push_back(std::unique_ptr<LoggingInterceptorFactory>( new LoggingInterceptorFactory())); // Add 20 dummy interceptors for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = experimental::CreateCustomChannelWithInterceptors( @@ -635,11 +646,9 @@ TEST_F(ClientInterceptorsStreamingEnd2endTest, ServerStreamingTest) { TEST_F(ClientInterceptorsStreamingEnd2endTest, ServerStreamingHijackingTest) { ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); - creators->push_back( + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; + creators.push_back( std::unique_ptr<ServerStreamingRpcHijackingInterceptorFactory>( new ServerStreamingRpcHijackingInterceptorFactory())); auto channel = experimental::CreateCustomChannelWithInterceptors( @@ -651,15 +660,13 @@ TEST_F(ClientInterceptorsStreamingEnd2endTest, ServerStreamingHijackingTest) { TEST_F(ClientInterceptorsStreamingEnd2endTest, BidiStreamingTest) { ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); - creators->push_back(std::unique_ptr<LoggingInterceptorFactory>( + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; + creators.push_back(std::unique_ptr<LoggingInterceptorFactory>( new LoggingInterceptorFactory())); // Add 20 dummy interceptors for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = experimental::CreateCustomChannelWithInterceptors( @@ -696,13 +703,12 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, DummyGlobalInterceptor) { experimental::RegisterGlobalClientInterceptorFactory(&global_factory); ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; // Add 20 dummy interceptors + creators.reserve(20); for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = experimental::CreateCustomChannelWithInterceptors( @@ -723,13 +729,12 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, LoggingGlobalInterceptor) { experimental::RegisterGlobalClientInterceptorFactory(&global_factory); ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; // Add 20 dummy interceptors + creators.reserve(20); for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = experimental::CreateCustomChannelWithInterceptors( @@ -750,13 +755,12 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, HijackingGlobalInterceptor) { experimental::RegisterGlobalClientInterceptorFactory(&global_factory); ChannelArguments args; DummyInterceptor::Reset(); - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; // Add 20 dummy interceptors + creators.reserve(20); for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } auto channel = experimental::CreateCustomChannelWithInterceptors( @@ -774,7 +778,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, HijackingGlobalInterceptor) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 9218c85717..929c2bb589 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -35,7 +35,9 @@ #include <grpcpp/server.h> #include <grpcpp/server_builder.h> +#include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/gpr/env.h" @@ -116,7 +118,10 @@ class MyTestServiceImpl : public TestServiceImpl { class ClientLbEnd2endTest : public ::testing::Test { protected: ClientLbEnd2endTest() - : server_host_("localhost"), kRequestMessage_("Live long and prosper.") { + : server_host_("localhost"), + kRequestMessage_("Live long and prosper."), + creds_(new SecureChannelCredentials( + grpc_fake_transport_security_credentials_create())) { // Make the backup poller poll very frequently in order to pick up // updates from all the subchannels's FDs. gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1"); @@ -149,31 +154,29 @@ class ClientLbEnd2endTest : public ::testing::Test { void StartServers(size_t num_servers, std::vector<int> ports = std::vector<int>()) { - CreateServers(num_servers, ports); + CreateServers(num_servers, std::move(ports)); for (size_t i = 0; i < num_servers; ++i) { StartServer(i); } } grpc_channel_args* BuildFakeResults(const std::vector<int>& ports) { - grpc_lb_addresses* addresses = - grpc_lb_addresses_create(ports.size(), nullptr); - for (size_t i = 0; i < ports.size(); ++i) { + grpc_core::ServerAddressList addresses; + for (const int& port : ports) { char* lb_uri_str; - gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]); + gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port); grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true); GPR_ASSERT(lb_uri != nullptr); - grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri, - false /* is balancer */, - "" /* balancer name */, nullptr); + grpc_resolved_address address; + GPR_ASSERT(grpc_parse_uri(lb_uri, &address)); + addresses.emplace_back(address.addr, address.len, nullptr /* args */); grpc_uri_destroy(lb_uri); gpr_free(lb_uri_str); } const grpc_arg fake_addresses = - grpc_lb_addresses_create_channel_arg(addresses); + CreateServerAddressListChannelArg(&addresses); grpc_channel_args* fake_results = grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1); - grpc_lb_addresses_destroy(addresses); return fake_results; } @@ -191,6 +194,11 @@ class ClientLbEnd2endTest : public ::testing::Test { grpc_channel_args_destroy(fake_results); } + void SetFailureOnReresolution() { + grpc_core::ExecCtx exec_ctx; + response_generator_->SetFailureOnReresolution(); + } + std::vector<int> GetServersPorts() { std::vector<int> ports; for (const auto& server : servers_) ports.push_back(server->port_); @@ -210,9 +218,7 @@ class ClientLbEnd2endTest : public ::testing::Test { } // else, default to pick first args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_.get()); - std::shared_ptr<ChannelCredentials> creds(new SecureChannelCredentials( - grpc_fake_transport_security_credentials_create())); - return CreateCustomChannel("fake:///", std::move(creds), args); + return CreateCustomChannel("fake:///", creds_, args); } bool SendRpc( @@ -260,6 +266,7 @@ class ClientLbEnd2endTest : public ::testing::Test { MyTestServiceImpl service_; std::unique_ptr<std::thread> thread_; bool server_ready_ = false; + bool started_ = false; explicit ServerData(int port = 0) { port_ = port > 0 ? port : grpc_pick_unused_port_or_die(); @@ -267,6 +274,7 @@ class ClientLbEnd2endTest : public ::testing::Test { void Start(const grpc::string& server_host) { gpr_log(GPR_INFO, "starting server on port %d", port_); + started_ = true; std::mutex mu; std::unique_lock<std::mutex> lock(mu); std::condition_variable cond; @@ -292,9 +300,11 @@ class ClientLbEnd2endTest : public ::testing::Test { cond->notify_one(); } - void Shutdown(bool join = true) { + void Shutdown() { + if (!started_) return; server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); - if (join) thread_->join(); + thread_->join(); + started_ = false; } void SetServingStatus(const grpc::string& service, bool serving) { @@ -373,6 +383,7 @@ class ClientLbEnd2endTest : public ::testing::Test { grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator> response_generator_; const grpc::string kRequestMessage_; + std::shared_ptr<ChannelCredentials> creds_; }; TEST_F(ClientLbEnd2endTest, PickFirst) { @@ -417,6 +428,30 @@ TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) { CheckRpcSendOk(second_stub, DEBUG_LOCATION); } +TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) { + ChannelArguments args; + constexpr int kInitialBackOffMs = 5000; + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); + // Create 2 servers, but start only the second one. + std::vector<int> ports = {grpc_pick_unused_port_or_die(), + grpc_pick_unused_port_or_die()}; + CreateServers(2, ports); + StartServer(1); + auto channel1 = BuildChannel("pick_first", args); + auto stub1 = BuildStub(channel1); + SetNextResolution(ports); + // Wait for second server to be ready. + WaitForServer(stub1, 1, DEBUG_LOCATION); + // Create a second channel with the same addresses. Its PF instance + // should immediately pick the second subchannel, since it's already + // in READY state. + auto channel2 = BuildChannel("pick_first", args); + SetNextResolution(ports); + // Check that the channel reports READY without waiting for the + // initial backoff. + EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */)); +} + TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) { ChannelArguments args; constexpr int kInitialBackOffMs = 100; @@ -502,6 +537,51 @@ TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) { EXPECT_LT(waited_ms, kInitialBackOffMs); } +TEST_F(ClientLbEnd2endTest, + PickFirstResetConnectionBackoffNextAttemptStartsImmediately) { + ChannelArguments args; + constexpr int kInitialBackOffMs = 1000; + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); + const std::vector<int> ports = {grpc_pick_unused_port_or_die()}; + auto channel = BuildChannel("pick_first", args); + auto stub = BuildStub(channel); + SetNextResolution(ports); + // Wait for connect, which should fail ~immediately, because the server + // is not up. + gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT"); + EXPECT_FALSE( + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + // Reset connection backoff. + // Note that the time at which the third attempt will be started is + // actually computed at this point, so we record the start time here. + gpr_log(GPR_INFO, "=== RESETTING BACKOFF"); + const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); + experimental::ChannelResetConnectionBackoff(channel.get()); + // Trigger a second connection attempt. This should also fail + // ~immediately, but the retry should be scheduled for + // kInitialBackOffMs instead of applying the multiplier. + gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT"); + EXPECT_FALSE( + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + // Bring up a server on the chosen port. + gpr_log(GPR_INFO, "=== STARTING BACKEND"); + StartServers(1, ports); + // Wait for connect. Should happen within kInitialBackOffMs. + // Give an extra 100ms to account for the time spent in the second and + // third connection attempts themselves (since what we really want to + // measure is the time between the two). As long as this is less than + // the 1.6x increase we would see if the backoff state was not reset + // properly, the test is still proving that the backoff was reset. + constexpr int kWaitMs = kInitialBackOffMs + 100; + gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT"); + EXPECT_TRUE(channel->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kWaitMs))); + const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC); + const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0)); + gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms); + EXPECT_LT(waited_ms, kWaitMs); +} + TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; @@ -728,6 +808,23 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) { EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName()); } +TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) { + // Start server, send RPC, and make sure channel is READY. + const int kNumServers = 1; + StartServers(kNumServers); + auto channel = BuildChannel(""); // pick_first is the default. + auto stub = BuildStub(channel); + SetNextResolution(GetServersPorts()); + CheckRpcSendOk(stub, DEBUG_LOCATION); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + // Stop server. Channel should go into state IDLE. + SetFailureOnReresolution(); + servers_[0]->Shutdown(); + EXPECT_TRUE(WaitForChannelNotReady(channel.get())); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); + servers_.clear(); +} + TEST_F(ClientLbEnd2endTest, RoundRobin) { // Start servers and send one RPC per server. const int kNumServers = 3; @@ -877,7 +974,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { servers_[0]->service_.ResetCounters(); // Shutdown one of the servers to be sent in the update. - servers_[1]->Shutdown(false); + servers_[1]->Shutdown(); ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[2]->port_); SetNextResolution(ports); @@ -936,7 +1033,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { // Kill all servers gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******"); for (size_t i = 0; i < servers_.size(); ++i) { - servers_[i]->Shutdown(true); + servers_[i]->Shutdown(); } gpr_log(GPR_INFO, "****** SERVERS KILLED *******"); gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******"); @@ -984,7 +1081,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { } const auto pre_death = servers_[0]->service_.request_count(); // Kill the first server. - servers_[0]->Shutdown(true); + servers_[0]->Shutdown(); // Client request still succeed. May need retrying if RR had returned a pick // before noticing the change in the server's connectivity. while (!SendRpc(stub)) { @@ -1131,7 +1228,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); const auto result = RUN_ALL_TESTS(); return result; } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 4558437102..05cd4330c6 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -196,16 +196,18 @@ class TestServiceImplDupPkg class TestScenario { public: TestScenario(bool interceptors, bool proxy, bool inproc_stub, - const grpc::string& creds_type) + const grpc::string& creds_type, bool use_callback_server) : use_interceptors(interceptors), use_proxy(proxy), inproc(inproc_stub), - credentials_type(creds_type) {} + credentials_type(creds_type), + callback_server(use_callback_server) {} void Log() const; bool use_interceptors; bool use_proxy; bool inproc; const grpc::string credentials_type; + bool callback_server; }; static std::ostream& operator<<(std::ostream& out, @@ -214,6 +216,8 @@ static std::ostream& operator<<(std::ostream& out, << (scenario.use_interceptors ? "true" : "false") << ", use_proxy=" << (scenario.use_proxy ? "true" : "false") << ", inproc=" << (scenario.inproc ? "true" : "false") + << ", server_type=" + << (scenario.callback_server ? "callback" : "sync") << ", credentials='" << scenario.credentials_type << "'}"; } @@ -272,6 +276,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> creators; // Add 20 dummy server interceptors + creators.reserve(20); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); @@ -279,7 +284,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { builder.experimental().SetInterceptorCreators(std::move(creators)); } builder.AddListeningPort(server_address_.str(), server_creds); - builder.RegisterService(&service_); + if (!GetParam().callback_server) { + builder.RegisterService(&service_); + } else { + builder.RegisterService(&callback_service_); + } builder.RegisterService("foo.test.youtube.com", &special_service_); builder.RegisterService(&dup_pkg_service_); @@ -361,6 +370,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { std::ostringstream server_address_; const int kMaxMessageSize_; TestServiceImpl service_; + CallbackTestServiceImpl callback_service_; TestServiceImpl special_service_; TestServiceImplDupPkg dup_pkg_service_; grpc::string user_agent_prefix_; @@ -1015,7 +1025,8 @@ TEST_P(End2endTest, DiffPackageServices) { EXPECT_TRUE(s.ok()); } -void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) { +template <class ServiceType> +void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(delay_us, GPR_TIMESPAN))); while (!service->signal_client()) { @@ -1445,7 +1456,24 @@ TEST_P(ProxyEnd2endTest, ClientCancelsRpc) { request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs); ClientContext context; - std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_); + std::thread cancel_thread; + if (!GetParam().callback_server) { + cancel_thread = std::thread( + [&context, this](int delay) { CancelRpc(&context, delay, &service_); }, + kCancelDelayUs); + // Note: the unusual pattern above (and below) is caused by a conflict + // between two sets of compiler expectations. clang allows const to be + // captured without mention, so there is no need to capture kCancelDelayUs + // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler + // in our tests requires an explicit capture even for const. We square this + // circle by passing the const value in as an argument to the lambda. + } else { + cancel_thread = std::thread( + [&context, this](int delay) { + CancelRpc(&context, delay, &callback_service_); + }, + kCancelDelayUs); + } Status s = stub_->Echo(&context, request, &response); cancel_thread.join(); EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); @@ -1837,10 +1865,12 @@ TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) { EXPECT_TRUE(s.ok()); } +// TODO(vjpai): refactor arguments into a struct if it makes sense std::vector<TestScenario> CreateTestScenarios(bool use_proxy, bool test_insecure, bool test_secure, - bool test_inproc) { + bool test_inproc, + bool test_callback_server) { std::vector<TestScenario> scenarios; std::vector<grpc::string> credentials_types; if (test_secure) { @@ -1856,41 +1886,48 @@ std::vector<TestScenario> CreateTestScenarios(bool use_proxy, if (test_insecure && insec_ok()) { credentials_types.push_back(kInsecureCredentialsType); } + + // For now test callback server only with inproc GPR_ASSERT(!credentials_types.empty()); for (const auto& cred : credentials_types) { - scenarios.emplace_back(false, false, false, cred); - scenarios.emplace_back(true, false, false, cred); + scenarios.emplace_back(false, false, false, cred, false); + scenarios.emplace_back(true, false, false, cred, false); if (use_proxy) { - scenarios.emplace_back(false, true, false, cred); - scenarios.emplace_back(true, true, false, cred); + scenarios.emplace_back(false, true, false, cred, false); + scenarios.emplace_back(true, true, false, cred, false); } } if (test_inproc && insec_ok()) { - scenarios.emplace_back(false, false, true, kInsecureCredentialsType); - scenarios.emplace_back(true, false, true, kInsecureCredentialsType); + scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false); + scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false); + if (test_callback_server) { + scenarios.emplace_back(false, false, true, kInsecureCredentialsType, + true); + scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true); + } } return scenarios; } -INSTANTIATE_TEST_CASE_P(End2end, End2endTest, - ::testing::ValuesIn(CreateTestScenarios(false, true, - true, true))); +INSTANTIATE_TEST_CASE_P( + End2end, End2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); -INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest, - ::testing::ValuesIn(CreateTestScenarios(false, true, - true, true))); +INSTANTIATE_TEST_CASE_P( + End2endServerTryCancel, End2endServerTryCancelTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); -INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest, - ::testing::ValuesIn(CreateTestScenarios(true, true, - true, true))); +INSTANTIATE_TEST_CASE_P( + ProxyEnd2end, ProxyEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, false))); -INSTANTIATE_TEST_CASE_P(SecureEnd2end, SecureEnd2endTest, - ::testing::ValuesIn(CreateTestScenarios(false, false, - true, false))); +INSTANTIATE_TEST_CASE_P( + SecureEnd2end, SecureEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true))); -INSTANTIATE_TEST_CASE_P(ResourceQuotaEnd2end, ResourceQuotaEnd2endTest, - ::testing::ValuesIn(CreateTestScenarios(false, true, - true, true))); +INSTANTIATE_TEST_CASE_P( + ResourceQuotaEnd2end, ResourceQuotaEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, false))); } // namespace } // namespace testing @@ -1898,7 +1935,7 @@ INSTANTIATE_TEST_CASE_P(ResourceQuotaEnd2end, ResourceQuotaEnd2endTest, int main(int argc, char** argv) { gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200"); - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/exception_test.cc b/test/cpp/end2end/exception_test.cc index 5343997663..0d2c00263b 100644 --- a/test/cpp/end2end/exception_test.cc +++ b/test/cpp/end2end/exception_test.cc @@ -117,7 +117,7 @@ TEST_F(ExceptionTest, RequestStream) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index 88f8f380c3..ad67402e3d 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -331,7 +331,7 @@ void RegisterFilter() { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); grpc::testing::RegisterFilter(); return RUN_ALL_TESTS(); diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 88a1227ca2..015862bfe8 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -333,7 +333,7 @@ TEST_F(GenericEnd2endTest, Deadline) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 6ce0696114..f739ed032b 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -18,6 +18,7 @@ #include <memory> #include <mutex> +#include <set> #include <sstream> #include <thread> @@ -32,7 +33,9 @@ #include <grpcpp/server.h> #include <grpcpp/server_builder.h> +#include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -143,6 +146,7 @@ class BackendServiceImpl : public BackendService { IncreaseRequestCount(); const auto status = TestServiceImpl::Echo(context, request, response); IncreaseResponseCount(); + AddClient(context->peer()); return status; } @@ -155,9 +159,21 @@ class BackendServiceImpl : public BackendService { return prev; } + std::set<grpc::string> clients() { + std::unique_lock<std::mutex> lock(clients_mu_); + return clients_; + } + private: + void AddClient(const grpc::string& client) { + std::unique_lock<std::mutex> lock(clients_mu_); + clients_.insert(client); + } + std::mutex mu_; bool shutdown_ = false; + std::mutex clients_mu_; + std::set<grpc::string> clients_; }; grpc::string Ip4ToPackedString(const char* ip_str) { @@ -301,6 +317,11 @@ class BalancerServiceImpl : public BalancerService { auto* server = response.mutable_server_list()->add_servers(); server->set_ip_address(Ip4ToPackedString("127.0.0.1")); server->set_port(backend_port); + static int token_count = 0; + char* token; + gpr_asprintf(&token, "token%03d", ++token_count); + server->set_load_balance_token(token); + gpr_free(token); } return response; } @@ -412,8 +433,8 @@ class GrpclbEnd2endTest : public ::testing::Test { std::shared_ptr<ChannelCredentials> creds( new SecureChannelCredentials(grpc_composite_channel_credentials_create( channel_creds, call_creds, nullptr))); - grpc_call_credentials_unref(call_creds); - grpc_channel_credentials_unref(channel_creds); + call_creds->Unref(); + channel_creds->Unref(); channel_ = CreateCustomChannel(uri.str(), creds, args); stub_ = grpc::testing::EchoTestService::NewStub(channel_); } @@ -486,18 +507,27 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc::string balancer_name; }; - grpc_lb_addresses* CreateLbAddressesFromAddressDataList( + grpc_core::ServerAddressList CreateLbAddressesFromAddressDataList( const std::vector<AddressData>& address_data) { - grpc_lb_addresses* addresses = - grpc_lb_addresses_create(address_data.size(), nullptr); - for (size_t i = 0; i < address_data.size(); ++i) { + grpc_core::ServerAddressList addresses; + for (const auto& addr : address_data) { char* lb_uri_str; - gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", address_data[i].port); + gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", addr.port); grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true); GPR_ASSERT(lb_uri != nullptr); - grpc_lb_addresses_set_address_from_uri( - addresses, i, lb_uri, address_data[i].is_balancer, - address_data[i].balancer_name.c_str(), nullptr); + grpc_resolved_address address; + GPR_ASSERT(grpc_parse_uri(lb_uri, &address)); + std::vector<grpc_arg> args_to_add; + if (addr.is_balancer) { + args_to_add.emplace_back(grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_ADDRESS_IS_BALANCER), 1)); + args_to_add.emplace_back(grpc_channel_arg_string_create( + const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME), + const_cast<char*>(addr.balancer_name.c_str()))); + } + grpc_channel_args* args = grpc_channel_args_copy_and_add( + nullptr, args_to_add.data(), args_to_add.size()); + addresses.emplace_back(address.addr, address.len, args); grpc_uri_destroy(lb_uri); gpr_free(lb_uri_str); } @@ -506,23 +536,21 @@ class GrpclbEnd2endTest : public ::testing::Test { void SetNextResolution(const std::vector<AddressData>& address_data) { grpc_core::ExecCtx exec_ctx; - grpc_lb_addresses* addresses = + grpc_core::ServerAddressList addresses = CreateLbAddressesFromAddressDataList(address_data); - grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses); + grpc_arg fake_addresses = CreateServerAddressListChannelArg(&addresses); grpc_channel_args fake_result = {1, &fake_addresses}; response_generator_->SetResponse(&fake_result); - grpc_lb_addresses_destroy(addresses); } void SetNextReresolutionResponse( const std::vector<AddressData>& address_data) { grpc_core::ExecCtx exec_ctx; - grpc_lb_addresses* addresses = + grpc_core::ServerAddressList addresses = CreateLbAddressesFromAddressDataList(address_data); - grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses); + grpc_arg fake_addresses = CreateServerAddressListChannelArg(&addresses); grpc_channel_args fake_result = {1, &fake_addresses}; response_generator_->SetReresolutionResponse(&fake_result); - grpc_lb_addresses_destroy(addresses); } const std::vector<int> GetBackendPorts(const size_t start_index = 0) const { @@ -553,10 +581,11 @@ class GrpclbEnd2endTest : public ::testing::Test { return status; } - void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000) { + void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000, + bool wait_for_ready = false) { for (size_t i = 0; i < times; ++i) { EchoResponse response; - const Status status = SendRpc(&response, timeout_ms); + const Status status = SendRpc(&response, timeout_ms, wait_for_ready); EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); EXPECT_EQ(response.message(), kRequestMessage_); @@ -665,6 +694,28 @@ TEST_F(SingleBalancerTest, Vanilla) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) { + SetNextResolutionAllBalancers(); + // Same backend listed twice. + std::vector<int> ports; + ports.push_back(backend_servers_[0].port_); + ports.push_back(backend_servers_[0].port_); + const size_t kNumRpcsPerAddress = 10; + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(ports, {}), 0); + // We need to wait for the backend to come online. + WaitForBackend(0); + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * ports.size()); + // Backend should have gotten 20 requests. + EXPECT_EQ(kNumRpcsPerAddress * 2, + backend_servers_[0].service_->request_count()); + // And they should have come from a single client port, because of + // subchannel sharing. + EXPECT_EQ(1UL, backends_[0]->clients().size()); + balancers_[0]->NotifyDoneWithServerlists(); +} + TEST_F(SingleBalancerTest, SecureNaming) { ResetStub(0, kApplicationTargetName_ + ";lb"); SetNextResolution({AddressData{balancer_servers_[0].port_, true, "lb"}}); @@ -717,10 +768,9 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), kServerlistDelayMs); - const auto t0 = system_clock::now(); // Client will block: LB will initially send empty serverlist. - CheckRpcSendOk(1, kCallDeadlineMs); + CheckRpcSendOk(1, kCallDeadlineMs, true /* wait_for_ready */); const auto ellapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - t0); @@ -1518,7 +1568,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { int main(int argc, char** argv) { grpc_init(); - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); const auto result = RUN_ALL_TESTS(); grpc_shutdown(); diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc index fca65dfc13..b96ff53a3e 100644 --- a/test/cpp/end2end/health_service_end2end_test.cc +++ b/test/cpp/end2end/health_service_end2end_test.cc @@ -37,6 +37,7 @@ #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_health_check_service_impl.h" #include "test/cpp/end2end/test_service_impl.h" #include <gtest/gtest.h> @@ -49,62 +50,6 @@ namespace grpc { namespace testing { namespace { -// A sample sync implementation of the health checking service. This does the -// same thing as the default one. -class HealthCheckServiceImpl : public ::grpc::health::v1::Health::Service { - public: - Status Check(ServerContext* context, const HealthCheckRequest* request, - HealthCheckResponse* response) override { - std::lock_guard<std::mutex> lock(mu_); - auto iter = status_map_.find(request->service()); - if (iter == status_map_.end()) { - return Status(StatusCode::NOT_FOUND, ""); - } - response->set_status(iter->second); - return Status::OK; - } - - Status Watch(ServerContext* context, const HealthCheckRequest* request, - ::grpc::ServerWriter<HealthCheckResponse>* writer) override { - auto last_state = HealthCheckResponse::UNKNOWN; - while (!context->IsCancelled()) { - { - std::lock_guard<std::mutex> lock(mu_); - HealthCheckResponse response; - auto iter = status_map_.find(request->service()); - if (iter == status_map_.end()) { - response.set_status(response.SERVICE_UNKNOWN); - } else { - response.set_status(iter->second); - } - if (response.status() != last_state) { - writer->Write(response, ::grpc::WriteOptions()); - } - } - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(1000, GPR_TIMESPAN))); - } - return Status::OK; - } - - void SetStatus(const grpc::string& service_name, - HealthCheckResponse::ServingStatus status) { - std::lock_guard<std::mutex> lock(mu_); - status_map_[service_name] = status; - } - - void SetAll(HealthCheckResponse::ServingStatus status) { - std::lock_guard<std::mutex> lock(mu_); - for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) { - iter->second = status; - } - } - - private: - std::mutex mu_; - std::map<const grpc::string, HealthCheckResponse::ServingStatus> status_map_; -}; - // A custom implementation of the health checking service interface. This is // used to test that it prevents the server from creating a default service and // also serves as an example of how to override the default service. @@ -125,6 +70,8 @@ class CustomHealthCheckService : public HealthCheckServiceInterface { : HealthCheckResponse::NOT_SERVING); } + void Shutdown() override { impl_->Shutdown(); } + private: HealthCheckServiceImpl* impl_; // not owned }; @@ -260,6 +207,74 @@ class HealthServiceEnd2endTest : public ::testing::Test { context.TryCancel(); } + // Verify that after HealthCheckServiceInterface::Shutdown is called + // 1. unary client will see NOT_SERVING. + // 2. unary client still sees NOT_SERVING after a SetServing(true) is called. + // 3. streaming (Watch) client will see an update. + // 4. setting a new service to serving after shutdown will add the service + // name but return NOT_SERVING to client. + // This has to be called last. + void VerifyHealthCheckServiceShutdown() { + HealthCheckServiceInterface* service = server_->GetHealthCheckService(); + EXPECT_TRUE(service != nullptr); + const grpc::string kHealthyService("healthy_service"); + const grpc::string kUnhealthyService("unhealthy_service"); + const grpc::string kNotRegisteredService("not_registered"); + const grpc::string kNewService("add_after_shutdown"); + service->SetServingStatus(kHealthyService, true); + service->SetServingStatus(kUnhealthyService, false); + + ResetStubs(); + + // Start Watch for service. + ClientContext context; + HealthCheckRequest request; + request.set_service(kHealthyService); + std::unique_ptr<::grpc::ClientReaderInterface<HealthCheckResponse>> reader = + hc_stub_->Watch(&context, request); + + HealthCheckResponse response; + EXPECT_TRUE(reader->Read(&response)); + EXPECT_EQ(response.SERVING, response.status()); + + SendHealthCheckRpc("", Status::OK, HealthCheckResponse::SERVING); + SendHealthCheckRpc(kHealthyService, Status::OK, + HealthCheckResponse::SERVING); + SendHealthCheckRpc(kUnhealthyService, Status::OK, + HealthCheckResponse::NOT_SERVING); + SendHealthCheckRpc(kNotRegisteredService, + Status(StatusCode::NOT_FOUND, "")); + SendHealthCheckRpc(kNewService, Status(StatusCode::NOT_FOUND, "")); + + // Shutdown health check service. + service->Shutdown(); + + // Watch client gets another update. + EXPECT_TRUE(reader->Read(&response)); + EXPECT_EQ(response.NOT_SERVING, response.status()); + // Finish Watch call. + context.TryCancel(); + + SendHealthCheckRpc("", Status::OK, HealthCheckResponse::NOT_SERVING); + SendHealthCheckRpc(kHealthyService, Status::OK, + HealthCheckResponse::NOT_SERVING); + SendHealthCheckRpc(kUnhealthyService, Status::OK, + HealthCheckResponse::NOT_SERVING); + SendHealthCheckRpc(kNotRegisteredService, + Status(StatusCode::NOT_FOUND, "")); + + // Setting status after Shutdown has no effect. + service->SetServingStatus(kHealthyService, true); + SendHealthCheckRpc(kHealthyService, Status::OK, + HealthCheckResponse::NOT_SERVING); + + // Adding serving status for a new service after shutdown will return + // NOT_SERVING. + service->SetServingStatus(kNewService, true); + SendHealthCheckRpc(kNewService, Status::OK, + HealthCheckResponse::NOT_SERVING); + } + TestServiceImpl echo_test_service_; HealthCheckServiceImpl health_check_service_impl_; std::unique_ptr<Health::Stub> hc_stub_; @@ -295,6 +310,13 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) { Status(StatusCode::INVALID_ARGUMENT, "")); } +TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceShutdown) { + EnableDefaultHealthCheckService(true); + EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); + SetUpServer(true, false, false, nullptr); + VerifyHealthCheckServiceShutdown(); +} + // Provide an empty service to disable the default service. TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) { EnableDefaultHealthCheckService(true); @@ -326,12 +348,27 @@ TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) { VerifyHealthCheckServiceStreaming(); } +TEST_F(HealthServiceEnd2endTest, ExplicitlyHealthServiceShutdown) { + EnableDefaultHealthCheckService(true); + EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); + std::unique_ptr<HealthCheckServiceInterface> override_service( + new CustomHealthCheckService(&health_check_service_impl_)); + HealthCheckServiceInterface* underlying_service = override_service.get(); + SetUpServer(false, false, true, std::move(override_service)); + HealthCheckServiceInterface* service = server_->GetHealthCheckService(); + EXPECT_TRUE(service == underlying_service); + + ResetStubs(); + + VerifyHealthCheckServiceShutdown(); +} + } // namespace } // namespace testing } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 339eadde92..18bb1ff4b9 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -894,7 +894,7 @@ TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/interceptors_util.cc b/test/cpp/end2end/interceptors_util.cc index 602d1695a3..e0ad7d1526 100644 --- a/test/cpp/end2end/interceptors_util.cc +++ b/test/cpp/end2end/interceptors_util.cc @@ -132,16 +132,14 @@ bool CheckMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& map, return false; } -std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> +std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> CreateDummyClientInterceptors() { - auto creators = std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( - new std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + creators; // Add 20 dummy interceptors before hijacking interceptor + creators.reserve(20); for (auto i = 0; i < 20; i++) { - creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); } return creators; diff --git a/test/cpp/end2end/interceptors_util.h b/test/cpp/end2end/interceptors_util.h index b4c4791fca..659e613d2e 100644 --- a/test/cpp/end2end/interceptors_util.h +++ b/test/cpp/end2end/interceptors_util.h @@ -82,6 +82,22 @@ class DummyInterceptorFactory } }; +/* This interceptor factory returns nullptr on interceptor creation */ +class NullInterceptorFactory + : public experimental::ClientInterceptorFactoryInterface, + public experimental::ServerInterceptorFactoryInterface { + public: + virtual experimental::Interceptor* CreateClientInterceptor( + experimental::ClientRpcInfo* info) override { + return nullptr; + } + + virtual experimental::Interceptor* CreateServerInterceptor( + experimental::ServerRpcInfo* info) override { + return nullptr; + } +}; + class EchoTestServiceStreamingImpl : public EchoTestService::Service { public: ~EchoTestServiceStreamingImpl() override {} @@ -149,8 +165,7 @@ void MakeCallbackCall(const std::shared_ptr<Channel>& channel); bool CheckMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& map, const string& key, const string& value); -std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> +std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> CreateDummyClientInterceptors(); inline void* tag(int i) { return (void*)static_cast<intptr_t>(i); } diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index ba3122c895..917ca28020 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -345,7 +345,7 @@ TEST_F(MockTest, BidiStream) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/nonblocking_test.cc b/test/cpp/end2end/nonblocking_test.cc index d8337baca2..36dea1fcb3 100644 --- a/test/cpp/end2end/nonblocking_test.cc +++ b/test/cpp/end2end/nonblocking_test.cc @@ -187,7 +187,7 @@ int main(int argc, char** argv) { grpc_poll_function = maybe_assert_non_blocking_poll; #endif // GRPC_POSIX_SOCKET - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); int ret = RUN_ALL_TESTS(); return ret; diff --git a/test/cpp/end2end/proto_server_reflection_test.cc b/test/cpp/end2end/proto_server_reflection_test.cc index 21a275ef62..ff097aa9a7 100644 --- a/test/cpp/end2end/proto_server_reflection_test.cc +++ b/test/cpp/end2end/proto_server_reflection_test.cc @@ -144,7 +144,7 @@ TEST_F(ProtoServerReflectionTest, CheckResponseWithLocalDescriptorPool) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/raw_end2end_test.cc b/test/cpp/end2end/raw_end2end_test.cc index a413905ef7..c8556bae95 100644 --- a/test/cpp/end2end/raw_end2end_test.cc +++ b/test/cpp/end2end/raw_end2end_test.cc @@ -363,7 +363,7 @@ TEST_F(RawEnd2EndTest, CompileTest) { int main(int argc, char** argv) { // Change the backup poll interval from 5s to 100ms to speed up the // ReconnectChannel test - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); int ret = RUN_ALL_TESTS(); return ret; diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc index d54523fcbb..d744a93912 100644 --- a/test/cpp/end2end/server_builder_plugin_test.cc +++ b/test/cpp/end2end/server_builder_plugin_test.cc @@ -264,7 +264,7 @@ INSTANTIATE_TEST_CASE_P(ServerBuilderPluginTest, ServerBuilderPluginTest, } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/server_crash_test.cc b/test/cpp/end2end/server_crash_test.cc index 93257b2705..353ebf713a 100644 --- a/test/cpp/end2end/server_crash_test.cc +++ b/test/cpp/end2end/server_crash_test.cc @@ -153,7 +153,7 @@ int main(int argc, char** argv) { g_root = "."; } - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/server_early_return_test.cc b/test/cpp/end2end/server_early_return_test.cc index 8948e5b854..c47e25052e 100644 --- a/test/cpp/end2end/server_early_return_test.cc +++ b/test/cpp/end2end/server_early_return_test.cc @@ -226,7 +226,7 @@ TEST_F(ServerEarlyReturnTest, RequestStreamEarlyCancel) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index 4ae086ea76..53d8c4dc96 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -24,10 +24,10 @@ #include <grpcpp/create_channel.h> #include <grpcpp/generic/generic_stub.h> #include <grpcpp/impl/codegen/proto_utils.h> -#include <grpcpp/impl/codegen/server_interceptor.h> #include <grpcpp/server.h> #include <grpcpp/server_builder.h> #include <grpcpp/server_context.h> +#include <grpcpp/support/server_interceptor.h> #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -44,7 +44,34 @@ namespace { class LoggingInterceptor : public experimental::Interceptor { public: - LoggingInterceptor(experimental::ServerRpcInfo* info) { info_ = info; } + LoggingInterceptor(experimental::ServerRpcInfo* info) { + info_ = info; + + // Check the method name and compare to the type + const char* method = info->method(); + experimental::ServerRpcInfo::Type type = info->type(); + + // Check that we use one of our standard methods with expected type. + // Also allow the health checking service. + // We accept BIDI_STREAMING for Echo in case it's an AsyncGenericService + // being tested (the GenericRpc test). + // The empty method is for the Unimplemented requests that arise + // when draining the CQ. + EXPECT_TRUE( + strstr(method, "/grpc.health") == method || + (strcmp(method, "/grpc.testing.EchoTestService/Echo") == 0 && + (type == experimental::ServerRpcInfo::Type::UNARY || + type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)) || + (strcmp(method, "/grpc.testing.EchoTestService/RequestStream") == 0 && + type == experimental::ServerRpcInfo::Type::CLIENT_STREAMING) || + (strcmp(method, "/grpc.testing.EchoTestService/ResponseStream") == 0 && + type == experimental::ServerRpcInfo::Type::SERVER_STREAMING) || + (strcmp(method, "/grpc.testing.EchoTestService/BidiStream") == 0 && + type == experimental::ServerRpcInfo::Type::BIDI_STREAMING) || + strcmp(method, "/grpc.testing.EchoTestService/Unimplemented") == 0 || + (strcmp(method, "") == 0 && + type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)); + } virtual void Intercept(experimental::InterceptorBatchMethods* methods) { if (methods->QueryInterceptionHookPoint( @@ -149,9 +176,12 @@ class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test { creators.push_back( std::unique_ptr<experimental::ServerInterceptorFactoryInterface>( new LoggingInterceptorFactory())); + // Add 20 dummy interceptor factories and null interceptor factories for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); + creators.push_back(std::unique_ptr<NullInterceptorFactory>( + new NullInterceptorFactory())); } builder.experimental().SetInterceptorCreators(std::move(creators)); server_ = builder.BuildAndStart(); @@ -391,6 +421,7 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) { builder.RegisterAsyncGenericService(&service); std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> creators; + creators.reserve(20); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); @@ -486,6 +517,7 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, UnimplementedRpcTest) { builder.AddListeningPort(server_address, InsecureServerCredentials()); std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> creators; + creators.reserve(20); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); @@ -539,6 +571,7 @@ TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest, UnimplementedRpcTest) { builder.AddListeningPort(server_address, InsecureServerCredentials()); std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> creators; + creators.reserve(20); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); @@ -574,7 +607,7 @@ TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest, UnimplementedRpcTest) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/shutdown_test.cc b/test/cpp/end2end/shutdown_test.cc index a53de691bc..da42178d67 100644 --- a/test/cpp/end2end/shutdown_test.cc +++ b/test/cpp/end2end/shutdown_test.cc @@ -164,7 +164,7 @@ TEST_P(ShutdownTest, ShutdownTest) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/streaming_throughput_test.cc b/test/cpp/end2end/streaming_throughput_test.cc index 898d1ec118..440656588b 100644 --- a/test/cpp/end2end/streaming_throughput_test.cc +++ b/test/cpp/end2end/streaming_throughput_test.cc @@ -187,7 +187,7 @@ TEST_F(End2endTest, StreamingThroughput) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/test_health_check_service_impl.cc b/test/cpp/end2end/test_health_check_service_impl.cc new file mode 100644 index 0000000000..0801e30199 --- /dev/null +++ b/test/cpp/end2end/test_health_check_service_impl.cc @@ -0,0 +1,97 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test/cpp/end2end/test_health_check_service_impl.h" + +#include <grpc/grpc.h> + +using grpc::health::v1::HealthCheckRequest; +using grpc::health::v1::HealthCheckResponse; + +namespace grpc { +namespace testing { + +Status HealthCheckServiceImpl::Check(ServerContext* context, + const HealthCheckRequest* request, + HealthCheckResponse* response) { + std::lock_guard<std::mutex> lock(mu_); + auto iter = status_map_.find(request->service()); + if (iter == status_map_.end()) { + return Status(StatusCode::NOT_FOUND, ""); + } + response->set_status(iter->second); + return Status::OK; +} + +Status HealthCheckServiceImpl::Watch( + ServerContext* context, const HealthCheckRequest* request, + ::grpc::ServerWriter<HealthCheckResponse>* writer) { + auto last_state = HealthCheckResponse::UNKNOWN; + while (!context->IsCancelled()) { + { + std::lock_guard<std::mutex> lock(mu_); + HealthCheckResponse response; + auto iter = status_map_.find(request->service()); + if (iter == status_map_.end()) { + response.set_status(response.SERVICE_UNKNOWN); + } else { + response.set_status(iter->second); + } + if (response.status() != last_state) { + writer->Write(response, ::grpc::WriteOptions()); + } + } + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(1000, GPR_TIMESPAN))); + } + return Status::OK; +} + +void HealthCheckServiceImpl::SetStatus( + const grpc::string& service_name, + HealthCheckResponse::ServingStatus status) { + std::lock_guard<std::mutex> lock(mu_); + if (shutdown_) { + status = HealthCheckResponse::NOT_SERVING; + } + status_map_[service_name] = status; +} + +void HealthCheckServiceImpl::SetAll(HealthCheckResponse::ServingStatus status) { + std::lock_guard<std::mutex> lock(mu_); + if (shutdown_) { + return; + } + for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) { + iter->second = status; + } +} + +void HealthCheckServiceImpl::Shutdown() { + std::lock_guard<std::mutex> lock(mu_); + if (shutdown_) { + return; + } + shutdown_ = true; + for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) { + iter->second = HealthCheckResponse::NOT_SERVING; + } +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/end2end/test_health_check_service_impl.h b/test/cpp/end2end/test_health_check_service_impl.h new file mode 100644 index 0000000000..5d36ce5320 --- /dev/null +++ b/test/cpp/end2end/test_health_check_service_impl.h @@ -0,0 +1,58 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef GRPC_TEST_CPP_END2END_TEST_HEALTH_CHECK_SERVICE_IMPL_H +#define GRPC_TEST_CPP_END2END_TEST_HEALTH_CHECK_SERVICE_IMPL_H + +#include <map> +#include <mutex> + +#include <grpcpp/server_context.h> +#include <grpcpp/support/status.h> + +#include "src/proto/grpc/health/v1/health.grpc.pb.h" + +namespace grpc { +namespace testing { + +// A sample sync implementation of the health checking service. This does the +// same thing as the default one. +class HealthCheckServiceImpl : public health::v1::Health::Service { + public: + Status Check(ServerContext* context, + const health::v1::HealthCheckRequest* request, + health::v1::HealthCheckResponse* response) override; + Status Watch(ServerContext* context, + const health::v1::HealthCheckRequest* request, + ServerWriter<health::v1::HealthCheckResponse>* writer) override; + void SetStatus(const grpc::string& service_name, + health::v1::HealthCheckResponse::ServingStatus status); + void SetAll(health::v1::HealthCheckResponse::ServingStatus status); + + void Shutdown(); + + private: + std::mutex mu_; + bool shutdown_ = false; + std::map<const grpc::string, health::v1::HealthCheckResponse::ServingStatus> + status_map_; +}; + +} // namespace testing +} // namespace grpc + +#endif // GRPC_TEST_CPP_END2END_TEST_HEALTH_CHECK_SERVICE_IMPL_H diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 605356724f..6729ad14f4 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -69,6 +69,62 @@ void CheckServerAuthContext( EXPECT_EQ(expected_client_identity, identity[0]); } } + +// Returns the number of pairs in metadata that exactly match the given +// key-value pair. Returns -1 if the pair wasn't found. +int MetadataMatchCount( + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + const grpc::string& key, const grpc::string& value) { + int count = 0; + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter = + metadata.begin(); + iter != metadata.end(); ++iter) { + if (ToString(iter->first) == key && ToString(iter->second) == value) { + count++; + } + } + return count; +} +} // namespace + +namespace { +int GetIntValueFromMetadataHelper( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + if (metadata.find(key) != metadata.end()) { + std::istringstream iss(ToString(metadata.find(key)->second)); + iss >> default_value; + gpr_log(GPR_INFO, "%s : %d", key, default_value); + } + + return default_value; +} + +int GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + return GetIntValueFromMetadataHelper(key, metadata, default_value); +} + +void ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + // Now wait until it's really canceled + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000, GPR_TIMESPAN))); + } +} + +void ServerTryCancelNonblocking(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); +} + } // namespace Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, @@ -165,6 +221,18 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, return Status::OK; } +Status TestServiceImpl::CheckClientInitialMetadata(ServerContext* context, + const SimpleRequest* request, + SimpleResponse* response) { + EXPECT_EQ(MetadataMatchCount(context->client_metadata(), + kCheckClientInitialMetadataKey, + kCheckClientInitialMetadataVal), + 1); + EXPECT_EQ(1u, + context->client_metadata().count(kCheckClientInitialMetadataKey)); + return Status::OK; +} + void CallbackTestServiceImpl::Echo( ServerContext* context, const EchoRequest* request, EchoResponse* response, experimental::ServerCallbackRpcController* controller) { @@ -183,6 +251,19 @@ void CallbackTestServiceImpl::Echo( } } +void CallbackTestServiceImpl::CheckClientInitialMetadata( + ServerContext* context, const SimpleRequest* request, + SimpleResponse* response, + experimental::ServerCallbackRpcController* controller) { + EXPECT_EQ(MetadataMatchCount(context->client_metadata(), + kCheckClientInitialMetadataKey, + kCheckClientInitialMetadataVal), + 1); + EXPECT_EQ(1u, + context->client_metadata().count(kCheckClientInitialMetadataKey)); + controller->Finish(Status::OK); +} + void CallbackTestServiceImpl::EchoNonDelayed( ServerContext* context, const EchoRequest* request, EchoResponse* response, experimental::ServerCallbackRpcController* controller) { @@ -195,6 +276,7 @@ void CallbackTestServiceImpl::EchoNonDelayed( controller->Finish(Status(static_cast<StatusCode>(error.code()), error.error_message(), error.binary_error_details())); + return; } int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); @@ -223,6 +305,7 @@ void CallbackTestServiceImpl::EchoNonDelayed( return; } + gpr_log(GPR_DEBUG, "Request message was %s", request->message().c_str()); response->set_message(request->message()); MaybeEchoDeadline(context, request, response); if (host_) { @@ -253,7 +336,7 @@ void CallbackTestServiceImpl::EchoNonDelayed( alarm_.experimental().Set( gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().client_cancel_after_us(), + gpr_time_from_micros(request->param().server_cancel_after_us(), GPR_TIMESPAN)), [controller](bool) { controller->Finish(Status::CANCELLED); }); return; @@ -278,6 +361,7 @@ void CallbackTestServiceImpl::EchoNonDelayed( request->param().debug_info().SerializeAsString(); context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info); controller->Finish(Status::CANCELLED); + return; } } if (request->has_param() && @@ -324,7 +408,7 @@ Status TestServiceImpl::RequestStream(ServerContext* context, std::thread* server_try_cancel_thd = nullptr; if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = - new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + new std::thread([context] { ServerTryCancel(context); }); } int num_msgs_read = 0; @@ -379,7 +463,7 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, std::thread* server_try_cancel_thd = nullptr; if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = - new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + new std::thread([context] { ServerTryCancel(context); }); } for (int i = 0; i < server_responses_to_send; i++) { @@ -430,7 +514,7 @@ Status TestServiceImpl::BidiStream( std::thread* server_try_cancel_thd = nullptr; if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = - new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + new std::thread([context] { ServerTryCancel(context); }); } // kServerFinishAfterNReads suggests after how many reads, the server should @@ -464,44 +548,244 @@ Status TestServiceImpl::BidiStream( return Status::OK; } -namespace { -int GetIntValueFromMetadataHelper( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value) { - if (metadata.find(key) != metadata.end()) { - std::istringstream iss(ToString(metadata.find(key)->second)); - iss >> default_value; - gpr_log(GPR_INFO, "%s : %d", key, default_value); - } +experimental::ServerReadReactor<EchoRequest, EchoResponse>* +CallbackTestServiceImpl::RequestStream() { + class Reactor : public ::grpc::experimental::ServerReadReactor<EchoRequest, + EchoResponse> { + public: + Reactor() {} + void OnStarted(ServerContext* context, EchoResponse* response) override { + ctx_ = context; + response_ = response; + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the + // value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server + // reads any message from the client CANCEL_DURING_PROCESSING: The RPC + // is cancelled while the server is reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + server_try_cancel_ = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + response_->set_message(""); + + if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + return; + } - return default_value; -} -}; // namespace + if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + ctx_->TryCancel(); + // Don't wait for it here + } -int TestServiceImpl::GetIntValueFromMetadata( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value) { - return GetIntValueFromMetadataHelper(key, metadata, default_value); + StartRead(&request_); + } + void OnDone() override { delete this; } + void OnCancel() override { FinishOnce(Status::CANCELLED); } + void OnReadDone(bool ok) override { + if (ok) { + response_->mutable_message()->append(request_.message()); + num_msgs_read_++; + StartRead(&request_); + } else { + gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read_); + + if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + // Let OnCancel recover this + return; + } + if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + return; + } + FinishOnce(Status::OK); + } + } + + private: + void FinishOnce(const Status& s) { + std::lock_guard<std::mutex> l(finish_mu_); + if (!finished_) { + Finish(s); + finished_ = true; + } + } + + ServerContext* ctx_; + EchoResponse* response_; + EchoRequest request_; + int num_msgs_read_{0}; + int server_try_cancel_; + std::mutex finish_mu_; + bool finished_{false}; + }; + + return new Reactor; } -int CallbackTestServiceImpl::GetIntValueFromMetadata( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value) { - return GetIntValueFromMetadataHelper(key, metadata, default_value); +// Return 'kNumResponseStreamMsgs' messages. +// TODO(yangg) make it generic by adding a parameter into EchoRequest +experimental::ServerWriteReactor<EchoRequest, EchoResponse>* +CallbackTestServiceImpl::ResponseStream() { + class Reactor + : public ::grpc::experimental::ServerWriteReactor<EchoRequest, + EchoResponse> { + public: + Reactor() {} + void OnStarted(ServerContext* context, + const EchoRequest* request) override { + ctx_ = context; + request_ = request; + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the + // value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server + // reads any message from the client CANCEL_DURING_PROCESSING: The RPC + // is cancelled while the server is reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + server_try_cancel_ = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + server_coalescing_api_ = GetIntValueFromMetadata( + kServerUseCoalescingApi, context->client_metadata(), 0); + server_responses_to_send_ = GetIntValueFromMetadata( + kServerResponseStreamsToSend, context->client_metadata(), + kServerDefaultResponseStreamsToSend); + if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + return; + } + + if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + ctx_->TryCancel(); + } + if (num_msgs_sent_ < server_responses_to_send_) { + NextWrite(); + } + } + void OnDone() override { delete this; } + void OnCancel() override { FinishOnce(Status::CANCELLED); } + void OnWriteDone(bool ok) override { + if (num_msgs_sent_ < server_responses_to_send_) { + NextWrite(); + } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + // Let OnCancel recover this + } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + } else { + FinishOnce(Status::OK); + } + } + + private: + void FinishOnce(const Status& s) { + std::lock_guard<std::mutex> l(finish_mu_); + if (!finished_) { + Finish(s); + finished_ = true; + } + } + + void NextWrite() { + response_.set_message(request_->message() + + grpc::to_string(num_msgs_sent_)); + if (num_msgs_sent_ == server_responses_to_send_ - 1 && + server_coalescing_api_ != 0) { + num_msgs_sent_++; + StartWriteLast(&response_, WriteOptions()); + } else { + num_msgs_sent_++; + StartWrite(&response_); + } + } + ServerContext* ctx_; + const EchoRequest* request_; + EchoResponse response_; + int num_msgs_sent_{0}; + int server_try_cancel_; + int server_coalescing_api_; + int server_responses_to_send_; + std::mutex finish_mu_; + bool finished_{false}; + }; + return new Reactor; } -void TestServiceImpl::ServerTryCancel(ServerContext* context) { - EXPECT_FALSE(context->IsCancelled()); - context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); - // Now wait until it's really canceled - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000, GPR_TIMESPAN))); - } +experimental::ServerBidiReactor<EchoRequest, EchoResponse>* +CallbackTestServiceImpl::BidiStream() { + class Reactor : public ::grpc::experimental::ServerBidiReactor<EchoRequest, + EchoResponse> { + public: + Reactor() {} + void OnStarted(ServerContext* context) override { + ctx_ = context; + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the + // value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server + // reads any message from the client CANCEL_DURING_PROCESSING: The RPC + // is cancelled while the server is reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + server_try_cancel_ = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + server_write_last_ = GetIntValueFromMetadata( + kServerFinishAfterNReads, context->client_metadata(), 0); + if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + return; + } + + if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + ctx_->TryCancel(); + } + + StartRead(&request_); + } + void OnDone() override { delete this; } + void OnCancel() override { FinishOnce(Status::CANCELLED); } + void OnReadDone(bool ok) override { + if (ok) { + num_msgs_read_++; + gpr_log(GPR_INFO, "recv msg %s", request_.message().c_str()); + response_.set_message(request_.message()); + if (num_msgs_read_ == server_write_last_) { + StartWriteLast(&response_, WriteOptions()); + } else { + StartWrite(&response_); + } + } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + // Let OnCancel handle this + } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + } else { + FinishOnce(Status::OK); + } + } + void OnWriteDone(bool ok) override { StartRead(&request_); } + + private: + void FinishOnce(const Status& s) { + std::lock_guard<std::mutex> l(finish_mu_); + if (!finished_) { + Finish(s); + finished_ = true; + } + } + + ServerContext* ctx_; + EchoRequest request_; + EchoResponse response_; + int num_msgs_read_{0}; + int server_try_cancel_; + int server_write_last_; + std::mutex finish_mu_; + bool finished_{false}; + }; + + return new Reactor; } } // namespace testing diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index ddfe94487e..e36423d44e 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -36,6 +36,8 @@ const char* const kServerTryCancelRequest = "server_try_cancel"; const char* const kDebugInfoTrailerKey = "debug-info-bin"; const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; const char* const kServerUseCoalescingApi = "server_use_coalescing_api"; +const char* const kCheckClientInitialMetadataKey = "custom_client_metadata"; +const char* const kCheckClientInitialMetadataVal = "Value for client metadata"; typedef enum { DO_NOT_CANCEL = 0, @@ -53,6 +55,10 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override; + Status CheckClientInitialMetadata(ServerContext* context, + const SimpleRequest* request, + SimpleResponse* response) override; + // Unimplemented is left unimplemented to test the returned error. Status RequestStream(ServerContext* context, @@ -72,13 +78,6 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { } private: - int GetIntValueFromMetadata( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value); - - void ServerTryCancel(ServerContext* context); - bool signal_client_; std::mutex mu_; std::unique_ptr<grpc::string> host_; @@ -95,6 +94,20 @@ class CallbackTestServiceImpl EchoResponse* response, experimental::ServerCallbackRpcController* controller) override; + void CheckClientInitialMetadata( + ServerContext* context, const SimpleRequest* request, + SimpleResponse* response, + experimental::ServerCallbackRpcController* controller) override; + + experimental::ServerReadReactor<EchoRequest, EchoResponse>* RequestStream() + override; + + experimental::ServerWriteReactor<EchoRequest, EchoResponse>* ResponseStream() + override; + + experimental::ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream() + override; + // Unimplemented is left unimplemented to test the returned error. bool signal_client() { std::unique_lock<std::mutex> lock(mu_); @@ -106,11 +119,6 @@ class CallbackTestServiceImpl EchoResponse* response, experimental::ServerCallbackRpcController* controller); - int GetIntValueFromMetadata( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value); - Alarm alarm_; bool signal_client_; std::mutex mu_; diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index 1a5ed28a2c..e30ce0dbcb 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -429,7 +429,7 @@ TYPED_TEST(AsyncClientEnd2endTest, ThreadStress) { } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } |