diff options
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 34 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 28 | ||||
-rw-r--r-- | test/cpp/end2end/server_builder_plugin_test.cc | 256 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 8 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.h | 1 | ||||
-rw-r--r-- | test/cpp/interop/client.cc | 13 | ||||
-rw-r--r-- | test/cpp/interop/interop_client.cc | 339 | ||||
-rw-r--r-- | test/cpp/interop/interop_client.h | 62 | ||||
-rw-r--r-- | test/cpp/interop/metrics_client.cc | 16 | ||||
-rw-r--r-- | test/cpp/interop/stress_interop_client.cc | 59 | ||||
-rw-r--r-- | test/cpp/interop/stress_interop_client.h | 28 | ||||
-rw-r--r-- | test/cpp/interop/stress_test.cc | 21 |
12 files changed, 739 insertions, 126 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 0232a9fa31..45f5eb1ddd 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -199,6 +199,28 @@ class Verifier { bool spin_; }; +// This class disables the server builder plugins that may add sync services to +// the server. If there are sync services, UnimplementedRpc test will triger +// the sync unkown rpc routine on the server side, rather than the async one +// that needs to be tested here. +class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption { + public: + void UpdateArguments(ChannelArguments* arg) GRPC_OVERRIDE {} + + void UpdatePlugins( + std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>>* plugins) + GRPC_OVERRIDE { + auto plugin = plugins->begin(); + while (plugin != plugins->end()) { + if ((*plugin).second->has_sync_methods()) { + plugins->erase(plugin++); + } else { + plugin++; + } + } + } +}; + class TestScenario { public: TestScenario(bool non_block, const grpc::string& creds_type, @@ -223,8 +245,8 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { void SetUp() GRPC_OVERRIDE { poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking)); - int port = grpc_pick_unused_port_or_die(); - server_address_ << "localhost:" << port; + port_ = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port_; // Setup server ServerBuilder builder; @@ -232,6 +254,12 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { builder.AddListeningPort(server_address_.str(), server_creds); builder.RegisterService(&service_); cq_ = builder.AddCompletionQueue(); + + // TODO(zyc): make a test option to choose wheather sync plugins should be + // deleted + std::unique_ptr<ServerBuilderOption> sync_plugin_disabler( + new ServerBuilderSyncPluginDisabler()); + builder.SetOption(move(sync_plugin_disabler)); server_ = builder.BuildAndStart(); gpr_tls_set(&g_is_async_end2end_test, 1); @@ -246,6 +274,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { ; poll_overrider_.reset(); gpr_tls_set(&g_is_async_end2end_test, 0); + grpc_recycle_unused_port(port_); } void ResetStub() { @@ -297,6 +326,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { std::unique_ptr<Server> server_; grpc::testing::EchoTestService::AsyncService service_; std::ostringstream server_address_; + int port_; std::unique_ptr<PollingOverrider> poll_overrider_; }; diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 0311864759..f52aa52f39 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -975,6 +975,34 @@ TEST_P(End2endTest, NonExistingService) { EXPECT_EQ("", s.error_message()); } +// Ask the server to send back a serialized proto in trailer. +// This is an example of setting error details. +TEST_P(End2endTest, BinaryTrailerTest) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + request.mutable_param()->set_echo_metadata(true); + DebugInfo* info = request.mutable_param()->mutable_debug_info(); + info->add_stack_entries("stack_entry_1"); + info->add_stack_entries("stack_entry_2"); + info->add_stack_entries("stack_entry_3"); + info->set_detail("detailed debug info"); + grpc::string expected_string = info->SerializeAsString(); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + auto trailers = context.GetServerTrailingMetadata(); + EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey)); + auto iter = trailers.find(kDebugInfoTrailerKey); + EXPECT_EQ(expected_string, iter->second); + // Parse the returned trailer into a DebugInfo proto. + DebugInfo returned_info; + EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second))); +} + ////////////////////////////////////////////////////////////////////////// // Test with and without a proxy. class ProxyEnd2endTest : public End2endTest { diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc new file mode 100644 index 0000000000..87e3709d7d --- /dev/null +++ b/test/cpp/end2end/server_builder_plugin_test.cc @@ -0,0 +1,256 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/impl/server_builder_option.h> +#include <grpc++/impl/server_builder_plugin.h> +#include <grpc++/impl/server_initializer.h> +#include <grpc++/security/credentials.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc/grpc.h> +#include <gtest/gtest.h> + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" + +#define PLUGIN_NAME "TestServerBuilderPlugin" + +namespace grpc { +namespace testing { + +class TestServerBuilderPlugin : public ServerBuilderPlugin { + public: + TestServerBuilderPlugin() : service_(new TestServiceImpl()) { + init_server_is_called_ = false; + finish_is_called_ = false; + change_arguments_is_called_ = false; + } + + grpc::string name() GRPC_OVERRIDE { return PLUGIN_NAME; } + + void InitServer(ServerInitializer* si) GRPC_OVERRIDE { + init_server_is_called_ = true; + if (register_service_) { + si->RegisterService(service_); + } + } + + void Finish(ServerInitializer* si) GRPC_OVERRIDE { finish_is_called_ = true; } + + void ChangeArguments(const grpc::string& name, void* value) GRPC_OVERRIDE { + change_arguments_is_called_ = true; + } + + bool has_async_methods() const GRPC_OVERRIDE { + if (register_service_) { + return service_->has_async_methods(); + } + return false; + } + + bool has_sync_methods() const GRPC_OVERRIDE { + if (register_service_) { + return service_->has_synchronous_methods(); + } + return false; + } + + void SetRegisterService() { register_service_ = true; } + + bool init_server_is_called() { return init_server_is_called_; } + bool finish_is_called() { return finish_is_called_; } + bool change_arguments_is_called() { return change_arguments_is_called_; } + + private: + bool init_server_is_called_; + bool finish_is_called_; + bool change_arguments_is_called_; + bool register_service_; + std::shared_ptr<TestServiceImpl> service_; +}; + +class InsertPluginServerBuilderOption : public ServerBuilderOption { + public: + InsertPluginServerBuilderOption() { register_service_ = false; } + + void UpdateArguments(ChannelArguments* arg) GRPC_OVERRIDE {} + + void UpdatePlugins( + std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>>* plugins) + GRPC_OVERRIDE { + plugins->clear(); + + std::unique_ptr<TestServerBuilderPlugin> plugin( + new TestServerBuilderPlugin()); + if (register_service_) plugin->SetRegisterService(); + (*plugins)[plugin->name()] = std::move(plugin); + } + + void SetRegisterService() { register_service_ = true; } + + private: + bool register_service_; +}; + +std::unique_ptr<ServerBuilderPlugin> CreateTestServerBuilderPlugin() { + return std::unique_ptr<ServerBuilderPlugin>(new TestServerBuilderPlugin()); +} + +void AddTestServerBuilderPlugin() { + static bool already_here = false; + if (already_here) return; + already_here = true; + ::grpc::ServerBuilder::InternalAddPluginFactory( + &CreateTestServerBuilderPlugin); +} + +// Force AddServerBuilderPlugin() to be called at static initialization time. +struct StaticTestPluginInitializer { + StaticTestPluginInitializer() { AddTestServerBuilderPlugin(); } +} static_plugin_initializer_test_; + +// When the param boolean is true, the ServerBuilder plugin will be added at the +// time of static initialization. When it's false, the ServerBuilder plugin will +// be added using ServerBuilder::SetOption(). +class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { + public: + ServerBuilderPluginTest() {} + + void SetUp() GRPC_OVERRIDE { + port_ = grpc_pick_unused_port_or_die(); + builder_.reset(new ServerBuilder()); + } + + void InsertPlugin() { + if (GetParam()) { + // Add ServerBuilder plugin in static initialization + EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + } else { + // Add ServerBuilder plugin using ServerBuilder::SetOption() + builder_->SetOption(std::unique_ptr<ServerBuilderOption>( + new InsertPluginServerBuilderOption())); + } + } + + void InsertPluginWithTestService() { + if (GetParam()) { + // Add ServerBuilder plugin in static initialization + EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + auto plugin = static_cast<TestServerBuilderPlugin*>( + builder_->plugins_[PLUGIN_NAME].get()); + EXPECT_TRUE(plugin != nullptr); + plugin->SetRegisterService(); + } else { + // Add ServerBuilder plugin using ServerBuilder::SetOption() + std::unique_ptr<InsertPluginServerBuilderOption> option( + new InsertPluginServerBuilderOption()); + option->SetRegisterService(); + builder_->SetOption(std::move(option)); + } + } + + void StartServer() { + grpc::string server_address = "localhost:" + to_string(port_); + builder_->AddListeningPort(server_address, InsecureServerCredentials()); + server_ = builder_->BuildAndStart(); + EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + } + + void ResetStub() { + string target = "dns:localhost:" + to_string(port_); + channel_ = CreateChannel(target, InsecureChannelCredentials()); + stub_ = grpc::testing::EchoTestService::NewStub(channel_); + } + + void TearDown() GRPC_OVERRIDE { + EXPECT_TRUE(builder_->plugins_[PLUGIN_NAME] != nullptr); + auto plugin = static_cast<TestServerBuilderPlugin*>( + builder_->plugins_[PLUGIN_NAME].get()); + EXPECT_TRUE(plugin != nullptr); + EXPECT_TRUE(plugin->init_server_is_called()); + EXPECT_TRUE(plugin->finish_is_called()); + } + + string to_string(const int number) { + std::stringstream strs; + strs << number; + return strs.str(); + } + + protected: + std::shared_ptr<Channel> channel_; + std::unique_ptr<ServerBuilder> builder_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<Server> server_; + TestServiceImpl service_; + int port_; +}; + +TEST_P(ServerBuilderPluginTest, PluginWithoutServiceTest) { + InsertPlugin(); + StartServer(); +} + +TEST_P(ServerBuilderPluginTest, PluginWithServiceTest) { + InsertPluginWithTestService(); + StartServer(); + ResetStub(); + + EchoRequest request; + EchoResponse response; + request.set_message("Hello hello hello hello"); + ClientContext context; + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + +INSTANTIATE_TEST_CASE_P(ServerBuilderPluginTest, ServerBuilderPluginTest, + ::testing::Values(false, true)); + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 2f5dd6d49e..cbaee92228 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -135,6 +135,14 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, context->AddTrailingMetadata(ToString(iter->first), ToString(iter->second)); } + // Terminate rpc with error and debug info in trailer. + if (request->param().debug_info().stack_entries_size() || + !request->param().debug_info().detail().empty()) { + grpc::string serialized_debug_info = + request->param().debug_info().SerializeAsString(); + context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info); + return Status::CANCELLED; + } } if (request->has_param() && (request->param().expected_client_identity().length() > 0 || diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 1ab6ced9e0..c89f88c900 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -47,6 +47,7 @@ namespace testing { const int kNumResponseStreamsMsgs = 3; const char* const kServerCancelAfterReads = "cancel_after_reads"; const char* const kServerTryCancelRequest = "server_try_cancel"; +const char* const kDebugInfoTrailerKey = "debug-info-bin"; typedef enum { DO_NOT_CANCEL = 0, diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 9af6a88044..7727824979 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -81,6 +81,14 @@ DEFINE_string(default_service_account, "", DEFINE_string(service_account_key_file, "", "Path to service account json key file."); DEFINE_string(oauth_scope, "", "Scope for OAuth tokens."); +DEFINE_bool(do_not_abort_on_transient_failures, false, + "If set to 'true', abort() is not called in case of transient " + "failures (i.e failures that are temporary and will likely go away " + "on retrying; like a temporary connection failure) and an error " + "message is printed instead. Note that this flag just controls " + "whether abort() is called or not. It does not control whether the " + "test is retried in case of transient failures (and currently the " + "interop tests are not retried even if this flag is set to true)"); using grpc::testing::CreateChannelForTestCase; using grpc::testing::GetServiceAccountJsonKey; @@ -89,8 +97,9 @@ int main(int argc, char** argv) { grpc::testing::InitTest(&argc, &argv, true); gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str()); int ret = 0; - grpc::testing::InteropClient client( - CreateChannelForTestCase(FLAGS_test_case)); + grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case), + true, + FLAGS_do_not_abort_on_transient_failures); if (FLAGS_test_case == "empty_unary") { client.DoEmpty(); } else if (FLAGS_test_case == "large_unary") { diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 22293d211f..189e4a8aab 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -60,7 +60,7 @@ static const char* kRandomFile = "test/cpp/interop/rnd.dat"; namespace { // The same value is defined by the Java client. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; -const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979}; +const std::vector<int> response_stream_sizes = {31415, 59, 2653, 58979}; const int kNumResponseMessages = 2000; const int kResponseMessageSize = 1030; const int kReceiveDelayMilliSeconds = 20; @@ -134,23 +134,43 @@ void InteropClient::Reset(std::shared_ptr<Channel> channel) { serviceStub_.Reset(channel); } -InteropClient::InteropClient(std::shared_ptr<Channel> channel) - : serviceStub_(channel, true) {} - InteropClient::InteropClient(std::shared_ptr<Channel> channel, - bool new_stub_every_test_case) - : serviceStub_(channel, new_stub_every_test_case) {} + bool new_stub_every_test_case, + bool do_not_abort_on_transient_failures) + : serviceStub_(channel, new_stub_every_test_case), + do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {} -void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) { +bool InteropClient::AssertStatusOk(const Status& s) { if (s.ok()) { - return; + return true; } - gpr_log(GPR_ERROR, "Error status code: %d, message: %s", s.error_code(), - s.error_message().c_str()); - GPR_ASSERT(0); + + // Note: At this point, s.error_code is definitely not StatusCode::OK (we + // already checked for s.ok() above). So, the following will call abort() + // (unless s.error_code() corresponds to a transient failure and + // 'do_not_abort_on_transient_failures' is true) + return AssertStatusCode(s, StatusCode::OK); } -void InteropClient::DoEmpty() { +bool InteropClient::AssertStatusCode(const Status& s, + StatusCode expected_code) { + if (s.error_code() == expected_code) { + return true; + } + + gpr_log(GPR_ERROR, "Error status code: %d (expected: %d), message: %s", + s.error_code(), expected_code, s.error_message().c_str()); + + // In case of transient transient/retryable failures (like a broken + // connection) we may or may not abort (see TransientFailureOrAbort()) + if (s.error_code() == grpc::StatusCode::UNAVAILABLE) { + return TransientFailureOrAbort(); + } + + abort(); +} + +bool InteropClient::DoEmpty() { gpr_log(GPR_DEBUG, "Sending an empty rpc..."); Empty request = Empty::default_instance(); @@ -158,17 +178,21 @@ void InteropClient::DoEmpty() { ClientContext context; Status s = serviceStub_.Get()->EmptyCall(&context, request, &response); - AssertOkOrPrintErrorStatus(s); + + if (!AssertStatusOk(s)) { + return false; + } gpr_log(GPR_DEBUG, "Empty rpc done."); + return true; } -void InteropClient::PerformLargeUnary(SimpleRequest* request, +bool InteropClient::PerformLargeUnary(SimpleRequest* request, SimpleResponse* response) { - PerformLargeUnary(request, response, NoopChecks); + return PerformLargeUnary(request, response, NoopChecks); } -void InteropClient::PerformLargeUnary(SimpleRequest* request, +bool InteropClient::PerformLargeUnary(SimpleRequest* request, SimpleResponse* response, CheckerFn custom_checks_fn) { ClientContext context; @@ -180,7 +204,9 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request, request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); Status s = serviceStub_.Get()->UnaryCall(&context, *request, response); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } custom_checks_fn(inspector, request, response); @@ -203,9 +229,11 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request, default: GPR_ASSERT(false); } + + return true; } -void InteropClient::DoComputeEngineCreds( +bool InteropClient::DoComputeEngineCreds( const grpc::string& default_service_account, const grpc::string& oauth_scope) { gpr_log(GPR_DEBUG, @@ -215,7 +243,11 @@ void InteropClient::DoComputeEngineCreds( request.set_fill_username(true); request.set_fill_oauth_scope(true); request.set_response_type(PayloadType::COMPRESSABLE); - PerformLargeUnary(&request, &response); + + if (!PerformLargeUnary(&request, &response)) { + return false; + } + gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str()); gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str()); GPR_ASSERT(!response.username().empty()); @@ -224,9 +256,10 @@ void InteropClient::DoComputeEngineCreds( const char* oauth_scope_str = response.oauth_scope().c_str(); GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos); gpr_log(GPR_DEBUG, "Large unary with compute engine creds done."); + return true; } -void InteropClient::DoOauth2AuthToken(const grpc::string& username, +bool InteropClient::DoOauth2AuthToken(const grpc::string& username, const grpc::string& oauth_scope) { gpr_log(GPR_DEBUG, "Sending a unary rpc with raw oauth2 access token credentials ..."); @@ -239,16 +272,20 @@ void InteropClient::DoOauth2AuthToken(const grpc::string& username, Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + GPR_ASSERT(!response.username().empty()); GPR_ASSERT(!response.oauth_scope().empty()); GPR_ASSERT(username == response.username()); const char* oauth_scope_str = response.oauth_scope().c_str(); GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos); gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done."); + return true; } -void InteropClient::DoPerRpcCreds(const grpc::string& json_key) { +bool InteropClient::DoPerRpcCreds(const grpc::string& json_key) { gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ..."); SimpleRequest request; SimpleResponse response; @@ -263,35 +300,47 @@ void InteropClient::DoPerRpcCreds(const grpc::string& json_key) { Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + GPR_ASSERT(!response.username().empty()); GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos); gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done."); + return true; } -void InteropClient::DoJwtTokenCreds(const grpc::string& username) { +bool InteropClient::DoJwtTokenCreds(const grpc::string& username) { gpr_log(GPR_DEBUG, "Sending a large unary rpc with JWT token credentials ..."); SimpleRequest request; SimpleResponse response; request.set_fill_username(true); request.set_response_type(PayloadType::COMPRESSABLE); - PerformLargeUnary(&request, &response); + + if (!PerformLargeUnary(&request, &response)) { + return false; + } + GPR_ASSERT(!response.username().empty()); GPR_ASSERT(username.find(response.username()) != grpc::string::npos); gpr_log(GPR_DEBUG, "Large unary with JWT token creds done."); + return true; } -void InteropClient::DoLargeUnary() { +bool InteropClient::DoLargeUnary() { gpr_log(GPR_DEBUG, "Sending a large unary rpc..."); SimpleRequest request; SimpleResponse response; request.set_response_type(PayloadType::COMPRESSABLE); - PerformLargeUnary(&request, &response); + if (!PerformLargeUnary(&request, &response)) { + return false; + } gpr_log(GPR_DEBUG, "Large unary done."); + return true; } -void InteropClient::DoLargeCompressedUnary() { +bool InteropClient::DoLargeCompressedUnary() { const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { @@ -307,14 +356,32 @@ void InteropClient::DoLargeCompressedUnary() { SimpleResponse response; request.set_response_type(payload_types[i]); request.set_response_compression(compression_types[j]); - PerformLargeUnary(&request, &response, CompressionChecks); + + if (!PerformLargeUnary(&request, &response, CompressionChecks)) { + gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix); + gpr_free(log_suffix); + return false; + } + gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix); gpr_free(log_suffix); } } + + return true; +} + +// Either abort() (unless do_not_abort_on_transient_failures_ is true) or return +// false +bool InteropClient::TransientFailureOrAbort() { + if (do_not_abort_on_transient_failures_) { + return false; + } + + abort(); } -void InteropClient::DoRequestStreaming() { +bool InteropClient::DoRequestStreaming() { gpr_log(GPR_DEBUG, "Sending request steaming rpc ..."); ClientContext context; @@ -328,19 +395,25 @@ void InteropClient::DoRequestStreaming() { for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { Payload* payload = request.mutable_payload(); payload->set_body(grpc::string(request_stream_sizes[i], '\0')); - GPR_ASSERT(stream->Write(request)); + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed"); + return TransientFailureOrAbort(); + } aggregated_payload_size += request_stream_sizes[i]; } stream->WritesDone(); + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size); - AssertOkOrPrintErrorStatus(s); - gpr_log(GPR_DEBUG, "Request streaming done."); + return true; } -void InteropClient::DoResponseStreaming() { - gpr_log(GPR_DEBUG, "Receiving response steaming rpc ..."); +bool InteropClient::DoResponseStreaming() { + gpr_log(GPR_DEBUG, "Receiving response streaming rpc ..."); ClientContext context; StreamingOutputCallRequest request; @@ -358,13 +431,27 @@ void InteropClient::DoResponseStreaming() { grpc::string(response_stream_sizes[i], '\0')); ++i; } - GPR_ASSERT(response_stream_sizes.size() == i); + + if (i < response_stream_sizes.size()) { + // stream->Read() failed before reading all the expected messages. This is + // most likely due to connection failure. + gpr_log(GPR_ERROR, + "DoResponseStreaming(): Read fewer streams (%d) than " + "response_stream_sizes.size() (%d)", + i, response_stream_sizes.size()); + return TransientFailureOrAbort(); + } + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + gpr_log(GPR_DEBUG, "Response streaming done."); + return true; } -void InteropClient::DoResponseCompressedStreaming() { +bool InteropClient::DoResponseCompressedStreaming() { const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { @@ -378,7 +465,7 @@ void InteropClient::DoResponseCompressedStreaming() { CompressionType_Name(compression_types[j]).c_str(), PayloadType_Name(payload_types[i]).c_str()); - gpr_log(GPR_DEBUG, "Receiving response steaming rpc %s.", log_suffix); + gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix); request.set_response_type(payload_types[i]); request.set_response_compression(compression_types[j]); @@ -432,18 +519,32 @@ void InteropClient::DoResponseCompressedStreaming() { ++k; } - GPR_ASSERT(response_stream_sizes.size() == k); - Status s = stream->Finish(); - - AssertOkOrPrintErrorStatus(s); gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix); gpr_free(log_suffix); + + if (k < response_stream_sizes.size()) { + // stream->Read() failed before reading all the expected messages. This + // is most likely due to a connection failure. + gpr_log(GPR_ERROR, + "DoResponseCompressedStreaming(): Responses read (k=%d) is " + "less than the expected messages (i.e " + "response_stream_sizes.size() (%d)). (i=%d, j=%d)", + k, response_stream_sizes.size(), i, j); + return TransientFailureOrAbort(); + } + + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } } } + + return true; } -void InteropClient::DoResponseStreamingWithSlowConsumer() { - gpr_log(GPR_DEBUG, "Receiving response steaming rpc with slow consumer ..."); +bool InteropClient::DoResponseStreamingWithSlowConsumer() { + gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ..."); ClientContext context; StreamingOutputCallRequest request; @@ -464,14 +565,26 @@ void InteropClient::DoResponseStreamingWithSlowConsumer() { usleep(kReceiveDelayMilliSeconds * 1000); ++i; } - GPR_ASSERT(kNumResponseMessages == i); + + if (i < kNumResponseMessages) { + gpr_log(GPR_ERROR, + "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is " + "less than the expected messages (i.e kNumResponseMessages = %d)", + i, kNumResponseMessages); + + return TransientFailureOrAbort(); + } + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } - AssertOkOrPrintErrorStatus(s); gpr_log(GPR_DEBUG, "Response streaming done."); + return true; } -void InteropClient::DoHalfDuplex() { +bool InteropClient::DoHalfDuplex() { gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ..."); ClientContext context; @@ -483,7 +596,11 @@ void InteropClient::DoHalfDuplex() { ResponseParameters* response_parameter = request.add_response_parameters(); for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { response_parameter->set_size(response_stream_sizes[i]); - GPR_ASSERT(stream->Write(request)); + + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i); + return TransientFailureOrAbort(); + } } stream->WritesDone(); @@ -494,13 +611,27 @@ void InteropClient::DoHalfDuplex() { grpc::string(response_stream_sizes[i], '\0')); ++i; } - GPR_ASSERT(response_stream_sizes.size() == i); + + if (i < response_stream_sizes.size()) { + // stream->Read() failed before reading all the expected messages. This is + // most likely due to a connection failure + gpr_log(GPR_ERROR, + "DoHalfDuplex(): Responses read (i=%d) are less than the expected " + "number of messages response_stream_sizes.size() (%d)", + i, response_stream_sizes.size()); + return TransientFailureOrAbort(); + } + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done."); + return true; } -void InteropClient::DoPingPong() { +bool InteropClient::DoPingPong() { gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ..."); ClientContext context; @@ -513,24 +644,40 @@ void InteropClient::DoPingPong() { ResponseParameters* response_parameter = request.add_response_parameters(); Payload* payload = request.mutable_payload(); StreamingOutputCallResponse response; + for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { response_parameter->set_size(response_stream_sizes[i]); payload->set_body(grpc::string(request_stream_sizes[i], '\0')); - GPR_ASSERT(stream->Write(request)); - GPR_ASSERT(stream->Read(&response)); + + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i); + return TransientFailureOrAbort(); + } + + if (!stream->Read(&response)) { + gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i); + return TransientFailureOrAbort(); + } + GPR_ASSERT(response.payload().body() == grpc::string(response_stream_sizes[i], '\0')); } stream->WritesDone(); + GPR_ASSERT(!stream->Read(&response)); + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + gpr_log(GPR_DEBUG, "Ping pong streaming done."); + return true; } -void InteropClient::DoCancelAfterBegin() { - gpr_log(GPR_DEBUG, "Sending request steaming rpc ..."); +bool InteropClient::DoCancelAfterBegin() { + gpr_log(GPR_DEBUG, "Sending request streaming rpc ..."); ClientContext context; StreamingInputCallRequest request; @@ -542,11 +689,16 @@ void InteropClient::DoCancelAfterBegin() { gpr_log(GPR_DEBUG, "Trying to cancel..."); context.TryCancel(); Status s = stream->Finish(); - GPR_ASSERT(s.error_code() == StatusCode::CANCELLED); + + if (!AssertStatusCode(s, StatusCode::CANCELLED)) { + return false; + } + gpr_log(GPR_DEBUG, "Canceling streaming done."); + return true; } -void InteropClient::DoCancelAfterFirstResponse() { +bool InteropClient::DoCancelAfterFirstResponse() { gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ..."); ClientContext context; @@ -560,17 +712,27 @@ void InteropClient::DoCancelAfterFirstResponse() { response_parameter->set_size(31415); request.mutable_payload()->set_body(grpc::string(27182, '\0')); StreamingOutputCallResponse response; - GPR_ASSERT(stream->Write(request)); - GPR_ASSERT(stream->Read(&response)); + + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed"); + return TransientFailureOrAbort(); + } + + if (!stream->Read(&response)) { + gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed"); + return TransientFailureOrAbort(); + } GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0')); + gpr_log(GPR_DEBUG, "Trying to cancel..."); context.TryCancel(); Status s = stream->Finish(); gpr_log(GPR_DEBUG, "Canceling pingpong streaming done."); + return true; } -void InteropClient::DoTimeoutOnSleepingServer() { +bool InteropClient::DoTimeoutOnSleepingServer() { gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc with a short deadline..."); @@ -587,11 +749,15 @@ void InteropClient::DoTimeoutOnSleepingServer() { stream->Write(request); Status s = stream->Finish(); - GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED); + if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED)) { + return false; + } + gpr_log(GPR_DEBUG, "Pingpong streaming timeout done."); + return true; } -void InteropClient::DoEmptyStream() { +bool InteropClient::DoEmptyStream() { gpr_log(GPR_DEBUG, "Starting empty_stream."); ClientContext context; @@ -601,12 +767,17 @@ void InteropClient::DoEmptyStream() { stream->WritesDone(); StreamingOutputCallResponse response; GPR_ASSERT(stream->Read(&response) == false); + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + gpr_log(GPR_DEBUG, "empty_stream done."); + return true; } -void InteropClient::DoStatusWithMessage() { +bool InteropClient::DoStatusWithMessage() { gpr_log(GPR_DEBUG, "Sending RPC with a request for status code 2 and message"); @@ -620,12 +791,16 @@ void InteropClient::DoStatusWithMessage() { Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); - GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN); + if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN)) { + return false; + } + GPR_ASSERT(s.error_message() == test_msg); gpr_log(GPR_DEBUG, "Done testing Status and Message"); + return true; } -void InteropClient::DoCustomMetadata() { +bool InteropClient::DoCustomMetadata() { const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial"); const grpc::string kInitialMetadataValue("test_initial_metadata_value"); const grpc::string kEchoTrailingBinMetadataKey( @@ -645,7 +820,10 @@ void InteropClient::DoCustomMetadata() { request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + const auto& server_initial_metadata = context.GetServerInitialMetadata(); auto iter = server_initial_metadata.find(kEchoInitialMetadataKey); GPR_ASSERT(iter != server_initial_metadata.end()); @@ -675,14 +853,29 @@ void InteropClient::DoCustomMetadata() { grpc::string payload(kLargeRequestSize, '\0'); request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); StreamingOutputCallResponse response; - GPR_ASSERT(stream->Write(request)); + + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed"); + return TransientFailureOrAbort(); + } + stream->WritesDone(); - GPR_ASSERT(stream->Read(&response)); + + if (!stream->Read(&response)) { + gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed"); + return TransientFailureOrAbort(); + } + GPR_ASSERT(response.payload().body() == grpc::string(kLargeResponseSize, '\0')); + GPR_ASSERT(!stream->Read(&response)); + Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); + if (!AssertStatusOk(s)) { + return false; + } + const auto& server_initial_metadata = context.GetServerInitialMetadata(); auto iter = server_initial_metadata.find(kEchoInitialMetadataKey); GPR_ASSERT(iter != server_initial_metadata.end()); @@ -695,6 +888,8 @@ void InteropClient::DoCustomMetadata() { gpr_log(GPR_DEBUG, "Done testing stream with custom metadata"); } + + return true; } } // namespace testing diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index a3794fd93f..ae75762bb8 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -51,41 +51,42 @@ using CheckerFn = class InteropClient { public: - explicit InteropClient(std::shared_ptr<Channel> channel); - explicit InteropClient( - std::shared_ptr<Channel> channel, - bool new_stub_every_test_case); // If new_stub_every_test_case is true, - // a new TestService::Stub object is - // created for every test case below + /// If new_stub_every_test_case is true, a new TestService::Stub object is + /// created for every test case + /// If do_not_abort_on_transient_failures is true, abort() is not called in + /// case of transient failures (like connection failures) + explicit InteropClient(std::shared_ptr<Channel> channel, + bool new_stub_every_test_case, + bool do_not_abort_on_transient_failures); ~InteropClient() {} void Reset(std::shared_ptr<Channel> channel); - void DoEmpty(); - void DoLargeUnary(); - void DoLargeCompressedUnary(); - void DoPingPong(); - void DoHalfDuplex(); - void DoRequestStreaming(); - void DoResponseStreaming(); - void DoResponseCompressedStreaming(); - void DoResponseStreamingWithSlowConsumer(); - void DoCancelAfterBegin(); - void DoCancelAfterFirstResponse(); - void DoTimeoutOnSleepingServer(); - void DoEmptyStream(); - void DoStatusWithMessage(); - void DoCustomMetadata(); + bool DoEmpty(); + bool DoLargeUnary(); + bool DoLargeCompressedUnary(); + bool DoPingPong(); + bool DoHalfDuplex(); + bool DoRequestStreaming(); + bool DoResponseStreaming(); + bool DoResponseCompressedStreaming(); + bool DoResponseStreamingWithSlowConsumer(); + bool DoCancelAfterBegin(); + bool DoCancelAfterFirstResponse(); + bool DoTimeoutOnSleepingServer(); + bool DoEmptyStream(); + bool DoStatusWithMessage(); + bool DoCustomMetadata(); // Auth tests. // username is a string containing the user email - void DoJwtTokenCreds(const grpc::string& username); - void DoComputeEngineCreds(const grpc::string& default_service_account, + bool DoJwtTokenCreds(const grpc::string& username); + bool DoComputeEngineCreds(const grpc::string& default_service_account, const grpc::string& oauth_scope); // username the GCE default service account email - void DoOauth2AuthToken(const grpc::string& username, + bool DoOauth2AuthToken(const grpc::string& username, const grpc::string& oauth_scope); // username is a string containing the user email - void DoPerRpcCreds(const grpc::string& json_key); + bool DoPerRpcCreds(const grpc::string& json_key); private: class ServiceStub { @@ -105,13 +106,18 @@ class InteropClient { // Get() call }; - void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response); + bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response); /// Run \a custom_check_fn as an additional check. - void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response, + bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response, CheckerFn custom_checks_fn); - void AssertOkOrPrintErrorStatus(const Status& s); + bool AssertStatusOk(const Status& s); + bool AssertStatusCode(const Status& s, StatusCode expected_code); + bool TransientFailureOrAbort(); ServiceStub serviceStub_; + + /// If true, abort() is not called for transient failures + bool do_not_abort_on_transient_failures_; }; } // namespace testing diff --git a/test/cpp/interop/metrics_client.cc b/test/cpp/interop/metrics_client.cc index cc304f2e89..c8c2215fab 100644 --- a/test/cpp/interop/metrics_client.cc +++ b/test/cpp/interop/metrics_client.cc @@ -42,13 +42,15 @@ #include "test/cpp/util/metrics_server.h" #include "test/cpp/util/test_config.h" -DEFINE_string(metrics_server_address, "", +int kDeadlineSecs = 10; + +DEFINE_string(metrics_server_address, "localhost:8081", "The metrics server addresses in the fomrat <hostname>:<port>"); +DEFINE_int32(deadline_secs, kDeadlineSecs, + "The deadline (in seconds) for RCP call"); DEFINE_bool(total_only, false, "If true, this prints only the total value of all gauges"); -int kDeadlineSecs = 10; - using grpc::testing::EmptyMessage; using grpc::testing::GaugeResponse; using grpc::testing::MetricsService; @@ -56,12 +58,13 @@ using grpc::testing::MetricsServiceImpl; // Prints the values of all Gauges (unless total_only is set to 'true' in which // case this only prints the sum of all gauge values). -bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only) { +bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only, + int deadline_secs) { grpc::ClientContext context; EmptyMessage message; std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(kDeadlineSecs); + std::chrono::system_clock::now() + std::chrono::seconds(deadline_secs); context.set_deadline(deadline); @@ -108,7 +111,8 @@ int main(int argc, char** argv) { std::shared_ptr<grpc::Channel> channel(grpc::CreateChannel( FLAGS_metrics_server_address, grpc::InsecureChannelCredentials())); - if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only)) { + if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only, + FLAGS_deadline_secs)) { return 1; } diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc index f287a5aa3b..aa95682e74 100644 --- a/test/cpp/interop/stress_interop_client.cc +++ b/test/cpp/interop/stress_interop_client.cc @@ -84,11 +84,12 @@ StressTestInteropClient::StressTestInteropClient( int test_id, const grpc::string& server_address, std::shared_ptr<Channel> channel, const WeightedRandomTestSelector& test_selector, long test_duration_secs, - long sleep_duration_ms) + long sleep_duration_ms, bool do_not_abort_on_transient_failures) : test_id_(test_id), server_address_(server_address), channel_(channel), - interop_client_(new InteropClient(channel, false)), + interop_client_(new InteropClient(channel, false, + do_not_abort_on_transient_failures)), test_selector_(test_selector), test_duration_secs_(test_duration_secs), sleep_duration_ms_(sleep_duration_ms) {} @@ -126,31 +127,67 @@ void StressTestInteropClient::MainLoop(std::shared_ptr<QpsGauge> qps_gauge) { } } -// TODO(sree): Add all interop tests -void StressTestInteropClient::RunTest(TestCaseType test_case) { +bool StressTestInteropClient::RunTest(TestCaseType test_case) { + bool is_success = false; switch (test_case) { case EMPTY_UNARY: { - interop_client_->DoEmpty(); + is_success = interop_client_->DoEmpty(); break; } case LARGE_UNARY: { - interop_client_->DoLargeUnary(); + is_success = interop_client_->DoLargeUnary(); break; } case LARGE_COMPRESSED_UNARY: { - interop_client_->DoLargeCompressedUnary(); + is_success = interop_client_->DoLargeCompressedUnary(); break; } case CLIENT_STREAMING: { - interop_client_->DoRequestStreaming(); + is_success = interop_client_->DoRequestStreaming(); break; } case SERVER_STREAMING: { - interop_client_->DoResponseStreaming(); + is_success = interop_client_->DoResponseStreaming(); + break; + } + case SERVER_COMPRESSED_STREAMING: { + is_success = interop_client_->DoResponseCompressedStreaming(); + break; + } + case SLOW_CONSUMER: { + is_success = interop_client_->DoResponseStreamingWithSlowConsumer(); + break; + } + case HALF_DUPLEX: { + is_success = interop_client_->DoHalfDuplex(); + break; + } + case PING_PONG: { + is_success = interop_client_->DoPingPong(); + break; + } + case CANCEL_AFTER_BEGIN: { + is_success = interop_client_->DoCancelAfterBegin(); + break; + } + case CANCEL_AFTER_FIRST_RESPONSE: { + is_success = interop_client_->DoCancelAfterFirstResponse(); + break; + } + case TIMEOUT_ON_SLEEPING_SERVER: { + is_success = interop_client_->DoTimeoutOnSleepingServer(); break; } case EMPTY_STREAM: { - interop_client_->DoEmptyStream(); + is_success = interop_client_->DoEmptyStream(); + break; + } + case STATUS_CODE_AND_MESSAGE: { + is_success = interop_client_->DoStatusWithMessage(); + break; + } + case CUSTOM_METADATA: { + is_success = interop_client_->DoCustomMetadata(); break; } default: { @@ -159,6 +196,8 @@ void StressTestInteropClient::RunTest(TestCaseType test_case) { break; } } + + return is_success; } } // namespace testing diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h index cb0cd98821..aa93b58b4a 100644 --- a/test/cpp/interop/stress_interop_client.h +++ b/test/cpp/interop/stress_interop_client.h @@ -49,7 +49,6 @@ namespace testing { using std::pair; using std::vector; -// TODO(sreek): Add more test cases here in future enum TestCaseType { UNKNOWN_TEST = -1, EMPTY_UNARY = 0, @@ -57,7 +56,16 @@ enum TestCaseType { LARGE_COMPRESSED_UNARY = 2, CLIENT_STREAMING = 3, SERVER_STREAMING = 4, - EMPTY_STREAM = 5 + SERVER_COMPRESSED_STREAMING = 5, + SLOW_CONSUMER = 6, + HALF_DUPLEX = 7, + PING_PONG = 8, + CANCEL_AFTER_BEGIN = 9, + CANCEL_AFTER_FIRST_RESPONSE = 10, + TIMEOUT_ON_SLEEPING_SERVER = 11, + EMPTY_STREAM = 12, + STATUS_CODE_AND_MESSAGE = 13, + CUSTOM_METADATA = 14 }; const vector<pair<TestCaseType, grpc::string>> kTestCaseList = { @@ -66,7 +74,16 @@ const vector<pair<TestCaseType, grpc::string>> kTestCaseList = { {LARGE_COMPRESSED_UNARY, "large_compressed_unary"}, {CLIENT_STREAMING, "client_streaming"}, {SERVER_STREAMING, "server_streaming"}, - {EMPTY_STREAM, "empty_stream"}}; + {SERVER_COMPRESSED_STREAMING, "server_compressed_streaming"}, + {SLOW_CONSUMER, "slow_consumer"}, + {HALF_DUPLEX, "half_duplex"}, + {PING_PONG, "ping_pong"}, + {CANCEL_AFTER_BEGIN, "cancel_after_begin"}, + {CANCEL_AFTER_FIRST_RESPONSE, "cancel_after_first_response"}, + {TIMEOUT_ON_SLEEPING_SERVER, "timeout_on_sleeping_server"}, + {EMPTY_STREAM, "empty_stream"}, + {STATUS_CODE_AND_MESSAGE, "status_code_and_message"}, + {CUSTOM_METADATA, "custom_metadata"}}; class WeightedRandomTestSelector { public: @@ -87,14 +104,15 @@ class StressTestInteropClient { StressTestInteropClient(int test_id, const grpc::string& server_address, std::shared_ptr<Channel> channel, const WeightedRandomTestSelector& test_selector, - long test_duration_secs, long sleep_duration_ms); + long test_duration_secs, long sleep_duration_ms, + bool do_not_abort_on_transient_failures); // The main function. Use this as the thread entry point. // qps_gauge is the QpsGauge to record the requests per second metric void MainLoop(std::shared_ptr<QpsGauge> qps_gauge); private: - void RunTest(TestCaseType test_case); + bool RunTest(TestCaseType test_case); int test_id_; const grpc::string& server_address_; diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc index d9e3fd25c5..7787931900 100644 --- a/test/cpp/interop/stress_test.cc +++ b/test/cpp/interop/stress_test.cc @@ -89,7 +89,16 @@ DEFINE_string(test_cases, "", " large_compressed_unary\n" " client_streaming\n" " server_streaming\n" + " server_compressed_streaming\n" + " slow_consumer\n" + " half_duplex\n" + " ping_pong\n" + " cancel_after_begin\n" + " cancel_after_first_response\n" + " timeout_on_sleeping_server\n" " empty_stream\n" + " status_code_and_message\n" + " custom_metadata\n" " Example: \"empty_unary:20,large_unary:10,empty_stream:70\"\n" " The above will execute 'empty_unary', 20% of the time," " 'large_unary', 10% of the time and 'empty_stream' the remaining" @@ -101,6 +110,10 @@ DEFINE_int32(log_level, GPR_LOG_SEVERITY_INFO, "The choices are: 0 (GPR_LOG_SEVERITY_DEBUG), 1 " "(GPR_LOG_SEVERITY_INFO) and 2 (GPR_LOG_SEVERITY_ERROR)"); +DEFINE_bool(do_not_abort_on_transient_failures, true, + "If set to 'true', abort() is not called in case of transient " + "failures like temporary connection failures."); + using grpc::testing::kTestCaseList; using grpc::testing::MetricsService; using grpc::testing::MetricsServiceImpl; @@ -189,6 +202,12 @@ void LogParameterInfo(const std::vector<grpc::string>& addresses, gpr_log(GPR_INFO, "test_cases : %s", FLAGS_test_cases.c_str()); gpr_log(GPR_INFO, "sleep_duration_ms: %d", FLAGS_sleep_duration_ms); gpr_log(GPR_INFO, "test_duration_secs: %d", FLAGS_test_duration_secs); + gpr_log(GPR_INFO, "num_channels_per_server: %d", + FLAGS_num_channels_per_server); + gpr_log(GPR_INFO, "num_stubs_per_channel: %d", FLAGS_num_stubs_per_channel); + gpr_log(GPR_INFO, "log_level: %d", FLAGS_log_level); + gpr_log(GPR_INFO, "do_not_abort_on_transient_failures: %s", + FLAGS_do_not_abort_on_transient_failures ? "true" : "false"); int num = 0; for (auto it = addresses.begin(); it != addresses.end(); it++) { @@ -272,7 +291,7 @@ int main(int argc, char** argv) { stub_idx++) { StressTestInteropClient* client = new StressTestInteropClient( ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs, - FLAGS_sleep_duration_ms); + FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures); bool is_already_created = false; // QpsGauge name |