diff options
author | Vijay Pai <vpai@google.com> | 2018-02-09 18:07:22 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-09 18:07:22 -0800 |
commit | 2650c9730f6b6288a57c5126f629f1e42c2a282c (patch) | |
tree | 5a28edb3ed5bad039302505f4f57ba9fddb616b9 /test | |
parent | 90dde8f1360fcc251ac16cd5fb8f53b5f372659a (diff) | |
parent | dd57cb59497937e1f420858ae1e620f018ed6a23 (diff) |
Merge pull request #14397 from vjpai/nb2
Add a separate test for non-blocking polling and RPCs
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/end2end/BUILD | 18 | ||||
-rw-r--r-- | test/cpp/end2end/nonblocking_test.cc | 194 |
2 files changed, 212 insertions, 0 deletions
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 27e8da1ff1..afa054ae10 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -242,6 +242,24 @@ grpc_cc_test( ) grpc_cc_test( + name = "nonblocking_test", + srcs = ["nonblocking_test.cc"], + external_deps = [ + "gtest", + ], + deps = [ + "//:gpr", + "//:grpc", + "//:grpc++", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) + +grpc_cc_test( name = "client_lb_end2end_test", srcs = ["client_lb_end2end_test.cc"], external_deps = [ diff --git a/test/cpp/end2end/nonblocking_test.cc b/test/cpp/end2end/nonblocking_test.cc new file mode 100644 index 0000000000..509c5de0b6 --- /dev/null +++ b/test/cpp/end2end/nonblocking_test.cc @@ -0,0 +1,194 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <memory> + +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> + +#include "src/core/lib/gpr/tls.h" +#include "src/core/lib/iomgr/port.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +#ifdef GRPC_POSIX_SOCKET +#include "src/core/lib/iomgr/ev_posix.h" +#endif // GRPC_POSIX_SOCKET + +#include <gtest/gtest.h> + +#ifdef GRPC_POSIX_SOCKET +// Thread-local variable to so that only polls from this test assert +// non-blocking (not polls from resolver, timer thread, etc) +GPR_TLS_DECL(g_is_nonblocking_test); + +namespace { + +int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, + int timeout) { + if (gpr_tls_get(&g_is_nonblocking_test)) { + GPR_ASSERT(timeout == 0); + } + return poll(pfds, nfds, timeout); +} + +} // namespace + +namespace grpc { +namespace testing { +namespace { + +void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); } +int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); } + +class NonblockingTest : public ::testing::Test { + protected: + NonblockingTest() {} + + void SetUp() override { + port_ = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port_; + + // Setup server + BuildAndStartServer(); + } + + bool LoopForTag(void** tag, bool* ok) { + for (;;) { + auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::SHUTDOWN) { + return false; + } else if (r == CompletionQueue::GOT_EVENT) { + return true; + } + } + } + + void TearDown() override { + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cq_->Shutdown(); + while (LoopForTag(&ignored_tag, &ignored_ok)) + ; + stub_.reset(); + grpc_recycle_unused_port(port_); + } + + void BuildAndStartServer() { + ServerBuilder builder; + builder.AddListeningPort(server_address_.str(), + grpc::InsecureServerCredentials()); + service_.reset(new grpc::testing::EchoTestService::AsyncService()); + builder.RegisterService(service_.get()); + cq_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); + } + + void ResetStub() { + std::shared_ptr<Channel> channel = CreateChannel( + server_address_.str(), grpc::InsecureChannelCredentials()); + stub_ = grpc::testing::EchoTestService::NewStub(channel); + } + + void SendRpc(int num_rpcs) { + for (int i = 0; i < num_rpcs; i++) { + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("hello non-blocking world"); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get())); + + response_reader->StartCall(); + + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, + cq_.get(), cq_.get(), tag(2)); + + void* got_tag; + bool ok; + EXPECT_TRUE(LoopForTag(&got_tag, &ok)); + EXPECT_TRUE(ok); + EXPECT_EQ(detag(got_tag), 2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + response_writer.Finish(send_response, Status::OK, tag(3)); + response_reader->Finish(&recv_response, &recv_status, tag(4)); + + int tagsum = 0; + int tagprod = 1; + EXPECT_TRUE(LoopForTag(&got_tag, &ok)); + EXPECT_TRUE(ok); + tagsum += detag(got_tag); + tagprod *= detag(got_tag); + + EXPECT_TRUE(LoopForTag(&got_tag, &ok)); + EXPECT_TRUE(ok); + tagsum += detag(got_tag); + tagprod *= detag(got_tag); + + EXPECT_EQ(tagsum, 7); + EXPECT_EQ(tagprod, 12); + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + } + } + + std::unique_ptr<ServerCompletionQueue> cq_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<Server> server_; + std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_; + std::ostringstream server_address_; + int port_; +}; + +TEST_F(NonblockingTest, SimpleRpc) { + ResetStub(); + SendRpc(10); +} + +} // namespace +} // namespace testing +} // namespace grpc + +#endif // GRPC_POSIX_SOCKET + +int main(int argc, char** argv) { +#ifdef GRPC_POSIX_SOCKET + // Override the poll function before anything else can happen + grpc_poll_function = maybe_assert_non_blocking_poll; +#endif // GRPC_POSIX_SOCKET + + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + int ret = RUN_ALL_TESTS(); + return ret; +} |