diff options
author | 2018-06-15 15:41:58 -0700 | |
---|---|---|
committer | 2018-06-15 15:41:58 -0700 | |
commit | 8a7f6bf5cb9cb07cef24c29a6a11f6fc3dcd176d (patch) | |
tree | 64b9054d6f717db3c22796a7a591a19170f9fd6d /src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc | |
parent | bd6ac0f9d57dda96797bbd5b1bd27815342e7ee5 (diff) | |
parent | 7eda61937e5b042882f70f9f8078c94edd7fc01c (diff) |
Merge pull request #15780 from apolcyn/cares_windows_build
Separate out the posix part of the c-ares driver
Diffstat (limited to 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc new file mode 100644 index 0000000000..06a6e853f5 --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -0,0 +1,311 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" +#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) + +#include <ares.h> +#include <string.h> +#include <sys/ioctl.h> + +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" + +typedef struct fd_node { + /** the owner of this fd node */ + grpc_ares_ev_driver* ev_driver; + /** a closure wrapping on_readable_locked, which should be + invoked when the grpc_fd in this node becomes readable. */ + grpc_closure read_closure; + /** a closure wrapping on_writable_locked, which should be + invoked when the grpc_fd in this node becomes writable. */ + grpc_closure write_closure; + /** next fd node in the list */ + struct fd_node* next; + + /** wrapped fd that's polled by grpc's poller for the current platform */ + grpc_core::GrpcPolledFd* grpc_polled_fd; + /** if the readable closure has been registered */ + bool readable_registered; + /** if the writable closure has been registered */ + bool writable_registered; + /** if the fd has been shutdown yet from grpc iomgr perspective */ + bool already_shutdown; +} fd_node; + +struct grpc_ares_ev_driver { + /** the ares_channel owned by this event driver */ + ares_channel channel; + /** pollset set for driving the IO events of the channel */ + grpc_pollset_set* pollset_set; + /** refcount of the event driver */ + gpr_refcount refs; + + /** combiner to synchronize c-ares and I/O callbacks on */ + grpc_combiner* combiner; + /** a list of grpc_fd that this event driver is currently using. */ + fd_node* fds; + /** is this event driver currently working? */ + bool working; + /** is this event driver being shut down */ + bool shutting_down; +}; + +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); + +static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( + grpc_ares_ev_driver* ev_driver) { + gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + gpr_ref(&ev_driver->refs); + return ev_driver; +} + +static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { + gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + if (gpr_unref(&ev_driver->refs)) { + gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GPR_ASSERT(ev_driver->fds == nullptr); + GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); + ares_destroy(ev_driver->channel); + gpr_free(ev_driver); + } +} + +static void fd_node_destroy_locked(fd_node* fdn) { + gpr_log(GPR_DEBUG, "delete fd: %s", fdn->grpc_polled_fd->GetName()); + GPR_ASSERT(!fdn->readable_registered); + GPR_ASSERT(!fdn->writable_registered); + GPR_ASSERT(fdn->already_shutdown); + grpc_core::Delete(fdn->grpc_polled_fd); + gpr_free(fdn); +} + +static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { + if (!fdn->already_shutdown) { + fdn->already_shutdown = true; + fdn->grpc_polled_fd->ShutdownLocked( + GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); + } +} + +grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, + grpc_pollset_set* pollset_set, + grpc_combiner* combiner) { + *ev_driver = static_cast<grpc_ares_ev_driver*>( + gpr_malloc(sizeof(grpc_ares_ev_driver))); + ares_options opts; + memset(&opts, 0, sizeof(opts)); + opts.flags |= ARES_FLAG_STAYOPEN; + int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); + grpc_core::ConfigureAresChannelLocked(&(*ev_driver)->channel); + gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked"); + if (status != ARES_SUCCESS) { + char* err_msg; + gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", + ares_strerror(status)); + grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg); + gpr_free(err_msg); + gpr_free(*ev_driver); + return err; + } + (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver"); + gpr_ref_init(&(*ev_driver)->refs, 1); + (*ev_driver)->pollset_set = pollset_set; + (*ev_driver)->fds = nullptr; + (*ev_driver)->working = false; + (*ev_driver)->shutting_down = false; + return GRPC_ERROR_NONE; +} + +void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) { + // We mark the event driver as being shut down. If the event driver + // is working, grpc_ares_notify_on_event_locked will shut down the + // fds; if it's not working, there are no fds to shut down. + ev_driver->shutting_down = true; + grpc_ares_ev_driver_unref(ev_driver); +} + +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { + ev_driver->shutting_down = true; + fd_node* fn = ev_driver->fds; + while (fn != nullptr) { + fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); + fn = fn->next; + } +} + +// Search fd in the fd_node list head. This is an O(n) search, the max possible +// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. +static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { + fd_node dummy_head; + dummy_head.next = *head; + fd_node* node = &dummy_head; + while (node->next != nullptr) { + if (node->next->grpc_polled_fd->GetWrappedAresSocketLocked() == as) { + fd_node* ret = node->next; + node->next = node->next->next; + *head = dummy_head.next; + return ret; + } + node = node->next; + } + return nullptr; +} + +static void on_readable_locked(void* arg, grpc_error* error) { + fd_node* fdn = static_cast<fd_node*>(arg); + grpc_ares_ev_driver* ev_driver = fdn->ev_driver; + const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); + fdn->readable_registered = false; + gpr_log(GPR_DEBUG, "readable on %s", fdn->grpc_polled_fd->GetName()); + if (error == GRPC_ERROR_NONE) { + do { + ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); + } while (fdn->grpc_polled_fd->IsFdStillReadableLocked()); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); + } + grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_ev_driver_unref(ev_driver); +} + +static void on_writable_locked(void* arg, grpc_error* error) { + fd_node* fdn = static_cast<fd_node*>(arg); + grpc_ares_ev_driver* ev_driver = fdn->ev_driver; + const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); + fdn->writable_registered = false; + gpr_log(GPR_DEBUG, "writable on %s", fdn->grpc_polled_fd->GetName()); + if (error == GRPC_ERROR_NONE) { + ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); + } + grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_ev_driver_unref(ev_driver); +} + +ares_channel* grpc_ares_ev_driver_get_channel_locked( + grpc_ares_ev_driver* ev_driver) { + return &ev_driver->channel; +} + +// Get the file descriptors used by the ev_driver's ares channel, register +// driver_closure with these filedescriptors. +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { + fd_node* new_list = nullptr; + if (!ev_driver->shutting_down) { + ares_socket_t socks[ARES_GETSOCK_MAXNUM]; + int socks_bitmask = + ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); + for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { + if (ARES_GETSOCK_READABLE(socks_bitmask, i) || + ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { + fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); + // Create a new fd_node if sock[i] is not in the fd_node list. + if (fdn == nullptr) { + fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node))); + fdn->grpc_polled_fd = grpc_core::NewGrpcPolledFdLocked( + socks[i], ev_driver->pollset_set); + gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName()); + fdn->ev_driver = ev_driver; + fdn->readable_registered = false; + fdn->writable_registered = false; + fdn->already_shutdown = false; + GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); + GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); + } + fdn->next = new_list; + new_list = fdn; + // Register read_closure if the socket is readable and read_closure has + // not been registered with this socket. + if (ARES_GETSOCK_READABLE(socks_bitmask, i) && + !fdn->readable_registered) { + grpc_ares_ev_driver_ref(ev_driver); + gpr_log(GPR_DEBUG, "notify read on: %s", + fdn->grpc_polled_fd->GetName()); + fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure); + fdn->readable_registered = true; + } + // Register write_closure if the socket is writable and write_closure + // has not been registered with this socket. + if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && + !fdn->writable_registered) { + gpr_log(GPR_DEBUG, "notify write on: %s", + fdn->grpc_polled_fd->GetName()); + grpc_ares_ev_driver_ref(ev_driver); + fdn->grpc_polled_fd->RegisterForOnWriteableLocked( + &fdn->write_closure); + fdn->writable_registered = true; + } + } + } + } + // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and + // are therefore no longer in use, so they can be shut down and removed from + // the list. + while (ev_driver->fds != nullptr) { + fd_node* cur = ev_driver->fds; + ev_driver->fds = ev_driver->fds->next; + fd_node_shutdown_locked(cur, "c-ares fd shutdown"); + if (!cur->readable_registered && !cur->writable_registered) { + fd_node_destroy_locked(cur); + } else { + cur->next = new_list; + new_list = cur; + } + } + ev_driver->fds = new_list; + // If the ev driver has no working fd, all the tasks are done. + if (new_list == nullptr) { + ev_driver->working = false; + gpr_log(GPR_DEBUG, "ev driver stop working"); + } +} + +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { + if (!ev_driver->working) { + ev_driver->working = true; + grpc_ares_notify_on_event_locked(ev_driver); + } +} + +#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ |