diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-18 07:20:29 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-18 07:20:29 -0700 |
commit | 000cd8f9f7346defc79fe6aa877af11b42ab5f1e (patch) | |
tree | 883d73a97471f63e616d02c1e17efc62b099c8ad /src/core/channel | |
parent | 38adec97e875c21cd9d6cc9d039664bdf4fdb889 (diff) |
Introduce call lists for moving work outside locks
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/client_channel.c | 139 | ||||
-rw-r--r-- | src/core/channel/client_channel.h | 5 |
2 files changed, 62 insertions, 82 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 5b165972a7..a2346503d9 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -73,7 +73,7 @@ typedef struct { guarded by mu_config */ grpc_client_config *incoming_configuration; /** a list of closures that are all waiting for config to come in */ - grpc_iomgr_closure *waiting_for_config_closures; + grpc_iomgr_call_list waiting_for_config_closures; /** resolver callback */ grpc_iomgr_closure on_config_changed; /** connectivity state being tracked */ @@ -181,8 +181,8 @@ static void add_to_lb_policy_wait_queue_locked_state_config( waiting_call *wc = gpr_malloc(sizeof(*wc)); grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc); wc->elem = elem; - wc->closure.next = chand->waiting_for_config_closures; - chand->waiting_for_config_closures = &wc->closure; + grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure, + 1); } static int is_empty(void *p, int len) { @@ -230,6 +230,7 @@ static void started_call(void *arg, int iomgr_success) { static void picked_target(void *arg, int iomgr_success) { call_data *calld = arg; grpc_pollset *pollset; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; if (calld->picked_channel == NULL) { /* treat this like a cancellation */ @@ -248,9 +249,10 @@ static void picked_target(void *arg, int iomgr_success) { grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld); grpc_subchannel_create_call(calld->picked_channel, pollset, &calld->subchannel_call, - &calld->async_setup_task); + &calld->async_setup_task, &call_list); } } + grpc_iomgr_call_list_run(call_list); } static grpc_iomgr_closure *merge_into_waiting_op( @@ -310,7 +312,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; - grpc_iomgr_closure *consumed_op = NULL; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -328,7 +330,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, break; case CALL_WAITING_FOR_SEND: GPR_ASSERT(!continuation); - consumed_op = merge_into_waiting_op(elem, op); + grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1); if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { gpr_mu_unlock(&calld->mu_state); @@ -357,7 +359,8 @@ static void perform_transport_stream_op(grpc_call_element *elem, handle_op_after_cancellation(elem, op); handle_op_after_cancellation(elem, &op2); } else { - consumed_op = merge_into_waiting_op(elem, op); + grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), + 1); gpr_mu_unlock(&calld->mu_state); } break; @@ -398,7 +401,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, calld); grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, - &calld->async_setup_task); + &calld->async_setup_task, &call_list); GRPC_LB_POLICY_UNREF(lb_policy, "pick"); } else if (chand->resolver != NULL) { @@ -424,9 +427,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, break; } - if (consumed_op != NULL) { - consumed_op->cb(consumed_op->cb_arg, 1); - } + grpc_iomgr_call_list_run(call_list); } static void cc_start_transport_stream_op(grpc_call_element *elem, @@ -435,36 +436,38 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, } static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state); + grpc_connectivity_state current_state, + grpc_iomgr_call_list *cl); -static void on_lb_policy_state_changed_locked( - lb_policy_connectivity_watcher *w) { +static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w, + grpc_iomgr_call_list *cl) { /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; - grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed"); + grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed", + cl); if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { - watch_lb_policy(w->chand, w->lb_policy, w->state); + watch_lb_policy(w->chand, w->lb_policy, w->state, cl); } } static void on_lb_policy_state_changed(void *arg, int iomgr_success) { lb_policy_connectivity_watcher *w = arg; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(&w->chand->mu_config); - on_lb_policy_state_changed_locked(w); - grpc_connectivity_state_begin_flush(&w->chand->state_tracker, &f); + on_lb_policy_state_changed_locked(w, &cl); gpr_mu_unlock(&w->chand->mu_config); - grpc_connectivity_state_end_flush(&f); + grpc_iomgr_call_list_run(cl); GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); gpr_free(w); } static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state) { + grpc_connectivity_state current_state, + grpc_iomgr_call_list *call_list) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); @@ -472,13 +475,8 @@ static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w); w->state = current_state; w->lb_policy = lb_policy; - if (grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, - &w->on_changed) - .state_already_changed) { - on_lb_policy_state_changed_locked(w); - GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); - gpr_free(w); - } + grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed, + call_list); } static void cc_on_config_changed(void *arg, int iomgr_success) { @@ -486,9 +484,8 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_resolver *old_resolver; - grpc_iomgr_closure *wakeup_closures = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT; int exit_idle = 0; if (chand->incoming_configuration != NULL) { @@ -496,7 +493,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); - state = grpc_lb_policy_check_connectivity(lb_policy); + state = grpc_lb_policy_check_connectivity(lb_policy, &cl); } grpc_client_config_unref(chand->incoming_configuration); @@ -508,8 +505,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { - wakeup_closures = chand->waiting_for_config_closures; - chand->waiting_for_config_closures = NULL; + grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -520,15 +516,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (iomgr_success && chand->resolver) { grpc_resolver *resolver = chand->resolver; GRPC_RESOLVER_REF(resolver, "channel-next"); - grpc_connectivity_state_set(&chand->state_tracker, state, - "new_lb+resolver"); + grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver", + &cl); if (lb_policy != NULL) { - watch_lb_policy(chand, lb_policy, state); + watch_lb_policy(chand, lb_policy, state, &cl); } - grpc_connectivity_state_begin_flush(&chand->state_tracker, &f); gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - grpc_connectivity_state_end_flush(&f); grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(resolver, "channel-next"); @@ -536,10 +530,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { old_resolver = chand->resolver; chand->resolver = NULL; grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); - grpc_connectivity_state_begin_flush(&chand->state_tracker, &f); + GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone", + &cl); gpr_mu_unlock(&chand->mu_config); - grpc_connectivity_state_end_flush(&f); if (old_resolver != NULL) { grpc_resolver_shutdown(old_resolver); GRPC_RESOLVER_UNREF(old_resolver, "channel"); @@ -547,24 +540,20 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } if (exit_idle) { - grpc_lb_policy_exit_idle(lb_policy); + grpc_lb_policy_exit_idle(lb_policy, &cl); GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle"); } if (old_lb_policy != NULL) { - grpc_lb_policy_shutdown(old_lb_policy); + grpc_lb_policy_shutdown(old_lb_policy, &cl); GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); } - while (wakeup_closures) { - grpc_iomgr_closure *next = wakeup_closures->next; - wakeup_closures->cb(wakeup_closures->cb_arg, 1); - wakeup_closures = next; - } - if (lb_policy != NULL) { GRPC_LB_POLICY_UNREF(lb_policy, "config_change"); } + + grpc_iomgr_call_list_run(cl); GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); } @@ -573,23 +562,21 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_connectivity_state_flusher f; - grpc_iomgr_closure *call_list = op->on_consumed; - call_list->next = NULL; - op->on_consumed = NULL; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + + if (op->on_consumed) { + grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1); + op->on_consumed = NULL; + } GPR_ASSERT(op->set_accept_stream == NULL); GPR_ASSERT(op->bind_pollset == NULL); gpr_mu_lock(&chand->mu_config); if (op->on_connectivity_state_change != NULL) { - if (grpc_connectivity_state_notify_on_state_change( - &chand->state_tracker, op->connectivity_state, - op->on_connectivity_state_change) - .state_already_changed) { - op->on_connectivity_state_change->next = call_list; - call_list = op->on_connectivity_state_change; - } + grpc_connectivity_state_notify_on_state_change( + &chand->state_tracker, op->connectivity_state, + op->on_connectivity_state_change, &call_list); op->on_connectivity_state_change = NULL; op->connectivity_state = NULL; } @@ -603,18 +590,17 @@ static void cc_start_transport_op(grpc_channel_element *elem, if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + GRPC_CHANNEL_FATAL_FAILURE, "disconnect", + &call_list); destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { - grpc_lb_policy_shutdown(chand->lb_policy); + grpc_lb_policy_shutdown(chand->lb_policy, &call_list); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); chand->lb_policy = NULL; } } - grpc_connectivity_state_begin_flush(&chand->state_tracker, &f); gpr_mu_unlock(&chand->mu_config); - grpc_connectivity_state_end_flush(&f); if (destroy_resolver) { grpc_resolver_shutdown(destroy_resolver); @@ -622,15 +608,11 @@ static void cc_start_transport_op(grpc_channel_element *elem, } if (lb_policy) { - grpc_lb_policy_broadcast(lb_policy, op); + grpc_lb_policy_broadcast(lb_policy, op, &call_list); GRPC_LB_POLICY_UNREF(lb_policy, "broadcast"); } - while (call_list != NULL) { - grpc_iomgr_closure *next = call_list->next; - call_list->cb(call_list->cb_arg, 1); - call_list = next; - } + grpc_iomgr_call_list_run(call_list); } /* Constructor for call_data */ @@ -740,7 +722,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, GPR_ASSERT(!chand->resolver); chand->resolver = resolver; GRPC_RESOLVER_REF(resolver, "channel"); - if (chand->waiting_for_config_closures != NULL || + if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = 1; GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); @@ -751,14 +733,15 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, } grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect) { + grpc_channel_element *elem, int try_to_connect, + grpc_iomgr_call_list *call_list) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; gpr_mu_lock(&chand->mu_config); out = grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(chand->lb_policy); + grpc_lb_policy_exit_idle(chand->lb_policy, call_list); } else { chand->exit_idle_when_lb_policy_arrives = 1; if (!chand->started_resolving && chand->resolver != NULL) { @@ -775,16 +758,12 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( void grpc_client_channel_watch_connectivity_state( grpc_channel_element *elem, grpc_connectivity_state *state, - grpc_iomgr_closure *on_complete) { + grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) { channel_data *chand = elem->channel_data; - grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&chand->mu_config); - r = grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, - state, on_complete); + grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state, + on_complete, call_list); gpr_mu_unlock(&chand->mu_config); - if (r.state_already_changed) { - on_complete->cb(on_complete->cb_arg, 1); - } } grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index 13681e3956..60090e8d7e 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -53,11 +53,12 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver); grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect); + grpc_channel_element *elem, int try_to_connect, + grpc_iomgr_call_list *call_list); void grpc_client_channel_watch_connectivity_state( grpc_channel_element *elem, grpc_connectivity_state *state, - grpc_iomgr_closure *on_complete); + grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list); grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( grpc_channel_element *elem); |