aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar chenw <chenw@google.com>2015-01-08 14:04:39 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2015-01-09 17:50:09 -0800
commit6edb547c99e42d5bf0dbea540883728a09066d4b (patch)
tree70920340c84f2befb9361cefd1c5a6d9048c5a4c /test/cpp
parent4dee1577fb678336a1492edbf458b2e0a4a41479 (diff)
Enable streaming test case for gRPC client in GCE.
Change on 2015/01/08 by chenw <chenw@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83549959
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/interop/client.cc163
1 files changed, 163 insertions, 0 deletions
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index ee0f62cf20..36bc580a96 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -128,6 +128,152 @@ void DoLargeUnary(std::shared_ptr<ChannelInterface> channel) {
gpr_log(GPR_INFO, "Large unary done.");
}
+void DoRequestStreaming(std::shared_ptr<ChannelInterface> channel) {
+ gpr_log(GPR_INFO, "Sending request steaming rpc ...");
+ std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
+
+ grpc::ClientContext context;
+ StreamingInputCallRequest request;
+ StreamingInputCallResponse response;
+
+ std::unique_ptr<grpc::ClientWriter<StreamingInputCallRequest>> stream(
+ stub->StreamingInputCall(&context, &response));
+
+ int aggregated_payload_size = 0;
+ for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
+ grpc::testing::Payload* payload = request.mutable_payload();
+ payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
+ GPR_ASSERT(stream->Write(request));
+ aggregated_payload_size += request_stream_sizes[i];
+ }
+ stream->WritesDone();
+ grpc::Status s = stream->Wait();
+
+ GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
+ GPR_ASSERT(s.IsOk());
+ gpr_log(GPR_INFO, "Request streaming done.");
+}
+
+void DoResponseStreaming(std::shared_ptr<ChannelInterface> channel) {
+ gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
+ std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
+
+ grpc::ClientContext context;
+ StreamingOutputCallRequest request;
+ for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
+ ResponseParameters* response_parameter = request.add_response_parameters();
+ response_parameter->set_size(response_stream_sizes[i]);
+ }
+ StreamingOutputCallResponse response;
+ std::unique_ptr<grpc::ClientReader<StreamingOutputCallResponse>> stream(
+ stub->StreamingOutputCall(&context, &request));
+
+ unsigned int i = 0;
+ while (stream->Read(&response)) {
+ GPR_ASSERT(response.payload().body() ==
+ grpc::string(response_stream_sizes[i], '\0'));
+ ++i;
+ }
+ GPR_ASSERT(response_stream_sizes.size() == i);
+ grpc::Status s = stream->Wait();
+
+ GPR_ASSERT(s.IsOk());
+ gpr_log(GPR_INFO, "Response streaming done.");
+}
+
+void DoResponseStreamingWithSlowConsumer(
+ std::shared_ptr<ChannelInterface> channel) {
+ gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
+ std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
+
+ grpc::ClientContext context;
+ StreamingOutputCallRequest request;
+
+ for (unsigned int i = 0; i < kNumResponseMessages; ++i) {
+ ResponseParameters* response_parameter = request.add_response_parameters();
+ response_parameter->set_size(kResponseMessageSize);
+ }
+ StreamingOutputCallResponse response;
+ std::unique_ptr<grpc::ClientReader<StreamingOutputCallResponse>> stream(
+ stub->StreamingOutputCall(&context, &request));
+
+ unsigned int i = 0;
+ while (stream->Read(&response)) {
+ GPR_ASSERT(response.payload().body() ==
+ grpc::string(kResponseMessageSize, '\0'));
+ gpr_log(GPR_INFO, "received message %d", i);
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(kReceiveDelayMilliSeconds));
+ ++i;
+ }
+ GPR_ASSERT(kNumResponseMessages == i);
+ grpc::Status s = stream->Wait();
+
+ GPR_ASSERT(s.IsOk());
+ gpr_log(GPR_INFO, "Response streaming done.");
+}
+
+void DoHalfDuplex(std::shared_ptr<ChannelInterface> channel) {
+ gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
+ std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
+
+ grpc::ClientContext context;
+ std::unique_ptr<grpc::ClientReaderWriter<StreamingOutputCallRequest,
+ StreamingOutputCallResponse>>
+ stream(stub->HalfDuplexCall(&context));
+
+ StreamingOutputCallRequest request;
+ ResponseParameters* response_parameter = request.add_response_parameters();
+ for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
+ response_parameter->set_size(response_stream_sizes[i]);
+ GPR_ASSERT(stream->Write(request));
+ }
+ stream->WritesDone();
+
+ unsigned int i = 0;
+ StreamingOutputCallResponse response;
+ while (stream->Read(&response)) {
+ GPR_ASSERT(response.payload().has_body());
+ GPR_ASSERT(response.payload().body() ==
+ grpc::string(response_stream_sizes[i], '\0'));
+ ++i;
+ }
+ GPR_ASSERT(response_stream_sizes.size() == i);
+ grpc::Status s = stream->Wait();
+ GPR_ASSERT(s.IsOk());
+ gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
+}
+
+void DoPingPong(std::shared_ptr<ChannelInterface> channel) {
+ gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
+ std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
+
+ grpc::ClientContext context;
+ std::unique_ptr<grpc::ClientReaderWriter<StreamingOutputCallRequest,
+ StreamingOutputCallResponse>>
+ stream(stub->FullDuplexCall(&context));
+
+ StreamingOutputCallRequest request;
+ request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
+ ResponseParameters* response_parameter = request.add_response_parameters();
+ grpc::testing::Payload* payload = request.mutable_payload();
+ StreamingOutputCallResponse response;
+ for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
+ response_parameter->set_size(response_stream_sizes[i]);
+ payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
+ GPR_ASSERT(stream->Write(request));
+ GPR_ASSERT(stream->Read(&response));
+ GPR_ASSERT(response.payload().has_body());
+ GPR_ASSERT(response.payload().body() ==
+ grpc::string(response_stream_sizes[i], '\0'));
+ }
+
+ stream->WritesDone();
+ GPR_ASSERT(!stream->Read(&response));
+ grpc::Status s = stream->Wait();
+ GPR_ASSERT(s.IsOk());
+ gpr_log(GPR_INFO, "Ping pong streaming done.");
+}
int main(int argc, char** argv) {
grpc_init();
@@ -148,6 +294,23 @@ int main(int argc, char** argv) {
DoEmpty(channel);
} else if (FLAGS_test_case == "large_unary") {
DoLargeUnary(channel);
+ } else if (FLAGS_test_case == "client_streaming") {
+ DoRequestStreaming(channel);
+ } else if (FLAGS_test_case == "server_streaming") {
+ DoResponseStreaming(channel);
+ } else if (FLAGS_test_case == "slow_consumer") {
+ DoResponseStreamingWithSlowConsumer(channel);
+ } else if (FLAGS_test_case == "half_duplex") {
+ DoHalfDuplex(channel);
+ } else if (FLAGS_test_case == "ping_pong") {
+ DoPingPong(channel);
+ } else if (FLAGS_test_case == "all") {
+ DoEmpty(channel);
+ DoLargeUnary(channel);
+ DoRequestStreaming(channel);
+ DoResponseStreaming(channel);
+ DoHalfDuplex(channel);
+ DoPingPong(channel);
} else {
gpr_log(
GPR_ERROR,