aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c1
-rw-r--r--test/cpp/interop/client.cc13
-rw-r--r--test/cpp/interop/interop_client.cc336
-rw-r--r--test/cpp/interop/interop_client.h62
-rw-r--r--test/cpp/interop/stress_interop_client.cc5
-rw-r--r--test/cpp/interop/stress_interop_client.h3
-rw-r--r--test/cpp/interop/stress_test.cc12
7 files changed, 330 insertions, 102 deletions
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 9918fbdcb4..91fa917661 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -174,6 +174,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (holder->connected_subchannel == NULL) {
+ gpr_atm_no_barrier_store(&holder->subchannel_call, 1);
fail_locked(exec_ctx, holder);
} else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
/* already cancelled before subchannel became ready */
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index 9af6a88044..7727824979 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -81,6 +81,14 @@ DEFINE_string(default_service_account, "",
DEFINE_string(service_account_key_file, "",
"Path to service account json key file.");
DEFINE_string(oauth_scope, "", "Scope for OAuth tokens.");
+DEFINE_bool(do_not_abort_on_transient_failures, false,
+ "If set to 'true', abort() is not called in case of transient "
+ "failures (i.e failures that are temporary and will likely go away "
+ "on retrying; like a temporary connection failure) and an error "
+ "message is printed instead. Note that this flag just controls "
+ "whether abort() is called or not. It does not control whether the "
+ "test is retried in case of transient failures (and currently the "
+ "interop tests are not retried even if this flag is set to true)");
using grpc::testing::CreateChannelForTestCase;
using grpc::testing::GetServiceAccountJsonKey;
@@ -89,8 +97,9 @@ int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
int ret = 0;
- grpc::testing::InteropClient client(
- CreateChannelForTestCase(FLAGS_test_case));
+ grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
+ true,
+ FLAGS_do_not_abort_on_transient_failures);
if (FLAGS_test_case == "empty_unary") {
client.DoEmpty();
} else if (FLAGS_test_case == "large_unary") {
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index 22293d211f..e5853b40f8 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -134,23 +134,43 @@ void InteropClient::Reset(std::shared_ptr<Channel> channel) {
serviceStub_.Reset(channel);
}
-InteropClient::InteropClient(std::shared_ptr<Channel> channel)
- : serviceStub_(channel, true) {}
-
InteropClient::InteropClient(std::shared_ptr<Channel> channel,
- bool new_stub_every_test_case)
- : serviceStub_(channel, new_stub_every_test_case) {}
+ bool new_stub_every_test_case,
+ bool do_not_abort_on_transient_failures)
+ : serviceStub_(channel, new_stub_every_test_case),
+ do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
-void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
+bool InteropClient::AssertStatusOk(const Status& s) {
if (s.ok()) {
- return;
+ return true;
}
- gpr_log(GPR_ERROR, "Error status code: %d, message: %s", s.error_code(),
- s.error_message().c_str());
- GPR_ASSERT(0);
+
+ // Note: At this point, s.error_code is definitely not StatusCode::OK (we
+ // already checked for s.ok() above). So, the following will call abort()
+ // (unless s.error_code() corresponds to a transient failure and
+ // 'do_not_abort_on_transient_failures' is true)
+ return AssertStatusCode(s, StatusCode::OK);
}
-void InteropClient::DoEmpty() {
+bool InteropClient::AssertStatusCode(const Status& s,
+ StatusCode expected_code) {
+ if (s.error_code() == expected_code) {
+ return true;
+ }
+
+ gpr_log(GPR_ERROR, "Error status code: %d (expected: %d), message: %s",
+ s.error_code(), expected_code, s.error_message().c_str());
+
+ // In case of transient transient/retryable failures (like a broken
+ // connection) we may or may not abort (see TransientFailureOrAbort())
+ if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
+ return TransientFailureOrAbort();
+ }
+
+ abort();
+}
+
+bool InteropClient::DoEmpty() {
gpr_log(GPR_DEBUG, "Sending an empty rpc...");
Empty request = Empty::default_instance();
@@ -158,17 +178,21 @@ void InteropClient::DoEmpty() {
ClientContext context;
Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
- AssertOkOrPrintErrorStatus(s);
+
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
gpr_log(GPR_DEBUG, "Empty rpc done.");
+ return true;
}
-void InteropClient::PerformLargeUnary(SimpleRequest* request,
+bool InteropClient::PerformLargeUnary(SimpleRequest* request,
SimpleResponse* response) {
- PerformLargeUnary(request, response, NoopChecks);
+ return PerformLargeUnary(request, response, NoopChecks);
}
-void InteropClient::PerformLargeUnary(SimpleRequest* request,
+bool InteropClient::PerformLargeUnary(SimpleRequest* request,
SimpleResponse* response,
CheckerFn custom_checks_fn) {
ClientContext context;
@@ -180,7 +204,9 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
custom_checks_fn(inspector, request, response);
@@ -203,9 +229,11 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
default:
GPR_ASSERT(false);
}
+
+ return true;
}
-void InteropClient::DoComputeEngineCreds(
+bool InteropClient::DoComputeEngineCreds(
const grpc::string& default_service_account,
const grpc::string& oauth_scope) {
gpr_log(GPR_DEBUG,
@@ -215,7 +243,11 @@ void InteropClient::DoComputeEngineCreds(
request.set_fill_username(true);
request.set_fill_oauth_scope(true);
request.set_response_type(PayloadType::COMPRESSABLE);
- PerformLargeUnary(&request, &response);
+
+ if (!PerformLargeUnary(&request, &response)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
GPR_ASSERT(!response.username().empty());
@@ -224,9 +256,10 @@ void InteropClient::DoComputeEngineCreds(
const char* oauth_scope_str = response.oauth_scope().c_str();
GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
+ return true;
}
-void InteropClient::DoOauth2AuthToken(const grpc::string& username,
+bool InteropClient::DoOauth2AuthToken(const grpc::string& username,
const grpc::string& oauth_scope) {
gpr_log(GPR_DEBUG,
"Sending a unary rpc with raw oauth2 access token credentials ...");
@@ -239,16 +272,20 @@ void InteropClient::DoOauth2AuthToken(const grpc::string& username,
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(!response.oauth_scope().empty());
GPR_ASSERT(username == response.username());
const char* oauth_scope_str = response.oauth_scope().c_str();
GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
+ return true;
}
-void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
+bool InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
SimpleRequest request;
SimpleResponse response;
@@ -263,35 +300,47 @@ void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
+ return true;
}
-void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
+bool InteropClient::DoJwtTokenCreds(const grpc::string& username) {
gpr_log(GPR_DEBUG,
"Sending a large unary rpc with JWT token credentials ...");
SimpleRequest request;
SimpleResponse response;
request.set_fill_username(true);
request.set_response_type(PayloadType::COMPRESSABLE);
- PerformLargeUnary(&request, &response);
+
+ if (!PerformLargeUnary(&request, &response)) {
+ return false;
+ }
+
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
+ return true;
}
-void InteropClient::DoLargeUnary() {
+bool InteropClient::DoLargeUnary() {
gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
SimpleRequest request;
SimpleResponse response;
request.set_response_type(PayloadType::COMPRESSABLE);
- PerformLargeUnary(&request, &response);
+ if (!PerformLargeUnary(&request, &response)) {
+ return false;
+ }
gpr_log(GPR_DEBUG, "Large unary done.");
+ return true;
}
-void InteropClient::DoLargeCompressedUnary() {
+bool InteropClient::DoLargeCompressedUnary() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
@@ -307,14 +356,32 @@ void InteropClient::DoLargeCompressedUnary() {
SimpleResponse response;
request.set_response_type(payload_types[i]);
request.set_response_compression(compression_types[j]);
- PerformLargeUnary(&request, &response, CompressionChecks);
+
+ if (!PerformLargeUnary(&request, &response, CompressionChecks)) {
+ gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix);
+ gpr_free(log_suffix);
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix);
gpr_free(log_suffix);
}
}
+
+ return true;
}
-void InteropClient::DoRequestStreaming() {
+// Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
+// false
+bool InteropClient::TransientFailureOrAbort() {
+ if (do_not_abort_on_transient_failures_) {
+ return false;
+ }
+
+ abort();
+}
+
+bool InteropClient::DoRequestStreaming() {
gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
ClientContext context;
@@ -328,18 +395,24 @@ void InteropClient::DoRequestStreaming() {
for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
Payload* payload = request.mutable_payload();
payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
- GPR_ASSERT(stream->Write(request));
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
+ return TransientFailureOrAbort();
+ }
aggregated_payload_size += request_stream_sizes[i];
}
stream->WritesDone();
+
Status s = stream->Finish();
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
- AssertOkOrPrintErrorStatus(s);
- gpr_log(GPR_DEBUG, "Request streaming done.");
+ return true;
}
-void InteropClient::DoResponseStreaming() {
+bool InteropClient::DoResponseStreaming() {
gpr_log(GPR_DEBUG, "Receiving response steaming rpc ...");
ClientContext context;
@@ -358,13 +431,27 @@ void InteropClient::DoResponseStreaming() {
grpc::string(response_stream_sizes[i], '\0'));
++i;
}
- GPR_ASSERT(response_stream_sizes.size() == i);
+
+ if (i < response_stream_sizes.size()) {
+ // stream->Read() failed before reading all the expected messages. This is
+ // most likely due to connection failure.
+ gpr_log(GPR_ERROR,
+ "DoResponseStreaming(): Read fewer streams (%d) than "
+ "response_stream_sizes.size() (%d)",
+ i, response_stream_sizes.size());
+ return TransientFailureOrAbort();
+ }
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Response streaming done.");
+ return true;
}
-void InteropClient::DoResponseCompressedStreaming() {
+bool InteropClient::DoResponseCompressedStreaming() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
@@ -432,17 +519,31 @@ void InteropClient::DoResponseCompressedStreaming() {
++k;
}
- GPR_ASSERT(response_stream_sizes.size() == k);
- Status s = stream->Finish();
-
- AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix);
gpr_free(log_suffix);
+
+ if (k < response_stream_sizes.size()) {
+ // stream->Read() failed before reading all the expected messages. This
+ // is most likely due to a connection failure.
+ gpr_log(GPR_ERROR,
+ "DoResponseCompressedStreaming(): Responses read (k=%d) is "
+ "less than the expected messages (i.e "
+ "response_stream_sizes.size() (%d)). (i=%d, j=%d)",
+ k, response_stream_sizes.size(), i, j);
+ return TransientFailureOrAbort();
+ }
+
+ Status s = stream->Finish();
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
}
}
+
+ return true;
}
-void InteropClient::DoResponseStreamingWithSlowConsumer() {
+bool InteropClient::DoResponseStreamingWithSlowConsumer() {
gpr_log(GPR_DEBUG, "Receiving response steaming rpc with slow consumer ...");
ClientContext context;
@@ -464,14 +565,26 @@ void InteropClient::DoResponseStreamingWithSlowConsumer() {
usleep(kReceiveDelayMilliSeconds * 1000);
++i;
}
- GPR_ASSERT(kNumResponseMessages == i);
+
+ if (i < kNumResponseMessages) {
+ gpr_log(GPR_ERROR,
+ "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
+ "less than the expected messages (i.e kNumResponseMessages = %d)",
+ i, kNumResponseMessages);
+
+ return TransientFailureOrAbort();
+ }
+
Status s = stream->Finish();
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
- AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_DEBUG, "Response streaming done.");
+ return true;
}
-void InteropClient::DoHalfDuplex() {
+bool InteropClient::DoHalfDuplex() {
gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
ClientContext context;
@@ -483,7 +596,11 @@ void InteropClient::DoHalfDuplex() {
ResponseParameters* response_parameter = request.add_response_parameters();
for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
response_parameter->set_size(response_stream_sizes[i]);
- GPR_ASSERT(stream->Write(request));
+
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
+ return TransientFailureOrAbort();
+ }
}
stream->WritesDone();
@@ -494,13 +611,27 @@ void InteropClient::DoHalfDuplex() {
grpc::string(response_stream_sizes[i], '\0'));
++i;
}
- GPR_ASSERT(response_stream_sizes.size() == i);
+
+ if (i < response_stream_sizes.size()) {
+ // stream->Read() failed before reading all the expected messages. This is
+ // most likely due to a connection failure
+ gpr_log(GPR_ERROR,
+ "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
+ "number of messages response_stream_sizes.size() (%d)",
+ i, response_stream_sizes.size());
+ return TransientFailureOrAbort();
+ }
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
+ return true;
}
-void InteropClient::DoPingPong() {
+bool InteropClient::DoPingPong() {
gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
ClientContext context;
@@ -513,23 +644,39 @@ void InteropClient::DoPingPong() {
ResponseParameters* response_parameter = request.add_response_parameters();
Payload* payload = request.mutable_payload();
StreamingOutputCallResponse response;
+
for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
response_parameter->set_size(response_stream_sizes[i]);
payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
- GPR_ASSERT(stream->Write(request));
- GPR_ASSERT(stream->Read(&response));
+
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
+ return TransientFailureOrAbort();
+ }
+
+ if (!stream->Read(&response)) {
+ gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
+ return TransientFailureOrAbort();
+ }
+
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
}
stream->WritesDone();
+
GPR_ASSERT(!stream->Read(&response));
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Ping pong streaming done.");
+ return true;
}
-void InteropClient::DoCancelAfterBegin() {
+bool InteropClient::DoCancelAfterBegin() {
gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
ClientContext context;
@@ -542,11 +689,16 @@ void InteropClient::DoCancelAfterBegin() {
gpr_log(GPR_DEBUG, "Trying to cancel...");
context.TryCancel();
Status s = stream->Finish();
- GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
+
+ if (!AssertStatusCode(s, StatusCode::CANCELLED)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Canceling streaming done.");
+ return true;
}
-void InteropClient::DoCancelAfterFirstResponse() {
+bool InteropClient::DoCancelAfterFirstResponse() {
gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
ClientContext context;
@@ -560,17 +712,27 @@ void InteropClient::DoCancelAfterFirstResponse() {
response_parameter->set_size(31415);
request.mutable_payload()->set_body(grpc::string(27182, '\0'));
StreamingOutputCallResponse response;
- GPR_ASSERT(stream->Write(request));
- GPR_ASSERT(stream->Read(&response));
+
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
+ return TransientFailureOrAbort();
+ }
+
+ if (!stream->Read(&response)) {
+ gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
+ return TransientFailureOrAbort();
+ }
GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
+
gpr_log(GPR_DEBUG, "Trying to cancel...");
context.TryCancel();
Status s = stream->Finish();
gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
+ return true;
}
-void InteropClient::DoTimeoutOnSleepingServer() {
+bool InteropClient::DoTimeoutOnSleepingServer() {
gpr_log(GPR_DEBUG,
"Sending Ping Pong streaming rpc with a short deadline...");
@@ -584,14 +746,23 @@ void InteropClient::DoTimeoutOnSleepingServer() {
StreamingOutputCallRequest request;
request.mutable_payload()->set_body(grpc::string(27182, '\0'));
- stream->Write(request);
+
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoTimeoutOnSleepingServer(): stream->Write() failed");
+ return TransientFailureOrAbort();
+ }
Status s = stream->Finish();
- GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
+
+ if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
+ return true;
}
-void InteropClient::DoEmptyStream() {
+bool InteropClient::DoEmptyStream() {
gpr_log(GPR_DEBUG, "Starting empty_stream.");
ClientContext context;
@@ -601,12 +772,17 @@ void InteropClient::DoEmptyStream() {
stream->WritesDone();
StreamingOutputCallResponse response;
GPR_ASSERT(stream->Read(&response) == false);
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
gpr_log(GPR_DEBUG, "empty_stream done.");
+ return true;
}
-void InteropClient::DoStatusWithMessage() {
+bool InteropClient::DoStatusWithMessage() {
gpr_log(GPR_DEBUG,
"Sending RPC with a request for status code 2 and message");
@@ -620,12 +796,16 @@ void InteropClient::DoStatusWithMessage() {
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
- GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN);
+ if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN)) {
+ return false;
+ }
+
GPR_ASSERT(s.error_message() == test_msg);
gpr_log(GPR_DEBUG, "Done testing Status and Message");
+ return true;
}
-void InteropClient::DoCustomMetadata() {
+bool InteropClient::DoCustomMetadata() {
const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
const grpc::string kInitialMetadataValue("test_initial_metadata_value");
const grpc::string kEchoTrailingBinMetadataKey(
@@ -645,7 +825,10 @@ void InteropClient::DoCustomMetadata() {
request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
const auto& server_initial_metadata = context.GetServerInitialMetadata();
auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
GPR_ASSERT(iter != server_initial_metadata.end());
@@ -675,14 +858,29 @@ void InteropClient::DoCustomMetadata() {
grpc::string payload(kLargeRequestSize, '\0');
request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
StreamingOutputCallResponse response;
- GPR_ASSERT(stream->Write(request));
+
+ if (!stream->Write(request)) {
+ gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
+ return TransientFailureOrAbort();
+ }
+
stream->WritesDone();
- GPR_ASSERT(stream->Read(&response));
+
+ if (!stream->Read(&response)) {
+ gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
+ return TransientFailureOrAbort();
+ }
+
GPR_ASSERT(response.payload().body() ==
grpc::string(kLargeResponseSize, '\0'));
+
GPR_ASSERT(!stream->Read(&response));
+
Status s = stream->Finish();
- AssertOkOrPrintErrorStatus(s);
+ if (!AssertStatusOk(s)) {
+ return false;
+ }
+
const auto& server_initial_metadata = context.GetServerInitialMetadata();
auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
GPR_ASSERT(iter != server_initial_metadata.end());
@@ -695,6 +893,8 @@ void InteropClient::DoCustomMetadata() {
gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
}
+
+ return true;
}
} // namespace testing
diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h
index a3794fd93f..ae75762bb8 100644
--- a/test/cpp/interop/interop_client.h
+++ b/test/cpp/interop/interop_client.h
@@ -51,41 +51,42 @@ using CheckerFn =
class InteropClient {
public:
- explicit InteropClient(std::shared_ptr<Channel> channel);
- explicit InteropClient(
- std::shared_ptr<Channel> channel,
- bool new_stub_every_test_case); // If new_stub_every_test_case is true,
- // a new TestService::Stub object is
- // created for every test case below
+ /// If new_stub_every_test_case is true, a new TestService::Stub object is
+ /// created for every test case
+ /// If do_not_abort_on_transient_failures is true, abort() is not called in
+ /// case of transient failures (like connection failures)
+ explicit InteropClient(std::shared_ptr<Channel> channel,
+ bool new_stub_every_test_case,
+ bool do_not_abort_on_transient_failures);
~InteropClient() {}
void Reset(std::shared_ptr<Channel> channel);
- void DoEmpty();
- void DoLargeUnary();
- void DoLargeCompressedUnary();
- void DoPingPong();
- void DoHalfDuplex();
- void DoRequestStreaming();
- void DoResponseStreaming();
- void DoResponseCompressedStreaming();
- void DoResponseStreamingWithSlowConsumer();
- void DoCancelAfterBegin();
- void DoCancelAfterFirstResponse();
- void DoTimeoutOnSleepingServer();
- void DoEmptyStream();
- void DoStatusWithMessage();
- void DoCustomMetadata();
+ bool DoEmpty();
+ bool DoLargeUnary();
+ bool DoLargeCompressedUnary();
+ bool DoPingPong();
+ bool DoHalfDuplex();
+ bool DoRequestStreaming();
+ bool DoResponseStreaming();
+ bool DoResponseCompressedStreaming();
+ bool DoResponseStreamingWithSlowConsumer();
+ bool DoCancelAfterBegin();
+ bool DoCancelAfterFirstResponse();
+ bool DoTimeoutOnSleepingServer();
+ bool DoEmptyStream();
+ bool DoStatusWithMessage();
+ bool DoCustomMetadata();
// Auth tests.
// username is a string containing the user email
- void DoJwtTokenCreds(const grpc::string& username);
- void DoComputeEngineCreds(const grpc::string& default_service_account,
+ bool DoJwtTokenCreds(const grpc::string& username);
+ bool DoComputeEngineCreds(const grpc::string& default_service_account,
const grpc::string& oauth_scope);
// username the GCE default service account email
- void DoOauth2AuthToken(const grpc::string& username,
+ bool DoOauth2AuthToken(const grpc::string& username,
const grpc::string& oauth_scope);
// username is a string containing the user email
- void DoPerRpcCreds(const grpc::string& json_key);
+ bool DoPerRpcCreds(const grpc::string& json_key);
private:
class ServiceStub {
@@ -105,13 +106,18 @@ class InteropClient {
// Get() call
};
- void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
+ bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
/// Run \a custom_check_fn as an additional check.
- void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response,
+ bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response,
CheckerFn custom_checks_fn);
- void AssertOkOrPrintErrorStatus(const Status& s);
+ bool AssertStatusOk(const Status& s);
+ bool AssertStatusCode(const Status& s, StatusCode expected_code);
+ bool TransientFailureOrAbort();
ServiceStub serviceStub_;
+
+ /// If true, abort() is not called for transient failures
+ bool do_not_abort_on_transient_failures_;
};
} // namespace testing
diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc
index f287a5aa3b..31f5a424a0 100644
--- a/test/cpp/interop/stress_interop_client.cc
+++ b/test/cpp/interop/stress_interop_client.cc
@@ -84,11 +84,12 @@ StressTestInteropClient::StressTestInteropClient(
int test_id, const grpc::string& server_address,
std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector, long test_duration_secs,
- long sleep_duration_ms)
+ long sleep_duration_ms, bool do_not_abort_on_transient_failures)
: test_id_(test_id),
server_address_(server_address),
channel_(channel),
- interop_client_(new InteropClient(channel, false)),
+ interop_client_(new InteropClient(channel, false,
+ do_not_abort_on_transient_failures)),
test_selector_(test_selector),
test_duration_secs_(test_duration_secs),
sleep_duration_ms_(sleep_duration_ms) {}
diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h
index cb0cd98821..c41ac6afc7 100644
--- a/test/cpp/interop/stress_interop_client.h
+++ b/test/cpp/interop/stress_interop_client.h
@@ -87,7 +87,8 @@ class StressTestInteropClient {
StressTestInteropClient(int test_id, const grpc::string& server_address,
std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector,
- long test_duration_secs, long sleep_duration_ms);
+ long test_duration_secs, long sleep_duration_ms,
+ bool do_not_abort_on_transient_failures);
// The main function. Use this as the thread entry point.
// qps_gauge is the QpsGauge to record the requests per second metric
diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc
index d9e3fd25c5..f0e9e3287e 100644
--- a/test/cpp/interop/stress_test.cc
+++ b/test/cpp/interop/stress_test.cc
@@ -101,6 +101,10 @@ DEFINE_int32(log_level, GPR_LOG_SEVERITY_INFO,
"The choices are: 0 (GPR_LOG_SEVERITY_DEBUG), 1 "
"(GPR_LOG_SEVERITY_INFO) and 2 (GPR_LOG_SEVERITY_ERROR)");
+DEFINE_bool(do_not_abort_on_transient_failures, true,
+ "If set to 'true', abort() is not called in case of transient "
+ "failures like temporary connection failures.");
+
using grpc::testing::kTestCaseList;
using grpc::testing::MetricsService;
using grpc::testing::MetricsServiceImpl;
@@ -189,6 +193,12 @@ void LogParameterInfo(const std::vector<grpc::string>& addresses,
gpr_log(GPR_INFO, "test_cases : %s", FLAGS_test_cases.c_str());
gpr_log(GPR_INFO, "sleep_duration_ms: %d", FLAGS_sleep_duration_ms);
gpr_log(GPR_INFO, "test_duration_secs: %d", FLAGS_test_duration_secs);
+ gpr_log(GPR_INFO, "num_channels_per_server: %d",
+ FLAGS_num_channels_per_server);
+ gpr_log(GPR_INFO, "num_stubs_per_channel: %d", FLAGS_num_stubs_per_channel);
+ gpr_log(GPR_INFO, "log_level: %d", FLAGS_log_level);
+ gpr_log(GPR_INFO, "do_not_abort_on_transient_failures: %s",
+ FLAGS_do_not_abort_on_transient_failures ? "true" : "false");
int num = 0;
for (auto it = addresses.begin(); it != addresses.end(); it++) {
@@ -272,7 +282,7 @@ int main(int argc, char** argv) {
stub_idx++) {
StressTestInteropClient* client = new StressTestInteropClient(
++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
- FLAGS_sleep_duration_ms);
+ FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures);
bool is_already_created = false;
// QpsGauge name