diff options
Diffstat (limited to 'test/cpp/end2end/client_callback_end2end_test.cc')
-rw-r--r-- | test/cpp/end2end/client_callback_end2end_test.cc | 254 |
1 files changed, 253 insertions, 1 deletions
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index a35991396a..a999321992 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -182,6 +182,67 @@ class ClientCallbackEnd2endTest } } + void SendGenericEchoAsBidi(int num_rpcs, int reuses) { + const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo"); + grpc::string test_string(""); + for (int i = 0; i < num_rpcs; i++) { + test_string += "Hello world. "; + class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer, + ByteBuffer> { + public: + Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name, + const grpc::string& test_str, int reuses) + : reuses_remaining_(reuses) { + activate_ = [this, test, method_name, test_str] { + if (reuses_remaining_ > 0) { + cli_ctx_.reset(new ClientContext); + reuses_remaining_--; + test->generic_stub_->experimental().PrepareBidiStreamingCall( + cli_ctx_.get(), method_name, this); + request_.set_message(test_str); + send_buf_ = SerializeToByteBuffer(&request_); + StartWrite(send_buf_.get()); + StartRead(&recv_buf_); + StartCall(); + } else { + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + }; + activate_(); + } + void OnWriteDone(bool ok) override { StartWritesDone(); } + void OnReadDone(bool ok) override { + EchoResponse response; + EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response)); + EXPECT_EQ(request_.message(), response.message()); + }; + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + activate_(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + EchoRequest request_; + std::unique_ptr<ByteBuffer> send_buf_; + ByteBuffer recv_buf_; + std::unique_ptr<ClientContext> cli_ctx_; + int reuses_remaining_; + std::function<void()> activate_; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } rpc{this, kMethodName, test_string, reuses}; + + rpc.Await(); + } + } bool is_server_started_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; @@ -201,6 +262,37 @@ TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { SendRpcs(10, false); } +TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) { + ResetStub(); + SimpleRequest request; + SimpleResponse response; + ClientContext cli_ctx; + + cli_ctx.AddMetadata(kCheckClientInitialMetadataKey, + kCheckClientInitialMetadataVal); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->CheckClientInitialMetadata( + &cli_ctx, &request, &response, [&done, &mu, &cv](Status s) { + GPR_ASSERT(s.ok()); + + std::lock_guard<std::mutex> l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock<std::mutex> l(mu); + while (!done) { + cv.wait(l); + } +} + +TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) { + ResetStub(); + SendRpcs(1, true); +} + TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { ResetStub(); SendRpcs(10, true); @@ -211,6 +303,16 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) { SendRpcsGeneric(10, false); } +TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) { + ResetStub(); + SendGenericEchoAsBidi(10, 1); +} + +TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) { + ResetStub(); + SendGenericEchoAsBidi(10, 10); +} + #if GRPC_ALLOW_EXCEPTIONS TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) { ResetStub(); @@ -267,6 +369,156 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { } } +TEST_P(ClientCallbackEnd2endTest, RequestStream) { + ResetStub(); + class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> { + public: + explicit Client(grpc::testing::EchoTestService::Stub* stub) { + context_.set_initial_metadata_corked(true); + stub->experimental_async()->RequestStream(&context_, &response_, this); + StartCall(); + request_.set_message("Hello server."); + StartWrite(&request_); + } + void OnWriteDone(bool ok) override { + writes_left_--; + if (writes_left_ > 1) { + StartWrite(&request_); + } else if (writes_left_ == 1) { + StartWriteLast(&request_, WriteOptions()); + } + } + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server."); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + int writes_left_{3}; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } test{stub_.get()}; + + test.Await(); +} + +TEST_P(ClientCallbackEnd2endTest, ResponseStream) { + ResetStub(); + class Client : public grpc::experimental::ClientReadReactor<EchoResponse> { + public: + explicit Client(grpc::testing::EchoTestService::Stub* stub) { + request_.set_message("Hello client "); + stub->experimental_async()->ResponseStream(&context_, &request_, this); + StartCall(); + StartRead(&response_); + } + void OnReadDone(bool ok) override { + if (!ok) { + EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend); + } else { + EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); + EXPECT_EQ(response_.message(), + request_.message() + grpc::to_string(reads_complete_)); + reads_complete_++; + StartRead(&response_); + } + } + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + int reads_complete_{0}; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } test{stub_.get()}; + + test.Await(); +} + +TEST_P(ClientCallbackEnd2endTest, BidiStream) { + ResetStub(); + class Client : public grpc::experimental::ClientBidiReactor<EchoRequest, + EchoResponse> { + public: + explicit Client(grpc::testing::EchoTestService::Stub* stub) { + request_.set_message("Hello fren "); + stub->experimental_async()->BidiStream(&context_, this); + StartCall(); + StartRead(&response_); + StartWrite(&request_); + } + void OnReadDone(bool ok) override { + if (!ok) { + EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend); + } else { + EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); + EXPECT_EQ(response_.message(), request_.message()); + reads_complete_++; + StartRead(&response_); + } + } + void OnWriteDone(bool ok) override { + EXPECT_TRUE(ok); + if (++writes_complete_ == kServerDefaultResponseStreamsToSend) { + StartWritesDone(); + } else { + StartWrite(&request_); + } + } + void OnDone(const Status& s) override { + EXPECT_TRUE(s.ok()); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + EchoResponse response_; + ClientContext context_; + int reads_complete_{0}; + int writes_complete_{0}; + std::mutex mu_; + std::condition_variable cv_; + bool done_ = false; + } test{stub_.get()}; + + test.Await(); +} + TestScenario scenarios[] = {TestScenario{false}, TestScenario{true}}; INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, @@ -277,7 +529,7 @@ INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, } // namespace grpc int main(int argc, char** argv) { - grpc_test_init(argc, argv); + grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } |