diff options
6 files changed, 103 insertions, 60 deletions
diff --git a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c index a7346540b0..fa32ba2ae7 100644 --- a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c @@ -64,6 +64,8 @@ typedef struct { grpc_client_channel_factory *client_channel_factory; /** load balancing policy name */ char *lb_policy_name; + /** polling entity for driving the async IO events */ + grpc_polling_entity *pollent; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -88,7 +90,6 @@ typedef struct { /** currently resolving addresses */ grpc_resolved_addresses *addresses; - grpc_polling_entity *pollent; } dns_resolver; static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h index 8ec76f5a1b..ec9c631c38 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -44,13 +44,24 @@ typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; -void grpc_ares_notify_on_event(grpc_exec_ctx *exec_ctx, +/* Strat \a ev_driver. It will keep working until all IO on its ares_channel is + done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks + bound to its ares_channel when necessary. */ +void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx, grpc_ares_ev_driver *ev_driver); +/* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to + \a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the + query. */ ares_channel *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver); +/* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is + created successfully. */ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, grpc_pollset_set *pollset_set); + +/* Destroys \a ev_driver asynchronously. The actual cleanup happens after + grpc_ares_ev_driver_start() is called, or \ev_driver is already working. */ void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver *ev_driver); #endif /* GRPC_NATIVE_ADDRESS_RESOLVE */ diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index dbc105f0cd..c4070e402e 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -51,25 +51,38 @@ #include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" -typedef struct fd_pair { +typedef struct fd_node { grpc_fd *grpc_fd; - int fd; - struct fd_pair *next; -} fd_pair; + struct fd_node *next; +} fd_node; struct grpc_ares_ev_driver { + /** the ares_channel owned by this event driver */ + ares_channel channel; + /** a closure wrapping the driver_cb, which should be invoked each time the ev + driver gets notified by fds. */ + grpc_closure driver_closure; + /** pollset set for driving the IO events of the channel */ + grpc_pollset_set *pollset_set; + + /** mutex guarding the reset of the state */ gpr_mu mu; + /** has grpc_ares_ev_driver_destroy been called on this event driver? */ bool closing; + /** is this event driver currently working? */ + bool working; + /** an array of ares sockets that the ares channel owned by this event driver + is currently using */ ares_socket_t socks[ARES_GETSOCK_MAXNUM]; - int bitmask; - grpc_closure driver_closure; - grpc_pollset_set *pollset_set; - ares_channel channel; - fd_pair *fds; + /** a bitmask that can tell if an ares socket in the socks array is readable + or/and writable */ + int socks_bitmask; + /** a list of grpc_fd that this event driver is currently using. */ + fd_node *fds; }; -static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, - grpc_ares_ev_driver *ev_driver); +static void grpc_ares_notify_on_event(grpc_exec_ctx *exec_ctx, + grpc_ares_ev_driver *ev_driver); grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, grpc_pollset_set *pollset_set) { @@ -91,21 +104,24 @@ void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver *ev_driver) { ev_driver->closing = true; } -static fd_pair *get_fd(fd_pair **head, int fd) { - fd_pair dummy_head; - fd_pair *node; - fd_pair *ret; +static fd_node *get_fd(fd_node **head, int fd) { + fd_node dummy_head; + fd_node *node; + fd_node *ret; dummy_head.next = *head; node = &dummy_head; while (node->next != NULL) { - if (node->next->fd == fd) { + if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) { + gpr_log(GPR_ERROR, "equal"); ret = node->next; node->next = node->next->next; *head = dummy_head.next; return ret; } + // node = node->next; } + gpr_log(GPR_ERROR, "not equal"); return NULL; } @@ -113,65 +129,57 @@ static void driver_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_ares_ev_driver *d = arg; size_t i; - gpr_mu_lock(&d->mu); if (error == GRPC_ERROR_NONE) { for (i = 0; i < ARES_GETSOCK_MAXNUM; i++) { - ares_process_fd( - d->channel, - ARES_GETSOCK_READABLE(d->bitmask, i) ? d->socks[i] : ARES_SOCKET_BAD, - ARES_GETSOCK_WRITABLE(d->bitmask, i) ? d->socks[i] : ARES_SOCKET_BAD); + ares_process_fd(d->channel, ARES_GETSOCK_READABLE(d->socks_bitmask, i) + ? d->socks[i] + : ARES_SOCKET_BAD, + ARES_GETSOCK_WRITABLE(d->socks_bitmask, i) + ? d->socks[i] + : ARES_SOCKET_BAD); } } else { ares_cancel(d->channel); } - grpc_ares_notify_on_event_locked(exec_ctx, d); - if (d->closing) { - ares_destroy(d->channel); - gpr_mu_unlock(&d->mu); - gpr_free(d); - return; - } - gpr_mu_unlock(&d->mu); + grpc_ares_notify_on_event(exec_ctx, d); } ares_channel *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) { return &ev_driver->channel; } -static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, - grpc_ares_ev_driver *ev_driver) { +static void grpc_ares_notify_on_event(grpc_exec_ctx *exec_ctx, + grpc_ares_ev_driver *ev_driver) { size_t i; - fd_pair *new_list = NULL; + fd_node *new_list = NULL; if (!ev_driver->closing) { - ev_driver->bitmask = + ev_driver->socks_bitmask = ares_getsock(ev_driver->channel, ev_driver->socks, ARES_GETSOCK_MAXNUM); grpc_closure_init(&ev_driver->driver_closure, driver_cb, ev_driver); for (i = 0; i < ARES_GETSOCK_MAXNUM; i++) { - - if (ARES_GETSOCK_READABLE(ev_driver->bitmask, i) || - ARES_GETSOCK_WRITABLE(ev_driver->bitmask, i)) { - fd_pair *fdp = get_fd(&ev_driver->fds, ev_driver->socks[i]); - if (!fdp) { + if (ARES_GETSOCK_READABLE(ev_driver->socks_bitmask, i) || + ARES_GETSOCK_WRITABLE(ev_driver->socks_bitmask, i)) { + fd_node *fdn = get_fd(&ev_driver->fds, ev_driver->socks[i]); + if (!fdn) { char *fd_name; gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); - fdp = gpr_malloc(sizeof(fd_pair)); - fdp->grpc_fd = grpc_fd_create(ev_driver->socks[i], fd_name); - fdp->fd = ev_driver->socks[i]; + fdn = gpr_malloc(sizeof(fd_node)); + fdn->grpc_fd = grpc_fd_create(ev_driver->socks[i], fd_name); grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, - fdp->grpc_fd); + fdn->grpc_fd); gpr_free(fd_name); } - fdp->next = new_list; - new_list = fdp; + fdn->next = new_list; + new_list = fdn; - if (ARES_GETSOCK_READABLE(ev_driver->bitmask, i)) { - grpc_fd_notify_on_read(exec_ctx, fdp->grpc_fd, + if (ARES_GETSOCK_READABLE(ev_driver->socks_bitmask, i)) { + grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &ev_driver->driver_closure); } - if (ARES_GETSOCK_WRITABLE(ev_driver->bitmask, i)) { - grpc_fd_notify_on_write(exec_ctx, fdp->grpc_fd, + if (ARES_GETSOCK_WRITABLE(ev_driver->socks_bitmask, i)) { + grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &ev_driver->driver_closure); } } @@ -179,7 +187,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, } while (ev_driver->fds != NULL) { - fd_pair *cur; + fd_node *cur; cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; @@ -190,19 +198,31 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, } ev_driver->fds = new_list; + // If the ev driver has no working fd, all the tasks are done. + if (!new_list) { + gpr_mu_lock(&ev_driver->mu); + ev_driver->working = false; + gpr_mu_unlock(&ev_driver->mu); + } + + if (ev_driver->closing) { + ares_destroy(ev_driver->channel); + gpr_free(ev_driver); + return; + } } -void grpc_ares_notify_on_event(grpc_exec_ctx *exec_ctx, +void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx, grpc_ares_ev_driver *ev_driver) { gpr_mu_lock(&ev_driver->mu); - grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); - if (ev_driver->closing) { - ares_destroy(ev_driver->channel); + if (ev_driver->working) { gpr_mu_unlock(&ev_driver->mu); - gpr_free(ev_driver); return; + } else { + ev_driver->working = true; } gpr_mu_unlock(&ev_driver->mu); + grpc_ares_notify_on_event(exec_ctx, ev_driver); } #endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c index 71acf12b83..93358416ce 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -187,7 +187,7 @@ static void request_resolving_address(grpc_exec_ctx *exec_ctx, void *arg, ares_gethostbyname(*channel, r->host, AF_INET6, on_done_cb, r); } ares_gethostbyname(*channel, r->host, AF_INET, on_done_cb, r); - grpc_ares_notify_on_event(exec_ctx, ev_driver); + grpc_ares_ev_driver_start(exec_ctx, ev_driver); } static int try_fake_resolve(const char *name, const char *port, @@ -307,6 +307,6 @@ void grpc_ares_cleanup(void) { gpr_mu_unlock(&g_init_mu); } -int grpc_ares_need_poll_entity(void) { return 1; } +bool grpc_ares_need_poll_entity(void) { return true; } #endif /* GRPC_NATIVE_ADDRESS_RESOLVE */ diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h index 8fe3376aed..d317dee083 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -41,6 +41,9 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" +/* Asynchronously resolve addr. Use default_port if a port isn't designated + in addr, otherwise use the port in addr. grpc_ares_init() must be called + at least once before this function . */ extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx, const char *addr, const char *default_port, @@ -48,11 +51,19 @@ extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_resolved_addresses **addresses); +/* Initialize gRPC ares wrapper. Must be called at least once before + grpc_resolve_address_ares(). */ grpc_error *grpc_ares_init(void); +/* Uninitialized gRPC ares wrapper. If there was more than one previous call to + grpc_ares_init(), this function uninitializes the gRPC ares wrapper only if + it is the call matching the call to grpc_ares_init() which initialized the + wrapper. */ void grpc_ares_cleanup(void); +/* Returns true if the gRPC ares wrapper implementation needs a polling entity, + false otherwise. */ /* TODO(zyc): remove this temporary hack after we can build c-ares on windows */ -int grpc_ares_need_poll_entity(void); +bool grpc_ares_need_poll_entity(void); #endif /* GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H */ diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c index 77c5044dae..1b008c312b 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c @@ -55,6 +55,6 @@ grpc_error *grpc_ares_init(void) { return GRPC_ERROR_NONE; } void grpc_ares_cleanup(void) {} -int grpc_ares_need_poll_entity(void) { return 0; } +bool grpc_ares_need_poll_entity(void) { return false; } #endif /* GRPC_NATIVE_ADDRESS_RESOLVE */ |