diff options
author | Yuchen Zeng <zyc@google.com> | 2016-06-23 13:58:29 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2016-06-23 13:58:29 -0700 |
commit | 09b1ec8ccb7b0e5b9073fb346840bcfd00f35caf (patch) | |
tree | b2bc46c3e3741240b98c99bc1673da0b43676c03 /test/cpp/interop | |
parent | d139da8b9a9c482a83b23c926690f01b49025640 (diff) | |
parent | 0140f7c9e6c20fe78035d9635a3f5725d51ad35a (diff) |
Merge remote-tracking branch 'upstream/master' into auto-build-example
Diffstat (limited to 'test/cpp/interop')
-rw-r--r-- | test/cpp/interop/client.cc | 102 | ||||
-rw-r--r-- | test/cpp/interop/interop_client.cc | 357 | ||||
-rw-r--r-- | test/cpp/interop/interop_client.h | 6 | ||||
-rw-r--r-- | test/cpp/interop/interop_server.cc (renamed from test/cpp/interop/server_main.cc) | 111 | ||||
-rw-r--r-- | test/cpp/interop/rnd.dat | bin | 524288 -> 0 bytes | |||
-rw-r--r-- | test/cpp/interop/server_helper.cc | 4 | ||||
-rw-r--r-- | test/cpp/interop/server_helper.h | 1 | ||||
-rw-r--r-- | test/cpp/interop/stress_interop_client.cc | 14 | ||||
-rw-r--r-- | test/cpp/interop/stress_interop_client.h | 36 |
9 files changed, 385 insertions, 246 deletions
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index addaf174f2..e8ae6ee572 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -40,7 +40,9 @@ #include <grpc++/client_context.h> #include <grpc/grpc.h> #include <grpc/support/log.h> +#include <grpc/support/useful.h> +#include "src/core/lib/support/string.h" #include "test/cpp/interop/client_helper.h" #include "test/cpp/interop/interop_client.h" #include "test/cpp/util/test_config.h" @@ -52,30 +54,31 @@ DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); DEFINE_string(server_host_override, "foo.test.google.fr", "Override the server host which is sent in HTTP header"); DEFINE_string(test_case, "large_unary", - "Configure different test cases. Valid options are: " - "empty_unary : empty (zero bytes) request and response; " - "large_unary : single request and (large) response; " - "large_compressed_unary : single request and compressed (large) " - "response; " - "client_streaming : request streaming with single response; " - "server_streaming : single request with response streaming; " + "Configure different test cases. Valid options are:\n\n" + "all : all test cases;\n" + "cancel_after_begin : cancel stream after starting it;\n" + "cancel_after_first_response: cancel on first response;\n" + "client_compressed_streaming : compressed request streaming with " + "client_compressed_unary : single compressed request;\n" + "client_streaming : request streaming with single response;\n" + "compute_engine_creds: large_unary with compute engine auth;\n" + "custom_metadata: server will echo custom metadata;\n" + "empty_stream : bi-di stream with no request/response;\n" + "empty_unary : empty (zero bytes) request and response;\n" + "half_duplex : half-duplex streaming;\n" + "jwt_token_creds: large_unary with JWT token auth;\n" + "large_unary : single request and (large) response;\n" + "oauth2_auth_token: raw oauth2 access token auth;\n" + "per_rpc_creds: raw oauth2 access token on a single rpc;\n" + "ping_pong : full-duplex streaming;\n" + "response streaming;\n" "server_compressed_streaming : single request with compressed " - "response streaming; " - "slow_consumer : single request with response; " - " streaming with slow client consumer; " - "half_duplex : half-duplex streaming; " - "ping_pong : full-duplex streaming; " - "cancel_after_begin : cancel stream after starting it; " - "cancel_after_first_response: cancel on first response; " - "timeout_on_sleeping_server: deadline exceeds on stream; " - "empty_stream : bi-di stream with no request/response; " - "compute_engine_creds: large_unary with compute engine auth; " - "jwt_token_creds: large_unary with JWT token auth; " - "oauth2_auth_token: raw oauth2 access token auth; " - "per_rpc_creds: raw oauth2 access token on a single rpc; " - "status_code_and_message: verify status code & message; " - "custom_metadata: server will echo custom metadata;" - "all : all of above."); + "server_compressed_unary : single compressed response;\n" + "server_streaming : single request with response streaming;\n" + "slow_consumer : single request with response streaming with " + "slow client consumer;\n" + "status_code_and_message: verify status code & message;\n" + "timeout_on_sleeping_server: deadline exceeds on stream;\n"); DEFINE_string(default_service_account, "", "Email of GCE default service account"); DEFINE_string(service_account_key_file, "", @@ -104,14 +107,18 @@ int main(int argc, char** argv) { client.DoEmpty(); } else if (FLAGS_test_case == "large_unary") { client.DoLargeUnary(); - } else if (FLAGS_test_case == "large_compressed_unary") { - client.DoLargeCompressedUnary(); + } else if (FLAGS_test_case == "server_compressed_unary") { + client.DoServerCompressedUnary(); + } else if (FLAGS_test_case == "client_compressed_unary") { + client.DoClientCompressedUnary(); } else if (FLAGS_test_case == "client_streaming") { client.DoRequestStreaming(); } else if (FLAGS_test_case == "server_streaming") { client.DoResponseStreaming(); } else if (FLAGS_test_case == "server_compressed_streaming") { - client.DoResponseCompressedStreaming(); + client.DoServerCompressedStreaming(); + } else if (FLAGS_test_case == "client_compressed_streaming") { + client.DoClientCompressedStreaming(); } else if (FLAGS_test_case == "slow_consumer") { client.DoResponseStreamingWithSlowConsumer(); } else if (FLAGS_test_case == "half_duplex") { @@ -144,9 +151,12 @@ int main(int argc, char** argv) { } else if (FLAGS_test_case == "all") { client.DoEmpty(); client.DoLargeUnary(); + client.DoClientCompressedUnary(); + client.DoServerCompressedUnary(); client.DoRequestStreaming(); client.DoResponseStreaming(); - client.DoResponseCompressedStreaming(); + client.DoClientCompressedStreaming(); + client.DoServerCompressedStreaming(); client.DoHalfDuplex(); client.DoPingPong(); client.DoCancelAfterBegin(); @@ -165,15 +175,35 @@ int main(int argc, char** argv) { } // compute_engine_creds only runs in GCE. } else { - gpr_log( - GPR_ERROR, - "Unsupported test case %s. Valid options are all|empty_unary|" - "large_unary|large_compressed_unary|client_streaming|server_streaming|" - "server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|" - "cancel_after_first_response|timeout_on_sleeping_server|empty_stream|" - "compute_engine_creds|jwt_token_creds|oauth2_auth_token|per_rpc_creds|" - "status_code_and_message|custom_metadata", - FLAGS_test_case.c_str()); + const char* testcases[] = {"all", + "cancel_after_begin", + "cancel_after_first_response", + "client_compressed_streaming", + "client_compressed_unary", + "client_streaming", + "compute_engine_creds", + "custom_metadata", + "empty_stream", + "empty_unary", + "half_duplex", + "jwt_token_creds", + "large_unary", + "oauth2_auth_token", + "oauth2_auth_token", + "per_rpc_creds", + "per_rpc_creds", + "ping_pong", + "server_compressed_streaming", + "server_compressed_unary", + "server_streaming", + "status_code_and_message", + "timeout_on_sleeping_server"}; + char* joined_testcases = + gpr_strjoin_sep(testcases, GPR_ARRAY_SIZE(testcases), "\n", NULL); + + gpr_log(GPR_ERROR, "Unsupported test case %s. Valid options are\n%s", + FLAGS_test_case.c_str(), joined_testcases); + gpr_free(joined_testcases); ret = 1; } diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 608f902d6f..89f841dbe9 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -57,7 +57,7 @@ namespace testing { namespace { // The same value is defined by the Java client. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; -const std::vector<int> response_stream_sizes = {31415, 59, 2653, 58979}; +const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979}; const int kNumResponseMessages = 2000; const int kResponseMessageSize = 1030; const int kReceiveDelayMilliSeconds = 20; @@ -67,28 +67,23 @@ const int kLargeResponseSize = 314159; void NoopChecks(const InteropClientContextInspector& inspector, const SimpleRequest* request, const SimpleResponse* response) {} -void CompressionChecks(const InteropClientContextInspector& inspector, - const SimpleRequest* request, - const SimpleResponse* response) { +void UnaryCompressionChecks(const InteropClientContextInspector& inspector, + const SimpleRequest* request, + const SimpleResponse* response) { const grpc_compression_algorithm received_compression = inspector.GetCallCompressionAlgorithm(); - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { + if (request->response_compressed().value()) { + if (received_compression == GRPC_COMPRESS_NONE) { // Requested some compression, got NONE. This is an error. gpr_log(GPR_ERROR, "Failure: Requested compression but got uncompressed response " "from server."); abort(); } - } - if (!request->request_compressed_response()) { - GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } else if (request->response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should always - // be compressed. GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // Didn't request compression -> make sure the response is uncompressed + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } } } // namespace @@ -190,11 +185,16 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, CheckerFn custom_checks_fn) { ClientContext context; InteropClientContextInspector inspector(context); - // If the request doesn't already specify the response type, default to - // COMPRESSABLE. request->set_response_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); + if (request->has_expect_compressed()) { + if (request->expect_compressed().value()) { + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + } else { + context.set_compression_algorithm(GRPC_COMPRESS_NONE); + } + } Status s = serviceStub_.Get()->UnaryCall(&context, *request, response); if (!AssertStatusOk(s)) { @@ -204,27 +204,8 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, custom_checks_fn(inspector, request, response); // Payload related checks. - GPR_ASSERT(response->payload().type() == request->response_type()); - switch (response->payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response->payload().body() == - grpc::string(kLargeResponseSize, '\0')); - break; - case PayloadType::UNCOMPRESSABLE: { - // We don't really check anything: We can't assert that the payload is - // uncompressed because it's the server's prerogative to decide on that, - // and different implementations decide differently (ie, Java always - // compresses when requested to do so, whereas C core throws away the - // compressed payload if the output is larger than the input). - // In addition, we don't compare the actual random bytes received because - // asserting that data is sent/received properly isn't the purpose of this - // test. Moreover, different implementations are also free to use - // different sets of random bytes. - } break; - default: - GPR_ASSERT(false); - } - + GPR_ASSERT(response->payload().body() == + grpc::string(kLargeResponseSize, '\0')); return true; } @@ -237,7 +218,6 @@ bool InteropClient::DoComputeEngineCreds( SimpleResponse response; request.set_fill_username(true); request.set_fill_oauth_scope(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -311,7 +291,6 @@ bool InteropClient::DoJwtTokenCreds(const grpc::string& username) { SimpleRequest request; SimpleResponse response; request.set_fill_username(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -327,7 +306,6 @@ bool InteropClient::DoLargeUnary() { gpr_log(GPR_DEBUG, "Sending a large unary rpc..."); SimpleRequest request; SimpleResponse response; - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; } @@ -335,32 +313,73 @@ bool InteropClient::DoLargeUnary() { return true; } -bool InteropClient::DoLargeCompressedUnary() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.", - log_suffix); - SimpleRequest request; - SimpleResponse response; - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - if (!PerformLargeUnary(&request, &response, CompressionChecks)) { - gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix); - gpr_free(log_suffix); - return false; - } - - gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix); +bool InteropClient::DoClientCompressedUnary() { + // Probing for compression-checks support. + ClientContext probe_context; + SimpleRequest probe_req; + SimpleResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + + probe_req.set_response_size(kLargeResponseSize); + probe_req.mutable_payload()->set_body(grpc::string(kLargeRequestSize, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed unary request."); + const Status s = + serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed unary request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding."); + + const std::vector<bool> compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending compressed unary request %s.", log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_expect_compressed()->set_value(compressions[i]); + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + return false; + } + + gpr_log(GPR_DEBUG, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + } + + return true; +} + +bool InteropClient::DoServerCompressedUnary() { + const std::vector<bool> compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.", + log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_response_compressed()->set_value(compressions[i]); + + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Request for compressed unary failed %s", log_suffix); gpr_free(log_suffix); + return false; } + + gpr_log(GPR_DEBUG, "Request for compressed unary failed %s", log_suffix); + gpr_free(log_suffix); } return true; @@ -387,7 +406,7 @@ bool InteropClient::DoRequestStreaming() { serviceStub_.Get()->StreamingInputCall(&context, &response)); int aggregated_payload_size = 0; - for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { + for (size_t i = 0; i < request_stream_sizes.size(); ++i) { Payload* payload = request.mutable_payload(); payload->set_body(grpc::string(request_stream_sizes[i], '\0')); if (!stream->Write(request)) { @@ -396,7 +415,7 @@ bool InteropClient::DoRequestStreaming() { } aggregated_payload_size += request_stream_sizes[i]; } - stream->WritesDone(); + GPR_ASSERT(stream->WritesDone()); Status s = stream->Finish(); if (!AssertStatusOk(s)) { @@ -446,92 +465,129 @@ bool InteropClient::DoResponseStreaming() { return true; } -bool InteropClient::DoResponseCompressedStreaming() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - ClientContext context; - InteropClientContextInspector inspector(context); - StreamingOutputCallRequest request; - - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix); - - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - for (size_t k = 0; k < response_stream_sizes.size(); ++k) { - ResponseParameters* response_parameter = - request.add_response_parameters(); - response_parameter->set_size(response_stream_sizes[k]); - } - StreamingOutputCallResponse response; - - std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( - serviceStub_.Get()->StreamingOutputCall(&context, request)); - - size_t k = 0; - while (stream->Read(&response)) { - // Payload related checks. - GPR_ASSERT(response.payload().type() == request.response_type()); - switch (response.payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response.payload().body() == - grpc::string(response_stream_sizes[k], '\0')); - break; - case PayloadType::UNCOMPRESSABLE: - break; - default: - GPR_ASSERT(false); - } - - // Compression related checks. - if (request.request_compressed_response()) { - GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > - GRPC_COMPRESS_NONE); - if (request.response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should - // always be compressed. - GPR_ASSERT(inspector.GetMessageFlags() & - GRPC_WRITE_INTERNAL_COMPRESS); - } - } else { - // requested *no* compression. - GPR_ASSERT( - !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } - - ++k; - } - - gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix); - gpr_free(log_suffix); +bool InteropClient::DoClientCompressedStreaming() { + // Probing for compression-checks support. + ClientContext probe_context; + StreamingInputCallRequest probe_req; + StreamingInputCallResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + probe_req.mutable_payload()->set_body(grpc::string(27182, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request."); + + std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream( + serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res)); + + if (!probe_stream->Write(probe_req)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + Status s = probe_stream->Finish(); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed streaming request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, + "Compressed streaming request probe succeeded. Proceeding."); + + ClientContext context; + StreamingInputCallRequest request; + StreamingInputCallResponse response; + + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream( + serviceStub_.Get()->StreamingInputCall(&context, &response)); + + request.mutable_payload()->set_body(grpc::string(27182, '\0')); + request.mutable_expect_compressed()->set_value(true); + gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled"); + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + + WriteOptions wopts; + wopts.set_no_compression(); + request.mutable_payload()->set_body(grpc::string(45904, '\0')); + request.mutable_expect_compressed()->set_value(false); + gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled"); + if (!stream->Write(request, wopts)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + GPR_ASSERT(stream->WritesDone()); + + s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } + + return true; +} - if (k < response_stream_sizes.size()) { - // stream->Read() failed before reading all the expected messages. This - // is most likely due to a connection failure. - gpr_log(GPR_ERROR, - "DoResponseCompressedStreaming(): Responses read (k=%" PRIuPTR - ") is " - "less than the expected messages (i.e " - "response_stream_sizes.size() (%" PRIuPTR ")). (i=%" PRIuPTR - ", j=%" PRIuPTR ")", - k, response_stream_sizes.size(), i, j); - return TransientFailureOrAbort(); - } - - Status s = stream->Finish(); - if (!AssertStatusOk(s)) { - return false; - } +bool InteropClient::DoServerCompressedStreaming() { + const std::vector<bool> compressions = {true, false}; + const std::vector<int> sizes = {31415, 92653}; + + ClientContext context; + InteropClientContextInspector inspector(context); + StreamingOutputCallRequest request; + + GPR_ASSERT(compressions.size() == sizes.size()); + for (size_t i = 0; i < sizes.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; size=%d)", + compressions[i] ? "true" : "false", sizes[i]); + + gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix); + gpr_free(log_suffix); + + ResponseParameters* const response_parameter = + request.add_response_parameters(); + response_parameter->mutable_compressed()->set_value(compressions[i]); + response_parameter->set_size(sizes[i]); + } + std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( + serviceStub_.Get()->StreamingOutputCall(&context, request)); + + size_t k = 0; + StreamingOutputCallResponse response; + while (stream->Read(&response)) { + // Payload size checks. + GPR_ASSERT(response.payload().body() == + grpc::string(request.response_parameters(k).size(), '\0')); + + // Compression checks. + GPR_ASSERT(request.response_parameters(k).has_compressed()); + if (request.response_parameters(k).compressed().value()) { + GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE); + GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // requested *no* compression. + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } + ++k; + } + + if (k < sizes.size()) { + // stream->Read() failed before reading all the expected messages. This + // is most likely due to a connection failure. + gpr_log(GPR_ERROR, "%s(): Responses read (k=%" PRIuPTR + ") is " + "less than the expected messages (i.e " + "response_stream_sizes.size() (%" PRIuPTR ")).", + __func__, k, response_stream_sizes.size()); + return TransientFailureOrAbort(); } + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } return true; } @@ -632,7 +688,6 @@ bool InteropClient::DoPingPong() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); Payload* payload = request.mutable_payload(); StreamingOutputCallResponse response; @@ -699,7 +754,6 @@ bool InteropClient::DoCancelAfterFirstResponse() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(31415); request.mutable_payload()->set_body(grpc::string(27182, '\0')); @@ -839,7 +893,6 @@ bool InteropClient::DoCustomMetadata() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index c8d6810c12..eb886fcb7e 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -64,12 +64,14 @@ class InteropClient { bool DoEmpty(); bool DoLargeUnary(); - bool DoLargeCompressedUnary(); + bool DoServerCompressedUnary(); + bool DoClientCompressedUnary(); bool DoPingPong(); bool DoHalfDuplex(); bool DoRequestStreaming(); bool DoResponseStreaming(); - bool DoResponseCompressedStreaming(); + bool DoServerCompressedStreaming(); + bool DoClientCompressedStreaming(); bool DoResponseStreamingWithSlowConsumer(); bool DoCancelAfterBegin(); bool DoCancelAfterFirstResponse(); diff --git a/test/cpp/interop/server_main.cc b/test/cpp/interop/interop_server.cc index 062857f3d6..ebef0002a3 100644 --- a/test/cpp/interop/server_main.cc +++ b/test/cpp/interop/interop_server.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,6 +48,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/lib/transport/byte_stream.h" #include "src/proto/grpc/testing/empty.grpc.pb.h" #include "src/proto/grpc/testing/messages.grpc.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" @@ -64,10 +65,10 @@ using grpc::ServerCredentials; using grpc::ServerReader; using grpc::ServerReaderWriter; using grpc::ServerWriter; +using grpc::WriteOptions; using grpc::SslServerCredentialsOptions; using grpc::testing::InteropServerContextInspector; using grpc::testing::Payload; -using grpc::testing::PayloadType; using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; using grpc::testing::StreamingInputCallRequest; @@ -78,7 +79,6 @@ using grpc::testing::TestService; using grpc::Status; static bool got_sigint = false; -static const char* kRandomFile = "test/cpp/interop/rnd.dat"; const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial"; const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin"; @@ -110,34 +110,41 @@ void MaybeEchoMetadata(ServerContext* context) { } } -bool SetPayload(PayloadType response_type, int size, Payload* payload) { - payload->set_type(response_type); - switch (response_type) { - case PayloadType::COMPRESSABLE: { - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - } break; - case PayloadType::UNCOMPRESSABLE: { - std::unique_ptr<char[]> body(new char[size]()); - std::ifstream rnd_file(kRandomFile); - GPR_ASSERT(rnd_file.good()); - rnd_file.read(body.get(), size); - GPR_ASSERT(!rnd_file.eof()); // Requested more rnd bytes than available - payload->set_body(body.get(), size); - } break; - default: - GPR_ASSERT(false); - } +bool SetPayload(int size, Payload* payload) { + std::unique_ptr<char[]> body(new char[size]()); + payload->set_body(body.get(), size); return true; } -template <typename RequestType> -void SetResponseCompression(ServerContext* context, - const RequestType& request) { - if (request.request_compressed_response()) { - // Any level would do, let's go for HIGH because we are overachievers. - context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); +bool CheckExpectedCompression(const ServerContext& context, + const bool compression_expected) { + const InteropServerContextInspector inspector(context); + const grpc_compression_algorithm received_compression = + inspector.GetCallCompressionAlgorithm(); + + if (compression_expected) { + if (received_compression == GRPC_COMPRESS_NONE) { + // Expected some compression, got NONE. This is an error. + gpr_log(GPR_ERROR, + "Expected compression but got uncompressed request from client."); + return false; + } + if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) { + gpr_log(GPR_ERROR, + "Failure: Requested compression in a compressable request, but " + "compression bit in message flags not set."); + return false; + } + } else { + // Didn't expect compression -> make sure the request is uncompressed + if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) { + gpr_log(GPR_ERROR, + "Failure: Didn't requested compression, but compression bit in " + "message flags set."); + return false; + } } + return true; } class TestServiceImpl : public TestService::Service { @@ -151,11 +158,26 @@ class TestServiceImpl : public TestService::Service { Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) { MaybeEchoMetadata(context); - SetResponseCompression(context, *request); + if (request->has_response_compressed()) { + const bool compression_requested = request->response_compressed().value(); + gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", + compression_requested ? "enabled" : "disabled", __func__); + if (compression_requested) { + // Any level would do, let's go for HIGH because we are overachievers. + context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + } else { + context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE); + } + } + if (!CheckExpectedCompression(*context, + request->expect_compressed().value())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Compressed request expectation not met."); + } if (request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + if (!SetPayload(request->response_size(), response->mutable_payload())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Error creating payload."); } } @@ -171,15 +193,26 @@ class TestServiceImpl : public TestService::Service { Status StreamingOutputCall( ServerContext* context, const StreamingOutputCallRequest* request, ServerWriter<StreamingOutputCallResponse>* writer) { - SetResponseCompression(context, *request); StreamingOutputCallResponse response; bool write_success = true; for (int i = 0; write_success && i < request->response_parameters_size(); i++) { - if (!SetPayload(request->response_type(), - request->response_parameters(i).size(), + if (!SetPayload(request->response_parameters(i).size(), response.mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Error creating payload."); + } + WriteOptions wopts; + if (request->response_parameters(i).has_compressed()) { + // Compress by default. Disabled on a per-message basis. + context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + const bool compression_requested = + request->response_parameters(i).compressed().value(); + gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", + compression_requested ? "enabled" : "disabled", __func__); + if (!compression_requested) { + wopts.set_no_compression(); + } // else, compression is already enabled via the context. } int time_us; if ((time_us = request->response_parameters(i).interval_us()) > 0) { @@ -189,7 +222,7 @@ class TestServiceImpl : public TestService::Service { gpr_time_from_micros(time_us, GPR_TIMESPAN)); gpr_sleep_until(sleep_time); } - write_success = writer->Write(response); + write_success = writer->Write(response, wopts); } if (write_success) { return Status::OK; @@ -204,6 +237,11 @@ class TestServiceImpl : public TestService::Service { StreamingInputCallRequest request; int aggregated_payload_size = 0; while (reader->Read(&request)) { + if (!CheckExpectedCompression(*context, + request.expect_compressed().value())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Compressed request expectation not met."); + } if (request.has_payload()) { aggregated_payload_size += request.payload().body().size(); } @@ -221,7 +259,6 @@ class TestServiceImpl : public TestService::Service { StreamingOutputCallResponse response; bool write_success = true; while (write_success && stream->Read(&request)) { - SetResponseCompression(context, request); if (request.response_parameters_size() != 0) { response.mutable_payload()->set_type(request.payload().type()); response.mutable_payload()->set_body( diff --git a/test/cpp/interop/rnd.dat b/test/cpp/interop/rnd.dat Binary files differdeleted file mode 100644 index 8c7f38f9e0..0000000000 --- a/test/cpp/interop/rnd.dat +++ /dev/null diff --git a/test/cpp/interop/server_helper.cc b/test/cpp/interop/server_helper.cc index c6d891ad71..8b0b511bcb 100644 --- a/test/cpp/interop/server_helper.cc +++ b/test/cpp/interop/server_helper.cc @@ -72,6 +72,10 @@ uint32_t InteropServerContextInspector::GetEncodingsAcceptedByClient() const { return grpc_call_test_only_get_encodings_accepted_by_peer(context_.call_); } +uint32_t InteropServerContextInspector::GetMessageFlags() const { + return grpc_call_test_only_get_message_flags(context_.call_); +} + std::shared_ptr<const AuthContext> InteropServerContextInspector::GetAuthContext() const { return context_.auth_context(); diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h index 12865e4032..a1da14a4c8 100644 --- a/test/cpp/interop/server_helper.h +++ b/test/cpp/interop/server_helper.h @@ -54,6 +54,7 @@ class InteropServerContextInspector { bool IsCancelled() const; grpc_compression_algorithm GetCallCompressionAlgorithm() const; uint32_t GetEncodingsAcceptedByClient() const; + uint32_t GetMessageFlags() const; private: const ::grpc::ServerContext& context_; diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc index aa95682e74..1d5fc80cf2 100644 --- a/test/cpp/interop/stress_interop_client.cc +++ b/test/cpp/interop/stress_interop_client.cc @@ -138,8 +138,12 @@ bool StressTestInteropClient::RunTest(TestCaseType test_case) { is_success = interop_client_->DoLargeUnary(); break; } - case LARGE_COMPRESSED_UNARY: { - is_success = interop_client_->DoLargeCompressedUnary(); + case CLIENT_COMPRESSED_UNARY: { + is_success = interop_client_->DoClientCompressedUnary(); + break; + } + case CLIENT_COMPRESSED_STREAMING: { + is_success = interop_client_->DoClientCompressedStreaming(); break; } case CLIENT_STREAMING: { @@ -150,8 +154,12 @@ bool StressTestInteropClient::RunTest(TestCaseType test_case) { is_success = interop_client_->DoResponseStreaming(); break; } + case SERVER_COMPRESSED_UNARY: { + is_success = interop_client_->DoServerCompressedUnary(); + break; + } case SERVER_COMPRESSED_STREAMING: { - is_success = interop_client_->DoResponseCompressedStreaming(); + is_success = interop_client_->DoServerCompressedStreaming(); break; } case SLOW_CONSUMER: { diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h index aa93b58b4a..cf6a713473 100644 --- a/test/cpp/interop/stress_interop_client.h +++ b/test/cpp/interop/stress_interop_client.h @@ -51,29 +51,33 @@ using std::vector; enum TestCaseType { UNKNOWN_TEST = -1, - EMPTY_UNARY = 0, - LARGE_UNARY = 1, - LARGE_COMPRESSED_UNARY = 2, - CLIENT_STREAMING = 3, - SERVER_STREAMING = 4, - SERVER_COMPRESSED_STREAMING = 5, - SLOW_CONSUMER = 6, - HALF_DUPLEX = 7, - PING_PONG = 8, - CANCEL_AFTER_BEGIN = 9, - CANCEL_AFTER_FIRST_RESPONSE = 10, - TIMEOUT_ON_SLEEPING_SERVER = 11, - EMPTY_STREAM = 12, - STATUS_CODE_AND_MESSAGE = 13, - CUSTOM_METADATA = 14 + EMPTY_UNARY, + LARGE_UNARY, + CLIENT_COMPRESSED_UNARY, + CLIENT_COMPRESSED_STREAMING, + CLIENT_STREAMING, + SERVER_STREAMING, + SERVER_COMPRESSED_UNARY, + SERVER_COMPRESSED_STREAMING, + SLOW_CONSUMER, + HALF_DUPLEX, + PING_PONG, + CANCEL_AFTER_BEGIN, + CANCEL_AFTER_FIRST_RESPONSE, + TIMEOUT_ON_SLEEPING_SERVER, + EMPTY_STREAM, + STATUS_CODE_AND_MESSAGE, + CUSTOM_METADATA }; const vector<pair<TestCaseType, grpc::string>> kTestCaseList = { {EMPTY_UNARY, "empty_unary"}, {LARGE_UNARY, "large_unary"}, - {LARGE_COMPRESSED_UNARY, "large_compressed_unary"}, + {CLIENT_COMPRESSED_UNARY, "client_compressed_unary"}, + {CLIENT_COMPRESSED_STREAMING, "client_compressed_streaming"}, {CLIENT_STREAMING, "client_streaming"}, {SERVER_STREAMING, "server_streaming"}, + {SERVER_COMPRESSED_UNARY, "server_compressed_unary"}, {SERVER_COMPRESSED_STREAMING, "server_compressed_streaming"}, {SLOW_CONSUMER, "slow_consumer"}, {HALF_DUPLEX, "half_duplex"}, |