diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-22 10:47:08 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-22 10:47:08 -0700 |
commit | 9f7dc3a4e5c3eeb6524472c6dc694f0600d03688 (patch) | |
tree | 8b7d92f3bdd000ec44011dd4c96bd3d9c39036fb /src/core | |
parent | 1be70ccdb84ec41975cc018f6b2a2a89cf5072ee (diff) |
Move argument passing to start of list
Diffstat (limited to 'src/core')
57 files changed, 645 insertions, 645 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index eed9523d72..485987a295 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -99,7 +99,7 @@ static void client_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) { client_mutate_op (elem, op); - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } static void @@ -112,7 +112,7 @@ server_on_done_recv (grpc_exec_ctx * exec_ctx, void *ptr, int success) { extract_and_annotate_method_tag (calld->recv_ops, calld, chand); } - calld->on_done_recv->cb (calld->on_done_recv->cb_arg, success, closure_list); + calld->on_done_recv->cb (exec_ctx, calld->on_done_recv->cb_arg, success); } static void @@ -134,7 +134,7 @@ server_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, g call_data *calld = elem->call_data; GPR_ASSERT ((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); server_mutate_op (elem, op); - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } static void diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 07b118f74a..199862dddd 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -120,7 +120,7 @@ grpc_channel_stack_init (grpc_exec_ctx * exec_ctx, const grpc_channel_filter ** { elems[i].filter = filters[i]; elems[i].channel_data = user_data; - elems[i].filter->init_channel_elem (&elems[i], master, args, metadata_context, i == 0, i == (filter_count - 1), closure_list); + elems[i].filter->init_channel_elem (&elems[i], master, args, metadata_context, i == 0, i == (exec_ctx, filter_count - 1)); user_data += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_channel_data); call_size += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_call_data); } @@ -141,7 +141,7 @@ grpc_channel_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_channel_stack * stack /* destroy per-filter data */ for (i = 0; i < count; i++) { - channel_elems[i].filter->destroy_channel_elem (&channel_elems[i], closure_list); + channel_elems[i].filter->destroy_channel_elem (exec_ctx, &channel_elems[i]); } } @@ -164,7 +164,7 @@ grpc_call_stack_init (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_sta call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; - call_elems[i].filter->init_call_elem (&call_elems[i], transport_server_data, initial_op, closure_list); + call_elems[i].filter->init_call_elem (exec_ctx, &call_elems[i], transport_server_data, initial_op); user_data += ROUND_UP_TO_ALIGNMENT_SIZE (call_elems[i].filter->sizeof_call_data); } } @@ -179,7 +179,7 @@ grpc_call_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_call_stack * stack) /* destroy per-filter data */ for (i = 0; i < count; i++) { - elems[i].filter->destroy_call_elem (&elems[i], closure_list); + elems[i].filter->destroy_call_elem (exec_ctx, &elems[i]); } } @@ -187,21 +187,21 @@ void grpc_call_next_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) { grpc_call_element *next_elem = elem + 1; - next_elem->filter->start_transport_stream_op (next_elem, op, closure_list); + next_elem->filter->start_transport_stream_op (exec_ctx, next_elem, op); } char * grpc_call_next_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) { grpc_call_element *next_elem = elem + 1; - return next_elem->filter->get_peer (next_elem, closure_list); + return next_elem->filter->get_peer (exec_ctx, next_elem); } void grpc_channel_next_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op) { grpc_channel_element *next_elem = elem + 1; - next_elem->filter->start_transport_op (next_elem, op, closure_list); + next_elem->filter->start_transport_op (exec_ctx, next_elem, op); } grpc_channel_stack * @@ -222,5 +222,5 @@ grpc_call_element_send_cancel (grpc_exec_ctx * exec_ctx, grpc_call_element * cur grpc_transport_stream_op op; memset (&op, 0, sizeof (op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; - grpc_call_next_op (cur_elem, &op, closure_list); + grpc_call_next_op (exec_ctx, cur_elem, &op); } diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index d0be3ca7b6..9a7c1c77ca 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -138,7 +138,7 @@ merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_ if (op->send_ops) { grpc_stream_ops_unref_owned_objects (op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb (op->on_done_send->cb_arg, 0, closure_list); + op->on_done_send->cb (exec_ctx, op->on_done_send->cb_arg, 0); } if (op->recv_ops) { @@ -156,11 +156,11 @@ merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_ mdb.deadline = gpr_inf_future (GPR_CLOCK_REALTIME); grpc_sopb_add_metadata (op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb (op->on_done_recv->cb_arg, 1, closure_list); + op->on_done_recv->cb (exec_ctx, op->on_done_recv->cb_arg, 1); } if (op->on_consumed) { - op->on_consumed->cb (op->on_consumed->cb_arg, 0, closure_list); + op->on_consumed->cb (exec_ctx, op->on_consumed->cb_arg, 0); } } @@ -177,7 +177,7 @@ continue_with_pick (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) { waiting_call *wc = arg; call_data *calld = wc->elem->call_data; - perform_transport_stream_op (wc->elem, &calld->waiting_op, 1, closure_list); + perform_transport_stream_op (exec_ctx, wc->elem, &calld->waiting_op, 1); gpr_free (wc); } @@ -217,7 +217,7 @@ started_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) memset (&op, 0, sizeof (op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; gpr_mu_unlock (&calld->mu_state); - grpc_subchannel_call_process_op (calld->subchannel_call, &op, closure_list); + grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &op); } else if (calld->state == CALL_WAITING_FOR_CALL) { @@ -228,7 +228,7 @@ started_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) gpr_mu_unlock (&calld->mu_state); if (have_waiting) { - grpc_subchannel_call_process_op (calld->subchannel_call, &calld->waiting_op, closure_list); + grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &calld->waiting_op); } } else @@ -237,7 +237,7 @@ started_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) gpr_mu_unlock (&calld->mu_state); if (have_waiting) { - handle_op_after_cancellation (calld->elem, &calld->waiting_op, closure_list); + handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op); } } } @@ -258,7 +258,7 @@ picked_target (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) { /* treat this like a cancellation */ calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; - perform_transport_stream_op (calld->elem, &calld->waiting_op, 1, closure_list); + perform_transport_stream_op (exec_ctx, calld->elem, &calld->waiting_op, 1); } else { @@ -266,7 +266,7 @@ picked_target (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) if (calld->state == CALL_CANCELLED) { gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (calld->elem, &calld->waiting_op, closure_list); + handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op); } else { @@ -275,7 +275,7 @@ picked_target (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) pollset = calld->waiting_op.bind_pollset; gpr_mu_unlock (&calld->mu_state); grpc_closure_init (&calld->async_setup_task, started_call, calld); - grpc_subchannel_create_call (calld->picked_channel, pollset, &calld->subchannel_call, &calld->async_setup_task, closure_list); + grpc_subchannel_create_call (exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call, &calld->async_setup_task); } } } @@ -329,8 +329,8 @@ cc_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) subchannel_call = calld->subchannel_call; GRPC_SUBCHANNEL_CALL_REF (subchannel_call, "get_peer"); gpr_mu_unlock (&calld->mu_state); - result = grpc_subchannel_call_get_peer (subchannel_call, closure_list); - GRPC_SUBCHANNEL_CALL_UNREF (subchannel_call, "get_peer", closure_list); + result = grpc_subchannel_call_get_peer (exec_ctx, subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, subchannel_call, "get_peer"); return result; } else @@ -358,11 +358,11 @@ perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, GPR_ASSERT (!continuation); subchannel_call = calld->subchannel_call; gpr_mu_unlock (&calld->mu_state); - grpc_subchannel_call_process_op (subchannel_call, op, closure_list); + grpc_subchannel_call_process_op (exec_ctx, subchannel_call, op); break; case CALL_CANCELLED: gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (elem, op, closure_list); + handle_op_after_cancellation (exec_ctx, elem, op); break; case CALL_WAITING_FOR_SEND: GPR_ASSERT (!continuation); @@ -397,8 +397,8 @@ perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, op2.on_consumed = NULL; } gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (elem, op, closure_list); - handle_op_after_cancellation (elem, &op2, closure_list); + handle_op_after_cancellation (exec_ctx, elem, op); + handle_op_after_cancellation (exec_ctx, elem, &op2); } else { @@ -413,7 +413,7 @@ perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, { calld->state = CALL_CANCELLED; gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (elem, op, closure_list); + handle_op_after_cancellation (exec_ctx, elem, op); } else { @@ -446,9 +446,9 @@ perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, gpr_mu_unlock (&calld->mu_state); grpc_closure_init (&calld->async_setup_task, picked_target, calld); - grpc_lb_policy_pick (lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task, closure_list); + grpc_lb_policy_pick (exec_ctx, lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task); - GRPC_LB_POLICY_UNREF (lb_policy, "pick", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "pick"); } else if (chand->resolver != NULL) { @@ -458,7 +458,7 @@ perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, { GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); chand->started_resolving = 1; - grpc_resolver_next (chand->resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list); + grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); } gpr_mu_unlock (&chand->mu_config); gpr_mu_unlock (&calld->mu_state); @@ -468,7 +468,7 @@ perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, calld->state = CALL_CANCELLED; gpr_mu_unlock (&chand->mu_config); gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (elem, op, closure_list); + handle_op_after_cancellation (exec_ctx, elem, op); } } } @@ -479,7 +479,7 @@ perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, static void cc_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) { - perform_transport_stream_op (elem, op, 0, closure_list); + perform_transport_stream_op (exec_ctx, elem, op, 0); } static void watch_lb_policy (channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state, grpc_closure_list * cl); @@ -504,10 +504,10 @@ on_lb_policy_state_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_succe lb_policy_connectivity_watcher *w = arg; gpr_mu_lock (&w->chand->mu_config); - on_lb_policy_state_changed_locked (w, closure_list); + on_lb_policy_state_changed_locked (exec_ctx, w); gpr_mu_unlock (&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF (w->chand->master, "watch_lb_policy", closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, w->chand->master, "watch_lb_policy"); gpr_free (w); } @@ -521,7 +521,7 @@ watch_lb_policy (grpc_exec_ctx * exec_ctx, channel_data * chand, grpc_lb_policy grpc_closure_init (&w->on_changed, on_lb_policy_state_changed, w); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change (lb_policy, &w->state, &w->on_changed, closure_list); + grpc_lb_policy_notify_on_state_change (exec_ctx, lb_policy, &w->state, &w->on_changed); } static void @@ -541,10 +541,10 @@ cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) { GRPC_LB_POLICY_REF (lb_policy, "channel"); GRPC_LB_POLICY_REF (lb_policy, "config_change"); - state = grpc_lb_policy_check_connectivity (lb_policy, closure_list); + state = grpc_lb_policy_check_connectivity (exec_ctx, lb_policy); } - grpc_client_config_unref (chand->incoming_configuration, closure_list); + grpc_client_config_unref (exec_ctx, chand->incoming_configuration); } chand->incoming_configuration = NULL; @@ -554,7 +554,7 @@ cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) chand->lb_policy = lb_policy; if (lb_policy != NULL || chand->resolver == NULL /* disconnected */ ) { - grpc_closure_list_move (&chand->waiting_for_config_closures, closure_list); + grpc_closure_list_move (exec_ctx, &chand->waiting_for_config_closures); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { @@ -567,47 +567,47 @@ cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) { grpc_resolver *resolver = chand->resolver; GRPC_RESOLVER_REF (resolver, "channel-next"); - grpc_connectivity_state_set (&chand->state_tracker, state, "new_lb+resolver", closure_list); + grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, state, "new_lb+resolver"); if (lb_policy != NULL) { - watch_lb_policy (chand, lb_policy, state, closure_list); + watch_lb_policy (exec_ctx, chand, lb_policy, state); } gpr_mu_unlock (&chand->mu_config); GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - grpc_resolver_next (resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list); - GRPC_RESOLVER_UNREF (resolver, "channel-next", closure_list); + grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); + GRPC_RESOLVER_UNREF (exec_ctx, resolver, "channel-next"); } else { old_resolver = chand->resolver; chand->resolver = NULL; - grpc_connectivity_state_set (&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone", closure_list); + grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); gpr_mu_unlock (&chand->mu_config); if (old_resolver != NULL) { - grpc_resolver_shutdown (old_resolver, closure_list); - GRPC_RESOLVER_UNREF (old_resolver, "channel", closure_list); + grpc_resolver_shutdown (exec_ctx, old_resolver); + GRPC_RESOLVER_UNREF (exec_ctx, old_resolver, "channel"); } } if (exit_idle) { - grpc_lb_policy_exit_idle (lb_policy, closure_list); - GRPC_LB_POLICY_UNREF (lb_policy, "exit_idle", closure_list); + grpc_lb_policy_exit_idle (exec_ctx, lb_policy); + GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "exit_idle"); } if (old_lb_policy != NULL) { - grpc_lb_policy_shutdown (old_lb_policy, closure_list); - GRPC_LB_POLICY_UNREF (old_lb_policy, "channel", closure_list); + grpc_lb_policy_shutdown (exec_ctx, old_lb_policy); + GRPC_LB_POLICY_UNREF (exec_ctx, old_lb_policy, "channel"); } if (lb_policy != NULL) { - GRPC_LB_POLICY_UNREF (lb_policy, "config_change", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "config_change"); } - GRPC_CHANNEL_INTERNAL_UNREF (chand->master, "resolver", closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, chand->master, "resolver"); } static void @@ -625,7 +625,7 @@ cc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, gr gpr_mu_lock (&chand->mu_config); if (op->on_connectivity_state_change != NULL) { - grpc_connectivity_state_notify_on_state_change (&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change, closure_list); + grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change); op->on_connectivity_state_change = NULL; op->connectivity_state = NULL; } @@ -641,13 +641,13 @@ cc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, gr if (op->disconnect && chand->resolver != NULL) { - grpc_connectivity_state_set (&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect", closure_list); + grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { - grpc_lb_policy_shutdown (chand->lb_policy, closure_list); - GRPC_LB_POLICY_UNREF (chand->lb_policy, "channel", closure_list); + grpc_lb_policy_shutdown (exec_ctx, chand->lb_policy); + GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } } @@ -655,14 +655,14 @@ cc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, gr if (destroy_resolver) { - grpc_resolver_shutdown (destroy_resolver, closure_list); - GRPC_RESOLVER_UNREF (destroy_resolver, "channel", closure_list); + grpc_resolver_shutdown (exec_ctx, destroy_resolver); + GRPC_RESOLVER_UNREF (exec_ctx, destroy_resolver, "channel"); } if (lb_policy) { - grpc_lb_policy_broadcast (lb_policy, op, closure_list); - GRPC_LB_POLICY_UNREF (lb_policy, "broadcast", closure_list); + grpc_lb_policy_broadcast (exec_ctx, lb_policy, op); + GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "broadcast"); } } @@ -699,7 +699,7 @@ destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) case CALL_ACTIVE: subchannel_call = calld->subchannel_call; gpr_mu_unlock (&calld->mu_state); - GRPC_SUBCHANNEL_CALL_UNREF (subchannel_call, "client_channel", closure_list); + GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, subchannel_call, "client_channel"); break; case CALL_CREATED: case CALL_CANCELLED: @@ -743,14 +743,14 @@ destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) if (chand->resolver != NULL) { - grpc_resolver_shutdown (chand->resolver, closure_list); - GRPC_RESOLVER_UNREF (chand->resolver, "channel", closure_list); + grpc_resolver_shutdown (exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF (exec_ctx, chand->resolver, "channel"); } if (chand->lb_policy != NULL) { - GRPC_LB_POLICY_UNREF (chand->lb_policy, "channel", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel"); } - grpc_connectivity_state_destroy (&chand->state_tracker, closure_list); + grpc_connectivity_state_destroy (exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy (&chand->pollset_set); gpr_mu_destroy (&chand->mu_config); } @@ -782,7 +782,7 @@ grpc_client_channel_set_resolver (grpc_exec_ctx * exec_ctx, grpc_channel_stack * { chand->started_resolving = 1; GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - grpc_resolver_next (resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list); + grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); } gpr_mu_unlock (&chand->mu_config); } @@ -798,7 +798,7 @@ grpc_client_channel_check_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_cha { if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle (chand->lb_policy, closure_list); + grpc_lb_policy_exit_idle (exec_ctx, chand->lb_policy); } else { @@ -807,7 +807,7 @@ grpc_client_channel_check_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_cha { GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); chand->started_resolving = 1; - grpc_resolver_next (chand->resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list); + grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); } } } @@ -820,7 +820,7 @@ grpc_client_channel_watch_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_cha { channel_data *chand = elem->channel_data; gpr_mu_lock (&chand->mu_config); - grpc_connectivity_state_notify_on_state_change (&chand->state_tracker, state, on_complete, closure_list); + grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, state, on_complete); gpr_mu_unlock (&chand->mu_config); } @@ -835,12 +835,12 @@ void grpc_client_channel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset) { channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset (&chand->pollset_set, pollset, closure_list); + grpc_pollset_set_add_pollset (exec_ctx, &chand->pollset_set, pollset); } void grpc_client_channel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset) { channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset (&chand->pollset_set, pollset, closure_list); + grpc_pollset_set_del_pollset (exec_ctx, &chand->pollset_set, pollset); } diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index bf3c5a9457..6acdc72075 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -283,7 +283,7 @@ compress_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element } /* pass control down the stack */ - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } /* Constructor for call_data */ diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 2fd80d2207..71d18849f2 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -73,14 +73,14 @@ con_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * ele GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP (GPR_INFO, elem, op); - grpc_transport_perform_stream_op (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), op, closure_list); + grpc_transport_perform_stream_op (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (exec_ctx, calld), op); } static void con_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op) { channel_data *chand = elem->channel_data; - grpc_transport_perform_op (chand->transport, op, closure_list); + grpc_transport_perform_op (exec_ctx, chand->transport, op); } /* Constructor for call_data */ @@ -92,7 +92,7 @@ init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void * int r; GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); - r = grpc_transport_init_stream (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), server_transport_data, initial_op, closure_list); + r = grpc_transport_init_stream (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (exec_ctx, calld), server_transport_data, initial_op); GPR_ASSERT (r == 0); } @@ -103,7 +103,7 @@ destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); - grpc_transport_destroy_stream (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), closure_list); + grpc_transport_destroy_stream (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (exec_ctx, calld)); } /* Constructor for channel_data */ @@ -122,14 +122,14 @@ destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) { channel_data *cd = (channel_data *) elem->channel_data; GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); - grpc_transport_destroy (cd->transport, closure_list); + grpc_transport_destroy (exec_ctx, cd->transport); } static char * con_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) { channel_data *chand = elem->channel_data; - return grpc_transport_get_peer (chand->transport, closure_list); + return grpc_transport_get_peer (exec_ctx, chand->transport); } const grpc_channel_filter grpc_connected_channel_filter = { diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index b2cc69bb4a..7c8d7c0240 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -116,7 +116,7 @@ hc_on_recv (grpc_exec_ctx * exec_ctx, void *user_data, int success) a.closure_list = closure_list; grpc_metadata_batch_filter (&op->data.metadata, client_recv_filter, &a); } - calld->on_done_recv->cb (calld->on_done_recv->cb_arg, success, closure_list); + calld->on_done_recv->cb (exec_ctx, calld->on_done_recv->cb_arg, success); } static grpc_mdelem * @@ -181,7 +181,7 @@ hc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_ { GRPC_CALL_LOG_OP (GPR_INFO, elem, op); hc_mutate_op (elem, op); - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } /* Constructor for call_data */ diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 62be4167f7..6c5902e426 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -219,11 +219,11 @@ hs_on_recv (grpc_exec_ctx * exec_ctx, void *user_data, int success) } /* Error this call out */ success = 0; - grpc_call_element_send_cancel (elem, closure_list); + grpc_call_element_send_cancel (exec_ctx, elem); } } } - calld->on_done_recv->cb (calld->on_done_recv->cb_arg, success, closure_list); + calld->on_done_recv->cb (exec_ctx, calld->on_done_recv->cb_arg, success); } static void @@ -264,7 +264,7 @@ hs_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_ { GRPC_CALL_LOG_OP (GPR_INFO, elem, op); hs_mutate_op (elem, op); - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } /* Constructor for call_data */ diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 3a3f1254bd..7e566642f9 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -74,7 +74,7 @@ noop_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * el noop_mutate_op (elem, op); /* pass control down the stack */ - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } /* Constructor for call_data */ diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c index ceb52e93e5..fc6448201f 100644 --- a/src/core/client_config/client_config.c +++ b/src/core/client_config/client_config.c @@ -63,7 +63,7 @@ grpc_client_config_unref (grpc_exec_ctx * exec_ctx, grpc_client_config * c) { if (gpr_unref (&c->refs)) { - GRPC_LB_POLICY_UNREF (c->lb_policy, "client_config", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, c->lb_policy, "client_config"); gpr_free (c); } } diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index 823b82af5c..209c3f1767 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -42,17 +42,17 @@ grpc_connector_ref (grpc_connector * connector) void grpc_connector_unref (grpc_exec_ctx * exec_ctx, grpc_connector * connector) { - connector->vtable->unref (connector, closure_list); + connector->vtable->unref (exec_ctx, connector); } void grpc_connector_connect (grpc_exec_ctx * exec_ctx, grpc_connector * connector, const grpc_connect_in_args * in_args, grpc_connect_out_args * out_args, grpc_closure * notify) { - connector->vtable->connect (connector, in_args, out_args, notify, closure_list); + connector->vtable->connect (exec_ctx, connector, in_args, out_args, notify); } void grpc_connector_shutdown (grpc_exec_ctx * exec_ctx, grpc_connector * connector) { - connector->vtable->shutdown (connector, closure_list); + connector->vtable->shutdown (exec_ctx, connector); } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 02aaac1dc3..c07cc81b27 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -84,7 +84,7 @@ del_interested_parties_locked (grpc_exec_ctx * exec_ctx, pick_first_lb_policy * pending_pick *pp; for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party (p->subchannels[p->checking_subchannel], pp->pollset, closure_list); + grpc_subchannel_del_interested_party (exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); } } @@ -94,7 +94,7 @@ add_interested_parties_locked (grpc_exec_ctx * exec_ctx, pick_first_lb_policy * pending_pick *pp; for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_add_interested_party (p->subchannels[p->checking_subchannel], pp->pollset, closure_list); + grpc_subchannel_add_interested_party (exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); } } @@ -106,9 +106,9 @@ pf_destroy (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) GPR_ASSERT (p->pending_picks == NULL); for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF (p->subchannels[i], "pick_first", closure_list); + GRPC_SUBCHANNEL_UNREF (exec_ctx, p->subchannels[i], "pick_first"); } - grpc_connectivity_state_destroy (&p->state_tracker, closure_list); + grpc_connectivity_state_destroy (exec_ctx, &p->state_tracker); gpr_free (p->subchannels); gpr_mu_destroy (&p->mu); gpr_free (p); @@ -120,11 +120,11 @@ pf_shutdown (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) pick_first_lb_policy *p = (pick_first_lb_policy *) pol; pending_pick *pp; gpr_mu_lock (&p->mu); - del_interested_parties_locked (p, closure_list); + del_interested_parties_locked (exec_ctx, p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); gpr_mu_unlock (&p->mu); while (pp != NULL) { @@ -143,7 +143,7 @@ start_picking (grpc_exec_ctx * exec_ctx, pick_first_lb_policy * p) p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; GRPC_LB_POLICY_REF (&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list); + grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); } void @@ -153,7 +153,7 @@ pf_exit_idle (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) gpr_mu_lock (&p->mu); if (!p->started_picking) { - start_picking (p, closure_list); + start_picking (exec_ctx, p); } gpr_mu_unlock (&p->mu); } @@ -174,9 +174,9 @@ pf_pick (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_pollset * pollset, { if (!p->started_picking) { - start_picking (p, closure_list); + start_picking (exec_ctx, p); } - grpc_subchannel_add_interested_party (p->subchannels[p->checking_subchannel], pollset, closure_list); + grpc_subchannel_add_interested_party (exec_ctx, p->subchannels[p->checking_subchannel], pollset); pp = gpr_malloc (sizeof (*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -198,19 +198,19 @@ pf_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) if (p->shutdown) { gpr_mu_unlock (&p->mu); - GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, &p->base, "pick_first_connectivity"); return; } else if (p->selected != NULL) { - grpc_connectivity_state_set (&p->state_tracker, p->checking_connectivity, "selected_changed", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_subchannel_notify_on_state_change (p->selected, &p->checking_connectivity, &p->connectivity_changed, closure_list); + grpc_subchannel_notify_on_state_change (exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); } else { - GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, &p->base, "pick_first_connectivity"); } } else @@ -219,27 +219,27 @@ pf_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) switch (p->checking_connectivity) { case GRPC_CHANNEL_READY: - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); p->selected = p->subchannels[p->checking_subchannel]; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_subchannel_del_interested_party (p->selected, pp->pollset, closure_list); + grpc_subchannel_del_interested_party (exec_ctx, p->selected, pp->pollset); grpc_closure_list_add (closure_list, pp->on_complete, 1); gpr_free (pp); } - grpc_subchannel_notify_on_state_change (p->selected, &p->checking_connectivity, &p->connectivity_changed, closure_list); + grpc_subchannel_notify_on_state_change (exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure", closure_list); - del_interested_parties_locked (p, closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure"); + del_interested_parties_locked (exec_ctx, p); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]); - add_interested_parties_locked (p, closure_list); + add_interested_parties_locked (exec_ctx, p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list); + grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); } else { @@ -248,17 +248,17 @@ pf_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_CONNECTING, "connecting_changed", closure_list); - grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, "connecting_changed"); + grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked (p, closure_list); + del_interested_parties_locked (exec_ctx, p); GPR_SWAP (grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF (p->subchannels[p->num_subchannels], "pick_first", closure_list); + GRPC_SUBCHANNEL_UNREF (exec_ctx, p->subchannels[p->num_subchannels], "pick_first"); if (p->num_subchannels == 0) { - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels"); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -266,14 +266,14 @@ pf_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) grpc_closure_list_add (closure_list, pp->on_complete, 1); gpr_free (pp); } - GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, &p->base, "pick_first_connectivity"); } else { - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed"); p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]); - add_interested_parties_locked (p, closure_list); + add_interested_parties_locked (exec_ctx, p); goto loop; } } @@ -302,8 +302,8 @@ pf_broadcast (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_transport_op for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op (subchannels[i], op, closure_list); - GRPC_SUBCHANNEL_UNREF (subchannels[i], "pf_broadcast", closure_list); + grpc_subchannel_process_transport_op (exec_ctx, subchannels[i], op); + GRPC_SUBCHANNEL_UNREF (exec_ctx, subchannels[i], "pf_broadcast"); } gpr_free (subchannels); } @@ -324,7 +324,7 @@ pf_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_ { pick_first_lb_policy *p = (pick_first_lb_policy *) pol; gpr_mu_lock (&p->mu); - grpc_connectivity_state_notify_on_state_change (&p->state_tracker, current, notify, closure_list); + grpc_connectivity_state_notify_on_state_change (exec_ctx, &p->state_tracker, current, notify); gpr_mu_unlock (&p->mu); } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 959d9e1c32..b745628bbc 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -233,7 +233,7 @@ del_interested_parties_locked (grpc_exec_ctx * exec_ctx, round_robin_lb_policy * pending_pick *pp; for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party (p->subchannels[subchannel_idx], pp->pollset, closure_list); + grpc_subchannel_del_interested_party (exec_ctx, p->subchannels[subchannel_idx], pp->pollset); } } @@ -245,16 +245,16 @@ rr_destroy (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) ready_list *elem; for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked (p, i, closure_list); + del_interested_parties_locked (exec_ctx, p, i); } for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF (p->subchannels[i], "round_robin", closure_list); + GRPC_SUBCHANNEL_UNREF (exec_ctx, p->subchannels[i], "round_robin"); } gpr_free (p->connectivity_changed_cbs); gpr_free (p->subchannel_connectivity); - grpc_connectivity_state_destroy (&p->state_tracker, closure_list); + grpc_connectivity_state_destroy (exec_ctx, &p->state_tracker); gpr_free (p->subchannels); gpr_mu_destroy (&p->mu); @@ -284,7 +284,7 @@ rr_shutdown (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked (p, i, closure_list); + del_interested_parties_locked (exec_ctx, p, i); } p->shutdown = 1; @@ -295,7 +295,7 @@ rr_shutdown (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) grpc_closure_list_add (closure_list, pp->on_complete, 0); gpr_free (pp); } - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); gpr_mu_unlock (&p->mu); } @@ -308,7 +308,7 @@ start_picking (grpc_exec_ctx * exec_ctx, round_robin_lb_policy * p) for (i = 0; i < p->num_subchannels; i++) { p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change (p->subchannels[i], &p->subchannel_connectivity[i], &p->connectivity_changed_cbs[i], closure_list); + grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[i], &p->subchannel_connectivity[i], &p->connectivity_changed_cbs[i]); GRPC_LB_POLICY_REF (&p->base, "round_robin_connectivity"); } } @@ -320,7 +320,7 @@ rr_exit_idle (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) gpr_mu_lock (&p->mu); if (!p->started_picking) { - start_picking (p, closure_list); + start_picking (exec_ctx, p); } gpr_mu_unlock (&p->mu); } @@ -343,17 +343,17 @@ rr_pick (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_pollset * pollset, } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked (p); - on_complete->cb (on_complete->cb_arg, 1, closure_list); + on_complete->cb (exec_ctx, on_complete->cb_arg, 1); } else { if (!p->started_picking) { - start_picking (p, closure_list); + start_picking (exec_ctx, p); } for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party (p->subchannels[i], pollset, closure_list); + grpc_subchannel_add_interested_party (exec_ctx, p->subchannels[i], pollset); } pp = gpr_malloc (sizeof (*pp)); pp->next = p->pending_picks; @@ -393,7 +393,7 @@ rr_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) switch (*this_connectivity) { case GRPC_CHANNEL_READY: - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); /* add the newly connected subchannel to the list of connected ones. * Note that it goes to the "end of the line". */ p->subchannel_index_to_readylist_node[this_idx] = add_connected_sc_locked (p, p->subchannels[this_idx]); @@ -415,21 +415,21 @@ rr_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) { gpr_log (GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_subchannel_del_interested_party (selected->subchannel, pp->pollset, closure_list); + grpc_subchannel_del_interested_party (exec_ctx, selected->subchannel, pp->pollset); grpc_closure_list_add (closure_list, pp->on_complete, 1); gpr_free (pp); } - grpc_subchannel_notify_on_state_change (p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx], closure_list); + grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set (&p->state_tracker, *this_connectivity, "connecting_changed", closure_list); - grpc_subchannel_notify_on_state_change (p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx], closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, *this_connectivity, "connecting_changed"); + grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked (p, this_idx, closure_list); + del_interested_parties_locked (exec_ctx, p, this_idx); /* renew state notification */ - grpc_subchannel_notify_on_state_change (p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx], closure_list); + grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx]); /* remove from ready list if still present */ if (p->subchannel_index_to_readylist_node[this_idx] != NULL) @@ -437,10 +437,10 @@ rr_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) remove_disconnected_sc_locked (p, p->subchannel_index_to_readylist_node[this_idx]); p->subchannel_index_to_readylist_node[this_idx] = NULL; } - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure"); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked (p, this_idx, closure_list); + del_interested_parties_locked (exec_ctx, p, this_idx); if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { remove_disconnected_sc_locked (p, p->subchannel_index_to_readylist_node[this_idx]); @@ -449,11 +449,11 @@ rr_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) GPR_SWAP (grpc_subchannel *, p->subchannels[this_idx], p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF (p->subchannels[p->num_subchannels], "round_robin", closure_list); + GRPC_SUBCHANNEL_UNREF (exec_ctx, p->subchannels[p->num_subchannels], "round_robin"); if (p->num_subchannels == 0) { - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels"); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -465,7 +465,7 @@ rr_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) } else { - grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed", closure_list); + grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed"); } } /* switch */ } /* !unref */ @@ -474,7 +474,7 @@ rr_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) if (unref) { - GRPC_LB_POLICY_UNREF (&p->base, "round_robin_connectivity", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, &p->base, "round_robin_connectivity"); } } @@ -498,8 +498,8 @@ rr_broadcast (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_transport_op for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op (subchannels[i], op, closure_list); - GRPC_SUBCHANNEL_UNREF (subchannels[i], "rr_broadcast", closure_list); + grpc_subchannel_process_transport_op (exec_ctx, subchannels[i], op); + GRPC_SUBCHANNEL_UNREF (exec_ctx, subchannels[i], "rr_broadcast"); } gpr_free (subchannels); } @@ -520,7 +520,7 @@ rr_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_ { round_robin_lb_policy *p = (round_robin_lb_policy *) pol; gpr_mu_lock (&p->mu); - grpc_connectivity_state_notify_on_state_change (&p->state_tracker, current, notify, closure_list); + grpc_connectivity_state_notify_on_state_change (exec_ctx, &p->state_tracker, current, notify); gpr_mu_unlock (&p->mu); } diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 06ac41ff56..6a9cf66887 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -65,42 +65,42 @@ grpc_lb_policy_unref (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy) #endif if (gpr_unref (&policy->refs)) { - policy->vtable->destroy (policy, closure_list); + policy->vtable->destroy (exec_ctx, policy); } } void grpc_lb_policy_shutdown (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy) { - policy->vtable->shutdown (policy, closure_list); + policy->vtable->shutdown (exec_ctx, policy); } void grpc_lb_policy_pick (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete) { - policy->vtable->pick (policy, pollset, initial_metadata, target, on_complete, closure_list); + policy->vtable->pick (exec_ctx, policy, pollset, initial_metadata, target, on_complete); } void grpc_lb_policy_broadcast (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_transport_op * op) { - policy->vtable->broadcast (policy, op, closure_list); + policy->vtable->broadcast (exec_ctx, policy, op); } void grpc_lb_policy_exit_idle (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy) { - policy->vtable->exit_idle (policy, closure_list); + policy->vtable->exit_idle (exec_ctx, policy); } void grpc_lb_policy_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_connectivity_state * state, grpc_closure * closure) { - policy->vtable->notify_on_state_change (policy, state, closure, closure_list); + policy->vtable->notify_on_state_change (exec_ctx, policy, state, closure); } grpc_connectivity_state grpc_lb_policy_check_connectivity (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy) { - return policy->vtable->check_connectivity (policy, closure_list); + return policy->vtable->check_connectivity (exec_ctx, policy); } diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c index cb5a6a5333..a55daad5a2 100644 --- a/src/core/client_config/resolver.c +++ b/src/core/client_config/resolver.c @@ -65,24 +65,24 @@ grpc_resolver_unref (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver) #endif if (gpr_unref (&resolver->refs)) { - resolver->vtable->destroy (resolver, closure_list); + resolver->vtable->destroy (exec_ctx, resolver); } } void grpc_resolver_shutdown (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver) { - resolver->vtable->shutdown (resolver, closure_list); + resolver->vtable->shutdown (exec_ctx, resolver); } void grpc_resolver_channel_saw_error (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, struct sockaddr *failing_address, int failing_address_len) { - resolver->vtable->channel_saw_error (resolver, failing_address, failing_address_len, closure_list); + resolver->vtable->channel_saw_error (exec_ctx, resolver, failing_address, failing_address_len); } void grpc_resolver_next (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete) { - resolver->vtable->next (resolver, target_config, on_complete, closure_list); + resolver->vtable->next (exec_ctx, resolver, target_config, on_complete); } diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 66430c43cb..781f1a9e23 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -128,7 +128,7 @@ dns_next (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, grpc_client_config } else { - dns_maybe_finish_next_locked (r, closure_list); + dns_maybe_finish_next_locked (exec_ctx, r); } gpr_mu_unlock (&r->mu); } @@ -152,14 +152,14 @@ dns_on_resolved (grpc_exec_ctx * exec_ctx, void *arg, grpc_resolved_addresses * memset (&args, 0, sizeof (args)); args.addr = (struct sockaddr *) (addresses->addrs[i].addr); args.addr_len = (size_t) addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel (r->subchannel_factory, &args, closure_list); + subchannels[i] = grpc_subchannel_factory_create_subchannel (exec_ctx, r->subchannel_factory, &args); } memset (&lb_policy_args, 0, sizeof (lb_policy_args)); lb_policy_args.subchannels = subchannels; lb_policy_args.num_subchannels = addresses->naddrs; lb_policy = grpc_lb_policy_create (r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy (config, lb_policy); - GRPC_LB_POLICY_UNREF (lb_policy, "construction", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "construction"); grpc_resolved_addresses_destroy (addresses); gpr_free (subchannels); } @@ -168,14 +168,14 @@ dns_on_resolved (grpc_exec_ctx * exec_ctx, void *arg, grpc_resolved_addresses * r->resolving = 0; if (r->resolved_config) { - grpc_client_config_unref (r->resolved_config, closure_list); + grpc_client_config_unref (exec_ctx, r->resolved_config); } r->resolved_config = config; r->resolved_version++; - dns_maybe_finish_next_locked (r, closure_list); + dns_maybe_finish_next_locked (exec_ctx, r); gpr_mu_unlock (&r->mu); - GRPC_RESOLVER_UNREF (&r->base, "dns-resolving", closure_list); + GRPC_RESOLVER_UNREF (exec_ctx, &r->base, "dns-resolving"); } static void @@ -210,9 +210,9 @@ dns_destroy (grpc_exec_ctx * exec_ctx, grpc_resolver * gr) gpr_mu_destroy (&r->mu); if (r->resolved_config) { - grpc_client_config_unref (r->resolved_config, closure_list); + grpc_client_config_unref (exec_ctx, r->resolved_config); } - grpc_subchannel_factory_unref (r->subchannel_factory, closure_list); + grpc_subchannel_factory_unref (exec_ctx, r->subchannel_factory); gpr_free (r->name); gpr_free (r->default_port); gpr_free (r->lb_policy_name); diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 2887cffcd4..1b2eac997d 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -117,7 +117,7 @@ sockaddr_next (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, grpc_client_c GPR_ASSERT (!r->next_completion); r->next_completion = on_complete; r->target_config = target_config; - sockaddr_maybe_finish_next_locked (r, closure_list); + sockaddr_maybe_finish_next_locked (exec_ctx, r); gpr_mu_unlock (&r->mu); } @@ -140,7 +140,7 @@ sockaddr_maybe_finish_next_locked (grpc_exec_ctx * exec_ctx, sockaddr_resolver * memset (&args, 0, sizeof (args)); args.addr = (struct sockaddr *) &r->addrs[i]; args.addr_len = r->addrs_len[i]; - subchannels[i] = grpc_subchannel_factory_create_subchannel (r->subchannel_factory, &args, closure_list); + subchannels[i] = grpc_subchannel_factory_create_subchannel (exec_ctx, r->subchannel_factory, &args); } memset (&lb_policy_args, 0, sizeof (lb_policy_args)); lb_policy_args.subchannels = subchannels; @@ -148,7 +148,7 @@ sockaddr_maybe_finish_next_locked (grpc_exec_ctx * exec_ctx, sockaddr_resolver * lb_policy = grpc_lb_policy_create (r->lb_policy_name, &lb_policy_args); gpr_free (subchannels); grpc_client_config_set_lb_policy (cfg, lb_policy); - GRPC_LB_POLICY_UNREF (lb_policy, "sockaddr", closure_list); + GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "sockaddr"); r->published = 1; *r->target_config = cfg; grpc_closure_list_add (closure_list, r->next_completion, 1); @@ -161,7 +161,7 @@ sockaddr_destroy (grpc_exec_ctx * exec_ctx, grpc_resolver * gr) { sockaddr_resolver *r = (sockaddr_resolver *) gr; gpr_mu_destroy (&r->mu); - grpc_subchannel_factory_unref (r->subchannel_factory, closure_list); + grpc_subchannel_factory_unref (exec_ctx, r->subchannel_factory); gpr_free (r->addrs); gpr_free (r->addrs_len); gpr_free (r->lb_policy_name); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 1ab348da83..3db2fae593 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -199,7 +199,7 @@ subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) static void connection_destroy (grpc_exec_ctx * exec_ctx, connection * c) { GPR_ASSERT (c->refs == 0); - grpc_channel_stack_destroy (CHANNEL_STACK_FROM_CONNECTION (c), closure_list); + grpc_channel_stack_destroy (CHANNEL_STACK_FROM_CONNECTION (exec_ctx, c)); gpr_free (c); } @@ -222,7 +222,7 @@ connection_unref_locked (connection * c, grpc_closure_list * closure_list GRPC_S } if (--c->refs == 0 && c->subchannel->active != c) { - connection_destroy (c, closure_list); + connection_destroy (exec_ctx, c); } return destroy; } @@ -261,7 +261,7 @@ grpc_subchannel_unref (grpc_subchannel * c, grpc_closure_list * closure_list GRP destroy = subchannel_unref_locked (c REF_PASS_ARGS); gpr_mu_unlock (&c->mu); if (destroy) - subchannel_destroy (c, closure_list); + subchannel_destroy (exec_ctx, c); } static void @@ -269,27 +269,27 @@ subchannel_destroy (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) { if (c->active != NULL) { - connection_destroy (c->active, closure_list); + connection_destroy (exec_ctx, c->active); } gpr_free (c->filters); grpc_channel_args_destroy (c->args); gpr_free (c->addr); grpc_mdctx_unref (c->mdctx); - grpc_connectivity_state_destroy (&c->state_tracker, closure_list); - grpc_connector_unref (c->connector, closure_list); + grpc_connectivity_state_destroy (exec_ctx, &c->state_tracker); + grpc_connector_unref (exec_ctx, c->connector); gpr_free (c); } void grpc_subchannel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_pollset * pollset) { - grpc_pollset_set_add_pollset (c->pollset_set, pollset, closure_list); + grpc_pollset_set_add_pollset (exec_ctx, c->pollset_set, pollset); } void grpc_subchannel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_pollset * pollset) { - grpc_pollset_set_del_pollset (c->pollset_set, pollset, closure_list); + grpc_pollset_set_del_pollset (exec_ctx, c->pollset_set, pollset); } static gpr_uint32 @@ -336,7 +336,7 @@ continue_connect (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) args.deadline = compute_connect_deadline (c); args.channel_args = c->args; - grpc_connector_connect (c->connector, &args, &c->connecting_result, &c->connected, closure_list); + grpc_connector_connect (exec_ctx, c->connector, &args, &c->connecting_result, &c->connected); } static void @@ -344,16 +344,16 @@ start_connect (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) { c->backoff_delta = gpr_time_from_seconds (GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta); - continue_connect (c, closure_list); + continue_connect (exec_ctx, c); } static void continue_creating_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) { waiting_for_connect *w4c = arg; - grpc_subchannel_del_interested_party (w4c->subchannel, w4c->pollset, closure_list); - grpc_subchannel_create_call (w4c->subchannel, w4c->pollset, w4c->target, w4c->notify, closure_list); - GRPC_SUBCHANNEL_UNREF (w4c->subchannel, "waiting_for_connect", closure_list); + grpc_subchannel_del_interested_party (exec_ctx, w4c->subchannel, w4c->pollset); + grpc_subchannel_create_call (exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); + GRPC_SUBCHANNEL_UNREF (exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free (w4c); } @@ -368,8 +368,8 @@ grpc_subchannel_create_call (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc CONNECTION_REF_LOCKED (con, "call"); gpr_mu_unlock (&c->mu); - *target = create_call (con, closure_list); - notify->cb (notify->cb_arg, 1, closure_list); + *target = create_call (exec_ctx, con); + notify->cb (exec_ctx, notify->cb_arg, 1); } else { @@ -383,17 +383,17 @@ grpc_subchannel_create_call (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc SUBCHANNEL_REF_LOCKED (c, "waiting_for_connect"); grpc_closure_init (&w4c->continuation, continue_creating_call, w4c); c->waiting = w4c; - grpc_subchannel_add_interested_party (c, pollset, closure_list); + grpc_subchannel_add_interested_party (exec_ctx, c, pollset); if (!c->connecting) { c->connecting = 1; - connectivity_state_changed_locked (c, "create_call", closure_list); + connectivity_state_changed_locked (exec_ctx, c, "create_call"); /* released by connection */ SUBCHANNEL_REF_LOCKED (c, "connecting"); GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting"); gpr_mu_unlock (&c->mu); - start_connect (c, closure_list); + start_connect (exec_ctx, c); } else { @@ -417,20 +417,20 @@ grpc_subchannel_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_subchanne { int do_connect = 0; gpr_mu_lock (&c->mu); - if (grpc_connectivity_state_notify_on_state_change (&c->state_tracker, state, notify, closure_list)) + if (grpc_connectivity_state_notify_on_state_change (exec_ctx, &c->state_tracker, state, notify)) { do_connect = 1; c->connecting = 1; /* released by connection */ SUBCHANNEL_REF_LOCKED (c, "connecting"); GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting"); - connectivity_state_changed_locked (c, "state_change", closure_list); + connectivity_state_changed_locked (exec_ctx, c, "state_change"); } gpr_mu_unlock (&c->mu); if (do_connect) { - start_connect (c, closure_list); + start_connect (exec_ctx, c); } } @@ -449,7 +449,7 @@ grpc_subchannel_process_transport_op (grpc_exec_ctx * exec_ctx, grpc_subchannel if (op->disconnect) { c->disconnected = 1; - connectivity_state_changed_locked (c, "disconnect", closure_list); + connectivity_state_changed_locked (exec_ctx, c, "disconnect"); if (c->have_alarm) { cancel_alarm = 1; @@ -461,25 +461,25 @@ grpc_subchannel_process_transport_op (grpc_exec_ctx * exec_ctx, grpc_subchannel { grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION (con); grpc_channel_element *top_elem = grpc_channel_stack_element (channel_stack, 0); - top_elem->filter->start_transport_op (top_elem, op, closure_list); + top_elem->filter->start_transport_op (exec_ctx, top_elem, op); gpr_mu_lock (&c->mu); - destroy = CONNECTION_UNREF_LOCKED (con, "transport-op", closure_list); + destroy = CONNECTION_UNREF_LOCKED (exec_ctx, con, "transport-op"); gpr_mu_unlock (&c->mu); if (destroy) { - subchannel_destroy (destroy, closure_list); + subchannel_destroy (exec_ctx, destroy); } } if (cancel_alarm) { - grpc_alarm_cancel (&c->alarm, closure_list); + grpc_alarm_cancel (exec_ctx, &c->alarm); } if (op->disconnect) { - grpc_connector_shutdown (c->connector, closure_list); + grpc_connector_shutdown (exec_ctx, c->connector); } } @@ -513,7 +513,7 @@ on_state_changed (grpc_exec_ctx * exec_ctx, void *p, int iomgr_success) op.connectivity_state = &sw->connectivity_state; op.on_connectivity_state_change = &sw->closure; elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0); - elem->filter->start_transport_op (elem, &op, closure_list); + elem->filter->start_transport_op (exec_ctx, elem, &op); /* early out */ gpr_mu_unlock (mu); return; @@ -525,22 +525,22 @@ on_state_changed (grpc_exec_ctx * exec_ctx, void *p, int iomgr_success) destroy_connection = sw->subchannel->active; } sw->subchannel->active = NULL; - grpc_connectivity_state_set (&c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, "connection_failed", closure_list); + grpc_connectivity_state_set (exec_ctx, &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, "connection_failed"); break; } done: - connectivity_state_changed_locked (c, "transport_state_changed", closure_list); + connectivity_state_changed_locked (exec_ctx, c, "transport_state_changed"); destroy = SUBCHANNEL_UNREF_LOCKED (c, "state_watcher"); gpr_free (sw); gpr_mu_unlock (mu); if (destroy) { - subchannel_destroy (c, closure_list); + subchannel_destroy (exec_ctx, c); } if (destroy_connection != NULL) { - connection_destroy (destroy_connection, closure_list); + connection_destroy (exec_ctx, destroy_connection); } } @@ -571,7 +571,7 @@ publish_transport (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) stk = (grpc_channel_stack *) (con + 1); con->refs = 0; con->subchannel = c; - grpc_channel_stack_init (filters, num_filters, c->master, c->args, c->mdctx, stk, closure_list); + grpc_channel_stack_init (exec_ctx, filters, num_filters, c->master, c->args, c->mdctx, stk); grpc_connected_channel_bind_transport (stk, c->connecting_result.transport); gpr_free (c->connecting_result.filters); memset (&c->connecting_result, 0, sizeof (c->connecting_result)); @@ -589,9 +589,9 @@ publish_transport (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) gpr_mu_unlock (&c->mu); gpr_free (sw); gpr_free (filters); - grpc_channel_stack_destroy (stk, closure_list); - GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list); - GRPC_SUBCHANNEL_UNREF (c, "connecting", closure_list); + grpc_channel_stack_destroy (exec_ctx, stk); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->master, "connecting"); + GRPC_SUBCHANNEL_UNREF (exec_ctx, c, "connecting"); return; } @@ -612,13 +612,13 @@ publish_transport (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) op.on_connectivity_state_change = &sw->closure; op.bind_pollset_set = c->pollset_set; SUBCHANNEL_REF_LOCKED (c, "state_watcher"); - GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->master, "connecting"); GPR_ASSERT (!SUBCHANNEL_UNREF_LOCKED (c, "connecting")); elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0); - elem->filter->start_transport_op (elem, &op, closure_list); + elem->filter->start_transport_op (exec_ctx, elem, &op); /* signal completion */ - connectivity_state_changed_locked (c, "connected", closure_list); + connectivity_state_changed_locked (exec_ctx, c, "connected"); w4c = c->waiting; c->waiting = NULL; @@ -635,7 +635,7 @@ publish_transport (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) if (destroy_connection != NULL) { - connection_destroy (destroy_connection, closure_list); + connection_destroy (exec_ctx, destroy_connection); } } @@ -677,17 +677,17 @@ on_alarm (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) { iomgr_success = 0; } - connectivity_state_changed_locked (c, "alarm", closure_list); + connectivity_state_changed_locked (exec_ctx, c, "alarm"); gpr_mu_unlock (&c->mu); if (iomgr_success) { update_reconnect_parameters (c); - continue_connect (c, closure_list); + continue_connect (exec_ctx, c); } else { - GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list); - GRPC_SUBCHANNEL_UNREF (c, "connecting", closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->master, "connecting"); + GRPC_SUBCHANNEL_UNREF (exec_ctx, c, "connecting"); } } @@ -697,7 +697,7 @@ subchannel_connected (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) grpc_subchannel *c = arg; if (c->connecting_result.transport != NULL) { - publish_transport (c, closure_list); + publish_transport (exec_ctx, c); } else { @@ -705,8 +705,8 @@ subchannel_connected (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) gpr_mu_lock (&c->mu); GPR_ASSERT (!c->have_alarm); c->have_alarm = 1; - connectivity_state_changed_locked (c, "connect_failed", closure_list); - grpc_alarm_init (&c->alarm, c->next_attempt, on_alarm, c, now, closure_list); + connectivity_state_changed_locked (exec_ctx, c, "connect_failed"); + grpc_alarm_init (exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock (&c->mu); } } @@ -747,7 +747,7 @@ static void connectivity_state_changed_locked (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, const char *reason) { grpc_connectivity_state current = compute_connectivity_locked (c); - grpc_connectivity_state_set (&c->state_tracker, current, reason, closure_list); + grpc_connectivity_state_set (exec_ctx, &c->state_tracker, current, reason); } /* @@ -767,14 +767,14 @@ grpc_subchannel_call_unref (grpc_subchannel_call * c, grpc_closure_list * closur { gpr_mu *mu = &c->connection->subchannel->mu; grpc_subchannel *destroy; - grpc_call_stack_destroy (SUBCHANNEL_CALL_TO_CALL_STACK (c), closure_list); + grpc_call_stack_destroy (SUBCHANNEL_CALL_TO_CALL_STACK (exec_ctx, c)); gpr_mu_lock (mu); - destroy = CONNECTION_UNREF_LOCKED (c->connection, "call", closure_list); + destroy = CONNECTION_UNREF_LOCKED (exec_ctx, c->connection, "call"); gpr_mu_unlock (mu); gpr_free (c); if (destroy != NULL) { - subchannel_destroy (destroy, closure_list); + subchannel_destroy (exec_ctx, destroy); } } } @@ -784,7 +784,7 @@ grpc_subchannel_call_get_peer (grpc_exec_ctx * exec_ctx, grpc_subchannel_call * { grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call); grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0); - return top_elem->filter->get_peer (top_elem, closure_list); + return top_elem->filter->get_peer (exec_ctx, top_elem); } void @@ -792,7 +792,7 @@ grpc_subchannel_call_process_op (grpc_exec_ctx * exec_ctx, grpc_subchannel_call { grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call); grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0); - top_elem->filter->start_transport_stream_op (top_elem, op, closure_list); + top_elem->filter->start_transport_stream_op (exec_ctx, top_elem, op); } static grpc_subchannel_call * @@ -803,6 +803,6 @@ create_call (grpc_exec_ctx * exec_ctx, connection * con) grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK (call); call->connection = con; gpr_ref_init (&call->refs, 1); - grpc_call_stack_init (chanstk, NULL, NULL, callstk, closure_list); + grpc_call_stack_init (exec_ctx, chanstk, NULL, NULL, callstk); return call; } diff --git a/src/core/client_config/subchannel_factory.c b/src/core/client_config/subchannel_factory.c index 82b851c2f7..f60cd02421 100644 --- a/src/core/client_config/subchannel_factory.c +++ b/src/core/client_config/subchannel_factory.c @@ -42,11 +42,11 @@ grpc_subchannel_factory_ref (grpc_subchannel_factory * factory) void grpc_subchannel_factory_unref (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * factory) { - factory->vtable->unref (factory, closure_list); + factory->vtable->unref (exec_ctx, factory); } grpc_subchannel * grpc_subchannel_factory_create_subchannel (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * factory, grpc_subchannel_args * args) { - return factory->vtable->create_subchannel (factory, args, closure_list); + return factory->vtable->create_subchannel (exec_ctx, factory, args); } diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c index 59af175da1..442862a4b8 100644 --- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c +++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c @@ -56,7 +56,7 @@ merge_args_factory_unref (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * sc merge_args_factory *f = (merge_args_factory *) scf; if (gpr_unref (&f->refs)) { - grpc_subchannel_factory_unref (f->wrapped, closure_list); + grpc_subchannel_factory_unref (exec_ctx, f->wrapped); grpc_channel_args_destroy (f->merge_args); gpr_free (f); } @@ -69,7 +69,7 @@ merge_args_factory_create_subchannel (grpc_exec_ctx * exec_ctx, grpc_subchannel_ grpc_channel_args *final_args = grpc_channel_args_merge (args->args, f->merge_args); grpc_subchannel *s; args->args = final_args; - s = grpc_subchannel_factory_create_subchannel (f->wrapped, args, closure_list); + s = grpc_subchannel_factory_create_subchannel (exec_ctx, f->wrapped, args); grpc_channel_args_destroy (final_args); return s; } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index fac80a44a1..b9dc350df9 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -75,7 +75,7 @@ static grpc_httpcli_post_override g_post_override = NULL; static void plaintext_handshake (void *arg, grpc_endpoint * endpoint, const char *host, void (*on_done) (grpc_exec_ctx * exec_ctx, void *arg, grpc_endpoint * endpoint, grpc_closure_list * closure_list)) { - on_done (arg, endpoint, closure_list); + on_done (exec_ctx, arg, endpoint); } const grpc_httpcli_handshaker grpc_httpcli_plaintext = { "http", @@ -99,8 +99,8 @@ static void next_address (grpc_exec_ctx * exec_ctx, internal_request * req); static void finish (grpc_exec_ctx * exec_ctx, internal_request * req, int success) { - grpc_pollset_set_del_pollset (&req->context->pollset_set, req->pollset, closure_list); - req->on_response (req->user_data, success ? &req->parser.r : NULL, closure_list); + grpc_pollset_set_del_pollset (exec_ctx, &req->context->pollset_set, req->pollset); + req->on_response (exec_ctx, req->user_data, success ? &req->parser.r : NULL); grpc_httpcli_parser_destroy (&req->parser); if (req->addresses != NULL) { @@ -108,7 +108,7 @@ finish (grpc_exec_ctx * exec_ctx, internal_request * req, int success) } if (req->ep != NULL) { - grpc_endpoint_destroy (req->ep, closure_list); + grpc_endpoint_destroy (exec_ctx, req->ep); } gpr_slice_unref (req->request_text); gpr_free (req->host); @@ -123,7 +123,7 @@ static void on_read (grpc_exec_ctx * exec_ctx, void *user_data, int success); static void do_read (grpc_exec_ctx * exec_ctx, internal_request * req) { - grpc_endpoint_read (req->ep, &req->incoming, &req->on_read, closure_list); + grpc_endpoint_read (exec_ctx, req->ep, &req->incoming, &req->on_read); } static void @@ -139,7 +139,7 @@ on_read (grpc_exec_ctx * exec_ctx, void *user_data, int success) req->have_read_byte = 1; if (!grpc_httpcli_parser_parse (&req->parser, req->incoming.slices[i])) { - finish (req, 0, closure_list); + finish (exec_ctx, req, 0); return; } } @@ -147,22 +147,22 @@ on_read (grpc_exec_ctx * exec_ctx, void *user_data, int success) if (success) { - do_read (req, closure_list); + do_read (exec_ctx, req); } else if (!req->have_read_byte) { - next_address (req, closure_list); + next_address (exec_ctx, req); } else { - finish (req, grpc_httpcli_parser_eof (&req->parser), closure_list); + finish (req, grpc_httpcli_parser_eof (exec_ctx, &req->parser)); } } static void on_written (grpc_exec_ctx * exec_ctx, internal_request * req) { - do_read (req, closure_list); + do_read (exec_ctx, req); } static void @@ -171,11 +171,11 @@ done_write (grpc_exec_ctx * exec_ctx, void *arg, int success) internal_request *req = arg; if (success) { - on_written (req, closure_list); + on_written (exec_ctx, req); } else { - next_address (req, closure_list); + next_address (exec_ctx, req); } } @@ -184,7 +184,7 @@ start_write (grpc_exec_ctx * exec_ctx, internal_request * req) { gpr_slice_ref (req->request_text); gpr_slice_buffer_add (&req->outgoing, req->request_text); - grpc_endpoint_write (req->ep, &req->outgoing, &req->done_write, closure_list); + grpc_endpoint_write (exec_ctx, req->ep, &req->outgoing, &req->done_write); } static void @@ -194,12 +194,12 @@ on_handshake_done (grpc_exec_ctx * exec_ctx, void *arg, grpc_endpoint * ep) if (!ep) { - next_address (req, closure_list); + next_address (exec_ctx, req); return; } req->ep = ep; - start_write (req, closure_list); + start_write (exec_ctx, req); } static void @@ -209,10 +209,10 @@ on_connected (grpc_exec_ctx * exec_ctx, void *arg, int success) if (!req->ep) { - next_address (req, closure_list); + next_address (exec_ctx, req); return; } - req->handshaker->handshake (req, req->ep, req->host, on_handshake_done, closure_list); + req->handshaker->handshake (exec_ctx, req, req->ep, req->host, on_handshake_done); } static void @@ -221,12 +221,12 @@ next_address (grpc_exec_ctx * exec_ctx, internal_request * req) grpc_resolved_address *addr; if (req->next_address == req->addresses->naddrs) { - finish (req, 0, closure_list); + finish (exec_ctx, req, 0); return; } addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init (&req->connected, on_connected, req); - grpc_tcp_client_connect (&req->connected, &req->ep, &req->context->pollset_set, (struct sockaddr *) &addr->addr, addr->len, req->deadline, closure_list); + grpc_tcp_client_connect (&req->connected, &req->ep, &req->context->pollset_set, (exec_ctx, struct sockaddr *) &addr->addr, addr->len, req->deadline); } static void @@ -235,12 +235,12 @@ on_resolved (grpc_exec_ctx * exec_ctx, void *arg, grpc_resolved_addresses * addr internal_request *req = arg; if (!addresses) { - finish (req, 0, closure_list); + finish (exec_ctx, req, 0); return; } req->addresses = addresses; req->next_address = 0; - next_address (req, closure_list); + next_address (exec_ctx, req); } static void @@ -263,7 +263,7 @@ internal_request_begin (grpc_exec_ctx * exec_ctx, grpc_httpcli_context * context grpc_iomgr_register_object (&req->iomgr_obj, name); req->host = gpr_strdup (request->host); - grpc_pollset_set_add_pollset (&req->context->pollset_set, req->pollset, closure_list); + grpc_pollset_set_add_pollset (exec_ctx, &req->context->pollset_set, req->pollset); grpc_resolve_address (request->host, req->handshaker->default_port, on_resolved, req); } @@ -271,12 +271,12 @@ void grpc_httpcli_get (grpc_exec_ctx * exec_ctx, grpc_httpcli_context * context, grpc_pollset * pollset, const grpc_httpcli_request * request, gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { char *name; - if (g_get_override && g_get_override (request, deadline, on_response, user_data, closure_list)) + if (g_get_override && g_get_override (exec_ctx, request, deadline, on_response, user_data)) { return; } gpr_asprintf (&name, "HTTP:GET:%s:%s", request->host, request->path); - internal_request_begin (context, pollset, request, deadline, on_response, user_data, name, grpc_httpcli_format_get_request (request), closure_list); + internal_request_begin (context, pollset, request, deadline, on_response, user_data, name, grpc_httpcli_format_get_request (exec_ctx, request)); gpr_free (name); } @@ -284,12 +284,12 @@ void grpc_httpcli_post (grpc_exec_ctx * exec_ctx, grpc_httpcli_context * context, grpc_pollset * pollset, const grpc_httpcli_request * request, const char *body_bytes, size_t body_size, gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { char *name; - if (g_post_override && g_post_override (request, body_bytes, body_size, deadline, on_response, user_data, closure_list)) + if (g_post_override && g_post_override (exec_ctx, request, body_bytes, body_size, deadline, on_response, user_data)) { return; } gpr_asprintf (&name, "HTTP:POST:%s:%s", request->host, request->path); - internal_request_begin (context, pollset, request, deadline, on_response, user_data, name, grpc_httpcli_format_post_request (request, body_bytes, body_size), closure_list); + internal_request_begin (context, pollset, request, deadline, on_response, user_data, name, grpc_httpcli_format_post_request (exec_ctx, request, body_bytes, body_size)); gpr_free (name); } diff --git a/src/core/httpcli/httpcli_security_connector.c b/src/core/httpcli/httpcli_security_connector.c index 5760f27f09..a6ecfa993b 100644 --- a/src/core/httpcli/httpcli_security_connector.c +++ b/src/core/httpcli/httpcli_security_connector.c @@ -70,18 +70,18 @@ httpcli_ssl_do_handshake (grpc_exec_ctx * exec_ctx, grpc_security_connector * sc tsi_handshaker *handshaker; if (c->handshaker_factory == NULL) { - cb (user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL, closure_list); + cb (exec_ctx, user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL); return; } result = tsi_ssl_handshaker_factory_create_handshaker (c->handshaker_factory, c->secure_peer_name, &handshaker); if (result != TSI_OK) { gpr_log (GPR_ERROR, "Handshaker creation failed with error %s.", tsi_result_to_string (result)); - cb (user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL, closure_list); + cb (exec_ctx, user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL); } else { - grpc_do_security_handshake (handshaker, sc, nonsecure_endpoint, cb, user_data, closure_list); + grpc_do_security_handshake (exec_ctx, handshaker, sc, nonsecure_endpoint, cb, user_data); } } @@ -154,11 +154,11 @@ on_secure_transport_setup_done (grpc_exec_ctx * exec_ctx, void *rp, grpc_securit if (status != GRPC_SECURITY_OK) { gpr_log (GPR_ERROR, "Secure transport setup failed with error %d.", status); - c->func (c->arg, NULL, closure_list); + c->func (exec_ctx, c->arg, NULL); } else { - c->func (c->arg, secure_endpoint, closure_list); + c->func (exec_ctx, c->arg, secure_endpoint); } gpr_free (c); } @@ -173,14 +173,14 @@ ssl_handshake (void *arg, grpc_endpoint * tcp, const char *host, void (*on_done) if (pem_root_certs == NULL || pem_root_certs_size == 0) { gpr_log (GPR_ERROR, "Could not get default pem root certs."); - on_done (arg, NULL, closure_list); + on_done (exec_ctx, arg, NULL); gpr_free (c); return; } c->func = on_done; c->arg = arg; GPR_ASSERT (httpcli_ssl_channel_security_connector_create (pem_root_certs, pem_root_certs_size, host, &sc) == GRPC_SECURITY_OK); - grpc_security_connector_do_handshake (&sc->base, tcp, on_secure_transport_setup_done, c, closure_list); + grpc_security_connector_do_handshake (exec_ctx, &sc->base, tcp, on_secure_transport_setup_done, c); GRPC_SECURITY_CONNECTOR_UNREF (&sc->base, "httpcli"); } diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index b10335c613..3110bd3cdf 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -108,7 +108,7 @@ void grpc_alarm_list_shutdown (grpc_closure_list * closure_list) { int i; - run_some_expired_alarms (gpr_inf_future (g_clock_type), NULL, 0, closure_list); + run_some_expired_alarms (gpr_inf_future (exec_ctx, g_clock_type), NULL, 0); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -346,7 +346,7 @@ run_some_expired_alarms (grpc_exec_ctx * exec_ctx, gpr_timespec now, gpr_timespe /* For efficiency, we pop as many available alarms as we can from the shard. This may violate perfect alarm deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ - n += pop_alarms (g_shard_queue[0], now, &new_min_deadline, success, closure_list); + n += pop_alarms (exec_ctx, g_shard_queue[0], now, &new_min_deadline, success); /* An grpc_alarm_init() on the shard could intervene here, adding a new alarm that is earlier than new_min_deadline. However, @@ -373,7 +373,7 @@ int grpc_alarm_check (grpc_exec_ctx * exec_ctx, gpr_timespec now, gpr_timespec * next) { GPR_ASSERT (now.clock_type == g_clock_type); - return run_some_expired_alarms (now, next, gpr_time_cmp (now, gpr_inf_future (now.clock_type)) != 0, closure_list); + return run_some_expired_alarms (now, next, gpr_time_cmp (now, gpr_inf_future (exec_ctx, now.clock_type)) != 0); } gpr_timespec diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c index ae4350e8a9..cd02aac75e 100644 --- a/src/core/iomgr/closure.c +++ b/src/core/iomgr/closure.c @@ -69,7 +69,7 @@ grpc_closure_list_run (grpc_closure_list * closure_list) while (c != NULL) { grpc_closure *next = c->next; - c->cb (c->cb_arg, c->success, closure_list); + c->cb (exec_ctx, c->cb_arg, c->success); c = next; } } diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 20ad7fee13..d0f272a00b 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -36,37 +36,37 @@ void grpc_endpoint_read (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep, gpr_slice_buffer * slices, grpc_closure * cb) { - ep->vtable->read (ep, slices, cb, closure_list); + ep->vtable->read (exec_ctx, ep, slices, cb); } void grpc_endpoint_write (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep, gpr_slice_buffer * slices, grpc_closure * cb) { - ep->vtable->write (ep, slices, cb, closure_list); + ep->vtable->write (exec_ctx, ep, slices, cb); } void grpc_endpoint_add_to_pollset (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep, grpc_pollset * pollset) { - ep->vtable->add_to_pollset (ep, pollset, closure_list); + ep->vtable->add_to_pollset (exec_ctx, ep, pollset); } void grpc_endpoint_add_to_pollset_set (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep, grpc_pollset_set * pollset_set) { - ep->vtable->add_to_pollset_set (ep, pollset_set, closure_list); + ep->vtable->add_to_pollset_set (exec_ctx, ep, pollset_set); } void grpc_endpoint_shutdown (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep) { - ep->vtable->shutdown (ep, closure_list); + ep->vtable->shutdown (exec_ctx, ep); } void grpc_endpoint_destroy (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep) { - ep->vtable->destroy (ep, closure_list); + ep->vtable->destroy (exec_ctx, ep); } char * diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 8ae63e2a89..fb4e99605d 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -370,7 +370,7 @@ set_ready (grpc_exec_ctx * exec_ctx, grpc_fd * fd, gpr_atm * st) /* only one set_ready can be active at once (but there may be a racing notify_on) */ gpr_mu_lock (&fd->set_state_mu); - set_ready_locked (fd, st, closure_list); + set_ready_locked (exec_ctx, fd, st); gpr_mu_unlock (&fd->set_state_mu); } @@ -380,21 +380,21 @@ grpc_fd_shutdown (grpc_exec_ctx * exec_ctx, grpc_fd * fd) gpr_mu_lock (&fd->set_state_mu); GPR_ASSERT (!gpr_atm_no_barrier_load (&fd->shutdown)); gpr_atm_rel_store (&fd->shutdown, 1); - set_ready_locked (fd, &fd->readst, closure_list); - set_ready_locked (fd, &fd->writest, closure_list); + set_ready_locked (exec_ctx, fd, &fd->readst); + set_ready_locked (exec_ctx, fd, &fd->writest); gpr_mu_unlock (&fd->set_state_mu); } void grpc_fd_notify_on_read (grpc_exec_ctx * exec_ctx, grpc_fd * fd, grpc_closure * closure) { - notify_on (fd, &fd->readst, closure, closure_list); + notify_on (exec_ctx, fd, &fd->readst, closure); } void grpc_fd_notify_on_write (grpc_exec_ctx * exec_ctx, grpc_fd * fd, grpc_closure * closure) { - notify_on (fd, &fd->writest, closure, closure_list); + notify_on (exec_ctx, fd, &fd->writest, closure); } gpr_uint32 @@ -493,13 +493,13 @@ grpc_fd_end_poll (grpc_exec_ctx * exec_ctx, grpc_fd_watcher * watcher, int got_r void grpc_fd_become_readable (grpc_exec_ctx * exec_ctx, grpc_fd * fd) { - set_ready (fd, &fd->readst, closure_list); + set_ready (exec_ctx, fd, &fd->readst); } void grpc_fd_become_writable (grpc_exec_ctx * exec_ctx, grpc_fd * fd) { - set_ready (fd, &fd->writest, closure_list); + set_ready (exec_ctx, fd, &fd->writest); } #endif diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index e5f0d99e54..a9ee5a4574 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -90,7 +90,7 @@ finally_add_fd (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_fd * fd) } } } - grpc_fd_end_poll (&watcher, 0, 0, closure_list); + grpc_fd_end_poll (exec_ctx, &watcher, 0, 0); } static void @@ -100,7 +100,7 @@ perform_delayed_add (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_status) if (!grpc_fd_is_orphaned (da->fd)) { - finally_add_fd (da->pollset, da->fd, closure_list); + finally_add_fd (exec_ctx, da->pollset, da->fd); } gpr_mu_lock (&da->pollset->mu); @@ -127,7 +127,7 @@ multipoll_with_epoll_pollset_add_fd (grpc_exec_ctx * exec_ctx, grpc_pollset * po if (and_unlock_pollset) { gpr_mu_unlock (&pollset->mu); - finally_add_fd (pollset, fd, closure_list); + finally_add_fd (exec_ctx, pollset, fd); } else { @@ -235,11 +235,11 @@ multipoll_with_epoll_pollset_maybe_work_and_unlock (grpc_exec_ctx * exec_ctx, gr int write = ep_ev[i].events & EPOLLOUT; if (read || cancel) { - grpc_fd_become_readable (fd, closure_list); + grpc_fd_become_readable (exec_ctx, fd); } if (write || cancel) { - grpc_fd_become_writable (fd, closure_list); + grpc_fd_become_writable (exec_ctx, fd); } } } @@ -286,7 +286,7 @@ epoll_become_multipoller (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc } for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd (pollset, fds[i], 0, closure_list); + multipoll_with_epoll_pollset_add_fd (exec_ctx, pollset, fds[i], 0); } } diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 8417b7491a..b7eb6c0ab0 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -163,7 +163,7 @@ multipoll_with_poll_pollset_maybe_work_and_unlock (grpc_exec_ctx * exec_ctx, grp for (i = 1; i < pfd_count; i++) { - grpc_fd_end_poll (&watchers[i], pfds[i].revents & POLLIN, pfds[i].revents & POLLOUT, closure_list); + grpc_fd_end_poll (exec_ctx, &watchers[i], pfds[i].revents & POLLIN, pfds[i].revents & POLLOUT); } if (r < 0) @@ -191,11 +191,11 @@ multipoll_with_poll_pollset_maybe_work_and_unlock (grpc_exec_ctx * exec_ctx, grp } if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable (watchers[i].fd, closure_list); + grpc_fd_become_readable (exec_ctx, watchers[i].fd); } if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable (watchers[i].fd, closure_list); + grpc_fd_become_writable (exec_ctx, watchers[i].fd); } } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index ad5c42aac3..c15b1f1908 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -172,7 +172,7 @@ void grpc_pollset_add_fd (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_fd * fd) { gpr_mu_lock (&pollset->mu); - pollset->vtable->add_fd (pollset, fd, 1, closure_list); + pollset->vtable->add_fd (exec_ctx, pollset, fd, 1); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -187,7 +187,7 @@ void grpc_pollset_del_fd (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_fd * fd) { gpr_mu_lock (&pollset->mu); - pollset->vtable->del_fd (pollset, fd, 1, closure_list); + pollset->vtable->del_fd (exec_ctx, pollset, fd, 1); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -217,10 +217,10 @@ grpc_pollset_work (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_pollse grpc_wakeup_fd_init (&worker->wakeup_fd); if (!grpc_pollset_has_workers (pollset) && !grpc_closure_list_empty (pollset->idle_jobs)) { - grpc_closure_list_move (&pollset->idle_jobs, closure_list); + grpc_closure_list_move (exec_ctx, &pollset->idle_jobs); goto done; } - if (grpc_alarm_check (now, &deadline, closure_list)) + if (grpc_alarm_check (exec_ctx, now, &deadline)) { goto done; } @@ -240,7 +240,7 @@ grpc_pollset_work (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_pollse push_front_worker (pollset, worker); added_worker = 1; gpr_tls_set (&g_current_thread_poller, (gpr_intptr) pollset); - pollset->vtable->maybe_work_and_unlock (pollset, worker, deadline, now, closure_list); + pollset->vtable->maybe_work_and_unlock (exec_ctx, pollset, worker, deadline, now); locked = 0; gpr_tls_set (&g_current_thread_poller, 0); } @@ -270,7 +270,7 @@ done: { pollset->called_shutdown = 1; gpr_mu_unlock (&pollset->mu); - finish_shutdown (pollset, closure_list); + finish_shutdown (exec_ctx, pollset); grpc_closure_list_run (closure_list); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to @@ -299,7 +299,7 @@ grpc_pollset_shutdown (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_cl if (call_shutdown) { - finish_shutdown (pollset, closure_list); + finish_shutdown (exec_ctx, pollset); } } @@ -387,7 +387,7 @@ basic_do_promote (grpc_exec_ctx * exec_ctx, void *args, int success) } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd (pollset, fd, 0, closure_list); + pollset->vtable->add_fd (exec_ctx, pollset, fd, 0); } else if (fd != pollset->data.ptr) { @@ -397,7 +397,7 @@ basic_do_promote (grpc_exec_ctx * exec_ctx, void *args, int success) if (fds[0] && !grpc_fd_is_orphaned (fds[0])) { - grpc_platform_become_multipoller (pollset, fds, GPR_ARRAY_SIZE (fds), closure_list); + grpc_platform_become_multipoller (pollset, fds, GPR_ARRAY_SIZE (exec_ctx, fds)); GRPC_FD_UNREF (fds[0], "basicpoll"); } else @@ -445,7 +445,7 @@ basic_pollset_add_fd (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_fd } else if (!grpc_fd_is_orphaned (fds[0])) { - grpc_platform_become_multipoller (pollset, fds, GPR_ARRAY_SIZE (fds), closure_list); + grpc_platform_become_multipoller (pollset, fds, GPR_ARRAY_SIZE (exec_ctx, fds)); GRPC_FD_UNREF (fds[0], "basicpoll"); } else @@ -540,7 +540,7 @@ basic_pollset_maybe_work_and_unlock (grpc_exec_ctx * exec_ctx, grpc_pollset * po if (fd) { - grpc_fd_end_poll (&fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT, closure_list); + grpc_fd_end_poll (exec_ctx, &fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT); } if (r < 0) @@ -564,11 +564,11 @@ basic_pollset_maybe_work_and_unlock (grpc_exec_ctx * exec_ctx, grpc_pollset * po { if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable (fd, closure_list); + grpc_fd_become_readable (exec_ctx, fd); } if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable (fd, closure_list); + grpc_fd_become_writable (exec_ctx, fd); } } } diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 83be0b91c9..cb2e6aede3 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -82,7 +82,7 @@ grpc_pollset_set_add_pollset (grpc_exec_ctx * exec_ctx, grpc_pollset_set * polls } else { - grpc_pollset_add_fd (pollset, pollset_set->fds[i], closure_list); + grpc_pollset_add_fd (exec_ctx, pollset, pollset_set->fds[i]); pollset_set->fds[j++] = pollset_set->fds[i]; } } @@ -121,7 +121,7 @@ grpc_pollset_set_add_fd (grpc_exec_ctx * exec_ctx, grpc_pollset_set * pollset_se pollset_set->fds[pollset_set->fd_count++] = fd; for (i = 0; i < pollset_set->pollset_count; i++) { - grpc_pollset_add_fd (pollset_set->pollsets[i], fd, closure_list); + grpc_pollset_add_fd (exec_ctx, pollset_set->pollsets[i], fd); } gpr_mu_unlock (&pollset_set->mu); } diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index cca626228a..69384ef484 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -106,7 +106,7 @@ tc_on_alarm (grpc_exec_ctx * exec_ctx, void *acp, int success) gpr_mu_lock (&ac->mu); if (ac->fd != NULL) { - grpc_fd_shutdown (ac->fd, closure_list); + grpc_fd_shutdown (exec_ctx, ac->fd); } done = (--ac->refs == 0); gpr_mu_unlock (&ac->mu); @@ -141,7 +141,7 @@ on_writable (grpc_exec_ctx * exec_ctx, void *acp, int success) ac->fd = NULL; gpr_mu_unlock (&ac->mu); - grpc_alarm_cancel (&ac->alarm, closure_list); + grpc_alarm_cancel (exec_ctx, &ac->alarm); gpr_mu_lock (&ac->mu); if (success) @@ -177,7 +177,7 @@ on_writable (grpc_exec_ctx * exec_ctx, void *acp, int success) don't do that! */ gpr_log (GPR_ERROR, "kernel out of buffers"); gpr_mu_unlock (&ac->mu); - grpc_fd_notify_on_write (fd, &ac->write_closure, closure_list); + grpc_fd_notify_on_write (exec_ctx, fd, &ac->write_closure); return; } else @@ -196,7 +196,7 @@ on_writable (grpc_exec_ctx * exec_ctx, void *acp, int success) } else { - grpc_pollset_set_del_fd (ac->interested_parties, fd, closure_list); + grpc_pollset_set_del_fd (exec_ctx, ac->interested_parties, fd); *ep = grpc_tcp_create (fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); fd = NULL; goto finish; @@ -213,8 +213,8 @@ on_writable (grpc_exec_ctx * exec_ctx, void *acp, int success) finish: if (fd != NULL) { - grpc_pollset_set_del_fd (ac->interested_parties, fd, closure_list); - grpc_fd_orphan (fd, NULL, "tcp_client_orphan", closure_list); + grpc_pollset_set_del_fd (exec_ctx, ac->interested_parties, fd); + grpc_fd_orphan (exec_ctx, fd, NULL, "tcp_client_orphan"); fd = NULL; } done = (--ac->refs == 0); @@ -290,12 +290,12 @@ grpc_tcp_client_connect (grpc_exec_ctx * exec_ctx, grpc_closure * closure, grpc_ if (errno != EWOULDBLOCK && errno != EINPROGRESS) { gpr_log (GPR_ERROR, "connect error to '%s': %s", addr_str, strerror (errno)); - grpc_fd_orphan (fdobj, NULL, "tcp_client_connect_error", closure_list); + grpc_fd_orphan (exec_ctx, fdobj, NULL, "tcp_client_connect_error"); grpc_closure_list_add (closure_list, closure, 0); goto done; } - grpc_pollset_set_add_fd (interested_parties, fdobj, closure_list); + grpc_pollset_set_add_fd (exec_ctx, interested_parties, fdobj); ac = gpr_malloc (sizeof (async_connect)); ac->closure = closure; @@ -315,8 +315,8 @@ grpc_tcp_client_connect (grpc_exec_ctx * exec_ctx, grpc_closure * closure, grpc_ } gpr_mu_lock (&ac->mu); - grpc_alarm_init (&ac->alarm, gpr_convert_clock_type (deadline, GPR_CLOCK_MONOTONIC), tc_on_alarm, ac, gpr_now (GPR_CLOCK_MONOTONIC), closure_list); - grpc_fd_notify_on_write (ac->fd, &ac->write_closure, closure_list); + grpc_alarm_init (&ac->alarm, gpr_convert_clock_type (deadline, GPR_CLOCK_MONOTONIC), tc_on_alarm, ac, gpr_now (exec_ctx, GPR_CLOCK_MONOTONIC)); + grpc_fd_notify_on_write (exec_ctx, ac->fd, &ac->write_closure); gpr_mu_unlock (&ac->mu); done: diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index bc9f566c6a..66895f3744 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -104,13 +104,13 @@ static void tcp_shutdown (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep) { grpc_tcp *tcp = (grpc_tcp *) ep; - grpc_fd_shutdown (tcp->em_fd, closure_list); + grpc_fd_shutdown (exec_ctx, tcp->em_fd); } static void tcp_free (grpc_exec_ctx * exec_ctx, grpc_tcp * tcp) { - grpc_fd_orphan (tcp->em_fd, NULL, "tcp_unref_orphan", closure_list); + grpc_fd_orphan (exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan"); gpr_free (tcp->peer_string); gpr_free (tcp); } @@ -126,7 +126,7 @@ tcp_unref (grpc_tcp * tcp, grpc_closure_list * closure_list, const char *reason, gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count - 1); if (gpr_unref (&tcp->refcount)) { - tcp_free (tcp, closure_list); + tcp_free (exec_ctx, tcp); } } @@ -144,7 +144,7 @@ tcp_unref (grpc_exec_ctx * exec_ctx, grpc_tcp * tcp) { if (gpr_unref (&tcp->refcount)) { - tcp_free (tcp, closure_list); + tcp_free (exec_ctx, tcp); } } @@ -159,7 +159,7 @@ static void tcp_destroy (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep) { grpc_tcp *tcp = (grpc_tcp *) ep; - TCP_UNREF (tcp, "destroy", closure_list); + TCP_UNREF (exec_ctx, tcp, "destroy"); } static void @@ -182,7 +182,7 @@ call_read_cb (grpc_exec_ctx * exec_ctx, grpc_tcp * tcp, int success) tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - cb->cb (cb->cb_arg, success, closure_list); + cb->cb (exec_ctx, cb->cb_arg, success); } #define MAX_READ_IOVEC 4 @@ -236,22 +236,22 @@ tcp_continue_read (grpc_exec_ctx * exec_ctx, grpc_tcp * tcp) tcp->iov_size /= 2; } /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read (tcp->em_fd, &tcp->read_closure, closure_list); + grpc_fd_notify_on_read (exec_ctx, tcp->em_fd, &tcp->read_closure); } else { /* TODO(klempner): Log interesting errors */ gpr_slice_buffer_reset_and_unref (tcp->incoming_buffer); - call_read_cb (tcp, 0, closure_list); - TCP_UNREF (tcp, "read", closure_list); + call_read_cb (exec_ctx, tcp, 0); + TCP_UNREF (exec_ctx, tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ gpr_slice_buffer_reset_and_unref (tcp->incoming_buffer); - call_read_cb (tcp, 0, closure_list); - TCP_UNREF (tcp, "read", closure_list); + call_read_cb (exec_ctx, tcp, 0); + TCP_UNREF (exec_ctx, tcp, "read"); } else { @@ -265,8 +265,8 @@ tcp_continue_read (grpc_exec_ctx * exec_ctx, grpc_tcp * tcp) ++tcp->iov_size; } GPR_ASSERT ((size_t) read_bytes == tcp->incoming_buffer->length); - call_read_cb (tcp, 1, closure_list); - TCP_UNREF (tcp, "read", closure_list); + call_read_cb (exec_ctx, tcp, 1); + TCP_UNREF (exec_ctx, tcp, "read"); } GRPC_TIMER_END (GRPC_PTAG_HANDLE_READ, 0); @@ -282,12 +282,12 @@ tcp_handle_read (void *arg /* grpc_tcp */ , int success, if (!success) { gpr_slice_buffer_reset_and_unref (tcp->incoming_buffer); - call_read_cb (tcp, 0, closure_list); - TCP_UNREF (tcp, "read", closure_list); + call_read_cb (exec_ctx, tcp, 0); + TCP_UNREF (exec_ctx, tcp, "read"); } else { - tcp_continue_read (tcp, closure_list); + tcp_continue_read (exec_ctx, tcp); } } @@ -303,7 +303,7 @@ tcp_read (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep, gpr_slice_buffer * incom if (tcp->finished_edge) { tcp->finished_edge = 0; - grpc_fd_notify_on_read (tcp->em_fd, &tcp->read_closure, closure_list); + grpc_fd_notify_on_read (exec_ctx, tcp->em_fd, &tcp->read_closure); } else { @@ -412,8 +412,8 @@ tcp_handle_write (void *arg /* grpc_tcp */ , int success, { cb = tcp->write_cb; tcp->write_cb = NULL; - cb->cb (cb->cb_arg, 0, closure_list); - TCP_UNREF (tcp, "write", closure_list); + cb->cb (exec_ctx, cb->cb_arg, 0); + TCP_UNREF (exec_ctx, tcp, "write"); return; } @@ -421,14 +421,14 @@ tcp_handle_write (void *arg /* grpc_tcp */ , int success, status = tcp_flush (tcp); if (status == FLUSH_PENDING) { - grpc_fd_notify_on_write (tcp->em_fd, &tcp->write_closure, closure_list); + grpc_fd_notify_on_write (exec_ctx, tcp->em_fd, &tcp->write_closure); } else { cb = tcp->write_cb; tcp->write_cb = NULL; - cb->cb (cb->cb_arg, status == FLUSH_DONE, closure_list); - TCP_UNREF (tcp, "write", closure_list); + cb->cb (exec_ctx, cb->cb_arg, status == FLUSH_DONE); + TCP_UNREF (exec_ctx, tcp, "write"); } GRPC_TIMER_END (GRPC_PTAG_TCP_CB_WRITE, 0); } @@ -469,7 +469,7 @@ tcp_write (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep, gpr_slice_buffer * buf, { TCP_REF (tcp, "write"); tcp->write_cb = cb; - grpc_fd_notify_on_write (tcp->em_fd, &tcp->write_closure, closure_list); + grpc_fd_notify_on_write (exec_ctx, tcp->em_fd, &tcp->write_closure); } else { @@ -483,14 +483,14 @@ static void tcp_add_to_pollset (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep, grpc_pollset * pollset) { grpc_tcp *tcp = (grpc_tcp *) ep; - grpc_pollset_add_fd (pollset, tcp->em_fd, closure_list); + grpc_pollset_add_fd (exec_ctx, pollset, tcp->em_fd); } static void tcp_add_to_pollset_set (grpc_exec_ctx * exec_ctx, grpc_endpoint * ep, grpc_pollset_set * pollset_set) { grpc_tcp *tcp = (grpc_tcp *) ep; - grpc_pollset_set_add_fd (pollset_set, tcp->em_fd, closure_list); + grpc_pollset_set_add_fd (exec_ctx, pollset_set, tcp->em_fd); } static char * diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index ee462aabd4..2d42e03ce0 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -168,7 +168,7 @@ destroyed_port (grpc_exec_ctx * exec_ctx, void *server, int success) if (s->destroyed_ports == s->nports) { gpr_mu_unlock (&s->mu); - finish_shutdown (s, closure_list); + finish_shutdown (exec_ctx, s); } else { @@ -205,14 +205,14 @@ deactivated_all_ports (grpc_exec_ctx * exec_ctx, grpc_tcp_server * s) } sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; - grpc_fd_orphan (sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown", closure_list); + grpc_fd_orphan (exec_ctx, sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown"); } gpr_mu_unlock (&s->mu); } else { gpr_mu_unlock (&s->mu); - finish_shutdown (s, closure_list); + finish_shutdown (exec_ctx, s); } } @@ -232,14 +232,14 @@ grpc_tcp_server_destroy (grpc_exec_ctx * exec_ctx, grpc_tcp_server * s, grpc_clo { for (i = 0; i < s->nports; i++) { - grpc_fd_shutdown (s->ports[i].emfd, closure_list); + grpc_fd_shutdown (exec_ctx, s->ports[i].emfd); } gpr_mu_unlock (&s->mu); } else { gpr_mu_unlock (&s->mu); - deactivated_all_ports (s, closure_list); + deactivated_all_ports (exec_ctx, s); } } @@ -361,7 +361,7 @@ on_read (grpc_exec_ctx * exec_ctx, void *arg, int success) case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read (sp->emfd, &sp->read_closure, closure_list); + grpc_fd_notify_on_read (exec_ctx, sp->emfd, &sp->read_closure); return; default: gpr_log (GPR_ERROR, "Failed accept4: %s", strerror (errno)); @@ -385,9 +385,9 @@ on_read (grpc_exec_ctx * exec_ctx, void *arg, int success) incoming channel to every pollset owned by the server */ for (i = 0; i < sp->server->pollset_count; i++) { - grpc_pollset_add_fd (sp->server->pollsets[i], fdobj, closure_list); + grpc_pollset_add_fd (exec_ctx, sp->server->pollsets[i], fdobj); } - sp->server->on_accept_cb (sp->server->on_accept_cb_arg, grpc_tcp_create (fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), closure_list); + sp->server->on_accept_cb (sp->server->on_accept_cb_arg, grpc_tcp_create (exec_ctx, fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); gpr_free (name); gpr_free (addr_str); @@ -400,7 +400,7 @@ error: if (0 == --sp->server->active_ports) { gpr_mu_unlock (&sp->server->mu); - deactivated_all_ports (sp->server, closure_list); + deactivated_all_ports (exec_ctx, sp->server); } else { @@ -557,11 +557,11 @@ grpc_tcp_server_start (grpc_exec_ctx * exec_ctx, grpc_tcp_server * s, grpc_polls { for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd (pollsets[j], s->ports[i].emfd, closure_list); + grpc_pollset_add_fd (exec_ctx, pollsets[j], s->ports[i].emfd); } s->ports[i].read_closure.cb = on_read; s->ports[i].read_closure.cb_arg = &s->ports[i]; - grpc_fd_notify_on_read (s->ports[i].emfd, &s->ports[i].read_closure, closure_list); + grpc_fd_notify_on_read (exec_ctx, s->ports[i].emfd, &s->ports[i].read_closure); s->active_ports++; } gpr_mu_unlock (&s->mu); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 2f45011a89..80b8748ea5 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -162,7 +162,7 @@ destroyed_port (grpc_exec_ctx * exec_ctx, void *server, int success) if (s->destroyed_ports == s->nports) { gpr_mu_unlock (&s->mu); - finish_shutdown (s, closure_list); + finish_shutdown (exec_ctx, s); } else { @@ -198,14 +198,14 @@ deactivated_all_ports (grpc_exec_ctx * exec_ctx, grpc_udp_server * s) } sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; - grpc_fd_orphan (sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown", closure_list); + grpc_fd_orphan (exec_ctx, sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown"); } gpr_mu_unlock (&s->mu); } else { gpr_mu_unlock (&s->mu); - finish_shutdown (s, closure_list); + finish_shutdown (exec_ctx, s); } } @@ -225,14 +225,14 @@ grpc_udp_server_destroy (grpc_exec_ctx * exec_ctx, grpc_udp_server * s, grpc_clo { for (i = 0; i < s->nports; i++) { - grpc_fd_shutdown (s->ports[i].emfd, closure_list); + grpc_fd_shutdown (exec_ctx, s->ports[i].emfd); } gpr_mu_unlock (&s->mu); } else { gpr_mu_unlock (&s->mu); - deactivated_all_ports (s, closure_list); + deactivated_all_ports (exec_ctx, s); } } @@ -302,7 +302,7 @@ on_read (grpc_exec_ctx * exec_ctx, void *arg, int success) if (0 == --sp->server->active_ports) { gpr_mu_unlock (&sp->server->mu); - deactivated_all_ports (sp->server, closure_list); + deactivated_all_ports (exec_ctx, sp->server); } else { @@ -316,7 +316,7 @@ on_read (grpc_exec_ctx * exec_ctx, void *arg, int success) sp->read_cb (sp->fd); /* Re-arm the notification event so we get another chance to read. */ - grpc_fd_notify_on_read (sp->emfd, &sp->read_closure, closure_list); + grpc_fd_notify_on_read (exec_ctx, sp->emfd, &sp->read_closure); } static int @@ -461,11 +461,11 @@ grpc_udp_server_start (grpc_exec_ctx * exec_ctx, grpc_udp_server * s, grpc_polls { for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd (pollsets[j], s->ports[i].emfd, closure_list); + grpc_pollset_add_fd (exec_ctx, pollsets[j], s->ports[i].emfd); } s->ports[i].read_closure.cb = on_read; s->ports[i].read_closure.cb_arg = &s->ports[i]; - grpc_fd_notify_on_read (s->ports[i].emfd, &s->ports[i].read_closure, closure_list); + grpc_fd_notify_on_read (exec_ctx, s->ports[i].emfd, &s->ports[i].read_closure); s->active_ports++; } gpr_mu_unlock (&s->mu); diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index 1aadaa4028..a407fb61a4 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -59,7 +59,7 @@ grpc_workqueue_create (grpc_closure_list * closure_list) sprintf (name, "workqueue:%p", (void *) workqueue); workqueue->wakeup_read_fd = grpc_fd_create (GRPC_WAKEUP_FD_GET_READ_FD (&workqueue->wakeup_fd), name); grpc_closure_init (&workqueue->read_closure, on_readable, workqueue); - grpc_fd_notify_on_read (workqueue->wakeup_read_fd, &workqueue->read_closure, closure_list); + grpc_fd_notify_on_read (exec_ctx, workqueue->wakeup_read_fd, &workqueue->read_closure); return workqueue; } @@ -67,7 +67,7 @@ static void workqueue_destroy (grpc_exec_ctx * exec_ctx, grpc_workqueue * workqueue) { GPR_ASSERT (grpc_closure_list_empty (workqueue->closure_list)); - grpc_fd_shutdown (workqueue->wakeup_read_fd, closure_list); + grpc_fd_shutdown (exec_ctx, workqueue->wakeup_read_fd); } #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -95,21 +95,21 @@ grpc_workqueue_unref (grpc_exec_ctx * exec_ctx, grpc_workqueue * workqueue) #endif if (gpr_unref (&workqueue->refs)) { - workqueue_destroy (workqueue, closure_list); + workqueue_destroy (exec_ctx, workqueue); } } void grpc_workqueue_add_to_pollset (grpc_exec_ctx * exec_ctx, grpc_workqueue * workqueue, grpc_pollset * pollset) { - grpc_pollset_add_fd (pollset, workqueue->wakeup_read_fd, closure_list); + grpc_pollset_add_fd (exec_ctx, pollset, workqueue->wakeup_read_fd); } void grpc_workqueue_flush (grpc_exec_ctx * exec_ctx, grpc_workqueue * workqueue) { gpr_mu_lock (&workqueue->mu); - grpc_closure_list_move (&workqueue->closure_list, closure_list); + grpc_closure_list_move (exec_ctx, &workqueue->closure_list); gpr_mu_unlock (&workqueue->mu); } @@ -124,16 +124,16 @@ on_readable (grpc_exec_ctx * exec_ctx, void *arg, int success) /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; grpc_wakeup_fd_destroy (&workqueue->wakeup_fd); - grpc_fd_orphan (workqueue->wakeup_read_fd, NULL, "destroy", closure_list); + grpc_fd_orphan (exec_ctx, workqueue->wakeup_read_fd, NULL, "destroy"); gpr_free (workqueue); } else { gpr_mu_lock (&workqueue->mu); - grpc_closure_list_move (&workqueue->closure_list, closure_list); + grpc_closure_list_move (exec_ctx, &workqueue->closure_list); grpc_wakeup_fd_consume_wakeup (&workqueue->wakeup_fd); gpr_mu_unlock (&workqueue->mu); - grpc_fd_notify_on_read (workqueue->wakeup_read_fd, &workqueue->read_closure, closure_list); + grpc_fd_notify_on_read (exec_ctx, workqueue->wakeup_read_fd, &workqueue->read_closure); } } diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 89ac3f464f..33f2e8093a 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -94,7 +94,7 @@ bubble_up_error (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_status call_data *calld = elem->call_data; gpr_log (GPR_ERROR, "Client side authentication failure: %s", error_msg); grpc_transport_stream_op_add_cancellation (&calld->op, status); - grpc_call_next_op (elem, &calld->op, closure_list); + grpc_call_next_op (exec_ctx, elem, &calld->op); } static void @@ -109,7 +109,7 @@ on_credentials_metadata (grpc_exec_ctx * exec_ctx, void *user_data, grpc_credent reset_service_url (calld); if (status != GRPC_CREDENTIALS_OK) { - bubble_up_error (elem, GRPC_STATUS_UNAUTHENTICATED, "Credentials failed to get metadata.", closure_list); + bubble_up_error (exec_ctx, elem, GRPC_STATUS_UNAUTHENTICATED, "Credentials failed to get metadata."); return; } GPR_ASSERT (num_md <= MAX_CREDENTIALS_METADATA_COUNT); @@ -119,7 +119,7 @@ on_credentials_metadata (grpc_exec_ctx * exec_ctx, void *user_data, grpc_credent { grpc_metadata_batch_add_tail (mdb, &calld->md_links[i], grpc_mdelem_from_slices (chand->md_ctx, gpr_slice_ref (md_elems[i].key), gpr_slice_ref (md_elems[i].value))); } - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } void @@ -161,7 +161,7 @@ send_security_metadata (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc if (!channel_creds_has_md && !call_creds_has_md) { /* Skip sending metadata altogether. */ - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); return; } @@ -170,7 +170,7 @@ send_security_metadata (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc calld->creds = grpc_composite_credentials_create (channel_creds, ctx->creds, NULL); if (calld->creds == NULL) { - bubble_up_error (elem, GRPC_STATUS_INVALID_ARGUMENT, "Incompatible credentials set on channel and call.", closure_list); + bubble_up_error (exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, "Incompatible credentials set on channel and call."); return; } } @@ -182,7 +182,7 @@ send_security_metadata (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc build_service_url (chand->security_connector->base.url_scheme, calld); calld->op = *op; /* Copy op (originates from the caller's stack). */ GPR_ASSERT (calld->pollset); - grpc_credentials_get_request_metadata (calld->creds, calld->pollset, calld->service_url, on_credentials_metadata, elem, closure_list); + grpc_credentials_get_request_metadata (exec_ctx, calld->creds, calld->pollset, calld->service_url, on_credentials_metadata, elem); } static void @@ -193,13 +193,13 @@ on_host_checked (grpc_exec_ctx * exec_ctx, void *user_data, grpc_security_status if (status == GRPC_SECURITY_OK) { - send_security_metadata (elem, &calld->op, closure_list); + send_security_metadata (exec_ctx, elem, &calld->op); } else { char *error_msg; gpr_asprintf (&error_msg, "Invalid host %s set in :authority metadata.", grpc_mdstr_as_c_string (calld->host)); - bubble_up_error (elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg, closure_list); + bubble_up_error (exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg); gpr_free (error_msg); } } @@ -272,26 +272,26 @@ auth_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grp grpc_security_status status; const char *call_host = grpc_mdstr_as_c_string (calld->host); calld->op = *op; /* Copy op (originates from the caller's stack). */ - status = grpc_channel_security_connector_check_call_host (chand->security_connector, call_host, on_host_checked, elem, closure_list); + status = grpc_channel_security_connector_check_call_host (exec_ctx, chand->security_connector, call_host, on_host_checked, elem); if (status != GRPC_SECURITY_OK) { if (status == GRPC_SECURITY_ERROR) { char *error_msg; gpr_asprintf (&error_msg, "Invalid host %s set in :authority metadata.", call_host); - bubble_up_error (elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg, closure_list); + bubble_up_error (exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg); gpr_free (error_msg); } return; /* early exit */ } } - send_security_metadata (elem, op, closure_list); + send_security_metadata (exec_ctx, elem, op); return; /* early exit */ } } /* pass control down the stack */ - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } /* Constructor for call_data */ diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index caf3d6f885..7c444f6ac9 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -126,11 +126,11 @@ grpc_credentials_get_request_metadata (grpc_exec_ctx * exec_ctx, grpc_credential { if (cb != NULL) { - cb (user_data, NULL, 0, GRPC_CREDENTIALS_OK, closure_list); + cb (exec_ctx, user_data, NULL, 0, GRPC_CREDENTIALS_OK); } return; } - creds->vtable->get_request_metadata (creds, pollset, service_url, cb, user_data, closure_list); + creds->vtable->get_request_metadata (exec_ctx, creds, pollset, service_url, cb, user_data); } grpc_security_status @@ -460,12 +460,12 @@ jwt_get_request_metadata (grpc_exec_ctx * exec_ctx, grpc_credentials * creds, gr if (jwt_md != NULL) { - cb (user_data, jwt_md->entries, jwt_md->num_entries, GRPC_CREDENTIALS_OK, closure_list); + cb (exec_ctx, user_data, jwt_md->entries, jwt_md->num_entries, GRPC_CREDENTIALS_OK); grpc_credentials_md_store_unref (jwt_md); } else { - cb (user_data, NULL, 0, GRPC_CREDENTIALS_ERROR, closure_list); + cb (exec_ctx, user_data, NULL, 0, GRPC_CREDENTIALS_ERROR); } } @@ -644,12 +644,12 @@ on_oauth2_token_fetcher_http_response (grpc_exec_ctx * exec_ctx, void *user_data if (status == GRPC_CREDENTIALS_OK) { c->token_expiration = gpr_time_add (gpr_now (GPR_CLOCK_REALTIME), token_lifetime); - r->cb (r->user_data, c->access_token_md->entries, c->access_token_md->num_entries, status, closure_list); + r->cb (exec_ctx, r->user_data, c->access_token_md->entries, c->access_token_md->num_entries, status); } else { c->token_expiration = gpr_inf_past (GPR_CLOCK_REALTIME); - r->cb (r->user_data, NULL, 0, status, closure_list); + r->cb (exec_ctx, r->user_data, NULL, 0, status); } gpr_mu_unlock (&c->mu); grpc_credentials_metadata_request_destroy (r); @@ -671,12 +671,12 @@ oauth2_token_fetcher_get_request_metadata (grpc_exec_ctx * exec_ctx, grpc_creden } if (cached_access_token_md != NULL) { - cb (user_data, cached_access_token_md->entries, cached_access_token_md->num_entries, GRPC_CREDENTIALS_OK, closure_list); + cb (exec_ctx, user_data, cached_access_token_md->entries, cached_access_token_md->num_entries, GRPC_CREDENTIALS_OK); grpc_credentials_md_store_unref (cached_access_token_md); } else { - c->fetch_func (grpc_credentials_metadata_request_create (creds, cb, user_data), &c->httpcli_context, pollset, on_oauth2_token_fetcher_http_response, gpr_time_add (gpr_now (GPR_CLOCK_REALTIME), refresh_threshold), closure_list); + c->fetch_func (grpc_credentials_metadata_request_create (creds, cb, user_data), &c->httpcli_context, pollset, on_oauth2_token_fetcher_http_response, gpr_time_add (gpr_now (exec_ctx, GPR_CLOCK_REALTIME), refresh_threshold)); } } @@ -710,7 +710,7 @@ compute_engine_fetch_oauth2 (grpc_exec_ctx * exec_ctx, grpc_credentials_metadata request.path = GRPC_COMPUTE_ENGINE_METADATA_TOKEN_PATH; request.hdr_count = 1; request.hdrs = &header; - grpc_httpcli_get (httpcli_context, pollset, &request, deadline, response_cb, metadata_req, closure_list); + grpc_httpcli_get (exec_ctx, httpcli_context, pollset, &request, deadline, response_cb, metadata_req); } grpc_credentials * @@ -755,7 +755,7 @@ refresh_token_fetch_oauth2 (grpc_exec_ctx * exec_ctx, grpc_credentials_metadata_ request.hdr_count = 1; request.hdrs = &header; request.handshaker = &grpc_httpcli_ssl; - grpc_httpcli_post (httpcli_context, pollset, &request, body, strlen (body), deadline, response_cb, metadata_req, closure_list); + grpc_httpcli_post (httpcli_context, pollset, &request, body, strlen (exec_ctx, body), deadline, response_cb, metadata_req); gpr_free (body); } @@ -828,7 +828,7 @@ md_only_test_get_request_metadata (grpc_exec_ctx * exec_ctx, grpc_credentials * } else { - cb (user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK, closure_list); + cb (exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK); } } @@ -877,7 +877,7 @@ static void access_token_get_request_metadata (grpc_exec_ctx * exec_ctx, grpc_credentials * creds, grpc_pollset * pollset, const char *service_url, grpc_credentials_metadata_cb cb, void *user_data) { grpc_access_token_credentials *c = (grpc_access_token_credentials *) creds; - cb (user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK, closure_list); + cb (exec_ctx, user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK); } static grpc_credentials_vtable access_token_vtable = { @@ -1047,7 +1047,7 @@ composite_metadata_cb (grpc_exec_ctx * exec_ctx, void *user_data, grpc_credentia grpc_composite_credentials_metadata_context *ctx = (grpc_composite_credentials_metadata_context *) user_data; if (status != GRPC_CREDENTIALS_OK) { - ctx->cb (ctx->user_data, NULL, 0, status, closure_list); + ctx->cb (exec_ctx, ctx->user_data, NULL, 0, status); return; } @@ -1067,13 +1067,13 @@ composite_metadata_cb (grpc_exec_ctx * exec_ctx, void *user_data, grpc_credentia grpc_credentials *inner_creds = ctx->composite_creds->inner.creds_array[ctx->creds_index++]; if (grpc_credentials_has_request_metadata (inner_creds)) { - grpc_credentials_get_request_metadata (inner_creds, ctx->pollset, ctx->service_url, composite_metadata_cb, ctx, closure_list); + grpc_credentials_get_request_metadata (exec_ctx, inner_creds, ctx->pollset, ctx->service_url, composite_metadata_cb, ctx); return; } } /* We're done!. */ - ctx->cb (ctx->user_data, ctx->md_elems->entries, ctx->md_elems->num_entries, GRPC_CREDENTIALS_OK, closure_list); + ctx->cb (exec_ctx, ctx->user_data, ctx->md_elems->entries, ctx->md_elems->num_entries, GRPC_CREDENTIALS_OK); composite_md_context_destroy (ctx); grpc_closure_list_run (closure_list); } @@ -1085,7 +1085,7 @@ composite_get_request_metadata (grpc_exec_ctx * exec_ctx, grpc_credentials * cre grpc_composite_credentials_metadata_context *ctx; if (!grpc_credentials_has_request_metadata (creds)) { - cb (user_data, NULL, 0, GRPC_CREDENTIALS_OK, closure_list); + cb (exec_ctx, user_data, NULL, 0, GRPC_CREDENTIALS_OK); return; } ctx = gpr_malloc (sizeof (grpc_composite_credentials_metadata_context)); @@ -1101,7 +1101,7 @@ composite_get_request_metadata (grpc_exec_ctx * exec_ctx, grpc_credentials * cre grpc_credentials *inner_creds = c->inner.creds_array[ctx->creds_index++]; if (grpc_credentials_has_request_metadata (inner_creds)) { - grpc_credentials_get_request_metadata (inner_creds, pollset, service_url, composite_metadata_cb, ctx, closure_list); + grpc_credentials_get_request_metadata (exec_ctx, inner_creds, pollset, service_url, composite_metadata_cb, ctx); return; } } @@ -1262,7 +1262,7 @@ static void iam_get_request_metadata (grpc_exec_ctx * exec_ctx, grpc_credentials * creds, grpc_pollset * pollset, const char *service_url, grpc_credentials_metadata_cb cb, void *user_data) { grpc_google_iam_credentials *c = (grpc_google_iam_credentials *) creds; - cb (user_data, c->iam_md->entries, c->iam_md->num_entries, GRPC_CREDENTIALS_OK, closure_list); + cb (exec_ctx, user_data, c->iam_md->entries, c->iam_md->num_entries, GRPC_CREDENTIALS_OK); } static grpc_credentials_vtable iam_vtable = { @@ -1373,7 +1373,7 @@ plugin_get_request_metadata (grpc_exec_ctx * exec_ctx, grpc_credentials * creds, } else { - cb (user_data, NULL, 0, GRPC_CREDENTIALS_OK, closure_list); + cb (exec_ctx, user_data, NULL, 0, GRPC_CREDENTIALS_OK); } } diff --git a/src/core/security/handshake.c b/src/core/security/handshake.c index 7951f34943..687cc575a7 100644 --- a/src/core/security/handshake.c +++ b/src/core/security/handshake.c @@ -68,20 +68,20 @@ security_handshake_done (grpc_exec_ctx * exec_ctx, grpc_security_handshake * h, { if (is_success) { - h->cb (h->user_data, GRPC_SECURITY_OK, h->wrapped_endpoint, h->secure_endpoint, closure_list); + h->cb (exec_ctx, h->user_data, GRPC_SECURITY_OK, h->wrapped_endpoint, h->secure_endpoint); } else { if (h->secure_endpoint != NULL) { - grpc_endpoint_shutdown (h->secure_endpoint, closure_list); - grpc_endpoint_destroy (h->secure_endpoint, closure_list); + grpc_endpoint_shutdown (exec_ctx, h->secure_endpoint); + grpc_endpoint_destroy (exec_ctx, h->secure_endpoint); } else { - grpc_endpoint_destroy (h->wrapped_endpoint, closure_list); + grpc_endpoint_destroy (exec_ctx, h->wrapped_endpoint); } - h->cb (h->user_data, GRPC_SECURITY_ERROR, h->wrapped_endpoint, NULL, closure_list); + h->cb (exec_ctx, h->user_data, GRPC_SECURITY_ERROR, h->wrapped_endpoint, NULL); } if (h->handshaker != NULL) tsi_handshaker_destroy (h->handshaker); @@ -103,20 +103,20 @@ on_peer_checked (grpc_exec_ctx * exec_ctx, void *user_data, grpc_security_status if (status != GRPC_SECURITY_OK) { gpr_log (GPR_ERROR, "Error checking peer."); - security_handshake_done (h, 0, closure_list); + security_handshake_done (exec_ctx, h, 0); return; } result = tsi_handshaker_create_frame_protector (h->handshaker, NULL, &protector); if (result != TSI_OK) { gpr_log (GPR_ERROR, "Frame protector creation failed with error %s.", tsi_result_to_string (result)); - security_handshake_done (h, 0, closure_list); + security_handshake_done (exec_ctx, h, 0); return; } h->secure_endpoint = grpc_secure_endpoint_create (protector, h->wrapped_endpoint, h->left_overs.slices, h->left_overs.count); h->left_overs.count = 0; h->left_overs.length = 0; - security_handshake_done (h, 1, closure_list); + security_handshake_done (exec_ctx, h, 1); return; } @@ -130,19 +130,19 @@ check_peer (grpc_exec_ctx * exec_ctx, grpc_security_handshake * h) if (result != TSI_OK) { gpr_log (GPR_ERROR, "Peer extraction failed with error %s", tsi_result_to_string (result)); - security_handshake_done (h, 0, closure_list); + security_handshake_done (exec_ctx, h, 0); return; } peer_status = grpc_security_connector_check_peer (h->connector, peer, on_peer_checked, h); if (peer_status == GRPC_SECURITY_ERROR) { gpr_log (GPR_ERROR, "Peer check failed."); - security_handshake_done (h, 0, closure_list); + security_handshake_done (exec_ctx, h, 0); return; } else if (peer_status == GRPC_SECURITY_OK) { - on_peer_checked (h, peer_status, closure_list); + on_peer_checked (exec_ctx, h, peer_status); } } @@ -169,7 +169,7 @@ send_handshake_bytes_to_peer (grpc_exec_ctx * exec_ctx, grpc_security_handshake if (result != TSI_OK) { gpr_log (GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string (result)); - security_handshake_done (h, 0, closure_list); + security_handshake_done (exec_ctx, h, 0); return; } @@ -178,7 +178,7 @@ send_handshake_bytes_to_peer (grpc_exec_ctx * exec_ctx, grpc_security_handshake gpr_slice_buffer_add (&h->outgoing, to_send); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_write (h->wrapped_endpoint, &h->outgoing, &h->on_handshake_data_sent_to_peer, closure_list); + grpc_endpoint_write (exec_ctx, h->wrapped_endpoint, &h->outgoing, &h->on_handshake_data_sent_to_peer); } static void @@ -194,7 +194,7 @@ on_handshake_data_received_from_peer (grpc_exec_ctx * exec_ctx, void *handshake, if (!success) { gpr_log (GPR_ERROR, "Read failed."); - security_handshake_done (h, 0, closure_list); + security_handshake_done (exec_ctx, h, 0); return; } @@ -211,12 +211,12 @@ on_handshake_data_received_from_peer (grpc_exec_ctx * exec_ctx, void *handshake, /* We may need more data. */ if (result == TSI_INCOMPLETE_DATA) { - grpc_endpoint_read (h->wrapped_endpoint, &h->incoming, &h->on_handshake_data_received_from_peer, closure_list); + grpc_endpoint_read (exec_ctx, h->wrapped_endpoint, &h->incoming, &h->on_handshake_data_received_from_peer); return; } else { - send_handshake_bytes_to_peer (h, closure_list); + send_handshake_bytes_to_peer (exec_ctx, h); return; } } @@ -224,7 +224,7 @@ on_handshake_data_received_from_peer (grpc_exec_ctx * exec_ctx, void *handshake, if (result != TSI_OK) { gpr_log (GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string (result)); - security_handshake_done (h, 0, closure_list); + security_handshake_done (exec_ctx, h, 0); return; } @@ -233,7 +233,7 @@ on_handshake_data_received_from_peer (grpc_exec_ctx * exec_ctx, void *handshake, num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + h->incoming.count - i - 1; if (num_left_overs == 0) { - check_peer (h, closure_list); + check_peer (exec_ctx, h); return; } @@ -244,7 +244,7 @@ on_handshake_data_received_from_peer (grpc_exec_ctx * exec_ctx, void *handshake, gpr_slice_unref (h->incoming.slices[i]); /* split_tail above increments refcount. */ } gpr_slice_buffer_addn (&h->left_overs, &h->incoming.slices[i + 1], num_left_overs - (size_t) has_left_overs_in_current_slice); - check_peer (h, closure_list); + check_peer (exec_ctx, h); } /* If handshake is NULL, the handshake is done. */ @@ -258,7 +258,7 @@ on_handshake_data_sent_to_peer (grpc_exec_ctx * exec_ctx, void *handshake, int s { gpr_log (GPR_ERROR, "Write failed."); if (handshake != NULL) - security_handshake_done (h, 0, closure_list); + security_handshake_done (exec_ctx, h, 0); return; } @@ -267,11 +267,11 @@ on_handshake_data_sent_to_peer (grpc_exec_ctx * exec_ctx, void *handshake, int s { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_read (h->wrapped_endpoint, &h->incoming, &h->on_handshake_data_received_from_peer, closure_list); + grpc_endpoint_read (exec_ctx, h->wrapped_endpoint, &h->incoming, &h->on_handshake_data_received_from_peer); } else { - check_peer (h, closure_list); + check_peer (exec_ctx, h); } } @@ -292,5 +292,5 @@ grpc_do_security_handshake (grpc_exec_ctx * exec_ctx, tsi_handshaker * handshake gpr_slice_buffer_init (&h->left_overs); gpr_slice_buffer_init (&h->outgoing); gpr_slice_buffer_init (&h->incoming); - send_handshake_bytes_to_peer (h, closure_list); + send_handshake_bytes_to_peer (exec_ctx, h); } diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c index dcc751f143..7a45ff9343 100644 --- a/src/core/security/jwt_verifier.c +++ b/src/core/security/jwt_verifier.c @@ -774,7 +774,7 @@ on_openid_config_retrieved (grpc_exec_ctx * exec_ctx, void *user_data, const grp { *(req.host + (req.path - jwks_uri)) = '\0'; } - grpc_httpcli_get (&ctx->verifier->http_ctx, ctx->pollset, &req, gpr_time_add (gpr_now (GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), on_keys_retrieved, ctx, closure_list); + grpc_httpcli_get (&ctx->verifier->http_ctx, ctx->pollset, &req, gpr_time_add (gpr_now (exec_ctx, GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), on_keys_retrieved, ctx); grpc_json_destroy (json); gpr_free (req.host); return; @@ -893,7 +893,7 @@ retrieve_key_and_verify (grpc_exec_ctx * exec_ctx, verifier_cb_ctx * ctx) http_cb = on_openid_config_retrieved; } - grpc_httpcli_get (&ctx->verifier->http_ctx, ctx->pollset, &req, gpr_time_add (gpr_now (GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), http_cb, ctx, closure_list); + grpc_httpcli_get (&ctx->verifier->http_ctx, ctx->pollset, &req, gpr_time_add (gpr_now (exec_ctx, GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), http_cb, ctx); gpr_free (req.host); gpr_free (req.path); return; @@ -943,7 +943,7 @@ grpc_jwt_verifier_verify (grpc_exec_ctx * exec_ctx, grpc_jwt_verifier * verifier signature = grpc_base64_decode (cur, 1); if (GPR_SLICE_IS_EMPTY (signature)) goto error; - retrieve_key_and_verify (verifier_cb_ctx_create (verifier, pollset, header, claims, audience, signature, jwt, signed_jwt_len, user_data, cb), closure_list); + retrieve_key_and_verify (verifier_cb_ctx_create (exec_ctx, verifier, pollset, header, claims, audience, signature, jwt, signed_jwt_len, user_data, cb)); return; error: diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 966dfbb666..7dee1c1edb 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -72,7 +72,7 @@ static void destroy (grpc_exec_ctx * exec_ctx, secure_endpoint * secure_ep) { secure_endpoint *ep = secure_ep; - grpc_endpoint_destroy (ep->wrapped_ep, closure_list); + grpc_endpoint_destroy (exec_ctx, ep->wrapped_ep); tsi_frame_protector_destroy (ep->protector); gpr_slice_buffer_destroy (&ep->leftover_bytes); gpr_slice_unref (ep->read_staging_buffer); @@ -95,7 +95,7 @@ secure_endpoint_unref (secure_endpoint * ep, grpc_closure_list * closure_list, c gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d", ep, reason, ep->ref.count, ep->ref.count - 1); if (gpr_unref (&ep->ref)) { - destroy (ep, closure_list); + destroy (exec_ctx, ep); } } @@ -113,7 +113,7 @@ secure_endpoint_unref (grpc_exec_ctx * exec_ctx, secure_endpoint * ep) { if (gpr_unref (&ep->ref)) { - destroy (ep, closure_list); + destroy (exec_ctx, ep); } } @@ -149,7 +149,7 @@ call_read_cb (grpc_exec_ctx * exec_ctx, secure_endpoint * ep, int success) } ep->read_buffer = NULL; grpc_closure_list_add (closure_list, ep->read_cb, success); - SECURE_ENDPOINT_UNREF (ep, "read", closure_list); + SECURE_ENDPOINT_UNREF (exec_ctx, ep, "read"); } static void @@ -165,7 +165,7 @@ on_read (grpc_exec_ctx * exec_ctx, void *user_data, int success) if (!success) { gpr_slice_buffer_reset_and_unref (ep->read_buffer); - call_read_cb (ep, 0, closure_list); + call_read_cb (exec_ctx, ep, 0); return; } @@ -226,11 +226,11 @@ on_read (grpc_exec_ctx * exec_ctx, void *user_data, int success) if (result != TSI_OK) { gpr_slice_buffer_reset_and_unref (ep->read_buffer); - call_read_cb (ep, 0, closure_list); + call_read_cb (exec_ctx, ep, 0); return; } - call_read_cb (ep, 1, closure_list); + call_read_cb (exec_ctx, ep, 1); } static void @@ -246,11 +246,11 @@ endpoint_read (grpc_exec_ctx * exec_ctx, grpc_endpoint * secure_ep, gpr_slice_bu { gpr_slice_buffer_swap (&ep->leftover_bytes, &ep->source_buffer); GPR_ASSERT (ep->leftover_bytes.count == 0); - on_read (ep, 1, closure_list); + on_read (exec_ctx, ep, 1); return; } - grpc_endpoint_read (ep->wrapped_ep, &ep->source_buffer, &ep->on_read, closure_list); + grpc_endpoint_read (exec_ctx, ep->wrapped_ep, &ep->source_buffer, &ep->on_read); } static void @@ -344,35 +344,35 @@ endpoint_write (grpc_exec_ctx * exec_ctx, grpc_endpoint * secure_ep, gpr_slice_b return; } - grpc_endpoint_write (ep->wrapped_ep, &ep->output_buffer, cb, closure_list); + grpc_endpoint_write (exec_ctx, ep->wrapped_ep, &ep->output_buffer, cb); } static void endpoint_shutdown (grpc_exec_ctx * exec_ctx, grpc_endpoint * secure_ep) { secure_endpoint *ep = (secure_endpoint *) secure_ep; - grpc_endpoint_shutdown (ep->wrapped_ep, closure_list); + grpc_endpoint_shutdown (exec_ctx, ep->wrapped_ep); } static void endpoint_destroy (grpc_exec_ctx * exec_ctx, grpc_endpoint * secure_ep) { secure_endpoint *ep = (secure_endpoint *) secure_ep; - SECURE_ENDPOINT_UNREF (ep, "destroy", closure_list); + SECURE_ENDPOINT_UNREF (exec_ctx, ep, "destroy"); } static void endpoint_add_to_pollset (grpc_exec_ctx * exec_ctx, grpc_endpoint * secure_ep, grpc_pollset * pollset) { secure_endpoint *ep = (secure_endpoint *) secure_ep; - grpc_endpoint_add_to_pollset (ep->wrapped_ep, pollset, closure_list); + grpc_endpoint_add_to_pollset (exec_ctx, ep->wrapped_ep, pollset); } static void endpoint_add_to_pollset_set (grpc_exec_ctx * exec_ctx, grpc_endpoint * secure_ep, grpc_pollset_set * pollset_set) { secure_endpoint *ep = (secure_endpoint *) secure_ep; - grpc_endpoint_add_to_pollset_set (ep->wrapped_ep, pollset_set, closure_list); + grpc_endpoint_add_to_pollset_set (exec_ctx, ep->wrapped_ep, pollset_set); } static char * diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index 91462c52cd..729de29d49 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -114,11 +114,11 @@ grpc_security_connector_do_handshake (grpc_exec_ctx * exec_ctx, grpc_security_co { if (sc == NULL || nonsecure_endpoint == NULL) { - cb (user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL, closure_list); + cb (exec_ctx, user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL); } else { - sc->vtable->do_handshake (sc, nonsecure_endpoint, cb, user_data, closure_list); + sc->vtable->do_handshake (exec_ctx, sc, nonsecure_endpoint, cb, user_data); } } @@ -138,7 +138,7 @@ grpc_channel_security_connector_check_call_host (grpc_exec_ctx * exec_ctx, grpc_ { if (sc == NULL || sc->check_call_host == NULL) return GRPC_SECURITY_ERROR; - return sc->check_call_host (sc, host, cb, user_data, closure_list); + return sc->check_call_host (exec_ctx, sc, host, cb, user_data); } #ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG @@ -303,7 +303,7 @@ fake_channel_check_call_host (grpc_exec_ctx * exec_ctx, grpc_channel_security_co grpc_fake_channel_security_connector *c = (grpc_fake_channel_security_connector *) sc; if (c->call_host_check_is_async) { - cb (user_data, GRPC_SECURITY_OK, closure_list); + cb (exec_ctx, user_data, GRPC_SECURITY_OK); return GRPC_SECURITY_PENDING; } else @@ -315,13 +315,13 @@ fake_channel_check_call_host (grpc_exec_ctx * exec_ctx, grpc_channel_security_co static void fake_channel_do_handshake (grpc_exec_ctx * exec_ctx, grpc_security_connector * sc, grpc_endpoint * nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data) { - grpc_do_security_handshake (tsi_create_fake_handshaker (1), sc, nonsecure_endpoint, cb, user_data, closure_list); + grpc_do_security_handshake (tsi_create_fake_handshaker (exec_ctx, 1), sc, nonsecure_endpoint, cb, user_data); } static void fake_server_do_handshake (grpc_exec_ctx * exec_ctx, grpc_security_connector * sc, grpc_endpoint * nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data) { - grpc_do_security_handshake (tsi_create_fake_handshaker (0), sc, nonsecure_endpoint, cb, user_data, closure_list); + grpc_do_security_handshake (tsi_create_fake_handshaker (exec_ctx, 0), sc, nonsecure_endpoint, cb, user_data); } static grpc_security_connector_vtable fake_channel_vtable = { @@ -432,11 +432,11 @@ ssl_channel_do_handshake (grpc_exec_ctx * exec_ctx, grpc_security_connector * sc &handshaker); if (status != GRPC_SECURITY_OK) { - cb (user_data, status, nonsecure_endpoint, NULL, closure_list); + cb (exec_ctx, user_data, status, nonsecure_endpoint, NULL); } else { - grpc_do_security_handshake (handshaker, sc, nonsecure_endpoint, cb, user_data, closure_list); + grpc_do_security_handshake (exec_ctx, handshaker, sc, nonsecure_endpoint, cb, user_data); } } @@ -448,11 +448,11 @@ ssl_server_do_handshake (grpc_exec_ctx * exec_ctx, grpc_security_connector * sc, grpc_security_status status = ssl_create_handshaker (c->handshaker_factory, 0, NULL, &handshaker); if (status != GRPC_SECURITY_OK) { - cb (user_data, status, nonsecure_endpoint, NULL, closure_list); + cb (exec_ctx, user_data, status, nonsecure_endpoint, NULL); } else { - grpc_do_security_handshake (handshaker, sc, nonsecure_endpoint, cb, user_data, closure_list); + grpc_do_security_handshake (exec_ctx, handshaker, sc, nonsecure_endpoint, cb, user_data); } } diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 86a5f9cb3e..77239e782e 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -175,7 +175,7 @@ auth_on_recv (grpc_exec_ctx * exec_ctx, void *user_data, int success) return; } } - calld->on_done_recv->cb (calld->on_done_recv->cb_arg, success, closure_list); + calld->on_done_recv->cb (exec_ctx, calld->on_done_recv->cb_arg, success); } static void @@ -202,7 +202,7 @@ static void auth_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) { set_recv_ops_md_callbacks (elem, op); - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } /* Constructor for call_data */ diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index b11cf1079a..5b67f51b7d 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -104,7 +104,7 @@ setup_transport (grpc_exec_ctx * exec_ctx, void *statep, grpc_transport * transp args_to_add[0] = grpc_security_connector_to_arg (state->sc); args_to_add[1] = grpc_auth_metadata_processor_to_arg (&state->creds->processor); args_copy = grpc_channel_args_copy_and_add (grpc_server_get_channel_args (state->server), args_to_add, GPR_ARRAY_SIZE (args_to_add)); - grpc_server_setup_transport (state->server, transport, extra_filters, GPR_ARRAY_SIZE (extra_filters), mdctx, args_copy, closure_list); + grpc_server_setup_transport (state->server, transport, extra_filters, GPR_ARRAY_SIZE (exec_ctx, extra_filters), mdctx, args_copy); grpc_channel_args_destroy (args_copy); } @@ -146,15 +146,15 @@ on_secure_handshake_done (grpc_exec_ctx * exec_ctx, void *statep, grpc_security_ if (!state->is_shutdown) { mdctx = grpc_mdctx_create (); - transport = grpc_create_chttp2_transport (grpc_server_get_channel_args (state->server), secure_endpoint, mdctx, 0, closure_list); - setup_transport (state, transport, mdctx, closure_list); - grpc_chttp2_transport_start_reading (transport, NULL, 0, closure_list); + transport = grpc_create_chttp2_transport (grpc_server_get_channel_args (exec_ctx, state->server), secure_endpoint, mdctx, 0); + setup_transport (exec_ctx, state, transport, mdctx); + grpc_chttp2_transport_start_reading (exec_ctx, transport, NULL, 0); } else { /* We need to consume this here, because the server may already have gone * away. */ - grpc_endpoint_destroy (secure_endpoint, closure_list); + grpc_endpoint_destroy (exec_ctx, secure_endpoint); } gpr_mu_unlock (&state->mu); } @@ -180,7 +180,7 @@ on_accept (grpc_exec_ctx * exec_ctx, void *statep, grpc_endpoint * tcp) node->next = state->handshaking_tcp_endpoints; state->handshaking_tcp_endpoints = node; gpr_mu_unlock (&state->mu); - grpc_security_connector_do_handshake (state->sc, tcp, on_secure_handshake_done, state, closure_list); + grpc_security_connector_do_handshake (exec_ctx, state->sc, tcp, on_secure_handshake_done, state); } /* Server callback: start listening on our ports */ @@ -188,18 +188,18 @@ static void start (grpc_exec_ctx * exec_ctx, grpc_server * server, void *statep, grpc_pollset ** pollsets, size_t pollset_count) { grpc_server_secure_state *state = statep; - grpc_tcp_server_start (state->tcp, pollsets, pollset_count, on_accept, state, closure_list); + grpc_tcp_server_start (exec_ctx, state->tcp, pollsets, pollset_count, on_accept, state); } static void destroy_done (grpc_exec_ctx * exec_ctx, void *statep, int success) { grpc_server_secure_state *state = statep; - state->destroy_callback->cb (state->destroy_callback->cb_arg, success, closure_list); + state->destroy_callback->cb (exec_ctx, state->destroy_callback->cb_arg, success); gpr_mu_lock (&state->mu); while (state->handshaking_tcp_endpoints != NULL) { - grpc_endpoint_shutdown (state->handshaking_tcp_endpoints->tcp_endpoint, closure_list); + grpc_endpoint_shutdown (exec_ctx, state->handshaking_tcp_endpoints->tcp_endpoint); remove_tcp_from_list_locked (state, state->handshaking_tcp_endpoints->tcp_endpoint); } gpr_mu_unlock (&state->mu); @@ -219,7 +219,7 @@ destroy (grpc_exec_ctx * exec_ctx, grpc_server * server, void *statep, grpc_clos tcp = state->tcp; gpr_mu_unlock (&state->mu); grpc_closure_init (&state->destroy_closure, destroy_done, state); - grpc_tcp_server_destroy (tcp, &state->destroy_closure, closure_list); + grpc_tcp_server_destroy (exec_ctx, tcp, &state->destroy_closure); } int diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5360abdfa9..de47bc3808 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -417,7 +417,7 @@ grpc_call_set_completion_queue (grpc_exec_ctx * exec_ctx, grpc_call * call, grpc { GRPC_CQ_INTERNAL_REF (cq, "bind"); } - unlock (call, closure_list); + unlock (exec_ctx, call); } grpc_completion_queue * @@ -452,7 +452,7 @@ done_completion (grpc_exec_ctx * exec_ctx, void *call, grpc_cq_completion * comp gpr_mu_lock (&c->completion_mu); c->allocated_completions &= (gpr_uint8) ~ (1u << (completion - c->completions)); gpr_mu_unlock (&c->completion_mu); - GRPC_CALL_INTERNAL_UNREF (c, "completion", closure_list); + GRPC_CALL_INTERNAL_UNREF (exec_ctx, c, "completion"); } #ifdef GRPC_CALL_REF_COUNT_DEBUG @@ -473,8 +473,8 @@ destroy_call (grpc_exec_ctx * exec_ctx, grpc_call * call) { size_t i; grpc_call *c = call; - grpc_call_stack_destroy (CALL_STACK_FROM_CALL (c), closure_list); - GRPC_CHANNEL_INTERNAL_UNREF (c->channel, "call", closure_list); + grpc_call_stack_destroy (CALL_STACK_FROM_CALL (exec_ctx, c)); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->channel, "call"); gpr_mu_destroy (&c->mu); gpr_mu_destroy (&c->completion_mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) @@ -527,7 +527,7 @@ grpc_call_internal_unref (grpc_exec_ctx * exec_ctx, grpc_call * c) #endif if (gpr_unref (&c->internal_refcount)) { - destroy_call (c, closure_list); + destroy_call (exec_ctx, c); } } @@ -715,19 +715,19 @@ unlock (grpc_exec_ctx * exec_ctx, grpc_call * call) if (start_op) { - execute_op (call, &op, closure_list); + execute_op (exec_ctx, call, &op); } if (completing_requests > 0) { for (i = 0; i < completing_requests; i++) { - completed_requests[i].on_complete (call, completed_requests[i].success, completed_requests[i].user_data, closure_list); + completed_requests[i].on_complete (exec_ctx, call, completed_requests[i].success, completed_requests[i].user_data); } lock (call); call->completing = 0; - unlock (call, closure_list); - GRPC_CALL_INTERNAL_UNREF (call, "completing", closure_list); + unlock (exec_ctx, call); + GRPC_CALL_INTERNAL_UNREF (exec_ctx, call, "completing"); } } @@ -917,8 +917,8 @@ call_on_done_send (grpc_exec_ctx * exec_ctx, void *pc, int success) call->send_ops.nops = 0; call->last_send_contains = 0; call->sending = 0; - unlock (call, closure_list); - GRPC_CALL_INTERNAL_UNREF (call, "sending", closure_list); + unlock (exec_ctx, call); + GRPC_CALL_INTERNAL_UNREF (exec_ctx, call, "sending"); } static void @@ -1055,7 +1055,7 @@ call_on_done_recv (grpc_exec_ctx * exec_ctx, void *pc, int success) case GRPC_NO_OP: break; case GRPC_OP_METADATA: - recv_metadata (call, &op->data.metadata, closure_list); + recv_metadata (exec_ctx, call, &op->data.metadata); break; case GRPC_OP_BEGIN_MESSAGE: success = begin_message (call, op->data.begin_message); @@ -1080,7 +1080,7 @@ call_on_done_recv (grpc_exec_ctx * exec_ctx, void *pc, int success) call->read_state = READ_STATE_STREAM_CLOSED; if (call->have_alarm) { - grpc_alarm_cancel (&call->alarm, closure_list); + grpc_alarm_cancel (exec_ctx, &call->alarm); } /* propagate cancellation to any interested children */ child_call = call->first_child; @@ -1093,13 +1093,13 @@ call_on_done_recv (grpc_exec_ctx * exec_ctx, void *pc, int success) { GRPC_CALL_INTERNAL_REF (child_call, "propagate_cancel"); grpc_call_cancel (child_call, NULL); - GRPC_CALL_INTERNAL_UNREF (child_call, "propagate_cancel", closure_list); + GRPC_CALL_INTERNAL_UNREF (exec_ctx, child_call, "propagate_cancel"); } child_call = next_child_call; } while (child_call != call->first_child); } - GRPC_CALL_INTERNAL_UNREF (call, "closed", closure_list); + GRPC_CALL_INTERNAL_UNREF (exec_ctx, call, "closed"); } finish_read_ops (call); } @@ -1113,9 +1113,9 @@ call_on_done_recv (grpc_exec_ctx * exec_ctx, void *pc, int success) finish_ioreq_op (call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0); } call->recv_ops.nops = 0; - unlock (call, closure_list); + unlock (exec_ctx, call); - GRPC_CALL_INTERNAL_UNREF (call, "receiving", closure_list); + GRPC_CALL_INTERNAL_UNREF (exec_ctx, call, "receiving"); GRPC_TIMER_END (GRPC_PTAG_CALL_ON_DONE_RECV, 0); } @@ -1386,7 +1386,7 @@ grpc_call_start_ioreq_and_call_back (grpc_exec_ctx * exec_ctx, grpc_call * call, grpc_call_error err; lock (call); err = start_ioreq (call, reqs, nreqs, on_complete, user_data); - unlock (call, closure_list); + unlock (exec_ctx, call); return err; } @@ -1467,7 +1467,7 @@ cancel_with_status (grpc_call * c, grpc_status_code status, const char *descript static void finished_loose_op (grpc_exec_ctx * exec_ctx, void *call, int success_ignored) { - GRPC_CALL_INTERNAL_UNREF (call, "loose-op", closure_list); + GRPC_CALL_INTERNAL_UNREF (exec_ctx, call, "loose-op"); } typedef struct @@ -1480,7 +1480,7 @@ static void finished_loose_op_allocated (grpc_exec_ctx * exec_ctx, void *alloc, int success) { finished_loose_op_allocated_args *args = alloc; - finished_loose_op (args->call, success, closure_list); + finished_loose_op (exec_ctx, args->call, success); gpr_free (args); } @@ -1508,7 +1508,7 @@ execute_op (grpc_exec_ctx * exec_ctx, grpc_call * call, grpc_transport_stream_op elem = CALL_ELEM_FROM_CALL (call, 0); op->context = call->context; - elem->filter->start_transport_stream_op (elem, op, closure_list); + elem->filter->start_transport_stream_op (exec_ctx, elem, op); } char * @@ -1538,8 +1538,8 @@ call_alarm (grpc_exec_ctx * exec_ctx, void *arg, int success) cancel_with_status (call, GRPC_STATUS_DEADLINE_EXCEEDED, "Deadline Exceeded"); } finish_read_ops (call); - unlock (call, closure_list); - GRPC_CALL_INTERNAL_UNREF (call, "alarm", closure_list); + unlock (exec_ctx, call); + GRPC_CALL_INTERNAL_UNREF (exec_ctx, call, "alarm"); } static void @@ -1554,7 +1554,7 @@ set_deadline_alarm (grpc_exec_ctx * exec_ctx, grpc_call * call, gpr_timespec dea GRPC_CALL_INTERNAL_REF (call, "alarm"); call->have_alarm = 1; call->send_deadline = gpr_convert_clock_type (deadline, GPR_CLOCK_MONOTONIC); - grpc_alarm_init (&call->alarm, call->send_deadline, call_alarm, call, gpr_now (GPR_CLOCK_MONOTONIC), closure_list); + grpc_alarm_init (&call->alarm, call->send_deadline, call_alarm, call, gpr_now (exec_ctx, GPR_CLOCK_MONOTONIC)); } /* we offset status by a small amount when storing it into transport metadata @@ -1669,7 +1669,7 @@ recv_metadata (grpc_exec_ctx * exec_ctx, grpc_call * call, grpc_metadata_batch * } if (gpr_time_cmp (md->deadline, gpr_inf_future (md->deadline.clock_type)) != 0 && !call->is_client) { - set_deadline_alarm (call, md->deadline, closure_list); + set_deadline_alarm (exec_ctx, call, md->deadline); } if (!is_trailing) { @@ -1714,13 +1714,13 @@ set_cancelled_value (grpc_status_code status, void *dest) static void finish_batch (grpc_exec_ctx * exec_ctx, grpc_call * call, int success, void *tag) { - grpc_cq_end_op (call->cq, tag, success, done_completion, call, allocate_completion (call), closure_list); + grpc_cq_end_op (call->cq, tag, success, done_completion, call, allocate_completion (exec_ctx, call)); } static void finish_batch_with_close (grpc_exec_ctx * exec_ctx, grpc_call * call, int success, void *tag) { - grpc_cq_end_op (call->cq, tag, 1, done_completion, call, allocate_completion (call), closure_list); + grpc_cq_end_op (call->cq, tag, 1, done_completion, call, allocate_completion (exec_ctx, call)); } static int diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 29a3e11354..b1f3be8521 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -107,14 +107,14 @@ extern "C" void grpc_call_internal_unref (grpc_exec_ctx * exec_ctx, grpc_call * call, const char *reason); #define GRPC_CALL_INTERNAL_REF(call, reason) \ grpc_call_internal_ref(call, reason) -#define GRPC_CALL_INTERNAL_UNREF(call, reason, closure_list) \ - grpc_call_internal_unref(call, reason, closure_list) +#define GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, reason) \ + grpc_call_internal_unref(exec_ctx, call, reason) #else void grpc_call_internal_ref (grpc_call * call); void grpc_call_internal_unref (grpc_exec_ctx * exec_ctx, grpc_call * call); #define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call) -#define GRPC_CALL_INTERNAL_UNREF(call, reason, closure_list) \ - grpc_call_internal_unref(call, closure_list) +#define GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, reason) \ + grpc_call_internal_unref(exec_ctx, call) #endif grpc_call_error grpc_call_start_ioreq_and_call_back (grpc_exec_ctx * exec_ctx, grpc_call * call, const grpc_ioreq * reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 82a463dd75..2d120cf807 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -187,7 +187,7 @@ grpc_channel_create_from_filters (grpc_exec_ctx * exec_ctx, const char *target, gpr_free (default_authority); } - grpc_channel_stack_init (filters, num_filters, channel, args, channel->metadata_context, CHANNEL_STACK_FROM_CHANNEL (channel), closure_list); + grpc_channel_stack_init (filters, num_filters, channel, args, channel->metadata_context, CHANNEL_STACK_FROM_CHANNEL (exec_ctx, channel)); return channel; } @@ -265,7 +265,7 @@ static void destroy_channel (grpc_exec_ctx * exec_ctx, grpc_channel * channel) { size_t i; - grpc_channel_stack_destroy (CHANNEL_STACK_FROM_CHANNEL (channel), closure_list); + grpc_channel_stack_destroy (CHANNEL_STACK_FROM_CHANNEL (exec_ctx, channel)); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { GRPC_MDELEM_UNREF (channel->grpc_status_elem[i]); @@ -309,7 +309,7 @@ grpc_channel_internal_unref (grpc_exec_ctx * exec_ctx, grpc_channel * channel) #endif if (gpr_unref (&channel->refs)) { - destroy_channel (channel, closure_list); + destroy_channel (exec_ctx, channel); } } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 982986e966..678aa5ba56 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -61,15 +61,15 @@ void grpc_channel_internal_ref (grpc_channel * channel, const char *reason); void grpc_channel_internal_unref (grpc_exec_ctx * exec_ctx, grpc_channel * channel, const char *reason); #define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \ grpc_channel_internal_ref(channel, reason) -#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, closure_list) \ - grpc_channel_internal_unref(channel, reason, closure_list) +#define GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, reason) \ + grpc_channel_internal_unref(exec_ctx, channel, reason) #else void grpc_channel_internal_ref (grpc_channel * channel); void grpc_channel_internal_unref (grpc_exec_ctx * exec_ctx, grpc_channel * channel); #define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \ grpc_channel_internal_ref(channel) -#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, closure_list) \ - grpc_channel_internal_unref(channel, closure_list) +#define GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, reason) \ + grpc_channel_internal_unref(exec_ctx, channel) #endif #endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */ diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 7c174aa47f..b72be303d1 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -83,7 +83,7 @@ typedef struct static void delete_state_watcher (grpc_exec_ctx * exec_ctx, state_watcher * w) { - GRPC_CHANNEL_INTERNAL_UNREF (w->channel, "watch_connectivity", closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, w->channel, "watch_connectivity"); gpr_mu_destroy (&w->mu); gpr_free (w); } @@ -112,7 +112,7 @@ finished_completion (grpc_exec_ctx * exec_ctx, void *pw, grpc_cq_completion * ig if (delete) { - delete_state_watcher (w, closure_list); + delete_state_watcher (exec_ctx, w); } } @@ -127,7 +127,7 @@ partly_done (grpc_exec_ctx * exec_ctx, state_watcher * w, int due_to_completion) { w->removed = 1; client_channel_elem = grpc_channel_stack_last_element (grpc_channel_get_channel_stack (w->channel)); - grpc_client_channel_del_interested_party (client_channel_elem, grpc_cq_pollset (w->cq), closure_list); + grpc_client_channel_del_interested_party (client_channel_elem, grpc_cq_pollset (exec_ctx, w->cq)); } gpr_mu_unlock (&w->mu); if (due_to_completion) @@ -135,7 +135,7 @@ partly_done (grpc_exec_ctx * exec_ctx, state_watcher * w, int due_to_completion) gpr_mu_lock (&w->mu); w->success = 1; gpr_mu_unlock (&w->mu); - grpc_alarm_cancel (&w->alarm, closure_list); + grpc_alarm_cancel (exec_ctx, &w->alarm); } gpr_mu_lock (&w->mu); @@ -143,7 +143,7 @@ partly_done (grpc_exec_ctx * exec_ctx, state_watcher * w, int due_to_completion) { case WAITING: w->phase = CALLING_BACK; - grpc_cq_end_op (w->cq, w->tag, w->success, finished_completion, w, &w->completion_storage, closure_list); + grpc_cq_end_op (exec_ctx, w->cq, w->tag, w->success, finished_completion, w, &w->completion_storage); break; case CALLING_BACK: w->phase = CALLING_BACK_AND_FINISHED; @@ -160,20 +160,20 @@ partly_done (grpc_exec_ctx * exec_ctx, state_watcher * w, int due_to_completion) if (delete) { - delete_state_watcher (w, closure_list); + delete_state_watcher (exec_ctx, w); } } static void watch_complete (grpc_exec_ctx * exec_ctx, void *pw, int success) { - partly_done (pw, 1, closure_list); + partly_done (exec_ctx, pw, 1); } static void timeout_complete (grpc_exec_ctx * exec_ctx, void *pw, int success) { - partly_done (pw, 0, closure_list); + partly_done (exec_ctx, pw, 0); } void diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index a8c9f1749f..2f597563ab 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -90,8 +90,8 @@ connected (grpc_exec_ctx * exec_ctx, void *arg, int success) grpc_endpoint *tcp = c->tcp; if (tcp != NULL) { - c->result->transport = grpc_create_chttp2_transport (c->args.channel_args, tcp, c->mdctx, 1, closure_list); - grpc_chttp2_transport_start_reading (c->result->transport, NULL, 0, closure_list); + c->result->transport = grpc_create_chttp2_transport (exec_ctx, c->args.channel_args, tcp, c->mdctx, 1); + grpc_chttp2_transport_start_reading (exec_ctx, c->result->transport, NULL, 0); GPR_ASSERT (c->result->transport); c->result->filters = gpr_malloc (sizeof (grpc_channel_filter *)); c->result->filters[0] = &grpc_http_client_filter; @@ -103,7 +103,7 @@ connected (grpc_exec_ctx * exec_ctx, void *arg, int success) } notify = c->notify; c->notify = NULL; - notify->cb (notify->cb_arg, 1, closure_list); + notify->cb (exec_ctx, notify->cb_arg, 1); } static void @@ -122,7 +122,7 @@ connector_connect (grpc_exec_ctx * exec_ctx, grpc_connector * con, const grpc_co c->result = result; c->tcp = NULL; grpc_closure_init (&c->connected, connected, c); - grpc_tcp_client_connect (&c->connected, &c->tcp, args->interested_parties, args->addr, args->addr_len, args->deadline, closure_list); + grpc_tcp_client_connect (exec_ctx, &c->connected, &c->tcp, args->interested_parties, args->addr, args->addr_len, args->deadline); } static const grpc_connector_vtable connector_vtable = { @@ -151,7 +151,7 @@ subchannel_factory_unref (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * sc subchannel_factory *f = (subchannel_factory *) scf; if (gpr_unref (&f->refs)) { - GRPC_CHANNEL_INTERNAL_UNREF (f->master, "subchannel_factory", closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, f->master, "subchannel_factory"); grpc_channel_args_destroy (f->merge_args); grpc_mdctx_unref (f->mdctx); gpr_free (f); @@ -174,7 +174,7 @@ subchannel_factory_create_subchannel (grpc_exec_ctx * exec_ctx, grpc_subchannel_ args->args = final_args; args->master = f->master; s = grpc_subchannel_create (&c->base, args); - grpc_connector_unref (&c->base, closure_list); + grpc_connector_unref (exec_ctx, &c->base); grpc_channel_args_destroy (final_args); return s; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 2410ad6fb5..5cc07261bf 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -181,7 +181,7 @@ grpc_cq_end_op (grpc_completion_queue * cc, void *tag, int success, void (*done) GPR_ASSERT (cc->shutdown_called); cc->shutdown = 1; gpr_mu_unlock (GRPC_POLLSET_MU (&cc->pollset)); - grpc_pollset_shutdown (&cc->pollset, &cc->pollset_destroy_done, closure_list); + grpc_pollset_shutdown (exec_ctx, &cc->pollset, &cc->pollset_destroy_done); } } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index be3a308f0f..e2a8e191ad 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -65,7 +65,7 @@ lame_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * el if (op->send_ops != NULL) { grpc_stream_ops_unref_owned_objects (op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb (op->on_done_send->cb_arg, 0, closure_list); + op->on_done_send->cb (exec_ctx, op->on_done_send->cb_arg, 0); } if (op->recv_ops != NULL) { @@ -83,11 +83,11 @@ lame_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * el mdb.deadline = gpr_inf_future (GPR_CLOCK_REALTIME); grpc_sopb_add_metadata (op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb (op->on_done_recv->cb_arg, 1, closure_list); + op->on_done_recv->cb (exec_ctx, op->on_done_recv->cb_arg, 1); } if (op->on_consumed != NULL) { - op->on_consumed->cb (op->on_consumed->cb_arg, 0, closure_list); + op->on_consumed->cb (exec_ctx, op->on_consumed->cb_arg, 0); } } @@ -105,11 +105,11 @@ lame_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, { GPR_ASSERT (*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; - op->on_connectivity_state_change->cb (op->on_connectivity_state_change->cb_arg, 1, closure_list); + op->on_connectivity_state_change->cb (exec_ctx, op->on_connectivity_state_change->cb_arg, 1); } if (op->on_consumed != NULL) { - op->on_consumed->cb (op->on_consumed->cb_arg, 1, closure_list); + op->on_consumed->cb (exec_ctx, op->on_consumed->cb_arg, 1); } } @@ -118,7 +118,7 @@ init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void * { if (initial_op) { - grpc_transport_stream_op_finish_with_failure (initial_op, closure_list); + grpc_transport_stream_op_finish_with_failure (exec_ctx, initial_op); } } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index b65b03a002..1290308992 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -113,8 +113,8 @@ on_secure_handshake_done (grpc_exec_ctx * exec_ctx, void *arg, grpc_security_sta GPR_ASSERT (c->connecting_endpoint == wrapped_endpoint); c->connecting_endpoint = NULL; gpr_mu_unlock (&c->mu); - c->result->transport = grpc_create_chttp2_transport (c->args.channel_args, secure_endpoint, c->mdctx, 1, closure_list); - grpc_chttp2_transport_start_reading (c->result->transport, NULL, 0, closure_list); + c->result->transport = grpc_create_chttp2_transport (exec_ctx, c->args.channel_args, secure_endpoint, c->mdctx, 1); + grpc_chttp2_transport_start_reading (exec_ctx, c->result->transport, NULL, 0); c->result->filters = gpr_malloc (sizeof (grpc_channel_filter *) * 2); c->result->filters[0] = &grpc_http_client_filter; c->result->filters[1] = &grpc_client_auth_filter; @@ -122,7 +122,7 @@ on_secure_handshake_done (grpc_exec_ctx * exec_ctx, void *arg, grpc_security_sta } notify = c->notify; c->notify = NULL; - notify->cb (notify->cb_arg, 1, closure_list); + notify->cb (exec_ctx, notify->cb_arg, 1); } static void @@ -137,14 +137,14 @@ connected (grpc_exec_ctx * exec_ctx, void *arg, int success) GPR_ASSERT (c->connecting_endpoint == NULL); c->connecting_endpoint = tcp; gpr_mu_unlock (&c->mu); - grpc_security_connector_do_handshake (&c->security_connector->base, tcp, on_secure_handshake_done, c, closure_list); + grpc_security_connector_do_handshake (exec_ctx, &c->security_connector->base, tcp, on_secure_handshake_done, c); } else { memset (c->result, 0, sizeof (*c->result)); notify = c->notify; c->notify = NULL; - notify->cb (notify->cb_arg, 1, closure_list); + notify->cb (exec_ctx, notify->cb_arg, 1); } } @@ -159,7 +159,7 @@ connector_shutdown (grpc_exec_ctx * exec_ctx, grpc_connector * con) gpr_mu_unlock (&c->mu); if (ep) { - grpc_endpoint_shutdown (ep, closure_list); + grpc_endpoint_shutdown (exec_ctx, ep); } } @@ -176,7 +176,7 @@ connector_connect (grpc_exec_ctx * exec_ctx, grpc_connector * con, const grpc_co GPR_ASSERT (c->connecting_endpoint == NULL); gpr_mu_unlock (&c->mu); grpc_closure_init (&c->connected_closure, connected, c); - grpc_tcp_client_connect (&c->connected_closure, &c->newly_connecting_endpoint, args->interested_parties, args->addr, args->addr_len, args->deadline, closure_list); + grpc_tcp_client_connect (exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint, args->interested_parties, args->addr, args->addr_len, args->deadline); } static const grpc_connector_vtable connector_vtable = { @@ -207,7 +207,7 @@ subchannel_factory_unref (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * sc if (gpr_unref (&f->refs)) { GRPC_SECURITY_CONNECTOR_UNREF (&f->security_connector->base, "subchannel_factory"); - GRPC_CHANNEL_INTERNAL_UNREF (f->master, "subchannel_factory", closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, f->master, "subchannel_factory"); grpc_channel_args_destroy (f->merge_args); grpc_mdctx_unref (f->mdctx); gpr_free (f); @@ -231,7 +231,7 @@ subchannel_factory_create_subchannel (grpc_exec_ctx * exec_ctx, grpc_subchannel_ args->master = f->master; args->mdctx = f->mdctx; s = grpc_subchannel_create (&c->base, args); - grpc_connector_unref (&c->base, closure_list); + grpc_connector_unref (exec_ctx, &c->base); grpc_channel_args_destroy (final_args); return s; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 793d485dfb..fac93beaa4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -302,7 +302,7 @@ send_shutdown (grpc_exec_ctx * exec_ctx, grpc_channel * channel, int send_goaway op.on_consumed = &sc->closure; elem = grpc_channel_stack_element (grpc_channel_get_channel_stack (channel), 0); - elem->filter->start_transport_op (elem, &op, closure_list); + elem->filter->start_transport_op (exec_ctx, elem, &op); } static void @@ -312,8 +312,8 @@ channel_broadcaster_shutdown (grpc_exec_ctx * exec_ctx, channel_broadcaster * cb for (i = 0; i < cb->num_channels; i++) { - send_shutdown (cb->channels[i], send_goaway, force_disconnect, closure_list); - GRPC_CHANNEL_INTERNAL_UNREF (cb->channels[i], "broadcast", closure_list); + send_shutdown (exec_ctx, cb->channels[i], send_goaway, force_disconnect); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, cb->channels[i], "broadcast"); } gpr_free (cb->channels); } @@ -363,7 +363,7 @@ request_matcher_kill_requests (grpc_exec_ctx * exec_ctx, grpc_server * server, r int request_id; while ((request_id = gpr_stack_lockfree_pop (rm->requests)) != -1) { - fail_call (server, &server->requested_calls[request_id], closure_list); + fail_call (exec_ctx, server, &server->requested_calls[request_id]); } } @@ -412,7 +412,7 @@ server_unref (grpc_exec_ctx * exec_ctx, grpc_server * server) { if (gpr_unref (&server->internal_refcount)) { - server_delete (server, closure_list); + server_delete (exec_ctx, server); } } @@ -436,8 +436,8 @@ finish_destroy_channel (grpc_exec_ctx * exec_ctx, void *cd, int success) channel_data *chand = cd; grpc_server *server = chand->server; gpr_log (GPR_DEBUG, "finish_destroy_channel: %p", chand->channel); - GRPC_CHANNEL_INTERNAL_UNREF (chand->channel, "server", closure_list); - server_unref (server, closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, chand->channel, "server"); + server_unref (exec_ctx, server); } static void @@ -448,7 +448,7 @@ destroy_channel (grpc_exec_ctx * exec_ctx, channel_data * chand) GPR_ASSERT (chand->server != NULL); orphan_channel (chand); server_ref (chand->server); - maybe_finish_shutdown (chand->server, closure_list); + maybe_finish_shutdown (exec_ctx, chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; grpc_closure_list_add (closure_list, &chand->finish_destroy_channel_closure, 1); @@ -494,7 +494,7 @@ finish_start_new_rpc (grpc_exec_ctx * exec_ctx, grpc_server * server, grpc_call_ gpr_mu_lock (&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock (&calld->mu_state); - begin_call (server, calld, &server->requested_calls[request_id], closure_list); + begin_call (exec_ctx, server, calld, &server->requested_calls[request_id]); } } @@ -522,7 +522,7 @@ start_new_rpc (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc (server, elem, &rm->server_registered_method->request_matcher, closure_list); + finish_start_new_rpc (exec_ctx, server, elem, &rm->server_registered_method->request_matcher); return; } /* check for a wildcard method definition (no host set) */ @@ -536,11 +536,11 @@ start_new_rpc (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc (server, elem, &rm->server_registered_method->request_matcher, closure_list); + finish_start_new_rpc (exec_ctx, server, elem, &rm->server_registered_method->request_matcher); return; } } - finish_start_new_rpc (server, elem, &server->unregistered_request_matcher, closure_list); + finish_start_new_rpc (exec_ctx, server, elem, &server->unregistered_request_matcher); } static int @@ -558,7 +558,7 @@ num_listeners (grpc_server * server) static void done_shutdown_event (grpc_exec_ctx * exec_ctx, void *server, grpc_cq_completion * completion) { - server_unref (server, closure_list); + server_unref (exec_ctx, server); } static int @@ -577,12 +577,12 @@ static void kill_pending_work_locked (grpc_exec_ctx * exec_ctx, grpc_server * server) { registered_method *rm; - request_matcher_kill_requests (server, &server->unregistered_request_matcher, closure_list); - request_matcher_zombify_all_pending_calls (&server->unregistered_request_matcher, closure_list); + request_matcher_kill_requests (exec_ctx, server, &server->unregistered_request_matcher); + request_matcher_zombify_all_pending_calls (exec_ctx, &server->unregistered_request_matcher); for (rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests (server, &rm->request_matcher, closure_list); - request_matcher_zombify_all_pending_calls (&rm->request_matcher, closure_list); + request_matcher_kill_requests (exec_ctx, server, &rm->request_matcher); + request_matcher_zombify_all_pending_calls (exec_ctx, &rm->request_matcher); } } @@ -595,7 +595,7 @@ maybe_finish_shutdown (grpc_exec_ctx * exec_ctx, grpc_server * server) return; } - kill_pending_work_locked (server, closure_list); + kill_pending_work_locked (exec_ctx, server); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners (server)) { @@ -610,7 +610,7 @@ maybe_finish_shutdown (grpc_exec_ctx * exec_ctx, grpc_server * server) for (i = 0; i < server->num_shutdown_tags; i++) { server_ref (server); - grpc_cq_end_op (server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1, done_shutdown_event, server, &server->shutdown_tags[i].completion, closure_list); + grpc_cq_end_op (exec_ctx, server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1, done_shutdown_event, server, &server->shutdown_tags[i].completion); } } @@ -659,7 +659,7 @@ server_on_recv (grpc_exec_ctx * exec_ctx, void *ptr, int success) if (calld->host && calld->path) { calld->got_initial_metadata = 1; - start_new_rpc (elem, closure_list); + start_new_rpc (exec_ctx, elem); } break; } @@ -708,7 +708,7 @@ server_on_recv (grpc_exec_ctx * exec_ctx, void *ptr, int success) break; } - calld->on_done_recv->cb (calld->on_done_recv->cb_arg, success, closure_list); + calld->on_done_recv->cb (exec_ctx, calld->on_done_recv->cb_arg, success); } static void @@ -731,7 +731,7 @@ server_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * { GRPC_CALL_LOG_OP (GPR_INFO, elem, op); server_mutate_op (elem, op); - grpc_call_next_op (elem, op, closure_list); + grpc_call_next_op (exec_ctx, elem, op); } static void @@ -752,14 +752,14 @@ channel_connectivity_changed (grpc_exec_ctx * exec_ctx, void *cd, int iomgr_stat grpc_transport_op op; memset (&op, 0, sizeof (op)); op.on_connectivity_state_change = &chand->channel_connectivity_changed, op.connectivity_state = &chand->connectivity_state; - grpc_channel_next_op (grpc_channel_stack_element (grpc_channel_get_channel_stack (chand->channel), 0), &op, closure_list); + grpc_channel_next_op (grpc_channel_stack_element (grpc_channel_get_channel_stack (exec_ctx, chand->channel), 0), &op); } else { gpr_mu_lock (&server->mu_global); - destroy_channel (chand, closure_list); + destroy_channel (exec_ctx, chand); gpr_mu_unlock (&server->mu_global); - GRPC_CHANNEL_INTERNAL_UNREF (chand->channel, "connectivity", closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, chand->channel, "connectivity"); } } @@ -800,7 +800,7 @@ destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) gpr_mu_destroy (&calld->mu_state); - server_unref (chand->server, closure_list); + server_unref (exec_ctx, chand->server); } static void @@ -845,11 +845,11 @@ destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) chand->next->prev = chand->prev; chand->prev->next = chand->next; chand->next = chand->prev = chand; - maybe_finish_shutdown (chand->server, closure_list); + maybe_finish_shutdown (exec_ctx, chand->server); gpr_mu_unlock (&chand->server->mu_global); GRPC_MDSTR_UNREF (chand->path_key); GRPC_MDSTR_UNREF (chand->authority_key); - server_unref (chand->server, closure_list); + server_unref (exec_ctx, chand->server); } } @@ -1031,10 +1031,10 @@ grpc_server_setup_transport (grpc_exec_ctx * exec_ctx, grpc_server * s, grpc_tra { memset (&op, 0, sizeof (op)); op.bind_pollset = grpc_cq_pollset (s->cqs[i]); - grpc_transport_perform_op (transport, &op, closure_list); + grpc_transport_perform_op (exec_ctx, transport, &op); } - channel = grpc_channel_create_from_filters (NULL, filters, num_filters, args, mdctx, 0, closure_list); + channel = grpc_channel_create_from_filters (exec_ctx, NULL, filters, num_filters, args, mdctx, 0); chand = (channel_data *) grpc_channel_stack_element (grpc_channel_get_channel_stack (channel), 0)->channel_data; chand->server = s; server_ref (s); @@ -1089,7 +1089,7 @@ grpc_server_setup_transport (grpc_exec_ctx * exec_ctx, grpc_server * s, grpc_tra op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; op.disconnect = gpr_atm_acq_load (&s->shutdown_flag) != 0; - grpc_transport_perform_op (transport, &op, closure_list); + grpc_transport_perform_op (exec_ctx, transport, &op); } void @@ -1105,7 +1105,7 @@ listener_destroy_done (grpc_exec_ctx * exec_ctx, void *s, int success) grpc_server *server = s; gpr_mu_lock (&server->mu_global); server->listeners_destroyed++; - maybe_finish_shutdown (server, closure_list); + maybe_finish_shutdown (exec_ctx, server); gpr_mu_unlock (&server->mu_global); } @@ -1220,14 +1220,14 @@ queue_call_request (grpc_exec_ctx * exec_ctx, grpc_server * server, requested_ca int request_id; if (gpr_atm_acq_load (&server->shutdown_flag)) { - fail_call (server, rc, closure_list); + fail_call (exec_ctx, server, rc); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop (server->request_freelist); if (request_id == -1) { /* out of request ids: just fail this one */ - fail_call (server, rc, closure_list); + fail_call (exec_ctx, server, rc); return GRPC_CALL_OK; } switch (rc->type) @@ -1265,7 +1265,7 @@ queue_call_request (grpc_exec_ctx * exec_ctx, grpc_server * server, requested_ca GPR_ASSERT (calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock (&calld->mu_state); - begin_call (server, calld, &server->requested_calls[request_id], closure_list); + begin_call (exec_ctx, server, calld, &server->requested_calls[request_id]); } gpr_mu_lock (&server->mu_call); } @@ -1367,7 +1367,7 @@ begin_call (grpc_exec_ctx * exec_ctx, grpc_server * server, call_data * calld, r fill in the metadata array passed by the client, we need to perform an ioreq op, that should complete immediately. */ - grpc_call_set_completion_queue (calld->call, rc->cq_bound_to_call, closure_list); + grpc_call_set_completion_queue (exec_ctx, calld->call, rc->cq_bound_to_call); *rc->call = calld->call; calld->cq_new = rc->cq_for_notification; switch (rc->type) @@ -1402,7 +1402,7 @@ begin_call (grpc_exec_ctx * exec_ctx, grpc_server * server, call_data * calld, r } GRPC_CALL_INTERNAL_REF (calld->call, "server"); - grpc_call_start_ioreq_and_call_back (calld->call, req, (size_t) (r - req), publish, rc, closure_list); + grpc_call_start_ioreq_and_call_back (calld->call, req, (size_t) (exec_ctx, r - req), publish, rc); } static void @@ -1421,7 +1421,7 @@ done_request_event (grpc_exec_ctx * exec_ctx, void *req, grpc_cq_completion * c) gpr_free (req); } - server_unref (server, closure_list); + server_unref (exec_ctx, server); } static void @@ -1438,7 +1438,7 @@ fail_call (grpc_exec_ctx * exec_ctx, grpc_server * server, requested_call * rc) break; } server_ref (server); - grpc_cq_end_op (rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion, closure_list); + grpc_cq_end_op (exec_ctx, rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion); } static void @@ -1449,8 +1449,8 @@ publish_registered_or_batch (grpc_exec_ctx * exec_ctx, grpc_call * call, int suc call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; server_ref (chand->server); - grpc_cq_end_op (calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion, closure_list); - GRPC_CALL_INTERNAL_UNREF (call, "server", closure_list); + grpc_cq_end_op (exec_ctx, calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion); + GRPC_CALL_INTERNAL_UNREF (exec_ctx, call, "server"); } const grpc_channel_args * diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index aa4eb9b05b..438ac385b4 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -48,7 +48,7 @@ setup_transport (grpc_exec_ctx * exec_ctx, void *server, grpc_transport * transp static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter }; - grpc_server_setup_transport (server, transport, extra_filters, GPR_ARRAY_SIZE (extra_filters), mdctx, grpc_server_get_channel_args (server), closure_list); + grpc_server_setup_transport (server, transport, extra_filters, GPR_ARRAY_SIZE (extra_filters), mdctx, grpc_server_get_channel_args (exec_ctx, server)); } static void @@ -62,9 +62,9 @@ new_transport (grpc_exec_ctx * exec_ctx, void *server, grpc_endpoint * tcp) * case. */ grpc_mdctx *mdctx = grpc_mdctx_create (); - grpc_transport *transport = grpc_create_chttp2_transport (grpc_server_get_channel_args (server), tcp, mdctx, 0, closure_list); - setup_transport (server, transport, mdctx, closure_list); - grpc_chttp2_transport_start_reading (transport, NULL, 0, closure_list); + grpc_transport *transport = grpc_create_chttp2_transport (grpc_server_get_channel_args (exec_ctx, server), tcp, mdctx, 0); + setup_transport (exec_ctx, server, transport, mdctx); + grpc_chttp2_transport_start_reading (exec_ctx, transport, NULL, 0); } /* Server callback: start listening on our ports */ @@ -72,7 +72,7 @@ static void start (grpc_exec_ctx * exec_ctx, grpc_server * server, void *tcpp, grpc_pollset ** pollsets, size_t pollset_count) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start (tcp, pollsets, pollset_count, new_transport, server, closure_list); + grpc_tcp_server_start (exec_ctx, tcp, pollsets, pollset_count, new_transport, server); } /* Server callback: destroy the tcp listener (so we don't generate further @@ -81,7 +81,7 @@ static void destroy (grpc_exec_ctx * exec_ctx, grpc_server * server, void *tcpp, grpc_closure * destroy_done) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_destroy (tcp, destroy_done, closure_list); + grpc_tcp_server_destroy (exec_ctx, tcp, destroy_done); } int diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 8e1dfc9d36..5b55147677 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -119,7 +119,7 @@ grpc_chttp2_publish_reads (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_globa published later */ if (transport_parsing->goaway_received) { - grpc_chttp2_add_incoming_goaway (transport_global, (gpr_uint32) transport_parsing->goaway_error, transport_parsing->goaway_text, closure_list); + grpc_chttp2_add_incoming_goaway (transport_global, (exec_ctx, gpr_uint32) transport_parsing->goaway_error, transport_parsing->goaway_text); transport_parsing->goaway_text = gpr_empty_slice (); transport_parsing->goaway_received = 0; } @@ -339,7 +339,7 @@ grpc_chttp2_perform_read (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_parsin } if (transport_parsing->incoming_frame_size == 0) { - if (!parse_frame_slice (transport_parsing, gpr_empty_slice (), 1, closure_list)) + if (!parse_frame_slice (transport_parsing, gpr_empty_slice (exec_ctx, ), 1)) { return 0; } @@ -360,7 +360,7 @@ grpc_chttp2_perform_read (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_parsin GPR_ASSERT (cur < end); if ((gpr_uint32) (end - cur) == transport_parsing->incoming_frame_size) { - if (!parse_frame_slice (transport_parsing, gpr_slice_sub_no_ref (slice, (size_t) (cur - beg), (size_t) (end - beg)), 1, closure_list)) + if (!parse_frame_slice (transport_parsing, gpr_slice_sub_no_ref (slice, (size_t) (cur - beg), (size_t) (exec_ctx, end - beg)), 1)) { return 0; } @@ -371,7 +371,7 @@ grpc_chttp2_perform_read (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_parsin else if ((gpr_uint32) (end - cur) > transport_parsing->incoming_frame_size) { size_t cur_offset = (size_t) (cur - beg); - if (!parse_frame_slice (transport_parsing, gpr_slice_sub_no_ref (slice, cur_offset, cur_offset + transport_parsing->incoming_frame_size), 1, closure_list)) + if (!parse_frame_slice (transport_parsing, gpr_slice_sub_no_ref (exec_ctx, slice, cur_offset, cur_offset + transport_parsing->incoming_frame_size), 1)) { return 0; } @@ -381,7 +381,7 @@ grpc_chttp2_perform_read (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_parsin } else { - if (!parse_frame_slice (transport_parsing, gpr_slice_sub_no_ref (slice, (size_t) (cur - beg), (size_t) (end - beg)), 0, closure_list)) + if (!parse_frame_slice (transport_parsing, gpr_slice_sub_no_ref (slice, (size_t) (cur - beg), (size_t) (exec_ctx, end - beg)), 0)) { return 0; } @@ -757,7 +757,7 @@ static int parse_frame_slice (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_parsing * transport_parsing, gpr_slice slice, int is_last) { grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; - switch (transport_parsing->parser (transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last, closure_list)) + switch (transport_parsing->parser (exec_ctx, transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last)) { case GRPC_CHTTP2_PARSE_OK: if (stream_parsing) diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 6c4f0102f7..6c051f5b47 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -150,7 +150,7 @@ grpc_chttp2_perform_writes (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_writ GPR_ASSERT (transport_writing->outbuf.count > 0); GPR_ASSERT (endpoint); - grpc_endpoint_write (endpoint, &transport_writing->outbuf, &transport_writing->done_cb, closure_list); + grpc_endpoint_write (exec_ctx, endpoint, &transport_writing->outbuf, &transport_writing->done_cb); } static void diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index bef7d03300..d880a1b9db 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -147,7 +147,7 @@ destruct_transport (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t) grpc_chttp2_stream_map_destroy (&t->parsing_stream_map); grpc_chttp2_stream_map_destroy (&t->new_stream_map); - grpc_connectivity_state_destroy (&t->channel_callback.state_tracker, closure_list); + grpc_connectivity_state_destroy (exec_ctx, &t->channel_callback.state_tracker); gpr_mu_unlock (&t->mu); gpr_mu_destroy (&t->mu); @@ -178,7 +178,7 @@ unref_transport (grpc_chttp2_transport * t, grpc_closure_list * closure_list, co gpr_log (GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count, t->refs.count - 1, reason, file, line); if (!gpr_unref (&t->refs)) return; - destruct_transport (t, closure_list); + destruct_transport (exec_ctx, t); } static void @@ -195,7 +195,7 @@ unref_transport (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t) { if (!gpr_unref (&t->refs)) return; - destruct_transport (t, closure_list); + destruct_transport (exec_ctx, t); } static void @@ -333,10 +333,10 @@ destroy_transport (grpc_exec_ctx * exec_ctx, grpc_transport * gt) lock (t); t->destroying = 1; - drop_connection (t, closure_list); - unlock (t, closure_list); + drop_connection (exec_ctx, t); + unlock (exec_ctx, t); - UNREF_TRANSPORT (t, "destroy", closure_list); + UNREF_TRANSPORT (exec_ctx, t, "destroy"); } /** block grpc_endpoint_shutdown being called until a paired @@ -355,7 +355,7 @@ allow_endpoint_shutdown_locked (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport { if (t->ep) { - grpc_endpoint_shutdown (t->ep, closure_list); + grpc_endpoint_shutdown (exec_ctx, t->ep); } } } @@ -368,7 +368,7 @@ allow_endpoint_shutdown_unlocked (grpc_exec_ctx * exec_ctx, grpc_chttp2_transpor gpr_mu_lock (&t->mu); if (t->ep) { - grpc_endpoint_shutdown (t->ep, closure_list); + grpc_endpoint_shutdown (exec_ctx, t->ep); } gpr_mu_unlock (&t->mu); } @@ -377,10 +377,10 @@ allow_endpoint_shutdown_unlocked (grpc_exec_ctx * exec_ctx, grpc_chttp2_transpor static void destroy_endpoint (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t) { - grpc_endpoint_destroy (t->ep, closure_list); + grpc_endpoint_destroy (exec_ctx, t->ep); t->ep = NULL; /* safe because we'll still have the ref for write */ - UNREF_TRANSPORT (t, "disconnect", closure_list); + UNREF_TRANSPORT (exec_ctx, t, "disconnect"); } static void @@ -389,10 +389,10 @@ close_transport_locked (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t) if (!t->closed) { t->closed = 1; - connectivity_state_set (&t->global, GRPC_CHANNEL_FATAL_FAILURE, "close_transport", closure_list); + connectivity_state_set (exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE, "close_transport"); if (t->ep) { - allow_endpoint_shutdown_locked (t, closure_list); + allow_endpoint_shutdown_locked (exec_ctx, t); } } } @@ -427,8 +427,8 @@ init_stream (grpc_exec_ctx * exec_ctx, grpc_transport * gt, grpc_stream * gs, co } if (initial_op) - perform_stream_op_locked (&t->global, &s->global, initial_op, closure_list); - unlock (t, closure_list); + perform_stream_op_locked (exec_ctx, &t->global, &s->global, initial_op); + unlock (exec_ctx, t); return 0; } @@ -446,7 +446,7 @@ destroy_stream (grpc_exec_ctx * exec_ctx, grpc_transport * gt, grpc_stream * gs) GPR_ASSERT (!s->global.in_stream_map); if (grpc_chttp2_unregister_stream (t, s) && t->global.sent_goaway) { - close_transport_locked (t, closure_list); + close_transport_locked (exec_ctx, t); } if (!t->parsing_active && s->global.id) { @@ -476,7 +476,7 @@ destroy_stream (grpc_exec_ctx * exec_ctx, grpc_transport * gt, grpc_stream * gs) grpc_chttp2_incoming_metadata_buffer_destroy (&s->global.incoming_metadata); grpc_chttp2_incoming_metadata_live_op_buffer_end (&s->global.outstanding_metadata); - UNREF_TRANSPORT (t, "stream", closure_list); + UNREF_TRANSPORT (exec_ctx, t, "stream"); } grpc_chttp2_stream_parsing * @@ -518,7 +518,7 @@ lock (grpc_chttp2_transport * t) static void unlock (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t) { - unlock_check_read_write_state (t, closure_list); + unlock_check_read_write_state (exec_ctx, t); if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes (&t->global, &t->writing)) { t->writing_active = 1; @@ -558,34 +558,34 @@ grpc_chttp2_terminate_writing (grpc_exec_ctx * exec_ctx, void *transport_writing lock (t); - allow_endpoint_shutdown_locked (t, closure_list); + allow_endpoint_shutdown_locked (exec_ctx, t); if (!success) { - drop_connection (t, closure_list); + drop_connection (exec_ctx, t); } /* cleanup writing related jazz */ - grpc_chttp2_cleanup_writing (&t->global, &t->writing, closure_list); + grpc_chttp2_cleanup_writing (exec_ctx, &t->global, &t->writing); /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ t->writing_active = 0; if (t->ep && !t->endpoint_reading) { - destroy_endpoint (t, closure_list); + destroy_endpoint (exec_ctx, t); } - unlock (t, closure_list); + unlock (exec_ctx, t); - UNREF_TRANSPORT (t, "writing", closure_list); + UNREF_TRANSPORT (exec_ctx, t, "writing"); } static void writing_action (grpc_exec_ctx * exec_ctx, void *gt, int iomgr_success_ignored) { grpc_chttp2_transport *t = gt; - grpc_chttp2_perform_writes (&t->writing, t->ep, closure_list); + grpc_chttp2_perform_writes (exec_ctx, &t->writing, t->ep); } void @@ -596,7 +596,7 @@ grpc_chttp2_add_incoming_goaway (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport gpr_free (msg); gpr_slice_unref (goaway_text); transport_global->seen_goaway = 1; - connectivity_state_set (transport_global, GRPC_CHANNEL_FATAL_FAILURE, "got_goaway", closure_list); + connectivity_state_set (exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE, "got_goaway"); } static void @@ -615,7 +615,7 @@ maybe_start_some_streams (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_global if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { - connectivity_state_set (transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE, "no_more_stream_ids", closure_list); + connectivity_state_set (exec_ctx, transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE, "no_more_stream_ids"); } stream_global->outgoing_window = transport_global->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; @@ -663,7 +663,7 @@ perform_stream_op_locked (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_global { GRPC_CHTTP2_IF_TRACING (gpr_log (GPR_DEBUG, "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", transport_global->is_client ? "CLI" : "SVR", stream_global)); grpc_chttp2_list_add_waiting_for_concurrency (transport_global, stream_global); - maybe_start_some_streams (transport_global, closure_list); + maybe_start_some_streams (exec_ctx, transport_global); } else if (stream_global->outgoing_window > 0) { @@ -704,7 +704,7 @@ perform_stream_op_locked (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_global if (op->bind_pollset) { - add_to_pollset_locked (TRANSPORT_FROM_GLOBAL (transport_global), op->bind_pollset, closure_list); + add_to_pollset_locked (TRANSPORT_FROM_GLOBAL (exec_ctx, transport_global), op->bind_pollset); } grpc_closure_list_add (closure_list, op->on_consumed, 1); @@ -717,8 +717,8 @@ perform_stream_op (grpc_exec_ctx * exec_ctx, grpc_transport * gt, grpc_stream * grpc_chttp2_stream *s = (grpc_chttp2_stream *) gs; lock (t); - perform_stream_op_locked (&t->global, &s->global, op, closure_list); - unlock (t, closure_list); + perform_stream_op_locked (exec_ctx, &t->global, &s->global, op); + unlock (exec_ctx, t); } static void @@ -752,7 +752,7 @@ perform_transport_op (grpc_exec_ctx * exec_ctx, grpc_transport * gt, grpc_transp if (op->on_connectivity_state_change) { - grpc_connectivity_state_notify_on_state_change (&t->channel_callback.state_tracker, op->connectivity_state, op->on_connectivity_state_change, closure_list); + grpc_connectivity_state_notify_on_state_change (exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, op->on_connectivity_state_change); } if (op->send_goaway) @@ -770,12 +770,12 @@ perform_transport_op (grpc_exec_ctx * exec_ctx, grpc_transport * gt, grpc_transp if (op->bind_pollset) { - add_to_pollset_locked (t, op->bind_pollset, closure_list); + add_to_pollset_locked (exec_ctx, t, op->bind_pollset); } if (op->bind_pollset_set) { - add_to_pollset_set_locked (t, op->bind_pollset_set, closure_list); + add_to_pollset_set_locked (exec_ctx, t, op->bind_pollset_set); } if (op->send_ping) @@ -785,16 +785,16 @@ perform_transport_op (grpc_exec_ctx * exec_ctx, grpc_transport * gt, grpc_transp if (op->disconnect) { - close_transport_locked (t, closure_list); + close_transport_locked (exec_ctx, t); } - unlock (t, closure_list); + unlock (exec_ctx, t); if (close_transport) { lock (t); - close_transport_locked (t, closure_list); - unlock (t, closure_list); + close_transport_locked (exec_ctx, t); + unlock (exec_ctx, t); } } @@ -833,7 +833,7 @@ remove_stream (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t, gpr_uint32 i } if (grpc_chttp2_unregister_stream (t, s) && t->global.sent_goaway) { - close_transport_locked (t, closure_list); + close_transport_locked (exec_ctx, t); } new_stream_count = grpc_chttp2_stream_map_size (&t->parsing_stream_map) + grpc_chttp2_stream_map_size (&t->new_stream_map); @@ -841,7 +841,7 @@ remove_stream (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t, gpr_uint32 i if (new_stream_count != t->global.concurrent_stream_count) { t->global.concurrent_stream_count = (gpr_uint32) new_stream_count; - maybe_start_some_streams (&t->global, closure_list); + maybe_start_some_streams (exec_ctx, &t->global); } } @@ -862,7 +862,7 @@ unlock_check_read_write_state (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * GPR_ASSERT (stream_global->in_stream_map); GPR_ASSERT (stream_global->write_state != GRPC_WRITE_STATE_OPEN); GPR_ASSERT (stream_global->read_closed); - remove_stream (t, stream_global->id, closure_list); + remove_stream (exec_ctx, t, stream_global->id); grpc_chttp2_list_add_read_write_state_changed (transport_global, stream_global); } } @@ -911,7 +911,7 @@ unlock_check_read_write_state (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * } else { - remove_stream (t, stream_global->id, closure_list); + remove_stream (exec_ctx, t, stream_global->id); } } if (!stream_global->publish_sopb) @@ -1072,7 +1072,7 @@ end_all_the_calls (grpc_chttp2_transport * t) static void drop_connection (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t) { - close_transport_locked (t, closure_list); + close_transport_locked (exec_ctx, t); end_all_the_calls (t); } @@ -1104,7 +1104,7 @@ read_error_locked (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t) t->endpoint_reading = 0; if (!t->writing_active && t->ep) { - destroy_endpoint (t, closure_list); + destroy_endpoint (exec_ctx, t); } } @@ -1126,12 +1126,12 @@ recv_data (grpc_exec_ctx * exec_ctx, void *tp, int success) grpc_chttp2_stream_map_move_into (&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read (&t->global, &t->parsing); gpr_mu_unlock (&t->mu); - for (; i < t->read_buffer.count && grpc_chttp2_perform_read (&t->parsing, t->read_buffer.slices[i], closure_list); i++) + for (; i < t->read_buffer.count && grpc_chttp2_perform_read (exec_ctx, &t->parsing, t->read_buffer.slices[i]); i++) ; gpr_mu_lock (&t->mu); if (i != t->read_buffer.count) { - drop_connection (t, closure_list); + drop_connection (exec_ctx, t); } /* merge stream lists */ grpc_chttp2_stream_map_move_into (&t->new_stream_map, &t->parsing_stream_map); @@ -1142,13 +1142,13 @@ recv_data (grpc_exec_ctx * exec_ctx, void *tp, int success) t->parsing.initial_window_update = 0; } /* handle higher level things */ - grpc_chttp2_publish_reads (&t->global, &t->parsing, closure_list); + grpc_chttp2_publish_reads (exec_ctx, &t->global, &t->parsing); t->parsing_active = 0; } if (!success || i != t->read_buffer.count) { - drop_connection (t, closure_list); - read_error_locked (t, closure_list); + drop_connection (exec_ctx, t); + read_error_locked (exec_ctx, t); } else if (!t->closed) { @@ -1157,17 +1157,17 @@ recv_data (grpc_exec_ctx * exec_ctx, void *tp, int success) prevent_endpoint_shutdown (t); } gpr_slice_buffer_reset_and_unref (&t->read_buffer); - unlock (t, closure_list); + unlock (exec_ctx, t); if (keep_reading) { - grpc_endpoint_read (t->ep, &t->read_buffer, &t->recv_data, closure_list); - allow_endpoint_shutdown_unlocked (t, closure_list); - UNREF_TRANSPORT (t, "keep_reading", closure_list); + grpc_endpoint_read (exec_ctx, t->ep, &t->read_buffer, &t->recv_data); + allow_endpoint_shutdown_unlocked (exec_ctx, t); + UNREF_TRANSPORT (exec_ctx, t, "keep_reading"); } else { - UNREF_TRANSPORT (t, "recv_data", closure_list); + UNREF_TRANSPORT (exec_ctx, t, "recv_data"); } } @@ -1179,7 +1179,7 @@ static void connectivity_state_set (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_global * transport_global, grpc_connectivity_state state, const char *reason) { GRPC_CHTTP2_IF_TRACING (gpr_log (GPR_DEBUG, "set connectivity_state=%d", state)); - grpc_connectivity_state_set (&TRANSPORT_FROM_GLOBAL (transport_global)->channel_callback.state_tracker, state, reason, closure_list); + grpc_connectivity_state_set (&TRANSPORT_FROM_GLOBAL (exec_ctx, transport_global)->channel_callback.state_tracker, state, reason); } /* @@ -1191,7 +1191,7 @@ add_to_pollset_locked (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t, grpc { if (t->ep) { - grpc_endpoint_add_to_pollset (t->ep, pollset, closure_list); + grpc_endpoint_add_to_pollset (exec_ctx, t->ep, pollset); } } @@ -1200,7 +1200,7 @@ add_to_pollset_set_locked (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport * t, { if (t->ep) { - grpc_endpoint_add_to_pollset_set (t->ep, pollset_set, closure_list); + grpc_endpoint_add_to_pollset_set (exec_ctx, t->ep, pollset_set); } } @@ -1256,7 +1256,7 @@ grpc_transport * grpc_create_chttp2_transport (grpc_exec_ctx * exec_ctx, const grpc_channel_args * channel_args, grpc_endpoint * ep, grpc_mdctx * mdctx, int is_client) { grpc_chttp2_transport *t = gpr_malloc (sizeof (grpc_chttp2_transport)); - init_transport (t, channel_args, ep, mdctx, is_client != 0, closure_list); + init_transport (exec_ctx, t, channel_args, ep, mdctx, is_client != 0); return &t->base; } @@ -1266,5 +1266,5 @@ grpc_chttp2_transport_start_reading (grpc_exec_ctx * exec_ctx, grpc_transport * grpc_chttp2_transport *t = (grpc_chttp2_transport *) transport; REF_TRANSPORT (t, "recv_data"); /* matches unref inside recv_data */ gpr_slice_buffer_addn (&t->read_buffer, slices, nslices); - recv_data (t, 1, closure_list); + recv_data (exec_ctx, t, 1); } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index ca04d34b3f..e41481aa64 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -45,37 +45,37 @@ grpc_transport_stream_size (grpc_transport * transport) void grpc_transport_destroy (grpc_exec_ctx * exec_ctx, grpc_transport * transport) { - transport->vtable->destroy (transport, closure_list); + transport->vtable->destroy (exec_ctx, transport); } int grpc_transport_init_stream (grpc_exec_ctx * exec_ctx, grpc_transport * transport, grpc_stream * stream, const void *server_data, grpc_transport_stream_op * initial_op) { - return transport->vtable->init_stream (transport, stream, server_data, initial_op, closure_list); + return transport->vtable->init_stream (exec_ctx, transport, stream, server_data, initial_op); } void grpc_transport_perform_stream_op (grpc_exec_ctx * exec_ctx, grpc_transport * transport, grpc_stream * stream, grpc_transport_stream_op * op) { - transport->vtable->perform_stream_op (transport, stream, op, closure_list); + transport->vtable->perform_stream_op (exec_ctx, transport, stream, op); } void grpc_transport_perform_op (grpc_exec_ctx * exec_ctx, grpc_transport * transport, grpc_transport_op * op) { - transport->vtable->perform_op (transport, op, closure_list); + transport->vtable->perform_op (exec_ctx, transport, op); } void grpc_transport_destroy_stream (grpc_exec_ctx * exec_ctx, grpc_transport * transport, grpc_stream * stream) { - transport->vtable->destroy_stream (transport, stream, closure_list); + transport->vtable->destroy_stream (exec_ctx, transport, stream); } char * grpc_transport_get_peer (grpc_exec_ctx * exec_ctx, grpc_transport * transport) { - return transport->vtable->get_peer (transport, closure_list); + return transport->vtable->get_peer (exec_ctx, transport); } void @@ -119,7 +119,7 @@ free_message (grpc_exec_ctx * exec_ctx, void *p, int iomgr_success) gpr_slice_unref (cmd->message); if (cmd->then_call != NULL) { - cmd->then_call->cb (cmd->then_call->cb_arg, iomgr_success, closure_list); + cmd->then_call->cb (exec_ctx, cmd->then_call->cb_arg, iomgr_success); } gpr_free (cmd); } |