diff options
Diffstat (limited to 'test/cpp/interop/interop_client.cc')
-rw-r--r-- | test/cpp/interop/interop_client.cc | 179 |
1 files changed, 167 insertions, 12 deletions
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 066877e0c6..ca13cdc53d 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -33,17 +33,20 @@ #include "test/cpp/interop/interop_client.h" -#include <memory> - #include <unistd.h> +#include <fstream> +#include <memory> + #include <grpc/grpc.h> #include <grpc/support/log.h> -#include <grpc++/channel_interface.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> +#include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/credentials.h> -#include <grpc++/status.h> -#include <grpc++/stream.h> + +#include "src/core/transport/stream_op.h" #include "test/cpp/interop/client_helper.h" #include "test/proto/test.grpc.pb.h" #include "test/proto/empty.grpc.pb.h" @@ -52,6 +55,8 @@ namespace grpc { namespace testing { +static const char* kRandomFile = "test/cpp/interop/rnd.dat"; + namespace { // The same value is defined by the Java client. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; @@ -61,9 +66,23 @@ const int kResponseMessageSize = 1030; const int kReceiveDelayMilliSeconds = 20; const int kLargeRequestSize = 271828; const int kLargeResponseSize = 314159; + +CompressionType GetInteropCompressionTypeFromCompressionAlgorithm( + grpc_compression_algorithm algorithm) { + switch (algorithm) { + case GRPC_COMPRESS_NONE: + return CompressionType::NONE; + case GRPC_COMPRESS_GZIP: + return CompressionType::GZIP; + case GRPC_COMPRESS_DEFLATE: + return CompressionType::DEFLATE; + default: + GPR_ASSERT(false); + } +} } // namespace -InteropClient::InteropClient(std::shared_ptr<ChannelInterface> channel) +InteropClient::InteropClient(std::shared_ptr<Channel> channel) : channel_(channel) {} void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) { @@ -95,17 +114,48 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request, std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); ClientContext context; - request->set_response_type(PayloadType::COMPRESSABLE); + InteropClientContextInspector inspector(context); + // If the request doesn't already specify the response type, default to + // COMPRESSABLE. request->set_response_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); Status s = stub->UnaryCall(&context, *request, response); + // Compression related checks. + GPR_ASSERT(request->response_compression() == + GetInteropCompressionTypeFromCompressionAlgorithm( + inspector.GetCallCompressionAlgorithm())); + if (request->response_compression() == NONE) { + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); + } else if (request->response_type() == PayloadType::COMPRESSABLE) { + // requested compression and compressable response => results should always + // be compressed. + GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } + AssertOkOrPrintErrorStatus(s); - GPR_ASSERT(response->payload().type() == PayloadType::COMPRESSABLE); - GPR_ASSERT(response->payload().body() == - grpc::string(kLargeResponseSize, '\0')); + + // Payload related checks. + if (request->response_type() != PayloadType::RANDOM) { + GPR_ASSERT(response->payload().type() == request->response_type()); + } + switch (response->payload().type()) { + case PayloadType::COMPRESSABLE: + GPR_ASSERT(response->payload().body() == + grpc::string(kLargeResponseSize, '\0')); + break; + case PayloadType::UNCOMPRESSABLE: { + std::ifstream rnd_file(kRandomFile); + GPR_ASSERT(rnd_file.good()); + for (int i = 0; i < kLargeResponseSize; i++) { + GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get()); + } + } break; + default: + GPR_ASSERT(false); + } } void InteropClient::DoComputeEngineCreds( @@ -117,6 +167,7 @@ void InteropClient::DoComputeEngineCreds( SimpleResponse response; request.set_fill_username(true); request.set_fill_oauth_scope(true); + request.set_response_type(PayloadType::COMPRESSABLE); PerformLargeUnary(&request, &response); gpr_log(GPR_INFO, "Got username %s", response.username().c_str()); gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str()); @@ -136,6 +187,7 @@ void InteropClient::DoServiceAccountCreds(const grpc::string& username, SimpleResponse response; request.set_fill_username(true); request.set_fill_oauth_scope(true); + request.set_response_type(PayloadType::COMPRESSABLE); PerformLargeUnary(&request, &response); GPR_ASSERT(!response.username().empty()); GPR_ASSERT(!response.oauth_scope().empty()); @@ -199,6 +251,7 @@ void InteropClient::DoJwtTokenCreds(const grpc::string& username) { SimpleRequest request; SimpleResponse response; request.set_fill_username(true); + request.set_response_type(PayloadType::COMPRESSABLE); PerformLargeUnary(&request, &response); GPR_ASSERT(!response.username().empty()); GPR_ASSERT(username.find(response.username()) != grpc::string::npos); @@ -209,10 +262,33 @@ void InteropClient::DoLargeUnary() { gpr_log(GPR_INFO, "Sending a large unary rpc..."); SimpleRequest request; SimpleResponse response; + request.set_response_type(PayloadType::COMPRESSABLE); PerformLargeUnary(&request, &response); gpr_log(GPR_INFO, "Large unary done."); } +void InteropClient::DoLargeCompressedUnary() { + const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; + const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { + for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", + CompressionType_Name(compression_types[j]).c_str(), + PayloadType_Name(payload_types[i]).c_str()); + + gpr_log(GPR_INFO, "Sending a large compressed unary rpc %s.", log_suffix); + SimpleRequest request; + SimpleResponse response; + request.set_response_type(payload_types[i]); + request.set_response_compression(compression_types[j]); + PerformLargeUnary(&request, &response); + gpr_log(GPR_INFO, "Large compressed unary done %s.", log_suffix); + gpr_free(log_suffix); + } + } +} + void InteropClient::DoRequestStreaming() { gpr_log(GPR_INFO, "Sending request steaming rpc ..."); std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); @@ -261,11 +337,90 @@ void InteropClient::DoResponseStreaming() { } GPR_ASSERT(response_stream_sizes.size() == i); Status s = stream->Finish(); - AssertOkOrPrintErrorStatus(s); gpr_log(GPR_INFO, "Response streaming done."); } +void InteropClient::DoResponseCompressedStreaming() { + std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); + + const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; + const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { + for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) { + ClientContext context; + InteropClientContextInspector inspector(context); + StreamingOutputCallRequest request; + + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", + CompressionType_Name(compression_types[j]).c_str(), + PayloadType_Name(payload_types[i]).c_str()); + + gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix); + + request.set_response_type(payload_types[i]); + request.set_response_compression(compression_types[j]); + + for (size_t k = 0; k < response_stream_sizes.size(); ++k) { + ResponseParameters* response_parameter = + request.add_response_parameters(); + response_parameter->set_size(response_stream_sizes[k]); + } + StreamingOutputCallResponse response; + + std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( + stub->StreamingOutputCall(&context, request)); + + size_t k = 0; + while (stream->Read(&response)) { + // Payload related checks. + if (request.response_type() != PayloadType::RANDOM) { + GPR_ASSERT(response.payload().type() == request.response_type()); + } + switch (response.payload().type()) { + case PayloadType::COMPRESSABLE: + GPR_ASSERT(response.payload().body() == + grpc::string(response_stream_sizes[k], '\0')); + break; + case PayloadType::UNCOMPRESSABLE: { + std::ifstream rnd_file(kRandomFile); + GPR_ASSERT(rnd_file.good()); + for (int n = 0; n < response_stream_sizes[k]; n++) { + GPR_ASSERT(response.payload().body()[n] == (char)rnd_file.get()); + } + } break; + default: + GPR_ASSERT(false); + } + + // Compression related checks. + GPR_ASSERT(request.response_compression() == + GetInteropCompressionTypeFromCompressionAlgorithm( + inspector.GetCallCompressionAlgorithm())); + if (request.response_compression() == NONE) { + GPR_ASSERT( + !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); + } else if (request.response_type() == PayloadType::COMPRESSABLE) { + // requested compression and compressable response => results should + // always be compressed. + GPR_ASSERT(inspector.GetMessageFlags() & + GRPC_WRITE_INTERNAL_COMPRESS); + } + + ++k; + } + + GPR_ASSERT(response_stream_sizes.size() == k); + Status s = stream->Finish(); + + AssertOkOrPrintErrorStatus(s); + gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix); + gpr_free(log_suffix); + } + } +} + void InteropClient::DoResponseStreamingWithSlowConsumer() { gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ..."); std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); @@ -427,7 +582,7 @@ void InteropClient::DoStatusWithMessage() { ClientContext context; SimpleRequest request; SimpleResponse response; - EchoStatus *requested_status = request.mutable_response_status(); + EchoStatus* requested_status = request.mutable_response_status(); requested_status->set_code(grpc::StatusCode::UNKNOWN); grpc::string test_msg = "This is a test message"; requested_status->set_message(test_msg); |