aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-11-04 23:37:00 -0800
committerGravatar Vijay Pai <vpai@google.com>2018-11-30 04:13:40 -0800
commitd7eb26648d33912199573b842557b0e92ec3071f (patch)
tree42ef1a1d37c512a7baa044b33e5d83f05aa1fab2 /test
parent7ef8fc826c52b0c4abeaead633d464197fc0bdf8 (diff)
Client callback streaming
Diffstat (limited to 'test')
-rw-r--r--test/cpp/codegen/compiler_test_golden7
-rw-r--r--test/cpp/end2end/client_callback_end2end_test.cc228
2 files changed, 235 insertions, 0 deletions
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden
index fdc67969d9..7a25f51d10 100644
--- a/test/cpp/codegen/compiler_test_golden
+++ b/test/cpp/codegen/compiler_test_golden
@@ -30,6 +30,7 @@
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/client_callback.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/impl/codegen/rpc_method.h>
@@ -117,10 +118,13 @@ class ServiceA final {
//
// Method A2 leading comment 1
// Method A2 leading comment 2
+ virtual ::grpc::experimental::ClientCallbackWriter< ::grpc::testing::Request>* MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::experimental::ClientWriteReactor* reactor) = 0;
// MethodA2 trailing comment 1
// Method A3 leading comment 1
+ virtual ::grpc::experimental::ClientCallbackReader< ::grpc::testing::Response>* MethodA3(::grpc::ClientContext* context, ::grpc::testing::Request* request, ::grpc::experimental::ClientReadReactor* reactor) = 0;
// Method A3 trailing comment 1
// Method A4 leading comment 1
+ virtual ::grpc::experimental::ClientCallbackReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor* reactor) = 0;
// Method A4 trailing comment 1
};
virtual class experimental_async_interface* experimental_async() { return nullptr; }
@@ -178,6 +182,9 @@ class ServiceA final {
public StubInterface::experimental_async_interface {
public:
void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
+ ::grpc::experimental::ClientCallbackWriter< ::grpc::testing::Request>* MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::experimental::ClientWriteReactor* reactor) override;
+ ::grpc::experimental::ClientCallbackReader< ::grpc::testing::Response>* MethodA3(::grpc::ClientContext* context, ::grpc::testing::Request* request, ::grpc::experimental::ClientReadReactor* reactor) override;
+ ::grpc::experimental::ClientCallbackReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor* reactor) override;
private:
friend class Stub;
explicit experimental_async(Stub* stub): stub_(stub) { }
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
index a35991396a..e39fc5aab2 100644
--- a/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -182,6 +182,59 @@ class ClientCallbackEnd2endTest
}
}
+ void SendGenericEchoAsBidi(int num_rpcs) {
+ const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
+ grpc::string test_string("");
+ for (int i = 0; i < num_rpcs; i++) {
+ test_string += "Hello world. ";
+ class Client : public grpc::experimental::ClientBidiReactor {
+ public:
+ Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name,
+ const grpc::string& test_str) {
+ stream_ =
+ test->generic_stub_->experimental().PrepareBidiStreamingCall(
+ &cli_ctx_, method_name, this);
+ stream_->StartCall();
+ request_.set_message(test_str);
+ send_buf_ = SerializeToByteBuffer(&request_);
+ stream_->Read(&recv_buf_);
+ stream_->Write(send_buf_.get());
+ }
+ void OnWriteDone(bool ok) override { stream_->WritesDone(); }
+ void OnReadDone(bool ok) override {
+ EchoResponse response;
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
+ EXPECT_EQ(request_.message(), response.message());
+ };
+ void OnDone(Status s) override {
+ // The stream is invalid once OnDone is called
+ stream_ = nullptr;
+ EXPECT_TRUE(s.ok());
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ EchoRequest request_;
+ std::unique_ptr<ByteBuffer> send_buf_;
+ ByteBuffer recv_buf_;
+ ClientContext cli_ctx_;
+ experimental::ClientCallbackReaderWriter<ByteBuffer, ByteBuffer>*
+ stream_;
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } rpc{this, kMethodName, test_string};
+
+ rpc.Await();
+ }
+ }
bool is_server_started_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
@@ -211,6 +264,11 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
SendRpcsGeneric(10, false);
}
+TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
+ ResetStub();
+ SendGenericEchoAsBidi(10);
+}
+
#if GRPC_ALLOW_EXCEPTIONS
TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
ResetStub();
@@ -267,6 +325,176 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
}
}
+TEST_P(ClientCallbackEnd2endTest, RequestStream) {
+ // TODO(vjpai): test with callback server once supported
+ if (GetParam().callback_server) {
+ return;
+ }
+
+ ResetStub();
+ class Client : public grpc::experimental::ClientWriteReactor {
+ public:
+ explicit Client(grpc::testing::EchoTestService::Stub* stub) {
+ context_.set_initial_metadata_corked(true);
+ stream_ = stub->experimental_async()->RequestStream(&context_, &response_,
+ this);
+ stream_->StartCall();
+ request_.set_message("Hello server.");
+ stream_->Write(&request_);
+ }
+ void OnWriteDone(bool ok) override {
+ writes_left_--;
+ if (writes_left_ > 1) {
+ stream_->Write(&request_);
+ } else if (writes_left_ == 1) {
+ stream_->WriteLast(&request_, WriteOptions());
+ }
+ }
+ void OnDone(Status s) override {
+ stream_ = nullptr;
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server.");
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ ::grpc::experimental::ClientCallbackWriter<EchoRequest>* stream_;
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ int writes_left_{3};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } test{stub_.get()};
+
+ test.Await();
+}
+
+TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
+ // TODO(vjpai): test with callback server once supported
+ if (GetParam().callback_server) {
+ return;
+ }
+
+ ResetStub();
+ class Client : public grpc::experimental::ClientReadReactor {
+ public:
+ explicit Client(grpc::testing::EchoTestService::Stub* stub) {
+ request_.set_message("Hello client ");
+ stream_ = stub->experimental_async()->ResponseStream(&context_, &request_,
+ this);
+ stream_->StartCall();
+ stream_->Read(&response_);
+ }
+ void OnReadDone(bool ok) override {
+ // Note that != is the boolean XOR operator
+ EXPECT_NE(ok, reads_complete_ == kServerDefaultResponseStreamsToSend);
+ if (ok) {
+ EXPECT_EQ(response_.message(),
+ request_.message() + grpc::to_string(reads_complete_));
+ reads_complete_++;
+ stream_->Read(&response_);
+ }
+ }
+ void OnDone(Status s) override {
+ stream_ = nullptr;
+ EXPECT_TRUE(s.ok());
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ ::grpc::experimental::ClientCallbackReader<EchoResponse>* stream_;
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ int reads_complete_{0};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } test{stub_.get()};
+
+ test.Await();
+}
+
+TEST_P(ClientCallbackEnd2endTest, BidiStream) {
+ // TODO(vjpai): test with callback server once supported
+ if (GetParam().callback_server) {
+ return;
+ }
+ ResetStub();
+ class Client : public grpc::experimental::ClientBidiReactor {
+ public:
+ explicit Client(grpc::testing::EchoTestService::Stub* stub) {
+ request_.set_message("Hello fren ");
+ stream_ = stub->experimental_async()->BidiStream(&context_, this);
+ stream_->StartCall();
+ stream_->Read(&response_);
+ stream_->Write(&request_);
+ }
+ void OnReadDone(bool ok) override {
+ // Note that != is the boolean XOR operator
+ EXPECT_NE(ok, reads_complete_ == kServerDefaultResponseStreamsToSend);
+ if (ok) {
+ EXPECT_EQ(response_.message(), request_.message());
+ reads_complete_++;
+ stream_->Read(&response_);
+ }
+ }
+ void OnWriteDone(bool ok) override {
+ EXPECT_TRUE(ok);
+ if (++writes_complete_ == kServerDefaultResponseStreamsToSend) {
+ stream_->WritesDone();
+ } else {
+ stream_->Write(&request_);
+ }
+ }
+ void OnDone(Status s) override {
+ stream_ = nullptr;
+ EXPECT_TRUE(s.ok());
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ ::grpc::experimental::ClientCallbackReaderWriter<EchoRequest, EchoResponse>*
+ stream_;
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ int reads_complete_{0};
+ int writes_complete_{0};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } test{stub_.get()};
+
+ test.Await();
+}
+
TestScenario scenarios[] = {TestScenario{false}, TestScenario{true}};
INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,