diff options
author | Vijay Pai <vpai@google.com> | 2018-02-09 15:28:14 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-02-09 15:42:05 -0800 |
commit | 132efa0c9bf8ba974619d171c586fb809225929d (patch) | |
tree | 539d9d3d981a56d04b51ebf34c16113bfe0c67f8 /test/cpp/end2end/nonblocking_test.cc | |
parent | 98b6ec406bc013f402c178c182838a5946cde643 (diff) |
Add non-blocking test
Diffstat (limited to 'test/cpp/end2end/nonblocking_test.cc')
-rw-r--r-- | test/cpp/end2end/nonblocking_test.cc | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/test/cpp/end2end/nonblocking_test.cc b/test/cpp/end2end/nonblocking_test.cc new file mode 100644 index 0000000000..a1530664f0 --- /dev/null +++ b/test/cpp/end2end/nonblocking_test.cc @@ -0,0 +1,193 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <memory> + +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> + +#include "src/core/lib/gpr/tls.h" +#include "src/core/lib/iomgr/port.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +#ifdef GRPC_POSIX_SOCKET +#include "src/core/lib/iomgr/ev_posix.h" +#endif + +#include <gtest/gtest.h> + +// Thread-local variable to so that only polls from this test assert +// non-blocking (not polls from resolver, timer thread, etc) +GPR_TLS_DECL(g_is_nonblocking_test); + +#ifdef GRPC_POSIX_SOCKET +namespace { + +int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, + int timeout) { + if (gpr_tls_get(&g_is_nonblocking_test)) { + GPR_ASSERT(timeout == 0); + } + return poll(pfds, nfds, timeout); +} + +} // namespace +#endif + +namespace grpc { +namespace testing { +namespace { + +void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); } +int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); } + +class NonblockingTest : public ::testing::Test { + protected: + NonblockingTest() {} + + void SetUp() override { + port_ = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port_; + + // Setup server + BuildAndStartServer(); + } + + bool LoopForTag(void** tag, bool* ok) { + for (;;) { + auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::SHUTDOWN) { + return false; + } else if (r == CompletionQueue::GOT_EVENT) { + return true; + } + } + } + + void TearDown() override { + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cq_->Shutdown(); + while (LoopForTag(&ignored_tag, &ignored_ok)) + ; + stub_.reset(); + grpc_recycle_unused_port(port_); + } + + void BuildAndStartServer() { + ServerBuilder builder; + builder.AddListeningPort(server_address_.str(), + grpc::InsecureServerCredentials()); + service_.reset(new grpc::testing::EchoTestService::AsyncService()); + builder.RegisterService(service_.get()); + cq_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); + } + + void ResetStub() { + std::shared_ptr<Channel> channel = CreateChannel( + server_address_.str(), grpc::InsecureChannelCredentials()); + stub_ = grpc::testing::EchoTestService::NewStub(channel); + } + + void SendRpc(int num_rpcs) { + for (int i = 0; i < num_rpcs; i++) { + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("hello non-blocking world"); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get())); + + response_reader->StartCall(); + + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, + cq_.get(), cq_.get(), tag(2)); + + void* got_tag; + bool ok; + EXPECT_TRUE(LoopForTag(&got_tag, &ok)); + EXPECT_TRUE(ok); + EXPECT_EQ(detag(got_tag), 2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + response_writer.Finish(send_response, Status::OK, tag(3)); + response_reader->Finish(&recv_response, &recv_status, tag(4)); + + int tagsum = 0; + int tagprod = 1; + EXPECT_TRUE(LoopForTag(&got_tag, &ok)); + EXPECT_TRUE(ok); + tagsum += detag(got_tag); + tagprod *= detag(got_tag); + + EXPECT_TRUE(LoopForTag(&got_tag, &ok)); + EXPECT_TRUE(ok); + tagsum += detag(got_tag); + tagprod *= detag(got_tag); + + EXPECT_EQ(tagsum, 7); + EXPECT_EQ(tagprod, 12); + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + } + } + + std::unique_ptr<ServerCompletionQueue> cq_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<Server> server_; + std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_; + std::ostringstream server_address_; + int port_; +}; + +TEST_F(NonblockingTest, SimpleRpc) { + ResetStub(); + SendRpc(10); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { +#ifdef GRPC_POSIX_SOCKET + // Override the poll function before anything else can happen + grpc_poll_function = maybe_assert_non_blocking_poll; +#endif + + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + int ret = RUN_ALL_TESTS(); + return ret; +} |