aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-11-30 15:22:54 -0800
committerGravatar GitHub <noreply@github.com>2018-11-30 15:22:54 -0800
commit53949b7d766dc0e91b45b1334858b516892313ac (patch)
tree56b3fff8b7709282bc977880cdceed2367db539c /test
parenta9e3e4c9409646e8b4025f70f9fd3b6006f81a01 (diff)
parent2b5d45ab381c5f455e16f49276ef22998c9d659b (diff)
Merge pull request #17104 from vjpai/callback_streaming
C++: Experimental client callback streaming API
Diffstat (limited to 'test')
-rw-r--r--test/cpp/codegen/compiler_test_golden7
-rw-r--r--test/cpp/end2end/client_callback_end2end_test.cc218
-rw-r--r--test/cpp/end2end/test_service_impl.cc1
3 files changed, 226 insertions, 0 deletions
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden
index fdc67969d9..5f0eb6c35c 100644
--- a/test/cpp/codegen/compiler_test_golden
+++ b/test/cpp/codegen/compiler_test_golden
@@ -30,6 +30,7 @@
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/impl/codegen/rpc_method.h>
@@ -117,10 +118,13 @@ class ServiceA final {
//
// Method A2 leading comment 1
// Method A2 leading comment 2
+ virtual void MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::experimental::ClientWriteReactor< ::grpc::testing::Request>* reactor) = 0;
// MethodA2 trailing comment 1
// Method A3 leading comment 1
+ virtual void MethodA3(::grpc::ClientContext* context, ::grpc::testing::Request* request, ::grpc::experimental::ClientReadReactor< ::grpc::testing::Response>* reactor) = 0;
// Method A3 trailing comment 1
// Method A4 leading comment 1
+ virtual void MethodA4(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::grpc::testing::Request,::grpc::testing::Response>* reactor) = 0;
// Method A4 trailing comment 1
};
virtual class experimental_async_interface* experimental_async() { return nullptr; }
@@ -178,6 +182,9 @@ class ServiceA final {
public StubInterface::experimental_async_interface {
public:
void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
+ void MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::experimental::ClientWriteReactor< ::grpc::testing::Request>* reactor) override;
+ void MethodA3(::grpc::ClientContext* context, ::grpc::testing::Request* request, ::grpc::experimental::ClientReadReactor< ::grpc::testing::Response>* reactor) override;
+ void MethodA4(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::grpc::testing::Request,::grpc::testing::Response>* reactor) override;
private:
friend class Stub;
explicit experimental_async(Stub* stub): stub_(stub) { }
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
index a35991396a..6c18703f06 100644
--- a/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -182,6 +182,55 @@ class ClientCallbackEnd2endTest
}
}
+ void SendGenericEchoAsBidi(int num_rpcs) {
+ const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
+ grpc::string test_string("");
+ for (int i = 0; i < num_rpcs; i++) {
+ test_string += "Hello world. ";
+ class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer,
+ ByteBuffer> {
+ public:
+ Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name,
+ const grpc::string& test_str) {
+ test->generic_stub_->experimental().PrepareBidiStreamingCall(
+ &cli_ctx_, method_name, this);
+ request_.set_message(test_str);
+ send_buf_ = SerializeToByteBuffer(&request_);
+ StartWrite(send_buf_.get());
+ StartRead(&recv_buf_);
+ StartCall();
+ }
+ void OnWriteDone(bool ok) override { StartWritesDone(); }
+ void OnReadDone(bool ok) override {
+ EchoResponse response;
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
+ EXPECT_EQ(request_.message(), response.message());
+ };
+ void OnDone(const Status& s) override {
+ EXPECT_TRUE(s.ok());
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ EchoRequest request_;
+ std::unique_ptr<ByteBuffer> send_buf_;
+ ByteBuffer recv_buf_;
+ ClientContext cli_ctx_;
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } rpc{this, kMethodName, test_string};
+
+ rpc.Await();
+ }
+ }
bool is_server_started_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
@@ -211,6 +260,11 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
SendRpcsGeneric(10, false);
}
+TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
+ ResetStub();
+ SendGenericEchoAsBidi(10);
+}
+
#if GRPC_ALLOW_EXCEPTIONS
TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
ResetStub();
@@ -267,6 +321,170 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
}
}
+TEST_P(ClientCallbackEnd2endTest, RequestStream) {
+ // TODO(vjpai): test with callback server once supported
+ if (GetParam().callback_server) {
+ return;
+ }
+
+ ResetStub();
+ class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> {
+ public:
+ explicit Client(grpc::testing::EchoTestService::Stub* stub) {
+ context_.set_initial_metadata_corked(true);
+ stub->experimental_async()->RequestStream(&context_, &response_, this);
+ StartCall();
+ request_.set_message("Hello server.");
+ StartWrite(&request_);
+ }
+ void OnWriteDone(bool ok) override {
+ writes_left_--;
+ if (writes_left_ > 1) {
+ StartWrite(&request_);
+ } else if (writes_left_ == 1) {
+ StartWriteLast(&request_, WriteOptions());
+ }
+ }
+ void OnDone(const Status& s) override {
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server.");
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ int writes_left_{3};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } test{stub_.get()};
+
+ test.Await();
+}
+
+TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
+ // TODO(vjpai): test with callback server once supported
+ if (GetParam().callback_server) {
+ return;
+ }
+
+ ResetStub();
+ class Client : public grpc::experimental::ClientReadReactor<EchoResponse> {
+ public:
+ explicit Client(grpc::testing::EchoTestService::Stub* stub) {
+ request_.set_message("Hello client ");
+ stub->experimental_async()->ResponseStream(&context_, &request_, this);
+ StartCall();
+ StartRead(&response_);
+ }
+ void OnReadDone(bool ok) override {
+ if (!ok) {
+ EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
+ } else {
+ EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
+ EXPECT_EQ(response_.message(),
+ request_.message() + grpc::to_string(reads_complete_));
+ reads_complete_++;
+ StartRead(&response_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ EXPECT_TRUE(s.ok());
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ int reads_complete_{0};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } test{stub_.get()};
+
+ test.Await();
+}
+
+TEST_P(ClientCallbackEnd2endTest, BidiStream) {
+ // TODO(vjpai): test with callback server once supported
+ if (GetParam().callback_server) {
+ return;
+ }
+ ResetStub();
+ class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
+ EchoResponse> {
+ public:
+ explicit Client(grpc::testing::EchoTestService::Stub* stub) {
+ request_.set_message("Hello fren ");
+ stub->experimental_async()->BidiStream(&context_, this);
+ StartCall();
+ StartRead(&response_);
+ StartWrite(&request_);
+ }
+ void OnReadDone(bool ok) override {
+ if (!ok) {
+ EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
+ } else {
+ EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
+ EXPECT_EQ(response_.message(), request_.message());
+ reads_complete_++;
+ StartRead(&response_);
+ }
+ }
+ void OnWriteDone(bool ok) override {
+ EXPECT_TRUE(ok);
+ if (++writes_complete_ == kServerDefaultResponseStreamsToSend) {
+ StartWritesDone();
+ } else {
+ StartWrite(&request_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ EXPECT_TRUE(s.ok());
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ int reads_complete_{0};
+ int writes_complete_{0};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } test{stub_.get()};
+
+ test.Await();
+}
+
TestScenario scenarios[] = {TestScenario{false}, TestScenario{true}};
INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 605356724f..1726e87ea6 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -223,6 +223,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
return;
}
+ gpr_log(GPR_DEBUG, "Request message was %s", request->message().c_str());
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
if (host_) {