diff options
Diffstat (limited to 'test/cpp/naming/resolver_component_test.cc')
-rw-r--r-- | test/cpp/naming/resolver_component_test.cc | 116 |
1 files changed, 113 insertions, 3 deletions
diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index f4be064305..07ddfd30ee 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -22,10 +22,14 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> + #include <string.h> +#include <errno.h> +#include <fcntl.h> #include <gflags/gflags.h> #include <gmock/gmock.h> +#include <thread> #include <vector> #include "test/cpp/util/subprocess.h" @@ -48,6 +52,12 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" +// TODO: pull in different headers when enabling this +// test on windows. Also set BAD_SOCKET_RETURN_VAL +// to INVALID_SOCKET on windows. +#include "src/core/lib/iomgr/sockaddr_posix.h" +#define BAD_SOCKET_RETURN_VAL -1 + using grpc::SubProcess; using std::vector; using testing::UnorderedElementsAreArray; @@ -231,7 +241,79 @@ void CheckLBPolicyResultLocked(grpc_channel_args* channel_args, } } +void OpenAndCloseSocketsStressLoop(int dummy_port, gpr_event* done_ev) { + // The goal of this loop is to catch socket + // "use after close" bugs within the c-ares resolver by acting + // like some separate thread doing I/O. + // It's goal is to try to hit race conditions whereby: + // 1) The c-ares resolver closes a socket. + // 2) This loop opens a socket with (coincidentally) the same handle. + // 3) the c-ares resolver mistakenly uses that same socket without + // realizing that its closed. + // 4) This loop performs an operation on that socket that should + // succeed but instead fails because of what the c-ares + // resolver did in the meantime. + sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_port = htons(dummy_port); + ((char*)&addr.sin6_addr)[15] = 1; + for (;;) { + if (gpr_event_get(done_ev)) { + return; + } + std::vector<int> sockets; + // First open a bunch of sockets, bind and listen + // '50' is an arbitrary number that, experimentally, + // has a good chance of catching bugs. + for (size_t i = 0; i < 50; i++) { + int s = socket(AF_INET6, SOCK_STREAM, 0); + int val = 1; + ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) == + 0) + << "Failed to set socketopt reuseport"; + ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == + 0) + << "Failed to set socket reuseaddr"; + ASSERT_TRUE(fcntl(s, F_SETFL, O_NONBLOCK) == 0) + << "Failed to set socket non-blocking"; + ASSERT_TRUE(s != BAD_SOCKET_RETURN_VAL) + << "Failed to create TCP ipv6 socket"; + gpr_log(GPR_DEBUG, "Opened fd: %d", s); + ASSERT_TRUE(bind(s, (const sockaddr*)&addr, sizeof(addr)) == 0) + << "Failed to bind socket " + std::to_string(s) + + " to [::1]:" + std::to_string(dummy_port) + + ". errno: " + std::to_string(errno); + ASSERT_TRUE(listen(s, 1) == 0) << "Failed to listen on socket " + + std::to_string(s) + + ". errno: " + std::to_string(errno); + sockets.push_back(s); + } + // Do a non-blocking accept followed by a close on all of those sockets. + // Do this in a separate loop to try to induce a time window to hit races. + for (size_t i = 0; i < sockets.size(); i++) { + gpr_log(GPR_DEBUG, "non-blocking accept then close on %d", sockets[i]); + if (accept(sockets[i], nullptr, nullptr)) { + // If e.g. a "shutdown" was called on this fd from another thread, + // then this accept call should fail with an unexpected error. + ASSERT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK) + << "OpenAndCloseSocketsStressLoop accept on socket " + + std::to_string(sockets[i]) + + " failed in " + "an unexpected way. " + "errno: " + + std::to_string(errno) + + ". Socket use-after-close bugs are likely."; + } + ASSERT_TRUE(close(sockets[i]) == 0) + << "Failed to close socket: " + std::to_string(sockets[i]) + + ". errno: " + std::to_string(errno); + } + } +} + void CheckResolverResultLocked(void* argsp, grpc_error* err) { + EXPECT_EQ(err, GRPC_ERROR_NONE); ArgsStruct* args = (ArgsStruct*)argsp; grpc_channel_args* channel_args = args->channel_args; const grpc_arg* channel_arg = @@ -271,7 +353,17 @@ void CheckResolverResultLocked(void* argsp, grpc_error* err) { gpr_mu_unlock(args->mu); } -TEST(ResolverComponentTest, TestResolvesRelevantRecords) { +void CheckResolvedWithoutErrorLocked(void* argsp, grpc_error* err) { + EXPECT_EQ(err, GRPC_ERROR_NONE); + ArgsStruct* args = (ArgsStruct*)argsp; + gpr_atm_rel_store(&args->done_atm, 1); + gpr_mu_lock(args->mu); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr)); + gpr_mu_unlock(args->mu); +} + +void RunResolvesRelevantRecordsTest(void (*OnDoneLocked)(void* arg, + grpc_error* error)) { grpc_core::ExecCtx exec_ctx; ArgsStruct args; ArgsInit(&args); @@ -289,14 +381,32 @@ TEST(ResolverComponentTest, TestResolvesRelevantRecords) { args.pollset_set, args.lock); gpr_free(whole_uri); grpc_closure on_resolver_result_changed; - GRPC_CLOSURE_INIT(&on_resolver_result_changed, CheckResolverResultLocked, - (void*)&args, grpc_combiner_scheduler(args.lock)); + GRPC_CLOSURE_INIT(&on_resolver_result_changed, OnDoneLocked, (void*)&args, + grpc_combiner_scheduler(args.lock)); resolver->NextLocked(&args.channel_args, &on_resolver_result_changed); grpc_core::ExecCtx::Get()->Flush(); PollPollsetUntilRequestDone(&args); ArgsFinish(&args); } +TEST(ResolverComponentTest, TestResolvesRelevantRecords) { + RunResolvesRelevantRecordsTest(CheckResolverResultLocked); +} + +TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) { + // Start up background stress thread + int dummy_port = grpc_pick_unused_port_or_die(); + gpr_event done_ev; + gpr_event_init(&done_ev); + std::thread socket_stress_thread(OpenAndCloseSocketsStressLoop, dummy_port, + &done_ev); + // Run the resolver test + RunResolvesRelevantRecordsTest(CheckResolvedWithoutErrorLocked); + // Shutdown and join stress thread + gpr_event_set(&done_ev, (void*)1); + socket_stress_thread.join(); +} + } // namespace int main(int argc, char** argv) { |