diff options
Diffstat (limited to 'test/cpp/interop/interop_client.cc')
-rw-r--r-- | test/cpp/interop/interop_client.cc | 357 |
1 files changed, 205 insertions, 152 deletions
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 608f902d6f..89f841dbe9 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -57,7 +57,7 @@ namespace testing { namespace { // The same value is defined by the Java client. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; -const std::vector<int> response_stream_sizes = {31415, 59, 2653, 58979}; +const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979}; const int kNumResponseMessages = 2000; const int kResponseMessageSize = 1030; const int kReceiveDelayMilliSeconds = 20; @@ -67,28 +67,23 @@ const int kLargeResponseSize = 314159; void NoopChecks(const InteropClientContextInspector& inspector, const SimpleRequest* request, const SimpleResponse* response) {} -void CompressionChecks(const InteropClientContextInspector& inspector, - const SimpleRequest* request, - const SimpleResponse* response) { +void UnaryCompressionChecks(const InteropClientContextInspector& inspector, + const SimpleRequest* request, + const SimpleResponse* response) { const grpc_compression_algorithm received_compression = inspector.GetCallCompressionAlgorithm(); - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { + if (request->response_compressed().value()) { + if (received_compression == GRPC_COMPRESS_NONE) { // Requested some compression, got NONE. This is an error. gpr_log(GPR_ERROR, "Failure: Requested compression but got uncompressed response " "from server."); abort(); } - } - if (!request->request_compressed_response()) { - GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } else if (request->response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should always - // be compressed. GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // Didn't request compression -> make sure the response is uncompressed + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } } } // namespace @@ -190,11 +185,16 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, CheckerFn custom_checks_fn) { ClientContext context; InteropClientContextInspector inspector(context); - // If the request doesn't already specify the response type, default to - // COMPRESSABLE. request->set_response_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); + if (request->has_expect_compressed()) { + if (request->expect_compressed().value()) { + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + } else { + context.set_compression_algorithm(GRPC_COMPRESS_NONE); + } + } Status s = serviceStub_.Get()->UnaryCall(&context, *request, response); if (!AssertStatusOk(s)) { @@ -204,27 +204,8 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, custom_checks_fn(inspector, request, response); // Payload related checks. - GPR_ASSERT(response->payload().type() == request->response_type()); - switch (response->payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response->payload().body() == - grpc::string(kLargeResponseSize, '\0')); - break; - case PayloadType::UNCOMPRESSABLE: { - // We don't really check anything: We can't assert that the payload is - // uncompressed because it's the server's prerogative to decide on that, - // and different implementations decide differently (ie, Java always - // compresses when requested to do so, whereas C core throws away the - // compressed payload if the output is larger than the input). - // In addition, we don't compare the actual random bytes received because - // asserting that data is sent/received properly isn't the purpose of this - // test. Moreover, different implementations are also free to use - // different sets of random bytes. - } break; - default: - GPR_ASSERT(false); - } - + GPR_ASSERT(response->payload().body() == + grpc::string(kLargeResponseSize, '\0')); return true; } @@ -237,7 +218,6 @@ bool InteropClient::DoComputeEngineCreds( SimpleResponse response; request.set_fill_username(true); request.set_fill_oauth_scope(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -311,7 +291,6 @@ bool InteropClient::DoJwtTokenCreds(const grpc::string& username) { SimpleRequest request; SimpleResponse response; request.set_fill_username(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -327,7 +306,6 @@ bool InteropClient::DoLargeUnary() { gpr_log(GPR_DEBUG, "Sending a large unary rpc..."); SimpleRequest request; SimpleResponse response; - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; } @@ -335,32 +313,73 @@ bool InteropClient::DoLargeUnary() { return true; } -bool InteropClient::DoLargeCompressedUnary() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.", - log_suffix); - SimpleRequest request; - SimpleResponse response; - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - if (!PerformLargeUnary(&request, &response, CompressionChecks)) { - gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix); - gpr_free(log_suffix); - return false; - } - - gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix); +bool InteropClient::DoClientCompressedUnary() { + // Probing for compression-checks support. + ClientContext probe_context; + SimpleRequest probe_req; + SimpleResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + + probe_req.set_response_size(kLargeResponseSize); + probe_req.mutable_payload()->set_body(grpc::string(kLargeRequestSize, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed unary request."); + const Status s = + serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed unary request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding."); + + const std::vector<bool> compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending compressed unary request %s.", log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_expect_compressed()->set_value(compressions[i]); + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + return false; + } + + gpr_log(GPR_DEBUG, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + } + + return true; +} + +bool InteropClient::DoServerCompressedUnary() { + const std::vector<bool> compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.", + log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_response_compressed()->set_value(compressions[i]); + + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Request for compressed unary failed %s", log_suffix); gpr_free(log_suffix); + return false; } + + gpr_log(GPR_DEBUG, "Request for compressed unary failed %s", log_suffix); + gpr_free(log_suffix); } return true; @@ -387,7 +406,7 @@ bool InteropClient::DoRequestStreaming() { serviceStub_.Get()->StreamingInputCall(&context, &response)); int aggregated_payload_size = 0; - for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { + for (size_t i = 0; i < request_stream_sizes.size(); ++i) { Payload* payload = request.mutable_payload(); payload->set_body(grpc::string(request_stream_sizes[i], '\0')); if (!stream->Write(request)) { @@ -396,7 +415,7 @@ bool InteropClient::DoRequestStreaming() { } aggregated_payload_size += request_stream_sizes[i]; } - stream->WritesDone(); + GPR_ASSERT(stream->WritesDone()); Status s = stream->Finish(); if (!AssertStatusOk(s)) { @@ -446,92 +465,129 @@ bool InteropClient::DoResponseStreaming() { return true; } -bool InteropClient::DoResponseCompressedStreaming() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - ClientContext context; - InteropClientContextInspector inspector(context); - StreamingOutputCallRequest request; - - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix); - - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - for (size_t k = 0; k < response_stream_sizes.size(); ++k) { - ResponseParameters* response_parameter = - request.add_response_parameters(); - response_parameter->set_size(response_stream_sizes[k]); - } - StreamingOutputCallResponse response; - - std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( - serviceStub_.Get()->StreamingOutputCall(&context, request)); - - size_t k = 0; - while (stream->Read(&response)) { - // Payload related checks. - GPR_ASSERT(response.payload().type() == request.response_type()); - switch (response.payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response.payload().body() == - grpc::string(response_stream_sizes[k], '\0')); - break; - case PayloadType::UNCOMPRESSABLE: - break; - default: - GPR_ASSERT(false); - } - - // Compression related checks. - if (request.request_compressed_response()) { - GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > - GRPC_COMPRESS_NONE); - if (request.response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should - // always be compressed. - GPR_ASSERT(inspector.GetMessageFlags() & - GRPC_WRITE_INTERNAL_COMPRESS); - } - } else { - // requested *no* compression. - GPR_ASSERT( - !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } - - ++k; - } - - gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix); - gpr_free(log_suffix); +bool InteropClient::DoClientCompressedStreaming() { + // Probing for compression-checks support. + ClientContext probe_context; + StreamingInputCallRequest probe_req; + StreamingInputCallResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + probe_req.mutable_payload()->set_body(grpc::string(27182, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request."); + + std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream( + serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res)); + + if (!probe_stream->Write(probe_req)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + Status s = probe_stream->Finish(); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed streaming request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, + "Compressed streaming request probe succeeded. Proceeding."); + + ClientContext context; + StreamingInputCallRequest request; + StreamingInputCallResponse response; + + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream( + serviceStub_.Get()->StreamingInputCall(&context, &response)); + + request.mutable_payload()->set_body(grpc::string(27182, '\0')); + request.mutable_expect_compressed()->set_value(true); + gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled"); + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + + WriteOptions wopts; + wopts.set_no_compression(); + request.mutable_payload()->set_body(grpc::string(45904, '\0')); + request.mutable_expect_compressed()->set_value(false); + gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled"); + if (!stream->Write(request, wopts)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + GPR_ASSERT(stream->WritesDone()); + + s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } + + return true; +} - if (k < response_stream_sizes.size()) { - // stream->Read() failed before reading all the expected messages. This - // is most likely due to a connection failure. - gpr_log(GPR_ERROR, - "DoResponseCompressedStreaming(): Responses read (k=%" PRIuPTR - ") is " - "less than the expected messages (i.e " - "response_stream_sizes.size() (%" PRIuPTR ")). (i=%" PRIuPTR - ", j=%" PRIuPTR ")", - k, response_stream_sizes.size(), i, j); - return TransientFailureOrAbort(); - } - - Status s = stream->Finish(); - if (!AssertStatusOk(s)) { - return false; - } +bool InteropClient::DoServerCompressedStreaming() { + const std::vector<bool> compressions = {true, false}; + const std::vector<int> sizes = {31415, 92653}; + + ClientContext context; + InteropClientContextInspector inspector(context); + StreamingOutputCallRequest request; + + GPR_ASSERT(compressions.size() == sizes.size()); + for (size_t i = 0; i < sizes.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; size=%d)", + compressions[i] ? "true" : "false", sizes[i]); + + gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix); + gpr_free(log_suffix); + + ResponseParameters* const response_parameter = + request.add_response_parameters(); + response_parameter->mutable_compressed()->set_value(compressions[i]); + response_parameter->set_size(sizes[i]); + } + std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( + serviceStub_.Get()->StreamingOutputCall(&context, request)); + + size_t k = 0; + StreamingOutputCallResponse response; + while (stream->Read(&response)) { + // Payload size checks. + GPR_ASSERT(response.payload().body() == + grpc::string(request.response_parameters(k).size(), '\0')); + + // Compression checks. + GPR_ASSERT(request.response_parameters(k).has_compressed()); + if (request.response_parameters(k).compressed().value()) { + GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE); + GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // requested *no* compression. + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } + ++k; + } + + if (k < sizes.size()) { + // stream->Read() failed before reading all the expected messages. This + // is most likely due to a connection failure. + gpr_log(GPR_ERROR, "%s(): Responses read (k=%" PRIuPTR + ") is " + "less than the expected messages (i.e " + "response_stream_sizes.size() (%" PRIuPTR ")).", + __func__, k, response_stream_sizes.size()); + return TransientFailureOrAbort(); } + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } return true; } @@ -632,7 +688,6 @@ bool InteropClient::DoPingPong() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); Payload* payload = request.mutable_payload(); StreamingOutputCallResponse response; @@ -699,7 +754,6 @@ bool InteropClient::DoCancelAfterFirstResponse() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(31415); request.mutable_payload()->set_body(grpc::string(27182, '\0')); @@ -839,7 +893,6 @@ bool InteropClient::DoCustomMetadata() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); |