diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/resolver')
10 files changed, 265 insertions, 188 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 4ec4477c82..3a16b3492d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -97,14 +97,17 @@ typedef struct { char* service_config_json; } ares_dns_resolver; -static void dns_ares_destroy(grpc_resolver* r); +static void dns_ares_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* r); -static void dns_ares_start_resolving_locked(ares_dns_resolver* r); -static void dns_ares_maybe_finish_next_locked(ares_dns_resolver* r); +static void dns_ares_start_resolving_locked(grpc_exec_ctx* exec_ctx, + ares_dns_resolver* r); +static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, + ares_dns_resolver* r); -static void dns_ares_shutdown_locked(grpc_resolver* r); -static void dns_ares_channel_saw_error_locked(grpc_resolver* r); -static void dns_ares_next_locked(grpc_resolver* r, +static void dns_ares_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* r); +static void dns_ares_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* r); +static void dns_ares_next_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* r, grpc_channel_args** target_result, grpc_closure* on_complete); @@ -112,39 +115,43 @@ static const grpc_resolver_vtable dns_ares_resolver_vtable = { dns_ares_destroy, dns_ares_shutdown_locked, dns_ares_channel_saw_error_locked, dns_ares_next_locked}; -static void dns_ares_shutdown_locked(grpc_resolver* resolver) { +static void dns_ares_shutdown_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; if (r->have_retry_timer) { - grpc_timer_cancel(&r->retry_timer); + grpc_timer_cancel(exec_ctx, &r->retry_timer); } if (r->pending_request != nullptr) { - grpc_cancel_ares_request(r->pending_request); + grpc_cancel_ares_request(exec_ctx, r->pending_request); } if (r->next_completion != nullptr) { *r->target_result = nullptr; - GRPC_CLOSURE_SCHED(r->next_completion, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Resolver Shutdown")); + GRPC_CLOSURE_SCHED( + exec_ctx, r->next_completion, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown")); r->next_completion = nullptr; } } -static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) { +static void dns_ares_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; if (!r->resolving) { grpc_backoff_reset(&r->backoff_state); - dns_ares_start_resolving_locked(r); + dns_ares_start_resolving_locked(exec_ctx, r); } } -static void dns_ares_on_retry_timer_locked(void* arg, grpc_error* error) { +static void dns_ares_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { ares_dns_resolver* r = (ares_dns_resolver*)arg; r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { if (!r->resolving) { - dns_ares_start_resolving_locked(r); + dns_ares_start_resolving_locked(exec_ctx, r); } } - GRPC_RESOLVER_UNREF(&r->base, "retry-timer"); + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); } static bool value_in_json_array(grpc_json* array, const char* value) { @@ -219,7 +226,8 @@ static char* choose_service_config(char* service_config_choice_json) { return service_config; } -static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) { +static void dns_ares_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { ares_dns_resolver* r = (ares_dns_resolver*)arg; grpc_channel_args* result = nullptr; GPR_ASSERT(r->resolving); @@ -260,13 +268,13 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) { num_args_to_add); if (service_config != nullptr) grpc_service_config_destroy(service_config); gpr_free(service_config_string); - grpc_lb_addresses_destroy(r->lb_addresses); + grpc_lb_addresses_destroy(exec_ctx, r->lb_addresses); } else { const char* msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); grpc_millis next_try = - grpc_backoff_step(&r->backoff_state).next_attempt_start_time; - grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); + grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time; + grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); GPR_ASSERT(!r->have_retry_timer); @@ -277,19 +285,20 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) { } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_timer_init(&r->retry_timer, next_try, + grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->dns_ares_on_retry_timer_locked); } if (r->resolved_result != nullptr) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } r->resolved_result = result; r->resolved_version++; - dns_ares_maybe_finish_next_locked(r); - GRPC_RESOLVER_UNREF(&r->base, "dns-resolving"); + dns_ares_maybe_finish_next_locked(exec_ctx, r); + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } -static void dns_ares_next_locked(grpc_resolver* resolver, +static void dns_ares_next_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver, grpc_channel_args** target_result, grpc_closure* on_complete) { gpr_log(GPR_DEBUG, "dns_ares_next is called."); @@ -299,53 +308,56 @@ static void dns_ares_next_locked(grpc_resolver* resolver, r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { grpc_backoff_reset(&r->backoff_state); - dns_ares_start_resolving_locked(r); + dns_ares_start_resolving_locked(exec_ctx, r); } else { - dns_ares_maybe_finish_next_locked(r); + dns_ares_maybe_finish_next_locked(exec_ctx, r); } } -static void dns_ares_start_resolving_locked(ares_dns_resolver* r) { +static void dns_ares_start_resolving_locked(grpc_exec_ctx* exec_ctx, + ares_dns_resolver* r) { GRPC_RESOLVER_REF(&r->base, "dns-resolving"); GPR_ASSERT(!r->resolving); r->resolving = true; r->lb_addresses = nullptr; r->service_config_json = nullptr; r->pending_request = grpc_dns_lookup_ares( - r->dns_server, r->name_to_resolve, r->default_port, r->interested_parties, - &r->dns_ares_on_resolved_locked, &r->lb_addresses, + exec_ctx, r->dns_server, r->name_to_resolve, r->default_port, + r->interested_parties, &r->dns_ares_on_resolved_locked, &r->lb_addresses, true /* check_grpclb */, r->request_service_config ? &r->service_config_json : nullptr); } -static void dns_ares_maybe_finish_next_locked(ares_dns_resolver* r) { +static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, + ares_dns_resolver* r) { if (r->next_completion != nullptr && r->resolved_version != r->published_version) { *r->target_result = r->resolved_result == nullptr ? nullptr : grpc_channel_args_copy(r->resolved_result); gpr_log(GPR_DEBUG, "dns_ares_maybe_finish_next_locked"); - GRPC_CLOSURE_SCHED(r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = nullptr; r->published_version = r->resolved_version; } } -static void dns_ares_destroy(grpc_resolver* gr) { +static void dns_ares_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { gpr_log(GPR_DEBUG, "dns_ares_destroy"); ares_dns_resolver* r = (ares_dns_resolver*)gr; if (r->resolved_result != nullptr) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } - grpc_pollset_set_destroy(r->interested_parties); + grpc_pollset_set_destroy(exec_ctx, r->interested_parties); gpr_free(r->dns_server); gpr_free(r->name_to_resolve); gpr_free(r->default_port); - grpc_channel_args_destroy(r->channel_args); + grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); } -static grpc_resolver* dns_ares_create(grpc_resolver_args* args, +static grpc_resolver* dns_ares_create(grpc_exec_ctx* exec_ctx, + grpc_resolver_args* args, const char* default_port) { /* Get name from args. */ const char* path = args->uri->path; @@ -366,7 +378,8 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args, arg, (grpc_integer_options){false, false, true}); r->interested_parties = grpc_pollset_set_create(); if (args->pollset_set != nullptr) { - grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set); + grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, + args->pollset_set); } grpc_backoff_init( &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, @@ -391,8 +404,9 @@ static void dns_ares_factory_ref(grpc_resolver_factory* factory) {} static void dns_ares_factory_unref(grpc_resolver_factory* factory) {} static grpc_resolver* dns_factory_create_resolver( - grpc_resolver_factory* factory, grpc_resolver_args* args) { - return dns_ares_create(args, "https"); + grpc_exec_ctx* exec_ctx, grpc_resolver_factory* factory, + grpc_resolver_args* args) { + return dns_ares_create(exec_ctx, args, "https"); } static char* dns_ares_factory_get_default_host_name( diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index ba7dad63cf..03ea36bfcc 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -28,7 +28,8 @@ typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; /* Start \a ev_driver. It will keep working until all IO on its ares_channel is done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks bound to its ares_channel when necessary. */ -void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver); +void grpc_ares_ev_driver_start(grpc_exec_ctx* exec_ctx, + grpc_ares_ev_driver* ev_driver); /* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to \a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the @@ -46,7 +47,8 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver); /* Shutdown all the grpc_fds used by \a ev_driver */ -void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver); +void grpc_ares_ev_driver_shutdown(grpc_exec_ctx* exec_ctx, + grpc_ares_ev_driver* ev_driver); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index 40e264504c..4cb068a41d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -77,7 +77,8 @@ struct grpc_ares_ev_driver { bool shutting_down; }; -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); +static void grpc_ares_notify_on_event_locked(grpc_exec_ctx* exec_ctx, + grpc_ares_ev_driver* ev_driver); static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( grpc_ares_ev_driver* ev_driver) { @@ -97,7 +98,7 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { } } -static void fd_node_destroy(fd_node* fdn) { +static void fd_node_destroy(grpc_exec_ctx* exec_ctx, fd_node* fdn) { gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); @@ -105,20 +106,21 @@ static void fd_node_destroy(fd_node* fdn) { /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(fdn->fd, nullptr, nullptr, true /* already_closed */, + grpc_fd_orphan(exec_ctx, fdn->fd, nullptr, nullptr, true /* already_closed */, "c-ares query finished"); gpr_free(fdn); } -static void fd_node_shutdown(fd_node* fdn) { +static void fd_node_shutdown(grpc_exec_ctx* exec_ctx, fd_node* fdn) { gpr_mu_lock(&fdn->mu); fdn->shutting_down = true; if (!fdn->readable_registered && !fdn->writable_registered) { gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); + fd_node_destroy(exec_ctx, fdn); } else { grpc_fd_shutdown( - fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("c-ares fd shutdown")); + exec_ctx, fdn->fd, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("c-ares fd shutdown")); gpr_mu_unlock(&fdn->mu); } } @@ -158,13 +160,15 @@ void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver) { grpc_ares_ev_driver_unref(ev_driver); } -void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) { +void grpc_ares_ev_driver_shutdown(grpc_exec_ctx* exec_ctx, + grpc_ares_ev_driver* ev_driver) { gpr_mu_lock(&ev_driver->mu); ev_driver->shutting_down = true; fd_node* fn = ev_driver->fds; while (fn != nullptr) { - grpc_fd_shutdown(fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "grpc_ares_ev_driver_shutdown")); + grpc_fd_shutdown( + exec_ctx, fn->fd, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_ares_ev_driver_shutdown")); fn = fn->next; } gpr_mu_unlock(&ev_driver->mu); @@ -195,7 +199,8 @@ static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver* ev_driver, return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; } -static void on_readable_cb(void* arg, grpc_error* error) { +static void on_readable_cb(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { fd_node* fdn = (fd_node*)arg; grpc_ares_ev_driver* ev_driver = fdn->ev_driver; gpr_mu_lock(&fdn->mu); @@ -203,7 +208,7 @@ static void on_readable_cb(void* arg, grpc_error* error) { fdn->readable_registered = false; if (fdn->shutting_down && !fdn->writable_registered) { gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); + fd_node_destroy(exec_ctx, fdn); grpc_ares_ev_driver_unref(ev_driver); return; } @@ -224,12 +229,13 @@ static void on_readable_cb(void* arg, grpc_error* error) { ares_cancel(ev_driver->channel); } gpr_mu_lock(&ev_driver->mu); - grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } -static void on_writable_cb(void* arg, grpc_error* error) { +static void on_writable_cb(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { fd_node* fdn = (fd_node*)arg; grpc_ares_ev_driver* ev_driver = fdn->ev_driver; gpr_mu_lock(&fdn->mu); @@ -237,7 +243,7 @@ static void on_writable_cb(void* arg, grpc_error* error) { fdn->writable_registered = false; if (fdn->shutting_down && !fdn->readable_registered) { gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); + fd_node_destroy(exec_ctx, fdn); grpc_ares_ev_driver_unref(ev_driver); return; } @@ -256,7 +262,7 @@ static void on_writable_cb(void* arg, grpc_error* error) { ares_cancel(ev_driver->channel); } gpr_mu_lock(&ev_driver->mu); - grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } @@ -267,7 +273,8 @@ ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver) { // Get the file descriptors used by the ev_driver's ares channel, register // driver_closure with these filedescriptors. -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { +static void grpc_ares_notify_on_event_locked(grpc_exec_ctx* exec_ctx, + grpc_ares_ev_driver* ev_driver) { fd_node* new_list = nullptr; if (!ev_driver->shutting_down) { ares_socket_t socks[ARES_GETSOCK_MAXNUM]; @@ -293,7 +300,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, grpc_schedule_on_exec_ctx); - grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd); + grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->fd); gpr_free(fd_name); } fdn->next = new_list; @@ -305,7 +312,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { !fdn->readable_registered) { grpc_ares_ev_driver_ref(ev_driver); gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd)); - grpc_fd_notify_on_read(fdn->fd, &fdn->read_closure); + grpc_fd_notify_on_read(exec_ctx, fdn->fd, &fdn->read_closure); fdn->readable_registered = true; } // Register write_closure if the socket is writable and write_closure @@ -315,7 +322,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { gpr_log(GPR_DEBUG, "notify write on: %d", grpc_fd_wrapped_fd(fdn->fd)); grpc_ares_ev_driver_ref(ev_driver); - grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure); + grpc_fd_notify_on_write(exec_ctx, fdn->fd, &fdn->write_closure); fdn->writable_registered = true; } gpr_mu_unlock(&fdn->mu); @@ -328,7 +335,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { while (ev_driver->fds != nullptr) { fd_node* cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; - fd_node_shutdown(cur); + fd_node_shutdown(exec_ctx, cur); } ev_driver->fds = new_list; // If the ev driver has no working fd, all the tasks are done. @@ -338,11 +345,12 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { } } -void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) { +void grpc_ares_ev_driver_start(grpc_exec_ctx* exec_ctx, + grpc_ares_ev_driver* ev_driver) { gpr_mu_lock(&ev_driver->mu); if (!ev_driver->working) { ev_driver->working = true; - grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); } gpr_mu_unlock(&ev_driver->mu); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 3a870b2d06..7846576c11 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -96,12 +96,24 @@ static void grpc_ares_request_ref(grpc_ares_request* r) { gpr_ref(&r->pending_queries); } -static void grpc_ares_request_unref(grpc_ares_request* r) { +static void grpc_ares_request_unref(grpc_exec_ctx* exec_ctx, + grpc_ares_request* r) { /* If there are no pending queries, invoke on_done callback and destroy the request */ if (gpr_unref(&r->pending_queries)) { /* TODO(zyc): Sort results with RFC6724 before invoking on_done. */ - GRPC_CLOSURE_SCHED(r->on_done, r->error); + if (exec_ctx == nullptr) { + /* A new exec_ctx is created here, as the c-ares interface does not + provide one in ares_host_callback. It's safe to schedule on_done with + the newly created exec_ctx, since the caller has been warned not to + acquire locks in on_done. ares_dns_resolver is using combiner to + protect resources needed by on_done. */ + grpc_exec_ctx new_exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CLOSURE_SCHED(&new_exec_ctx, r->on_done, r->error); + grpc_exec_ctx_finish(&new_exec_ctx); + } else { + GRPC_CLOSURE_SCHED(exec_ctx, r->on_done, r->error); + } gpr_mu_destroy(&r->mu); grpc_ares_ev_driver_destroy(r->ev_driver); gpr_free(r); @@ -121,8 +133,9 @@ static grpc_ares_hostbyname_request* create_hostbyname_request( return hr; } -static void destroy_hostbyname_request(grpc_ares_hostbyname_request* hr) { - grpc_ares_request_unref(hr->parent_request); +static void destroy_hostbyname_request(grpc_exec_ctx* exec_ctx, + grpc_ares_hostbyname_request* hr) { + grpc_ares_request_unref(exec_ctx, hr->parent_request); gpr_free(hr->host); gpr_free(hr); } @@ -207,13 +220,13 @@ static void on_hostbyname_done_cb(void* arg, int status, int timeouts, } } gpr_mu_unlock(&r->mu); - destroy_hostbyname_request(hr); + destroy_hostbyname_request(nullptr, hr); } static void on_srv_query_done_cb(void* arg, int status, int timeouts, unsigned char* abuf, int alen) { grpc_ares_request* r = (grpc_ares_request*)arg; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_DEBUG, "on_query_srv_done_cb"); if (status == ARES_SUCCESS) { gpr_log(GPR_DEBUG, "on_query_srv_done_cb ARES_SUCCESS"); @@ -233,7 +246,7 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts, r, srv_it->host, htons(srv_it->port), true /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr); - grpc_ares_ev_driver_start(r->ev_driver); + grpc_ares_ev_driver_start(&exec_ctx, r->ev_driver); } } if (reply != nullptr) { @@ -251,7 +264,8 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts, r->error = grpc_error_add_child(error, r->error); } } - grpc_ares_request_unref(r); + grpc_ares_request_unref(&exec_ctx, r); + grpc_exec_ctx_finish(&exec_ctx); } static const char g_service_config_attribute_prefix[] = "grpc_config="; @@ -309,13 +323,14 @@ fail: } done: gpr_mu_unlock(&r->mu); - grpc_ares_request_unref(r); + grpc_ares_request_unref(nullptr, r); } static grpc_ares_request* grpc_dns_lookup_ares_impl( - const char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { + grpc_exec_ctx* exec_ctx, const char* dns_server, const char* name, + const char* default_port, grpc_pollset_set* interested_parties, + grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, + char** service_config_json) { grpc_error* error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; grpc_ares_request* r = nullptr; @@ -422,28 +437,28 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl( gpr_free(config_name); } /* TODO(zyc): Handle CNAME records here. */ - grpc_ares_ev_driver_start(r->ev_driver); - grpc_ares_request_unref(r); + grpc_ares_ev_driver_start(exec_ctx, r->ev_driver); + grpc_ares_request_unref(exec_ctx, r); gpr_free(host); gpr_free(port); return r; error_cleanup: - GRPC_CLOSURE_SCHED(on_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_done, error); gpr_free(host); gpr_free(port); return nullptr; } grpc_ares_request* (*grpc_dns_lookup_ares)( - const char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, + grpc_exec_ctx* exec_ctx, const char* dns_server, const char* name, + const char* default_port, grpc_pollset_set* interested_parties, + grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) = grpc_dns_lookup_ares_impl; -void grpc_cancel_ares_request(grpc_ares_request* r) { +void grpc_cancel_ares_request(grpc_exec_ctx* exec_ctx, grpc_ares_request* r) { if (grpc_dns_lookup_ares == grpc_dns_lookup_ares_impl) { - grpc_ares_ev_driver_shutdown(r->ev_driver); + grpc_ares_ev_driver_shutdown(exec_ctx, r->ev_driver); } } @@ -486,7 +501,8 @@ typedef struct grpc_resolve_address_ares_request { grpc_closure on_dns_lookup_done; } grpc_resolve_address_ares_request; -static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { +static void on_dns_lookup_done_cb(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_resolve_address_ares_request* r = (grpc_resolve_address_ares_request*)arg; grpc_resolved_addresses** resolved_addresses = r->addrs_out; @@ -504,12 +520,14 @@ static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { &r->lb_addrs->addresses[i].address, sizeof(grpc_resolved_address)); } } - GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); - grpc_lb_addresses_destroy(r->lb_addrs); + GRPC_CLOSURE_SCHED(exec_ctx, r->on_resolve_address_done, + GRPC_ERROR_REF(error)); + grpc_lb_addresses_destroy(exec_ctx, r->lb_addrs); gpr_free(r); } -static void grpc_resolve_address_ares_impl(const char* name, +static void grpc_resolve_address_ares_impl(grpc_exec_ctx* exec_ctx, + const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, @@ -521,14 +539,14 @@ static void grpc_resolve_address_ares_impl(const char* name, r->on_resolve_address_done = on_done; GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, grpc_schedule_on_exec_ctx); - grpc_dns_lookup_ares(nullptr /* dns_server */, name, default_port, + grpc_dns_lookup_ares(exec_ctx, nullptr /* dns_server */, name, default_port, interested_parties, &r->on_dns_lookup_done, &r->lb_addrs, false /* check_grpclb */, nullptr /* service_config_json */); } void (*grpc_resolve_address_ares)( - const char* name, const char* default_port, + grpc_exec_ctx* exec_ctx, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_resolved_addresses** addrs) = grpc_resolve_address_ares_impl; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 86d870e0a6..72db622954 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -32,7 +32,8 @@ typedef struct grpc_ares_request grpc_ares_request; must be called at least once before this function. \a on_done may be called directly in this function without being scheduled with \a exec_ctx, so it must not try to acquire locks that are being held by the caller. */ -extern void (*grpc_resolve_address_ares)(const char* name, +extern void (*grpc_resolve_address_ares)(grpc_exec_ctx* exec_ctx, + const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, @@ -46,13 +47,14 @@ extern void (*grpc_resolve_address_ares)(const char* name, scheduled with \a exec_ctx, so it must not try to acquire locks that are being held by the caller. */ extern grpc_ares_request* (*grpc_dns_lookup_ares)( - const char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addresses, bool check_grpclb, + grpc_exec_ctx* exec_ctx, const char* dns_server, const char* name, + const char* default_port, grpc_pollset_set* interested_parties, + grpc_closure* on_done, grpc_lb_addresses** addresses, bool check_grpclb, char** service_config_json); /* Cancel the pending grpc_ares_request \a request */ -void grpc_cancel_ares_request(grpc_ares_request* request); +void grpc_cancel_ares_request(grpc_exec_ctx* exec_ctx, + grpc_ares_request* request); /* Initialize gRPC ares wrapper. Must be called at least once before grpc_resolve_address_ares(). */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc index a184cf2d57..a68a7c47fb 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc @@ -26,32 +26,34 @@ struct grpc_ares_request { }; static grpc_ares_request* grpc_dns_lookup_ares_impl( - const char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { + grpc_exec_ctx* exec_ctx, const char* dns_server, const char* name, + const char* default_port, grpc_pollset_set* interested_parties, + grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, + char** service_config_json) { return NULL; } grpc_ares_request* (*grpc_dns_lookup_ares)( - const char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, + grpc_exec_ctx* exec_ctx, const char* dns_server, const char* name, + const char* default_port, grpc_pollset_set* interested_parties, + grpc_closure* on_done, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) = grpc_dns_lookup_ares_impl; -void grpc_cancel_ares_request(grpc_ares_request* r) {} +void grpc_cancel_ares_request(grpc_exec_ctx* exec_ctx, grpc_ares_request* r) {} grpc_error* grpc_ares_init(void) { return GRPC_ERROR_NONE; } void grpc_ares_cleanup(void) {} -static void grpc_resolve_address_ares_impl(const char* name, +static void grpc_resolve_address_ares_impl(grpc_exec_ctx* exec_ctx, + const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_resolved_addresses** addrs) {} void (*grpc_resolve_address_ares)( - const char* name, const char* default_port, + grpc_exec_ctx* exec_ctx, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_resolved_addresses** addrs) = grpc_resolve_address_ares_impl; diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 77698e97aa..fc40ce6966 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -76,42 +76,49 @@ typedef struct { grpc_resolved_addresses* addresses; } dns_resolver; -static void dns_destroy(grpc_resolver* r); +static void dns_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* r); -static void dns_start_resolving_locked(dns_resolver* r); -static void dns_maybe_finish_next_locked(dns_resolver* r); +static void dns_start_resolving_locked(grpc_exec_ctx* exec_ctx, + dns_resolver* r); +static void dns_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, + dns_resolver* r); -static void dns_shutdown_locked(grpc_resolver* r); -static void dns_channel_saw_error_locked(grpc_resolver* r); -static void dns_next_locked(grpc_resolver* r, grpc_channel_args** target_result, +static void dns_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* r); +static void dns_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* r); +static void dns_next_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* r, + grpc_channel_args** target_result, grpc_closure* on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { dns_destroy, dns_shutdown_locked, dns_channel_saw_error_locked, dns_next_locked}; -static void dns_shutdown_locked(grpc_resolver* resolver) { +static void dns_shutdown_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; if (r->have_retry_timer) { - grpc_timer_cancel(&r->retry_timer); + grpc_timer_cancel(exec_ctx, &r->retry_timer); } if (r->next_completion != nullptr) { *r->target_result = nullptr; - GRPC_CLOSURE_SCHED(r->next_completion, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Resolver Shutdown")); + GRPC_CLOSURE_SCHED( + exec_ctx, r->next_completion, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown")); r->next_completion = nullptr; } } -static void dns_channel_saw_error_locked(grpc_resolver* resolver) { +static void dns_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; if (!r->resolving) { grpc_backoff_reset(&r->backoff_state); - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } } -static void dns_next_locked(grpc_resolver* resolver, +static void dns_next_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver, grpc_channel_args** target_result, grpc_closure* on_complete) { dns_resolver* r = (dns_resolver*)resolver; @@ -120,26 +127,28 @@ static void dns_next_locked(grpc_resolver* resolver, r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { grpc_backoff_reset(&r->backoff_state); - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } else { - dns_maybe_finish_next_locked(r); + dns_maybe_finish_next_locked(exec_ctx, r); } } -static void dns_on_retry_timer_locked(void* arg, grpc_error* error) { +static void dns_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { dns_resolver* r = (dns_resolver*)arg; r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { if (!r->resolving) { - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } } - GRPC_RESOLVER_UNREF(&r->base, "retry-timer"); + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); } -static void dns_on_resolved_locked(void* arg, grpc_error* error) { +static void dns_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { dns_resolver* r = (dns_resolver*)arg; grpc_channel_args* result = nullptr; GPR_ASSERT(r->resolving); @@ -159,11 +168,11 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) { grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses); result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1); grpc_resolved_addresses_destroy(r->addresses); - grpc_lb_addresses_destroy(addresses); + grpc_lb_addresses_destroy(exec_ctx, addresses); } else { grpc_millis next_try = - grpc_backoff_step(&r->backoff_state).next_attempt_start_time; - grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); + grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time; + grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); GPR_ASSERT(!r->have_retry_timer); @@ -176,56 +185,59 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) { } GRPC_CLOSURE_INIT(&r->on_retry, dns_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); - grpc_timer_init(&r->retry_timer, next_try, &r->on_retry); + grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry); } if (r->resolved_result != nullptr) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } r->resolved_result = result; r->resolved_version++; - dns_maybe_finish_next_locked(r); + dns_maybe_finish_next_locked(exec_ctx, r); GRPC_ERROR_UNREF(error); - GRPC_RESOLVER_UNREF(&r->base, "dns-resolving"); + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } -static void dns_start_resolving_locked(dns_resolver* r) { +static void dns_start_resolving_locked(grpc_exec_ctx* exec_ctx, + dns_resolver* r) { GRPC_RESOLVER_REF(&r->base, "dns-resolving"); GPR_ASSERT(!r->resolving); r->resolving = true; r->addresses = nullptr; grpc_resolve_address( - r->name_to_resolve, r->default_port, r->interested_parties, + exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, GRPC_CLOSURE_CREATE(dns_on_resolved_locked, r, grpc_combiner_scheduler(r->base.combiner)), &r->addresses); } -static void dns_maybe_finish_next_locked(dns_resolver* r) { +static void dns_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, + dns_resolver* r) { if (r->next_completion != nullptr && r->resolved_version != r->published_version) { *r->target_result = r->resolved_result == nullptr ? nullptr : grpc_channel_args_copy(r->resolved_result); - GRPC_CLOSURE_SCHED(r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = nullptr; r->published_version = r->resolved_version; } } -static void dns_destroy(grpc_resolver* gr) { +static void dns_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { dns_resolver* r = (dns_resolver*)gr; if (r->resolved_result != nullptr) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } - grpc_pollset_set_destroy(r->interested_parties); + grpc_pollset_set_destroy(exec_ctx, r->interested_parties); gpr_free(r->name_to_resolve); gpr_free(r->default_port); - grpc_channel_args_destroy(r->channel_args); + grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); } -static grpc_resolver* dns_create(grpc_resolver_args* args, +static grpc_resolver* dns_create(grpc_exec_ctx* exec_ctx, + grpc_resolver_args* args, const char* default_port) { if (0 != strcmp(args->uri->authority, "")) { gpr_log(GPR_ERROR, "authority based dns uri's not supported"); @@ -242,7 +254,8 @@ static grpc_resolver* dns_create(grpc_resolver_args* args, r->channel_args = grpc_channel_args_copy(args->args); r->interested_parties = grpc_pollset_set_create(); if (args->pollset_set != nullptr) { - grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set); + grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, + args->pollset_set); } grpc_backoff_init( &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, @@ -261,8 +274,9 @@ static void dns_factory_ref(grpc_resolver_factory* factory) {} static void dns_factory_unref(grpc_resolver_factory* factory) {} static grpc_resolver* dns_factory_create_resolver( - grpc_resolver_factory* factory, grpc_resolver_args* args) { - return dns_create(args, "https"); + grpc_exec_ctx* exec_ctx, grpc_resolver_factory* factory, + grpc_resolver_args* args) { + return dns_create(exec_ctx, args, "https"); } static char* dns_factory_get_default_host_name(grpc_resolver_factory* factory, diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index fe3ad1403c..44798ca434 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -67,52 +67,57 @@ typedef struct { grpc_channel_args** target_result; } fake_resolver; -static void fake_resolver_destroy(grpc_resolver* gr) { +static void fake_resolver_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { fake_resolver* r = (fake_resolver*)gr; - grpc_channel_args_destroy(r->next_results); - grpc_channel_args_destroy(r->results_upon_error); - grpc_channel_args_destroy(r->channel_args); + grpc_channel_args_destroy(exec_ctx, r->next_results); + grpc_channel_args_destroy(exec_ctx, r->results_upon_error); + grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); } -static void fake_resolver_shutdown_locked(grpc_resolver* resolver) { +static void fake_resolver_shutdown_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { fake_resolver* r = (fake_resolver*)resolver; if (r->next_completion != nullptr) { *r->target_result = nullptr; - GRPC_CLOSURE_SCHED(r->next_completion, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Resolver Shutdown")); + GRPC_CLOSURE_SCHED( + exec_ctx, r->next_completion, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown")); r->next_completion = nullptr; } } -static void fake_resolver_maybe_finish_next_locked(fake_resolver* r) { +static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, + fake_resolver* r) { if (r->next_completion != nullptr && r->next_results != nullptr) { *r->target_result = grpc_channel_args_union(r->next_results, r->channel_args); - grpc_channel_args_destroy(r->next_results); + grpc_channel_args_destroy(exec_ctx, r->next_results); r->next_results = nullptr; - GRPC_CLOSURE_SCHED(r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = nullptr; } } -static void fake_resolver_channel_saw_error_locked(grpc_resolver* resolver) { +static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { fake_resolver* r = (fake_resolver*)resolver; if (r->next_results == nullptr && r->results_upon_error != nullptr) { // Pretend we re-resolved. r->next_results = grpc_channel_args_copy(r->results_upon_error); } - fake_resolver_maybe_finish_next_locked(r); + fake_resolver_maybe_finish_next_locked(exec_ctx, r); } -static void fake_resolver_next_locked(grpc_resolver* resolver, +static void fake_resolver_next_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver, grpc_channel_args** target_result, grpc_closure* on_complete) { fake_resolver* r = (fake_resolver*)resolver; GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; - fake_resolver_maybe_finish_next_locked(r); + fake_resolver_maybe_finish_next_locked(exec_ctx, r); } static const grpc_resolver_vtable fake_resolver_vtable = { @@ -152,31 +157,33 @@ typedef struct set_response_closure_arg { grpc_channel_args* next_response; } set_response_closure_arg; -static void set_response_closure_fn(void* arg, grpc_error* error) { +static void set_response_closure_fn(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { set_response_closure_arg* closure_arg = (set_response_closure_arg*)arg; grpc_fake_resolver_response_generator* generator = closure_arg->generator; fake_resolver* r = generator->resolver; if (r->next_results != nullptr) { - grpc_channel_args_destroy(r->next_results); + grpc_channel_args_destroy(exec_ctx, r->next_results); } r->next_results = closure_arg->next_response; if (r->results_upon_error != nullptr) { - grpc_channel_args_destroy(r->results_upon_error); + grpc_channel_args_destroy(exec_ctx, r->results_upon_error); } r->results_upon_error = grpc_channel_args_copy(closure_arg->next_response); gpr_free(closure_arg); - fake_resolver_maybe_finish_next_locked(r); + fake_resolver_maybe_finish_next_locked(exec_ctx, r); } void grpc_fake_resolver_response_generator_set_response( - grpc_fake_resolver_response_generator* generator, + grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator, grpc_channel_args* next_response) { GPR_ASSERT(generator->resolver != nullptr); set_response_closure_arg* closure_arg = (set_response_closure_arg*)gpr_zalloc(sizeof(*closure_arg)); closure_arg->generator = generator; closure_arg->next_response = grpc_channel_args_copy(next_response); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, set_response_closure_fn, closure_arg, grpc_combiner_scheduler( generator->resolver->base.combiner)), @@ -188,7 +195,7 @@ static void* response_generator_arg_copy(void* p) { (grpc_fake_resolver_response_generator*)p); } -static void response_generator_arg_destroy(void* p) { +static void response_generator_arg_destroy(grpc_exec_ctx* exec_ctx, void* p) { grpc_fake_resolver_response_generator_unref( (grpc_fake_resolver_response_generator*)p); } @@ -225,7 +232,8 @@ static void fake_resolver_factory_ref(grpc_resolver_factory* factory) {} static void fake_resolver_factory_unref(grpc_resolver_factory* factory) {} -static grpc_resolver* fake_resolver_create(grpc_resolver_factory* factory, +static grpc_resolver* fake_resolver_create(grpc_exec_ctx* exec_ctx, + grpc_resolver_factory* factory, grpc_resolver_args* args) { fake_resolver* r = (fake_resolver*)gpr_zalloc(sizeof(*r)); r->channel_args = grpc_channel_args_copy(args->args); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index a8977e5980..7035cdda01 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -39,7 +39,7 @@ grpc_fake_resolver_response_generator_create(); // Instruct the fake resolver associated with the \a response_generator instance // to trigger a new resolution for \a uri and \a args. void grpc_fake_resolver_response_generator_set_response( - grpc_fake_resolver_response_generator* generator, + grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator, grpc_channel_args* next_response); // Return a \a grpc_arg for a \a grpc_fake_resolver_response_generator instance. diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc index 7d1e283fa3..f0934b5943 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc @@ -52,13 +52,15 @@ typedef struct { grpc_channel_args** target_result; } sockaddr_resolver; -static void sockaddr_destroy(grpc_resolver* r); +static void sockaddr_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* r); -static void sockaddr_maybe_finish_next_locked(sockaddr_resolver* r); +static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, + sockaddr_resolver* r); -static void sockaddr_shutdown_locked(grpc_resolver* r); -static void sockaddr_channel_saw_error_locked(grpc_resolver* r); -static void sockaddr_next_locked(grpc_resolver* r, +static void sockaddr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* r); +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* r); +static void sockaddr_next_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* r, grpc_channel_args** target_result, grpc_closure* on_complete); @@ -66,47 +68,52 @@ static const grpc_resolver_vtable sockaddr_resolver_vtable = { sockaddr_destroy, sockaddr_shutdown_locked, sockaddr_channel_saw_error_locked, sockaddr_next_locked}; -static void sockaddr_shutdown_locked(grpc_resolver* resolver) { +static void sockaddr_shutdown_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { sockaddr_resolver* r = (sockaddr_resolver*)resolver; if (r->next_completion != nullptr) { *r->target_result = nullptr; - GRPC_CLOSURE_SCHED(r->next_completion, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Resolver Shutdown")); + GRPC_CLOSURE_SCHED( + exec_ctx, r->next_completion, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown")); r->next_completion = nullptr; } } -static void sockaddr_channel_saw_error_locked(grpc_resolver* resolver) { +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { sockaddr_resolver* r = (sockaddr_resolver*)resolver; r->published = false; - sockaddr_maybe_finish_next_locked(r); + sockaddr_maybe_finish_next_locked(exec_ctx, r); } -static void sockaddr_next_locked(grpc_resolver* resolver, +static void sockaddr_next_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver, grpc_channel_args** target_result, grpc_closure* on_complete) { sockaddr_resolver* r = (sockaddr_resolver*)resolver; GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; - sockaddr_maybe_finish_next_locked(r); + sockaddr_maybe_finish_next_locked(exec_ctx, r); } -static void sockaddr_maybe_finish_next_locked(sockaddr_resolver* r) { +static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, + sockaddr_resolver* r) { if (r->next_completion != nullptr && !r->published) { r->published = true; grpc_arg arg = grpc_lb_addresses_create_channel_arg(r->addresses); *r->target_result = grpc_channel_args_copy_and_add(r->channel_args, &arg, 1); - GRPC_CLOSURE_SCHED(r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = nullptr; } } -static void sockaddr_destroy(grpc_resolver* gr) { +static void sockaddr_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { sockaddr_resolver* r = (sockaddr_resolver*)gr; - grpc_lb_addresses_destroy(r->addresses); - grpc_channel_args_destroy(r->channel_args); + grpc_lb_addresses_destroy(exec_ctx, r->addresses); + grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); } @@ -135,7 +142,8 @@ char* unix_get_default_authority(grpc_resolver_factory* factory, static void do_nothing(void* ignored) {} -static grpc_resolver* sockaddr_create(grpc_resolver_args* args, +static grpc_resolver* sockaddr_create(grpc_exec_ctx* exec_ctx, + grpc_resolver_args* args, bool parse(const grpc_uri* uri, grpc_resolved_address* dst)) { if (0 != strcmp(args->uri->authority, "")) { @@ -162,10 +170,10 @@ static grpc_resolver* sockaddr_create(grpc_resolver_args* args, gpr_free(part_str); if (errors_found) break; } - grpc_slice_buffer_destroy_internal(&path_parts); - grpc_slice_unref_internal(path_slice); + grpc_slice_buffer_destroy_internal(exec_ctx, &path_parts); + grpc_slice_unref_internal(exec_ctx, path_slice); if (errors_found) { - grpc_lb_addresses_destroy(addresses); + grpc_lb_addresses_destroy(exec_ctx, addresses); return nullptr; } /* Instantiate resolver. */ @@ -187,8 +195,9 @@ static void sockaddr_factory_unref(grpc_resolver_factory* factory) {} #define DECL_FACTORY(name) \ static grpc_resolver* name##_factory_create_resolver( \ - grpc_resolver_factory* factory, grpc_resolver_args* args) { \ - return sockaddr_create(args, grpc_parse_##name); \ + grpc_exec_ctx* exec_ctx, grpc_resolver_factory* factory, \ + grpc_resolver_args* args) { \ + return sockaddr_create(exec_ctx, args, grpc_parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ sockaddr_factory_ref, sockaddr_factory_unref, \ |