diff options
author | Alexander Polcyn <apolcyn@google.com> | 2018-05-20 02:36:27 -0700 |
---|---|---|
committer | Alexander Polcyn <apolcyn@google.com> | 2018-06-13 10:40:53 -0700 |
commit | 626cea8f734313e7ac7188e877d9a8fbfccc67c7 (patch) | |
tree | 5b1b52d0b148c017ac1a862e80255da697afa430 /src | |
parent | 9b52965ba4157d098bb951c892e18a1361917200 (diff) |
Put c-ares queries under a combiner
Diffstat (limited to 'src')
6 files changed, 129 insertions, 134 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 3c40ae14b8..f4f6444c5f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -414,10 +414,10 @@ void AresDnsResolver::StartResolvingLocked() { resolving_ = true; lb_addresses_ = nullptr; service_config_json_ = nullptr; - pending_request_ = grpc_dns_lookup_ares( + pending_request_ = grpc_dns_lookup_ares_locked( dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, &on_resolved_, &lb_addresses_, true /* check_grpclb */, - request_service_config_ ? &service_config_json_ : nullptr); + request_service_config_ ? &service_config_json_ : nullptr, combiner()); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 6239549534..b58a74a4e5 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -29,7 +29,7 @@ typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; /* Start \a ev_driver. It will keep working until all IO on its ares_channel is done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks bound to its ares_channel when necessary. */ -void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver); +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver); /* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to \a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the @@ -39,15 +39,16 @@ ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver); /* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is created successfully. */ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, - grpc_pollset_set* pollset_set); + grpc_pollset_set* pollset_set, + grpc_combiner* combiner); /* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver will be cancelled and their on_done callbacks will be invoked with a status of ARES_ECANCELLED. */ -void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver); +void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver); /* Shutdown all the grpc_fds used by \a ev_driver */ -void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver); +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index f496e9694d..84f90ecde7 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -39,17 +39,15 @@ typedef struct fd_node { /** the owner of this fd node */ grpc_ares_ev_driver* ev_driver; - /** a closure wrapping on_readable_cb, which should be invoked when the - grpc_fd in this node becomes readable. */ + /** a closure wrapping on_readable_locked, which should be + invoked when the grpc_fd in this node becomes readable. */ grpc_closure read_closure; - /** a closure wrapping on_writable_cb, which should be invoked when the - grpc_fd in this node becomes writable. */ + /** a closure wrapping on_writable_locked, which should be + invoked when the grpc_fd in this node becomes writable. */ grpc_closure write_closure; /** next fd node in the list */ struct fd_node* next; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** the grpc_fd owned by this fd node */ grpc_fd* fd; /** if the readable closure has been registered */ @@ -68,8 +66,8 @@ struct grpc_ares_ev_driver { /** refcount of the event driver */ gpr_refcount refs; - /** mutex guarding the rest of the state */ - gpr_mu mu; + /** combiner to synchronize c-ares and I/O callbacks on */ + grpc_combiner* combiner; /** a list of grpc_fd that this event driver is currently using. */ fd_node* fds; /** is this event driver currently working? */ @@ -92,19 +90,18 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { if (gpr_unref(&ev_driver->refs)) { gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); GPR_ASSERT(ev_driver->fds == nullptr); - gpr_mu_destroy(&ev_driver->mu); + GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); ares_destroy(ev_driver->channel); gpr_free(ev_driver); } } -static void fd_node_destroy(fd_node* fdn) { +static void fd_node_destroy_locked(fd_node* fdn) { gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(fdn->already_shutdown); - gpr_mu_destroy(&fdn->mu); - /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up + /* c-ares library will close the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ int dummy_release_fd; @@ -120,7 +117,8 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { } grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, - grpc_pollset_set* pollset_set) { + grpc_pollset_set* pollset_set, + grpc_combiner* combiner) { *ev_driver = static_cast<grpc_ares_ev_driver*>( gpr_malloc(sizeof(grpc_ares_ev_driver))); ares_options opts; @@ -137,7 +135,7 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, gpr_free(*ev_driver); return err; } - gpr_mu_init(&(*ev_driver)->mu); + (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver"); gpr_ref_init(&(*ev_driver)->refs, 1); (*ev_driver)->pollset_set = pollset_set; (*ev_driver)->fds = nullptr; @@ -146,34 +144,26 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, return GRPC_ERROR_NONE; } -void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver) { - // It's not safe to shut down remaining fds here directly, becauses - // ares_host_callback does not provide an exec_ctx. We mark the event driver - // as being shut down. If the event driver is working, - // grpc_ares_notify_on_event_locked will shut down the fds; if it's not - // working, there are no fds to shut down. - gpr_mu_lock(&ev_driver->mu); +void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) { + // We mark the event driver as being shut down. If the event driver + // is working, grpc_ares_notify_on_event_locked will shut down the + // fds; if it's not working, there are no fds to shut down. ev_driver->shutting_down = true; - gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } -void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) { - gpr_mu_lock(&ev_driver->mu); +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { ev_driver->shutting_down = true; fd_node* fn = ev_driver->fds; while (fn != nullptr) { - gpr_mu_lock(&fn->mu); fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); - gpr_mu_unlock(&fn->mu); fn = fn->next; } - gpr_mu_unlock(&ev_driver->mu); } // Search fd in the fd_node list head. This is an O(n) search, the max possible // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. -static fd_node* pop_fd_node(fd_node** head, int fd) { +static fd_node* pop_fd_node_locked(fd_node** head, int fd) { fd_node dummy_head; dummy_head.next = *head; fd_node* node = &dummy_head; @@ -190,24 +180,22 @@ static fd_node* pop_fd_node(fd_node** head, int fd) { } /* Check if \a fd is still readable */ -static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver* ev_driver, - int fd) { +static bool grpc_ares_is_fd_still_readable_locked( + grpc_ares_ev_driver* ev_driver, int fd) { size_t bytes_available = 0; return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; } -static void on_readable_cb(void* arg, grpc_error* error) { +static void on_readable_locked(void* arg, grpc_error* error) { fd_node* fdn = static_cast<fd_node*>(arg); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - gpr_mu_lock(&fdn->mu); const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->readable_registered = false; - gpr_mu_unlock(&fdn->mu); gpr_log(GPR_DEBUG, "readable on %d", fd); if (error == GRPC_ERROR_NONE) { do { ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD); - } while (grpc_ares_is_fd_still_readable(ev_driver, fd)); + } while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd)); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -217,19 +205,15 @@ static void on_readable_cb(void* arg, grpc_error* error) { // grpc_ares_notify_on_event_locked(). ares_cancel(ev_driver->channel); } - gpr_mu_lock(&ev_driver->mu); grpc_ares_notify_on_event_locked(ev_driver); - gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } -static void on_writable_cb(void* arg, grpc_error* error) { +static void on_writable_locked(void* arg, grpc_error* error) { fd_node* fdn = static_cast<fd_node*>(arg); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - gpr_mu_lock(&fdn->mu); const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->writable_registered = false; - gpr_mu_unlock(&fdn->mu); gpr_log(GPR_DEBUG, "writable on %d", fd); if (error == GRPC_ERROR_NONE) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); @@ -242,9 +226,7 @@ static void on_writable_cb(void* arg, grpc_error* error) { // grpc_ares_notify_on_event_locked(). ares_cancel(ev_driver->channel); } - gpr_mu_lock(&ev_driver->mu); grpc_ares_notify_on_event_locked(ev_driver); - gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } @@ -263,7 +245,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { if (ARES_GETSOCK_READABLE(socks_bitmask, i) || ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { - fd_node* fdn = pop_fd_node(&ev_driver->fds, socks[i]); + fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); // Create a new fd_node if sock[i] is not in the fd_node list. if (fdn == nullptr) { char* fd_name; @@ -275,17 +257,15 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { fdn->readable_registered = false; fdn->writable_registered = false; fdn->already_shutdown = false; - gpr_mu_init(&fdn->mu); - GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); + GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd); gpr_free(fd_name); } fdn->next = new_list; new_list = fdn; - gpr_mu_lock(&fdn->mu); // Register read_closure if the socket is readable and read_closure has // not been registered with this socket. if (ARES_GETSOCK_READABLE(socks_bitmask, i) && @@ -305,7 +285,6 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure); fdn->writable_registered = true; } - gpr_mu_unlock(&fdn->mu); } } } @@ -315,15 +294,12 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { while (ev_driver->fds != nullptr) { fd_node* cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; - gpr_mu_lock(&cur->mu); fd_node_shutdown_locked(cur, "c-ares fd shutdown"); if (!cur->readable_registered && !cur->writable_registered) { - gpr_mu_unlock(&cur->mu); - fd_node_destroy(cur); + fd_node_destroy_locked(cur); } else { cur->next = new_list; new_list = cur; - gpr_mu_unlock(&cur->mu); } } ev_driver->fds = new_list; @@ -334,13 +310,11 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { } } -void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) { - gpr_mu_lock(&ev_driver->mu); +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { if (!ev_driver->working) { ev_driver->working = true; grpc_ares_notify_on_event_locked(ev_driver); } - gpr_mu_unlock(&ev_driver->mu); } #endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 18d0a7b9f6..73d3d38044 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -65,8 +65,6 @@ struct grpc_ares_request { /** number of ongoing queries */ gpr_refcount pending_queries; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** is there at least one successful query, set in on_done_cb */ bool success; /** the errors explaining the request failure, set in on_done_cb */ @@ -74,7 +72,8 @@ struct grpc_ares_request { }; typedef struct grpc_ares_hostbyname_request { - /** following members are set in create_hostbyname_request */ + /** following members are set in create_hostbyname_request_locked + */ /** the top-level request instance */ grpc_ares_request* parent_request; /** host to resolve, parsed from the name to resolve */ @@ -96,10 +95,6 @@ static uint16_t strhtons(const char* port) { return htons(static_cast<unsigned short>(atoi(port))); } -static void grpc_ares_request_ref(grpc_ares_request* r) { - gpr_ref(&r->pending_queries); -} - static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, const char* input_output_str) { for (size_t i = 0; i < lb_addrs->num_addresses; i++) { @@ -149,7 +144,11 @@ void grpc_cares_wrapper_test_only_address_sorting_sort( grpc_cares_wrapper_address_sorting_sort(lb_addrs); } -static void grpc_ares_request_unref(grpc_ares_request* r) { +static void grpc_ares_request_ref_locked(grpc_ares_request* r) { + gpr_ref(&r->pending_queries); +} + +static void grpc_ares_request_unref_locked(grpc_ares_request* r) { /* If there are no pending queries, invoke on_done callback and destroy the request */ if (gpr_unref(&r->pending_queries)) { @@ -158,13 +157,12 @@ static void grpc_ares_request_unref(grpc_ares_request* r) { grpc_cares_wrapper_address_sorting_sort(lb_addrs); } GRPC_CLOSURE_SCHED(r->on_done, r->error); - gpr_mu_destroy(&r->mu); - grpc_ares_ev_driver_destroy(r->ev_driver); + grpc_ares_ev_driver_destroy_locked(r->ev_driver); gpr_free(r); } } -static grpc_ares_hostbyname_request* create_hostbyname_request( +static grpc_ares_hostbyname_request* create_hostbyname_request_locked( grpc_ares_request* parent_request, char* host, uint16_t port, bool is_balancer) { grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>( @@ -173,22 +171,22 @@ static grpc_ares_hostbyname_request* create_hostbyname_request( hr->host = gpr_strdup(host); hr->port = port; hr->is_balancer = is_balancer; - grpc_ares_request_ref(parent_request); + grpc_ares_request_ref_locked(parent_request); return hr; } -static void destroy_hostbyname_request(grpc_ares_hostbyname_request* hr) { - grpc_ares_request_unref(hr->parent_request); +static void destroy_hostbyname_request_locked( + grpc_ares_hostbyname_request* hr) { + grpc_ares_request_unref_locked(hr->parent_request); gpr_free(hr->host); gpr_free(hr); } -static void on_hostbyname_done_cb(void* arg, int status, int timeouts, - struct hostent* hostent) { +static void on_hostbyname_done_locked(void* arg, int status, int timeouts, + struct hostent* hostent) { grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>(arg); grpc_ares_request* r = hr->parent_request; - gpr_mu_lock(&r->mu); if (status == ARES_SUCCESS) { GRPC_ERROR_UNREF(r->error); r->error = GRPC_ERROR_NONE; @@ -263,16 +261,15 @@ static void on_hostbyname_done_cb(void* arg, int status, int timeouts, r->error = grpc_error_add_child(error, r->error); } } - gpr_mu_unlock(&r->mu); - destroy_hostbyname_request(hr); + destroy_hostbyname_request_locked(hr); } -static void on_srv_query_done_cb(void* arg, int status, int timeouts, - unsigned char* abuf, int alen) { +static void on_srv_query_done_locked(void* arg, int status, int timeouts, + unsigned char* abuf, int alen) { grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); - gpr_log(GPR_DEBUG, "on_query_srv_done_cb"); + gpr_log(GPR_DEBUG, "on_query_srv_done_locked"); if (status == ARES_SUCCESS) { - gpr_log(GPR_DEBUG, "on_query_srv_done_cb ARES_SUCCESS"); + gpr_log(GPR_DEBUG, "on_query_srv_done_locked ARES_SUCCESS"); struct ares_srv_reply* reply; const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); if (parse_status == ARES_SUCCESS) { @@ -280,16 +277,16 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts, for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; srv_it = srv_it->next) { if (grpc_ipv6_loopback_available()) { - grpc_ares_hostbyname_request* hr = create_hostbyname_request( + grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( r, srv_it->host, htons(srv_it->port), true /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, - on_hostbyname_done_cb, hr); + on_hostbyname_done_locked, hr); } - grpc_ares_hostbyname_request* hr = create_hostbyname_request( + grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( r, srv_it->host, htons(srv_it->port), true /* is_balancer */); - ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, - hr); - grpc_ares_ev_driver_start(r->ev_driver); + ares_gethostbyname(*channel, hr->host, AF_INET, + on_hostbyname_done_locked, hr); + grpc_ares_ev_driver_start_locked(r->ev_driver); } } if (reply != nullptr) { @@ -307,21 +304,20 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts, r->error = grpc_error_add_child(error, r->error); } } - grpc_ares_request_unref(r); + grpc_ares_request_unref_locked(r); } static const char g_service_config_attribute_prefix[] = "grpc_config="; -static void on_txt_done_cb(void* arg, int status, int timeouts, - unsigned char* buf, int len) { - gpr_log(GPR_DEBUG, "on_txt_done_cb"); +static void on_txt_done_locked(void* arg, int status, int timeouts, + unsigned char* buf, int len) { + gpr_log(GPR_DEBUG, "on_txt_done_locked"); char* error_msg; grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; struct ares_txt_ext* result = nullptr; struct ares_txt_ext* reply = nullptr; grpc_error* error = GRPC_ERROR_NONE; - gpr_mu_lock(&r->mu); if (status != ARES_SUCCESS) goto fail; status = ares_parse_txt_reply_ext(buf, len, &reply); if (status != ARES_SUCCESS) goto fail; @@ -366,14 +362,14 @@ fail: r->error = grpc_error_add_child(error, r->error); } done: - gpr_mu_unlock(&r->mu); - grpc_ares_request_unref(r); + grpc_ares_request_unref_locked(r); } -static grpc_ares_request* grpc_dns_lookup_ares_impl( +static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) { grpc_error* error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; grpc_ares_request* r = nullptr; @@ -402,13 +398,11 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl( } port = gpr_strdup(default_port); } - grpc_ares_ev_driver* ev_driver; - error = grpc_ares_ev_driver_create(&ev_driver, interested_parties); + error = grpc_ares_ev_driver_create(&ev_driver, interested_parties, combiner); if (error != GRPC_ERROR_NONE) goto error_cleanup; r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request))); - gpr_mu_init(&r->mu); r->ev_driver = ev_driver; r->on_done = on_done; r->lb_addrs_out = addrs; @@ -457,32 +451,34 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl( } gpr_ref_init(&r->pending_queries, 1); if (grpc_ipv6_loopback_available()) { - hr = create_hostbyname_request(r, host, strhtons(port), - false /* is_balancer */); - ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr); + hr = create_hostbyname_request_locked(r, host, strhtons(port), + false /* is_balancer */); + ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked, + hr); } - hr = create_hostbyname_request(r, host, strhtons(port), - false /* is_balancer */); - ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr); + hr = create_hostbyname_request_locked(r, host, strhtons(port), + false /* is_balancer */); + ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_locked, + hr); if (check_grpclb) { /* Query the SRV record */ - grpc_ares_request_ref(r); + grpc_ares_request_ref_locked(r); char* service_name; gpr_asprintf(&service_name, "_grpclb._tcp.%s", host); - ares_query(*channel, service_name, ns_c_in, ns_t_srv, on_srv_query_done_cb, - r); + ares_query(*channel, service_name, ns_c_in, ns_t_srv, + on_srv_query_done_locked, r); gpr_free(service_name); } if (service_config_json != nullptr) { - grpc_ares_request_ref(r); + grpc_ares_request_ref_locked(r); char* config_name; gpr_asprintf(&config_name, "_grpc_config.%s", host); - ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_cb, r); + ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_locked, + r); gpr_free(config_name); } - /* TODO(zyc): Handle CNAME records here. */ - grpc_ares_ev_driver_start(r->ev_driver); - grpc_ares_request_unref(r); + grpc_ares_ev_driver_start_locked(r->ev_driver); + grpc_ares_request_unref_locked(r); gpr_free(host); gpr_free(port); return r; @@ -494,15 +490,15 @@ error_cleanup: return nullptr; } -grpc_ares_request* (*grpc_dns_lookup_ares)( +grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, - char** service_config_json) = grpc_dns_lookup_ares_impl; + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; void grpc_cancel_ares_request(grpc_ares_request* r) { - if (grpc_dns_lookup_ares == grpc_dns_lookup_ares_impl) { - grpc_ares_ev_driver_shutdown(r->ev_driver); + if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) { + grpc_ares_ev_driver_shutdown_locked(r->ev_driver); } } @@ -534,6 +530,8 @@ void grpc_ares_cleanup(void) { */ typedef struct grpc_resolve_address_ares_request { + /* combiner that queries and related callbacks run under */ + grpc_combiner* combiner; /** the pointer to receive the resolved addresses */ grpc_resolved_addresses** addrs_out; /** currently resolving lb addresses */ @@ -541,8 +539,14 @@ typedef struct grpc_resolve_address_ares_request { /** closure to call when the resolve_address_ares request completes */ grpc_closure* on_resolve_address_done; /** a closure wrapping on_dns_lookup_done_cb, which should be invoked when the - grpc_dns_lookup_ares operation is done. */ + grpc_dns_lookup_ares_locked operation is done. */ grpc_closure on_dns_lookup_done; + /* target name */ + const char* name; + /* default port to use if none is specified */ + const char* default_port; + /* pollset_set to be driven by */ + grpc_pollset_set* interested_parties; } grpc_resolve_address_ares_request; static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { @@ -566,9 +570,20 @@ static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { } GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs); + GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb"); gpr_free(r); } +static void grpc_resolve_address_invoke_dns_lookup_ares_locked( + void* arg, grpc_error* unused_error) { + grpc_resolve_address_ares_request* r = + static_cast<grpc_resolve_address_ares_request*>(arg); + grpc_dns_lookup_ares_locked( + nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, + &r->on_dns_lookup_done, &r->lb_addrs, false /* check_grpclb */, + nullptr /* service_config_json */, r->combiner); +} + static void grpc_resolve_address_ares_impl(const char* name, const char* default_port, grpc_pollset_set* interested_parties, @@ -577,14 +592,18 @@ static void grpc_resolve_address_ares_impl(const char* name, grpc_resolve_address_ares_request* r = static_cast<grpc_resolve_address_ares_request*>( gpr_zalloc(sizeof(grpc_resolve_address_ares_request))); + r->combiner = grpc_combiner_create(); r->addrs_out = addrs; r->on_resolve_address_done = on_done; GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, grpc_schedule_on_exec_ctx); - grpc_dns_lookup_ares(nullptr /* dns_server */, name, default_port, - interested_parties, &r->on_dns_lookup_done, &r->lb_addrs, - false /* check_grpclb */, - nullptr /* service_config_json */); + r->name = name; + r->default_port = default_port; + r->interested_parties = interested_parties; + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE(grpc_resolve_address_invoke_dns_lookup_ares_locked, r, + grpc_combiner_scheduler(r->combiner)), + GRPC_ERROR_NONE); } void (*grpc_resolve_address_ares)( diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 2d84a038d6..9e93d0cf94 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -48,11 +48,11 @@ extern void (*grpc_resolve_address_ares)(const char* name, function. \a on_done may be called directly in this function without being scheduled with \a exec_ctx, so it must not try to acquire locks that are being held by the caller. */ -extern grpc_ares_request* (*grpc_dns_lookup_ares)( +extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_lb_addresses** addresses, bool check_grpclb, - char** service_config_json); + char** service_config_json, grpc_combiner* combiner); /* Cancel the pending grpc_ares_request \a request */ void grpc_cancel_ares_request(grpc_ares_request* request); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc index 5096e480bc..d6a76fc8b6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc @@ -26,18 +26,19 @@ struct grpc_ares_request { char val; }; -static grpc_ares_request* grpc_dns_lookup_ares_impl( +static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) { return NULL; } -grpc_ares_request* (*grpc_dns_lookup_ares)( +grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, - char** service_config_json) = grpc_dns_lookup_ares_impl; + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; void grpc_cancel_ares_request(grpc_ares_request* r) {} |