aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/client_channel/client_channel.c39
-rw-r--r--src/core/ext/client_channel/resolver.c18
-rw-r--r--src/core/ext/client_channel/resolver.h31
-rw-r--r--src/core/ext/client_channel/resolver_factory.h1
-rw-r--r--src/core/ext/client_channel/resolver_registry.c4
-rw-r--r--src/core/ext/client_channel/resolver_registry.h3
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c61
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c46
-rw-r--r--src/core/lib/iomgr/combiner.c5
-rw-r--r--src/core/lib/iomgr/combiner.h2
-rw-r--r--test/core/client_channel/resolvers/dns_resolver_connectivity_test.c3
11 files changed, 119 insertions, 94 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 907d81866d..758bdc0f17 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -230,7 +230,7 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
if (w->lb_policy == w->chand->lb_policy) {
if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver);
+ grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = NULL;
}
@@ -386,11 +386,12 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ grpc_resolver_next_locked(exec_ctx, chand->resolver,
+ &chand->resolver_result,
+ &chand->on_resolver_result_changed);
} else {
if (chand->resolver != NULL) {
- grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
@@ -451,7 +452,7 @@ static void cc_start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
set_channel_connectivity_state_locked(
exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
- grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
if (!chand->started_resolving) {
@@ -551,7 +552,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->resolver = grpc_resolver_create(
exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
new_args != NULL ? new_args : args->channel_args,
- chand->interested_parties);
+ chand->interested_parties, chand->combiner);
if (proxy_name != NULL) gpr_free(proxy_name);
if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
if (chand->resolver == NULL) {
@@ -560,13 +561,23 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
+static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_resolver *resolver = arg;
+ grpc_resolver_shutdown_locked(exec_ctx, resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
+}
+
/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
if (chand->resolver != NULL) {
- grpc_resolver_shutdown(exec_ctx, chand->resolver);
- GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_create(shutdown_resolver_locked, chand->resolver,
+ grpc_combiner_scheduler(chand->combiner, false)),
+ GRPC_ERROR_NONE);
}
if (chand->client_channel_factory != NULL) {
grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
@@ -584,7 +595,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(chand->interested_parties);
- grpc_combiner_destroy(exec_ctx, chand->combiner);
+ grpc_combiner_unref(exec_ctx, chand->combiner);
gpr_mu_destroy(&chand->info_mu);
}
@@ -847,8 +858,9 @@ static bool pick_subchannel_locked(
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ grpc_resolver_next_locked(exec_ctx, chand->resolver,
+ &chand->resolver_result,
+ &chand->on_resolver_result_changed);
}
if (chand->resolver != NULL) {
cpa = gpr_malloc(sizeof(*cpa));
@@ -1187,8 +1199,9 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (!chand->started_resolving && chand->resolver != NULL) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
chand->started_resolving = true;
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ grpc_resolver_next_locked(exec_ctx, chand->resolver,
+ &chand->resolver_result,
+ &chand->on_resolver_result_changed);
}
}
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c
index 2ae4fe862e..22e6a9e525 100644
--- a/src/core/ext/client_channel/resolver.c
+++ b/src/core/ext/client_channel/resolver.c
@@ -66,16 +66,18 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
}
}
-void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
- resolver->vtable->shutdown(exec_ctx, resolver);
+void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ resolver->vtable->shutdown_locked(exec_ctx, resolver);
}
-void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
- resolver->vtable->channel_saw_error(exec_ctx, resolver);
+void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ resolver->vtable->channel_saw_error_locked(exec_ctx, resolver);
}
-void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **result, grpc_closure *on_complete) {
- resolver->vtable->next(exec_ctx, resolver, result, on_complete);
+void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **result,
+ grpc_closure *on_complete) {
+ resolver->vtable->next_locked(exec_ctx, resolver, result, on_complete);
}
diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h
index 96ece92b9d..01fe9a4a46 100644
--- a/src/core/ext/client_channel/resolver.h
+++ b/src/core/ext/client_channel/resolver.h
@@ -48,10 +48,11 @@ struct grpc_resolver {
struct grpc_resolver_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
- void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
- void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
- void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **result, grpc_closure *on_complete);
+ void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
+ void (*channel_saw_error_locked)(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver);
+ void (*next_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **result, grpc_closure *on_complete);
};
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
@@ -72,19 +73,27 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy);
void grpc_resolver_init(grpc_resolver *resolver,
const grpc_resolver_vtable *vtable);
-void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
+void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver);
/** Notification that the channel has seen an error on some address.
- Can be used as a hint that re-resolution is desirable soon. */
-void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver);
+ Can be used as a hint that re-resolution is desirable soon.
+
+ Must be called from the combiner passed as a resolver_arg at construction
+ time.*/
+void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver);
/** Get the next result from the resolver. Expected to set \a *result with
new channel args and then schedule \a on_complete for execution.
If resolution is fatally broken, set \a *result to NULL and
- schedule \a on_complete. */
-void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **result, grpc_closure *on_complete);
+ schedule \a on_complete.
+
+ Must be called from the combiner passed as a resolver_arg at construction
+ time.*/
+void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **result,
+ grpc_closure *on_complete);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */
diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h
index 3792ddca18..e3cd99ec5a 100644
--- a/src/core/ext/client_channel/resolver_factory.h
+++ b/src/core/ext/client_channel/resolver_factory.h
@@ -50,6 +50,7 @@ typedef struct grpc_resolver_args {
grpc_uri *uri;
const grpc_channel_args *args;
grpc_pollset_set *pollset_set;
+ grpc_combiner *combiner;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable {
diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c
index 5110a7cad9..f8e8bc9c39 100644
--- a/src/core/ext/client_channel/resolver_registry.c
+++ b/src/core/ext/client_channel/resolver_registry.c
@@ -133,7 +133,8 @@ static grpc_resolver_factory *resolve_factory(const char *target,
grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *args,
- grpc_pollset_set *pollset_set) {
+ grpc_pollset_set *pollset_set,
+ grpc_combiner *combiner) {
grpc_uri *uri = NULL;
char *canonical_target = NULL;
grpc_resolver_factory *factory =
@@ -144,6 +145,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
resolver_args.uri = uri;
resolver_args.args = args;
resolver_args.pollset_set = pollset_set;
+ resolver_args.combiner = combiner;
resolver =
grpc_resolver_factory_create_resolver(exec_ctx, factory, &resolver_args);
grpc_uri_destroy(uri);
diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h
index a4606463eb..e2c189cf0c 100644
--- a/src/core/ext/client_channel/resolver_registry.h
+++ b/src/core/ext/client_channel/resolver_registry.h
@@ -65,7 +65,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
should not be NULL. */
grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *args,
- grpc_pollset_set *pollset_set);
+ grpc_pollset_set *pollset_set,
+ grpc_combiner *combiner);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 2c9623211b..7efe3be250 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -40,6 +40,7 @@
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/backoff.h"
@@ -62,9 +63,9 @@ typedef struct {
grpc_channel_args *channel_args;
/** pollset_set to drive the name resolution process */
grpc_pollset_set *interested_parties;
+ /** combiner (shared with client channel) */
+ grpc_combiner *combiner;
- /** mutex guarding the rest of the state */
- gpr_mu mu;
/** are we currently resolving? */
bool resolving;
/** which version of the result have we published? */
@@ -95,18 +96,20 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
dns_resolver *r);
-static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
- grpc_channel_args **target_result,
- grpc_closure *on_complete);
+static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *r);
+static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete);
static const grpc_resolver_vtable dns_resolver_vtable = {
- dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next};
+ dns_destroy, dns_shutdown_locked, dns_channel_saw_error_locked,
+ dns_next_locked};
-static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
+static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
- gpr_mu_lock(&r->mu);
if (r->have_retry_timer) {
grpc_timer_cancel(exec_ctx, &r->retry_timer);
}
@@ -116,25 +119,21 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
GRPC_ERROR_CREATE("Resolver Shutdown"));
r->next_completion = NULL;
}
- gpr_mu_unlock(&r->mu);
}
-static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
+static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
- gpr_mu_lock(&r->mu);
if (!r->resolving) {
gpr_backoff_reset(&r->backoff_state);
dns_start_resolving_locked(exec_ctx, r);
}
- gpr_mu_unlock(&r->mu);
}
-static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **target_result,
- grpc_closure *on_complete) {
+static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete) {
dns_resolver *r = (dns_resolver *)resolver;
- gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_result = target_result;
@@ -144,30 +143,26 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
} else {
dns_maybe_finish_next_locked(exec_ctx, r);
}
- gpr_mu_unlock(&r->mu);
}
-static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
dns_resolver *r = arg;
- gpr_mu_lock(&r->mu);
r->have_retry_timer = false;
if (error == GRPC_ERROR_NONE) {
if (!r->resolving) {
dns_start_resolving_locked(exec_ctx, r);
}
}
- gpr_mu_unlock(&r->mu);
GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer");
}
-static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
dns_resolver *r = arg;
grpc_channel_args *result = NULL;
- gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = false;
if (r->addresses != NULL) {
@@ -198,8 +193,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
} else {
gpr_log(GPR_DEBUG, "retrying immediately");
}
- grpc_closure_init(&r->on_retry, dns_on_retry_timer, r,
- grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r,
+ grpc_combiner_scheduler(r->combiner, false));
grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now);
}
if (r->resolved_result != NULL) {
@@ -208,7 +203,6 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
r->resolved_result = result;
r->resolved_version++;
dns_maybe_finish_next_locked(exec_ctx, r);
- gpr_mu_unlock(&r->mu);
GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving");
}
@@ -221,7 +215,8 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
r->addresses = NULL;
grpc_resolve_address(
exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties,
- grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx),
+ grpc_closure_create(dns_on_resolved_locked, r,
+ grpc_combiner_scheduler(r->combiner, false)),
&r->addresses);
}
@@ -240,7 +235,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
dns_resolver *r = (dns_resolver *)gr;
- gpr_mu_destroy(&r->mu);
+ grpc_combiner_unref(exec_ctx, r->combiner);
if (r->resolved_result != NULL) {
grpc_channel_args_destroy(exec_ctx, r->resolved_result);
}
@@ -264,7 +259,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx,
// Create resolver.
dns_resolver *r = gpr_malloc(sizeof(dns_resolver));
memset(r, 0, sizeof(*r));
- gpr_mu_init(&r->mu);
+ r->combiner = args->combiner;
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name_to_resolve = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index a1365f6465..2dd5259719 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -45,6 +45,7 @@
#include "src/core/ext/client_channel/parse_address.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -58,8 +59,8 @@ typedef struct {
grpc_lb_addresses *addresses;
/** channel args */
grpc_channel_args *channel_args;
- /** mutex guarding the rest of the state */
- gpr_mu mu;
+ /** combiner guarding the rest of the state */
+ grpc_combiner *combiner;
/** have we published? */
bool published;
/** pending next completion, or NULL */
@@ -73,48 +74,43 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r);
-static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *r);
-static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
- grpc_channel_args **target_result,
- grpc_closure *on_complete);
+static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *r);
+static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete);
static const grpc_resolver_vtable sockaddr_resolver_vtable = {
- sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error,
- sockaddr_next};
+ sockaddr_destroy, sockaddr_shutdown_locked,
+ sockaddr_channel_saw_error_locked, sockaddr_next_locked};
-static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
+static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_result = NULL;
grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
}
- gpr_mu_unlock(&r->mu);
}
-static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
+static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- gpr_mu_lock(&r->mu);
r->published = false;
sockaddr_maybe_finish_next_locked(exec_ctx, r);
- gpr_mu_unlock(&r->mu);
}
-static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **target_result,
- grpc_closure *on_complete) {
+static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_result = target_result;
sockaddr_maybe_finish_next_locked(exec_ctx, r);
- gpr_mu_unlock(&r->mu);
}
static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
@@ -131,7 +127,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
- gpr_mu_destroy(&r->mu);
+ grpc_combiner_unref(exec_ctx, r->combiner);
grpc_lb_addresses_destroy(exec_ctx, r->addresses);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r);
@@ -201,7 +197,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx,
memset(r, 0, sizeof(*r));
r->addresses = addresses;
r->channel_args = grpc_channel_args_copy(args->args);
- gpr_mu_init(&r->mu);
+ r->combiner = grpc_combiner_ref(args->combiner);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
return &r->base;
}
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 93d8c0bd70..9c6bd2728f 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -169,7 +169,10 @@ void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
}
-void grpc_combiner_ref(grpc_combiner *lock) { gpr_ref(&lock->refs); }
+grpc_combiner *grpc_combiner_ref(grpc_combiner *lock) {
+ gpr_ref(&lock->refs);
+ return lock;
+}
static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock) {
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index 352e87d050..d257150058 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -49,7 +49,7 @@
// necessary
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
// Ref/unref the lock, for when we're sharing the lock ownership
-void grpc_combiner_ref(grpc_combiner *lock);
+grpc_combiner *grpc_combiner_ref(grpc_combiner *lock);
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
// Fetch a scheduler to schedule closures against
grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c
index cfd37052de..ab9ae71d61 100644
--- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c
+++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c
@@ -44,6 +44,7 @@
static gpr_mu g_mu;
static bool g_fail_resolution = true;
+static grpc_combiner *g_combiner;
static grpc_error *my_resolve_address(const char *name, const char *addr,
grpc_resolved_addresses **addrs) {
@@ -101,6 +102,7 @@ int main(int argc, char **argv) {
grpc_init();
gpr_mu_init(&g_mu);
+ g_combiner = grpc_combiner_create(NULL);
grpc_blocking_resolve_address = my_resolve_address;
grpc_channel_args *result = (grpc_channel_args *)1;
@@ -126,6 +128,7 @@ int main(int argc, char **argv) {
grpc_channel_args_destroy(&exec_ctx, result);
GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test");
+ grpc_combiner_unref(&exec_ctx, g_combiner);
grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();