diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc | 148 |
1 files changed, 59 insertions, 89 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index b604f2bf14..b73e979e9f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -18,9 +18,10 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/port.h" -#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) +#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) #include <ares.h> +#include <string.h> #include <sys/ioctl.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" @@ -38,25 +39,23 @@ typedef struct fd_node { /** the owner of this fd node */ grpc_ares_ev_driver* ev_driver; - /** a closure wrapping on_readable_cb, which should be invoked when the - grpc_fd in this node becomes readable. */ + /** a closure wrapping on_readable_locked, which should be + invoked when the grpc_fd in this node becomes readable. */ grpc_closure read_closure; - /** a closure wrapping on_writable_cb, which should be invoked when the - grpc_fd in this node becomes writable. */ + /** a closure wrapping on_writable_locked, which should be + invoked when the grpc_fd in this node becomes writable. */ grpc_closure write_closure; /** next fd node in the list */ struct fd_node* next; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** the grpc_fd owned by this fd node */ grpc_fd* fd; /** if the readable closure has been registered */ bool readable_registered; /** if the writable closure has been registered */ bool writable_registered; - /** if the fd is being shut down */ - bool shutting_down; + /** if the fd has been shutdown yet from grpc iomgr perspective */ + bool already_shutdown; } fd_node; struct grpc_ares_ev_driver { @@ -67,8 +66,8 @@ struct grpc_ares_ev_driver { /** refcount of the event driver */ gpr_refcount refs; - /** mutex guarding the rest of the state */ - gpr_mu mu; + /** combiner to synchronize c-ares and I/O callbacks on */ + grpc_combiner* combiner; /** a list of grpc_fd that this event driver is currently using. */ fd_node* fds; /** is this event driver currently working? */ @@ -91,44 +90,42 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { if (gpr_unref(&ev_driver->refs)) { gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); GPR_ASSERT(ev_driver->fds == nullptr); - gpr_mu_destroy(&ev_driver->mu); + GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); ares_destroy(ev_driver->channel); gpr_free(ev_driver); } } -static void fd_node_destroy(fd_node* fdn) { +static void fd_node_destroy_locked(fd_node* fdn) { gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); - gpr_mu_destroy(&fdn->mu); - /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up + GPR_ASSERT(fdn->already_shutdown); + /* c-ares library will close the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(fdn->fd, nullptr, nullptr, true /* already_closed */, - "c-ares query finished"); + int dummy_release_fd; + grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished"); gpr_free(fdn); } -static void fd_node_shutdown(fd_node* fdn) { - gpr_mu_lock(&fdn->mu); - fdn->shutting_down = true; - if (!fdn->readable_registered && !fdn->writable_registered) { - gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); - } else { - grpc_fd_shutdown( - fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("c-ares fd shutdown")); - gpr_mu_unlock(&fdn->mu); +static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { + if (!fdn->already_shutdown) { + fdn->already_shutdown = true; + grpc_fd_shutdown(fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); } } -grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, - grpc_pollset_set* pollset_set) { +grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, + grpc_pollset_set* pollset_set, + grpc_combiner* combiner) { *ev_driver = static_cast<grpc_ares_ev_driver*>( gpr_malloc(sizeof(grpc_ares_ev_driver))); - int status = ares_init(&(*ev_driver)->channel); - gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create"); + ares_options opts; + memset(&opts, 0, sizeof(opts)); + opts.flags |= ARES_FLAG_STAYOPEN; + int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); + gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked"); if (status != ARES_SUCCESS) { char* err_msg; gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", @@ -138,7 +135,7 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, gpr_free(*ev_driver); return err; } - gpr_mu_init(&(*ev_driver)->mu); + (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver"); gpr_ref_init(&(*ev_driver)->refs, 1); (*ev_driver)->pollset_set = pollset_set; (*ev_driver)->fds = nullptr; @@ -147,33 +144,26 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, return GRPC_ERROR_NONE; } -void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver) { - // It's not safe to shut down remaining fds here directly, becauses - // ares_host_callback does not provide an exec_ctx. We mark the event driver - // as being shut down. If the event driver is working, - // grpc_ares_notify_on_event_locked will shut down the fds; if it's not - // working, there are no fds to shut down. - gpr_mu_lock(&ev_driver->mu); +void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) { + // We mark the event driver as being shut down. If the event driver + // is working, grpc_ares_notify_on_event_locked will shut down the + // fds; if it's not working, there are no fds to shut down. ev_driver->shutting_down = true; - gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } -void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) { - gpr_mu_lock(&ev_driver->mu); +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { ev_driver->shutting_down = true; fd_node* fn = ev_driver->fds; while (fn != nullptr) { - grpc_fd_shutdown(fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "grpc_ares_ev_driver_shutdown")); + fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); fn = fn->next; } - gpr_mu_unlock(&ev_driver->mu); } // Search fd in the fd_node list head. This is an O(n) search, the max possible // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. -static fd_node* pop_fd_node(fd_node** head, int fd) { +static fd_node* pop_fd_node_locked(fd_node** head, int fd) { fd_node dummy_head; dummy_head.next = *head; fd_node* node = &dummy_head; @@ -190,31 +180,22 @@ static fd_node* pop_fd_node(fd_node** head, int fd) { } /* Check if \a fd is still readable */ -static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver* ev_driver, - int fd) { +static bool grpc_ares_is_fd_still_readable_locked( + grpc_ares_ev_driver* ev_driver, int fd) { size_t bytes_available = 0; return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; } -static void on_readable_cb(void* arg, grpc_error* error) { +static void on_readable_locked(void* arg, grpc_error* error) { fd_node* fdn = static_cast<fd_node*>(arg); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - gpr_mu_lock(&fdn->mu); const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->readable_registered = false; - if (fdn->shutting_down && !fdn->writable_registered) { - gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); - grpc_ares_ev_driver_unref(ev_driver); - return; - } - gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "readable on %d", fd); if (error == GRPC_ERROR_NONE) { do { ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD); - } while (grpc_ares_is_fd_still_readable(ev_driver, fd)); + } while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd)); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -224,26 +205,15 @@ static void on_readable_cb(void* arg, grpc_error* error) { // grpc_ares_notify_on_event_locked(). ares_cancel(ev_driver->channel); } - gpr_mu_lock(&ev_driver->mu); grpc_ares_notify_on_event_locked(ev_driver); - gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } -static void on_writable_cb(void* arg, grpc_error* error) { +static void on_writable_locked(void* arg, grpc_error* error) { fd_node* fdn = static_cast<fd_node*>(arg); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - gpr_mu_lock(&fdn->mu); const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->writable_registered = false; - if (fdn->shutting_down && !fdn->readable_registered) { - gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); - grpc_ares_ev_driver_unref(ev_driver); - return; - } - gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "writable on %d", fd); if (error == GRPC_ERROR_NONE) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); @@ -256,13 +226,12 @@ static void on_writable_cb(void* arg, grpc_error* error) { // grpc_ares_notify_on_event_locked(). ares_cancel(ev_driver->channel); } - gpr_mu_lock(&ev_driver->mu); grpc_ares_notify_on_event_locked(ev_driver); - gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } -ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver) { +ares_channel* grpc_ares_ev_driver_get_channel_locked( + grpc_ares_ev_driver* ev_driver) { return &ev_driver->channel; } @@ -277,29 +246,27 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { if (ARES_GETSOCK_READABLE(socks_bitmask, i) || ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { - fd_node* fdn = pop_fd_node(&ev_driver->fds, socks[i]); + fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); // Create a new fd_node if sock[i] is not in the fd_node list. if (fdn == nullptr) { char* fd_name; gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node))); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->fd = grpc_fd_create(socks[i], fd_name); + fdn->fd = grpc_fd_create(socks[i], fd_name, false); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; - fdn->shutting_down = false; - gpr_mu_init(&fdn->mu); - GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, - grpc_schedule_on_exec_ctx); + fdn->already_shutdown = false; + GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); + GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd); gpr_free(fd_name); } fdn->next = new_list; new_list = fdn; - gpr_mu_lock(&fdn->mu); // Register read_closure if the socket is readable and read_closure has // not been registered with this socket. if (ARES_GETSOCK_READABLE(socks_bitmask, i) && @@ -319,7 +286,6 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure); fdn->writable_registered = true; } - gpr_mu_unlock(&fdn->mu); } } } @@ -329,7 +295,13 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { while (ev_driver->fds != nullptr) { fd_node* cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; - fd_node_shutdown(cur); + fd_node_shutdown_locked(cur, "c-ares fd shutdown"); + if (!cur->readable_registered && !cur->writable_registered) { + fd_node_destroy_locked(cur); + } else { + cur->next = new_list; + new_list = cur; + } } ev_driver->fds = new_list; // If the ev driver has no working fd, all the tasks are done. @@ -339,13 +311,11 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { } } -void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) { - gpr_mu_lock(&ev_driver->mu); +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { if (!ev_driver->working) { ev_driver->working = true; grpc_ares_notify_on_event_locked(ev_driver); } - gpr_mu_unlock(&ev_driver->mu); } -#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) */ +#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ |