diff options
Diffstat (limited to 'test')
19 files changed, 428 insertions, 90 deletions
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index 4f5d2a2862..ae1e42a4e0 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -66,7 +66,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { thd_args* a = static_cast<thd_args*>(ts); grpc_core::ExecCtx exec_ctx; grpc_server_setup_transport(a->server, transport, nullptr, - grpc_server_get_channel_args(a->server), 0); + grpc_server_get_channel_args(a->server), nullptr); } /* Sets the read_done event */ diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc index 4f393b22d0..45f78b5964 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.cc +++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc @@ -53,7 +53,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq)); grpc_server_setup_transport(f->server, transport, nullptr, - grpc_server_get_channel_args(f->server), 0); + grpc_server_get_channel_args(f->server), nullptr); } typedef struct { diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc index 1627fe0eb4..77bce7ebb3 100644 --- a/test/core/end2end/fixtures/h2_sockpair.cc +++ b/test/core/end2end/fixtures/h2_sockpair.cc @@ -47,7 +47,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq)); grpc_server_setup_transport(f->server, transport, nullptr, - grpc_server_get_channel_args(f->server), 0); + grpc_server_get_channel_args(f->server), nullptr); } typedef struct { diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc index 8f1024b774..ac37841dc4 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc @@ -47,7 +47,7 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq)); grpc_server_setup_transport(f->server, transport, nullptr, - grpc_server_get_channel_args(f->server), 0); + grpc_server_get_channel_args(f->server), nullptr); } typedef struct { diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 9b6eddee6e..a3de56d4f9 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -420,7 +420,7 @@ static void do_connect(void* arg, grpc_error* error) { grpc_transport* transport = grpc_create_chttp2_transport(nullptr, server, false); - grpc_server_setup_transport(g_server, transport, nullptr, nullptr, 0); + grpc_server_setup_transport(g_server, transport, nullptr, nullptr, nullptr); grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); GRPC_CLOSURE_SCHED(fc->closure, GRPC_ERROR_NONE); diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index bd686215dd..d370dc7de8 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -62,7 +62,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_server_start(server); grpc_transport* transport = grpc_create_chttp2_transport(nullptr, mock_endpoint, false); - grpc_server_setup_transport(server, transport, nullptr, nullptr, 0); + grpc_server_setup_transport(server, transport, nullptr, nullptr, nullptr); grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); grpc_call* call1 = nullptr; diff --git a/test/core/end2end/tests/channelz.cc b/test/core/end2end/tests/channelz.cc index 922783aa0d..49a0bc8011 100644 --- a/test/core/end2end/tests/channelz.cc +++ b/test/core/end2end/tests/channelz.cc @@ -260,7 +260,7 @@ static void test_channelz(grpc_end2end_test_config config) { gpr_free(json); json = channelz_server->RenderServerSockets(0); - GPR_ASSERT(nullptr != strstr(json, "\"socketRef\":")); + GPR_ASSERT(nullptr != strstr(json, "\"end\":true")); gpr_free(json); end_test(&f); diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 446804401a..eb600ffb17 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -36,6 +36,18 @@ grpc_cc_library( ) grpc_cc_library( + name = "test_health_check_service_impl", + testonly = True, + srcs = ["test_health_check_service_impl.cc"], + hdrs = ["test_health_check_service_impl.h"], + deps = [ + "//:grpc", + "//:grpc++", + "//src/proto/grpc/health/v1:health_proto", + ], +) + +grpc_cc_library( name = "interceptors_util", testonly = True, srcs = ["interceptors_util.cc"], @@ -283,6 +295,7 @@ grpc_cc_test( "gtest", ], deps = [ + ":test_health_check_service_impl", ":test_service_impl", "//:gpr", "//:grpc", diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 65434bac6b..a999321992 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -182,7 +182,7 @@ class ClientCallbackEnd2endTest } } - void SendGenericEchoAsBidi(int num_rpcs) { + void SendGenericEchoAsBidi(int num_rpcs, int reuses) { const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo"); grpc::string test_string(""); for (int i = 0; i < num_rpcs; i++) { @@ -191,14 +191,26 @@ class ClientCallbackEnd2endTest ByteBuffer> { public: Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name, - const grpc::string& test_str) { - test->generic_stub_->experimental().PrepareBidiStreamingCall( - &cli_ctx_, method_name, this); - request_.set_message(test_str); - send_buf_ = SerializeToByteBuffer(&request_); - StartWrite(send_buf_.get()); - StartRead(&recv_buf_); - StartCall(); + const grpc::string& test_str, int reuses) + : reuses_remaining_(reuses) { + activate_ = [this, test, method_name, test_str] { + if (reuses_remaining_ > 0) { + cli_ctx_.reset(new ClientContext); + reuses_remaining_--; + test->generic_stub_->experimental().PrepareBidiStreamingCall( + cli_ctx_.get(), method_name, this); + request_.set_message(test_str); + send_buf_ = SerializeToByteBuffer(&request_); + StartWrite(send_buf_.get()); + StartRead(&recv_buf_); + StartCall(); + } else { + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + }; + activate_(); } void OnWriteDone(bool ok) override { StartWritesDone(); } void OnReadDone(bool ok) override { @@ -208,9 +220,7 @@ class ClientCallbackEnd2endTest }; void OnDone(const Status& s) override { EXPECT_TRUE(s.ok()); - std::unique_lock<std::mutex> l(mu_); - done_ = true; - cv_.notify_one(); + activate_(); } void Await() { std::unique_lock<std::mutex> l(mu_); @@ -222,11 +232,13 @@ class ClientCallbackEnd2endTest EchoRequest request_; std::unique_ptr<ByteBuffer> send_buf_; ByteBuffer recv_buf_; - ClientContext cli_ctx_; + std::unique_ptr<ClientContext> cli_ctx_; + int reuses_remaining_; + std::function<void()> activate_; std::mutex mu_; std::condition_variable cv_; bool done_ = false; - } rpc{this, kMethodName, test_string}; + } rpc{this, kMethodName, test_string, reuses}; rpc.Await(); } @@ -293,7 +305,12 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) { TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) { ResetStub(); - SendGenericEchoAsBidi(10); + SendGenericEchoAsBidi(10, 1); +} + +TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) { + ResetStub(); + SendGenericEchoAsBidi(10, 10); } #if GRPC_ALLOW_EXCEPTIONS diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc index 3a191d1e03..968b5d49d1 100644 --- a/test/cpp/end2end/client_interceptors_end2end_test.cc +++ b/test/cpp/end2end/client_interceptors_end2end_test.cc @@ -50,6 +50,7 @@ class HijackingInterceptor : public experimental::Interceptor { info_ = info; // Make sure it is the right method EXPECT_EQ(strcmp("/grpc.testing.EchoTestService/Echo", info->method()), 0); + EXPECT_EQ(info->type(), experimental::ClientRpcInfo::Type::UNARY); } virtual void Intercept(experimental::InterceptorBatchMethods* methods) { @@ -383,6 +384,7 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorHijackingTest) { std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> creators; // Add 20 dummy interceptors before hijacking interceptor + creators.reserve(20); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); @@ -423,6 +425,7 @@ TEST_F(ClientInterceptorsEnd2endTest, std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> creators; // Add 5 dummy interceptors before hijacking interceptor + creators.reserve(5); for (auto i = 0; i < 5; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); @@ -570,6 +573,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, DummyGlobalInterceptor) { std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> creators; // Add 20 dummy interceptors + creators.reserve(20); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); @@ -595,6 +599,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, LoggingGlobalInterceptor) { std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> creators; // Add 20 dummy interceptors + creators.reserve(20); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); @@ -620,6 +625,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, HijackingGlobalInterceptor) { std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> creators; // Add 20 dummy interceptors + creators.reserve(20); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index b667460cf0..1dfa91a76c 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -116,7 +116,10 @@ class MyTestServiceImpl : public TestServiceImpl { class ClientLbEnd2endTest : public ::testing::Test { protected: ClientLbEnd2endTest() - : server_host_("localhost"), kRequestMessage_("Live long and prosper.") { + : server_host_("localhost"), + kRequestMessage_("Live long and prosper."), + creds_(new SecureChannelCredentials( + grpc_fake_transport_security_credentials_create())) { // Make the backup poller poll very frequently in order to pick up // updates from all the subchannels's FDs. gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1"); @@ -215,9 +218,7 @@ class ClientLbEnd2endTest : public ::testing::Test { } // else, default to pick first args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_.get()); - std::shared_ptr<ChannelCredentials> creds(new SecureChannelCredentials( - grpc_fake_transport_security_credentials_create())); - return CreateCustomChannel("fake:///", std::move(creds), args); + return CreateCustomChannel("fake:///", creds_, args); } bool SendRpc( @@ -265,6 +266,7 @@ class ClientLbEnd2endTest : public ::testing::Test { MyTestServiceImpl service_; std::unique_ptr<std::thread> thread_; bool server_ready_ = false; + bool started_ = false; explicit ServerData(int port = 0) { port_ = port > 0 ? port : grpc_pick_unused_port_or_die(); @@ -272,6 +274,7 @@ class ClientLbEnd2endTest : public ::testing::Test { void Start(const grpc::string& server_host) { gpr_log(GPR_INFO, "starting server on port %d", port_); + started_ = true; std::mutex mu; std::unique_lock<std::mutex> lock(mu); std::condition_variable cond; @@ -297,9 +300,11 @@ class ClientLbEnd2endTest : public ::testing::Test { cond->notify_one(); } - void Shutdown(bool join = true) { + void Shutdown() { + if (!started_) return; server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); - if (join) thread_->join(); + thread_->join(); + started_ = false; } void SetServingStatus(const grpc::string& service, bool serving) { @@ -378,6 +383,7 @@ class ClientLbEnd2endTest : public ::testing::Test { grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator> response_generator_; const grpc::string kRequestMessage_; + std::shared_ptr<ChannelCredentials> creds_; }; TEST_F(ClientLbEnd2endTest, PickFirst) { @@ -422,6 +428,30 @@ TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) { CheckRpcSendOk(second_stub, DEBUG_LOCATION); } +TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) { + ChannelArguments args; + constexpr int kInitialBackOffMs = 5000; + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); + // Create 2 servers, but start only the second one. + std::vector<int> ports = {grpc_pick_unused_port_or_die(), + grpc_pick_unused_port_or_die()}; + CreateServers(2, ports); + StartServer(1); + auto channel1 = BuildChannel("pick_first", args); + auto stub1 = BuildStub(channel1); + SetNextResolution(ports); + // Wait for second server to be ready. + WaitForServer(stub1, 1, DEBUG_LOCATION); + // Create a second channel with the same addresses. Its PF instance + // should immediately pick the second subchannel, since it's already + // in READY state. + auto channel2 = BuildChannel("pick_first", args); + SetNextResolution(ports); + // Check that the channel reports READY without waiting for the + // initial backoff. + EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */)); +} + TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) { ChannelArguments args; constexpr int kInitialBackOffMs = 100; @@ -507,6 +537,51 @@ TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) { EXPECT_LT(waited_ms, kInitialBackOffMs); } +TEST_F(ClientLbEnd2endTest, + PickFirstResetConnectionBackoffNextAttemptStartsImmediately) { + ChannelArguments args; + constexpr int kInitialBackOffMs = 1000; + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); + const std::vector<int> ports = {grpc_pick_unused_port_or_die()}; + auto channel = BuildChannel("pick_first", args); + auto stub = BuildStub(channel); + SetNextResolution(ports); + // Wait for connect, which should fail ~immediately, because the server + // is not up. + gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT"); + EXPECT_FALSE( + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + // Reset connection backoff. + // Note that the time at which the third attempt will be started is + // actually computed at this point, so we record the start time here. + gpr_log(GPR_INFO, "=== RESETTING BACKOFF"); + const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); + experimental::ChannelResetConnectionBackoff(channel.get()); + // Trigger a second connection attempt. This should also fail + // ~immediately, but the retry should be scheduled for + // kInitialBackOffMs instead of applying the multiplier. + gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT"); + EXPECT_FALSE( + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + // Bring up a server on the chosen port. + gpr_log(GPR_INFO, "=== STARTING BACKEND"); + StartServers(1, ports); + // Wait for connect. Should happen within kInitialBackOffMs. + // Give an extra 100ms to account for the time spent in the second and + // third connection attempts themselves (since what we really want to + // measure is the time between the two). As long as this is less than + // the 1.6x increase we would see if the backoff state was not reset + // properly, the test is still proving that the backoff was reset. + constexpr int kWaitMs = kInitialBackOffMs + 100; + gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT"); + EXPECT_TRUE(channel->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kWaitMs))); + const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC); + const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0)); + gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms); + EXPECT_LT(waited_ms, kWaitMs); +} + TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; @@ -899,7 +974,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { servers_[0]->service_.ResetCounters(); // Shutdown one of the servers to be sent in the update. - servers_[1]->Shutdown(false); + servers_[1]->Shutdown(); ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[2]->port_); SetNextResolution(ports); @@ -958,7 +1033,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { // Kill all servers gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******"); for (size_t i = 0; i < servers_.size(); ++i) { - servers_[i]->Shutdown(true); + servers_[i]->Shutdown(); } gpr_log(GPR_INFO, "****** SERVERS KILLED *******"); gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******"); @@ -1006,7 +1081,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { } const auto pre_death = servers_[0]->service_.request_count(); // Kill the first server. - servers_[0]->Shutdown(true); + servers_[0]->Shutdown(); // Client request still succeed. May need retrying if RR had returned a pick // before noticing the change in the server's connectivity. while (!SendRpc(stub)) { diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc index 89c4bef09c..b96ff53a3e 100644 --- a/test/cpp/end2end/health_service_end2end_test.cc +++ b/test/cpp/end2end/health_service_end2end_test.cc @@ -37,6 +37,7 @@ #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_health_check_service_impl.h" #include "test/cpp/end2end/test_service_impl.h" #include <gtest/gtest.h> @@ -49,62 +50,6 @@ namespace grpc { namespace testing { namespace { -// A sample sync implementation of the health checking service. This does the -// same thing as the default one. -class HealthCheckServiceImpl : public ::grpc::health::v1::Health::Service { - public: - Status Check(ServerContext* context, const HealthCheckRequest* request, - HealthCheckResponse* response) override { - std::lock_guard<std::mutex> lock(mu_); - auto iter = status_map_.find(request->service()); - if (iter == status_map_.end()) { - return Status(StatusCode::NOT_FOUND, ""); - } - response->set_status(iter->second); - return Status::OK; - } - - Status Watch(ServerContext* context, const HealthCheckRequest* request, - ::grpc::ServerWriter<HealthCheckResponse>* writer) override { - auto last_state = HealthCheckResponse::UNKNOWN; - while (!context->IsCancelled()) { - { - std::lock_guard<std::mutex> lock(mu_); - HealthCheckResponse response; - auto iter = status_map_.find(request->service()); - if (iter == status_map_.end()) { - response.set_status(response.SERVICE_UNKNOWN); - } else { - response.set_status(iter->second); - } - if (response.status() != last_state) { - writer->Write(response, ::grpc::WriteOptions()); - } - } - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(1000, GPR_TIMESPAN))); - } - return Status::OK; - } - - void SetStatus(const grpc::string& service_name, - HealthCheckResponse::ServingStatus status) { - std::lock_guard<std::mutex> lock(mu_); - status_map_[service_name] = status; - } - - void SetAll(HealthCheckResponse::ServingStatus status) { - std::lock_guard<std::mutex> lock(mu_); - for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) { - iter->second = status; - } - } - - private: - std::mutex mu_; - std::map<const grpc::string, HealthCheckResponse::ServingStatus> status_map_; -}; - // A custom implementation of the health checking service interface. This is // used to test that it prevents the server from creating a default service and // also serves as an example of how to override the default service. @@ -125,6 +70,8 @@ class CustomHealthCheckService : public HealthCheckServiceInterface { : HealthCheckResponse::NOT_SERVING); } + void Shutdown() override { impl_->Shutdown(); } + private: HealthCheckServiceImpl* impl_; // not owned }; @@ -260,6 +207,74 @@ class HealthServiceEnd2endTest : public ::testing::Test { context.TryCancel(); } + // Verify that after HealthCheckServiceInterface::Shutdown is called + // 1. unary client will see NOT_SERVING. + // 2. unary client still sees NOT_SERVING after a SetServing(true) is called. + // 3. streaming (Watch) client will see an update. + // 4. setting a new service to serving after shutdown will add the service + // name but return NOT_SERVING to client. + // This has to be called last. + void VerifyHealthCheckServiceShutdown() { + HealthCheckServiceInterface* service = server_->GetHealthCheckService(); + EXPECT_TRUE(service != nullptr); + const grpc::string kHealthyService("healthy_service"); + const grpc::string kUnhealthyService("unhealthy_service"); + const grpc::string kNotRegisteredService("not_registered"); + const grpc::string kNewService("add_after_shutdown"); + service->SetServingStatus(kHealthyService, true); + service->SetServingStatus(kUnhealthyService, false); + + ResetStubs(); + + // Start Watch for service. + ClientContext context; + HealthCheckRequest request; + request.set_service(kHealthyService); + std::unique_ptr<::grpc::ClientReaderInterface<HealthCheckResponse>> reader = + hc_stub_->Watch(&context, request); + + HealthCheckResponse response; + EXPECT_TRUE(reader->Read(&response)); + EXPECT_EQ(response.SERVING, response.status()); + + SendHealthCheckRpc("", Status::OK, HealthCheckResponse::SERVING); + SendHealthCheckRpc(kHealthyService, Status::OK, + HealthCheckResponse::SERVING); + SendHealthCheckRpc(kUnhealthyService, Status::OK, + HealthCheckResponse::NOT_SERVING); + SendHealthCheckRpc(kNotRegisteredService, + Status(StatusCode::NOT_FOUND, "")); + SendHealthCheckRpc(kNewService, Status(StatusCode::NOT_FOUND, "")); + + // Shutdown health check service. + service->Shutdown(); + + // Watch client gets another update. + EXPECT_TRUE(reader->Read(&response)); + EXPECT_EQ(response.NOT_SERVING, response.status()); + // Finish Watch call. + context.TryCancel(); + + SendHealthCheckRpc("", Status::OK, HealthCheckResponse::NOT_SERVING); + SendHealthCheckRpc(kHealthyService, Status::OK, + HealthCheckResponse::NOT_SERVING); + SendHealthCheckRpc(kUnhealthyService, Status::OK, + HealthCheckResponse::NOT_SERVING); + SendHealthCheckRpc(kNotRegisteredService, + Status(StatusCode::NOT_FOUND, "")); + + // Setting status after Shutdown has no effect. + service->SetServingStatus(kHealthyService, true); + SendHealthCheckRpc(kHealthyService, Status::OK, + HealthCheckResponse::NOT_SERVING); + + // Adding serving status for a new service after shutdown will return + // NOT_SERVING. + service->SetServingStatus(kNewService, true); + SendHealthCheckRpc(kNewService, Status::OK, + HealthCheckResponse::NOT_SERVING); + } + TestServiceImpl echo_test_service_; HealthCheckServiceImpl health_check_service_impl_; std::unique_ptr<Health::Stub> hc_stub_; @@ -295,6 +310,13 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) { Status(StatusCode::INVALID_ARGUMENT, "")); } +TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceShutdown) { + EnableDefaultHealthCheckService(true); + EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); + SetUpServer(true, false, false, nullptr); + VerifyHealthCheckServiceShutdown(); +} + // Provide an empty service to disable the default service. TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) { EnableDefaultHealthCheckService(true); @@ -326,6 +348,21 @@ TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) { VerifyHealthCheckServiceStreaming(); } +TEST_F(HealthServiceEnd2endTest, ExplicitlyHealthServiceShutdown) { + EnableDefaultHealthCheckService(true); + EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); + std::unique_ptr<HealthCheckServiceInterface> override_service( + new CustomHealthCheckService(&health_check_service_impl_)); + HealthCheckServiceInterface* underlying_service = override_service.get(); + SetUpServer(false, false, true, std::move(override_service)); + HealthCheckServiceInterface* service = server_->GetHealthCheckService(); + EXPECT_TRUE(service == underlying_service); + + ResetStubs(); + + VerifyHealthCheckServiceShutdown(); +} + } // namespace } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/interceptors_util.cc b/test/cpp/end2end/interceptors_util.cc index 5d59c1a4b7..e0ad7d1526 100644 --- a/test/cpp/end2end/interceptors_util.cc +++ b/test/cpp/end2end/interceptors_util.cc @@ -137,6 +137,7 @@ CreateDummyClientInterceptors() { std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> creators; // Add 20 dummy interceptors before hijacking interceptor + creators.reserve(20); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr<DummyInterceptorFactory>( new DummyInterceptorFactory())); diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index c98b6143c6..9460a7d6c6 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -44,7 +44,34 @@ namespace { class LoggingInterceptor : public experimental::Interceptor { public: - LoggingInterceptor(experimental::ServerRpcInfo* info) { info_ = info; } + LoggingInterceptor(experimental::ServerRpcInfo* info) { + info_ = info; + + // Check the method name and compare to the type + const char* method = info->method(); + experimental::ServerRpcInfo::Type type = info->type(); + + // Check that we use one of our standard methods with expected type. + // Also allow the health checking service. + // We accept BIDI_STREAMING for Echo in case it's an AsyncGenericService + // being tested (the GenericRpc test). + // The empty method is for the Unimplemented requests that arise + // when draining the CQ. + EXPECT_TRUE( + strstr(method, "/grpc.health") == method || + (strcmp(method, "/grpc.testing.EchoTestService/Echo") == 0 && + (type == experimental::ServerRpcInfo::Type::UNARY || + type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)) || + (strcmp(method, "/grpc.testing.EchoTestService/RequestStream") == 0 && + type == experimental::ServerRpcInfo::Type::CLIENT_STREAMING) || + (strcmp(method, "/grpc.testing.EchoTestService/ResponseStream") == 0 && + type == experimental::ServerRpcInfo::Type::SERVER_STREAMING) || + (strcmp(method, "/grpc.testing.EchoTestService/BidiStream") == 0 && + type == experimental::ServerRpcInfo::Type::BIDI_STREAMING) || + strcmp(method, "/grpc.testing.EchoTestService/Unimplemented") == 0 || + (strcmp(method, "") == 0 && + type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)); + } virtual void Intercept(experimental::InterceptorBatchMethods* methods) { if (methods->QueryInterceptionHookPoint( diff --git a/test/cpp/end2end/test_health_check_service_impl.cc b/test/cpp/end2end/test_health_check_service_impl.cc new file mode 100644 index 0000000000..0801e30199 --- /dev/null +++ b/test/cpp/end2end/test_health_check_service_impl.cc @@ -0,0 +1,97 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test/cpp/end2end/test_health_check_service_impl.h" + +#include <grpc/grpc.h> + +using grpc::health::v1::HealthCheckRequest; +using grpc::health::v1::HealthCheckResponse; + +namespace grpc { +namespace testing { + +Status HealthCheckServiceImpl::Check(ServerContext* context, + const HealthCheckRequest* request, + HealthCheckResponse* response) { + std::lock_guard<std::mutex> lock(mu_); + auto iter = status_map_.find(request->service()); + if (iter == status_map_.end()) { + return Status(StatusCode::NOT_FOUND, ""); + } + response->set_status(iter->second); + return Status::OK; +} + +Status HealthCheckServiceImpl::Watch( + ServerContext* context, const HealthCheckRequest* request, + ::grpc::ServerWriter<HealthCheckResponse>* writer) { + auto last_state = HealthCheckResponse::UNKNOWN; + while (!context->IsCancelled()) { + { + std::lock_guard<std::mutex> lock(mu_); + HealthCheckResponse response; + auto iter = status_map_.find(request->service()); + if (iter == status_map_.end()) { + response.set_status(response.SERVICE_UNKNOWN); + } else { + response.set_status(iter->second); + } + if (response.status() != last_state) { + writer->Write(response, ::grpc::WriteOptions()); + } + } + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(1000, GPR_TIMESPAN))); + } + return Status::OK; +} + +void HealthCheckServiceImpl::SetStatus( + const grpc::string& service_name, + HealthCheckResponse::ServingStatus status) { + std::lock_guard<std::mutex> lock(mu_); + if (shutdown_) { + status = HealthCheckResponse::NOT_SERVING; + } + status_map_[service_name] = status; +} + +void HealthCheckServiceImpl::SetAll(HealthCheckResponse::ServingStatus status) { + std::lock_guard<std::mutex> lock(mu_); + if (shutdown_) { + return; + } + for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) { + iter->second = status; + } +} + +void HealthCheckServiceImpl::Shutdown() { + std::lock_guard<std::mutex> lock(mu_); + if (shutdown_) { + return; + } + shutdown_ = true; + for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) { + iter->second = HealthCheckResponse::NOT_SERVING; + } +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/end2end/test_health_check_service_impl.h b/test/cpp/end2end/test_health_check_service_impl.h new file mode 100644 index 0000000000..5d36ce5320 --- /dev/null +++ b/test/cpp/end2end/test_health_check_service_impl.h @@ -0,0 +1,58 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef GRPC_TEST_CPP_END2END_TEST_HEALTH_CHECK_SERVICE_IMPL_H +#define GRPC_TEST_CPP_END2END_TEST_HEALTH_CHECK_SERVICE_IMPL_H + +#include <map> +#include <mutex> + +#include <grpcpp/server_context.h> +#include <grpcpp/support/status.h> + +#include "src/proto/grpc/health/v1/health.grpc.pb.h" + +namespace grpc { +namespace testing { + +// A sample sync implementation of the health checking service. This does the +// same thing as the default one. +class HealthCheckServiceImpl : public health::v1::Health::Service { + public: + Status Check(ServerContext* context, + const health::v1::HealthCheckRequest* request, + health::v1::HealthCheckResponse* response) override; + Status Watch(ServerContext* context, + const health::v1::HealthCheckRequest* request, + ServerWriter<health::v1::HealthCheckResponse>* writer) override; + void SetStatus(const grpc::string& service_name, + health::v1::HealthCheckResponse::ServingStatus status); + void SetAll(health::v1::HealthCheckResponse::ServingStatus status); + + void Shutdown(); + + private: + std::mutex mu_; + bool shutdown_ = false; + std::map<const grpc::string, health::v1::HealthCheckResponse::ServingStatus> + status_map_; +}; + +} // namespace testing +} // namespace grpc + +#endif // GRPC_TEST_CPP_END2END_TEST_HEALTH_CHECK_SERVICE_IMPL_H diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index 5ae9a9a791..097e92f583 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -29,6 +29,7 @@ grpc_cc_test( grpc_cc_library( name = "helpers", + testonly = 1, srcs = ["helpers.cc"], hdrs = [ "fullstack_context_mutators.h", @@ -55,6 +56,7 @@ grpc_cc_binary( grpc_cc_binary( name = "bm_arena", + testonly = 1, srcs = ["bm_arena.cc"], deps = [":helpers"], ) @@ -68,6 +70,7 @@ grpc_cc_binary( grpc_cc_binary( name = "bm_call_create", + testonly = 1, srcs = ["bm_call_create.cc"], deps = [":helpers"], ) @@ -95,6 +98,7 @@ grpc_cc_binary( grpc_cc_library( name = "fullstack_streaming_ping_pong_h", + testonly = 1, hdrs = [ "fullstack_streaming_ping_pong.h", ], @@ -112,6 +116,7 @@ grpc_cc_binary( grpc_cc_library( name = "fullstack_streaming_pump_h", + testonly = 1, hdrs = [ "fullstack_streaming_pump.h", ], @@ -129,12 +134,14 @@ grpc_cc_binary( grpc_cc_binary( name = "bm_fullstack_trickle", + testonly = 1, srcs = ["bm_fullstack_trickle.cc"], deps = [":helpers"], ) grpc_cc_library( name = "fullstack_unary_ping_pong_h", + testonly = 1, hdrs = [ "fullstack_unary_ping_pong.h", ], diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index 71e8d9972b..6bbf553bbd 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -200,7 +200,7 @@ class EndpointPairFixture : public BaseFixture { } grpc_server_setup_transport(server_->c_server(), server_transport_, - nullptr, server_args, 0); + nullptr, server_args, nullptr); grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr); } diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc index 3c5b346884..7b22f23cf0 100644 --- a/test/cpp/performance/writes_per_rpc_test.cc +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -100,7 +100,7 @@ class EndpointPairFixture { } grpc_server_setup_transport(server_->c_server(), transport, nullptr, - server_args, 0); + server_args, nullptr); grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } |