diff options
Diffstat (limited to 'test/cpp/interop/interop_client.cc')
-rw-r--r-- | test/cpp/interop/interop_client.cc | 124 |
1 files changed, 97 insertions, 27 deletions
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 3a28c704b5..a0770a9f35 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -33,12 +33,14 @@ #include "test/cpp/interop/interop_client.h" +#include <fstream> #include <memory> #include <unistd.h> #include <grpc/grpc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc++/channel_interface.h> #include <grpc++/client_context.h> #include <grpc++/status.h> @@ -48,10 +50,13 @@ #include "test/proto/test.grpc.pb.h" #include "test/proto/empty.grpc.pb.h" #include "test/proto/messages.grpc.pb.h" +#include "src/core/transport/stream_op.h" namespace grpc { namespace testing { +static const char* kRandomFile = "test/cpp/interop/rnd.dat"; + namespace { // The same value is defined by the Java client. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; @@ -102,13 +107,40 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request, Status s = stub->UnaryCall(&context, *request, response); + // Compression related checks. GPR_ASSERT(request->response_compression() == GetInteropCompressionTypeFromCompressionAlgorithm( inspector.GetCallCompressionAlgorithm())); + if (request->response_compression() == NONE) { + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); + } else if (request->response_type() == PayloadType::COMPRESSABLE) { + // requested compression and compressable response => results should always + // be compressed. + GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } + AssertOkOrPrintErrorStatus(s); - GPR_ASSERT(response->payload().type() == request->response_type()); - GPR_ASSERT(response->payload().body() == - grpc::string(kLargeResponseSize, '\0')); + + // Payload related checks. + if (request->response_type() != PayloadType::RANDOM) { + GPR_ASSERT(response->payload().type() == request->response_type()); + } + switch (response->payload().type()) { + case PayloadType::COMPRESSABLE: + GPR_ASSERT(response->payload().body() == + grpc::string(kLargeResponseSize, '\0')); + break; + case PayloadType::UNCOMPRESSABLE: { + std::ifstream rnd_file(kRandomFile); + GPR_ASSERT(rnd_file.good()); + for (int i = 0; i < kLargeResponseSize; i++) { + GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get()); + } + } + break; + default: + GPR_ASSERT(false); + } } void InteropClient::DoComputeEngineCreds( @@ -190,13 +222,19 @@ void InteropClient::DoLargeUnary() { const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; for (const auto payload_type : payload_types) { for (const auto compression_type : compression_types) { - gpr_log(GPR_INFO, "Sending a large unary rpc..."); + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", + CompressionType_Name(compression_type).c_str(), + PayloadType_Name(payload_type).c_str()); + + gpr_log(GPR_INFO, "Sending a large unary rpc %s.", log_suffix); SimpleRequest request; SimpleResponse response; request.set_response_type(payload_type); request.set_response_compression(compression_type); PerformLargeUnary(&request, &response); - gpr_log(GPR_INFO, "Large unary done."); + gpr_log(GPR_INFO, "Large unary done %s.", log_suffix); + gpr_free(log_suffix); } } } @@ -228,34 +266,66 @@ void InteropClient::DoRequestStreaming() { } void InteropClient::DoResponseStreaming() { - gpr_log(GPR_INFO, "Receiving response steaming rpc ..."); std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); - ClientContext context; - StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); - request.set_response_compression(CompressionType::GZIP); + const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; + const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; + for (const auto payload_type : payload_types) { + for (const auto compression_type : compression_types) { + ClientContext context; + InteropClientContextInspector inspector(context); + StreamingOutputCallRequest request; - for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { - ResponseParameters* response_parameter = request.add_response_parameters(); - response_parameter->set_size(response_stream_sizes[i]); - } - StreamingOutputCallResponse response; + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", + CompressionType_Name(compression_type).c_str(), + PayloadType_Name(payload_type).c_str()); - std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( - stub->StreamingOutputCall(&context, request)); + gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix); - unsigned int i = 0; - while (stream->Read(&response)) { - GPR_ASSERT(response.payload().body() == - grpc::string(response_stream_sizes[i], '\0')); - ++i; - } - GPR_ASSERT(response_stream_sizes.size() == i); - Status s = stream->Finish(); + request.set_response_type(payload_type); + request.set_response_compression(compression_type); - AssertOkOrPrintErrorStatus(s); - gpr_log(GPR_INFO, "Response streaming done."); + for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { + ResponseParameters* response_parameter = + request.add_response_parameters(); + response_parameter->set_size(response_stream_sizes[i]); + } + StreamingOutputCallResponse response; + + std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( + stub->StreamingOutputCall(&context, request)); + + unsigned int i = 0; + while (stream->Read(&response)) { + GPR_ASSERT(response.payload().body() == + grpc::string(response_stream_sizes[i], '\0')); + + // Compression related checks. + GPR_ASSERT(request.response_compression() == + GetInteropCompressionTypeFromCompressionAlgorithm( + inspector.GetCallCompressionAlgorithm())); + if (request.response_compression() == NONE) { + GPR_ASSERT( + !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); + } else if (request.response_type() == PayloadType::COMPRESSABLE) { + // requested compression and compressable response => results should + // always be compressed. + GPR_ASSERT(inspector.GetMessageFlags() & + GRPC_WRITE_INTERNAL_COMPRESS); + } + + ++i; + } + + GPR_ASSERT(response_stream_sizes.size() == i); + Status s = stream->Finish(); + + AssertOkOrPrintErrorStatus(s); + gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix); + gpr_free(log_suffix); + } + } } void InteropClient::DoResponseStreamingWithSlowConsumer() { |