aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c113
-rw-r--r--src/core/client_config/lb_policy.c9
-rw-r--r--src/core/client_config/lb_policy.h15
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c31
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c21
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c25
-rw-r--r--src/core/client_config/subchannel.c67
-rw-r--r--src/core/client_config/subchannel.h8
8 files changed, 208 insertions, 81 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 575ee02249..804dbdeadd 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -110,38 +110,54 @@ void pf_destroy(grpc_lb_policy *pol) {
void pf_shutdown(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connectivity_state_flusher f;
pending_pick *pp;
gpr_mu_lock(&p->mu);
del_interested_parties_locked(p);
p->shutdown = 1;
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = NULL;
- grpc_workqueue_push(p->workqueue, pp->on_complete, 0);
- gpr_free(pp);
- }
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
"shutdown");
+ grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
gpr_mu_unlock(&p->mu);
+ grpc_connectivity_state_end_flush(&f);
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ *pp->target = NULL;
+ pp->on_complete->cb(pp->on_complete->cb_arg, 0);
+ gpr_free(pp);
+ pp = next;
+ }
}
-static void start_picking(pick_first_lb_policy *p) {
+/* returns a closure to call, or NULL */
+static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
- grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
- &p->checking_connectivity,
- &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed)
+ .state_already_changed) {
+ return &p->connectivity_changed;
+ } else {
+ return NULL;
+ }
}
void pf_exit_idle(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&p->mu);
if (!p->started_picking) {
- start_picking(p);
+ call = start_picking(p);
}
gpr_mu_unlock(&p->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
@@ -155,8 +171,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
*target = p->selected;
on_complete->cb(on_complete->cb_arg, 1);
} else {
+ grpc_iomgr_closure *call = NULL;
if (!p->started_picking) {
- start_picking(p);
+ call = start_picking(p);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pollset);
@@ -167,6 +184,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
}
@@ -174,6 +194,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
int unref = 0;
+ grpc_iomgr_closure *cbs = NULL;
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(&p->mu);
@@ -183,8 +205,12 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
"selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_subchannel_notify_on_state_change(
- p->selected, &p->checking_connectivity, &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->selected, &p->checking_connectivity, &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
} else {
unref = 1;
}
@@ -199,11 +225,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
- grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
+ pp->on_complete->next = cbs;
+ cbs = pp->on_complete;
gpr_free(pp);
}
- grpc_subchannel_notify_on_state_change(
- p->selected, &p->checking_connectivity, &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(p->selected,
+ &p->checking_connectivity,
+ &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(&p->state_tracker,
@@ -216,9 +248,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->subchannels[p->checking_subchannel]);
add_interested_parties_locked(p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel],
+ &p->checking_connectivity, &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
} else {
goto loop;
}
@@ -227,9 +263,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
"connecting_changed");
- grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel],
+ &p->checking_connectivity, &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
break;
case GRPC_CHANNEL_FATAL_FAILURE:
del_interested_parties_locked(p);
@@ -244,7 +284,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
+ pp->on_complete->next = cbs;
+ cbs = pp->on_complete;
gpr_free(pp);
}
unref = 1;
@@ -261,7 +302,15 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
}
}
+ grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
gpr_mu_unlock(&p->mu);
+ grpc_connectivity_state_end_flush(&f);
+
+ while (cbs != NULL) {
+ grpc_iomgr_closure *next = cbs->next;
+ cbs->cb(cbs->cb_arg, 1);
+ cbs = next;
+ }
if (unref) {
GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
@@ -299,14 +348,16 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
return st;
}
-static void pf_notify_on_state_change(grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_iomgr_closure *notify) {
+static grpc_connectivity_state_notify_on_state_change_result
+pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&p->mu);
- grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
- notify);
+ r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify);
gpr_mu_unlock(&p->mu);
+ return r;
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
@@ -332,8 +383,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
p->num_subchannels = args->num_subchannels;
p->workqueue = args->workqueue;
GRPC_WORKQUEUE_REF(p->workqueue, "pick_first");
- grpc_connectivity_state_init(&p->state_tracker, args->workqueue,
- GRPC_CHANNEL_IDLE, "pick_first");
+ grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ "pick_first");
memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * args->num_subchannels);
grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 90ec44432f..48a787da8a 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -82,10 +82,11 @@ void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) {
policy->vtable->exit_idle(policy);
}
-void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *closure) {
- policy->vtable->notify_on_state_change(policy, state, closure);
+grpc_connectivity_state_notify_on_state_change_result
+grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure) {
+ return policy->vtable->notify_on_state_change(policy, state, closure);
}
grpc_connectivity_state grpc_lb_policy_check_connectivity(
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 3f7ca8f28d..7c9bdac648 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_H
#include "src/core/client_config/subchannel.h"
+#include "src/core/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
is expected to be extended to contain some parameters) */
@@ -70,9 +71,10 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy */
- void (*notify_on_state_change)(grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *closure);
+ grpc_connectivity_state_notify_on_state_change_result (
+ *notify_on_state_change)(grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure);
};
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -111,9 +113,10 @@ void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy);
-void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *closure);
+grpc_connectivity_state_notify_on_state_change_result
+grpc_lb_policy_notify_on_state_change(
+ grpc_lb_policy *policy, grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure) GRPC_MUST_USE_RESULT;
grpc_connectivity_state grpc_lb_policy_check_connectivity(
grpc_lb_policy *policy);
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 66a8c9f99d..4fee543a5b 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -79,7 +79,8 @@ typedef struct {
static void dns_destroy(grpc_resolver *r);
static void dns_start_resolving_locked(dns_resolver *r);
-static void dns_maybe_finish_next_locked(dns_resolver *r);
+static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r)
+ GRPC_MUST_USE_RESULT;
static void dns_shutdown(grpc_resolver *r);
static void dns_channel_saw_error(grpc_resolver *r,
@@ -93,13 +94,15 @@ static const grpc_resolver_vtable dns_resolver_vtable = {
static void dns_shutdown(grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
+ grpc_iomgr_closure *next_completion;
gpr_mu_lock(&r->mu);
- if (r->next_completion != NULL) {
+ next_completion = r->next_completion;
+ r->next_completion = NULL;
+ gpr_mu_unlock(&r->mu);
+ if (next_completion != NULL) {
*r->target_config = NULL;
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
- r->next_completion = NULL;
+ next_completion->cb(next_completion->cb_arg, 1);
}
- gpr_mu_unlock(&r->mu);
}
static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
@@ -116,6 +119,7 @@ static void dns_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
dns_resolver *r = (dns_resolver *)resolver;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
@@ -123,9 +127,12 @@ static void dns_next(grpc_resolver *resolver,
if (r->resolved_version == 0 && !r->resolving) {
dns_start_resolving_locked(r);
} else {
- dns_maybe_finish_next_locked(r);
+ call = dns_maybe_finish_next_locked(r);
}
gpr_mu_unlock(&r->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
@@ -134,6 +141,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
+ grpc_iomgr_closure *call;
size_t i;
if (addresses) {
grpc_lb_policy_args lb_policy_args;
@@ -164,8 +172,11 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
}
r->resolved_config = config;
r->resolved_version++;
- dns_maybe_finish_next_locked(r);
+ call = dns_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
GRPC_RESOLVER_UNREF(&r->base, "dns-resolving");
}
@@ -177,17 +188,19 @@ static void dns_start_resolving_locked(dns_resolver *r) {
grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r);
}
-static void dns_maybe_finish_next_locked(dns_resolver *r) {
+static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) {
+ grpc_iomgr_closure *ret = NULL;
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ ret = r->next_completion;
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
+ return ret;
}
static void dns_destroy(grpc_resolver *gr) {
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index abfb7b8569..220915853c 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -80,7 +80,8 @@ typedef struct {
static void sockaddr_destroy(grpc_resolver *r);
-static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r);
+static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked(
+ sockaddr_resolver *r) GRPC_MUST_USE_RESULT;
static void sockaddr_shutdown(grpc_resolver *r);
static void sockaddr_channel_saw_error(grpc_resolver *r,
@@ -95,13 +96,17 @@ static const grpc_resolver_vtable sockaddr_resolver_vtable = {
static void sockaddr_shutdown(grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ call = r->next_completion;
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
static void sockaddr_channel_saw_error(grpc_resolver *resolver,
@@ -111,20 +116,24 @@ static void sockaddr_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_config = target_config;
- sockaddr_maybe_finish_next_locked(r);
+ call = sockaddr_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
+ if (call) call->cb(call->cb_arg, 1);
}
-static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
+static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked(
+ sockaddr_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
grpc_lb_policy_args lb_policy_args;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
+ grpc_iomgr_closure *call = NULL;
if (r->next_completion != NULL && !r->published) {
size_t i;
@@ -148,9 +157,11 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
r->published = 1;
*r->target_config = cfg;
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ call = r->next_completion;
r->next_completion = NULL;
}
+
+ return call;
}
static void sockaddr_destroy(grpc_resolver *gr) {
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index bc04203744..94ec12dd16 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -92,7 +92,8 @@ typedef struct {
static void zookeeper_destroy(grpc_resolver *r);
static void zookeeper_start_resolving_locked(zookeeper_resolver *r);
-static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r);
+static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked(
+ zookeeper_resolver *r) GRPC_MUST_USE_RESULT;
static void zookeeper_shutdown(grpc_resolver *r);
static void zookeeper_channel_saw_error(grpc_resolver *r,
@@ -107,14 +108,18 @@ static const grpc_resolver_vtable zookeeper_resolver_vtable = {
static void zookeeper_shutdown(grpc_resolver *resolver) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ call = r->next_completion;
r->next_completion = NULL;
}
zookeeper_close(r->zookeeper_handle);
gpr_mu_unlock(&r->mu);
+ if (call != NULL) {
+ call->cb(call->cb_arg, 1);
+ }
}
static void zookeeper_channel_saw_error(grpc_resolver *resolver,
@@ -131,6 +136,7 @@ static void zookeeper_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ grpc_iomgr_closure *call;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->next_completion == NULL);
r->next_completion = on_complete;
@@ -138,9 +144,10 @@ static void zookeeper_next(grpc_resolver *resolver,
if (r->resolved_version == 0 && r->resolving == 0) {
zookeeper_start_resolving_locked(r);
} else {
- zookeeper_maybe_finish_next_locked(r);
+ call = zookeeper_maybe_finish_next_locked(r);
}
gpr_mu_unlock(&r->mu);
+ if (call) call->cb(call->cb_arg, 1);
}
/** Zookeeper global watcher for connection management
@@ -182,6 +189,7 @@ static void zookeeper_on_resolved(void *arg,
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
+ grpc_iomgr_closure *call;
size_t i;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
@@ -211,9 +219,11 @@ static void zookeeper_on_resolved(void *arg,
}
r->resolved_config = config;
r->resolved_version++;
- zookeeper_maybe_finish_next_locked(r);
+ call = zookeeper_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
+ if (call) call->cb(call->cb_arg, 1);
+
GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving");
}
@@ -404,17 +414,20 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
zookeeper_resolve_address(r);
}
-static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
+static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked(
+ zookeeper_resolver *r) {
+ grpc_iomgr_closure *call = NULL;
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ call = r->next_completion;
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
+ return call;
}
static void zookeeper_destroy(grpc_resolver *gr) {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 2047d2fee7..6fbf966475 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -303,8 +303,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->random = random_seed();
grpc_mdctx_ref(c->mdctx);
grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
- grpc_connectivity_state_init(&c->state_tracker, c->workqueue,
- GRPC_CHANNEL_IDLE, "subchannel");
+ grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
+ "subchannel");
gpr_mu_init(&c->mu);
return c;
}
@@ -365,12 +365,15 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, pollset);
if (!c->connecting) {
+ grpc_connectivity_state_flusher f;
c->connecting = 1;
connectivity_state_changed_locked(c, "create_call");
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
start_connect(c);
} else {
@@ -387,24 +390,33 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state;
}
-void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *notify) {
+grpc_connectivity_state_notify_on_state_change_result
+grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *notify) {
int do_connect = 0;
+ grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&c->mu);
- if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
- notify)) {
+ r = grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
+ notify);
+ if (r.current_state_is_idle) {
+ grpc_connectivity_state_flusher f;
do_connect = 1;
c->connecting = 1;
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
connectivity_state_changed_locked(c, "state_change");
- }
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
+ gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
+ } else {
gpr_mu_unlock(&c->mu);
+ }
if (do_connect) {
start_connect(c);
}
+ return r;
}
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
@@ -413,18 +425,23 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_subchannel *destroy;
int cancel_alarm = 0;
gpr_mu_lock(&c->mu);
+ if (c->active != NULL) {
+ con = c->active;
+ CONNECTION_REF_LOCKED(con, "transport-op");
+ }
if (op->disconnect) {
+ grpc_connectivity_state_flusher f;
c->disconnected = 1;
connectivity_state_changed_locked(c, "disconnect");
if (c->have_alarm) {
cancel_alarm = 1;
}
- }
- if (c->active != NULL) {
- con = c->active;
- CONNECTION_REF_LOCKED(con, "transport-op");
- }
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
+ gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
+ } else {
gpr_mu_unlock(&c->mu);
+ }
if (con != NULL) {
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
@@ -457,6 +474,7 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(mu);
@@ -498,7 +516,9 @@ done:
connectivity_state_changed_locked(c, "transport_state_changed");
destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw);
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(mu);
+ grpc_connectivity_state_end_flush(&f);
if (destroy) {
subchannel_destroy(c);
}
@@ -518,6 +538,7 @@ static void publish_transport(grpc_subchannel *c) {
state_watcher *sw;
connection *destroy_connection = NULL;
grpc_channel_element *elem;
+ grpc_connectivity_state_flusher f;
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
@@ -581,12 +602,18 @@ static void publish_transport(grpc_subchannel *c) {
/* signal completion */
connectivity_state_changed_locked(c, "connected");
- while ((w4c = c->waiting)) {
- c->waiting = w4c->next;
- grpc_workqueue_push(c->workqueue, &w4c->continuation, 1);
- }
+ w4c = c->waiting;
+ c->waiting = NULL;
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
+
+ while (w4c != NULL) {
+ waiting_for_connect *next = w4c;
+ w4c->continuation.cb(w4c->continuation.cb_arg, 1);
+ w4c = next;
+ }
gpr_free(filters);
@@ -626,13 +653,16 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
static void on_alarm(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
iomgr_success = 0;
}
connectivity_state_changed_locked(c, "alarm");
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
if (iomgr_success) {
update_reconnect_parameters(c);
continue_connect(c);
@@ -648,12 +678,15 @@ static void subchannel_connected(void *arg, int iomgr_success) {
publish_transport(c);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
connectivity_state_changed_locked(c, "connect_failed");
grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
}
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 2e36c69134..391df36bfd 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -36,6 +36,7 @@
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/connector.h"
+#include "src/core/transport/connectivity_state.h"
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
@@ -87,9 +88,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the channel */
-void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *notify);
+grpc_connectivity_state_notify_on_state_change_result
+grpc_subchannel_notify_on_state_change(
+ grpc_subchannel *channel, grpc_connectivity_state *state,
+ grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT;
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_subchannel *channel,