diff options
author | Yuchen Zeng <zyc@google.com> | 2017-09-18 22:02:52 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2017-09-19 14:23:37 -0700 |
commit | 79c12b9dc02f5558af6868acd2cafdfaa600d89e (patch) | |
tree | 23202d40d812211147b4acee9f848ede82138d55 /src | |
parent | 06576d5bc427b5020b71d797e1211b7581ef61d7 (diff) |
Drain readable fd
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c | 47 |
1 files changed, 43 insertions, 4 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 7f1f57259a..d38fe66d06 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -20,6 +20,7 @@ #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) #include <ares.h> +#include <sys/ioctl.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" @@ -54,6 +55,8 @@ typedef struct fd_node { bool readable_registered; /** if the writable closure has been registered */ bool writable_registered; + /** if the fd is being shut down */ + bool shutting_down; } fd_node; struct grpc_ares_ev_driver { @@ -100,7 +103,6 @@ static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); gpr_mu_destroy(&fdn->mu); - grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->fd); /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ @@ -109,6 +111,20 @@ static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { gpr_free(fdn); } +static void fd_node_shutdown(grpc_exec_ctx *exec_ctx, fd_node *fdn) { + gpr_mu_lock(&fdn->mu); + fdn->shutting_down = true; + if (!fdn->readable_registered && !fdn->writable_registered) { + gpr_mu_unlock(&fdn->mu); + fd_node_destroy(exec_ctx, fdn); + } else { + grpc_fd_shutdown( + exec_ctx, fdn->fd, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("c-ares fd shutdown")); + gpr_mu_unlock(&fdn->mu); + } +} + grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, grpc_pollset_set *pollset_set) { *ev_driver = (grpc_ares_ev_driver *)gpr_malloc(sizeof(grpc_ares_ev_driver)); @@ -175,18 +191,34 @@ static fd_node *pop_fd_node(fd_node **head, int fd) { return NULL; } +/* Check if \a fd is still readable */ +static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver *ev_driver, + int fd) { + size_t bytes_available = 0; + return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; +} + static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { fd_node *fdn = (fd_node *)arg; grpc_ares_ev_driver *ev_driver = fdn->ev_driver; gpr_mu_lock(&fdn->mu); fdn->readable_registered = false; + if (fdn->shutting_down && !fdn->writable_registered) { + gpr_mu_unlock(&fdn->mu); + fd_node_destroy(exec_ctx, fdn); + grpc_ares_ev_driver_unref(ev_driver); + return; + } gpr_mu_unlock(&fdn->mu); gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->fd)); if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->fd), - ARES_SOCKET_BAD); + do { + ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->fd), + ARES_SOCKET_BAD); + } while ( + grpc_ares_is_fd_still_readable(ev_driver, grpc_fd_wrapped_fd(fdn->fd))); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -208,6 +240,12 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_ares_ev_driver *ev_driver = fdn->ev_driver; gpr_mu_lock(&fdn->mu); fdn->writable_registered = false; + if (fdn->shutting_down && !fdn->readable_registered) { + gpr_mu_unlock(&fdn->mu); + fd_node_destroy(exec_ctx, fdn); + grpc_ares_ev_driver_unref(ev_driver); + return; + } gpr_mu_unlock(&fdn->mu); gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->fd)); @@ -256,6 +294,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; + fdn->shutting_down = false; gpr_mu_init(&fdn->mu); GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, grpc_schedule_on_exec_ctx); @@ -296,7 +335,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, while (ev_driver->fds != NULL) { fd_node *cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; - fd_node_destroy(exec_ctx, cur); + fd_node_shutdown(exec_ctx, cur); } ev_driver->fds = new_list; // If the ev driver has no working fd, all the tasks are done. |