diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-18 07:20:29 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-18 07:20:29 -0700 |
commit | 000cd8f9f7346defc79fe6aa877af11b42ab5f1e (patch) | |
tree | 883d73a97471f63e616d02c1e17efc62b099c8ad /src/core | |
parent | 38adec97e875c21cd9d6cc9d039664bdf4fdb889 (diff) |
Introduce call lists for moving work outside locks
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 139 | ||||
-rw-r--r-- | src/core/channel/client_channel.h | 5 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 127 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 35 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 43 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 91 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 11 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 39 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.h | 15 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 48 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 10 | ||||
-rw-r--r-- | src/core/surface/channel.c | 8 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 16 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 8 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 67 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.c | 62 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.h | 32 |
19 files changed, 352 insertions, 410 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 5b165972a7..a2346503d9 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -73,7 +73,7 @@ typedef struct { guarded by mu_config */ grpc_client_config *incoming_configuration; /** a list of closures that are all waiting for config to come in */ - grpc_iomgr_closure *waiting_for_config_closures; + grpc_iomgr_call_list waiting_for_config_closures; /** resolver callback */ grpc_iomgr_closure on_config_changed; /** connectivity state being tracked */ @@ -181,8 +181,8 @@ static void add_to_lb_policy_wait_queue_locked_state_config( waiting_call *wc = gpr_malloc(sizeof(*wc)); grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc); wc->elem = elem; - wc->closure.next = chand->waiting_for_config_closures; - chand->waiting_for_config_closures = &wc->closure; + grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure, + 1); } static int is_empty(void *p, int len) { @@ -230,6 +230,7 @@ static void started_call(void *arg, int iomgr_success) { static void picked_target(void *arg, int iomgr_success) { call_data *calld = arg; grpc_pollset *pollset; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; if (calld->picked_channel == NULL) { /* treat this like a cancellation */ @@ -248,9 +249,10 @@ static void picked_target(void *arg, int iomgr_success) { grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld); grpc_subchannel_create_call(calld->picked_channel, pollset, &calld->subchannel_call, - &calld->async_setup_task); + &calld->async_setup_task, &call_list); } } + grpc_iomgr_call_list_run(call_list); } static grpc_iomgr_closure *merge_into_waiting_op( @@ -310,7 +312,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; - grpc_iomgr_closure *consumed_op = NULL; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -328,7 +330,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, break; case CALL_WAITING_FOR_SEND: GPR_ASSERT(!continuation); - consumed_op = merge_into_waiting_op(elem, op); + grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1); if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { gpr_mu_unlock(&calld->mu_state); @@ -357,7 +359,8 @@ static void perform_transport_stream_op(grpc_call_element *elem, handle_op_after_cancellation(elem, op); handle_op_after_cancellation(elem, &op2); } else { - consumed_op = merge_into_waiting_op(elem, op); + grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), + 1); gpr_mu_unlock(&calld->mu_state); } break; @@ -398,7 +401,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, calld); grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, - &calld->async_setup_task); + &calld->async_setup_task, &call_list); GRPC_LB_POLICY_UNREF(lb_policy, "pick"); } else if (chand->resolver != NULL) { @@ -424,9 +427,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, break; } - if (consumed_op != NULL) { - consumed_op->cb(consumed_op->cb_arg, 1); - } + grpc_iomgr_call_list_run(call_list); } static void cc_start_transport_stream_op(grpc_call_element *elem, @@ -435,36 +436,38 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, } static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state); + grpc_connectivity_state current_state, + grpc_iomgr_call_list *cl); -static void on_lb_policy_state_changed_locked( - lb_policy_connectivity_watcher *w) { +static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w, + grpc_iomgr_call_list *cl) { /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; - grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed"); + grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed", + cl); if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { - watch_lb_policy(w->chand, w->lb_policy, w->state); + watch_lb_policy(w->chand, w->lb_policy, w->state, cl); } } static void on_lb_policy_state_changed(void *arg, int iomgr_success) { lb_policy_connectivity_watcher *w = arg; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(&w->chand->mu_config); - on_lb_policy_state_changed_locked(w); - grpc_connectivity_state_begin_flush(&w->chand->state_tracker, &f); + on_lb_policy_state_changed_locked(w, &cl); gpr_mu_unlock(&w->chand->mu_config); - grpc_connectivity_state_end_flush(&f); + grpc_iomgr_call_list_run(cl); GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); gpr_free(w); } static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state) { + grpc_connectivity_state current_state, + grpc_iomgr_call_list *call_list) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); @@ -472,13 +475,8 @@ static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w); w->state = current_state; w->lb_policy = lb_policy; - if (grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, - &w->on_changed) - .state_already_changed) { - on_lb_policy_state_changed_locked(w); - GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); - gpr_free(w); - } + grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed, + call_list); } static void cc_on_config_changed(void *arg, int iomgr_success) { @@ -486,9 +484,8 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_resolver *old_resolver; - grpc_iomgr_closure *wakeup_closures = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT; int exit_idle = 0; if (chand->incoming_configuration != NULL) { @@ -496,7 +493,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); - state = grpc_lb_policy_check_connectivity(lb_policy); + state = grpc_lb_policy_check_connectivity(lb_policy, &cl); } grpc_client_config_unref(chand->incoming_configuration); @@ -508,8 +505,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { - wakeup_closures = chand->waiting_for_config_closures; - chand->waiting_for_config_closures = NULL; + grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -520,15 +516,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (iomgr_success && chand->resolver) { grpc_resolver *resolver = chand->resolver; GRPC_RESOLVER_REF(resolver, "channel-next"); - grpc_connectivity_state_set(&chand->state_tracker, state, - "new_lb+resolver"); + grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver", + &cl); if (lb_policy != NULL) { - watch_lb_policy(chand, lb_policy, state); + watch_lb_policy(chand, lb_policy, state, &cl); } - grpc_connectivity_state_begin_flush(&chand->state_tracker, &f); gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - grpc_connectivity_state_end_flush(&f); grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(resolver, "channel-next"); @@ -536,10 +530,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { old_resolver = chand->resolver; chand->resolver = NULL; grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); - grpc_connectivity_state_begin_flush(&chand->state_tracker, &f); + GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone", + &cl); gpr_mu_unlock(&chand->mu_config); - grpc_connectivity_state_end_flush(&f); if (old_resolver != NULL) { grpc_resolver_shutdown(old_resolver); GRPC_RESOLVER_UNREF(old_resolver, "channel"); @@ -547,24 +540,20 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } if (exit_idle) { - grpc_lb_policy_exit_idle(lb_policy); + grpc_lb_policy_exit_idle(lb_policy, &cl); GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle"); } if (old_lb_policy != NULL) { - grpc_lb_policy_shutdown(old_lb_policy); + grpc_lb_policy_shutdown(old_lb_policy, &cl); GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); } - while (wakeup_closures) { - grpc_iomgr_closure *next = wakeup_closures->next; - wakeup_closures->cb(wakeup_closures->cb_arg, 1); - wakeup_closures = next; - } - if (lb_policy != NULL) { GRPC_LB_POLICY_UNREF(lb_policy, "config_change"); } + + grpc_iomgr_call_list_run(cl); GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); } @@ -573,23 +562,21 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_connectivity_state_flusher f; - grpc_iomgr_closure *call_list = op->on_consumed; - call_list->next = NULL; - op->on_consumed = NULL; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + + if (op->on_consumed) { + grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1); + op->on_consumed = NULL; + } GPR_ASSERT(op->set_accept_stream == NULL); GPR_ASSERT(op->bind_pollset == NULL); gpr_mu_lock(&chand->mu_config); if (op->on_connectivity_state_change != NULL) { - if (grpc_connectivity_state_notify_on_state_change( - &chand->state_tracker, op->connectivity_state, - op->on_connectivity_state_change) - .state_already_changed) { - op->on_connectivity_state_change->next = call_list; - call_list = op->on_connectivity_state_change; - } + grpc_connectivity_state_notify_on_state_change( + &chand->state_tracker, op->connectivity_state, + op->on_connectivity_state_change, &call_list); op->on_connectivity_state_change = NULL; op->connectivity_state = NULL; } @@ -603,18 +590,17 @@ static void cc_start_transport_op(grpc_channel_element *elem, if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + GRPC_CHANNEL_FATAL_FAILURE, "disconnect", + &call_list); destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { - grpc_lb_policy_shutdown(chand->lb_policy); + grpc_lb_policy_shutdown(chand->lb_policy, &call_list); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); chand->lb_policy = NULL; } } - grpc_connectivity_state_begin_flush(&chand->state_tracker, &f); gpr_mu_unlock(&chand->mu_config); - grpc_connectivity_state_end_flush(&f); if (destroy_resolver) { grpc_resolver_shutdown(destroy_resolver); @@ -622,15 +608,11 @@ static void cc_start_transport_op(grpc_channel_element *elem, } if (lb_policy) { - grpc_lb_policy_broadcast(lb_policy, op); + grpc_lb_policy_broadcast(lb_policy, op, &call_list); GRPC_LB_POLICY_UNREF(lb_policy, "broadcast"); } - while (call_list != NULL) { - grpc_iomgr_closure *next = call_list->next; - call_list->cb(call_list->cb_arg, 1); - call_list = next; - } + grpc_iomgr_call_list_run(call_list); } /* Constructor for call_data */ @@ -740,7 +722,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, GPR_ASSERT(!chand->resolver); chand->resolver = resolver; GRPC_RESOLVER_REF(resolver, "channel"); - if (chand->waiting_for_config_closures != NULL || + if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = 1; GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); @@ -751,14 +733,15 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, } grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect) { + grpc_channel_element *elem, int try_to_connect, + grpc_iomgr_call_list *call_list) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; gpr_mu_lock(&chand->mu_config); out = grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(chand->lb_policy); + grpc_lb_policy_exit_idle(chand->lb_policy, call_list); } else { chand->exit_idle_when_lb_policy_arrives = 1; if (!chand->started_resolving && chand->resolver != NULL) { @@ -775,16 +758,12 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( void grpc_client_channel_watch_connectivity_state( grpc_channel_element *elem, grpc_connectivity_state *state, - grpc_iomgr_closure *on_complete) { + grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) { channel_data *chand = elem->channel_data; - grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&chand->mu_config); - r = grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, - state, on_complete); + grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state, + on_complete, call_list); gpr_mu_unlock(&chand->mu_config); - if (r.state_already_changed) { - on_complete->cb(on_complete->cb_arg, 1); - } } grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index 13681e3956..60090e8d7e 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -53,11 +53,12 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver); grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect); + grpc_channel_element *elem, int try_to_connect, + grpc_iomgr_call_list *call_list); void grpc_client_channel_watch_connectivity_state( grpc_channel_element *elem, grpc_connectivity_state *state, - grpc_iomgr_closure *on_complete); + grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list); grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( grpc_channel_element *elem); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 804dbdeadd..8fd8dd7b67 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -108,9 +108,8 @@ void pf_destroy(grpc_lb_policy *pol) { gpr_free(p); } -void pf_shutdown(grpc_lb_policy *pol) { +void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state_flusher f; pending_pick *pp; gpr_mu_lock(&p->mu); del_interested_parties_locked(p); @@ -118,51 +117,40 @@ void pf_shutdown(grpc_lb_policy *pol) { pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "shutdown"); - grpc_connectivity_state_begin_flush(&p->state_tracker, &f); + "shutdown", call_list); gpr_mu_unlock(&p->mu); - grpc_connectivity_state_end_flush(&f); while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - pp->on_complete->cb(pp->on_complete->cb_arg, 0); + grpc_iomgr_call_list_add(call_list, pp->on_complete, 1); gpr_free(pp); pp = next; } } -/* returns a closure to call, or NULL */ -static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) { +static void start_picking(pick_first_lb_policy *p, + grpc_iomgr_call_list *call_list) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); - if (grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed) - .state_already_changed) { - return &p->connectivity_changed; - } else { - return NULL; - } + grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], + &p->checking_connectivity, + &p->connectivity_changed, call_list); } -void pf_exit_idle(grpc_lb_policy *pol) { +void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_iomgr_closure *call = NULL; gpr_mu_lock(&p->mu); if (!p->started_picking) { - call = start_picking(p); + start_picking(p, call_list); } gpr_mu_unlock(&p->mu); - if (call) { - call->cb(call->cb_arg, 1); - } } void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_iomgr_closure *on_complete) { + grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -171,9 +159,8 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, *target = p->selected; on_complete->cb(on_complete->cb_arg, 1); } else { - grpc_iomgr_closure *call = NULL; if (!p->started_picking) { - call = start_picking(p); + start_picking(p, call_list); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); @@ -184,9 +171,6 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); - if (call) { - call->cb(call->cb_arg, 1); - } } } @@ -194,8 +178,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; pending_pick *pp; int unref = 0; - grpc_iomgr_closure *cbs = NULL; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(&p->mu); @@ -203,14 +186,11 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { unref = 1; } else if (p->selected != NULL) { grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "selected_changed"); + "selected_changed", &call_list); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - if (grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed) - .state_already_changed) { - p->connectivity_changed.next = cbs; - cbs = &p->connectivity_changed; - } + grpc_subchannel_notify_on_state_change( + p->selected, &p->checking_connectivity, &p->connectivity_changed, + &call_list); } else { unref = 1; } @@ -219,28 +199,23 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { switch (p->checking_connectivity) { case GRPC_CHANNEL_READY: grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, - "connecting_ready"); + "connecting_ready", &call_list); p->selected = p->subchannels[p->checking_subchannel]; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; grpc_subchannel_del_interested_party(p->selected, pp->pollset); - pp->on_complete->next = cbs; - cbs = pp->on_complete; + grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1); gpr_free(pp); } - if (grpc_subchannel_notify_on_state_change(p->selected, - &p->checking_connectivity, - &p->connectivity_changed) - .state_already_changed) { - p->connectivity_changed.next = cbs; - cbs = &p->connectivity_changed; - } + grpc_subchannel_notify_on_state_change( + p->selected, &p->checking_connectivity, &p->connectivity_changed, + &call_list); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure"); + "connecting_transient_failure", &call_list); del_interested_parties_locked(p); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; @@ -248,13 +223,9 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->subchannels[p->checking_subchannel]); add_interested_parties_locked(p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - if (grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], - &p->checking_connectivity, &p->connectivity_changed) - .state_already_changed) { - p->connectivity_changed.next = cbs; - cbs = &p->connectivity_changed; - } + grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed, &call_list); } else { goto loop; } @@ -262,14 +233,10 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "connecting_changed"); - if (grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], - &p->checking_connectivity, &p->connectivity_changed) - .state_already_changed) { - p->connectivity_changed.next = cbs; - cbs = &p->connectivity_changed; - } + "connecting_changed", &call_list); + grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed, &call_list); break; case GRPC_CHANNEL_FATAL_FAILURE: del_interested_parties_locked(p); @@ -280,19 +247,18 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { if (p->num_subchannels == 0) { grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels"); + "no_more_channels", &call_list); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - pp->on_complete->next = cbs; - cbs = pp->on_complete; + grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1); gpr_free(pp); } unref = 1; } else { grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed"); + "subchannel_failed", &call_list); p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); @@ -302,22 +268,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { } } - grpc_connectivity_state_begin_flush(&p->state_tracker, &f); gpr_mu_unlock(&p->mu); - grpc_connectivity_state_end_flush(&f); - while (cbs != NULL) { - grpc_iomgr_closure *next = cbs->next; - cbs->cb(cbs->cb_arg, 1); - cbs = next; - } + grpc_iomgr_call_list_run(call_list); if (unref) { GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity"); } } -static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { +static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, + grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; size_t n; @@ -339,7 +300,8 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { gpr_free(subchannels); } -static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { +static grpc_connectivity_state pf_check_connectivity( + grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); @@ -348,16 +310,15 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { return st; } -static grpc_connectivity_state_notify_on_state_change_result -pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, - grpc_iomgr_closure *notify) { +void pf_notify_on_state_change(grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_iomgr_closure *notify, + grpc_iomgr_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&p->mu); - r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, - notify); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify, call_list); gpr_mu_unlock(&p->mu); - return r; } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 48a787da8a..a9dc9dcca8 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -63,33 +63,38 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy) { } } -void grpc_lb_policy_shutdown(grpc_lb_policy *policy) { - policy->vtable->shutdown(policy); +void grpc_lb_policy_shutdown(grpc_lb_policy *policy, + grpc_iomgr_call_list *call_list) { + policy->vtable->shutdown(policy, call_list); } void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_iomgr_closure *on_complete) { - policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete); + grpc_iomgr_closure *on_complete, + grpc_iomgr_call_list *call_list) { + policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete, + call_list); } -void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) { - policy->vtable->broadcast(policy, op); +void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op, + grpc_iomgr_call_list *call_list) { + policy->vtable->broadcast(policy, op, call_list); } -void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) { - policy->vtable->exit_idle(policy); +void grpc_lb_policy_exit_idle(grpc_lb_policy *policy, + grpc_iomgr_call_list *call_list) { + policy->vtable->exit_idle(policy, call_list); } -grpc_connectivity_state_notify_on_state_change_result -grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_iomgr_closure *closure) { - return policy->vtable->notify_on_state_change(policy, state, closure); +void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure, + grpc_iomgr_call_list *call_list) { + policy->vtable->notify_on_state_change(policy, state, closure, call_list); } grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_lb_policy *policy) { - return policy->vtable->check_connectivity(policy); + grpc_lb_policy *policy, grpc_iomgr_call_list *call_list) { + return policy->vtable->check_connectivity(policy, call_list); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 7c9bdac648..8d7eb579b5 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -53,28 +53,31 @@ struct grpc_lb_policy { struct grpc_lb_policy_vtable { void (*destroy)(grpc_lb_policy *policy); - void (*shutdown)(grpc_lb_policy *policy); + void (*shutdown)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); /** implement grpc_lb_policy_pick */ void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_iomgr_closure *on_complete); + grpc_iomgr_closure *on_complete, + grpc_iomgr_call_list *call_list); /** try to enter a READY connectivity state */ - void (*exit_idle)(grpc_lb_policy *policy); + void (*exit_idle)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); /** broadcast a transport op to all subchannels */ - void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op); + void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op, + grpc_iomgr_call_list *call_list); /** check the current connectivity of the lb_policy */ - grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy); + grpc_connectivity_state (*check_connectivity)( + grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy */ - grpc_connectivity_state_notify_on_state_change_result ( - *notify_on_state_change)(grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_iomgr_closure *closure); + void (*notify_on_state_change)(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure, + grpc_iomgr_call_list *call_list); }; #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -98,7 +101,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); /** Start shutting down (fail any pending picks) */ -void grpc_lb_policy_shutdown(grpc_lb_policy *policy); +void grpc_lb_policy_shutdown(grpc_lb_policy *policy, + grpc_iomgr_call_list *call_list); /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting @@ -107,18 +111,21 @@ void grpc_lb_policy_shutdown(grpc_lb_policy *policy); void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_iomgr_closure *on_complete); + grpc_iomgr_closure *on_complete, + grpc_iomgr_call_list *call_list); -void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op); +void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op, + grpc_iomgr_call_list *call_list); -void grpc_lb_policy_exit_idle(grpc_lb_policy *policy); +void grpc_lb_policy_exit_idle(grpc_lb_policy *policy, + grpc_iomgr_call_list *call_list); -grpc_connectivity_state_notify_on_state_change_result -grpc_lb_policy_notify_on_state_change( - grpc_lb_policy *policy, grpc_connectivity_state *state, - grpc_iomgr_closure *closure) GRPC_MUST_USE_RESULT; +void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure, + grpc_iomgr_call_list *call_list); grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_lb_policy *policy); + grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 6fbf966475..b15acf826a 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -146,7 +146,8 @@ struct grpc_subchannel_call { static grpc_subchannel_call *create_call(connection *con); static void connectivity_state_changed_locked(grpc_subchannel *c, - const char *reason); + const char *reason, + grpc_iomgr_call_list *call_list); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); @@ -333,16 +334,18 @@ static void start_connect(grpc_subchannel *c) { static void continue_creating_call(void *arg, int iomgr_success) { waiting_for_connect *w4c = arg; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset); grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, - w4c->notify); + w4c->notify, &call_list); GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, grpc_subchannel_call **target, - grpc_iomgr_closure *notify) { + grpc_iomgr_closure *notify, + grpc_iomgr_call_list *call_list) { connection *con; gpr_mu_lock(&c->mu); if (c->active != NULL) { @@ -365,15 +368,12 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, c->waiting = w4c; grpc_subchannel_add_interested_party(c, pollset); if (!c->connecting) { - grpc_connectivity_state_flusher f; c->connecting = 1; - connectivity_state_changed_locked(c, "create_call"); + connectivity_state_changed_locked(c, "create_call", call_list); /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); start_connect(c); } else { @@ -390,33 +390,26 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } -grpc_connectivity_state_notify_on_state_change_result -grpc_subchannel_notify_on_state_change(grpc_subchannel *c, - grpc_connectivity_state *state, - grpc_iomgr_closure *notify) { +void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, + grpc_connectivity_state *state, + grpc_iomgr_closure *notify, + grpc_iomgr_call_list *call_list) { int do_connect = 0; - grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&c->mu); - r = grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, - notify); - if (r.current_state_is_idle) { - grpc_connectivity_state_flusher f; + if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, + notify, call_list)) { do_connect = 1; c->connecting = 1; /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - connectivity_state_changed_locked(c, "state_change"); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); - gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); - } else { - gpr_mu_unlock(&c->mu); + connectivity_state_changed_locked(c, "state_change", call_list); } + gpr_mu_unlock(&c->mu); + if (do_connect) { start_connect(c); } - return r; } void grpc_subchannel_process_transport_op(grpc_subchannel *c, @@ -424,24 +417,20 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, connection *con = NULL; grpc_subchannel *destroy; int cancel_alarm = 0; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; CONNECTION_REF_LOCKED(con, "transport-op"); } if (op->disconnect) { - grpc_connectivity_state_flusher f; c->disconnected = 1; - connectivity_state_changed_locked(c, "disconnect"); + connectivity_state_changed_locked(c, "disconnect", &call_list); if (c->have_alarm) { cancel_alarm = 1; } - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); - gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); - } else { - gpr_mu_unlock(&c->mu); } + gpr_mu_unlock(&c->mu); if (con != NULL) { grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); @@ -464,6 +453,8 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, if (op->disconnect) { grpc_connector_shutdown(c->connector); } + + grpc_iomgr_call_list_run(call_list); } static void on_state_changed(void *p, int iomgr_success) { @@ -474,7 +465,7 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_transport_op op; grpc_channel_element *elem; connection *destroy_connection = NULL; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(mu); @@ -508,26 +499,26 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_connectivity_state_set( &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, - "connection_failed"); + "connection_failed", &call_list); break; } done: - connectivity_state_changed_locked(c, "transport_state_changed"); + connectivity_state_changed_locked(c, "transport_state_changed", &call_list); destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); gpr_free(sw); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(mu); - grpc_connectivity_state_end_flush(&f); if (destroy) { subchannel_destroy(c); } if (destroy_connection != NULL) { connection_destroy(destroy_connection); } + grpc_iomgr_call_list_run(call_list); } -static void publish_transport(grpc_subchannel *c) { +static void publish_transport(grpc_subchannel *c, + grpc_iomgr_call_list *call_list) { size_t channel_stack_size; connection *con; grpc_channel_stack *stk; @@ -538,7 +529,6 @@ static void publish_transport(grpc_subchannel *c) { state_watcher *sw; connection *destroy_connection = NULL; grpc_channel_element *elem; - grpc_connectivity_state_flusher f; /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; @@ -601,17 +591,15 @@ static void publish_transport(grpc_subchannel *c) { elem->filter->start_transport_op(elem, &op); /* signal completion */ - connectivity_state_changed_locked(c, "connected"); + connectivity_state_changed_locked(c, "connected", call_list); w4c = c->waiting; c->waiting = NULL; - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); while (w4c != NULL) { waiting_for_connect *next = w4c; - w4c->continuation.cb(w4c->continuation.cb_arg, 1); + grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1); w4c = next; } @@ -653,16 +641,14 @@ static void update_reconnect_parameters(grpc_subchannel *c) { static void on_alarm(void *arg, int iomgr_success) { grpc_subchannel *c = arg; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(&c->mu); c->have_alarm = 0; if (c->disconnected) { iomgr_success = 0; } - connectivity_state_changed_locked(c, "alarm"); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); + connectivity_state_changed_locked(c, "alarm", &call_list); gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); if (iomgr_success) { update_reconnect_parameters(c); continue_connect(c); @@ -670,24 +656,24 @@ static void on_alarm(void *arg, int iomgr_success) { GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(c, "connecting"); } + grpc_iomgr_call_list_run(call_list); } static void subchannel_connected(void *arg, int iomgr_success) { grpc_subchannel *c = arg; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; if (c->connecting_result.transport != NULL) { - publish_transport(c); + publish_transport(c, &call_list); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - grpc_connectivity_state_flusher f; gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - connectivity_state_changed_locked(c, "connect_failed"); + connectivity_state_changed_locked(c, "connect_failed", &call_list); grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); } + grpc_iomgr_call_list_run(call_list); } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { @@ -718,9 +704,10 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { } static void connectivity_state_changed_locked(grpc_subchannel *c, - const char *reason) { + const char *reason, + grpc_iomgr_call_list *call_list) { grpc_connectivity_state current = compute_connectivity_locked(c); - grpc_connectivity_state_set(&c->state_tracker, current, reason); + grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list); } /* diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 391df36bfd..7c00ff172d 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -76,7 +76,8 @@ void grpc_subchannel_call_unref( void grpc_subchannel_create_call(grpc_subchannel *subchannel, grpc_pollset *pollset, grpc_subchannel_call **target, - grpc_iomgr_closure *notify); + grpc_iomgr_closure *notify, + grpc_iomgr_call_list *call_list); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, @@ -88,10 +89,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the channel */ -grpc_connectivity_state_notify_on_state_change_result -grpc_subchannel_notify_on_state_change( - grpc_subchannel *channel, grpc_connectivity_state *state, - grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT; +void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, + grpc_connectivity_state *state, + grpc_iomgr_closure *notify, + grpc_iomgr_call_list *call_list); /** express interest in \a channel's activities through \a pollset. */ void grpc_subchannel_add_interested_party(grpc_subchannel *channel, diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 67eff3e528..ba8f73fe08 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -157,3 +157,42 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, closure->cb_arg = cb_arg; closure->next = NULL; } + +void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list, + grpc_iomgr_closure *closure, int success) { + if (!closure) return; + closure->next = NULL; + closure->success = success; + if (!call_list->head) { + call_list->head = closure; + } else { + call_list->tail->next = closure; + } + call_list->tail = closure; +} + +void grpc_iomgr_call_list_run(grpc_iomgr_call_list call_list) { + grpc_iomgr_closure *c = call_list.head; + while (c) { + grpc_iomgr_closure *next = c->next; + c->cb(c->cb_arg, c->success); + c = next; + } +} + +int grpc_iomgr_call_list_empty(grpc_iomgr_call_list call_list) { + return call_list.head == NULL; +} + +void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src, + grpc_iomgr_call_list *dst) { + if (dst->head == NULL) { + *dst = *src; + return; + } + if (src->head == NULL) { + return; + } + dst->tail->next = src->head; + dst->tail = src->tail; +} diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index f1d2e6439d..58c31763d2 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -58,10 +58,25 @@ typedef struct grpc_iomgr_closure { struct grpc_iomgr_closure *next; } grpc_iomgr_closure; +typedef struct grpc_iomgr_call_list { + grpc_iomgr_closure *head; + grpc_iomgr_closure *tail; +} grpc_iomgr_call_list; + /** Initializes \a closure with \a cb and \a cb_arg. */ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg); +#define GRPC_IOMGR_CALL_LIST_INIT \ + { NULL, NULL } + +void grpc_iomgr_call_list_add(grpc_iomgr_call_list *list, + grpc_iomgr_closure *closure, int success); +void grpc_iomgr_call_list_run(grpc_iomgr_call_list list); +void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src, + grpc_iomgr_call_list *dst); +int grpc_iomgr_call_list_empty(grpc_iomgr_call_list list); + /** Initializes the iomgr. */ void grpc_iomgr_init(void); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 12f34c1a19..253f3190f1 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -127,7 +127,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, GRPC_FD_REF(fd, "delayed_add"); grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_workqueue_push(fd->workqueue, &da->closure, 1); + grpc_pollset_add_unlock_job(pollset, &da->closure); } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 291181d9a5..107ffb0b5e 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -136,6 +136,8 @@ void grpc_pollset_init(grpc_pollset *pollset) { pollset->in_flight_cbs = 0; pollset->shutting_down = 0; pollset->called_shutdown = 0; + pollset->idle_jobs = NULL; + pollset->unlock_jobs = NULL; become_basic_pollset(pollset, NULL); } @@ -145,7 +147,6 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { } gpr_mu_lock(&pollset->mu); pollset->vtable->add_fd(pollset, fd, 1); - grpc_workqueue_flush(fd->workqueue, 0); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -174,6 +175,33 @@ static void finish_shutdown(grpc_pollset *pollset) { pollset->shutdown_done_cb(pollset->shutdown_done_arg); } +static void run_jobs(grpc_pollset *pollset, grpc_iomgr_closure **root) { + grpc_iomgr_closure *exec = *root; + *root = NULL; + gpr_mu_unlock(&pollset->mu); + while (exec != NULL) { + grpc_iomgr_closure *next = exec->next; + exec->cb(exec->cb_arg, 1); + exec = next; + } + gpr_mu_lock(&pollset->mu); +} + +static void add_job(grpc_iomgr_closure **root, grpc_iomgr_closure *closure) { + closure->next = *root; + *root = closure; +} + +void grpc_pollset_add_idle_job(grpc_pollset *pollset, + grpc_iomgr_closure *closure) { + add_job(&pollset->idle_jobs, closure); +} + +void grpc_pollset_add_unlock_job(grpc_pollset *pollset, + grpc_iomgr_closure *closure) { + add_job(&pollset->unlock_jobs, closure); +} + void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline) { /* pollset->mu already held */ @@ -182,6 +210,14 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); + if (!grpc_pollset_has_workers(pollset) && pollset->idle_jobs != NULL) { + run_jobs(pollset, &pollset->idle_jobs); + goto done; + } + if (pollset->unlock_jobs != NULL) { + run_jobs(pollset, &pollset->unlock_jobs); + goto done; + } if (grpc_alarm_check(&pollset->mu, now, &deadline)) { goto done; } @@ -301,12 +337,7 @@ static void basic_do_promote(void *args, int success) { gpr_mu_lock(&pollset->mu); /* First we need to ensure that nobody is polling concurrently */ - if (grpc_pollset_has_workers(pollset)) { - grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1); - gpr_mu_unlock(&pollset->mu); - return; - } + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); gpr_free(up_args); /* At this point the pollset may no longer be a unary poller. In that case @@ -390,13 +421,12 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, GRPC_FD_REF(fd, "basicpoll_add"); pollset->in_flight_cbs++; up_args = gpr_malloc(sizeof(*up_args)); - up_args->pollset = pollset; up_args->fd = fd; up_args->original_vtable = pollset->vtable; up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1); + grpc_pollset_add_idle_job(pollset, &up_args->promotion_closure); grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 69bd9cca8c..3f89095d40 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -37,6 +37,7 @@ #include <poll.h> #include <grpc/support/sync.h> +#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_pollset_vtable grpc_pollset_vtable; @@ -66,6 +67,8 @@ typedef struct grpc_pollset { int kicked_without_pollers; void (*shutdown_done_cb)(void *arg); void *shutdown_done_arg; + grpc_iomgr_closure *unlock_jobs; + grpc_iomgr_closure *idle_jobs; union { int fd; void *ptr; @@ -124,4 +127,11 @@ int grpc_pollset_has_workers(grpc_pollset *pollset); typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; +/** schedule a closure to be run next time there are no active workers */ +void grpc_pollset_add_idle_job(grpc_pollset *pollset, + grpc_iomgr_closure *closure); +/** schedule a closure to be run next time the pollset is unlocked */ +void grpc_pollset_add_unlock_job(grpc_pollset *pollset, + grpc_iomgr_closure *closure); + #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 4ec6aba7f4..6f49060be5 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -77,7 +77,6 @@ struct grpc_channel { gpr_mu registered_call_mu; registered_call *registered_calls; - grpc_iomgr_closure destroy_closure; char *target; grpc_workqueue *workqueue; }; @@ -273,8 +272,7 @@ void grpc_channel_internal_ref(grpc_channel *c) { gpr_ref(&c->refs); } -static void destroy_channel(void *p, int ok) { - grpc_channel *channel = p; +static void destroy_channel(grpc_channel *channel) { size_t i; grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { @@ -312,9 +310,7 @@ void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) { void grpc_channel_internal_unref(grpc_channel *channel) { #endif if (gpr_unref(&channel->refs)) { - channel->destroy_closure.cb = destroy_channel; - channel->destroy_closure.cb_arg = channel; - grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1); + destroy_channel(channel); } } diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 12b15f353f..ef47a28fda 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -45,6 +45,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( /* forward through to the underlying client channel */ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_connectivity_state state; if (client_channel_elem->filter != &grpc_client_channel_filter) { gpr_log(GPR_ERROR, "grpc_channel_check_connectivity_state called on something that is " @@ -52,8 +54,10 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( client_channel_elem->filter->name); return GRPC_CHANNEL_FATAL_FAILURE; } - return grpc_client_channel_check_connectivity_state(client_channel_elem, - try_to_connect); + state = grpc_client_channel_check_connectivity_state( + client_channel_elem, try_to_connect, &call_list); + grpc_iomgr_call_list_run(call_list); + return state; } typedef enum { @@ -154,6 +158,7 @@ void grpc_channel_watch_connectivity_state( gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; state_watcher *w = gpr_malloc(sizeof(*w)); grpc_cq_begin_op(cq); @@ -176,13 +181,14 @@ void grpc_channel_watch_connectivity_state( "grpc_channel_watch_connectivity_state called on something that is " "not a client channel, but '%s'", client_channel_elem->filter->name); - grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete, - 1); + grpc_iomgr_call_list_add(&call_list, &w->on_complete, 1); } else { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); grpc_client_channel_add_interested_party(client_channel_elem, grpc_cq_pollset(cq)); grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state, - &w->on_complete); + &w->on_complete, &call_list); } + + grpc_iomgr_call_list_run(call_list); } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index c8c1abb750..9be1f2f7cf 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -164,8 +164,7 @@ typedef struct { /** data to write next write */ gpr_slice_buffer qbuf; /** queued callbacks */ - grpc_iomgr_closure *pending_closures_head; - grpc_iomgr_closure *pending_closures_tail; + grpc_iomgr_call_list run_at_unlock; /** window available for us to send to peer */ gpr_int64 outgoing_window; @@ -568,11 +567,6 @@ int grpc_chttp2_list_pop_read_write_state_changed( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); -/** schedule a closure to run without the transport lock taken */ -void grpc_chttp2_schedule_closure( - grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, - int success); - grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index c015e82931..1da3f85bde 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -238,8 +238,8 @@ void grpc_chttp2_cleanup_writing( stream_global->outgoing_sopb->nops == 0) { GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE); stream_global->outgoing_sopb = NULL; - grpc_chttp2_schedule_closure(transport_global, - stream_global->send_done_closure, 1); + grpc_iomgr_call_list_add(&transport_global->run_at_unlock, + stream_global->send_done_closure, 1); } } stream_global->writing_now = 0; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 6376c397a2..50d5a3e8a8 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -499,32 +499,20 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_chttp2_transport *t) { - grpc_iomgr_closure *run_closures; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list run = GRPC_IOMGR_CALL_LIST_INIT; unlock_check_read_write_state(t); if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); - grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); + grpc_iomgr_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1); prevent_endpoint_shutdown(t); } - run_closures = t->global.pending_closures_head; - t->global.pending_closures_head = NULL; - t->global.pending_closures_tail = NULL; - - grpc_connectivity_state_begin_flush(&t->channel_callback.state_tracker, &f); + GPR_SWAP(grpc_iomgr_call_list, run, t->global.run_at_unlock); gpr_mu_unlock(&t->mu); - - grpc_connectivity_state_end_flush(&f); - - while (run_closures) { - grpc_iomgr_closure *next = run_closures->next; - run_closures->cb(run_closures->cb_arg, run_closures->success); - run_closures = next; - } + grpc_iomgr_call_list_run(run); } /* @@ -676,8 +664,8 @@ static void perform_stream_op_locked( } } else { grpc_sopb_reset(op->send_ops); - grpc_chttp2_schedule_closure(transport_global, - stream_global->send_done_closure, 0); + grpc_iomgr_call_list_add(&transport_global->run_at_unlock, + stream_global->send_done_closure, 0); } } @@ -715,9 +703,8 @@ static void perform_stream_op_locked( op->bind_pollset); } - if (op->on_consumed) { - grpc_chttp2_schedule_closure(transport_global, op->on_consumed, 1); - } + grpc_iomgr_call_list_add(&transport_global->run_at_unlock, op->on_consumed, + 1); } static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, @@ -754,18 +741,12 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { lock(t); - if (op->on_consumed) { - grpc_chttp2_schedule_closure(&t->global, op->on_consumed, 1); - } + grpc_iomgr_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1); if (op->on_connectivity_state_change) { - if (grpc_connectivity_state_notify_on_state_change( - &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change) - .state_already_changed) { - grpc_chttp2_schedule_closure(&t->global, op->on_connectivity_state_change, - 1); - } + grpc_connectivity_state_notify_on_state_change( + &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change, &t->global.run_at_unlock); } if (op->send_goaway) { @@ -887,8 +868,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { if (stream_global->outgoing_sopb != NULL) { grpc_sopb_reset(stream_global->outgoing_sopb); stream_global->outgoing_sopb = NULL; - grpc_chttp2_schedule_closure(transport_global, - stream_global->send_done_closure, 1); + grpc_iomgr_call_list_add(&transport_global->run_at_unlock, + stream_global->send_done_closure, 1); } stream_global->read_closed = 1; if (!stream_global->published_cancelled) { @@ -938,8 +919,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { &stream_global->outstanding_metadata); grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); stream_global->published_state = *stream_global->publish_state = state; - grpc_chttp2_schedule_closure(transport_global, - stream_global->recv_done_closure, 1); + grpc_iomgr_call_list_add(&transport_global->run_at_unlock, + stream_global->recv_done_closure, 1); stream_global->recv_done_closure = NULL; stream_global->publish_sopb = NULL; stream_global->publish_state = NULL; @@ -1200,21 +1181,7 @@ static void connectivity_state_set( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); grpc_connectivity_state_set( &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, reason); -} - -void grpc_chttp2_schedule_closure( - grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, - int success) { - closure->success = success; - if (transport_global->pending_closures_tail == NULL) { - transport_global->pending_closures_head = - transport_global->pending_closures_tail = closure; - } else { - transport_global->pending_closures_tail->next = closure; - transport_global->pending_closures_tail = closure; - } - closure->next = NULL; + state, reason, &transport_global->run_at_unlock); } /* diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 034f82474f..b605da9dbf 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -64,7 +64,6 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, tracker->current_state = init_state; tracker->watchers = NULL; tracker->name = gpr_strdup(name); - tracker->changed = 0; } void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { @@ -90,20 +89,17 @@ grpc_connectivity_state grpc_connectivity_state_check( return tracker->current_state; } -grpc_connectivity_state_notify_on_state_change_result -grpc_connectivity_state_notify_on_state_change( +int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, - grpc_iomgr_closure *notify) { - grpc_connectivity_state_notify_on_state_change_result result; - memset(&result, 0, sizeof(result)); + grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list) { if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name, - grpc_connectivity_state_name(*current), - grpc_connectivity_state_name(tracker->current_state)); + gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p", + tracker->name, grpc_connectivity_state_name(*current), + grpc_connectivity_state_name(tracker->current_state), notify); } if (tracker->current_state != *current) { *current = tracker->current_state; - result.state_already_changed = 1; + grpc_iomgr_call_list_add(call_list, notify, 1); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -111,13 +107,14 @@ grpc_connectivity_state_notify_on_state_change( w->next = tracker->watchers; tracker->watchers = w; } - result.current_state_is_idle = tracker->current_state == GRPC_CHANNEL_IDLE; - return result; + return tracker->current_state == GRPC_CHANNEL_IDLE; } void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - const char *reason) { + const char *reason, + grpc_iomgr_call_list *call_list) { + grpc_connectivity_state_watcher *w; if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name, grpc_connectivity_state_name(tracker->current_state), @@ -128,40 +125,11 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, } GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE); tracker->current_state = state; - tracker->changed = 1; -} - -void grpc_connectivity_state_begin_flush( - grpc_connectivity_state_tracker *tracker, - grpc_connectivity_state_flusher *flusher) { - grpc_connectivity_state_watcher *w; - flusher->cbs = NULL; - if (!tracker->changed) return; - w = tracker->watchers; - tracker->watchers = NULL; - while (w != NULL) { - grpc_connectivity_state_watcher *next = w->next; - if (tracker->current_state != *w->current) { - *w->current = tracker->current_state; - w->notify->next = flusher->cbs; - flusher->cbs = w->notify; - gpr_free(w); - } else { - w->next = tracker->watchers; - tracker->watchers = w; - } - w = next; - } - tracker->changed = 0; -} - -void grpc_connectivity_state_end_flush( - grpc_connectivity_state_flusher *flusher) { - grpc_iomgr_closure *c = flusher->cbs; - while (c != NULL) { - grpc_iomgr_closure *next = c; - c->cb(c->cb_arg, 1); - c = next; + while ((w = tracker->watchers) != NULL) { + *w->current = tracker->current_state; + tracker->watchers = w->next; + grpc_iomgr_call_list_add(call_list, w->notify, 1); + gpr_free(w); } } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 323af2740c..49c2815266 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -54,12 +54,8 @@ typedef struct { grpc_connectivity_state_watcher *watchers; /** a name to help debugging */ char *name; - /** has this state been changed since the last flush? */ - int changed; } grpc_connectivity_state_tracker; -typedef struct { grpc_iomgr_closure *cbs; } grpc_connectivity_state_flusher; - extern int grpc_connectivity_state_trace; void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, @@ -71,35 +67,15 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); * external lock */ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - const char *reason); - -/** Begin flushing callbacks; not thread safe; access must be serialized using - * the same external lock as grpc_connectivity_state_set. Initializes flusher. - */ -void grpc_connectivity_state_begin_flush( - grpc_connectivity_state_tracker *tracker, - grpc_connectivity_state_flusher *flusher); - -/** Complete flushing updates: must not be called with any locks held */ -void grpc_connectivity_state_end_flush( - grpc_connectivity_state_flusher *flusher); + const char *reason, + grpc_iomgr_call_list *call_list); grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); -typedef struct { - /** 1 if the current state is idle (a hint to begin connecting), 0 otherwise - */ - int current_state_is_idle; - /** 1 if the state has already changed: in this case the closure passed to - * grpc_connectivity_state_notify_on_state_change will not be called */ - int state_already_changed; -} grpc_connectivity_state_notify_on_state_change_result; - /** Return 1 if the channel should start connecting, 0 otherwise */ -grpc_connectivity_state_notify_on_state_change_result -grpc_connectivity_state_notify_on_state_change( +int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, - grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT; + grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ |