diff options
author | 2018-11-30 15:22:54 -0800 | |
---|---|---|
committer | 2018-11-30 15:22:54 -0800 | |
commit | 53949b7d766dc0e91b45b1334858b516892313ac (patch) | |
tree | 56b3fff8b7709282bc977880cdceed2367db539c /test | |
parent | a9e3e4c9409646e8b4025f70f9fd3b6006f81a01 (diff) | |
parent | 2b5d45ab381c5f455e16f49276ef22998c9d659b (diff) |
Merge pull request #17104 from vjpai/callback_streaming
C++: Experimental client callback streaming API
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/codegen/compiler_test_golden | 7 | ||||
-rw-r--r-- | test/cpp/end2end/client_callback_end2end_test.cc | 218 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 1 |
3 files changed, 226 insertions, 0 deletions
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index fdc67969d9..5f0eb6c35c 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -30,6 +30,7 @@ #include <grpcpp/impl/codegen/async_generic_service.h> #include <grpcpp/impl/codegen/async_stream.h> #include <grpcpp/impl/codegen/async_unary_call.h> +#include <grpcpp/impl/codegen/client_callback.h> #include <grpcpp/impl/codegen/method_handler_impl.h> #include <grpcpp/impl/codegen/proto_utils.h> #include <grpcpp/impl/codegen/rpc_method.h> @@ -117,10 +118,13 @@ class ServiceA final { // // Method A2 leading comment 1 // Method A2 leading comment 2 + virtual void MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::experimental::ClientWriteReactor< ::grpc::testing::Request>* reactor) = 0; // MethodA2 trailing comment 1 // Method A3 leading comment 1 + virtual void MethodA3(::grpc::ClientContext* context, ::grpc::testing::Request* request, ::grpc::experimental::ClientReadReactor< ::grpc::testing::Response>* reactor) = 0; // Method A3 trailing comment 1 // Method A4 leading comment 1 + virtual void MethodA4(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::grpc::testing::Request,::grpc::testing::Response>* reactor) = 0; // Method A4 trailing comment 1 }; virtual class experimental_async_interface* experimental_async() { return nullptr; } @@ -178,6 +182,9 @@ class ServiceA final { public StubInterface::experimental_async_interface { public: void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override; + void MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::experimental::ClientWriteReactor< ::grpc::testing::Request>* reactor) override; + void MethodA3(::grpc::ClientContext* context, ::grpc::testing::Request* request, ::grpc::experimental::ClientReadReactor< ::grpc::testing::Response>* reactor) override; + void MethodA4(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::grpc::testing::Request,::grpc::testing::Response>* reactor) override; private: friend class Stub; explicit experimental_async(Stub* stub): stub_(stub) { } diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index a35991396a..6c18703f06 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -182,6 +182,55 @@ class ClientCallbackEnd2endTest } } + void SendGenericEchoAsBidi(int num_rpcs) { + const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo"); + grpc::string test_string(""); + for (int i = 0; i < num_rpcs; i++) { + test_string += "Hello world. "; + class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer, + ByteBuffer> { + public: + Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name, + const grpc::string& test_str) { + test->generic_stub_->experimental().PrepareBidiStreamingCall( + &cli_ctx_, method_name, this); + request_.set_message(test_str); + send_buf_ = SerializeToByteBuffer(&request_); + StartWrite(send_buf_.get()); + StartRead(&recv_buf_); + StartCall(); + } + void OnWriteDone(bool ok) override { StartWritesDone(); } + void OnReadDone(bool ok) override { + EchoResponse response; + EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response)); + EXPECT_EQ(request_.message(), response.message()); + }; + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + EchoRequest request_; + std::unique_ptr<ByteBuffer> send_buf_; + ByteBuffer recv_buf_; + ClientContext cli_ctx_; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } rpc{this, kMethodName, test_string}; + + rpc.Await(); + } + } bool is_server_started_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; @@ -211,6 +260,11 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) { SendRpcsGeneric(10, false); } +TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) { + ResetStub(); + SendGenericEchoAsBidi(10); +} + #if GRPC_ALLOW_EXCEPTIONS TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) { ResetStub(); @@ -267,6 +321,170 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { } } +TEST_P(ClientCallbackEnd2endTest, RequestStream) { + // TODO(vjpai): test with callback server once supported + if (GetParam().callback_server) { + return; + } + + ResetStub(); + class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> { + public: + explicit Client(grpc::testing::EchoTestService::Stub* stub) { + context_.set_initial_metadata_corked(true); + stub->experimental_async()->RequestStream(&context_, &response_, this); + StartCall(); + request_.set_message("Hello server."); + StartWrite(&request_); + } + void OnWriteDone(bool ok) override { + writes_left_--; + if (writes_left_ > 1) { + StartWrite(&request_); + } else if (writes_left_ == 1) { + StartWriteLast(&request_, WriteOptions()); + } + } + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server."); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + int writes_left_{3}; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } test{stub_.get()}; + + test.Await(); +} + +TEST_P(ClientCallbackEnd2endTest, ResponseStream) { + // TODO(vjpai): test with callback server once supported + if (GetParam().callback_server) { + return; + } + + ResetStub(); + class Client : public grpc::experimental::ClientReadReactor<EchoResponse> { + public: + explicit Client(grpc::testing::EchoTestService::Stub* stub) { + request_.set_message("Hello client "); + stub->experimental_async()->ResponseStream(&context_, &request_, this); + StartCall(); + StartRead(&response_); + } + void OnReadDone(bool ok) override { + if (!ok) { + EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend); + } else { + EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); + EXPECT_EQ(response_.message(), + request_.message() + grpc::to_string(reads_complete_)); + reads_complete_++; + StartRead(&response_); + } + } + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + int reads_complete_{0}; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } test{stub_.get()}; + + test.Await(); +} + +TEST_P(ClientCallbackEnd2endTest, BidiStream) { + // TODO(vjpai): test with callback server once supported + if (GetParam().callback_server) { + return; + } + ResetStub(); + class Client : public grpc::experimental::ClientBidiReactor<EchoRequest, + EchoResponse> { + public: + explicit Client(grpc::testing::EchoTestService::Stub* stub) { + request_.set_message("Hello fren "); + stub->experimental_async()->BidiStream(&context_, this); + StartCall(); + StartRead(&response_); + StartWrite(&request_); + } + void OnReadDone(bool ok) override { + if (!ok) { + EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend); + } else { + EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); + EXPECT_EQ(response_.message(), request_.message()); + reads_complete_++; + StartRead(&response_); + } + } + void OnWriteDone(bool ok) override { + EXPECT_TRUE(ok); + if (++writes_complete_ == kServerDefaultResponseStreamsToSend) { + StartWritesDone(); + } else { + StartWrite(&request_); + } + } + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + int reads_complete_{0}; + int writes_complete_{0}; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } test{stub_.get()}; + + test.Await(); +} + TestScenario scenarios[] = {TestScenario{false}, TestScenario{true}}; INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 605356724f..1726e87ea6 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -223,6 +223,7 @@ void CallbackTestServiceImpl::EchoNonDelayed( return; } + gpr_log(GPR_DEBUG, "Request message was %s", request->message().c_str()); response->set_message(request->message()); MaybeEchoDeadline(context, request, response); if (host_) { |