aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Noah Eisen <ncteisen@gmail.com>2018-07-18 11:43:52 -0700
committerGravatar GitHub <noreply@github.com>2018-07-18 11:43:52 -0700
commit2d5e0f1e72cda4e7466f28fdeaf20eceabef00f3 (patch)
tree64fdc57e6e420e091abb059b0af087a5098d84a4
parent0418b3aded781b291b40d499420c7adce8f79240 (diff)
parent21fa7905a3881c67fb6dec41023be58df1dee8dd (diff)
Merge pull request #16044 from ncteisen/interop-soak-test
Add Two New Soak Interop Tests
-rw-r--r--doc/interop-test-descriptions.md19
-rw-r--r--test/cpp/interop/client.cc16
-rw-r--r--test/cpp/interop/interop_client.cc64
-rw-r--r--test/cpp/interop/interop_client.h20
-rw-r--r--test/cpp/interop/stress_interop_client.cc6
-rw-r--r--test/cpp/interop/stress_interop_client.h4
-rw-r--r--test/cpp/interop/stress_test.cc11
7 files changed, 106 insertions, 34 deletions
diff --git a/doc/interop-test-descriptions.md b/doc/interop-test-descriptions.md
index 0ee2cae2bd..9781925533 100644
--- a/doc/interop-test-descriptions.md
+++ b/doc/interop-test-descriptions.md
@@ -899,6 +899,25 @@ Status: TODO
This test verifies that a client sending faster than a server can drain sees
pushback (i.e., attempts to send succeed only after appropriate delays).
+### Experimental Tests
+
+These tests are not yet standardized, and are not yet implemented in all
+languages. Therefore they are not part of our interop matrix.
+
+#### rpc_soak
+
+The client performs many large_unary RPCs in sequence over the same channel.
+The number of RPCs is configured by the experimental flag, `soak_iterations`.
+
+#### channel_soak
+
+The client performs many large_unary RPCs in sequence. Before each RPC, it
+tears down and rebuilds the channel. The number of RPCs is configured by
+the experimental flag, `soak_iterations`.
+
+This tests puts stress on several gRPC components; the resolver, the load
+balancer, and the RPC hotpath.
+
### TODO Tests
#### High priority:
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index 3eb155ef95..7bcf23c0eb 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -46,6 +46,7 @@ DEFINE_string(
"all : all test cases;\n"
"cancel_after_begin : cancel stream after starting it;\n"
"cancel_after_first_response: cancel on first response;\n"
+ "channel_soak: sends 'soak_iterations' rpcs, rebuilds channel each time;\n"
"client_compressed_streaming : compressed request streaming with "
"client_compressed_unary : single compressed request;\n"
"client_streaming : request streaming with single response;\n"
@@ -60,6 +61,7 @@ DEFINE_string(
"per_rpc_creds: raw oauth2 access token on a single rpc;\n"
"ping_pong : full-duplex streaming;\n"
"response streaming;\n"
+ "rpc_soak: 'sends soak_iterations' large_unary rpcs;\n"
"server_compressed_streaming : single request with compressed "
"server_compressed_unary : single compressed response;\n"
"server_streaming : single request with response streaming;\n"
@@ -83,6 +85,10 @@ DEFINE_bool(do_not_abort_on_transient_failures, false,
"test is retried in case of transient failures (and currently the "
"interop tests are not retried even if this flag is set to true)");
+DEFINE_int32(soak_iterations, 1000,
+ "number of iterations to use for the two soak tests; rpc_soak and "
+ "channel_soak");
+
using grpc::testing::CreateChannelForTestCase;
using grpc::testing::GetServiceAccountJsonKey;
using grpc::testing::UpdateActions;
@@ -91,8 +97,9 @@ int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
int ret = 0;
- grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
- true,
+ grpc::testing::ChannelCreationFunc channel_creation_func =
+ std::bind(&CreateChannelForTestCase, FLAGS_test_case);
+ grpc::testing::InteropClient client(channel_creation_func, true,
FLAGS_do_not_abort_on_transient_failures);
std::unordered_map<grpc::string, std::function<bool()>> actions;
@@ -151,6 +158,11 @@ int main(int argc, char** argv) {
std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
actions["cacheable_unary"] =
std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client);
+ actions["channel_soak"] =
+ std::bind(&grpc::testing::InteropClient::DoChannelSoakTest, &client,
+ FLAGS_soak_iterations);
+ actions["rpc_soak"] = std::bind(&grpc::testing::InteropClient::DoRpcSoakTest,
+ &client, FLAGS_soak_iterations);
UpdateActions(&actions);
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index fce99a1697..b7ce90803b 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -74,13 +74,15 @@ void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
}
} // namespace
-InteropClient::ServiceStub::ServiceStub(const std::shared_ptr<Channel>& channel,
- bool new_stub_every_call)
- : channel_(channel), new_stub_every_call_(new_stub_every_call) {
+InteropClient::ServiceStub::ServiceStub(
+ ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
+ : channel_creation_func_(channel_creation_func),
+ channel_(channel_creation_func_()),
+ new_stub_every_call_(new_stub_every_call) {
// If new_stub_every_call is false, then this is our chance to initialize
// stub_. (see Get())
if (!new_stub_every_call) {
- stub_ = TestService::NewStub(channel);
+ stub_ = TestService::NewStub(channel_);
}
}
@@ -100,27 +102,17 @@ InteropClient::ServiceStub::GetUnimplementedServiceStub() {
return unimplemented_service_stub_.get();
}
-void InteropClient::ServiceStub::Reset(
- const std::shared_ptr<Channel>& channel) {
- channel_ = channel;
-
- // Update stub_ as well. Note: If new_stub_every_call_ is true, we can reset
- // the stub_ since the next call to Get() will create a new stub
- if (new_stub_every_call_) {
- stub_.reset();
- } else {
- stub_ = TestService::NewStub(channel);
+void InteropClient::ServiceStub::ResetChannel() {
+ channel_ = channel_creation_func_();
+ if (!new_stub_every_call_) {
+ stub_ = TestService::NewStub(channel_);
}
}
-void InteropClient::Reset(const std::shared_ptr<Channel>& channel) {
- serviceStub_.Reset(std::move(channel));
-}
-
-InteropClient::InteropClient(const std::shared_ptr<Channel>& channel,
+InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures)
- : serviceStub_(std::move(channel), new_stub_every_test_case),
+ : serviceStub_(channel_creation_func, new_stub_every_test_case),
do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
bool InteropClient::AssertStatusOk(const Status& s,
@@ -1028,6 +1020,38 @@ bool InteropClient::DoCustomMetadata() {
return true;
}
+bool InteropClient::DoRpcSoakTest(int32_t soak_iterations) {
+ gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
+ GPR_ASSERT(soak_iterations > 0);
+ SimpleRequest request;
+ SimpleResponse response;
+ for (int i = 0; i < soak_iterations; ++i) {
+ if (!PerformLargeUnary(&request, &response)) {
+ gpr_log(GPR_ERROR, "rpc_soak test failed on iteration %d", i);
+ return false;
+ }
+ }
+ gpr_log(GPR_DEBUG, "rpc_soak test done.");
+ return true;
+}
+
+bool InteropClient::DoChannelSoakTest(int32_t soak_iterations) {
+ gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
+ soak_iterations);
+ GPR_ASSERT(soak_iterations > 0);
+ SimpleRequest request;
+ SimpleResponse response;
+ for (int i = 0; i < soak_iterations; ++i) {
+ serviceStub_.ResetChannel();
+ if (!PerformLargeUnary(&request, &response)) {
+ gpr_log(GPR_ERROR, "channel_soak test failed on iteration %d", i);
+ return false;
+ }
+ }
+ gpr_log(GPR_DEBUG, "channel_soak test done.");
+ return true;
+}
+
bool InteropClient::DoUnimplementedService() {
gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h
index 480eb3f4b6..e5be44d1d4 100644
--- a/test/cpp/interop/interop_client.h
+++ b/test/cpp/interop/interop_client.h
@@ -34,13 +34,15 @@ typedef std::function<void(const InteropClientContextInspector&,
const SimpleRequest*, const SimpleResponse*)>
CheckerFn;
+typedef std::function<std::shared_ptr<Channel>(void)> ChannelCreationFunc;
+
class InteropClient {
public:
/// If new_stub_every_test_case is true, a new TestService::Stub object is
/// created for every test case
/// If do_not_abort_on_transient_failures is true, abort() is not called in
/// case of transient failures (like connection failures)
- explicit InteropClient(const std::shared_ptr<Channel>& channel,
+ explicit InteropClient(ChannelCreationFunc channel_creation_func,
bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures);
~InteropClient() {}
@@ -67,6 +69,14 @@ class InteropClient {
bool DoUnimplementedMethod();
bool DoUnimplementedService();
bool DoCacheableUnary();
+
+ // The following interop test are not yet part of the interop spec, and are
+ // not implemented cross-language. They are considered experimental for now,
+ // but at some point in the future, might be codified and implemented in all
+ // languages
+ bool DoChannelSoakTest(int32_t soak_iterations);
+ bool DoRpcSoakTest(int32_t soak_iterations);
+
// Auth tests.
// username is a string containing the user email
bool DoJwtTokenCreds(const grpc::string& username);
@@ -83,15 +93,17 @@ class InteropClient {
public:
// If new_stub_every_call = true, pointer to a new instance of
// TestServce::Stub is returned by Get() everytime it is called
- ServiceStub(const std::shared_ptr<Channel>& channel,
+ ServiceStub(ChannelCreationFunc channel_creation_func,
bool new_stub_every_call);
TestService::Stub* Get();
UnimplementedService::Stub* GetUnimplementedServiceStub();
- void Reset(const std::shared_ptr<Channel>& channel);
+ // forces channel to be recreated.
+ void ResetChannel();
private:
+ ChannelCreationFunc channel_creation_func_;
std::unique_ptr<TestService::Stub> stub_;
std::unique_ptr<UnimplementedService::Stub> unimplemented_service_stub_;
std::shared_ptr<Channel> channel_;
@@ -109,8 +121,8 @@ class InteropClient {
bool AssertStatusCode(const Status& s, StatusCode expected_code,
const grpc::string& optional_debug_string);
bool TransientFailureOrAbort();
- ServiceStub serviceStub_;
+ ServiceStub serviceStub_;
/// If true, abort() is not called for transient failures
bool do_not_abort_on_transient_failures_;
};
diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc
index 9d373c3cd9..7dc1956f78 100644
--- a/test/cpp/interop/stress_interop_client.cc
+++ b/test/cpp/interop/stress_interop_client.cc
@@ -68,13 +68,13 @@ TestCaseType WeightedRandomTestSelector::GetNextTest() const {
StressTestInteropClient::StressTestInteropClient(
int test_id, const grpc::string& server_address,
- const std::shared_ptr<Channel>& channel,
+ ChannelCreationFunc channel_creation_func,
const WeightedRandomTestSelector& test_selector, long test_duration_secs,
long sleep_duration_ms, bool do_not_abort_on_transient_failures)
: test_id_(test_id),
server_address_(server_address),
- channel_(channel),
- interop_client_(new InteropClient(channel, false,
+ channel_creation_func_(channel_creation_func),
+ interop_client_(new InteropClient(channel_creation_func_, false,
do_not_abort_on_transient_failures)),
test_selector_(test_selector),
test_duration_secs_(test_duration_secs),
diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h
index e4fa7d0973..58680d8093 100644
--- a/test/cpp/interop/stress_interop_client.h
+++ b/test/cpp/interop/stress_interop_client.h
@@ -91,7 +91,7 @@ class WeightedRandomTestSelector {
class StressTestInteropClient {
public:
StressTestInteropClient(int test_id, const grpc::string& server_address,
- const std::shared_ptr<Channel>& channel,
+ ChannelCreationFunc channel_creation_func,
const WeightedRandomTestSelector& test_selector,
long test_duration_secs, long sleep_duration_ms,
bool do_not_abort_on_transient_failures);
@@ -105,7 +105,7 @@ class StressTestInteropClient {
int test_id_;
const grpc::string& server_address_;
- std::shared_ptr<Channel> channel_;
+ ChannelCreationFunc channel_creation_func_;
std::unique_ptr<InteropClient> interop_client_;
const WeightedRandomTestSelector& test_selector_;
long test_duration_secs_;
diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc
index 023e0c8f0b..ebbd14beba 100644
--- a/test/cpp/interop/stress_test.cc
+++ b/test/cpp/interop/stress_test.cc
@@ -283,15 +283,20 @@ int main(int argc, char** argv) {
channel_idx++) {
gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(),
channel_idx);
- std::shared_ptr<grpc::Channel> channel = grpc::CreateTestChannel(
+ grpc::testing::ChannelCreationFunc channel_creation_func = std::bind(
+ static_cast<std::shared_ptr<grpc::Channel> (*)(
+ const grpc::string&, const grpc::string&,
+ grpc::testing::transport_security, bool)>(
+ grpc::CreateTestChannel),
*it, FLAGS_server_host_override, security_type, !FLAGS_use_test_ca);
// Create stub(s) for each channel
for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel;
stub_idx++) {
clients.emplace_back(new StressTestInteropClient(
- ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
- FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures));
+ ++thread_idx, *it, channel_creation_func, test_selector,
+ FLAGS_test_duration_secs, FLAGS_sleep_duration_ms,
+ FLAGS_do_not_abort_on_transient_failures));
bool is_already_created = false;
// QpsGauge name