aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/channel/client_channel.c75
-rw-r--r--src/core/client_config/lb_policies/pick_first.c113
-rw-r--r--src/core/client_config/lb_policy.c9
-rw-r--r--src/core/client_config/lb_policy.h15
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c31
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c21
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c25
-rw-r--r--src/core/client_config/subchannel.c67
-rw-r--r--src/core/client_config/subchannel.h8
-rw-r--r--src/core/iomgr/workqueue.h1
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/transport/chttp2_transport.c25
-rw-r--r--src/core/transport/connectivity_state.c73
-rw-r--r--src/core/transport/connectivity_state.h38
14 files changed, 350 insertions, 153 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 6fefdec2f6..5b165972a7 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -437,20 +437,28 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
+static void on_lb_policy_state_changed_locked(
+ lb_policy_connectivity_watcher *w) {
+ /* check if the notification is for a stale policy */
+ if (w->lb_policy != w->chand->lb_policy) return;
+
+ grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed");
+ if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
+ watch_lb_policy(w->chand, w->lb_policy, w->state);
+ }
+}
+
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(&w->chand->mu_config);
- /* check if the notification is for a stale policy */
- if (w->lb_policy == w->chand->lb_policy) {
- grpc_connectivity_state_set(&w->chand->state_tracker, w->state,
- "lb_changed");
- if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
- watch_lb_policy(w->chand, w->lb_policy, w->state);
- }
- }
+ on_lb_policy_state_changed_locked(w);
+ grpc_connectivity_state_begin_flush(&w->chand->state_tracker, &f);
gpr_mu_unlock(&w->chand->mu_config);
+ grpc_connectivity_state_end_flush(&f);
+
GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
gpr_free(w);
}
@@ -464,7 +472,13 @@ static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
w->state = current_state;
w->lb_policy = lb_policy;
- grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed);
+ if (grpc_lb_policy_notify_on_state_change(lb_policy, &w->state,
+ &w->on_changed)
+ .state_already_changed) {
+ on_lb_policy_state_changed_locked(w);
+ GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
+ gpr_free(w);
+ }
}
static void cc_on_config_changed(void *arg, int iomgr_success) {
@@ -474,6 +488,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_resolver *old_resolver;
grpc_iomgr_closure *wakeup_closures = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ grpc_connectivity_state_flusher f;
int exit_idle = 0;
if (chand->incoming_configuration != NULL) {
@@ -507,20 +522,24 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
GRPC_RESOLVER_REF(resolver, "channel-next");
grpc_connectivity_state_set(&chand->state_tracker, state,
"new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy(chand, lb_policy, state);
+ }
+ grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ grpc_connectivity_state_end_flush(&f);
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
- if (lb_policy != NULL) {
- watch_lb_policy(chand, lb_policy, state);
- }
} else {
old_resolver = chand->resolver;
chand->resolver = NULL;
grpc_connectivity_state_set(&chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
+ grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
gpr_mu_unlock(&chand->mu_config);
+ grpc_connectivity_state_end_flush(&f);
if (old_resolver != NULL) {
grpc_resolver_shutdown(old_resolver);
GRPC_RESOLVER_UNREF(old_resolver, "channel");
@@ -554,7 +573,9 @@ static void cc_start_transport_op(grpc_channel_element *elem,
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
- grpc_iomgr_closure *on_consumed = op->on_consumed;
+ grpc_connectivity_state_flusher f;
+ grpc_iomgr_closure *call_list = op->on_consumed;
+ call_list->next = NULL;
op->on_consumed = NULL;
GPR_ASSERT(op->set_accept_stream == NULL);
@@ -562,9 +583,13 @@ static void cc_start_transport_op(grpc_channel_element *elem,
gpr_mu_lock(&chand->mu_config);
if (op->on_connectivity_state_change != NULL) {
- grpc_connectivity_state_notify_on_state_change(
- &chand->state_tracker, op->connectivity_state,
- op->on_connectivity_state_change);
+ if (grpc_connectivity_state_notify_on_state_change(
+ &chand->state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change)
+ .state_already_changed) {
+ op->on_connectivity_state_change->next = call_list;
+ call_list = op->on_connectivity_state_change;
+ }
op->on_connectivity_state_change = NULL;
op->connectivity_state = NULL;
}
@@ -587,7 +612,9 @@ static void cc_start_transport_op(grpc_channel_element *elem,
chand->lb_policy = NULL;
}
}
+ grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
gpr_mu_unlock(&chand->mu_config);
+ grpc_connectivity_state_end_flush(&f);
if (destroy_resolver) {
grpc_resolver_shutdown(destroy_resolver);
@@ -599,9 +626,10 @@ static void cc_start_transport_op(grpc_channel_element *elem,
GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
}
- if (on_consumed) {
- grpc_workqueue_push(grpc_channel_get_workqueue(chand->master), on_consumed,
- 1);
+ while (call_list != NULL) {
+ grpc_iomgr_closure *next = call_list->next;
+ call_list->cb(call_list->cb_arg, 1);
+ call_list = next;
}
}
@@ -671,7 +699,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
chand);
grpc_connectivity_state_init(&chand->state_tracker,
- grpc_channel_get_workqueue(master),
GRPC_CHANNEL_IDLE, "client_channel");
}
@@ -750,10 +777,14 @@ void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete) {
channel_data *chand = elem->channel_data;
+ grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&chand->mu_config);
- grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
- on_complete);
+ r = grpc_connectivity_state_notify_on_state_change(&chand->state_tracker,
+ state, on_complete);
gpr_mu_unlock(&chand->mu_config);
+ if (r.state_already_changed) {
+ on_complete->cb(on_complete->cb_arg, 1);
+ }
}
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 575ee02249..804dbdeadd 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -110,38 +110,54 @@ void pf_destroy(grpc_lb_policy *pol) {
void pf_shutdown(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connectivity_state_flusher f;
pending_pick *pp;
gpr_mu_lock(&p->mu);
del_interested_parties_locked(p);
p->shutdown = 1;
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = NULL;
- grpc_workqueue_push(p->workqueue, pp->on_complete, 0);
- gpr_free(pp);
- }
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
"shutdown");
+ grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
gpr_mu_unlock(&p->mu);
+ grpc_connectivity_state_end_flush(&f);
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ *pp->target = NULL;
+ pp->on_complete->cb(pp->on_complete->cb_arg, 0);
+ gpr_free(pp);
+ pp = next;
+ }
}
-static void start_picking(pick_first_lb_policy *p) {
+/* returns a closure to call, or NULL */
+static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
- grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
- &p->checking_connectivity,
- &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed)
+ .state_already_changed) {
+ return &p->connectivity_changed;
+ } else {
+ return NULL;
+ }
}
void pf_exit_idle(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&p->mu);
if (!p->started_picking) {
- start_picking(p);
+ call = start_picking(p);
}
gpr_mu_unlock(&p->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
@@ -155,8 +171,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
*target = p->selected;
on_complete->cb(on_complete->cb_arg, 1);
} else {
+ grpc_iomgr_closure *call = NULL;
if (!p->started_picking) {
- start_picking(p);
+ call = start_picking(p);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pollset);
@@ -167,6 +184,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
}
@@ -174,6 +194,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
int unref = 0;
+ grpc_iomgr_closure *cbs = NULL;
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(&p->mu);
@@ -183,8 +205,12 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
"selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_subchannel_notify_on_state_change(
- p->selected, &p->checking_connectivity, &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->selected, &p->checking_connectivity, &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
} else {
unref = 1;
}
@@ -199,11 +225,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
- grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
+ pp->on_complete->next = cbs;
+ cbs = pp->on_complete;
gpr_free(pp);
}
- grpc_subchannel_notify_on_state_change(
- p->selected, &p->checking_connectivity, &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(p->selected,
+ &p->checking_connectivity,
+ &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(&p->state_tracker,
@@ -216,9 +248,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->subchannels[p->checking_subchannel]);
add_interested_parties_locked(p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel],
+ &p->checking_connectivity, &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
} else {
goto loop;
}
@@ -227,9 +263,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
"connecting_changed");
- grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed);
+ if (grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel],
+ &p->checking_connectivity, &p->connectivity_changed)
+ .state_already_changed) {
+ p->connectivity_changed.next = cbs;
+ cbs = &p->connectivity_changed;
+ }
break;
case GRPC_CHANNEL_FATAL_FAILURE:
del_interested_parties_locked(p);
@@ -244,7 +284,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
+ pp->on_complete->next = cbs;
+ cbs = pp->on_complete;
gpr_free(pp);
}
unref = 1;
@@ -261,7 +302,15 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
}
}
+ grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
gpr_mu_unlock(&p->mu);
+ grpc_connectivity_state_end_flush(&f);
+
+ while (cbs != NULL) {
+ grpc_iomgr_closure *next = cbs->next;
+ cbs->cb(cbs->cb_arg, 1);
+ cbs = next;
+ }
if (unref) {
GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
@@ -299,14 +348,16 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
return st;
}
-static void pf_notify_on_state_change(grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_iomgr_closure *notify) {
+static grpc_connectivity_state_notify_on_state_change_result
+pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&p->mu);
- grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
- notify);
+ r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify);
gpr_mu_unlock(&p->mu);
+ return r;
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
@@ -332,8 +383,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
p->num_subchannels = args->num_subchannels;
p->workqueue = args->workqueue;
GRPC_WORKQUEUE_REF(p->workqueue, "pick_first");
- grpc_connectivity_state_init(&p->state_tracker, args->workqueue,
- GRPC_CHANNEL_IDLE, "pick_first");
+ grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ "pick_first");
memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * args->num_subchannels);
grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 90ec44432f..48a787da8a 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -82,10 +82,11 @@ void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) {
policy->vtable->exit_idle(policy);
}
-void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *closure) {
- policy->vtable->notify_on_state_change(policy, state, closure);
+grpc_connectivity_state_notify_on_state_change_result
+grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure) {
+ return policy->vtable->notify_on_state_change(policy, state, closure);
}
grpc_connectivity_state grpc_lb_policy_check_connectivity(
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 3f7ca8f28d..7c9bdac648 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_H
#include "src/core/client_config/subchannel.h"
+#include "src/core/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
is expected to be extended to contain some parameters) */
@@ -70,9 +71,10 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy */
- void (*notify_on_state_change)(grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *closure);
+ grpc_connectivity_state_notify_on_state_change_result (
+ *notify_on_state_change)(grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure);
};
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -111,9 +113,10 @@ void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy);
-void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *closure);
+grpc_connectivity_state_notify_on_state_change_result
+grpc_lb_policy_notify_on_state_change(
+ grpc_lb_policy *policy, grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure) GRPC_MUST_USE_RESULT;
grpc_connectivity_state grpc_lb_policy_check_connectivity(
grpc_lb_policy *policy);
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 66a8c9f99d..4fee543a5b 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -79,7 +79,8 @@ typedef struct {
static void dns_destroy(grpc_resolver *r);
static void dns_start_resolving_locked(dns_resolver *r);
-static void dns_maybe_finish_next_locked(dns_resolver *r);
+static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r)
+ GRPC_MUST_USE_RESULT;
static void dns_shutdown(grpc_resolver *r);
static void dns_channel_saw_error(grpc_resolver *r,
@@ -93,13 +94,15 @@ static const grpc_resolver_vtable dns_resolver_vtable = {
static void dns_shutdown(grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
+ grpc_iomgr_closure *next_completion;
gpr_mu_lock(&r->mu);
- if (r->next_completion != NULL) {
+ next_completion = r->next_completion;
+ r->next_completion = NULL;
+ gpr_mu_unlock(&r->mu);
+ if (next_completion != NULL) {
*r->target_config = NULL;
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
- r->next_completion = NULL;
+ next_completion->cb(next_completion->cb_arg, 1);
}
- gpr_mu_unlock(&r->mu);
}
static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
@@ -116,6 +119,7 @@ static void dns_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
dns_resolver *r = (dns_resolver *)resolver;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
@@ -123,9 +127,12 @@ static void dns_next(grpc_resolver *resolver,
if (r->resolved_version == 0 && !r->resolving) {
dns_start_resolving_locked(r);
} else {
- dns_maybe_finish_next_locked(r);
+ call = dns_maybe_finish_next_locked(r);
}
gpr_mu_unlock(&r->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
@@ -134,6 +141,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
+ grpc_iomgr_closure *call;
size_t i;
if (addresses) {
grpc_lb_policy_args lb_policy_args;
@@ -164,8 +172,11 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
}
r->resolved_config = config;
r->resolved_version++;
- dns_maybe_finish_next_locked(r);
+ call = dns_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
GRPC_RESOLVER_UNREF(&r->base, "dns-resolving");
}
@@ -177,17 +188,19 @@ static void dns_start_resolving_locked(dns_resolver *r) {
grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r);
}
-static void dns_maybe_finish_next_locked(dns_resolver *r) {
+static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) {
+ grpc_iomgr_closure *ret = NULL;
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ ret = r->next_completion;
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
+ return ret;
}
static void dns_destroy(grpc_resolver *gr) {
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index abfb7b8569..220915853c 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -80,7 +80,8 @@ typedef struct {
static void sockaddr_destroy(grpc_resolver *r);
-static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r);
+static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked(
+ sockaddr_resolver *r) GRPC_MUST_USE_RESULT;
static void sockaddr_shutdown(grpc_resolver *r);
static void sockaddr_channel_saw_error(grpc_resolver *r,
@@ -95,13 +96,17 @@ static const grpc_resolver_vtable sockaddr_resolver_vtable = {
static void sockaddr_shutdown(grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ call = r->next_completion;
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
+ if (call) {
+ call->cb(call->cb_arg, 1);
+ }
}
static void sockaddr_channel_saw_error(grpc_resolver *resolver,
@@ -111,20 +116,24 @@ static void sockaddr_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_config = target_config;
- sockaddr_maybe_finish_next_locked(r);
+ call = sockaddr_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
+ if (call) call->cb(call->cb_arg, 1);
}
-static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
+static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked(
+ sockaddr_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
grpc_lb_policy_args lb_policy_args;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
+ grpc_iomgr_closure *call = NULL;
if (r->next_completion != NULL && !r->published) {
size_t i;
@@ -148,9 +157,11 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
r->published = 1;
*r->target_config = cfg;
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ call = r->next_completion;
r->next_completion = NULL;
}
+
+ return call;
}
static void sockaddr_destroy(grpc_resolver *gr) {
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index bc04203744..94ec12dd16 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -92,7 +92,8 @@ typedef struct {
static void zookeeper_destroy(grpc_resolver *r);
static void zookeeper_start_resolving_locked(zookeeper_resolver *r);
-static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r);
+static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked(
+ zookeeper_resolver *r) GRPC_MUST_USE_RESULT;
static void zookeeper_shutdown(grpc_resolver *r);
static void zookeeper_channel_saw_error(grpc_resolver *r,
@@ -107,14 +108,18 @@ static const grpc_resolver_vtable zookeeper_resolver_vtable = {
static void zookeeper_shutdown(grpc_resolver *resolver) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ call = r->next_completion;
r->next_completion = NULL;
}
zookeeper_close(r->zookeeper_handle);
gpr_mu_unlock(&r->mu);
+ if (call != NULL) {
+ call->cb(call->cb_arg, 1);
+ }
}
static void zookeeper_channel_saw_error(grpc_resolver *resolver,
@@ -131,6 +136,7 @@ static void zookeeper_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ grpc_iomgr_closure *call;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->next_completion == NULL);
r->next_completion = on_complete;
@@ -138,9 +144,10 @@ static void zookeeper_next(grpc_resolver *resolver,
if (r->resolved_version == 0 && r->resolving == 0) {
zookeeper_start_resolving_locked(r);
} else {
- zookeeper_maybe_finish_next_locked(r);
+ call = zookeeper_maybe_finish_next_locked(r);
}
gpr_mu_unlock(&r->mu);
+ if (call) call->cb(call->cb_arg, 1);
}
/** Zookeeper global watcher for connection management
@@ -182,6 +189,7 @@ static void zookeeper_on_resolved(void *arg,
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
+ grpc_iomgr_closure *call;
size_t i;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
@@ -211,9 +219,11 @@ static void zookeeper_on_resolved(void *arg,
}
r->resolved_config = config;
r->resolved_version++;
- zookeeper_maybe_finish_next_locked(r);
+ call = zookeeper_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
+ if (call) call->cb(call->cb_arg, 1);
+
GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving");
}
@@ -404,17 +414,20 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
zookeeper_resolve_address(r);
}
-static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
+static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked(
+ zookeeper_resolver *r) {
+ grpc_iomgr_closure *call = NULL;
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_workqueue_push(r->workqueue, r->next_completion, 1);
+ call = r->next_completion;
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
+ return call;
}
static void zookeeper_destroy(grpc_resolver *gr) {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 2047d2fee7..6fbf966475 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -303,8 +303,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->random = random_seed();
grpc_mdctx_ref(c->mdctx);
grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
- grpc_connectivity_state_init(&c->state_tracker, c->workqueue,
- GRPC_CHANNEL_IDLE, "subchannel");
+ grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
+ "subchannel");
gpr_mu_init(&c->mu);
return c;
}
@@ -365,12 +365,15 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, pollset);
if (!c->connecting) {
+ grpc_connectivity_state_flusher f;
c->connecting = 1;
connectivity_state_changed_locked(c, "create_call");
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
start_connect(c);
} else {
@@ -387,24 +390,33 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state;
}
-void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *notify) {
+grpc_connectivity_state_notify_on_state_change_result
+grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *notify) {
int do_connect = 0;
+ grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&c->mu);
- if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
- notify)) {
+ r = grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
+ notify);
+ if (r.current_state_is_idle) {
+ grpc_connectivity_state_flusher f;
do_connect = 1;
c->connecting = 1;
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
connectivity_state_changed_locked(c, "state_change");
- }
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
+ gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
+ } else {
gpr_mu_unlock(&c->mu);
+ }
if (do_connect) {
start_connect(c);
}
+ return r;
}
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
@@ -413,18 +425,23 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_subchannel *destroy;
int cancel_alarm = 0;
gpr_mu_lock(&c->mu);
+ if (c->active != NULL) {
+ con = c->active;
+ CONNECTION_REF_LOCKED(con, "transport-op");
+ }
if (op->disconnect) {
+ grpc_connectivity_state_flusher f;
c->disconnected = 1;
connectivity_state_changed_locked(c, "disconnect");
if (c->have_alarm) {
cancel_alarm = 1;
}
- }
- if (c->active != NULL) {
- con = c->active;
- CONNECTION_REF_LOCKED(con, "transport-op");
- }
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
+ gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
+ } else {
gpr_mu_unlock(&c->mu);
+ }
if (con != NULL) {
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
@@ -457,6 +474,7 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(mu);
@@ -498,7 +516,9 @@ done:
connectivity_state_changed_locked(c, "transport_state_changed");
destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw);
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(mu);
+ grpc_connectivity_state_end_flush(&f);
if (destroy) {
subchannel_destroy(c);
}
@@ -518,6 +538,7 @@ static void publish_transport(grpc_subchannel *c) {
state_watcher *sw;
connection *destroy_connection = NULL;
grpc_channel_element *elem;
+ grpc_connectivity_state_flusher f;
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
@@ -581,12 +602,18 @@ static void publish_transport(grpc_subchannel *c) {
/* signal completion */
connectivity_state_changed_locked(c, "connected");
- while ((w4c = c->waiting)) {
- c->waiting = w4c->next;
- grpc_workqueue_push(c->workqueue, &w4c->continuation, 1);
- }
+ w4c = c->waiting;
+ c->waiting = NULL;
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
+
+ while (w4c != NULL) {
+ waiting_for_connect *next = w4c;
+ w4c->continuation.cb(w4c->continuation.cb_arg, 1);
+ w4c = next;
+ }
gpr_free(filters);
@@ -626,13 +653,16 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
static void on_alarm(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
iomgr_success = 0;
}
connectivity_state_changed_locked(c, "alarm");
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
if (iomgr_success) {
update_reconnect_parameters(c);
continue_connect(c);
@@ -648,12 +678,15 @@ static void subchannel_connected(void *arg, int iomgr_success) {
publish_transport(c);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ grpc_connectivity_state_flusher f;
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
connectivity_state_changed_locked(c, "connect_failed");
grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
+ grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_end_flush(&f);
}
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 2e36c69134..391df36bfd 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -36,6 +36,7 @@
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/connector.h"
+#include "src/core/transport/connectivity_state.h"
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
@@ -87,9 +88,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the channel */
-void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *notify);
+grpc_connectivity_state_notify_on_state_change_result
+grpc_subchannel_notify_on_state_change(
+ grpc_subchannel *channel, grpc_connectivity_state *state,
+ grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT;
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_subchannel *channel,
diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h
index a236651fbd..124f294a23 100644
--- a/src/core/iomgr/workqueue.h
+++ b/src/core/iomgr/workqueue.h
@@ -54,6 +54,7 @@ grpc_workqueue *grpc_workqueue_create(void);
void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously);
+#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index c2b3040319..8f9e97a396 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -654,6 +654,8 @@ static void unlock(grpc_call *call) {
if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) {
call->bound_pollset = 1;
op.bind_pollset = grpc_cq_pollset(call->cq);
+ grpc_workqueue_add_to_pollset(grpc_channel_get_workqueue(call->channel),
+ op.bind_pollset);
start_op = 1;
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 896f5a331a..6376c397a2 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -243,7 +243,7 @@ static void init_transport(grpc_chttp2_transport *t,
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
grpc_connectivity_state_init(
- &t->channel_callback.state_tracker, workqueue, GRPC_CHANNEL_READY,
+ &t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
gpr_slice_buffer_init(&t->global.qbuf);
@@ -500,6 +500,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
+ grpc_connectivity_state_flusher f;
unlock_check_read_write_state(t);
if (!t->writing_active && !t->closed &&
@@ -514,8 +515,11 @@ static void unlock(grpc_chttp2_transport *t) {
t->global.pending_closures_head = NULL;
t->global.pending_closures_tail = NULL;
+ grpc_connectivity_state_begin_flush(&t->channel_callback.state_tracker, &f);
gpr_mu_unlock(&t->mu);
+ grpc_connectivity_state_end_flush(&f);
+
while (run_closures) {
grpc_iomgr_closure *next = run_closures->next;
run_closures->cb(run_closures->cb_arg, run_closures->success);
@@ -755,9 +759,13 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
}
if (op->on_connectivity_state_change) {
- grpc_connectivity_state_notify_on_state_change(
- &t->channel_callback.state_tracker, op->connectivity_state,
- op->on_connectivity_state_change);
+ if (grpc_connectivity_state_notify_on_state_change(
+ &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change)
+ .state_already_changed) {
+ grpc_chttp2_schedule_closure(&t->global, op->on_connectivity_state_change,
+ 1);
+ }
}
if (op->send_goaway) {
@@ -1185,19 +1193,14 @@ static void recv_data(void *tp, int success) {
* CALLBACK LOOP
*/
-static void schedule_closure_for_connectivity(void *a,
- grpc_iomgr_closure *closure) {
- grpc_chttp2_schedule_closure(a, closure, 1);
-}
-
static void connectivity_state_set(
grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state, const char *reason) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
- grpc_connectivity_state_set_with_scheduler(
+ grpc_connectivity_state_set(
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
- state, schedule_closure_for_connectivity, transport_global, reason);
+ state, reason);
}
void grpc_chttp2_schedule_closure(
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index cf23adfbb2..034f82474f 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -32,6 +32,9 @@
*/
#include "src/core/transport/connectivity_state.h"
+
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
@@ -56,14 +59,12 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
}
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
- grpc_workqueue *workqueue,
grpc_connectivity_state init_state,
const char *name) {
tracker->current_state = init_state;
tracker->watchers = NULL;
- tracker->workqueue = workqueue;
- GRPC_WORKQUEUE_REF(workqueue, name);
tracker->name = gpr_strdup(name);
+ tracker->changed = 0;
}
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
@@ -78,10 +79,9 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
} else {
success = 0;
}
- grpc_workqueue_push(tracker->workqueue, w->notify, success);
+ w->notify->cb(w->notify->cb_arg, success);
gpr_free(w);
}
- GRPC_WORKQUEUE_UNREF(tracker->workqueue, tracker->name);
gpr_free(tracker->name);
}
@@ -90,9 +90,12 @@ grpc_connectivity_state grpc_connectivity_state_check(
return tracker->current_state;
}
-int grpc_connectivity_state_notify_on_state_change(
+grpc_connectivity_state_notify_on_state_change_result
+grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
grpc_iomgr_closure *notify) {
+ grpc_connectivity_state_notify_on_state_change_result result;
+ memset(&result, 0, sizeof(result));
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name,
grpc_connectivity_state_name(*current),
@@ -100,7 +103,7 @@ int grpc_connectivity_state_notify_on_state_change(
}
if (tracker->current_state != *current) {
*current = tracker->current_state;
- grpc_workqueue_push(tracker->workqueue, notify, 1);
+ result.state_already_changed = 1;
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@@ -108,15 +111,13 @@ int grpc_connectivity_state_notify_on_state_change(
w->next = tracker->watchers;
tracker->watchers = w;
}
- return tracker->current_state == GRPC_CHANNEL_IDLE;
+ result.current_state_is_idle = tracker->current_state == GRPC_CHANNEL_IDLE;
+ return result;
}
-void grpc_connectivity_state_set_with_scheduler(
- grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
- void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg,
- const char *reason) {
- grpc_connectivity_state_watcher *new = NULL;
- grpc_connectivity_state_watcher *w;
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state state,
+ const char *reason) {
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name,
grpc_connectivity_state_name(tracker->current_state),
@@ -127,28 +128,40 @@ void grpc_connectivity_state_set_with_scheduler(
}
GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE);
tracker->current_state = state;
- while ((w = tracker->watchers)) {
- tracker->watchers = w->next;
+ tracker->changed = 1;
+}
- if (state != *w->current) {
- *w->current = state;
- scheduler(arg, w->notify);
+void grpc_connectivity_state_begin_flush(
+ grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state_flusher *flusher) {
+ grpc_connectivity_state_watcher *w;
+ flusher->cbs = NULL;
+ if (!tracker->changed) return;
+ w = tracker->watchers;
+ tracker->watchers = NULL;
+ while (w != NULL) {
+ grpc_connectivity_state_watcher *next = w->next;
+ if (tracker->current_state != *w->current) {
+ *w->current = tracker->current_state;
+ w->notify->next = flusher->cbs;
+ flusher->cbs = w->notify;
gpr_free(w);
} else {
- w->next = new;
- new = w;
+ w->next = tracker->watchers;
+ tracker->watchers = w;
}
+ w = next;
}
- tracker->watchers = new;
+ tracker->changed = 0;
}
-static void default_scheduler(void *workqueue, grpc_iomgr_closure *closure) {
- grpc_workqueue_push(workqueue, closure, 1);
+void grpc_connectivity_state_end_flush(
+ grpc_connectivity_state_flusher *flusher) {
+ grpc_iomgr_closure *c = flusher->cbs;
+ while (c != NULL) {
+ grpc_iomgr_closure *next = c;
+ c->cb(c->cb_arg, 1);
+ c = next;
+ }
}
-void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
- grpc_connectivity_state state,
- const char *reason) {
- grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler,
- tracker->workqueue, reason);
-}
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index 6c61e02623..323af2740c 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -54,32 +54,52 @@ typedef struct {
grpc_connectivity_state_watcher *watchers;
/** a name to help debugging */
char *name;
- /** workqueue for async work */
- grpc_workqueue *workqueue;
+ /** has this state been changed since the last flush? */
+ int changed;
} grpc_connectivity_state_tracker;
+typedef struct { grpc_iomgr_closure *cbs; } grpc_connectivity_state_flusher;
+
extern int grpc_connectivity_state_trace;
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
- grpc_workqueue *grpc_workqueue,
grpc_connectivity_state init_state,
const char *name);
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
+/** Set connectivity state; not thread safe; access must be serialized with an
+ * external lock */
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
const char *reason);
-void grpc_connectivity_state_set_with_scheduler(
- grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
- void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg,
- const char *reason);
+
+/** Begin flushing callbacks; not thread safe; access must be serialized using
+ * the same external lock as grpc_connectivity_state_set. Initializes flusher.
+ */
+void grpc_connectivity_state_begin_flush(
+ grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state_flusher *flusher);
+
+/** Complete flushing updates: must not be called with any locks held */
+void grpc_connectivity_state_end_flush(
+ grpc_connectivity_state_flusher *flusher);
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker);
+typedef struct {
+ /** 1 if the current state is idle (a hint to begin connecting), 0 otherwise
+ */
+ int current_state_is_idle;
+ /** 1 if the state has already changed: in this case the closure passed to
+ * grpc_connectivity_state_notify_on_state_change will not be called */
+ int state_already_changed;
+} grpc_connectivity_state_notify_on_state_change_result;
+
/** Return 1 if the channel should start connecting, 0 otherwise */
-int grpc_connectivity_state_notify_on_state_change(
+grpc_connectivity_state_notify_on_state_change_result
+grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
- grpc_iomgr_closure *notify);
+ grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT;
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */