diff options
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r-- | src/core/channel/client_channel.c | 292 |
1 files changed, 149 insertions, 143 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 2e25033813..0193928a50 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -73,9 +73,9 @@ typedef struct { guarded by mu_config */ grpc_client_config *incoming_configuration; /** a list of closures that are all waiting for config to come in */ - grpc_iomgr_closure *waiting_for_config_closures; + grpc_closure_list waiting_for_config_closures; /** resolver callback */ - grpc_iomgr_closure on_config_changed; + grpc_closure on_config_changed; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; /** when an lb_policy arrives, should we try to exit idle */ @@ -91,7 +91,7 @@ typedef struct { update the channel, and create a new watcher */ typedef struct { channel_data *chand; - grpc_iomgr_closure on_changed; + grpc_closure on_changed; grpc_connectivity_state state; grpc_lb_policy *lb_policy; } lb_policy_connectivity_watcher; @@ -115,7 +115,7 @@ struct call_data { call_state state; gpr_timespec deadline; grpc_subchannel *picked_channel; - grpc_iomgr_closure async_setup_task; + grpc_closure async_setup_task; grpc_transport_stream_op waiting_op; /* our child call stack */ grpc_subchannel_call *subchannel_call; @@ -123,17 +123,18 @@ struct call_data { grpc_linked_mdelem details; }; -static grpc_iomgr_closure *merge_into_waiting_op( - grpc_call_element *elem, - grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT; +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) + GRPC_MUST_USE_RESULT; -static void handle_op_after_cancellation(grpc_call_element *elem, +static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (op->send_ops) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb(op->on_done_send->cb_arg, 0); + op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0); } if (op->recv_ops) { char status[GPR_LTOA_MIN_BUFSIZE]; @@ -152,26 +153,28 @@ static void handle_op_after_cancellation(grpc_call_element *elem, mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); + op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1); } if (op->on_consumed) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); + op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0); } } typedef struct { - grpc_iomgr_closure closure; + grpc_closure closure; grpc_call_element *elem; } waiting_call; -static void perform_transport_stream_op(grpc_call_element *elem, +static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, grpc_transport_stream_op *op, int continuation); -static void continue_with_pick(void *arg, int iomgr_success) { +static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { waiting_call *wc = arg; call_data *calld = wc->elem->call_data; - perform_transport_stream_op(wc->elem, &calld->waiting_op, 1); + perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1); gpr_free(wc); } @@ -179,10 +182,9 @@ static void add_to_lb_policy_wait_queue_locked_state_config( grpc_call_element *elem) { channel_data *chand = elem->channel_data; waiting_call *wc = gpr_malloc(sizeof(*wc)); - grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc); + grpc_closure_init(&wc->closure, continue_with_pick, wc); wc->elem = elem; - wc->closure.next = chand->waiting_for_config_closures; - chand->waiting_for_config_closures = &wc->closure; + grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1); } static int is_empty(void *p, int len) { @@ -194,7 +196,8 @@ static int is_empty(void *p, int len) { return 1; } -static void started_call(void *arg, int iomgr_success) { +static void started_call(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { call_data *calld = arg; grpc_transport_stream_op op; int have_waiting; @@ -204,21 +207,21 @@ static void started_call(void *arg, int iomgr_success) { memset(&op, 0, sizeof(op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(calld->subchannel_call, &op); + grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op); } else if (calld->state == CALL_WAITING_FOR_CALL) { have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); if (calld->subchannel_call != NULL) { calld->state = CALL_ACTIVE; gpr_mu_unlock(&calld->mu_state); if (have_waiting) { - grpc_subchannel_call_process_op(calld->subchannel_call, + grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &calld->waiting_op); } } else { calld->state = CALL_CANCELLED; gpr_mu_unlock(&calld->mu_state); if (have_waiting) { - handle_op_after_cancellation(calld->elem, &calld->waiting_op); + handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); } } } else { @@ -227,36 +230,37 @@ static void started_call(void *arg, int iomgr_success) { } } -static void picked_target(void *arg, int iomgr_success) { +static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { call_data *calld = arg; grpc_pollset *pollset; if (calld->picked_channel == NULL) { /* treat this like a cancellation */ calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; - perform_transport_stream_op(calld->elem, &calld->waiting_op, 1); + perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1); } else { gpr_mu_lock(&calld->mu_state); if (calld->state == CALL_CANCELLED) { gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(calld->elem, &calld->waiting_op); + handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); } else { GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); calld->state = CALL_WAITING_FOR_CALL; pollset = calld->waiting_op.bind_pollset; gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld); - grpc_subchannel_create_call(calld->picked_channel, pollset, + grpc_closure_init(&calld->async_setup_task, started_call, calld); + grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call, &calld->async_setup_task); } } } -static grpc_iomgr_closure *merge_into_waiting_op( - grpc_call_element *elem, grpc_transport_stream_op *new_op) { +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) { call_data *calld = elem->call_data; - grpc_iomgr_closure *consumed_op = NULL; + grpc_closure *consumed_op = NULL; grpc_transport_stream_op *waiting_op = &calld->waiting_op; GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); @@ -282,7 +286,7 @@ static grpc_iomgr_closure *merge_into_waiting_op( return consumed_op; } -static char *cc_get_peer(grpc_call_element *elem) { +static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; @@ -293,8 +297,8 @@ static char *cc_get_peer(grpc_call_element *elem) { subchannel_call = calld->subchannel_call; GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); gpr_mu_unlock(&calld->mu_state); - result = grpc_subchannel_call_get_peer(subchannel_call); - GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer"); + result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer"); return result; } else { gpr_mu_unlock(&calld->mu_state); @@ -302,7 +306,8 @@ static char *cc_get_peer(grpc_call_element *elem) { } } -static void perform_transport_stream_op(grpc_call_element *elem, +static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) { call_data *calld = elem->call_data; @@ -310,7 +315,6 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; - grpc_iomgr_closure *consumed_op = NULL; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -320,15 +324,15 @@ static void perform_transport_stream_op(grpc_call_element *elem, GPR_ASSERT(!continuation); subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(subchannel_call, op); + grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op); break; case CALL_CANCELLED: gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); + handle_op_after_cancellation(exec_ctx, elem, op); break; case CALL_WAITING_FOR_SEND: GPR_ASSERT(!continuation); - consumed_op = merge_into_waiting_op(elem, op); + grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { gpr_mu_unlock(&calld->mu_state); @@ -354,10 +358,10 @@ static void perform_transport_stream_op(grpc_call_element *elem, op2.on_consumed = NULL; } gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); - handle_op_after_cancellation(elem, &op2); + handle_op_after_cancellation(exec_ctx, elem, op); + handle_op_after_cancellation(exec_ctx, elem, &op2); } else { - consumed_op = merge_into_waiting_op(elem, op); + grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); gpr_mu_unlock(&calld->mu_state); } break; @@ -367,7 +371,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, if (op->cancel_with_status != GRPC_STATUS_OK) { calld->state = CALL_CANCELLED; gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); + handle_op_after_cancellation(exec_ctx, elem, op); } else { calld->waiting_op = *op; @@ -380,34 +384,33 @@ static void perform_transport_stream_op(grpc_call_element *elem, gpr_mu_lock(&chand->mu_config); lb_policy = chand->lb_policy; if (lb_policy) { - grpc_transport_stream_op *op = &calld->waiting_op; - grpc_pollset *bind_pollset = op->bind_pollset; + grpc_transport_stream_op *waiting_op = &calld->waiting_op; + grpc_pollset *bind_pollset = waiting_op->bind_pollset; grpc_metadata_batch *initial_metadata = - &op->send_ops->ops[0].data.metadata; + &waiting_op->send_ops->ops[0].data.metadata; GRPC_LB_POLICY_REF(lb_policy, "pick"); gpr_mu_unlock(&chand->mu_config); calld->state = CALL_WAITING_FOR_PICK; - GPR_ASSERT(op->bind_pollset); - GPR_ASSERT(op->send_ops); - GPR_ASSERT(op->send_ops->nops >= 1); - GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); + GPR_ASSERT(waiting_op->bind_pollset); + GPR_ASSERT(waiting_op->send_ops); + GPR_ASSERT(waiting_op->send_ops->nops >= 1); + GPR_ASSERT(waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA); gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, - calld); - grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, - &calld->picked_channel, + grpc_closure_init(&calld->async_setup_task, picked_target, calld); + grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset, + initial_metadata, &calld->picked_channel, &calld->async_setup_task); - GRPC_LB_POLICY_UNREF(lb_policy, "pick"); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick"); } else if (chand->resolver != NULL) { calld->state = CALL_WAITING_FOR_CONFIG; add_to_lb_policy_wait_queue_locked_state_config(elem); if (!chand->started_resolving && chand->resolver != NULL) { GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); chand->started_resolving = 1; - grpc_resolver_next(chand->resolver, + grpc_resolver_next(exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); } @@ -417,62 +420,68 @@ static void perform_transport_stream_op(grpc_call_element *elem, calld->state = CALL_CANCELLED; gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); + handle_op_after_cancellation(exec_ctx, elem, op); } } } break; } - - if (consumed_op != NULL) { - consumed_op->cb(consumed_op->cb_arg, 1); - } } -static void cc_start_transport_stream_op(grpc_call_element *elem, +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, grpc_transport_stream_op *op) { - perform_transport_stream_op(elem, op, 0); + perform_transport_stream_op(exec_ctx, elem, op, 0); } -static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, +static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, grpc_connectivity_state current_state); -static void on_lb_policy_state_changed(void *arg, int iomgr_success) { +static void on_lb_policy_state_changed_locked( + grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) { + /* check if the notification is for a stale policy */ + if (w->lb_policy != w->chand->lb_policy) return; + + grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state, + "lb_changed"); + if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { + watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + } +} + +static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { lb_policy_connectivity_watcher *w = arg; gpr_mu_lock(&w->chand->mu_config); - /* check if the notification is for a stale policy */ - if (w->lb_policy == w->chand->lb_policy) { - grpc_connectivity_state_set(&w->chand->state_tracker, w->state, - "lb_changed"); - if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { - watch_lb_policy(w->chand, w->lb_policy, w->state); - } - } + on_lb_policy_state_changed_locked(exec_ctx, w); gpr_mu_unlock(&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy"); gpr_free(w); } -static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, +static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); w->chand = chand; - grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed); + grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, + &w->on_changed); } -static void cc_on_config_changed(void *arg, int iomgr_success) { +static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_resolver *old_resolver; - grpc_iomgr_closure *wakeup_closures = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; int exit_idle = 0; @@ -481,10 +490,10 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); - state = grpc_lb_policy_check_connectivity(lb_policy); + state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy); } - grpc_client_config_unref(chand->incoming_configuration); + grpc_client_config_unref(exec_ctx, chand->incoming_configuration); } chand->incoming_configuration = NULL; @@ -493,8 +502,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { - wakeup_closures = chand->waiting_for_config_closures; - chand->waiting_for_config_closures = NULL; + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -505,57 +513,53 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (iomgr_success && chand->resolver) { grpc_resolver *resolver = chand->resolver; GRPC_RESOLVER_REF(resolver, "channel-next"); - grpc_connectivity_state_set(&chand->state_tracker, state, + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy(exec_ctx, chand, lb_policy, state); + } gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - grpc_resolver_next(resolver, &chand->incoming_configuration, + grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); - GRPC_RESOLVER_UNREF(resolver, "channel-next"); - if (lb_policy != NULL) { - watch_lb_policy(chand, lb_policy, state); - } + GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next"); } else { old_resolver = chand->resolver; chand->resolver = NULL; - grpc_connectivity_state_set(&chand->state_tracker, + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); gpr_mu_unlock(&chand->mu_config); if (old_resolver != NULL) { - grpc_resolver_shutdown(old_resolver); - GRPC_RESOLVER_UNREF(old_resolver, "channel"); + grpc_resolver_shutdown(exec_ctx, old_resolver); + GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel"); } } if (exit_idle) { - grpc_lb_policy_exit_idle(lb_policy); - GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle"); + grpc_lb_policy_exit_idle(exec_ctx, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); } if (old_lb_policy != NULL) { - grpc_lb_policy_shutdown(old_lb_policy); - GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); - } - - while (wakeup_closures) { - grpc_iomgr_closure *next = wakeup_closures->next; - wakeup_closures->cb(wakeup_closures->cb_arg, 1); - wakeup_closures = next; + grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } if (lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(lb_policy, "config_change"); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); } - GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); + + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver"); } -static void cc_start_transport_op(grpc_channel_element *elem, +static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_transport_op *op) { grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_iomgr_closure *on_consumed = op->on_consumed; - op->on_consumed = NULL; + + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); GPR_ASSERT(op->set_accept_stream == NULL); GPR_ASSERT(op->bind_pollset == NULL); @@ -563,7 +567,7 @@ static void cc_start_transport_op(grpc_channel_element *elem, gpr_mu_lock(&chand->mu_config); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( - &chand->state_tracker, op->connectivity_state, + exec_ctx, &chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change); op->on_connectivity_state_change = NULL; op->connectivity_state = NULL; @@ -577,35 +581,31 @@ static void cc_start_transport_op(grpc_channel_element *elem, } if (op->disconnect && chand->resolver != NULL) { - grpc_connectivity_state_set(&chand->state_tracker, + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { - grpc_lb_policy_shutdown(chand->lb_policy); - GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); + grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } } gpr_mu_unlock(&chand->mu_config); if (destroy_resolver) { - grpc_resolver_shutdown(destroy_resolver); - GRPC_RESOLVER_UNREF(destroy_resolver, "channel"); + grpc_resolver_shutdown(exec_ctx, destroy_resolver); + GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel"); } if (lb_policy) { - grpc_lb_policy_broadcast(lb_policy, op); - GRPC_LB_POLICY_UNREF(lb_policy, "broadcast"); - } - - if (on_consumed) { - grpc_iomgr_add_callback(on_consumed); + grpc_lb_policy_broadcast(exec_ctx, lb_policy, op); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast"); } } /* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const void *server_transport_data, grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; @@ -622,7 +622,8 @@ static void init_call_elem(grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { call_data *calld = elem->call_data; grpc_subchannel_call *subchannel_call; @@ -634,7 +635,7 @@ static void destroy_call_elem(grpc_call_element *elem) { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); - GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel"); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel"); break; case CALL_CREATED: case CALL_CANCELLED: @@ -651,7 +652,8 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last) { @@ -666,25 +668,25 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, chand->mdctx = metadata_context; chand->master = master; grpc_pollset_set_init(&chand->pollset_set); - grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, - chand); + grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); } /* Destructor for channel_data */ -static void destroy_channel_elem(grpc_channel_element *elem) { +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_resolver_shutdown(chand->resolver); - GRPC_RESOLVER_UNREF(chand->resolver, "channel"); + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } if (chand->lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } - grpc_connectivity_state_destroy(&chand->state_tracker); + grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(&chand->pollset_set); gpr_mu_destroy(&chand->mu_config); } @@ -702,7 +704,8 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, +void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, grpc_resolver *resolver) { /* post construction initialization: set the transport setup pointer */ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); @@ -711,31 +714,32 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, GPR_ASSERT(!chand->resolver); chand->resolver = resolver; GRPC_RESOLVER_REF(resolver, "channel"); - if (chand->waiting_for_config_closures != NULL || + if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = 1; GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - grpc_resolver_next(resolver, &chand->incoming_configuration, + grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); } gpr_mu_unlock(&chand->mu_config); } grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect) { + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; gpr_mu_lock(&chand->mu_config); out = grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(chand->lb_policy); + grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); } else { chand->exit_idle_when_lb_policy_arrives = 1; if (!chand->started_resolving && chand->resolver != NULL) { GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); chand->started_resolving = 1; - grpc_resolver_next(chand->resolver, &chand->incoming_configuration, + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, &chand->on_config_changed); } } @@ -745,12 +749,12 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( } void grpc_client_channel_watch_connectivity_state( - grpc_channel_element *elem, grpc_connectivity_state *state, - grpc_iomgr_closure *on_complete) { + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu_config); - grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state, - on_complete); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &chand->state_tracker, state, on_complete); gpr_mu_unlock(&chand->mu_config); } @@ -760,14 +764,16 @@ grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( return &chand->pollset_set; } -void grpc_client_channel_add_interested_party(grpc_channel_element *elem, +void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_pollset *pollset) { channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset(&chand->pollset_set, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset); } -void grpc_client_channel_del_interested_party(grpc_channel_element *elem, +void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_pollset *pollset) { channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset(&chand->pollset_set, pollset); + grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset); } |