diff options
Diffstat (limited to 'test/cpp/interop')
-rw-r--r-- | test/cpp/interop/client_helper.cc | 3 | ||||
-rw-r--r-- | test/cpp/interop/client_helper.h | 1 | ||||
-rw-r--r-- | test/cpp/interop/interop_client.cc | 124 | ||||
-rw-r--r-- | test/cpp/interop/rnd.dat | bin | 0 -> 524288 bytes | |||
-rw-r--r-- | test/cpp/interop/server.cc | 47 |
5 files changed, 126 insertions, 49 deletions
diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc index 9df79bdbb5..cdc04b0bce 100644 --- a/test/cpp/interop/client_helper.cc +++ b/test/cpp/interop/client_helper.cc @@ -165,6 +165,9 @@ InteropClientContextInspector::GetCallCompressionAlgorithm() const { return grpc_call_get_compression_algorithm(context_.call_); } +gpr_uint32 InteropClientContextInspector::GetMessageFlags() const { + return grpc_call_get_message_flags(context_.call_); +} } // namespace testing } // namespace grpc diff --git a/test/cpp/interop/client_helper.h b/test/cpp/interop/client_helper.h index fb8a6644e4..1c7036d25d 100644 --- a/test/cpp/interop/client_helper.h +++ b/test/cpp/interop/client_helper.h @@ -61,6 +61,7 @@ class InteropClientContextInspector { // Inspector methods, able to peek inside ClientContext, follow. grpc_compression_algorithm GetCallCompressionAlgorithm() const; + gpr_uint32 GetMessageFlags() const; private: const ::grpc::ClientContext& context_; diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 3a28c704b5..a0770a9f35 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -33,12 +33,14 @@ #include "test/cpp/interop/interop_client.h" +#include <fstream> #include <memory> #include <unistd.h> #include <grpc/grpc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc++/channel_interface.h> #include <grpc++/client_context.h> #include <grpc++/status.h> @@ -48,10 +50,13 @@ #include "test/proto/test.grpc.pb.h" #include "test/proto/empty.grpc.pb.h" #include "test/proto/messages.grpc.pb.h" +#include "src/core/transport/stream_op.h" namespace grpc { namespace testing { +static const char* kRandomFile = "test/cpp/interop/rnd.dat"; + namespace { // The same value is defined by the Java client. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; @@ -102,13 +107,40 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request, Status s = stub->UnaryCall(&context, *request, response); + // Compression related checks. GPR_ASSERT(request->response_compression() == GetInteropCompressionTypeFromCompressionAlgorithm( inspector.GetCallCompressionAlgorithm())); + if (request->response_compression() == NONE) { + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); + } else if (request->response_type() == PayloadType::COMPRESSABLE) { + // requested compression and compressable response => results should always + // be compressed. + GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } + AssertOkOrPrintErrorStatus(s); - GPR_ASSERT(response->payload().type() == request->response_type()); - GPR_ASSERT(response->payload().body() == - grpc::string(kLargeResponseSize, '\0')); + + // Payload related checks. + if (request->response_type() != PayloadType::RANDOM) { + GPR_ASSERT(response->payload().type() == request->response_type()); + } + switch (response->payload().type()) { + case PayloadType::COMPRESSABLE: + GPR_ASSERT(response->payload().body() == + grpc::string(kLargeResponseSize, '\0')); + break; + case PayloadType::UNCOMPRESSABLE: { + std::ifstream rnd_file(kRandomFile); + GPR_ASSERT(rnd_file.good()); + for (int i = 0; i < kLargeResponseSize; i++) { + GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get()); + } + } + break; + default: + GPR_ASSERT(false); + } } void InteropClient::DoComputeEngineCreds( @@ -190,13 +222,19 @@ void InteropClient::DoLargeUnary() { const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; for (const auto payload_type : payload_types) { for (const auto compression_type : compression_types) { - gpr_log(GPR_INFO, "Sending a large unary rpc..."); + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", + CompressionType_Name(compression_type).c_str(), + PayloadType_Name(payload_type).c_str()); + + gpr_log(GPR_INFO, "Sending a large unary rpc %s.", log_suffix); SimpleRequest request; SimpleResponse response; request.set_response_type(payload_type); request.set_response_compression(compression_type); PerformLargeUnary(&request, &response); - gpr_log(GPR_INFO, "Large unary done."); + gpr_log(GPR_INFO, "Large unary done %s.", log_suffix); + gpr_free(log_suffix); } } } @@ -228,34 +266,66 @@ void InteropClient::DoRequestStreaming() { } void InteropClient::DoResponseStreaming() { - gpr_log(GPR_INFO, "Receiving response steaming rpc ..."); std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); - ClientContext context; - StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); - request.set_response_compression(CompressionType::GZIP); + const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; + const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; + for (const auto payload_type : payload_types) { + for (const auto compression_type : compression_types) { + ClientContext context; + InteropClientContextInspector inspector(context); + StreamingOutputCallRequest request; - for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { - ResponseParameters* response_parameter = request.add_response_parameters(); - response_parameter->set_size(response_stream_sizes[i]); - } - StreamingOutputCallResponse response; + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", + CompressionType_Name(compression_type).c_str(), + PayloadType_Name(payload_type).c_str()); - std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( - stub->StreamingOutputCall(&context, request)); + gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix); - unsigned int i = 0; - while (stream->Read(&response)) { - GPR_ASSERT(response.payload().body() == - grpc::string(response_stream_sizes[i], '\0')); - ++i; - } - GPR_ASSERT(response_stream_sizes.size() == i); - Status s = stream->Finish(); + request.set_response_type(payload_type); + request.set_response_compression(compression_type); - AssertOkOrPrintErrorStatus(s); - gpr_log(GPR_INFO, "Response streaming done."); + for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { + ResponseParameters* response_parameter = + request.add_response_parameters(); + response_parameter->set_size(response_stream_sizes[i]); + } + StreamingOutputCallResponse response; + + std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( + stub->StreamingOutputCall(&context, request)); + + unsigned int i = 0; + while (stream->Read(&response)) { + GPR_ASSERT(response.payload().body() == + grpc::string(response_stream_sizes[i], '\0')); + + // Compression related checks. + GPR_ASSERT(request.response_compression() == + GetInteropCompressionTypeFromCompressionAlgorithm( + inspector.GetCallCompressionAlgorithm())); + if (request.response_compression() == NONE) { + GPR_ASSERT( + !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); + } else if (request.response_type() == PayloadType::COMPRESSABLE) { + // requested compression and compressable response => results should + // always be compressed. + GPR_ASSERT(inspector.GetMessageFlags() & + GRPC_WRITE_INTERNAL_COMPRESS); + } + + ++i; + } + + GPR_ASSERT(response_stream_sizes.size() == i); + Status s = stream->Finish(); + + AssertOkOrPrintErrorStatus(s); + gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix); + gpr_free(log_suffix); + } + } } void InteropClient::DoResponseStreamingWithSlowConsumer() { diff --git a/test/cpp/interop/rnd.dat b/test/cpp/interop/rnd.dat Binary files differnew file mode 100644 index 0000000000..8c7f38f9e0 --- /dev/null +++ b/test/cpp/interop/rnd.dat diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc index 55df82b567..32c60aff44 100644 --- a/test/cpp/interop/server.cc +++ b/test/cpp/interop/server.cc @@ -31,6 +31,7 @@ * */ +#include <fstream> #include <memory> #include <sstream> #include <thread> @@ -48,6 +49,7 @@ #include <grpc++/server_credentials.h> #include <grpc++/status.h> #include <grpc++/stream.h> + #include "test/proto/test.grpc.pb.h" #include "test/proto/empty.grpc.pb.h" #include "test/proto/messages.grpc.pb.h" @@ -77,31 +79,32 @@ using grpc::testing::TestService; using grpc::Status; static bool got_sigint = false; +static const char* kRandomFile = "test/cpp/interop/rnd.dat"; bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type = type; + PayloadType response_type; + if (type == PayloadType::RANDOM) { + response_type = + rand() & 0x1 ? PayloadType::COMPRESSABLE : PayloadType::UNCOMPRESSABLE; + } else { + response_type = type; + } payload->set_type(response_type); - switch (type) { - case PayloadType::COMPRESSABLE: - { - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - } - break; - case PayloadType::UNCOMPRESSABLE: - { - // XXX - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - } - break; - case PayloadType::RANDOM: - { - // XXX - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - } - break; + switch (response_type) { + case PayloadType::COMPRESSABLE: { + std::unique_ptr<char[]> body(new char[size]()); + payload->set_body(body.get(), size); + } break; + case PayloadType::UNCOMPRESSABLE: { + std::unique_ptr<char[]> body(new char[size]()); + std::ifstream rnd_file(kRandomFile); + GPR_ASSERT(rnd_file.good()); + rnd_file.read(body.get(), size); + GPR_ASSERT(!rnd_file.eof()); // Requested more rnd bytes than available + payload->set_body(body.get(), size); + } break; + default: + GPR_ASSERT(false); } return true; } |