diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 75 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 113 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 9 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 15 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 31 | ||||
-rw-r--r-- | src/core/client_config/resolvers/sockaddr_resolver.c | 21 | ||||
-rw-r--r-- | src/core/client_config/resolvers/zookeeper_resolver.c | 25 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 67 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 8 | ||||
-rw-r--r-- | src/core/iomgr/workqueue.h | 1 | ||||
-rw-r--r-- | src/core/surface/call.c | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 25 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.c | 73 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.h | 38 |
14 files changed, 350 insertions, 153 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 6fefdec2f6..5b165972a7 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -437,20 +437,28 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state); +static void on_lb_policy_state_changed_locked( + lb_policy_connectivity_watcher *w) { + /* check if the notification is for a stale policy */ + if (w->lb_policy != w->chand->lb_policy) return; + + grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed"); + if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { + watch_lb_policy(w->chand, w->lb_policy, w->state); + } +} + static void on_lb_policy_state_changed(void *arg, int iomgr_success) { lb_policy_connectivity_watcher *w = arg; + grpc_connectivity_state_flusher f; gpr_mu_lock(&w->chand->mu_config); - /* check if the notification is for a stale policy */ - if (w->lb_policy == w->chand->lb_policy) { - grpc_connectivity_state_set(&w->chand->state_tracker, w->state, - "lb_changed"); - if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { - watch_lb_policy(w->chand, w->lb_policy, w->state); - } - } + on_lb_policy_state_changed_locked(w); + grpc_connectivity_state_begin_flush(&w->chand->state_tracker, &f); gpr_mu_unlock(&w->chand->mu_config); + grpc_connectivity_state_end_flush(&f); + GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); gpr_free(w); } @@ -464,7 +472,13 @@ static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed); + if (grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, + &w->on_changed) + .state_already_changed) { + on_lb_policy_state_changed_locked(w); + GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); + gpr_free(w); + } } static void cc_on_config_changed(void *arg, int iomgr_success) { @@ -474,6 +488,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { grpc_resolver *old_resolver; grpc_iomgr_closure *wakeup_closures = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_connectivity_state_flusher f; int exit_idle = 0; if (chand->incoming_configuration != NULL) { @@ -507,20 +522,24 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { GRPC_RESOLVER_REF(resolver, "channel-next"); grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy(chand, lb_policy, state); + } + grpc_connectivity_state_begin_flush(&chand->state_tracker, &f); gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + grpc_connectivity_state_end_flush(&f); grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(resolver, "channel-next"); - if (lb_policy != NULL) { - watch_lb_policy(chand, lb_policy, state); - } } else { old_resolver = chand->resolver; chand->resolver = NULL; grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); + grpc_connectivity_state_begin_flush(&chand->state_tracker, &f); gpr_mu_unlock(&chand->mu_config); + grpc_connectivity_state_end_flush(&f); if (old_resolver != NULL) { grpc_resolver_shutdown(old_resolver); GRPC_RESOLVER_UNREF(old_resolver, "channel"); @@ -554,7 +573,9 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_iomgr_closure *on_consumed = op->on_consumed; + grpc_connectivity_state_flusher f; + grpc_iomgr_closure *call_list = op->on_consumed; + call_list->next = NULL; op->on_consumed = NULL; GPR_ASSERT(op->set_accept_stream == NULL); @@ -562,9 +583,13 @@ static void cc_start_transport_op(grpc_channel_element *elem, gpr_mu_lock(&chand->mu_config); if (op->on_connectivity_state_change != NULL) { - grpc_connectivity_state_notify_on_state_change( - &chand->state_tracker, op->connectivity_state, - op->on_connectivity_state_change); + if (grpc_connectivity_state_notify_on_state_change( + &chand->state_tracker, op->connectivity_state, + op->on_connectivity_state_change) + .state_already_changed) { + op->on_connectivity_state_change->next = call_list; + call_list = op->on_connectivity_state_change; + } op->on_connectivity_state_change = NULL; op->connectivity_state = NULL; } @@ -587,7 +612,9 @@ static void cc_start_transport_op(grpc_channel_element *elem, chand->lb_policy = NULL; } } + grpc_connectivity_state_begin_flush(&chand->state_tracker, &f); gpr_mu_unlock(&chand->mu_config); + grpc_connectivity_state_end_flush(&f); if (destroy_resolver) { grpc_resolver_shutdown(destroy_resolver); @@ -599,9 +626,10 @@ static void cc_start_transport_op(grpc_channel_element *elem, GRPC_LB_POLICY_UNREF(lb_policy, "broadcast"); } - if (on_consumed) { - grpc_workqueue_push(grpc_channel_get_workqueue(chand->master), on_consumed, - 1); + while (call_list != NULL) { + grpc_iomgr_closure *next = call_list->next; + call_list->cb(call_list->cb_arg, 1); + call_list = next; } } @@ -671,7 +699,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, chand); grpc_connectivity_state_init(&chand->state_tracker, - grpc_channel_get_workqueue(master), GRPC_CHANNEL_IDLE, "client_channel"); } @@ -750,10 +777,14 @@ void grpc_client_channel_watch_connectivity_state( grpc_channel_element *elem, grpc_connectivity_state *state, grpc_iomgr_closure *on_complete) { channel_data *chand = elem->channel_data; + grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&chand->mu_config); - grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state, - on_complete); + r = grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, + state, on_complete); gpr_mu_unlock(&chand->mu_config); + if (r.state_already_changed) { + on_complete->cb(on_complete->cb_arg, 1); + } } grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 575ee02249..804dbdeadd 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -110,38 +110,54 @@ void pf_destroy(grpc_lb_policy *pol) { void pf_shutdown(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_connectivity_state_flusher f; pending_pick *pp; gpr_mu_lock(&p->mu); del_interested_parties_locked(p); p->shutdown = 1; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_workqueue_push(p->workqueue, pp->on_complete, 0); - gpr_free(pp); - } + pp = p->pending_picks; + p->pending_picks = NULL; grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); + grpc_connectivity_state_begin_flush(&p->state_tracker, &f); gpr_mu_unlock(&p->mu); + grpc_connectivity_state_end_flush(&f); + while (pp != NULL) { + pending_pick *next = pp->next; + *pp->target = NULL; + pp->on_complete->cb(pp->on_complete->cb_arg, 0); + gpr_free(pp); + pp = next; + } } -static void start_picking(pick_first_lb_policy *p) { +/* returns a closure to call, or NULL */ +static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], - &p->checking_connectivity, - &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed) + .state_already_changed) { + return &p->connectivity_changed; + } else { + return NULL; + } } void pf_exit_idle(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_iomgr_closure *call = NULL; gpr_mu_lock(&p->mu); if (!p->started_picking) { - start_picking(p); + call = start_picking(p); } gpr_mu_unlock(&p->mu); + if (call) { + call->cb(call->cb_arg, 1); + } } void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, @@ -155,8 +171,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, *target = p->selected; on_complete->cb(on_complete->cb_arg, 1); } else { + grpc_iomgr_closure *call = NULL; if (!p->started_picking) { - start_picking(p); + call = start_picking(p); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); @@ -167,6 +184,9 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); + if (call) { + call->cb(call->cb_arg, 1); + } } } @@ -174,6 +194,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; pending_pick *pp; int unref = 0; + grpc_iomgr_closure *cbs = NULL; + grpc_connectivity_state_flusher f; gpr_mu_lock(&p->mu); @@ -183,8 +205,12 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change( + p->selected, &p->checking_connectivity, &p->connectivity_changed) + .state_already_changed) { + p->connectivity_changed.next = cbs; + cbs = &p->connectivity_changed; + } } else { unref = 1; } @@ -199,11 +225,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->pending_picks = pp->next; *pp->target = p->selected; grpc_subchannel_del_interested_party(p->selected, pp->pollset); - grpc_workqueue_push(p->workqueue, pp->on_complete, 1); + pp->on_complete->next = cbs; + cbs = pp->on_complete; gpr_free(pp); } - grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change(p->selected, + &p->checking_connectivity, + &p->connectivity_changed) + .state_already_changed) { + p->connectivity_changed.next = cbs; + cbs = &p->connectivity_changed; + } break; case GRPC_CHANNEL_TRANSIENT_FAILURE: grpc_connectivity_state_set(&p->state_tracker, @@ -216,9 +248,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->subchannels[p->checking_subchannel]); add_interested_parties_locked(p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], + &p->checking_connectivity, &p->connectivity_changed) + .state_already_changed) { + p->connectivity_changed.next = cbs; + cbs = &p->connectivity_changed; + } } else { goto loop; } @@ -227,9 +263,13 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, "connecting_changed"); - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed); + if (grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], + &p->checking_connectivity, &p->connectivity_changed) + .state_already_changed) { + p->connectivity_changed.next = cbs; + cbs = &p->connectivity_changed; + } break; case GRPC_CHANNEL_FATAL_FAILURE: del_interested_parties_locked(p); @@ -244,7 +284,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_workqueue_push(p->workqueue, pp->on_complete, 1); + pp->on_complete->next = cbs; + cbs = pp->on_complete; gpr_free(pp); } unref = 1; @@ -261,7 +302,15 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { } } + grpc_connectivity_state_begin_flush(&p->state_tracker, &f); gpr_mu_unlock(&p->mu); + grpc_connectivity_state_end_flush(&f); + + while (cbs != NULL) { + grpc_iomgr_closure *next = cbs->next; + cbs->cb(cbs->cb_arg, 1); + cbs = next; + } if (unref) { GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity"); @@ -299,14 +348,16 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { return st; } -static void pf_notify_on_state_change(grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_iomgr_closure *notify) { +static grpc_connectivity_state_notify_on_state_change_result +pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, + grpc_iomgr_closure *notify) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&p->mu); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, - notify); + r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify); gpr_mu_unlock(&p->mu); + return r; } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { @@ -332,8 +383,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, p->num_subchannels = args->num_subchannels; p->workqueue = args->workqueue; GRPC_WORKQUEUE_REF(p->workqueue, "pick_first"); - grpc_connectivity_state_init(&p->state_tracker, args->workqueue, - GRPC_CHANNEL_IDLE, "pick_first"); + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "pick_first"); memcpy(p->subchannels, args->subchannels, sizeof(grpc_subchannel *) * args->num_subchannels); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 90ec44432f..48a787da8a 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -82,10 +82,11 @@ void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) { policy->vtable->exit_idle(policy); } -void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_iomgr_closure *closure) { - policy->vtable->notify_on_state_change(policy, state, closure); +grpc_connectivity_state_notify_on_state_change_result +grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure) { + return policy->vtable->notify_on_state_change(policy, state, closure); } grpc_connectivity_state grpc_lb_policy_check_connectivity( diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 3f7ca8f28d..7c9bdac648 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_H #include "src/core/client_config/subchannel.h" +#include "src/core/transport/connectivity_state.h" /** A load balancing policy: specified by a vtable and a struct (which is expected to be extended to contain some parameters) */ @@ -70,9 +71,10 @@ struct grpc_lb_policy_vtable { /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy */ - void (*notify_on_state_change)(grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_iomgr_closure *closure); + grpc_connectivity_state_notify_on_state_change_result ( + *notify_on_state_change)(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure); }; #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -111,9 +113,10 @@ void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op); void grpc_lb_policy_exit_idle(grpc_lb_policy *policy); -void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_iomgr_closure *closure); +grpc_connectivity_state_notify_on_state_change_result +grpc_lb_policy_notify_on_state_change( + grpc_lb_policy *policy, grpc_connectivity_state *state, + grpc_iomgr_closure *closure) GRPC_MUST_USE_RESULT; grpc_connectivity_state grpc_lb_policy_check_connectivity( grpc_lb_policy *policy); diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 66a8c9f99d..4fee543a5b 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -79,7 +79,8 @@ typedef struct { static void dns_destroy(grpc_resolver *r); static void dns_start_resolving_locked(dns_resolver *r); -static void dns_maybe_finish_next_locked(dns_resolver *r); +static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) + GRPC_MUST_USE_RESULT; static void dns_shutdown(grpc_resolver *r); static void dns_channel_saw_error(grpc_resolver *r, @@ -93,13 +94,15 @@ static const grpc_resolver_vtable dns_resolver_vtable = { static void dns_shutdown(grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; + grpc_iomgr_closure *next_completion; gpr_mu_lock(&r->mu); - if (r->next_completion != NULL) { + next_completion = r->next_completion; + r->next_completion = NULL; + gpr_mu_unlock(&r->mu); + if (next_completion != NULL) { *r->target_config = NULL; - grpc_workqueue_push(r->workqueue, r->next_completion, 1); - r->next_completion = NULL; + next_completion->cb(next_completion->cb_arg, 1); } - gpr_mu_unlock(&r->mu); } static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, @@ -116,6 +119,7 @@ static void dns_next(grpc_resolver *resolver, grpc_client_config **target_config, grpc_iomgr_closure *on_complete) { dns_resolver *r = (dns_resolver *)resolver; + grpc_iomgr_closure *call = NULL; gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; @@ -123,9 +127,12 @@ static void dns_next(grpc_resolver *resolver, if (r->resolved_version == 0 && !r->resolving) { dns_start_resolving_locked(r); } else { - dns_maybe_finish_next_locked(r); + call = dns_maybe_finish_next_locked(r); } gpr_mu_unlock(&r->mu); + if (call) { + call->cb(call->cb_arg, 1); + } } static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { @@ -134,6 +141,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; + grpc_iomgr_closure *call; size_t i; if (addresses) { grpc_lb_policy_args lb_policy_args; @@ -164,8 +172,11 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { } r->resolved_config = config; r->resolved_version++; - dns_maybe_finish_next_locked(r); + call = dns_maybe_finish_next_locked(r); gpr_mu_unlock(&r->mu); + if (call) { + call->cb(call->cb_arg, 1); + } GRPC_RESOLVER_UNREF(&r->base, "dns-resolving"); } @@ -177,17 +188,19 @@ static void dns_start_resolving_locked(dns_resolver *r) { grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); } -static void dns_maybe_finish_next_locked(dns_resolver *r) { +static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) { + grpc_iomgr_closure *ret = NULL; if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; if (r->resolved_config) { grpc_client_config_ref(r->resolved_config); } - grpc_workqueue_push(r->workqueue, r->next_completion, 1); + ret = r->next_completion; r->next_completion = NULL; r->published_version = r->resolved_version; } + return ret; } static void dns_destroy(grpc_resolver *gr) { diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index abfb7b8569..220915853c 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -80,7 +80,8 @@ typedef struct { static void sockaddr_destroy(grpc_resolver *r); -static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r); +static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked( + sockaddr_resolver *r) GRPC_MUST_USE_RESULT; static void sockaddr_shutdown(grpc_resolver *r); static void sockaddr_channel_saw_error(grpc_resolver *r, @@ -95,13 +96,17 @@ static const grpc_resolver_vtable sockaddr_resolver_vtable = { static void sockaddr_shutdown(grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; + grpc_iomgr_closure *call = NULL; gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_workqueue_push(r->workqueue, r->next_completion, 1); + call = r->next_completion; r->next_completion = NULL; } gpr_mu_unlock(&r->mu); + if (call) { + call->cb(call->cb_arg, 1); + } } static void sockaddr_channel_saw_error(grpc_resolver *resolver, @@ -111,20 +116,24 @@ static void sockaddr_next(grpc_resolver *resolver, grpc_client_config **target_config, grpc_iomgr_closure *on_complete) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; + grpc_iomgr_closure *call = NULL; gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_config = target_config; - sockaddr_maybe_finish_next_locked(r); + call = sockaddr_maybe_finish_next_locked(r); gpr_mu_unlock(&r->mu); + if (call) call->cb(call->cb_arg, 1); } -static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { +static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked( + sockaddr_resolver *r) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; grpc_lb_policy_args lb_policy_args; grpc_subchannel **subchannels; grpc_subchannel_args args; + grpc_iomgr_closure *call = NULL; if (r->next_completion != NULL && !r->published) { size_t i; @@ -148,9 +157,11 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { GRPC_LB_POLICY_UNREF(lb_policy, "unix"); r->published = 1; *r->target_config = cfg; - grpc_workqueue_push(r->workqueue, r->next_completion, 1); + call = r->next_completion; r->next_completion = NULL; } + + return call; } static void sockaddr_destroy(grpc_resolver *gr) { diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index bc04203744..94ec12dd16 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -92,7 +92,8 @@ typedef struct { static void zookeeper_destroy(grpc_resolver *r); static void zookeeper_start_resolving_locked(zookeeper_resolver *r); -static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r); +static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked( + zookeeper_resolver *r) GRPC_MUST_USE_RESULT; static void zookeeper_shutdown(grpc_resolver *r); static void zookeeper_channel_saw_error(grpc_resolver *r, @@ -107,14 +108,18 @@ static const grpc_resolver_vtable zookeeper_resolver_vtable = { static void zookeeper_shutdown(grpc_resolver *resolver) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; + grpc_iomgr_closure *call = NULL; gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_workqueue_push(r->workqueue, r->next_completion, 1); + call = r->next_completion; r->next_completion = NULL; } zookeeper_close(r->zookeeper_handle); gpr_mu_unlock(&r->mu); + if (call != NULL) { + call->cb(call->cb_arg, 1); + } } static void zookeeper_channel_saw_error(grpc_resolver *resolver, @@ -131,6 +136,7 @@ static void zookeeper_next(grpc_resolver *resolver, grpc_client_config **target_config, grpc_iomgr_closure *on_complete) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; + grpc_iomgr_closure *call; gpr_mu_lock(&r->mu); GPR_ASSERT(r->next_completion == NULL); r->next_completion = on_complete; @@ -138,9 +144,10 @@ static void zookeeper_next(grpc_resolver *resolver, if (r->resolved_version == 0 && r->resolving == 0) { zookeeper_start_resolving_locked(r); } else { - zookeeper_maybe_finish_next_locked(r); + call = zookeeper_maybe_finish_next_locked(r); } gpr_mu_unlock(&r->mu); + if (call) call->cb(call->cb_arg, 1); } /** Zookeeper global watcher for connection management @@ -182,6 +189,7 @@ static void zookeeper_on_resolved(void *arg, grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; + grpc_iomgr_closure *call; size_t i; if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; @@ -211,9 +219,11 @@ static void zookeeper_on_resolved(void *arg, } r->resolved_config = config; r->resolved_version++; - zookeeper_maybe_finish_next_locked(r); + call = zookeeper_maybe_finish_next_locked(r); gpr_mu_unlock(&r->mu); + if (call) call->cb(call->cb_arg, 1); + GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving"); } @@ -404,17 +414,20 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { zookeeper_resolve_address(r); } -static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { +static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked( + zookeeper_resolver *r) { + grpc_iomgr_closure *call = NULL; if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; if (r->resolved_config != NULL) { grpc_client_config_ref(r->resolved_config); } - grpc_workqueue_push(r->workqueue, r->next_completion, 1); + call = r->next_completion; r->next_completion = NULL; r->published_version = r->resolved_version; } + return call; } static void zookeeper_destroy(grpc_resolver *gr) { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 2047d2fee7..6fbf966475 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -303,8 +303,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->random = random_seed(); grpc_mdctx_ref(c->mdctx); grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); - grpc_connectivity_state_init(&c->state_tracker, c->workqueue, - GRPC_CHANNEL_IDLE, "subchannel"); + grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, + "subchannel"); gpr_mu_init(&c->mu); return c; } @@ -365,12 +365,15 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, c->waiting = w4c; grpc_subchannel_add_interested_party(c, pollset); if (!c->connecting) { + grpc_connectivity_state_flusher f; c->connecting = 1; connectivity_state_changed_locked(c, "create_call"); /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); + grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); + grpc_connectivity_state_end_flush(&f); start_connect(c); } else { @@ -387,24 +390,33 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } -void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, - grpc_connectivity_state *state, - grpc_iomgr_closure *notify) { +grpc_connectivity_state_notify_on_state_change_result +grpc_subchannel_notify_on_state_change(grpc_subchannel *c, + grpc_connectivity_state *state, + grpc_iomgr_closure *notify) { int do_connect = 0; + grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&c->mu); - if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, - notify)) { + r = grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, + notify); + if (r.current_state_is_idle) { + grpc_connectivity_state_flusher f; do_connect = 1; c->connecting = 1; /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); connectivity_state_changed_locked(c, "state_change"); - } + grpc_connectivity_state_begin_flush(&c->state_tracker, &f); + gpr_mu_unlock(&c->mu); + grpc_connectivity_state_end_flush(&f); + } else { gpr_mu_unlock(&c->mu); + } if (do_connect) { start_connect(c); } + return r; } void grpc_subchannel_process_transport_op(grpc_subchannel *c, @@ -413,18 +425,23 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_subchannel *destroy; int cancel_alarm = 0; gpr_mu_lock(&c->mu); + if (c->active != NULL) { + con = c->active; + CONNECTION_REF_LOCKED(con, "transport-op"); + } if (op->disconnect) { + grpc_connectivity_state_flusher f; c->disconnected = 1; connectivity_state_changed_locked(c, "disconnect"); if (c->have_alarm) { cancel_alarm = 1; } - } - if (c->active != NULL) { - con = c->active; - CONNECTION_REF_LOCKED(con, "transport-op"); - } + grpc_connectivity_state_begin_flush(&c->state_tracker, &f); + gpr_mu_unlock(&c->mu); + grpc_connectivity_state_end_flush(&f); + } else { gpr_mu_unlock(&c->mu); + } if (con != NULL) { grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); @@ -457,6 +474,7 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_transport_op op; grpc_channel_element *elem; connection *destroy_connection = NULL; + grpc_connectivity_state_flusher f; gpr_mu_lock(mu); @@ -498,7 +516,9 @@ done: connectivity_state_changed_locked(c, "transport_state_changed"); destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); gpr_free(sw); + grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(mu); + grpc_connectivity_state_end_flush(&f); if (destroy) { subchannel_destroy(c); } @@ -518,6 +538,7 @@ static void publish_transport(grpc_subchannel *c) { state_watcher *sw; connection *destroy_connection = NULL; grpc_channel_element *elem; + grpc_connectivity_state_flusher f; /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; @@ -581,12 +602,18 @@ static void publish_transport(grpc_subchannel *c) { /* signal completion */ connectivity_state_changed_locked(c, "connected"); - while ((w4c = c->waiting)) { - c->waiting = w4c->next; - grpc_workqueue_push(c->workqueue, &w4c->continuation, 1); - } + w4c = c->waiting; + c->waiting = NULL; + grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); + grpc_connectivity_state_end_flush(&f); + + while (w4c != NULL) { + waiting_for_connect *next = w4c; + w4c->continuation.cb(w4c->continuation.cb_arg, 1); + w4c = next; + } gpr_free(filters); @@ -626,13 +653,16 @@ static void update_reconnect_parameters(grpc_subchannel *c) { static void on_alarm(void *arg, int iomgr_success) { grpc_subchannel *c = arg; + grpc_connectivity_state_flusher f; gpr_mu_lock(&c->mu); c->have_alarm = 0; if (c->disconnected) { iomgr_success = 0; } connectivity_state_changed_locked(c, "alarm"); + grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); + grpc_connectivity_state_end_flush(&f); if (iomgr_success) { update_reconnect_parameters(c); continue_connect(c); @@ -648,12 +678,15 @@ static void subchannel_connected(void *arg, int iomgr_success) { publish_transport(c); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_connectivity_state_flusher f; gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; connectivity_state_changed_locked(c, "connect_failed"); grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now); + grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); + grpc_connectivity_state_end_flush(&f); } } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 2e36c69134..391df36bfd 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -36,6 +36,7 @@ #include "src/core/channel/channel_stack.h" #include "src/core/client_config/connector.h" +#include "src/core/transport/connectivity_state.h" /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ @@ -87,9 +88,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the channel */ -void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, - grpc_connectivity_state *state, - grpc_iomgr_closure *notify); +grpc_connectivity_state_notify_on_state_change_result +grpc_subchannel_notify_on_state_change( + grpc_subchannel *channel, grpc_connectivity_state *state, + grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT; /** express interest in \a channel's activities through \a pollset. */ void grpc_subchannel_add_interested_party(grpc_subchannel *channel, diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h index a236651fbd..124f294a23 100644 --- a/src/core/iomgr/workqueue.h +++ b/src/core/iomgr/workqueue.h @@ -54,6 +54,7 @@ grpc_workqueue *grpc_workqueue_create(void); void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously); +#define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #define GRPC_WORKQUEUE_REF(p, r) \ grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) diff --git a/src/core/surface/call.c b/src/core/surface/call.c index c2b3040319..8f9e97a396 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -654,6 +654,8 @@ static void unlock(grpc_call *call) { if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) { call->bound_pollset = 1; op.bind_pollset = grpc_cq_pollset(call->cq); + grpc_workqueue_add_to_pollset(grpc_channel_get_workqueue(call->channel), + op.bind_pollset); start_op = 1; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 896f5a331a..6376c397a2 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -243,7 +243,7 @@ static void init_transport(grpc_chttp2_transport *t, is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; grpc_connectivity_state_init( - &t->channel_callback.state_tracker, workqueue, GRPC_CHANNEL_READY, + &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, is_client ? "client_transport" : "server_transport"); gpr_slice_buffer_init(&t->global.qbuf); @@ -500,6 +500,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_chttp2_transport *t) { grpc_iomgr_closure *run_closures; + grpc_connectivity_state_flusher f; unlock_check_read_write_state(t); if (!t->writing_active && !t->closed && @@ -514,8 +515,11 @@ static void unlock(grpc_chttp2_transport *t) { t->global.pending_closures_head = NULL; t->global.pending_closures_tail = NULL; + grpc_connectivity_state_begin_flush(&t->channel_callback.state_tracker, &f); gpr_mu_unlock(&t->mu); + grpc_connectivity_state_end_flush(&f); + while (run_closures) { grpc_iomgr_closure *next = run_closures->next; run_closures->cb(run_closures->cb_arg, run_closures->success); @@ -755,9 +759,13 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } if (op->on_connectivity_state_change) { - grpc_connectivity_state_notify_on_state_change( - &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change); + if (grpc_connectivity_state_notify_on_state_change( + &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change) + .state_already_changed) { + grpc_chttp2_schedule_closure(&t->global, op->on_connectivity_state_change, + 1); + } } if (op->send_goaway) { @@ -1185,19 +1193,14 @@ static void recv_data(void *tp, int success) { * CALLBACK LOOP */ -static void schedule_closure_for_connectivity(void *a, - grpc_iomgr_closure *closure) { - grpc_chttp2_schedule_closure(a, closure, 1); -} - static void connectivity_state_set( grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state, const char *reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); - grpc_connectivity_state_set_with_scheduler( + grpc_connectivity_state_set( &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, schedule_closure_for_connectivity, transport_global, reason); + state, reason); } void grpc_chttp2_schedule_closure( diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index cf23adfbb2..034f82474f 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -32,6 +32,9 @@ */ #include "src/core/transport/connectivity_state.h" + +#include <string.h> + #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -56,14 +59,12 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) { } void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, - grpc_workqueue *workqueue, grpc_connectivity_state init_state, const char *name) { tracker->current_state = init_state; tracker->watchers = NULL; - tracker->workqueue = workqueue; - GRPC_WORKQUEUE_REF(workqueue, name); tracker->name = gpr_strdup(name); + tracker->changed = 0; } void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { @@ -78,10 +79,9 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { } else { success = 0; } - grpc_workqueue_push(tracker->workqueue, w->notify, success); + w->notify->cb(w->notify->cb_arg, success); gpr_free(w); } - GRPC_WORKQUEUE_UNREF(tracker->workqueue, tracker->name); gpr_free(tracker->name); } @@ -90,9 +90,12 @@ grpc_connectivity_state grpc_connectivity_state_check( return tracker->current_state; } -int grpc_connectivity_state_notify_on_state_change( +grpc_connectivity_state_notify_on_state_change_result +grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify) { + grpc_connectivity_state_notify_on_state_change_result result; + memset(&result, 0, sizeof(result)); if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name, grpc_connectivity_state_name(*current), @@ -100,7 +103,7 @@ int grpc_connectivity_state_notify_on_state_change( } if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_workqueue_push(tracker->workqueue, notify, 1); + result.state_already_changed = 1; } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -108,15 +111,13 @@ int grpc_connectivity_state_notify_on_state_change( w->next = tracker->watchers; tracker->watchers = w; } - return tracker->current_state == GRPC_CHANNEL_IDLE; + result.current_state_is_idle = tracker->current_state == GRPC_CHANNEL_IDLE; + return result; } -void grpc_connectivity_state_set_with_scheduler( - grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg, - const char *reason) { - grpc_connectivity_state_watcher *new = NULL; - grpc_connectivity_state_watcher *w; +void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state state, + const char *reason) { if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name, grpc_connectivity_state_name(tracker->current_state), @@ -127,28 +128,40 @@ void grpc_connectivity_state_set_with_scheduler( } GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE); tracker->current_state = state; - while ((w = tracker->watchers)) { - tracker->watchers = w->next; + tracker->changed = 1; +} - if (state != *w->current) { - *w->current = state; - scheduler(arg, w->notify); +void grpc_connectivity_state_begin_flush( + grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state_flusher *flusher) { + grpc_connectivity_state_watcher *w; + flusher->cbs = NULL; + if (!tracker->changed) return; + w = tracker->watchers; + tracker->watchers = NULL; + while (w != NULL) { + grpc_connectivity_state_watcher *next = w->next; + if (tracker->current_state != *w->current) { + *w->current = tracker->current_state; + w->notify->next = flusher->cbs; + flusher->cbs = w->notify; gpr_free(w); } else { - w->next = new; - new = w; + w->next = tracker->watchers; + tracker->watchers = w; } + w = next; } - tracker->watchers = new; + tracker->changed = 0; } -static void default_scheduler(void *workqueue, grpc_iomgr_closure *closure) { - grpc_workqueue_push(workqueue, closure, 1); +void grpc_connectivity_state_end_flush( + grpc_connectivity_state_flusher *flusher) { + grpc_iomgr_closure *c = flusher->cbs; + while (c != NULL) { + grpc_iomgr_closure *next = c; + c->cb(c->cb_arg, 1); + c = next; + } } -void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, - grpc_connectivity_state state, - const char *reason) { - grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, - tracker->workqueue, reason); -} diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 6c61e02623..323af2740c 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -54,32 +54,52 @@ typedef struct { grpc_connectivity_state_watcher *watchers; /** a name to help debugging */ char *name; - /** workqueue for async work */ - grpc_workqueue *workqueue; + /** has this state been changed since the last flush? */ + int changed; } grpc_connectivity_state_tracker; +typedef struct { grpc_iomgr_closure *cbs; } grpc_connectivity_state_flusher; + extern int grpc_connectivity_state_trace; void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, - grpc_workqueue *grpc_workqueue, grpc_connectivity_state init_state, const char *name); void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); +/** Set connectivity state; not thread safe; access must be serialized with an + * external lock */ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, const char *reason); -void grpc_connectivity_state_set_with_scheduler( - grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg, - const char *reason); + +/** Begin flushing callbacks; not thread safe; access must be serialized using + * the same external lock as grpc_connectivity_state_set. Initializes flusher. + */ +void grpc_connectivity_state_begin_flush( + grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state_flusher *flusher); + +/** Complete flushing updates: must not be called with any locks held */ +void grpc_connectivity_state_end_flush( + grpc_connectivity_state_flusher *flusher); grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); +typedef struct { + /** 1 if the current state is idle (a hint to begin connecting), 0 otherwise + */ + int current_state_is_idle; + /** 1 if the state has already changed: in this case the closure passed to + * grpc_connectivity_state_notify_on_state_change will not be called */ + int state_already_changed; +} grpc_connectivity_state_notify_on_state_change_result; + /** Return 1 if the channel should start connecting, 0 otherwise */ -int grpc_connectivity_state_notify_on_state_change( +grpc_connectivity_state_notify_on_state_change_result +grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, - grpc_iomgr_closure *notify); + grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT; #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ |