aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/interop
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/interop')
-rw-r--r--test/cpp/interop/client_helper.cc3
-rw-r--r--test/cpp/interop/client_helper.h1
-rw-r--r--test/cpp/interop/interop_client.cc124
-rw-r--r--test/cpp/interop/rnd.datbin0 -> 524288 bytes
-rw-r--r--test/cpp/interop/server.cc47
5 files changed, 126 insertions, 49 deletions
diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc
index 9df79bdbb5..cdc04b0bce 100644
--- a/test/cpp/interop/client_helper.cc
+++ b/test/cpp/interop/client_helper.cc
@@ -165,6 +165,9 @@ InteropClientContextInspector::GetCallCompressionAlgorithm() const {
return grpc_call_get_compression_algorithm(context_.call_);
}
+gpr_uint32 InteropClientContextInspector::GetMessageFlags() const {
+ return grpc_call_get_message_flags(context_.call_);
+}
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/interop/client_helper.h b/test/cpp/interop/client_helper.h
index fb8a6644e4..1c7036d25d 100644
--- a/test/cpp/interop/client_helper.h
+++ b/test/cpp/interop/client_helper.h
@@ -61,6 +61,7 @@ class InteropClientContextInspector {
// Inspector methods, able to peek inside ClientContext, follow.
grpc_compression_algorithm GetCallCompressionAlgorithm() const;
+ gpr_uint32 GetMessageFlags() const;
private:
const ::grpc::ClientContext& context_;
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index 3a28c704b5..a0770a9f35 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -33,12 +33,14 @@
#include "test/cpp/interop/interop_client.h"
+#include <fstream>
#include <memory>
#include <unistd.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/status.h>
@@ -48,10 +50,13 @@
#include "test/proto/test.grpc.pb.h"
#include "test/proto/empty.grpc.pb.h"
#include "test/proto/messages.grpc.pb.h"
+#include "src/core/transport/stream_op.h"
namespace grpc {
namespace testing {
+static const char* kRandomFile = "test/cpp/interop/rnd.dat";
+
namespace {
// The same value is defined by the Java client.
const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
@@ -102,13 +107,40 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
Status s = stub->UnaryCall(&context, *request, response);
+ // Compression related checks.
GPR_ASSERT(request->response_compression() ==
GetInteropCompressionTypeFromCompressionAlgorithm(
inspector.GetCallCompressionAlgorithm()));
+ if (request->response_compression() == NONE) {
+ GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
+ } else if (request->response_type() == PayloadType::COMPRESSABLE) {
+ // requested compression and compressable response => results should always
+ // be compressed.
+ GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
+ }
+
AssertOkOrPrintErrorStatus(s);
- GPR_ASSERT(response->payload().type() == request->response_type());
- GPR_ASSERT(response->payload().body() ==
- grpc::string(kLargeResponseSize, '\0'));
+
+ // Payload related checks.
+ if (request->response_type() != PayloadType::RANDOM) {
+ GPR_ASSERT(response->payload().type() == request->response_type());
+ }
+ switch (response->payload().type()) {
+ case PayloadType::COMPRESSABLE:
+ GPR_ASSERT(response->payload().body() ==
+ grpc::string(kLargeResponseSize, '\0'));
+ break;
+ case PayloadType::UNCOMPRESSABLE: {
+ std::ifstream rnd_file(kRandomFile);
+ GPR_ASSERT(rnd_file.good());
+ for (int i = 0; i < kLargeResponseSize; i++) {
+ GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
+ }
+ }
+ break;
+ default:
+ GPR_ASSERT(false);
+ }
}
void InteropClient::DoComputeEngineCreds(
@@ -190,13 +222,19 @@ void InteropClient::DoLargeUnary() {
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (const auto payload_type : payload_types) {
for (const auto compression_type : compression_types) {
- gpr_log(GPR_INFO, "Sending a large unary rpc...");
+ char* log_suffix;
+ gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
+ CompressionType_Name(compression_type).c_str(),
+ PayloadType_Name(payload_type).c_str());
+
+ gpr_log(GPR_INFO, "Sending a large unary rpc %s.", log_suffix);
SimpleRequest request;
SimpleResponse response;
request.set_response_type(payload_type);
request.set_response_compression(compression_type);
PerformLargeUnary(&request, &response);
- gpr_log(GPR_INFO, "Large unary done.");
+ gpr_log(GPR_INFO, "Large unary done %s.", log_suffix);
+ gpr_free(log_suffix);
}
}
}
@@ -228,34 +266,66 @@ void InteropClient::DoRequestStreaming() {
}
void InteropClient::DoResponseStreaming() {
- gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
- ClientContext context;
- StreamingOutputCallRequest request;
- request.set_response_type(PayloadType::COMPRESSABLE);
- request.set_response_compression(CompressionType::GZIP);
+ const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
+ const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
+ for (const auto payload_type : payload_types) {
+ for (const auto compression_type : compression_types) {
+ ClientContext context;
+ InteropClientContextInspector inspector(context);
+ StreamingOutputCallRequest request;
- for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
- ResponseParameters* response_parameter = request.add_response_parameters();
- response_parameter->set_size(response_stream_sizes[i]);
- }
- StreamingOutputCallResponse response;
+ char* log_suffix;
+ gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
+ CompressionType_Name(compression_type).c_str(),
+ PayloadType_Name(payload_type).c_str());
- std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
- stub->StreamingOutputCall(&context, request));
+ gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix);
- unsigned int i = 0;
- while (stream->Read(&response)) {
- GPR_ASSERT(response.payload().body() ==
- grpc::string(response_stream_sizes[i], '\0'));
- ++i;
- }
- GPR_ASSERT(response_stream_sizes.size() == i);
- Status s = stream->Finish();
+ request.set_response_type(payload_type);
+ request.set_response_compression(compression_type);
- AssertOkOrPrintErrorStatus(s);
- gpr_log(GPR_INFO, "Response streaming done.");
+ for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
+ ResponseParameters* response_parameter =
+ request.add_response_parameters();
+ response_parameter->set_size(response_stream_sizes[i]);
+ }
+ StreamingOutputCallResponse response;
+
+ std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
+ stub->StreamingOutputCall(&context, request));
+
+ unsigned int i = 0;
+ while (stream->Read(&response)) {
+ GPR_ASSERT(response.payload().body() ==
+ grpc::string(response_stream_sizes[i], '\0'));
+
+ // Compression related checks.
+ GPR_ASSERT(request.response_compression() ==
+ GetInteropCompressionTypeFromCompressionAlgorithm(
+ inspector.GetCallCompressionAlgorithm()));
+ if (request.response_compression() == NONE) {
+ GPR_ASSERT(
+ !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
+ } else if (request.response_type() == PayloadType::COMPRESSABLE) {
+ // requested compression and compressable response => results should
+ // always be compressed.
+ GPR_ASSERT(inspector.GetMessageFlags() &
+ GRPC_WRITE_INTERNAL_COMPRESS);
+ }
+
+ ++i;
+ }
+
+ GPR_ASSERT(response_stream_sizes.size() == i);
+ Status s = stream->Finish();
+
+ AssertOkOrPrintErrorStatus(s);
+ gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix);
+ gpr_free(log_suffix);
+ }
+ }
}
void InteropClient::DoResponseStreamingWithSlowConsumer() {
diff --git a/test/cpp/interop/rnd.dat b/test/cpp/interop/rnd.dat
new file mode 100644
index 0000000000..8c7f38f9e0
--- /dev/null
+++ b/test/cpp/interop/rnd.dat
Binary files differ
diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc
index 55df82b567..32c60aff44 100644
--- a/test/cpp/interop/server.cc
+++ b/test/cpp/interop/server.cc
@@ -31,6 +31,7 @@
*
*/
+#include <fstream>
#include <memory>
#include <sstream>
#include <thread>
@@ -48,6 +49,7 @@
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>
+
#include "test/proto/test.grpc.pb.h"
#include "test/proto/empty.grpc.pb.h"
#include "test/proto/messages.grpc.pb.h"
@@ -77,31 +79,32 @@ using grpc::testing::TestService;
using grpc::Status;
static bool got_sigint = false;
+static const char* kRandomFile = "test/cpp/interop/rnd.dat";
bool SetPayload(PayloadType type, int size, Payload* payload) {
- PayloadType response_type = type;
+ PayloadType response_type;
+ if (type == PayloadType::RANDOM) {
+ response_type =
+ rand() & 0x1 ? PayloadType::COMPRESSABLE : PayloadType::UNCOMPRESSABLE;
+ } else {
+ response_type = type;
+ }
payload->set_type(response_type);
- switch (type) {
- case PayloadType::COMPRESSABLE:
- {
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- }
- break;
- case PayloadType::UNCOMPRESSABLE:
- {
- // XXX
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- }
- break;
- case PayloadType::RANDOM:
- {
- // XXX
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- }
- break;
+ switch (response_type) {
+ case PayloadType::COMPRESSABLE: {
+ std::unique_ptr<char[]> body(new char[size]());
+ payload->set_body(body.get(), size);
+ } break;
+ case PayloadType::UNCOMPRESSABLE: {
+ std::unique_ptr<char[]> body(new char[size]());
+ std::ifstream rnd_file(kRandomFile);
+ GPR_ASSERT(rnd_file.good());
+ rnd_file.read(body.get(), size);
+ GPR_ASSERT(!rnd_file.eof()); // Requested more rnd bytes than available
+ payload->set_body(body.get(), size);
+ } break;
+ default:
+ GPR_ASSERT(false);
}
return true;
}