diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc | 155 |
1 files changed, 88 insertions, 67 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 18d0a7b9f6..471de58e8c 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -65,8 +65,6 @@ struct grpc_ares_request { /** number of ongoing queries */ gpr_refcount pending_queries; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** is there at least one successful query, set in on_done_cb */ bool success; /** the errors explaining the request failure, set in on_done_cb */ @@ -74,7 +72,8 @@ struct grpc_ares_request { }; typedef struct grpc_ares_hostbyname_request { - /** following members are set in create_hostbyname_request */ + /** following members are set in create_hostbyname_request_locked + */ /** the top-level request instance */ grpc_ares_request* parent_request; /** host to resolve, parsed from the name to resolve */ @@ -96,10 +95,6 @@ static uint16_t strhtons(const char* port) { return htons(static_cast<unsigned short>(atoi(port))); } -static void grpc_ares_request_ref(grpc_ares_request* r) { - gpr_ref(&r->pending_queries); -} - static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, const char* input_output_str) { for (size_t i = 0; i < lb_addrs->num_addresses; i++) { @@ -149,7 +144,11 @@ void grpc_cares_wrapper_test_only_address_sorting_sort( grpc_cares_wrapper_address_sorting_sort(lb_addrs); } -static void grpc_ares_request_unref(grpc_ares_request* r) { +static void grpc_ares_request_ref_locked(grpc_ares_request* r) { + gpr_ref(&r->pending_queries); +} + +static void grpc_ares_request_unref_locked(grpc_ares_request* r) { /* If there are no pending queries, invoke on_done callback and destroy the request */ if (gpr_unref(&r->pending_queries)) { @@ -158,13 +157,12 @@ static void grpc_ares_request_unref(grpc_ares_request* r) { grpc_cares_wrapper_address_sorting_sort(lb_addrs); } GRPC_CLOSURE_SCHED(r->on_done, r->error); - gpr_mu_destroy(&r->mu); - grpc_ares_ev_driver_destroy(r->ev_driver); + grpc_ares_ev_driver_destroy_locked(r->ev_driver); gpr_free(r); } } -static grpc_ares_hostbyname_request* create_hostbyname_request( +static grpc_ares_hostbyname_request* create_hostbyname_request_locked( grpc_ares_request* parent_request, char* host, uint16_t port, bool is_balancer) { grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>( @@ -173,22 +171,22 @@ static grpc_ares_hostbyname_request* create_hostbyname_request( hr->host = gpr_strdup(host); hr->port = port; hr->is_balancer = is_balancer; - grpc_ares_request_ref(parent_request); + grpc_ares_request_ref_locked(parent_request); return hr; } -static void destroy_hostbyname_request(grpc_ares_hostbyname_request* hr) { - grpc_ares_request_unref(hr->parent_request); +static void destroy_hostbyname_request_locked( + grpc_ares_hostbyname_request* hr) { + grpc_ares_request_unref_locked(hr->parent_request); gpr_free(hr->host); gpr_free(hr); } -static void on_hostbyname_done_cb(void* arg, int status, int timeouts, - struct hostent* hostent) { +static void on_hostbyname_done_locked(void* arg, int status, int timeouts, + struct hostent* hostent) { grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>(arg); grpc_ares_request* r = hr->parent_request; - gpr_mu_lock(&r->mu); if (status == ARES_SUCCESS) { GRPC_ERROR_UNREF(r->error); r->error = GRPC_ERROR_NONE; @@ -263,33 +261,33 @@ static void on_hostbyname_done_cb(void* arg, int status, int timeouts, r->error = grpc_error_add_child(error, r->error); } } - gpr_mu_unlock(&r->mu); - destroy_hostbyname_request(hr); + destroy_hostbyname_request_locked(hr); } -static void on_srv_query_done_cb(void* arg, int status, int timeouts, - unsigned char* abuf, int alen) { +static void on_srv_query_done_locked(void* arg, int status, int timeouts, + unsigned char* abuf, int alen) { grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); - gpr_log(GPR_DEBUG, "on_query_srv_done_cb"); + gpr_log(GPR_DEBUG, "on_query_srv_done_locked"); if (status == ARES_SUCCESS) { - gpr_log(GPR_DEBUG, "on_query_srv_done_cb ARES_SUCCESS"); + gpr_log(GPR_DEBUG, "on_query_srv_done_locked ARES_SUCCESS"); struct ares_srv_reply* reply; const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); if (parse_status == ARES_SUCCESS) { - ares_channel* channel = grpc_ares_ev_driver_get_channel(r->ev_driver); + ares_channel* channel = + grpc_ares_ev_driver_get_channel_locked(r->ev_driver); for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; srv_it = srv_it->next) { if (grpc_ipv6_loopback_available()) { - grpc_ares_hostbyname_request* hr = create_hostbyname_request( + grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( r, srv_it->host, htons(srv_it->port), true /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, - on_hostbyname_done_cb, hr); + on_hostbyname_done_locked, hr); } - grpc_ares_hostbyname_request* hr = create_hostbyname_request( + grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( r, srv_it->host, htons(srv_it->port), true /* is_balancer */); - ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, - hr); - grpc_ares_ev_driver_start(r->ev_driver); + ares_gethostbyname(*channel, hr->host, AF_INET, + on_hostbyname_done_locked, hr); + grpc_ares_ev_driver_start_locked(r->ev_driver); } } if (reply != nullptr) { @@ -307,21 +305,20 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts, r->error = grpc_error_add_child(error, r->error); } } - grpc_ares_request_unref(r); + grpc_ares_request_unref_locked(r); } static const char g_service_config_attribute_prefix[] = "grpc_config="; -static void on_txt_done_cb(void* arg, int status, int timeouts, - unsigned char* buf, int len) { - gpr_log(GPR_DEBUG, "on_txt_done_cb"); +static void on_txt_done_locked(void* arg, int status, int timeouts, + unsigned char* buf, int len) { + gpr_log(GPR_DEBUG, "on_txt_done_locked"); char* error_msg; grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; struct ares_txt_ext* result = nullptr; struct ares_txt_ext* reply = nullptr; grpc_error* error = GRPC_ERROR_NONE; - gpr_mu_lock(&r->mu); if (status != ARES_SUCCESS) goto fail; status = ares_parse_txt_reply_ext(buf, len, &reply); if (status != ARES_SUCCESS) goto fail; @@ -366,14 +363,14 @@ fail: r->error = grpc_error_add_child(error, r->error); } done: - gpr_mu_unlock(&r->mu); - grpc_ares_request_unref(r); + grpc_ares_request_unref_locked(r); } -static grpc_ares_request* grpc_dns_lookup_ares_impl( +static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) { grpc_error* error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; grpc_ares_request* r = nullptr; @@ -402,20 +399,19 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl( } port = gpr_strdup(default_port); } - grpc_ares_ev_driver* ev_driver; - error = grpc_ares_ev_driver_create(&ev_driver, interested_parties); + error = grpc_ares_ev_driver_create_locked(&ev_driver, interested_parties, + combiner); if (error != GRPC_ERROR_NONE) goto error_cleanup; r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request))); - gpr_mu_init(&r->mu); r->ev_driver = ev_driver; r->on_done = on_done; r->lb_addrs_out = addrs; r->service_config_json_out = service_config_json; r->success = false; r->error = GRPC_ERROR_NONE; - channel = grpc_ares_ev_driver_get_channel(r->ev_driver); + channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver); // If dns_server is specified, use it. if (dns_server != nullptr) { @@ -457,32 +453,34 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl( } gpr_ref_init(&r->pending_queries, 1); if (grpc_ipv6_loopback_available()) { - hr = create_hostbyname_request(r, host, strhtons(port), - false /* is_balancer */); - ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr); + hr = create_hostbyname_request_locked(r, host, strhtons(port), + false /* is_balancer */); + ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked, + hr); } - hr = create_hostbyname_request(r, host, strhtons(port), - false /* is_balancer */); - ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr); + hr = create_hostbyname_request_locked(r, host, strhtons(port), + false /* is_balancer */); + ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_locked, + hr); if (check_grpclb) { /* Query the SRV record */ - grpc_ares_request_ref(r); + grpc_ares_request_ref_locked(r); char* service_name; gpr_asprintf(&service_name, "_grpclb._tcp.%s", host); - ares_query(*channel, service_name, ns_c_in, ns_t_srv, on_srv_query_done_cb, - r); + ares_query(*channel, service_name, ns_c_in, ns_t_srv, + on_srv_query_done_locked, r); gpr_free(service_name); } if (service_config_json != nullptr) { - grpc_ares_request_ref(r); + grpc_ares_request_ref_locked(r); char* config_name; gpr_asprintf(&config_name, "_grpc_config.%s", host); - ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_cb, r); + ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_locked, + r); gpr_free(config_name); } - /* TODO(zyc): Handle CNAME records here. */ - grpc_ares_ev_driver_start(r->ev_driver); - grpc_ares_request_unref(r); + grpc_ares_ev_driver_start_locked(r->ev_driver); + grpc_ares_request_unref_locked(r); gpr_free(host); gpr_free(port); return r; @@ -494,15 +492,15 @@ error_cleanup: return nullptr; } -grpc_ares_request* (*grpc_dns_lookup_ares)( +grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, - char** service_config_json) = grpc_dns_lookup_ares_impl; + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; void grpc_cancel_ares_request(grpc_ares_request* r) { - if (grpc_dns_lookup_ares == grpc_dns_lookup_ares_impl) { - grpc_ares_ev_driver_shutdown(r->ev_driver); + if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) { + grpc_ares_ev_driver_shutdown_locked(r->ev_driver); } } @@ -534,6 +532,8 @@ void grpc_ares_cleanup(void) { */ typedef struct grpc_resolve_address_ares_request { + /* combiner that queries and related callbacks run under */ + grpc_combiner* combiner; /** the pointer to receive the resolved addresses */ grpc_resolved_addresses** addrs_out; /** currently resolving lb addresses */ @@ -541,8 +541,14 @@ typedef struct grpc_resolve_address_ares_request { /** closure to call when the resolve_address_ares request completes */ grpc_closure* on_resolve_address_done; /** a closure wrapping on_dns_lookup_done_cb, which should be invoked when the - grpc_dns_lookup_ares operation is done. */ + grpc_dns_lookup_ares_locked operation is done. */ grpc_closure on_dns_lookup_done; + /* target name */ + const char* name; + /* default port to use if none is specified */ + const char* default_port; + /* pollset_set to be driven by */ + grpc_pollset_set* interested_parties; } grpc_resolve_address_ares_request; static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { @@ -566,9 +572,20 @@ static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { } GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs); + GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb"); gpr_free(r); } +static void grpc_resolve_address_invoke_dns_lookup_ares_locked( + void* arg, grpc_error* unused_error) { + grpc_resolve_address_ares_request* r = + static_cast<grpc_resolve_address_ares_request*>(arg); + grpc_dns_lookup_ares_locked( + nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, + &r->on_dns_lookup_done, &r->lb_addrs, false /* check_grpclb */, + nullptr /* service_config_json */, r->combiner); +} + static void grpc_resolve_address_ares_impl(const char* name, const char* default_port, grpc_pollset_set* interested_parties, @@ -577,14 +594,18 @@ static void grpc_resolve_address_ares_impl(const char* name, grpc_resolve_address_ares_request* r = static_cast<grpc_resolve_address_ares_request*>( gpr_zalloc(sizeof(grpc_resolve_address_ares_request))); + r->combiner = grpc_combiner_create(); r->addrs_out = addrs; r->on_resolve_address_done = on_done; GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, grpc_schedule_on_exec_ctx); - grpc_dns_lookup_ares(nullptr /* dns_server */, name, default_port, - interested_parties, &r->on_dns_lookup_done, &r->lb_addrs, - false /* check_grpclb */, - nullptr /* service_config_json */); + r->name = name; + r->default_port = default_port; + r->interested_parties = interested_parties; + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE(grpc_resolve_address_invoke_dns_lookup_ares_locked, r, + grpc_combiner_scheduler(r->combiner)), + GRPC_ERROR_NONE); } void (*grpc_resolve_address_ares)( |