aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/client_callback_end2end_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end/client_callback_end2end_test.cc')
-rw-r--r--test/cpp/end2end/client_callback_end2end_test.cc254
1 files changed, 253 insertions, 1 deletions
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
index a35991396a..a999321992 100644
--- a/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -182,6 +182,67 @@ class ClientCallbackEnd2endTest
}
}
+ void SendGenericEchoAsBidi(int num_rpcs, int reuses) {
+ const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
+ grpc::string test_string("");
+ for (int i = 0; i < num_rpcs; i++) {
+ test_string += "Hello world. ";
+ class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer,
+ ByteBuffer> {
+ public:
+ Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name,
+ const grpc::string& test_str, int reuses)
+ : reuses_remaining_(reuses) {
+ activate_ = [this, test, method_name, test_str] {
+ if (reuses_remaining_ > 0) {
+ cli_ctx_.reset(new ClientContext);
+ reuses_remaining_--;
+ test->generic_stub_->experimental().PrepareBidiStreamingCall(
+ cli_ctx_.get(), method_name, this);
+ request_.set_message(test_str);
+ send_buf_ = SerializeToByteBuffer(&request_);
+ StartWrite(send_buf_.get());
+ StartRead(&recv_buf_);
+ StartCall();
+ } else {
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ };
+ activate_();
+ }
+ void OnWriteDone(bool ok) override { StartWritesDone(); }
+ void OnReadDone(bool ok) override {
+ EchoResponse response;
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
+ EXPECT_EQ(request_.message(), response.message());
+ };
+ void OnDone(const Status& s) override {
+ EXPECT_TRUE(s.ok());
+ activate_();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ EchoRequest request_;
+ std::unique_ptr<ByteBuffer> send_buf_;
+ ByteBuffer recv_buf_;
+ std::unique_ptr<ClientContext> cli_ctx_;
+ int reuses_remaining_;
+ std::function<void()> activate_;
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } rpc{this, kMethodName, test_string, reuses};
+
+ rpc.Await();
+ }
+ }
bool is_server_started_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
@@ -201,6 +262,37 @@ TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) {
SendRpcs(10, false);
}
+TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) {
+ ResetStub();
+ SimpleRequest request;
+ SimpleResponse response;
+ ClientContext cli_ctx;
+
+ cli_ctx.AddMetadata(kCheckClientInitialMetadataKey,
+ kCheckClientInitialMetadataVal);
+
+ std::mutex mu;
+ std::condition_variable cv;
+ bool done = false;
+ stub_->experimental_async()->CheckClientInitialMetadata(
+ &cli_ctx, &request, &response, [&done, &mu, &cv](Status s) {
+ GPR_ASSERT(s.ok());
+
+ std::lock_guard<std::mutex> l(mu);
+ done = true;
+ cv.notify_one();
+ });
+ std::unique_lock<std::mutex> l(mu);
+ while (!done) {
+ cv.wait(l);
+ }
+}
+
+TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) {
+ ResetStub();
+ SendRpcs(1, true);
+}
+
TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) {
ResetStub();
SendRpcs(10, true);
@@ -211,6 +303,16 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
SendRpcsGeneric(10, false);
}
+TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
+ ResetStub();
+ SendGenericEchoAsBidi(10, 1);
+}
+
+TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
+ ResetStub();
+ SendGenericEchoAsBidi(10, 10);
+}
+
#if GRPC_ALLOW_EXCEPTIONS
TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
ResetStub();
@@ -267,6 +369,156 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
}
}
+TEST_P(ClientCallbackEnd2endTest, RequestStream) {
+ ResetStub();
+ class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> {
+ public:
+ explicit Client(grpc::testing::EchoTestService::Stub* stub) {
+ context_.set_initial_metadata_corked(true);
+ stub->experimental_async()->RequestStream(&context_, &response_, this);
+ StartCall();
+ request_.set_message("Hello server.");
+ StartWrite(&request_);
+ }
+ void OnWriteDone(bool ok) override {
+ writes_left_--;
+ if (writes_left_ > 1) {
+ StartWrite(&request_);
+ } else if (writes_left_ == 1) {
+ StartWriteLast(&request_, WriteOptions());
+ }
+ }
+ void OnDone(const Status& s) override {
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server.");
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ int writes_left_{3};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } test{stub_.get()};
+
+ test.Await();
+}
+
+TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
+ ResetStub();
+ class Client : public grpc::experimental::ClientReadReactor<EchoResponse> {
+ public:
+ explicit Client(grpc::testing::EchoTestService::Stub* stub) {
+ request_.set_message("Hello client ");
+ stub->experimental_async()->ResponseStream(&context_, &request_, this);
+ StartCall();
+ StartRead(&response_);
+ }
+ void OnReadDone(bool ok) override {
+ if (!ok) {
+ EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
+ } else {
+ EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
+ EXPECT_EQ(response_.message(),
+ request_.message() + grpc::to_string(reads_complete_));
+ reads_complete_++;
+ StartRead(&response_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ EXPECT_TRUE(s.ok());
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ int reads_complete_{0};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } test{stub_.get()};
+
+ test.Await();
+}
+
+TEST_P(ClientCallbackEnd2endTest, BidiStream) {
+ ResetStub();
+ class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
+ EchoResponse> {
+ public:
+ explicit Client(grpc::testing::EchoTestService::Stub* stub) {
+ request_.set_message("Hello fren ");
+ stub->experimental_async()->BidiStream(&context_, this);
+ StartCall();
+ StartRead(&response_);
+ StartWrite(&request_);
+ }
+ void OnReadDone(bool ok) override {
+ if (!ok) {
+ EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
+ } else {
+ EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
+ EXPECT_EQ(response_.message(), request_.message());
+ reads_complete_++;
+ StartRead(&response_);
+ }
+ }
+ void OnWriteDone(bool ok) override {
+ EXPECT_TRUE(ok);
+ if (++writes_complete_ == kServerDefaultResponseStreamsToSend) {
+ StartWritesDone();
+ } else {
+ StartWrite(&request_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ EXPECT_TRUE(s.ok());
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ EchoRequest request_;
+ EchoResponse response_;
+ ClientContext context_;
+ int reads_complete_{0};
+ int writes_complete_{0};
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_ = false;
+ } test{stub_.get()};
+
+ test.Await();
+}
+
TestScenario scenarios[] = {TestScenario{false}, TestScenario{true}};
INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
@@ -277,7 +529,7 @@ INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
} // namespace grpc
int main(int argc, char** argv) {
- grpc_test_init(argc, argv);
+ grpc::testing::TestEnvironment env(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}