aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end
diff options
context:
space:
mode:
authorGravatar yang-g <yangg@google.com>2018-12-17 08:52:20 -0800
committerGravatar yang-g <yangg@google.com>2018-12-17 08:52:20 -0800
commit038a71d826f02d06397e1132f2ec1d071e90f08c (patch)
tree43afe8c684c43faae9fa0445009187ebbe643e93 /test/cpp/end2end
parentbd5d86935f6fdefd22e12eab5c64e7cb1ba7d7bc (diff)
parentb250f34b1225cde1bb19496c5cc5d66e40111052 (diff)
Merge remote-tracking branch 'upstream/master' into gpr_test_util_to_grpc_test_util
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r--test/cpp/end2end/BUILD13
-rw-r--r--test/cpp/end2end/channelz_service_test.cc82
-rw-r--r--test/cpp/end2end/client_callback_end2end_test.cc47
-rw-r--r--test/cpp/end2end/client_interceptors_end2end_test.cc6
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc111
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc41
-rw-r--r--test/cpp/end2end/health_service_end2end_test.cc149
-rw-r--r--test/cpp/end2end/interceptors_util.cc1
-rw-r--r--test/cpp/end2end/server_interceptors_end2end_test.cc29
-rw-r--r--test/cpp/end2end/test_health_check_service_impl.cc97
-rw-r--r--test/cpp/end2end/test_health_check_service_impl.h58
11 files changed, 528 insertions, 106 deletions
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index a648f4f063..762d2302af 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -36,6 +36,18 @@ grpc_cc_library(
)
grpc_cc_library(
+ name = "test_health_check_service_impl",
+ testonly = True,
+ srcs = ["test_health_check_service_impl.cc"],
+ hdrs = ["test_health_check_service_impl.h"],
+ deps = [
+ "//:grpc",
+ "//:grpc++",
+ "//src/proto/grpc/health/v1:health_proto",
+ ],
+)
+
+grpc_cc_library(
name = "interceptors_util",
testonly = True,
srcs = ["interceptors_util.cc"],
@@ -272,6 +284,7 @@ grpc_cc_test(
"gtest",
],
deps = [
+ ":test_health_check_service_impl",
":test_service_impl",
"//:gpr",
"//:grpc",
diff --git a/test/cpp/end2end/channelz_service_test.cc b/test/cpp/end2end/channelz_service_test.cc
index 29b59e4e5e..425334d972 100644
--- a/test/cpp/end2end/channelz_service_test.cc
+++ b/test/cpp/end2end/channelz_service_test.cc
@@ -54,6 +54,14 @@ using grpc::channelz::v1::GetSubchannelResponse;
using grpc::channelz::v1::GetTopChannelsRequest;
using grpc::channelz::v1::GetTopChannelsResponse;
+// This code snippet can be used to print out any responses for
+// visual debugging.
+//
+//
+// string out_str;
+// google::protobuf::TextFormat::PrintToString(resp, &out_str);
+// std::cout << "resp: " << out_str << "\n";
+
namespace grpc {
namespace testing {
namespace {
@@ -164,6 +172,19 @@ class ChannelzServerTest : public ::testing::Test {
echo_stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> NewEchoStub() {
+ static int salt = 0;
+ string target = "dns:localhost:" + to_string(proxy_port_);
+ ChannelArguments args;
+ // disable channelz. We only want to focus on proxy to backend outbound.
+ args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 0);
+ // This ensures that gRPC will not do connection sharing.
+ args.SetInt("salt", salt++);
+ std::shared_ptr<Channel> channel =
+ CreateCustomChannel(target, InsecureChannelCredentials(), args);
+ return grpc::testing::EchoTestService::NewStub(channel);
+ }
+
void SendSuccessfulEcho(int channel_idx) {
EchoRequest request;
EchoResponse response;
@@ -651,6 +672,67 @@ TEST_F(ChannelzServerTest, GetServerSocketsTest) {
EXPECT_EQ(get_server_sockets_response.socket_ref_size(), 1);
}
+TEST_F(ChannelzServerTest, GetServerSocketsPaginationTest) {
+ ResetStubs();
+ ConfigureProxy(1);
+ std::vector<std::unique_ptr<grpc::testing::EchoTestService::Stub>> stubs;
+ const int kNumServerSocketsCreated = 20;
+ for (int i = 0; i < kNumServerSocketsCreated; ++i) {
+ stubs.push_back(NewEchoStub());
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("Hello channelz");
+ request.mutable_param()->set_backend_channel_idx(0);
+ ClientContext context;
+ Status s = stubs.back()->Echo(&context, request, &response);
+ EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
+ }
+ GetServersRequest get_server_request;
+ GetServersResponse get_server_response;
+ get_server_request.set_start_server_id(0);
+ ClientContext get_server_context;
+ Status s = channelz_stub_->GetServers(&get_server_context, get_server_request,
+ &get_server_response);
+ EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
+ EXPECT_EQ(get_server_response.server_size(), 1);
+ // Make a request that gets all of the serversockets
+ {
+ GetServerSocketsRequest get_server_sockets_request;
+ GetServerSocketsResponse get_server_sockets_response;
+ get_server_sockets_request.set_server_id(
+ get_server_response.server(0).ref().server_id());
+ get_server_sockets_request.set_start_socket_id(0);
+ ClientContext get_server_sockets_context;
+ s = channelz_stub_->GetServerSockets(&get_server_sockets_context,
+ get_server_sockets_request,
+ &get_server_sockets_response);
+ EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
+ // We add one to account the the channelz stub that will end up creating
+ // a serversocket.
+ EXPECT_EQ(get_server_sockets_response.socket_ref_size(),
+ kNumServerSocketsCreated + 1);
+ EXPECT_TRUE(get_server_sockets_response.end());
+ }
+ // Now we make a request that exercises pagination.
+ {
+ GetServerSocketsRequest get_server_sockets_request;
+ GetServerSocketsResponse get_server_sockets_response;
+ get_server_sockets_request.set_server_id(
+ get_server_response.server(0).ref().server_id());
+ get_server_sockets_request.set_start_socket_id(0);
+ const int kMaxResults = 10;
+ get_server_sockets_request.set_max_results(kMaxResults);
+ ClientContext get_server_sockets_context;
+ s = channelz_stub_->GetServerSockets(&get_server_sockets_context,
+ get_server_sockets_request,
+ &get_server_sockets_response);
+ EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
+ EXPECT_EQ(get_server_sockets_response.socket_ref_size(), kMaxResults);
+ EXPECT_FALSE(get_server_sockets_response.end());
+ }
+}
+
TEST_F(ChannelzServerTest, GetServerListenSocketsTest) {
ResetStubs();
ConfigureProxy(1);
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
index 65434bac6b..a999321992 100644
--- a/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -182,7 +182,7 @@ class ClientCallbackEnd2endTest
}
}
- void SendGenericEchoAsBidi(int num_rpcs) {
+ void SendGenericEchoAsBidi(int num_rpcs, int reuses) {
const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
grpc::string test_string("");
for (int i = 0; i < num_rpcs; i++) {
@@ -191,14 +191,26 @@ class ClientCallbackEnd2endTest
ByteBuffer> {
public:
Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name,
- const grpc::string& test_str) {
- test->generic_stub_->experimental().PrepareBidiStreamingCall(
- &cli_ctx_, method_name, this);
- request_.set_message(test_str);
- send_buf_ = SerializeToByteBuffer(&request_);
- StartWrite(send_buf_.get());
- StartRead(&recv_buf_);
- StartCall();
+ const grpc::string& test_str, int reuses)
+ : reuses_remaining_(reuses) {
+ activate_ = [this, test, method_name, test_str] {
+ if (reuses_remaining_ > 0) {
+ cli_ctx_.reset(new ClientContext);
+ reuses_remaining_--;
+ test->generic_stub_->experimental().PrepareBidiStreamingCall(
+ cli_ctx_.get(), method_name, this);
+ request_.set_message(test_str);
+ send_buf_ = SerializeToByteBuffer(&request_);
+ StartWrite(send_buf_.get());
+ StartRead(&recv_buf_);
+ StartCall();
+ } else {
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ };
+ activate_();
}
void OnWriteDone(bool ok) override { StartWritesDone(); }
void OnReadDone(bool ok) override {
@@ -208,9 +220,7 @@ class ClientCallbackEnd2endTest
};
void OnDone(const Status& s) override {
EXPECT_TRUE(s.ok());
- std::unique_lock<std::mutex> l(mu_);
- done_ = true;
- cv_.notify_one();
+ activate_();
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
@@ -222,11 +232,13 @@ class ClientCallbackEnd2endTest
EchoRequest request_;
std::unique_ptr<ByteBuffer> send_buf_;
ByteBuffer recv_buf_;
- ClientContext cli_ctx_;
+ std::unique_ptr<ClientContext> cli_ctx_;
+ int reuses_remaining_;
+ std::function<void()> activate_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
- } rpc{this, kMethodName, test_string};
+ } rpc{this, kMethodName, test_string, reuses};
rpc.Await();
}
@@ -293,7 +305,12 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
ResetStub();
- SendGenericEchoAsBidi(10);
+ SendGenericEchoAsBidi(10, 1);
+}
+
+TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
+ ResetStub();
+ SendGenericEchoAsBidi(10, 10);
}
#if GRPC_ALLOW_EXCEPTIONS
diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc
index 3a191d1e03..968b5d49d1 100644
--- a/test/cpp/end2end/client_interceptors_end2end_test.cc
+++ b/test/cpp/end2end/client_interceptors_end2end_test.cc
@@ -50,6 +50,7 @@ class HijackingInterceptor : public experimental::Interceptor {
info_ = info;
// Make sure it is the right method
EXPECT_EQ(strcmp("/grpc.testing.EchoTestService/Echo", info->method()), 0);
+ EXPECT_EQ(info->type(), experimental::ClientRpcInfo::Type::UNARY);
}
virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
@@ -383,6 +384,7 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorHijackingTest) {
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
// Add 20 dummy interceptors before hijacking interceptor
+ creators.reserve(20);
for (auto i = 0; i < 20; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
@@ -423,6 +425,7 @@ TEST_F(ClientInterceptorsEnd2endTest,
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
// Add 5 dummy interceptors before hijacking interceptor
+ creators.reserve(5);
for (auto i = 0; i < 5; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
@@ -570,6 +573,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, DummyGlobalInterceptor) {
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
// Add 20 dummy interceptors
+ creators.reserve(20);
for (auto i = 0; i < 20; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
@@ -595,6 +599,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, LoggingGlobalInterceptor) {
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
// Add 20 dummy interceptors
+ creators.reserve(20);
for (auto i = 0; i < 20; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
@@ -620,6 +625,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, HijackingGlobalInterceptor) {
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
// Add 20 dummy interceptors
+ creators.reserve(20);
for (auto i = 0; i < 20; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index b667460cf0..929c2bb589 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -35,7 +35,9 @@
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
+#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/gpr/env.h"
@@ -116,7 +118,10 @@ class MyTestServiceImpl : public TestServiceImpl {
class ClientLbEnd2endTest : public ::testing::Test {
protected:
ClientLbEnd2endTest()
- : server_host_("localhost"), kRequestMessage_("Live long and prosper.") {
+ : server_host_("localhost"),
+ kRequestMessage_("Live long and prosper."),
+ creds_(new SecureChannelCredentials(
+ grpc_fake_transport_security_credentials_create())) {
// Make the backup poller poll very frequently in order to pick up
// updates from all the subchannels's FDs.
gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1");
@@ -156,24 +161,22 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
grpc_channel_args* BuildFakeResults(const std::vector<int>& ports) {
- grpc_lb_addresses* addresses =
- grpc_lb_addresses_create(ports.size(), nullptr);
- for (size_t i = 0; i < ports.size(); ++i) {
+ grpc_core::ServerAddressList addresses;
+ for (const int& port : ports) {
char* lb_uri_str;
- gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]);
+ gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
GPR_ASSERT(lb_uri != nullptr);
- grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri,
- false /* is balancer */,
- "" /* balancer name */, nullptr);
+ grpc_resolved_address address;
+ GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
+ addresses.emplace_back(address.addr, address.len, nullptr /* args */);
grpc_uri_destroy(lb_uri);
gpr_free(lb_uri_str);
}
const grpc_arg fake_addresses =
- grpc_lb_addresses_create_channel_arg(addresses);
+ CreateServerAddressListChannelArg(&addresses);
grpc_channel_args* fake_results =
grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1);
- grpc_lb_addresses_destroy(addresses);
return fake_results;
}
@@ -215,9 +218,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
} // else, default to pick first
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_.get());
- std::shared_ptr<ChannelCredentials> creds(new SecureChannelCredentials(
- grpc_fake_transport_security_credentials_create()));
- return CreateCustomChannel("fake:///", std::move(creds), args);
+ return CreateCustomChannel("fake:///", creds_, args);
}
bool SendRpc(
@@ -265,6 +266,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
MyTestServiceImpl service_;
std::unique_ptr<std::thread> thread_;
bool server_ready_ = false;
+ bool started_ = false;
explicit ServerData(int port = 0) {
port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
@@ -272,6 +274,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_);
+ started_ = true;
std::mutex mu;
std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
@@ -297,9 +300,11 @@ class ClientLbEnd2endTest : public ::testing::Test {
cond->notify_one();
}
- void Shutdown(bool join = true) {
+ void Shutdown() {
+ if (!started_) return;
server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
- if (join) thread_->join();
+ thread_->join();
+ started_ = false;
}
void SetServingStatus(const grpc::string& service, bool serving) {
@@ -378,6 +383,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
response_generator_;
const grpc::string kRequestMessage_;
+ std::shared_ptr<ChannelCredentials> creds_;
};
TEST_F(ClientLbEnd2endTest, PickFirst) {
@@ -422,6 +428,30 @@ TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
CheckRpcSendOk(second_stub, DEBUG_LOCATION);
}
+TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) {
+ ChannelArguments args;
+ constexpr int kInitialBackOffMs = 5000;
+ args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
+ // Create 2 servers, but start only the second one.
+ std::vector<int> ports = {grpc_pick_unused_port_or_die(),
+ grpc_pick_unused_port_or_die()};
+ CreateServers(2, ports);
+ StartServer(1);
+ auto channel1 = BuildChannel("pick_first", args);
+ auto stub1 = BuildStub(channel1);
+ SetNextResolution(ports);
+ // Wait for second server to be ready.
+ WaitForServer(stub1, 1, DEBUG_LOCATION);
+ // Create a second channel with the same addresses. Its PF instance
+ // should immediately pick the second subchannel, since it's already
+ // in READY state.
+ auto channel2 = BuildChannel("pick_first", args);
+ SetNextResolution(ports);
+ // Check that the channel reports READY without waiting for the
+ // initial backoff.
+ EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
+}
+
TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
ChannelArguments args;
constexpr int kInitialBackOffMs = 100;
@@ -507,6 +537,51 @@ TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) {
EXPECT_LT(waited_ms, kInitialBackOffMs);
}
+TEST_F(ClientLbEnd2endTest,
+ PickFirstResetConnectionBackoffNextAttemptStartsImmediately) {
+ ChannelArguments args;
+ constexpr int kInitialBackOffMs = 1000;
+ args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
+ const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
+ auto channel = BuildChannel("pick_first", args);
+ auto stub = BuildStub(channel);
+ SetNextResolution(ports);
+ // Wait for connect, which should fail ~immediately, because the server
+ // is not up.
+ gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
+ EXPECT_FALSE(
+ channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
+ // Reset connection backoff.
+ // Note that the time at which the third attempt will be started is
+ // actually computed at this point, so we record the start time here.
+ gpr_log(GPR_INFO, "=== RESETTING BACKOFF");
+ const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
+ experimental::ChannelResetConnectionBackoff(channel.get());
+ // Trigger a second connection attempt. This should also fail
+ // ~immediately, but the retry should be scheduled for
+ // kInitialBackOffMs instead of applying the multiplier.
+ gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT");
+ EXPECT_FALSE(
+ channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
+ // Bring up a server on the chosen port.
+ gpr_log(GPR_INFO, "=== STARTING BACKEND");
+ StartServers(1, ports);
+ // Wait for connect. Should happen within kInitialBackOffMs.
+ // Give an extra 100ms to account for the time spent in the second and
+ // third connection attempts themselves (since what we really want to
+ // measure is the time between the two). As long as this is less than
+ // the 1.6x increase we would see if the backoff state was not reset
+ // properly, the test is still proving that the backoff was reset.
+ constexpr int kWaitMs = kInitialBackOffMs + 100;
+ gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT");
+ EXPECT_TRUE(channel->WaitForConnected(
+ grpc_timeout_milliseconds_to_deadline(kWaitMs)));
+ const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
+ const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
+ gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
+ EXPECT_LT(waited_ms, kWaitMs);
+}
+
TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
@@ -899,7 +974,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
servers_[0]->service_.ResetCounters();
// Shutdown one of the servers to be sent in the update.
- servers_[1]->Shutdown(false);
+ servers_[1]->Shutdown();
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
@@ -958,7 +1033,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Kill all servers
gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
for (size_t i = 0; i < servers_.size(); ++i) {
- servers_[i]->Shutdown(true);
+ servers_[i]->Shutdown();
}
gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
@@ -1006,7 +1081,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
}
const auto pre_death = servers_[0]->service_.request_count();
// Kill the first server.
- servers_[0]->Shutdown(true);
+ servers_[0]->Shutdown();
// Client request still succeed. May need retrying if RR had returned a pick
// before noticing the change in the server's connectivity.
while (!SendRpc(stub)) {
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 9c4cd05061..2eaacd429d 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -32,7 +32,9 @@
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
+#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -412,8 +414,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
std::shared_ptr<ChannelCredentials> creds(
new SecureChannelCredentials(grpc_composite_channel_credentials_create(
channel_creds, call_creds, nullptr)));
- grpc_call_credentials_unref(call_creds);
- grpc_channel_credentials_unref(channel_creds);
+ call_creds->Unref();
+ channel_creds->Unref();
channel_ = CreateCustomChannel(uri.str(), creds, args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
@@ -486,18 +488,27 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc::string balancer_name;
};
- grpc_lb_addresses* CreateLbAddressesFromAddressDataList(
+ grpc_core::ServerAddressList CreateLbAddressesFromAddressDataList(
const std::vector<AddressData>& address_data) {
- grpc_lb_addresses* addresses =
- grpc_lb_addresses_create(address_data.size(), nullptr);
- for (size_t i = 0; i < address_data.size(); ++i) {
+ grpc_core::ServerAddressList addresses;
+ for (const auto& addr : address_data) {
char* lb_uri_str;
- gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", address_data[i].port);
+ gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", addr.port);
grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
GPR_ASSERT(lb_uri != nullptr);
- grpc_lb_addresses_set_address_from_uri(
- addresses, i, lb_uri, address_data[i].is_balancer,
- address_data[i].balancer_name.c_str(), nullptr);
+ grpc_resolved_address address;
+ GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
+ std::vector<grpc_arg> args_to_add;
+ if (addr.is_balancer) {
+ args_to_add.emplace_back(grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_ADDRESS_IS_BALANCER), 1));
+ args_to_add.emplace_back(grpc_channel_arg_string_create(
+ const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME),
+ const_cast<char*>(addr.balancer_name.c_str())));
+ }
+ grpc_channel_args* args = grpc_channel_args_copy_and_add(
+ nullptr, args_to_add.data(), args_to_add.size());
+ addresses.emplace_back(address.addr, address.len, args);
grpc_uri_destroy(lb_uri);
gpr_free(lb_uri_str);
}
@@ -506,23 +517,21 @@ class GrpclbEnd2endTest : public ::testing::Test {
void SetNextResolution(const std::vector<AddressData>& address_data) {
grpc_core::ExecCtx exec_ctx;
- grpc_lb_addresses* addresses =
+ grpc_core::ServerAddressList addresses =
CreateLbAddressesFromAddressDataList(address_data);
- grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_arg fake_addresses = CreateServerAddressListChannelArg(&addresses);
grpc_channel_args fake_result = {1, &fake_addresses};
response_generator_->SetResponse(&fake_result);
- grpc_lb_addresses_destroy(addresses);
}
void SetNextReresolutionResponse(
const std::vector<AddressData>& address_data) {
grpc_core::ExecCtx exec_ctx;
- grpc_lb_addresses* addresses =
+ grpc_core::ServerAddressList addresses =
CreateLbAddressesFromAddressDataList(address_data);
- grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_arg fake_addresses = CreateServerAddressListChannelArg(&addresses);
grpc_channel_args fake_result = {1, &fake_addresses};
response_generator_->SetReresolutionResponse(&fake_result);
- grpc_lb_addresses_destroy(addresses);
}
const std::vector<int> GetBackendPorts(const size_t start_index = 0) const {
diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc
index 89c4bef09c..b96ff53a3e 100644
--- a/test/cpp/end2end/health_service_end2end_test.cc
+++ b/test/cpp/end2end/health_service_end2end_test.cc
@@ -37,6 +37,7 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_health_check_service_impl.h"
#include "test/cpp/end2end/test_service_impl.h"
#include <gtest/gtest.h>
@@ -49,62 +50,6 @@ namespace grpc {
namespace testing {
namespace {
-// A sample sync implementation of the health checking service. This does the
-// same thing as the default one.
-class HealthCheckServiceImpl : public ::grpc::health::v1::Health::Service {
- public:
- Status Check(ServerContext* context, const HealthCheckRequest* request,
- HealthCheckResponse* response) override {
- std::lock_guard<std::mutex> lock(mu_);
- auto iter = status_map_.find(request->service());
- if (iter == status_map_.end()) {
- return Status(StatusCode::NOT_FOUND, "");
- }
- response->set_status(iter->second);
- return Status::OK;
- }
-
- Status Watch(ServerContext* context, const HealthCheckRequest* request,
- ::grpc::ServerWriter<HealthCheckResponse>* writer) override {
- auto last_state = HealthCheckResponse::UNKNOWN;
- while (!context->IsCancelled()) {
- {
- std::lock_guard<std::mutex> lock(mu_);
- HealthCheckResponse response;
- auto iter = status_map_.find(request->service());
- if (iter == status_map_.end()) {
- response.set_status(response.SERVICE_UNKNOWN);
- } else {
- response.set_status(iter->second);
- }
- if (response.status() != last_state) {
- writer->Write(response, ::grpc::WriteOptions());
- }
- }
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_millis(1000, GPR_TIMESPAN)));
- }
- return Status::OK;
- }
-
- void SetStatus(const grpc::string& service_name,
- HealthCheckResponse::ServingStatus status) {
- std::lock_guard<std::mutex> lock(mu_);
- status_map_[service_name] = status;
- }
-
- void SetAll(HealthCheckResponse::ServingStatus status) {
- std::lock_guard<std::mutex> lock(mu_);
- for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) {
- iter->second = status;
- }
- }
-
- private:
- std::mutex mu_;
- std::map<const grpc::string, HealthCheckResponse::ServingStatus> status_map_;
-};
-
// A custom implementation of the health checking service interface. This is
// used to test that it prevents the server from creating a default service and
// also serves as an example of how to override the default service.
@@ -125,6 +70,8 @@ class CustomHealthCheckService : public HealthCheckServiceInterface {
: HealthCheckResponse::NOT_SERVING);
}
+ void Shutdown() override { impl_->Shutdown(); }
+
private:
HealthCheckServiceImpl* impl_; // not owned
};
@@ -260,6 +207,74 @@ class HealthServiceEnd2endTest : public ::testing::Test {
context.TryCancel();
}
+ // Verify that after HealthCheckServiceInterface::Shutdown is called
+ // 1. unary client will see NOT_SERVING.
+ // 2. unary client still sees NOT_SERVING after a SetServing(true) is called.
+ // 3. streaming (Watch) client will see an update.
+ // 4. setting a new service to serving after shutdown will add the service
+ // name but return NOT_SERVING to client.
+ // This has to be called last.
+ void VerifyHealthCheckServiceShutdown() {
+ HealthCheckServiceInterface* service = server_->GetHealthCheckService();
+ EXPECT_TRUE(service != nullptr);
+ const grpc::string kHealthyService("healthy_service");
+ const grpc::string kUnhealthyService("unhealthy_service");
+ const grpc::string kNotRegisteredService("not_registered");
+ const grpc::string kNewService("add_after_shutdown");
+ service->SetServingStatus(kHealthyService, true);
+ service->SetServingStatus(kUnhealthyService, false);
+
+ ResetStubs();
+
+ // Start Watch for service.
+ ClientContext context;
+ HealthCheckRequest request;
+ request.set_service(kHealthyService);
+ std::unique_ptr<::grpc::ClientReaderInterface<HealthCheckResponse>> reader =
+ hc_stub_->Watch(&context, request);
+
+ HealthCheckResponse response;
+ EXPECT_TRUE(reader->Read(&response));
+ EXPECT_EQ(response.SERVING, response.status());
+
+ SendHealthCheckRpc("", Status::OK, HealthCheckResponse::SERVING);
+ SendHealthCheckRpc(kHealthyService, Status::OK,
+ HealthCheckResponse::SERVING);
+ SendHealthCheckRpc(kUnhealthyService, Status::OK,
+ HealthCheckResponse::NOT_SERVING);
+ SendHealthCheckRpc(kNotRegisteredService,
+ Status(StatusCode::NOT_FOUND, ""));
+ SendHealthCheckRpc(kNewService, Status(StatusCode::NOT_FOUND, ""));
+
+ // Shutdown health check service.
+ service->Shutdown();
+
+ // Watch client gets another update.
+ EXPECT_TRUE(reader->Read(&response));
+ EXPECT_EQ(response.NOT_SERVING, response.status());
+ // Finish Watch call.
+ context.TryCancel();
+
+ SendHealthCheckRpc("", Status::OK, HealthCheckResponse::NOT_SERVING);
+ SendHealthCheckRpc(kHealthyService, Status::OK,
+ HealthCheckResponse::NOT_SERVING);
+ SendHealthCheckRpc(kUnhealthyService, Status::OK,
+ HealthCheckResponse::NOT_SERVING);
+ SendHealthCheckRpc(kNotRegisteredService,
+ Status(StatusCode::NOT_FOUND, ""));
+
+ // Setting status after Shutdown has no effect.
+ service->SetServingStatus(kHealthyService, true);
+ SendHealthCheckRpc(kHealthyService, Status::OK,
+ HealthCheckResponse::NOT_SERVING);
+
+ // Adding serving status for a new service after shutdown will return
+ // NOT_SERVING.
+ service->SetServingStatus(kNewService, true);
+ SendHealthCheckRpc(kNewService, Status::OK,
+ HealthCheckResponse::NOT_SERVING);
+ }
+
TestServiceImpl echo_test_service_;
HealthCheckServiceImpl health_check_service_impl_;
std::unique_ptr<Health::Stub> hc_stub_;
@@ -295,6 +310,13 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) {
Status(StatusCode::INVALID_ARGUMENT, ""));
}
+TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceShutdown) {
+ EnableDefaultHealthCheckService(true);
+ EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
+ SetUpServer(true, false, false, nullptr);
+ VerifyHealthCheckServiceShutdown();
+}
+
// Provide an empty service to disable the default service.
TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) {
EnableDefaultHealthCheckService(true);
@@ -326,6 +348,21 @@ TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) {
VerifyHealthCheckServiceStreaming();
}
+TEST_F(HealthServiceEnd2endTest, ExplicitlyHealthServiceShutdown) {
+ EnableDefaultHealthCheckService(true);
+ EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
+ std::unique_ptr<HealthCheckServiceInterface> override_service(
+ new CustomHealthCheckService(&health_check_service_impl_));
+ HealthCheckServiceInterface* underlying_service = override_service.get();
+ SetUpServer(false, false, true, std::move(override_service));
+ HealthCheckServiceInterface* service = server_->GetHealthCheckService();
+ EXPECT_TRUE(service == underlying_service);
+
+ ResetStubs();
+
+ VerifyHealthCheckServiceShutdown();
+}
+
} // namespace
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/end2end/interceptors_util.cc b/test/cpp/end2end/interceptors_util.cc
index 5d59c1a4b7..e0ad7d1526 100644
--- a/test/cpp/end2end/interceptors_util.cc
+++ b/test/cpp/end2end/interceptors_util.cc
@@ -137,6 +137,7 @@ CreateDummyClientInterceptors() {
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
// Add 20 dummy interceptors before hijacking interceptor
+ creators.reserve(20);
for (auto i = 0; i < 20; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc
index c98b6143c6..9460a7d6c6 100644
--- a/test/cpp/end2end/server_interceptors_end2end_test.cc
+++ b/test/cpp/end2end/server_interceptors_end2end_test.cc
@@ -44,7 +44,34 @@ namespace {
class LoggingInterceptor : public experimental::Interceptor {
public:
- LoggingInterceptor(experimental::ServerRpcInfo* info) { info_ = info; }
+ LoggingInterceptor(experimental::ServerRpcInfo* info) {
+ info_ = info;
+
+ // Check the method name and compare to the type
+ const char* method = info->method();
+ experimental::ServerRpcInfo::Type type = info->type();
+
+ // Check that we use one of our standard methods with expected type.
+ // Also allow the health checking service.
+ // We accept BIDI_STREAMING for Echo in case it's an AsyncGenericService
+ // being tested (the GenericRpc test).
+ // The empty method is for the Unimplemented requests that arise
+ // when draining the CQ.
+ EXPECT_TRUE(
+ strstr(method, "/grpc.health") == method ||
+ (strcmp(method, "/grpc.testing.EchoTestService/Echo") == 0 &&
+ (type == experimental::ServerRpcInfo::Type::UNARY ||
+ type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)) ||
+ (strcmp(method, "/grpc.testing.EchoTestService/RequestStream") == 0 &&
+ type == experimental::ServerRpcInfo::Type::CLIENT_STREAMING) ||
+ (strcmp(method, "/grpc.testing.EchoTestService/ResponseStream") == 0 &&
+ type == experimental::ServerRpcInfo::Type::SERVER_STREAMING) ||
+ (strcmp(method, "/grpc.testing.EchoTestService/BidiStream") == 0 &&
+ type == experimental::ServerRpcInfo::Type::BIDI_STREAMING) ||
+ strcmp(method, "/grpc.testing.EchoTestService/Unimplemented") == 0 ||
+ (strcmp(method, "") == 0 &&
+ type == experimental::ServerRpcInfo::Type::BIDI_STREAMING));
+ }
virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
if (methods->QueryInterceptionHookPoint(
diff --git a/test/cpp/end2end/test_health_check_service_impl.cc b/test/cpp/end2end/test_health_check_service_impl.cc
new file mode 100644
index 0000000000..0801e30199
--- /dev/null
+++ b/test/cpp/end2end/test_health_check_service_impl.cc
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "test/cpp/end2end/test_health_check_service_impl.h"
+
+#include <grpc/grpc.h>
+
+using grpc::health::v1::HealthCheckRequest;
+using grpc::health::v1::HealthCheckResponse;
+
+namespace grpc {
+namespace testing {
+
+Status HealthCheckServiceImpl::Check(ServerContext* context,
+ const HealthCheckRequest* request,
+ HealthCheckResponse* response) {
+ std::lock_guard<std::mutex> lock(mu_);
+ auto iter = status_map_.find(request->service());
+ if (iter == status_map_.end()) {
+ return Status(StatusCode::NOT_FOUND, "");
+ }
+ response->set_status(iter->second);
+ return Status::OK;
+}
+
+Status HealthCheckServiceImpl::Watch(
+ ServerContext* context, const HealthCheckRequest* request,
+ ::grpc::ServerWriter<HealthCheckResponse>* writer) {
+ auto last_state = HealthCheckResponse::UNKNOWN;
+ while (!context->IsCancelled()) {
+ {
+ std::lock_guard<std::mutex> lock(mu_);
+ HealthCheckResponse response;
+ auto iter = status_map_.find(request->service());
+ if (iter == status_map_.end()) {
+ response.set_status(response.SERVICE_UNKNOWN);
+ } else {
+ response.set_status(iter->second);
+ }
+ if (response.status() != last_state) {
+ writer->Write(response, ::grpc::WriteOptions());
+ }
+ }
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(1000, GPR_TIMESPAN)));
+ }
+ return Status::OK;
+}
+
+void HealthCheckServiceImpl::SetStatus(
+ const grpc::string& service_name,
+ HealthCheckResponse::ServingStatus status) {
+ std::lock_guard<std::mutex> lock(mu_);
+ if (shutdown_) {
+ status = HealthCheckResponse::NOT_SERVING;
+ }
+ status_map_[service_name] = status;
+}
+
+void HealthCheckServiceImpl::SetAll(HealthCheckResponse::ServingStatus status) {
+ std::lock_guard<std::mutex> lock(mu_);
+ if (shutdown_) {
+ return;
+ }
+ for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) {
+ iter->second = status;
+ }
+}
+
+void HealthCheckServiceImpl::Shutdown() {
+ std::lock_guard<std::mutex> lock(mu_);
+ if (shutdown_) {
+ return;
+ }
+ shutdown_ = true;
+ for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) {
+ iter->second = HealthCheckResponse::NOT_SERVING;
+ }
+}
+
+} // namespace testing
+} // namespace grpc
diff --git a/test/cpp/end2end/test_health_check_service_impl.h b/test/cpp/end2end/test_health_check_service_impl.h
new file mode 100644
index 0000000000..5d36ce5320
--- /dev/null
+++ b/test/cpp/end2end/test_health_check_service_impl.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef GRPC_TEST_CPP_END2END_TEST_HEALTH_CHECK_SERVICE_IMPL_H
+#define GRPC_TEST_CPP_END2END_TEST_HEALTH_CHECK_SERVICE_IMPL_H
+
+#include <map>
+#include <mutex>
+
+#include <grpcpp/server_context.h>
+#include <grpcpp/support/status.h>
+
+#include "src/proto/grpc/health/v1/health.grpc.pb.h"
+
+namespace grpc {
+namespace testing {
+
+// A sample sync implementation of the health checking service. This does the
+// same thing as the default one.
+class HealthCheckServiceImpl : public health::v1::Health::Service {
+ public:
+ Status Check(ServerContext* context,
+ const health::v1::HealthCheckRequest* request,
+ health::v1::HealthCheckResponse* response) override;
+ Status Watch(ServerContext* context,
+ const health::v1::HealthCheckRequest* request,
+ ServerWriter<health::v1::HealthCheckResponse>* writer) override;
+ void SetStatus(const grpc::string& service_name,
+ health::v1::HealthCheckResponse::ServingStatus status);
+ void SetAll(health::v1::HealthCheckResponse::ServingStatus status);
+
+ void Shutdown();
+
+ private:
+ std::mutex mu_;
+ bool shutdown_ = false;
+ std::map<const grpc::string, health::v1::HealthCheckResponse::ServingStatus>
+ status_map_;
+};
+
+} // namespace testing
+} // namespace grpc
+
+#endif // GRPC_TEST_CPP_END2END_TEST_HEALTH_CHECK_SERVICE_IMPL_H