aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/interop
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/interop')
-rw-r--r--test/cpp/interop/client.cc25
-rw-r--r--test/cpp/interop/client_helper.cc11
-rw-r--r--test/cpp/interop/client_helper.h25
-rw-r--r--test/cpp/interop/interop_client.cc182
-rw-r--r--test/cpp/interop/interop_client.h12
-rw-r--r--test/cpp/interop/interop_test.cc11
-rw-r--r--test/cpp/interop/reconnect_interop_client.cc7
-rw-r--r--test/cpp/interop/reconnect_interop_server.cc9
-rw-r--r--test/cpp/interop/rnd.datbin0 -> 524288 bytes
-rw-r--r--test/cpp/interop/server.cc85
-rw-r--r--test/cpp/interop/server_helper.cc22
-rw-r--r--test/cpp/interop/server_helper.h7
12 files changed, 315 insertions, 81 deletions
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index ebc5cfc85a..cb5232153b 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -38,10 +38,9 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <gflags/gflags.h>
-#include <grpc++/channel_interface.h>
+#include <grpc++/channel.h>
#include <grpc++/client_context.h>
-#include <grpc++/status.h>
-#include <grpc++/stream.h>
+
#include "test/cpp/interop/client_helper.h"
#include "test/cpp/interop/interop_client.h"
#include "test/cpp/util/test_config.h"
@@ -56,8 +55,12 @@ DEFINE_string(test_case, "large_unary",
"Configure different test cases. Valid options are: "
"empty_unary : empty (zero bytes) request and response; "
"large_unary : single request and (large) response; "
+ "large_compressed_unary : single request and compressed (large) "
+ "response; "
"client_streaming : request streaming with single response; "
"server_streaming : single request with response streaming; "
+ "server_compressed_streaming : single request with compressed "
+ "response streaming; "
"slow_consumer : single request with response; "
" streaming with slow client consumer; "
"half_duplex : half-duplex streaming; "
@@ -70,7 +73,7 @@ DEFINE_string(test_case, "large_unary",
"jwt_token_creds: large_unary with JWT token auth; "
"oauth2_auth_token: raw oauth2 access token auth; "
"per_rpc_creds: raw oauth2 access token on a single rpc; "
- "status_code_and_message: verify status code & message; "
+ "status_code_and_message: verify status code & message; "
"all : all of above.");
DEFINE_string(default_service_account, "",
"Email of GCE default service account");
@@ -91,10 +94,14 @@ int main(int argc, char** argv) {
client.DoEmpty();
} else if (FLAGS_test_case == "large_unary") {
client.DoLargeUnary();
+ } else if (FLAGS_test_case == "large_compressed_unary") {
+ client.DoLargeCompressedUnary();
} else if (FLAGS_test_case == "client_streaming") {
client.DoRequestStreaming();
} else if (FLAGS_test_case == "server_streaming") {
client.DoResponseStreaming();
+ } else if (FLAGS_test_case == "server_compressed_streaming") {
+ client.DoResponseCompressedStreaming();
} else if (FLAGS_test_case == "slow_consumer") {
client.DoResponseStreamingWithSlowConsumer();
} else if (FLAGS_test_case == "half_duplex") {
@@ -129,6 +136,7 @@ int main(int argc, char** argv) {
client.DoLargeUnary();
client.DoRequestStreaming();
client.DoResponseStreaming();
+ client.DoResponseCompressedStreaming();
client.DoHalfDuplex();
client.DoPingPong();
client.DoCancelAfterBegin();
@@ -148,10 +156,11 @@ int main(int argc, char** argv) {
gpr_log(
GPR_ERROR,
"Unsupported test case %s. Valid options are all|empty_unary|"
- "large_unary|client_streaming|server_streaming|half_duplex|ping_pong|"
- "cancel_after_begin|cancel_after_first_response|"
- "timeout_on_sleeping_server|service_account_creds|compute_engine_creds|"
- "jwt_token_creds|oauth2_auth_token|per_rpc_creds",
+ "large_unary|large_compressed_unary|client_streaming|server_streaming|"
+ "server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|"
+ "cancel_after_first_response|timeout_on_sleeping_server|"
+ "service_account_creds|compute_engine_creds|jwt_token_creds|"
+ "oauth2_auth_token|per_rpc_creds",
FLAGS_test_case.c_str());
ret = 1;
}
diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc
index 73d82f7b88..abc14aeb98 100644
--- a/test/cpp/interop/client_helper.cc
+++ b/test/cpp/interop/client_helper.cc
@@ -33,21 +33,20 @@
#include "test/cpp/interop/client_helper.h"
+#include <unistd.h>
+
#include <fstream>
#include <memory>
#include <sstream>
-#include <unistd.h>
-
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <gflags/gflags.h>
-#include <grpc++/channel_arguments.h>
-#include <grpc++/channel_interface.h>
+#include <grpc++/channel.h>
#include <grpc++/create_channel.h>
#include <grpc++/credentials.h>
-#include <grpc++/stream.h>
+
#include "src/cpp/client/secure_credentials.h"
#include "test/core/security/oauth2_utils.h"
#include "test/cpp/util/create_test_channel.h"
@@ -100,7 +99,7 @@ grpc::string GetOauth2AccessToken() {
return access_token;
}
-std::shared_ptr<ChannelInterface> CreateChannelForTestCase(
+std::shared_ptr<Channel> CreateChannelForTestCase(
const grpc::string& test_case) {
GPR_ASSERT(FLAGS_server_port);
const int host_port_buf_size = 1024;
diff --git a/test/cpp/interop/client_helper.h b/test/cpp/interop/client_helper.h
index c4361bb9de..92d5078f48 100644
--- a/test/cpp/interop/client_helper.h
+++ b/test/cpp/interop/client_helper.h
@@ -36,8 +36,9 @@
#include <memory>
-#include <grpc++/config.h>
-#include <grpc++/channel_interface.h>
+#include <grpc++/channel.h>
+
+#include "src/core/surface/call.h"
namespace grpc {
namespace testing {
@@ -46,9 +47,27 @@ grpc::string GetServiceAccountJsonKey();
grpc::string GetOauth2AccessToken();
-std::shared_ptr<ChannelInterface> CreateChannelForTestCase(
+std::shared_ptr<Channel> CreateChannelForTestCase(
const grpc::string& test_case);
+class InteropClientContextInspector {
+ public:
+ InteropClientContextInspector(const ::grpc::ClientContext& context)
+ : context_(context) {}
+
+ // Inspector methods, able to peek inside ClientContext, follow.
+ grpc_compression_algorithm GetCallCompressionAlgorithm() const {
+ return grpc_call_get_compression_algorithm(context_.call_);
+ }
+
+ gpr_uint32 GetMessageFlags() const {
+ return grpc_call_get_message_flags(context_.call_);
+ }
+
+ private:
+ const ::grpc::ClientContext& context_;
+};
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index dfb90fadc2..ca13cdc53d 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -33,17 +33,20 @@
#include "test/cpp/interop/interop_client.h"
-#include <memory>
-
#include <unistd.h>
+#include <fstream>
+#include <memory>
+
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-#include <grpc++/channel_interface.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/credentials.h>
-#include <grpc++/status.h>
-#include <grpc++/stream.h>
+
+#include "src/core/transport/stream_op.h"
#include "test/cpp/interop/client_helper.h"
#include "test/proto/test.grpc.pb.h"
#include "test/proto/empty.grpc.pb.h"
@@ -52,6 +55,8 @@
namespace grpc {
namespace testing {
+static const char* kRandomFile = "test/cpp/interop/rnd.dat";
+
namespace {
// The same value is defined by the Java client.
const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
@@ -61,9 +66,23 @@ const int kResponseMessageSize = 1030;
const int kReceiveDelayMilliSeconds = 20;
const int kLargeRequestSize = 271828;
const int kLargeResponseSize = 314159;
+
+CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
+ grpc_compression_algorithm algorithm) {
+ switch (algorithm) {
+ case GRPC_COMPRESS_NONE:
+ return CompressionType::NONE;
+ case GRPC_COMPRESS_GZIP:
+ return CompressionType::GZIP;
+ case GRPC_COMPRESS_DEFLATE:
+ return CompressionType::DEFLATE;
+ default:
+ GPR_ASSERT(false);
+ }
+}
} // namespace
-InteropClient::InteropClient(std::shared_ptr<ChannelInterface> channel)
+InteropClient::InteropClient(std::shared_ptr<Channel> channel)
: channel_(channel) {}
void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
@@ -95,17 +114,48 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
ClientContext context;
- request->set_response_type(PayloadType::COMPRESSABLE);
+ InteropClientContextInspector inspector(context);
+ // If the request doesn't already specify the response type, default to
+ // COMPRESSABLE.
request->set_response_size(kLargeResponseSize);
grpc::string payload(kLargeRequestSize, '\0');
request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = stub->UnaryCall(&context, *request, response);
+ // Compression related checks.
+ GPR_ASSERT(request->response_compression() ==
+ GetInteropCompressionTypeFromCompressionAlgorithm(
+ inspector.GetCallCompressionAlgorithm()));
+ if (request->response_compression() == NONE) {
+ GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
+ } else if (request->response_type() == PayloadType::COMPRESSABLE) {
+ // requested compression and compressable response => results should always
+ // be compressed.
+ GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
+ }
+
AssertOkOrPrintErrorStatus(s);
- GPR_ASSERT(response->payload().type() == PayloadType::COMPRESSABLE);
- GPR_ASSERT(response->payload().body() ==
- grpc::string(kLargeResponseSize, '\0'));
+
+ // Payload related checks.
+ if (request->response_type() != PayloadType::RANDOM) {
+ GPR_ASSERT(response->payload().type() == request->response_type());
+ }
+ switch (response->payload().type()) {
+ case PayloadType::COMPRESSABLE:
+ GPR_ASSERT(response->payload().body() ==
+ grpc::string(kLargeResponseSize, '\0'));
+ break;
+ case PayloadType::UNCOMPRESSABLE: {
+ std::ifstream rnd_file(kRandomFile);
+ GPR_ASSERT(rnd_file.good());
+ for (int i = 0; i < kLargeResponseSize; i++) {
+ GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
+ }
+ } break;
+ default:
+ GPR_ASSERT(false);
+ }
}
void InteropClient::DoComputeEngineCreds(
@@ -117,6 +167,7 @@ void InteropClient::DoComputeEngineCreds(
SimpleResponse response;
request.set_fill_username(true);
request.set_fill_oauth_scope(true);
+ request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
@@ -136,6 +187,7 @@ void InteropClient::DoServiceAccountCreds(const grpc::string& username,
SimpleResponse response;
request.set_fill_username(true);
request.set_fill_oauth_scope(true);
+ request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(!response.oauth_scope().empty());
@@ -199,6 +251,7 @@ void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
SimpleRequest request;
SimpleResponse response;
request.set_fill_username(true);
+ request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
@@ -209,10 +262,33 @@ void InteropClient::DoLargeUnary() {
gpr_log(GPR_INFO, "Sending a large unary rpc...");
SimpleRequest request;
SimpleResponse response;
+ request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
gpr_log(GPR_INFO, "Large unary done.");
}
+void InteropClient::DoLargeCompressedUnary() {
+ const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
+ const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
+ for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
+ char* log_suffix;
+ gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
+ CompressionType_Name(compression_types[j]).c_str(),
+ PayloadType_Name(payload_types[i]).c_str());
+
+ gpr_log(GPR_INFO, "Sending a large compressed unary rpc %s.", log_suffix);
+ SimpleRequest request;
+ SimpleResponse response;
+ request.set_response_type(payload_types[i]);
+ request.set_response_compression(compression_types[j]);
+ PerformLargeUnary(&request, &response);
+ gpr_log(GPR_INFO, "Large compressed unary done %s.", log_suffix);
+ gpr_free(log_suffix);
+ }
+ }
+}
+
void InteropClient::DoRequestStreaming() {
gpr_log(GPR_INFO, "Sending request steaming rpc ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
@@ -261,11 +337,90 @@ void InteropClient::DoResponseStreaming() {
}
GPR_ASSERT(response_stream_sizes.size() == i);
Status s = stream->Finish();
-
AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_INFO, "Response streaming done.");
}
+void InteropClient::DoResponseCompressedStreaming() {
+ std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
+
+ const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
+ const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
+ for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
+ ClientContext context;
+ InteropClientContextInspector inspector(context);
+ StreamingOutputCallRequest request;
+
+ char* log_suffix;
+ gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
+ CompressionType_Name(compression_types[j]).c_str(),
+ PayloadType_Name(payload_types[i]).c_str());
+
+ gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix);
+
+ request.set_response_type(payload_types[i]);
+ request.set_response_compression(compression_types[j]);
+
+ for (size_t k = 0; k < response_stream_sizes.size(); ++k) {
+ ResponseParameters* response_parameter =
+ request.add_response_parameters();
+ response_parameter->set_size(response_stream_sizes[k]);
+ }
+ StreamingOutputCallResponse response;
+
+ std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
+ stub->StreamingOutputCall(&context, request));
+
+ size_t k = 0;
+ while (stream->Read(&response)) {
+ // Payload related checks.
+ if (request.response_type() != PayloadType::RANDOM) {
+ GPR_ASSERT(response.payload().type() == request.response_type());
+ }
+ switch (response.payload().type()) {
+ case PayloadType::COMPRESSABLE:
+ GPR_ASSERT(response.payload().body() ==
+ grpc::string(response_stream_sizes[k], '\0'));
+ break;
+ case PayloadType::UNCOMPRESSABLE: {
+ std::ifstream rnd_file(kRandomFile);
+ GPR_ASSERT(rnd_file.good());
+ for (int n = 0; n < response_stream_sizes[k]; n++) {
+ GPR_ASSERT(response.payload().body()[n] == (char)rnd_file.get());
+ }
+ } break;
+ default:
+ GPR_ASSERT(false);
+ }
+
+ // Compression related checks.
+ GPR_ASSERT(request.response_compression() ==
+ GetInteropCompressionTypeFromCompressionAlgorithm(
+ inspector.GetCallCompressionAlgorithm()));
+ if (request.response_compression() == NONE) {
+ GPR_ASSERT(
+ !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
+ } else if (request.response_type() == PayloadType::COMPRESSABLE) {
+ // requested compression and compressable response => results should
+ // always be compressed.
+ GPR_ASSERT(inspector.GetMessageFlags() &
+ GRPC_WRITE_INTERNAL_COMPRESS);
+ }
+
+ ++k;
+ }
+
+ GPR_ASSERT(response_stream_sizes.size() == k);
+ Status s = stream->Finish();
+
+ AssertOkOrPrintErrorStatus(s);
+ gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix);
+ gpr_free(log_suffix);
+ }
+ }
+}
+
void InteropClient::DoResponseStreamingWithSlowConsumer() {
gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
@@ -316,7 +471,6 @@ void InteropClient::DoHalfDuplex() {
unsigned int i = 0;
StreamingOutputCallResponse response;
while (stream->Read(&response)) {
- GPR_ASSERT(response.payload().has_body());
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
++i;
@@ -346,7 +500,6 @@ void InteropClient::DoPingPong() {
payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
GPR_ASSERT(stream->Write(request));
GPR_ASSERT(stream->Read(&response));
- GPR_ASSERT(response.payload().has_body());
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
}
@@ -393,7 +546,6 @@ void InteropClient::DoCancelAfterFirstResponse() {
StreamingOutputCallResponse response;
GPR_ASSERT(stream->Write(request));
GPR_ASSERT(stream->Read(&response));
- GPR_ASSERT(response.payload().has_body());
GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
gpr_log(GPR_INFO, "Trying to cancel...");
context.TryCancel();
@@ -430,7 +582,7 @@ void InteropClient::DoStatusWithMessage() {
ClientContext context;
SimpleRequest request;
SimpleResponse response;
- EchoStatus *requested_status = request.mutable_response_status();
+ EchoStatus* requested_status = request.mutable_response_status();
requested_status->set_code(grpc::StatusCode::UNKNOWN);
grpc::string test_msg = "This is a test message";
requested_status->set_message(test_msg);
diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h
index 6e26c49e5d..5e26cc82e6 100644
--- a/test/cpp/interop/interop_client.h
+++ b/test/cpp/interop/interop_client.h
@@ -33,11 +33,11 @@
#ifndef GRPC_TEST_CPP_INTEROP_INTEROP_CLIENT_H
#define GRPC_TEST_CPP_INTEROP_INTEROP_CLIENT_H
+
#include <memory>
#include <grpc/grpc.h>
-#include <grpc++/channel_interface.h>
-#include <grpc++/status.h>
+#include <grpc++/channel.h>
#include "test/proto/messages.grpc.pb.h"
namespace grpc {
@@ -45,17 +45,19 @@ namespace testing {
class InteropClient {
public:
- explicit InteropClient(std::shared_ptr<ChannelInterface> channel);
+ explicit InteropClient(std::shared_ptr<Channel> channel);
~InteropClient() {}
- void Reset(std::shared_ptr<ChannelInterface> channel) { channel_ = channel; }
+ void Reset(std::shared_ptr<Channel> channel) { channel_ = channel; }
void DoEmpty();
void DoLargeUnary();
+ void DoLargeCompressedUnary();
void DoPingPong();
void DoHalfDuplex();
void DoRequestStreaming();
void DoResponseStreaming();
+ void DoResponseCompressedStreaming();
void DoResponseStreamingWithSlowConsumer();
void DoCancelAfterBegin();
void DoCancelAfterFirstResponse();
@@ -80,7 +82,7 @@ class InteropClient {
void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
void AssertOkOrPrintErrorStatus(const Status& s);
- std::shared_ptr<ChannelInterface> channel_;
+ std::shared_ptr<Channel> channel_;
};
} // namespace testing
diff --git a/test/cpp/interop/interop_test.cc b/test/cpp/interop/interop_test.cc
index aac6e56b89..f01b032e95 100644
--- a/test/cpp/interop/interop_test.cc
+++ b/test/cpp/interop/interop_test.cc
@@ -44,17 +44,18 @@
#include <sys/types.h>
#include <sys/wait.h>
-extern "C" {
-#include "src/core/iomgr/socket_utils_posix.h"
-#include "src/core/support/string.h"
-}
-
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "test/core/util/port.h"
+extern "C" {
+#include "src/core/iomgr/socket_utils_posix.h"
+#include "src/core/support/string.h"
+}
+
+
int test_client(const char* root, const char* host, int port) {
int status;
pid_t cli;
diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc
index 65f098050e..d332dcad84 100644
--- a/test/cpp/interop/reconnect_interop_client.cc
+++ b/test/cpp/interop/reconnect_interop_client.cc
@@ -37,9 +37,8 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <gflags/gflags.h>
-#include <grpc++/channel_interface.h>
+#include <grpc++/channel.h>
#include <grpc++/client_context.h>
-#include <grpc++/status.h>
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/util/test_config.h"
#include "test/proto/test.grpc.pb.h"
@@ -50,7 +49,7 @@ DEFINE_int32(server_control_port, 0, "Server port for control rpcs.");
DEFINE_int32(server_retry_port, 0, "Server port for testing reconnection.");
DEFINE_string(server_host, "127.0.0.1", "Server host to connect to");
-using grpc::ChannelInterface;
+using grpc::Channel;
using grpc::ClientContext;
using grpc::CreateTestChannel;
using grpc::Status;
@@ -78,7 +77,7 @@ int main(int argc, char** argv) {
gpr_log(GPR_INFO, "Starting connections with retries.");
server_address.str("");
server_address << FLAGS_server_host << ':' << FLAGS_server_retry_port;
- std::shared_ptr<ChannelInterface> retry_channel =
+ std::shared_ptr<Channel> retry_channel =
CreateTestChannel(server_address.str(), true);
// About 13 retries.
const int kDeadlineSeconds = 540;
diff --git a/test/cpp/interop/reconnect_interop_server.cc b/test/cpp/interop/reconnect_interop_server.cc
index 8bc51aa52e..d4f171b1d0 100644
--- a/test/cpp/interop/reconnect_interop_server.cc
+++ b/test/cpp/interop/reconnect_interop_server.cc
@@ -31,23 +31,22 @@
*
*/
+#include <signal.h>
+#include <unistd.h>
+
#include <condition_variable>
#include <memory>
#include <mutex>
#include <sstream>
-#include <signal.h>
-#include <unistd.h>
-
#include <gflags/gflags.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-#include <grpc++/config.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
-#include <grpc++/status.h>
+
#include "test/core/util/reconnect_server.h"
#include "test/cpp/util/test_config.h"
#include "test/proto/test.grpc.pb.h"
diff --git a/test/cpp/interop/rnd.dat b/test/cpp/interop/rnd.dat
new file mode 100644
index 0000000000..8c7f38f9e0
--- /dev/null
+++ b/test/cpp/interop/rnd.dat
Binary files differ
diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc
index 05a10de51e..4921fde9fa 100644
--- a/test/cpp/interop/server.cc
+++ b/test/cpp/interop/server.cc
@@ -31,28 +31,28 @@
*
*/
+#include <signal.h>
+#include <unistd.h>
+
+#include <fstream>
#include <memory>
#include <sstream>
#include <thread>
-#include <signal.h>
-#include <unistd.h>
-
#include <gflags/gflags.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-#include <grpc++/config.h>
+#include <grpc/support/useful.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
-#include <grpc++/status.h>
-#include <grpc++/stream.h>
+
+#include "test/cpp/interop/server_helper.h"
+#include "test/cpp/util/test_config.h"
#include "test/proto/test.grpc.pb.h"
#include "test/proto/empty.grpc.pb.h"
#include "test/proto/messages.grpc.pb.h"
-#include "test/cpp/interop/server_helper.h"
-#include "test/cpp/util/test_config.h"
DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
DEFINE_int32(port, 0, "Server port.");
@@ -65,6 +65,7 @@ using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::ServerWriter;
using grpc::SslServerCredentialsOptions;
+using grpc::testing::InteropServerContextInspector;
using grpc::testing::Payload;
using grpc::testing::PayloadType;
using grpc::testing::SimpleRequest;
@@ -77,19 +78,54 @@ using grpc::testing::TestService;
using grpc::Status;
static bool got_sigint = false;
+static const char* kRandomFile = "test/cpp/interop/rnd.dat";
bool SetPayload(PayloadType type, int size, Payload* payload) {
- PayloadType response_type = type;
- // TODO(yangg): Support UNCOMPRESSABLE payload.
- if (type != PayloadType::COMPRESSABLE) {
- return false;
+ PayloadType response_type;
+ if (type == PayloadType::RANDOM) {
+ response_type =
+ rand() & 0x1 ? PayloadType::COMPRESSABLE : PayloadType::UNCOMPRESSABLE;
+ } else {
+ response_type = type;
}
payload->set_type(response_type);
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
+ switch (response_type) {
+ case PayloadType::COMPRESSABLE: {
+ std::unique_ptr<char[]> body(new char[size]());
+ payload->set_body(body.get(), size);
+ } break;
+ case PayloadType::UNCOMPRESSABLE: {
+ std::unique_ptr<char[]> body(new char[size]());
+ std::ifstream rnd_file(kRandomFile);
+ GPR_ASSERT(rnd_file.good());
+ rnd_file.read(body.get(), size);
+ GPR_ASSERT(!rnd_file.eof()); // Requested more rnd bytes than available
+ payload->set_body(body.get(), size);
+ } break;
+ default:
+ GPR_ASSERT(false);
+ }
return true;
}
+template <typename RequestType>
+void SetResponseCompression(ServerContext* context,
+ const RequestType& request) {
+ switch (request.response_compression()) {
+ case grpc::testing::NONE:
+ context->set_compression_algorithm(GRPC_COMPRESS_NONE);
+ break;
+ case grpc::testing::GZIP:
+ context->set_compression_algorithm(GRPC_COMPRESS_GZIP);
+ break;
+ case grpc::testing::DEFLATE:
+ context->set_compression_algorithm(GRPC_COMPRESS_DEFLATE);
+ break;
+ default:
+ abort();
+ }
+}
+
class TestServiceImpl : public TestService::Service {
public:
Status EmptyCall(ServerContext* context, const grpc::testing::Empty* request,
@@ -99,7 +135,8 @@ class TestServiceImpl : public TestService::Service {
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) {
- if (request->has_response_size() && request->response_size() > 0) {
+ SetResponseCompression(context, *request);
+ if (request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
@@ -107,9 +144,9 @@ class TestServiceImpl : public TestService::Service {
}
if (request->has_response_status()) {
- return Status(static_cast<grpc::StatusCode>
- (request->response_status().code()),
- request->response_status().message());
+ return Status(
+ static_cast<grpc::StatusCode>(request->response_status().code()),
+ request->response_status().message());
}
return Status::OK;
@@ -118,13 +155,16 @@ class TestServiceImpl : public TestService::Service {
Status StreamingOutputCall(
ServerContext* context, const StreamingOutputCallRequest* request,
ServerWriter<StreamingOutputCallResponse>* writer) {
+ SetResponseCompression(context, *request);
StreamingOutputCallResponse response;
bool write_success = true;
- response.mutable_payload()->set_type(request->response_type());
for (int i = 0; write_success && i < request->response_parameters_size();
i++) {
- response.mutable_payload()->set_body(
- grpc::string(request->response_parameters(i).size(), '\0'));
+ if (!SetPayload(request->response_type(),
+ request->response_parameters(i).size(),
+ response.mutable_payload())) {
+ return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
+ }
write_success = writer->Write(response);
}
if (write_success) {
@@ -140,7 +180,7 @@ class TestServiceImpl : public TestService::Service {
StreamingInputCallRequest request;
int aggregated_payload_size = 0;
while (reader->Read(&request)) {
- if (request.has_payload() && request.payload().has_body()) {
+ if (request.has_payload()) {
aggregated_payload_size += request.payload().body().size();
}
}
@@ -156,6 +196,7 @@ class TestServiceImpl : public TestService::Service {
StreamingOutputCallResponse response;
bool write_success = true;
while (write_success && stream->Read(&request)) {
+ SetResponseCompression(context, request);
if (request.response_parameters_size() != 0) {
response.mutable_payload()->set_type(request.payload().type());
response.mutable_payload()->set_body(
diff --git a/test/cpp/interop/server_helper.cc b/test/cpp/interop/server_helper.cc
index 30a78ffddf..e897f4ebf0 100644
--- a/test/cpp/interop/server_helper.cc
+++ b/test/cpp/interop/server_helper.cc
@@ -36,10 +36,11 @@
#include <memory>
#include <gflags/gflags.h>
-#include "test/core/end2end/data/ssl_test_data.h"
-#include <grpc++/config.h>
#include <grpc++/server_credentials.h>
+#include "src/core/surface/call.h"
+#include "test/core/end2end/data/ssl_test_data.h"
+
DECLARE_bool(enable_ssl);
namespace grpc {
@@ -58,16 +59,25 @@ std::shared_ptr<ServerCredentials> CreateInteropServerCredentials() {
}
}
-InteropContextInspector::InteropContextInspector(
+InteropServerContextInspector::InteropServerContextInspector(
const ::grpc::ServerContext& context)
: context_(context) {}
-std::shared_ptr<const AuthContext> InteropContextInspector::GetAuthContext()
- const {
+grpc_compression_algorithm
+InteropServerContextInspector::GetCallCompressionAlgorithm() const {
+ return grpc_call_get_compression_algorithm(context_.call_);
+}
+
+gpr_uint32 InteropServerContextInspector::GetEncodingsAcceptedByClient() const {
+ return grpc_call_get_encodings_accepted_by_peer(context_.call_);
+}
+
+std::shared_ptr<const AuthContext>
+InteropServerContextInspector::GetAuthContext() const {
return context_.auth_context();
}
-bool InteropContextInspector::IsCancelled() const {
+bool InteropServerContextInspector::IsCancelled() const {
return context_.IsCancelled();
}
diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h
index ce977b4705..7b6b12cd4d 100644
--- a/test/cpp/interop/server_helper.h
+++ b/test/cpp/interop/server_helper.h
@@ -36,6 +36,7 @@
#include <memory>
+#include <grpc/compression.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
@@ -44,13 +45,15 @@ namespace testing {
std::shared_ptr<ServerCredentials> CreateInteropServerCredentials();
-class InteropContextInspector {
+class InteropServerContextInspector {
public:
- InteropContextInspector(const ::grpc::ServerContext& context);
+ InteropServerContextInspector(const ::grpc::ServerContext& context);
// Inspector methods, able to peek inside ServerContext, follow.
std::shared_ptr<const AuthContext> GetAuthContext() const;
bool IsCancelled() const;
+ grpc_compression_algorithm GetCallCompressionAlgorithm() const;
+ gpr_uint32 GetEncodingsAcceptedByClient() const;
private:
const ::grpc::ServerContext& context_;