diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/resolver/dns')
3 files changed, 81 insertions, 43 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index b87a3b7082..9bb229ad95 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -204,7 +204,7 @@ static char *choose_service_config(char *service_config_choice_json) { int random_pct = rand() % 100; int percentage; if (sscanf(field->value, "%d", &percentage) != 1 || - random_pct > percentage) { + random_pct > percentage || percentage == 0) { service_config_json = NULL; break; } @@ -249,7 +249,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, service_config_string); args_to_remove[num_args_to_remove++] = GRPC_ARG_SERVICE_CONFIG; new_args[num_args_to_add++] = grpc_channel_arg_string_create( - GRPC_ARG_SERVICE_CONFIG, service_config_string); + (char *)GRPC_ARG_SERVICE_CONFIG, service_config_string); service_config = grpc_service_config_create(service_config_string); if (service_config != NULL) { const char *lb_policy_name = @@ -257,7 +257,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, if (lb_policy_name != NULL) { args_to_remove[num_args_to_remove++] = GRPC_ARG_LB_POLICY_NAME; new_args[num_args_to_add++] = grpc_channel_arg_string_create( - GRPC_ARG_LB_POLICY_NAME, (char *)lb_policy_name); + (char *)GRPC_ARG_LB_POLICY_NAME, (char *)lb_policy_name); } } } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 9747d39a16..c30cc93b6f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -20,6 +20,7 @@ #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) #include <ares.h> +#include <sys/ioctl.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" @@ -37,8 +38,6 @@ typedef struct fd_node { /** the owner of this fd node */ grpc_ares_ev_driver *ev_driver; - /** the grpc_fd owned by this fd node */ - grpc_fd *grpc_fd; /** a closure wrapping on_readable_cb, which should be invoked when the grpc_fd in this node becomes readable. */ grpc_closure read_closure; @@ -50,10 +49,14 @@ typedef struct fd_node { /** mutex guarding the rest of the state */ gpr_mu mu; + /** the grpc_fd owned by this fd node */ + grpc_fd *fd; /** if the readable closure has been registered */ bool readable_registered; /** if the writable closure has been registered */ bool writable_registered; + /** if the fd is being shut down */ + bool shutting_down; } fd_node; struct grpc_ares_ev_driver { @@ -96,19 +99,31 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) { } static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { - gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); gpr_mu_destroy(&fdn->mu); - grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd); /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */, + grpc_fd_orphan(exec_ctx, fdn->fd, NULL, NULL, true /* already_closed */, "c-ares query finished"); gpr_free(fdn); } +static void fd_node_shutdown(grpc_exec_ctx *exec_ctx, fd_node *fdn) { + gpr_mu_lock(&fdn->mu); + fdn->shutting_down = true; + if (!fdn->readable_registered && !fdn->writable_registered) { + gpr_mu_unlock(&fdn->mu); + fd_node_destroy(exec_ctx, fdn); + } else { + grpc_fd_shutdown(exec_ctx, fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "c-ares fd shutdown")); + gpr_mu_unlock(&fdn->mu); + } +} + grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, grpc_pollset_set *pollset_set) { *ev_driver = (grpc_ares_ev_driver *)gpr_malloc(sizeof(grpc_ares_ev_driver)); @@ -150,9 +165,8 @@ void grpc_ares_ev_driver_shutdown(grpc_exec_ctx *exec_ctx, ev_driver->shutting_down = true; fd_node *fn = ev_driver->fds; while (fn != NULL) { - grpc_fd_shutdown( - exec_ctx, fn->grpc_fd, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_ares_ev_driver_shutdown")); + grpc_fd_shutdown(exec_ctx, fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "grpc_ares_ev_driver_shutdown")); fn = fn->next; } gpr_mu_unlock(&ev_driver->mu); @@ -165,7 +179,7 @@ static fd_node *pop_fd_node(fd_node **head, int fd) { dummy_head.next = *head; fd_node *node = &dummy_head; while (node->next != NULL) { - if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) { + if (grpc_fd_wrapped_fd(node->next->fd) == fd) { fd_node *ret = node->next; node->next = node->next->next; *head = dummy_head.next; @@ -176,18 +190,33 @@ static fd_node *pop_fd_node(fd_node **head, int fd) { return NULL; } +/* Check if \a fd is still readable */ +static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver *ev_driver, + int fd) { + size_t bytes_available = 0; + return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; +} + static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { fd_node *fdn = (fd_node *)arg; grpc_ares_ev_driver *ev_driver = fdn->ev_driver; gpr_mu_lock(&fdn->mu); + const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->readable_registered = false; + if (fdn->shutting_down && !fdn->writable_registered) { + gpr_mu_unlock(&fdn->mu); + fd_node_destroy(exec_ctx, fdn); + grpc_ares_ev_driver_unref(ev_driver); + return; + } gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "readable on %d", fd); if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->grpc_fd), - ARES_SOCKET_BAD); + do { + ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD); + } while (grpc_ares_is_fd_still_readable(ev_driver, fd)); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -208,13 +237,19 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg, fd_node *fdn = (fd_node *)arg; grpc_ares_ev_driver *ev_driver = fdn->ev_driver; gpr_mu_lock(&fdn->mu); + const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->writable_registered = false; + if (fdn->shutting_down && !fdn->readable_registered) { + gpr_mu_unlock(&fdn->mu); + fd_node_destroy(exec_ctx, fdn); + grpc_ares_ev_driver_unref(ev_driver); + return; + } gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "writable on %d", fd); if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, - grpc_fd_wrapped_fd(fdn->grpc_fd)); + ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -253,17 +288,17 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); fdn = (fd_node *)gpr_malloc(sizeof(fd_node)); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->grpc_fd = grpc_fd_create(socks[i], fd_name); + fdn->fd = grpc_fd_create(socks[i], fd_name); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; + fdn->shutting_down = false; gpr_mu_init(&fdn->mu); GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, grpc_schedule_on_exec_ctx); - grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, - fdn->grpc_fd); + grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->fd); gpr_free(fd_name); } fdn->next = new_list; @@ -274,9 +309,8 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, if (ARES_GETSOCK_READABLE(socks_bitmask, i) && !fdn->readable_registered) { grpc_ares_ev_driver_ref(ev_driver); - gpr_log(GPR_DEBUG, "notify read on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); - grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); + gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd)); + grpc_fd_notify_on_read(exec_ctx, fdn->fd, &fdn->read_closure); fdn->readable_registered = true; } // Register write_closure if the socket is writable and write_closure @@ -284,9 +318,9 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && !fdn->writable_registered) { gpr_log(GPR_DEBUG, "notify write on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); + grpc_fd_wrapped_fd(fdn->fd)); grpc_ares_ev_driver_ref(ev_driver); - grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); + grpc_fd_notify_on_write(exec_ctx, fdn->fd, &fdn->write_closure); fdn->writable_registered = true; } gpr_mu_unlock(&fdn->mu); @@ -299,7 +333,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, while (ev_driver->fds != NULL) { fd_node *cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; - fd_node_destroy(exec_ctx, cur); + fd_node_shutdown(exec_ctx, cur); } ev_driver->fds = new_list; // If the ev driver has no working fd, all the tasks are done. diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 0d71f3560e..04379975e1 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -123,8 +123,8 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx, static grpc_ares_hostbyname_request *create_hostbyname_request( grpc_ares_request *parent_request, char *host, uint16_t port, bool is_balancer) { - grpc_ares_hostbyname_request *hr = - gpr_zalloc(sizeof(grpc_ares_hostbyname_request)); + grpc_ares_hostbyname_request *hr = (grpc_ares_hostbyname_request *)gpr_zalloc( + sizeof(grpc_ares_hostbyname_request)); hr->parent_request = parent_request; hr->host = gpr_strdup(host); hr->port = port; @@ -174,7 +174,7 @@ static void on_hostbyname_done_cb(void *arg, int status, int timeouts, grpc_lb_addresses_set_address( *lb_addresses, i, &addr, addr_len, hr->is_balancer /* is_balancer */, - hr->is_balancer ? strdup(hr->host) : NULL /* balancer_name */, + hr->is_balancer ? hr->host : NULL /* balancer_name */, NULL /* user_data */); char output[INET6_ADDRSTRLEN]; ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN); @@ -195,7 +195,7 @@ static void on_hostbyname_done_cb(void *arg, int status, int timeouts, grpc_lb_addresses_set_address( *lb_addresses, i, &addr, addr_len, hr->is_balancer /* is_balancer */, - hr->is_balancer ? strdup(hr->host) : NULL /* balancer_name */, + hr->is_balancer ? hr->host : NULL /* balancer_name */, NULL /* user_data */); char output[INET_ADDRSTRLEN]; ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN); @@ -275,14 +275,15 @@ static void on_txt_done_cb(void *arg, int status, int timeouts, gpr_log(GPR_DEBUG, "on_txt_done_cb"); char *error_msg; grpc_ares_request *r = (grpc_ares_request *)arg; + const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; + struct ares_txt_ext *result = NULL; + struct ares_txt_ext *reply = NULL; + grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&r->mu); if (status != ARES_SUCCESS) goto fail; - struct ares_txt_ext *reply = NULL; status = ares_parse_txt_reply_ext(buf, len, &reply); if (status != ARES_SUCCESS) goto fail; // Find service config in TXT record. - const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; - struct ares_txt_ext *result; for (result = reply; result != NULL; result = result->next) { if (result->record_start && memcmp(result->txt, g_service_config_attribute_prefix, prefix_len) == @@ -313,7 +314,7 @@ static void on_txt_done_cb(void *arg, int status, int timeouts, fail: gpr_asprintf(&error_msg, "C-ares TXT lookup status is not ARES_SUCCESS: %s", ares_strerror(status)); - grpc_error *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg); gpr_free(error_msg); if (r->error == GRPC_ERROR_NONE) { r->error = error; @@ -331,6 +332,9 @@ static grpc_ares_request *grpc_dns_lookup_ares_impl( grpc_closure *on_done, grpc_lb_addresses **addrs, bool check_grpclb, char **service_config_json) { grpc_error *error = GRPC_ERROR_NONE; + grpc_ares_hostbyname_request *hr = NULL; + grpc_ares_request *r = NULL; + ares_channel *channel = NULL; /* TODO(zyc): Enable tracing after #9603 is checked in */ /* if (grpc_dns_trace) { gpr_log(GPR_DEBUG, "resolve_address (blocking): name=%s, default_port=%s", @@ -360,8 +364,7 @@ static grpc_ares_request *grpc_dns_lookup_ares_impl( error = grpc_ares_ev_driver_create(&ev_driver, interested_parties); if (error != GRPC_ERROR_NONE) goto error_cleanup; - grpc_ares_request *r = - (grpc_ares_request *)gpr_zalloc(sizeof(grpc_ares_request)); + r = (grpc_ares_request *)gpr_zalloc(sizeof(grpc_ares_request)); gpr_mu_init(&r->mu); r->ev_driver = ev_driver; r->on_done = on_done; @@ -369,7 +372,7 @@ static grpc_ares_request *grpc_dns_lookup_ares_impl( r->service_config_json_out = service_config_json; r->success = false; r->error = GRPC_ERROR_NONE; - ares_channel *channel = grpc_ares_ev_driver_get_channel(r->ev_driver); + channel = grpc_ares_ev_driver_get_channel(r->ev_driver); // If dns_server is specified, use it. if (dns_server != NULL) { @@ -410,12 +413,12 @@ static grpc_ares_request *grpc_dns_lookup_ares_impl( } gpr_ref_init(&r->pending_queries, 1); if (grpc_ipv6_loopback_available()) { - grpc_ares_hostbyname_request *hr = create_hostbyname_request( - r, host, strhtons(port), false /* is_balancer */); + hr = create_hostbyname_request(r, host, strhtons(port), + false /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr); } - grpc_ares_hostbyname_request *hr = create_hostbyname_request( - r, host, strhtons(port), false /* is_balancer */); + hr = create_hostbyname_request(r, host, strhtons(port), + false /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr); if (check_grpclb) { /* Query the SRV record */ @@ -527,7 +530,8 @@ static void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_resolved_addresses **addrs) { grpc_resolve_address_ares_request *r = - gpr_zalloc(sizeof(grpc_resolve_address_ares_request)); + (grpc_resolve_address_ares_request *)gpr_zalloc( + sizeof(grpc_resolve_address_ares_request)); r->addrs_out = addrs; r->on_resolve_address_done = on_done; GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, |