diff options
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 39 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver.c | 18 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver.h | 31 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver_factory.h | 1 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver_registry.c | 4 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver_registry.h | 3 | ||||
-rw-r--r-- | src/core/ext/resolver/dns/native/dns_resolver.c | 61 | ||||
-rw-r--r-- | src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 46 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.h | 2 | ||||
-rw-r--r-- | test/core/client_channel/resolvers/dns_resolver_connectivity_test.c | 3 |
11 files changed, 119 insertions, 94 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 907d81866d..758bdc0f17 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -230,7 +230,7 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, if (w->lb_policy == w->chand->lb_policy) { if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); w->chand->lb_policy = NULL; } @@ -386,11 +386,12 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, watch_lb_policy(exec_ctx, chand, lb_policy, state); } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } else { if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; } @@ -451,7 +452,7 @@ static void cc_start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, set_channel_connectivity_state_locked( exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; if (!chand->started_resolving) { @@ -551,7 +552,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, chand->resolver = grpc_resolver_create( exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string, new_args != NULL ? new_args : args->channel_args, - chand->interested_parties); + chand->interested_parties, chand->combiner); if (proxy_name != NULL) gpr_free(proxy_name); if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); if (chand->resolver == NULL) { @@ -560,13 +561,23 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } +static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_resolver *resolver = arg; + grpc_resolver_shutdown_locked(exec_ctx, resolver); + GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel"); +} + /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); @@ -584,7 +595,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); - grpc_combiner_destroy(exec_ctx, chand->combiner); + grpc_combiner_unref(exec_ctx, chand->combiner); gpr_mu_destroy(&chand->info_mu); } @@ -847,8 +858,9 @@ static bool pick_subchannel_locked( if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { cpa = gpr_malloc(sizeof(*cpa)); @@ -1187,8 +1199,9 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, if (!chand->started_resolving && chand->resolver != NULL) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c index 2ae4fe862e..22e6a9e525 100644 --- a/src/core/ext/client_channel/resolver.c +++ b/src/core/ext/client_channel/resolver.c @@ -66,16 +66,18 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { } } -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { - resolver->vtable->shutdown(exec_ctx, resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->shutdown_locked(exec_ctx, resolver); } -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { - resolver->vtable->channel_saw_error(exec_ctx, resolver); +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->channel_saw_error_locked(exec_ctx, resolver); } -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete) { - resolver->vtable->next(exec_ctx, resolver, result, on_complete); +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete) { + resolver->vtable->next_locked(exec_ctx, resolver, result, on_complete); } diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h index 96ece92b9d..01fe9a4a46 100644 --- a/src/core/ext/client_channel/resolver.h +++ b/src/core/ext/client_channel/resolver.h @@ -48,10 +48,11 @@ struct grpc_resolver { struct grpc_resolver_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); + void (*channel_saw_error_locked)(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); + void (*next_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, grpc_closure *on_complete); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG @@ -72,19 +73,27 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy); void grpc_resolver_init(grpc_resolver *resolver, const grpc_resolver_vtable *vtable); -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Notification that the channel has seen an error on some address. - Can be used as a hint that re-resolution is desirable soon. */ -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver); + Can be used as a hint that re-resolution is desirable soon. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Get the next result from the resolver. Expected to set \a *result with new channel args and then schedule \a on_complete for execution. If resolution is fatally broken, set \a *result to NULL and - schedule \a on_complete. */ -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + schedule \a on_complete. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */ diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h index 3792ddca18..e3cd99ec5a 100644 --- a/src/core/ext/client_channel/resolver_factory.h +++ b/src/core/ext/client_channel/resolver_factory.h @@ -50,6 +50,7 @@ typedef struct grpc_resolver_args { grpc_uri *uri; const grpc_channel_args *args; grpc_pollset_set *pollset_set; + grpc_combiner *combiner; } grpc_resolver_args; struct grpc_resolver_factory_vtable { diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index 5110a7cad9..f8e8bc9c39 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -133,7 +133,8 @@ static grpc_resolver_factory *resolve_factory(const char *target, grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set, + grpc_combiner *combiner) { grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = @@ -144,6 +145,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, resolver_args.uri = uri; resolver_args.args = args; resolver_args.pollset_set = pollset_set; + resolver_args.combiner = combiner; resolver = grpc_resolver_factory_create_resolver(exec_ctx, factory, &resolver_args); grpc_uri_destroy(uri); diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h index a4606463eb..e2c189cf0c 100644 --- a/src/core/ext/client_channel/resolver_registry.h +++ b/src/core/ext/client_channel/resolver_registry.h @@ -65,7 +65,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); should not be NULL. */ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set); + grpc_pollset_set *pollset_set, + grpc_combiner *combiner); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 2c9623211b..7efe3be250 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -40,6 +40,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/backoff.h" @@ -62,9 +63,9 @@ typedef struct { grpc_channel_args *channel_args; /** pollset_set to drive the name resolution process */ grpc_pollset_set *interested_parties; + /** combiner (shared with client channel) */ + grpc_combiner *combiner; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** are we currently resolving? */ bool resolving; /** which version of the result have we published? */ @@ -95,18 +96,20 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r); -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { - dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; + dns_destroy, dns_shutdown_locked, dns_channel_saw_error_locked, + dns_next_locked}; -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->have_retry_timer) { grpc_timer_cancel(exec_ctx, &r->retry_timer); } @@ -116,25 +119,21 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { GRPC_ERROR_CREATE("Resolver Shutdown")); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (!r->resolving) { gpr_backoff_reset(&r->backoff_state); dns_start_resolving_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; @@ -144,30 +143,26 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, } else { dns_maybe_finish_next_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; - gpr_mu_lock(&r->mu); r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { if (!r->resolving) { dns_start_resolving_locked(exec_ctx, r); } } - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); } -static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; grpc_channel_args *result = NULL; - gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = false; if (r->addresses != NULL) { @@ -198,8 +193,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_closure_init(&r->on_retry, dns_on_retry_timer, r, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r, + grpc_combiner_scheduler(r->combiner, false)); grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { @@ -208,7 +203,6 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, r->resolved_result = result; r->resolved_version++; dns_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } @@ -221,7 +215,8 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, r->addresses = NULL; grpc_resolve_address( exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, - grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx), + grpc_closure_create(dns_on_resolved_locked, r, + grpc_combiner_scheduler(r->combiner, false)), &r->addresses); } @@ -240,7 +235,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { dns_resolver *r = (dns_resolver *)gr; - gpr_mu_destroy(&r->mu); + grpc_combiner_unref(exec_ctx, r->combiner); if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } @@ -264,7 +259,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, // Create resolver. dns_resolver *r = gpr_malloc(sizeof(dns_resolver)); memset(r, 0, sizeof(*r)); - gpr_mu_init(&r->mu); + r->combiner = args->combiner; grpc_resolver_init(&r->base, &dns_resolver_vtable); r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index a1365f6465..2dd5259719 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -45,6 +45,7 @@ #include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/slice/slice_internal.h" @@ -58,8 +59,8 @@ typedef struct { grpc_lb_addresses *addresses; /** channel args */ grpc_channel_args *channel_args; - /** mutex guarding the rest of the state */ - gpr_mu mu; + /** combiner guarding the rest of the state */ + grpc_combiner *combiner; /** have we published? */ bool published; /** pending next completion, or NULL */ @@ -73,48 +74,43 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r); -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *r); -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable sockaddr_resolver_vtable = { - sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, - sockaddr_next}; + sockaddr_destroy, sockaddr_shutdown_locked, + sockaddr_channel_saw_error_locked, sockaddr_next_locked}; -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_result = NULL; grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); r->published = false; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, @@ -131,7 +127,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; - gpr_mu_destroy(&r->mu); + grpc_combiner_unref(exec_ctx, r->combiner); grpc_lb_addresses_destroy(exec_ctx, r->addresses); grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); @@ -201,7 +197,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, memset(r, 0, sizeof(*r)); r->addresses = addresses; r->channel_args = grpc_channel_args_copy(args->args); - gpr_mu_init(&r->mu); + r->combiner = grpc_combiner_ref(args->combiner); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); return &r->base; } diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 93d8c0bd70..9c6bd2728f 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -169,7 +169,10 @@ void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } } -void grpc_combiner_ref(grpc_combiner *lock) { gpr_ref(&lock->refs); } +grpc_combiner *grpc_combiner_ref(grpc_combiner *lock) { + gpr_ref(&lock->refs); + return lock; +} static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 352e87d050..d257150058 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -49,7 +49,7 @@ // necessary grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); // Ref/unref the lock, for when we're sharing the lock ownership -void grpc_combiner_ref(grpc_combiner *lock); +grpc_combiner *grpc_combiner_ref(grpc_combiner *lock); void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); // Fetch a scheduler to schedule closures against grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock, diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c index cfd37052de..ab9ae71d61 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c @@ -44,6 +44,7 @@ static gpr_mu g_mu; static bool g_fail_resolution = true; +static grpc_combiner *g_combiner; static grpc_error *my_resolve_address(const char *name, const char *addr, grpc_resolved_addresses **addrs) { @@ -101,6 +102,7 @@ int main(int argc, char **argv) { grpc_init(); gpr_mu_init(&g_mu); + g_combiner = grpc_combiner_create(NULL); grpc_blocking_resolve_address = my_resolve_address; grpc_channel_args *result = (grpc_channel_args *)1; @@ -126,6 +128,7 @@ int main(int argc, char **argv) { grpc_channel_args_destroy(&exec_ctx, result); GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test"); + grpc_combiner_unref(&exec_ctx, g_combiner); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); |