diff options
author | 2015-09-18 07:44:19 -0700 | |
---|---|---|
committer | 2015-09-18 07:44:19 -0700 | |
commit | 33825118df7157219cec15382beb006d3462ad96 (patch) | |
tree | 649a40da98f56a875ed6558e474dd6c61ce2c7be /src/core | |
parent | 000cd8f9f7346defc79fe6aa877af11b42ab5f1e (diff) |
Cleanup
Diffstat (limited to 'src/core')
51 files changed, 350 insertions, 438 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index 8b6ba1d472..d98162ec9a 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -54,7 +54,7 @@ typedef struct call_data { /* recv callback */ grpc_stream_op_buffer* recv_ops; - grpc_iomgr_closure* on_done_recv; + grpc_closure* on_done_recv; } call_data; typedef struct channel_data { @@ -145,7 +145,7 @@ static void server_init_call_elem(grpc_call_element* elem, GPR_ASSERT(d != NULL); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); /* TODO(hongyu): call census_tracing_start_op here. */ - grpc_iomgr_closure_init(d->on_done_recv, server_on_done_recv, elem); + grpc_closure_init(d->on_done_recv, server_on_done_recv, elem); if (initial_op) server_mutate_op(elem, initial_op); } diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index a2346503d9..588b9c36ee 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -73,9 +73,9 @@ typedef struct { guarded by mu_config */ grpc_client_config *incoming_configuration; /** a list of closures that are all waiting for config to come in */ - grpc_iomgr_call_list waiting_for_config_closures; + grpc_call_list waiting_for_config_closures; /** resolver callback */ - grpc_iomgr_closure on_config_changed; + grpc_closure on_config_changed; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; /** when an lb_policy arrives, should we try to exit idle */ @@ -91,7 +91,7 @@ typedef struct { update the channel, and create a new watcher */ typedef struct { channel_data *chand; - grpc_iomgr_closure on_changed; + grpc_closure on_changed; grpc_connectivity_state state; grpc_lb_policy *lb_policy; } lb_policy_connectivity_watcher; @@ -115,7 +115,7 @@ struct call_data { call_state state; gpr_timespec deadline; grpc_subchannel *picked_channel; - grpc_iomgr_closure async_setup_task; + grpc_closure async_setup_task; grpc_transport_stream_op waiting_op; /* our child call stack */ grpc_subchannel_call *subchannel_call; @@ -123,9 +123,9 @@ struct call_data { grpc_linked_mdelem details; }; -static grpc_iomgr_closure *merge_into_waiting_op( - grpc_call_element *elem, - grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT; +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) + GRPC_MUST_USE_RESULT; static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport_stream_op *op) { @@ -160,7 +160,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem, } typedef struct { - grpc_iomgr_closure closure; + grpc_closure closure; grpc_call_element *elem; } waiting_call; @@ -179,10 +179,9 @@ static void add_to_lb_policy_wait_queue_locked_state_config( grpc_call_element *elem) { channel_data *chand = elem->channel_data; waiting_call *wc = gpr_malloc(sizeof(*wc)); - grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc); + grpc_closure_init(&wc->closure, continue_with_pick, wc); wc->elem = elem; - grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure, - 1); + grpc_call_list_add(&chand->waiting_for_config_closures, &wc->closure, 1); } static int is_empty(void *p, int len) { @@ -230,7 +229,7 @@ static void started_call(void *arg, int iomgr_success) { static void picked_target(void *arg, int iomgr_success) { call_data *calld = arg; grpc_pollset *pollset; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; if (calld->picked_channel == NULL) { /* treat this like a cancellation */ @@ -246,19 +245,19 @@ static void picked_target(void *arg, int iomgr_success) { calld->state = CALL_WAITING_FOR_CALL; pollset = calld->waiting_op.bind_pollset; gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld); + grpc_closure_init(&calld->async_setup_task, started_call, calld); grpc_subchannel_create_call(calld->picked_channel, pollset, &calld->subchannel_call, &calld->async_setup_task, &call_list); } } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } -static grpc_iomgr_closure *merge_into_waiting_op( - grpc_call_element *elem, grpc_transport_stream_op *new_op) { +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) { call_data *calld = elem->call_data; - grpc_iomgr_closure *consumed_op = NULL; + grpc_closure *consumed_op = NULL; grpc_transport_stream_op *waiting_op = &calld->waiting_op; GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); @@ -312,7 +311,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -330,7 +329,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, break; case CALL_WAITING_FOR_SEND: GPR_ASSERT(!continuation); - grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1); + grpc_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1); if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { gpr_mu_unlock(&calld->mu_state); @@ -359,8 +358,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, handle_op_after_cancellation(elem, op); handle_op_after_cancellation(elem, &op2); } else { - grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), - 1); + grpc_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1); gpr_mu_unlock(&calld->mu_state); } break; @@ -397,8 +395,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, - calld); + grpc_closure_init(&calld->async_setup_task, picked_target, calld); grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task, &call_list); @@ -427,7 +424,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, break; } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } static void cc_start_transport_stream_op(grpc_call_element *elem, @@ -437,10 +434,10 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state, - grpc_iomgr_call_list *cl); + grpc_call_list *cl); static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w, - grpc_iomgr_call_list *cl) { + grpc_call_list *cl) { /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; @@ -453,13 +450,13 @@ static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w, static void on_lb_policy_state_changed(void *arg, int iomgr_success) { lb_policy_connectivity_watcher *w = arg; - grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list cl = GRPC_CALL_LIST_INIT; gpr_mu_lock(&w->chand->mu_config); on_lb_policy_state_changed_locked(w, &cl); gpr_mu_unlock(&w->chand->mu_config); - grpc_iomgr_call_list_run(cl); + grpc_call_list_run(cl); GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); gpr_free(w); @@ -467,12 +464,12 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success) { static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); w->chand = chand; - grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed, @@ -485,7 +482,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { grpc_lb_policy *old_lb_policy; grpc_resolver *old_resolver; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list cl = GRPC_CALL_LIST_INIT; int exit_idle = 0; if (chand->incoming_configuration != NULL) { @@ -505,7 +502,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { - grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl); + grpc_call_list_move(&chand->waiting_for_config_closures, &cl); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -553,7 +550,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { GRPC_LB_POLICY_UNREF(lb_policy, "config_change"); } - grpc_iomgr_call_list_run(cl); + grpc_call_list_run(cl); GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); } @@ -562,10 +559,10 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; if (op->on_consumed) { - grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1); + grpc_call_list_add(&call_list, op->on_consumed, 1); op->on_consumed = NULL; } @@ -612,7 +609,7 @@ static void cc_start_transport_op(grpc_channel_element *elem, GRPC_LB_POLICY_UNREF(lb_policy, "broadcast"); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } /* Constructor for call_data */ @@ -677,8 +674,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, chand->mdctx = metadata_context; chand->master = master; grpc_pollset_set_init(&chand->pollset_set); - grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, - chand); + grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -722,7 +718,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, GPR_ASSERT(!chand->resolver); chand->resolver = resolver; GRPC_RESOLVER_REF(resolver, "channel"); - if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) || + if (!grpc_call_list_empty(chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = 1; GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); @@ -733,8 +729,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, } grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect, - grpc_iomgr_call_list *call_list) { + grpc_channel_element *elem, int try_to_connect, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; gpr_mu_lock(&chand->mu_config); @@ -758,7 +753,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( void grpc_client_channel_watch_connectivity_state( grpc_channel_element *elem, grpc_connectivity_state *state, - grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) { + grpc_closure *on_complete, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu_config); grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state, diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index 60090e8d7e..bd84dad6b7 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -53,12 +53,11 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver); grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect, - grpc_iomgr_call_list *call_list); + grpc_channel_element *elem, int try_to_connect, grpc_call_list *call_list); void grpc_client_channel_watch_connectivity_state( grpc_channel_element *elem, grpc_connectivity_state *state, - grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list); + grpc_closure *on_complete, grpc_call_list *call_list); grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( grpc_channel_element *elem); diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index ec832a0367..bcaaca0f7d 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -50,11 +50,11 @@ typedef struct call_data { grpc_stream_op_buffer *recv_ops; /** Closure to call when finished with the hc_on_recv hook */ - grpc_iomgr_closure *on_done_recv; + grpc_closure *on_done_recv; /** Receive closures are chained: we inject this closure as the on_done_recv up-call on transport_op, and remember to call our on_done_recv member after handling it. */ - grpc_iomgr_closure hc_on_recv; + grpc_closure hc_on_recv; } call_data; typedef struct channel_data { @@ -162,7 +162,7 @@ static void init_call_elem(grpc_call_element *elem, calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; calld->on_done_recv = NULL; - grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem); + grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); if (initial_op) hc_mutate_op(elem, initial_op); } diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 2f061946a1..7a9f17d057 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -50,11 +50,11 @@ typedef struct call_data { grpc_stream_op_buffer *recv_ops; /** Closure to call when finished with the hs_on_recv hook */ - grpc_iomgr_closure *on_done_recv; + grpc_closure *on_done_recv; /** Receive closures are chained: we inject this closure as the on_done_recv up-call on transport_op, and remember to call our on_done_recv member after handling it. */ - grpc_iomgr_closure hs_on_recv; + grpc_closure hs_on_recv; } call_data; typedef struct channel_data { @@ -232,7 +232,7 @@ static void init_call_elem(grpc_call_element *elem, call_data *calld = elem->call_data; /* initialize members */ memset(calld, 0, sizeof(*calld)); - grpc_iomgr_closure_init(&calld->hs_on_recv, hs_on_recv, elem); + grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); if (initial_op) hs_mutate_op(elem, initial_op); } diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index c1e583e4a5..6252d57271 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -44,7 +44,7 @@ void grpc_connector_unref(grpc_connector *connector) { void grpc_connector_connect(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, - grpc_iomgr_closure *notify) { + grpc_closure *notify) { connector->vtable->connect(connector, in_args, out_args, notify); } diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index ad0b567553..fc1f6708de 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -77,7 +77,7 @@ struct grpc_connector_vtable { /** Implementation of grpc_connector_connect */ void (*connect)(grpc_connector *connector, const grpc_connect_in_args *in_args, - grpc_connect_out_args *out_args, grpc_iomgr_closure *notify); + grpc_connect_out_args *out_args, grpc_closure *notify); }; void grpc_connector_ref(grpc_connector *connector); @@ -86,7 +86,7 @@ void grpc_connector_unref(grpc_connector *connector); void grpc_connector_connect(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, - grpc_iomgr_closure *notify); + grpc_closure *notify); /** Cancel any pending connection */ void grpc_connector_shutdown(grpc_connector *connector); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 8fd8dd7b67..852eed310d 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -43,7 +43,7 @@ typedef struct pending_pick { struct pending_pick *next; grpc_pollset *pollset; grpc_subchannel **target; - grpc_iomgr_closure *on_complete; + grpc_closure *on_complete; } pending_pick; typedef struct { @@ -55,7 +55,7 @@ typedef struct { /** workqueue for async work */ grpc_workqueue *workqueue; - grpc_iomgr_closure connectivity_changed; + grpc_closure connectivity_changed; /** mutex protecting remaining members */ gpr_mu mu; @@ -108,7 +108,7 @@ void pf_destroy(grpc_lb_policy *pol) { gpr_free(p); } -void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { +void pf_shutdown(grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -122,14 +122,13 @@ void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_iomgr_call_list_add(call_list, pp->on_complete, 1); + grpc_call_list_add(call_list, pp->on_complete, 1); gpr_free(pp); pp = next; } } -static void start_picking(pick_first_lb_policy *p, - grpc_iomgr_call_list *call_list) { +static void start_picking(pick_first_lb_policy *p, grpc_call_list *call_list) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; @@ -139,7 +138,7 @@ static void start_picking(pick_first_lb_policy *p, &p->connectivity_changed, call_list); } -void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { +void pf_exit_idle(grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; gpr_mu_lock(&p->mu); if (!p->started_picking) { @@ -150,7 +149,7 @@ void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) { + grpc_closure *on_complete, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -178,7 +177,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; pending_pick *pp; int unref = 0; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&p->mu); @@ -205,7 +204,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->pending_picks = pp->next; *pp->target = p->selected; grpc_subchannel_del_interested_party(p->selected, pp->pollset); - grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1); + grpc_call_list_add(&call_list, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( @@ -251,7 +250,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1); + grpc_call_list_add(&call_list, pp->on_complete, 1); gpr_free(pp); } unref = 1; @@ -270,7 +269,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { gpr_mu_unlock(&p->mu); - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); if (unref) { GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity"); @@ -278,7 +277,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { } static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; size_t n; @@ -301,7 +300,7 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, } static grpc_connectivity_state pf_check_connectivity( - grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { + grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); @@ -312,8 +311,8 @@ static grpc_connectivity_state pf_check_connectivity( void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list) { + grpc_closure *notify, + grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; gpr_mu_lock(&p->mu); grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, @@ -348,7 +347,7 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, "pick_first"); memcpy(p->subchannels, args->subchannels, sizeof(grpc_subchannel *) * args->num_subchannels); - grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); + grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); gpr_mu_init(&p->mu); return &p->base; } diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index a9dc9dcca8..95ab29cd88 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -64,37 +64,36 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy) { } void grpc_lb_policy_shutdown(grpc_lb_policy *policy, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { policy->vtable->shutdown(policy, call_list); } void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, - grpc_iomgr_closure *on_complete, - grpc_iomgr_call_list *call_list) { + grpc_subchannel **target, grpc_closure *on_complete, + grpc_call_list *call_list) { policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete, call_list); } void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { policy->vtable->broadcast(policy, op, call_list); } void grpc_lb_policy_exit_idle(grpc_lb_policy *policy, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { policy->vtable->exit_idle(policy, call_list); } void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, grpc_connectivity_state *state, - grpc_iomgr_closure *closure, - grpc_iomgr_call_list *call_list) { + grpc_closure *closure, + grpc_call_list *call_list) { policy->vtable->notify_on_state_change(policy, state, closure, call_list); } grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_lb_policy *policy, grpc_iomgr_call_list *call_list) { + grpc_lb_policy *policy, grpc_call_list *call_list) { return policy->vtable->check_connectivity(policy, call_list); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 8d7eb579b5..e445d2cded 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -53,31 +53,30 @@ struct grpc_lb_policy { struct grpc_lb_policy_vtable { void (*destroy)(grpc_lb_policy *policy); - void (*shutdown)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); + void (*shutdown)(grpc_lb_policy *policy, grpc_call_list *call_list); /** implement grpc_lb_policy_pick */ void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_iomgr_closure *on_complete, - grpc_iomgr_call_list *call_list); + grpc_closure *on_complete, grpc_call_list *call_list); /** try to enter a READY connectivity state */ - void (*exit_idle)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); + void (*exit_idle)(grpc_lb_policy *policy, grpc_call_list *call_list); /** broadcast a transport op to all subchannels */ void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op, - grpc_iomgr_call_list *call_list); + grpc_call_list *call_list); /** check the current connectivity of the lb_policy */ - grpc_connectivity_state (*check_connectivity)( - grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); + grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy, + grpc_call_list *call_list); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy */ void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, - grpc_iomgr_closure *closure, - grpc_iomgr_call_list *call_list); + grpc_closure *closure, + grpc_call_list *call_list); }; #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -101,8 +100,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); /** Start shutting down (fail any pending picks) */ -void grpc_lb_policy_shutdown(grpc_lb_policy *policy, - grpc_iomgr_call_list *call_list); +void grpc_lb_policy_shutdown(grpc_lb_policy *policy, grpc_call_list *call_list); /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting @@ -110,22 +108,21 @@ void grpc_lb_policy_shutdown(grpc_lb_policy *policy, Picking can be asynchronous. Any IO should be done under \a pollset. */ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, - grpc_iomgr_closure *on_complete, - grpc_iomgr_call_list *call_list); + grpc_subchannel **target, grpc_closure *on_complete, + grpc_call_list *call_list); void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op, - grpc_iomgr_call_list *call_list); + grpc_call_list *call_list); void grpc_lb_policy_exit_idle(grpc_lb_policy *policy, - grpc_iomgr_call_list *call_list); + grpc_call_list *call_list); void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, grpc_connectivity_state *state, - grpc_iomgr_closure *closure, - grpc_iomgr_call_list *call_list); + grpc_closure *closure, + grpc_call_list *call_list); grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); + grpc_lb_policy *policy, grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c index 91e42bb684..6a7d73c54f 100644 --- a/src/core/client_config/resolver.c +++ b/src/core/client_config/resolver.c @@ -78,6 +78,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver, void grpc_resolver_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { + grpc_closure *on_complete) { resolver->vtable->next(resolver, target_config, on_complete); } diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h index 8ad87d789b..217d061abe 100644 --- a/src/core/client_config/resolver.h +++ b/src/core/client_config/resolver.h @@ -55,7 +55,7 @@ struct grpc_resolver_vtable { struct sockaddr *failing_address, int failing_address_len); void (*next)(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG @@ -92,6 +92,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver, schedule on_complete. */ void grpc_resolver_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); #endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_H */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 4fee543a5b..e251664e03 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -69,7 +69,7 @@ typedef struct { /** which version of resolved_config is current? */ int resolved_version; /** pending next completion, or NULL */ - grpc_iomgr_closure *next_completion; + grpc_closure *next_completion; /** target config address for next completion */ grpc_client_config **target_config; /** current (fully resolved) config */ @@ -79,7 +79,7 @@ typedef struct { static void dns_destroy(grpc_resolver *r); static void dns_start_resolving_locked(dns_resolver *r); -static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) +static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r) GRPC_MUST_USE_RESULT; static void dns_shutdown(grpc_resolver *r); @@ -87,14 +87,14 @@ static void dns_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, int failing_address_len); static void dns_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; static void dns_shutdown(grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - grpc_iomgr_closure *next_completion; + grpc_closure *next_completion; gpr_mu_lock(&r->mu); next_completion = r->next_completion; r->next_completion = NULL; @@ -117,9 +117,9 @@ static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, static void dns_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { + grpc_closure *on_complete) { dns_resolver *r = (dns_resolver *)resolver; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; @@ -141,7 +141,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; - grpc_iomgr_closure *call; + grpc_closure *call; size_t i; if (addresses) { grpc_lb_policy_args lb_policy_args; @@ -188,8 +188,8 @@ static void dns_start_resolving_locked(dns_resolver *r) { grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); } -static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) { - grpc_iomgr_closure *ret = NULL; +static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r) { + grpc_closure *ret = NULL; if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 220915853c..47fe5fad4a 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -73,22 +73,22 @@ typedef struct { /** have we published? */ int published; /** pending next completion, or NULL */ - grpc_iomgr_closure *next_completion; + grpc_closure *next_completion; /** target config address for next completion */ grpc_client_config **target_config; } sockaddr_resolver; static void sockaddr_destroy(grpc_resolver *r); -static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked( - sockaddr_resolver *r) GRPC_MUST_USE_RESULT; +static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) + GRPC_MUST_USE_RESULT; static void sockaddr_shutdown(grpc_resolver *r); static void sockaddr_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, int failing_address_len); static void sockaddr_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); static const grpc_resolver_vtable sockaddr_resolver_vtable = { sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, @@ -96,7 +96,7 @@ static const grpc_resolver_vtable sockaddr_resolver_vtable = { static void sockaddr_shutdown(grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; @@ -114,9 +114,9 @@ static void sockaddr_channel_saw_error(grpc_resolver *resolver, static void sockaddr_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { + grpc_closure *on_complete) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; @@ -126,14 +126,13 @@ static void sockaddr_next(grpc_resolver *resolver, if (call) call->cb(call->cb_arg, 1); } -static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked( - sockaddr_resolver *r) { +static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; grpc_lb_policy_args lb_policy_args; grpc_subchannel **subchannels; grpc_subchannel_args args; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; if (r->next_completion != NULL && !r->published) { size_t i; diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 94ec12dd16..5632f0425e 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -73,7 +73,7 @@ typedef struct { /** which version of resolved_config is current? */ int resolved_version; /** pending next completion, or NULL */ - grpc_iomgr_closure *next_completion; + grpc_closure *next_completion; /** target config address for next completion */ grpc_client_config **target_config; /** current (fully resolved) config */ @@ -92,15 +92,15 @@ typedef struct { static void zookeeper_destroy(grpc_resolver *r); static void zookeeper_start_resolving_locked(zookeeper_resolver *r); -static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked( - zookeeper_resolver *r) GRPC_MUST_USE_RESULT; +static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) + GRPC_MUST_USE_RESULT; static void zookeeper_shutdown(grpc_resolver *r); static void zookeeper_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, int failing_address_len); static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); static const grpc_resolver_vtable zookeeper_resolver_vtable = { zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, @@ -108,7 +108,7 @@ static const grpc_resolver_vtable zookeeper_resolver_vtable = { static void zookeeper_shutdown(grpc_resolver *resolver) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; @@ -134,9 +134,9 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver, static void zookeeper_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { + grpc_closure *on_complete) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; - grpc_iomgr_closure *call; + grpc_closure *call; gpr_mu_lock(&r->mu); GPR_ASSERT(r->next_completion == NULL); r->next_completion = on_complete; @@ -189,7 +189,7 @@ static void zookeeper_on_resolved(void *arg, grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; - grpc_iomgr_closure *call; + grpc_closure *call; size_t i; if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; @@ -414,9 +414,8 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { zookeeper_resolve_address(r); } -static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked( - zookeeper_resolver *r) { - grpc_iomgr_closure *call = NULL; +static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { + grpc_closure *call = NULL; if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index b15acf826a..99310e02e5 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -59,7 +59,7 @@ typedef struct { } connection; typedef struct { - grpc_iomgr_closure closure; + grpc_closure closure; size_t version; grpc_subchannel *subchannel; grpc_connectivity_state connectivity_state; @@ -67,11 +67,11 @@ typedef struct { typedef struct waiting_for_connect { struct waiting_for_connect *next; - grpc_iomgr_closure *notify; + grpc_closure *notify; grpc_pollset *pollset; grpc_subchannel_call **target; grpc_subchannel *subchannel; - grpc_iomgr_closure continuation; + grpc_closure continuation; } waiting_for_connect; struct grpc_subchannel { @@ -100,7 +100,7 @@ struct grpc_subchannel { grpc_connect_out_args connecting_result; /** callback for connection finishing */ - grpc_iomgr_closure connected; + grpc_closure connected; /** pollset_set tracking who's interested in a connection being setup - owned by the master channel (in particular the @@ -147,7 +147,7 @@ struct grpc_subchannel_call { static grpc_subchannel_call *create_call(connection *con); static void connectivity_state_changed_locked(grpc_subchannel *c, const char *reason, - grpc_iomgr_call_list *call_list); + grpc_call_list *call_list); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); @@ -303,7 +303,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); c->random = random_seed(); grpc_mdctx_ref(c->mdctx); - grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); + grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); gpr_mu_init(&c->mu); @@ -334,7 +334,7 @@ static void start_connect(grpc_subchannel *c) { static void continue_creating_call(void *arg, int iomgr_success) { waiting_for_connect *w4c = arg; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset); grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, w4c->notify, &call_list); @@ -344,8 +344,8 @@ static void continue_creating_call(void *arg, int iomgr_success) { void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, grpc_subchannel_call **target, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list) { + grpc_closure *notify, + grpc_call_list *call_list) { connection *con; gpr_mu_lock(&c->mu); if (c->active != NULL) { @@ -364,7 +364,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, w4c->subchannel = c; /* released when clearing w4c */ SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect"); - grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c); + grpc_closure_init(&w4c->continuation, continue_creating_call, w4c); c->waiting = w4c; grpc_subchannel_add_interested_party(c, pollset); if (!c->connecting) { @@ -392,8 +392,8 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, grpc_connectivity_state *state, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list) { + grpc_closure *notify, + grpc_call_list *call_list) { int do_connect = 0; gpr_mu_lock(&c->mu); if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, @@ -417,7 +417,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, connection *con = NULL; grpc_subchannel *destroy; int cancel_alarm = 0; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; @@ -454,7 +454,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_connector_shutdown(c->connector); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } static void on_state_changed(void *p, int iomgr_success) { @@ -465,7 +465,7 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_transport_op op; grpc_channel_element *elem; connection *destroy_connection = NULL; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(mu); @@ -514,11 +514,10 @@ done: if (destroy_connection != NULL) { connection_destroy(destroy_connection); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } -static void publish_transport(grpc_subchannel *c, - grpc_iomgr_call_list *call_list) { +static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { size_t channel_stack_size; connection *con; grpc_channel_stack *stk; @@ -552,7 +551,7 @@ static void publish_transport(grpc_subchannel *c, /* initialize state watcher */ sw = gpr_malloc(sizeof(*sw)); - grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw); + grpc_closure_init(&sw->closure, on_state_changed, sw); sw->subchannel = c; sw->connectivity_state = GRPC_CHANNEL_READY; @@ -599,7 +598,7 @@ static void publish_transport(grpc_subchannel *c, while (w4c != NULL) { waiting_for_connect *next = w4c; - grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1); + grpc_call_list_add(call_list, &w4c->continuation, 1); w4c = next; } @@ -641,7 +640,7 @@ static void update_reconnect_parameters(grpc_subchannel *c) { static void on_alarm(void *arg, int iomgr_success) { grpc_subchannel *c = arg; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&c->mu); c->have_alarm = 0; if (c->disconnected) { @@ -656,12 +655,12 @@ static void on_alarm(void *arg, int iomgr_success) { GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(c, "connecting"); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } static void subchannel_connected(void *arg, int iomgr_success) { grpc_subchannel *c = arg; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; if (c->connecting_result.transport != NULL) { publish_transport(c, &call_list); } else { @@ -673,7 +672,7 @@ static void subchannel_connected(void *arg, int iomgr_success) { grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { @@ -705,7 +704,7 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { static void connectivity_state_changed_locked(grpc_subchannel *c, const char *reason, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { grpc_connectivity_state current = compute_connectivity_locked(c); grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list); } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 7c00ff172d..189e531d35 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -76,8 +76,8 @@ void grpc_subchannel_call_unref( void grpc_subchannel_create_call(grpc_subchannel *subchannel, grpc_pollset *pollset, grpc_subchannel_call **target, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list); + grpc_closure *notify, + grpc_call_list *call_list); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, @@ -91,8 +91,8 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( Updates *state with the new state of the channel */ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, grpc_connectivity_state *state, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list); + grpc_closure *notify, + grpc_call_list *call_list); /** express interest in \a channel's activities through \a pollset. */ void grpc_subchannel_add_interested_party(grpc_subchannel *channel, diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 75f3f7db4b..c484f4c7b3 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -63,8 +63,8 @@ typedef struct { grpc_iomgr_object iomgr_obj; gpr_slice_buffer incoming; gpr_slice_buffer outgoing; - grpc_iomgr_closure on_read; - grpc_iomgr_closure done_write; + grpc_closure on_read; + grpc_closure done_write; grpc_workqueue *workqueue; } internal_request; @@ -237,8 +237,8 @@ static void internal_request_begin(grpc_httpcli_context *context, request->handshaker ? request->handshaker : &grpc_httpcli_plaintext; req->context = context; req->pollset = pollset; - grpc_iomgr_closure_init(&req->on_read, on_read, req); - grpc_iomgr_closure_init(&req->done_write, done_write, req); + grpc_closure_init(&req->on_read, on_read, req); + grpc_closure_init(&req->done_write, done_write, req); gpr_slice_buffer_init(&req->incoming); gpr_slice_buffer_init(&req->outgoing); grpc_iomgr_register_object(&req->iomgr_obj, name); diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index a7878e31dd..1955f74b9a 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -35,13 +35,13 @@ grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { return ep->vtable->read(ep, slices, cb); } grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { return ep->vtable->write(ep, slices, cb); } diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index d14d52d561..79b7d6a78e 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -54,9 +54,9 @@ typedef enum grpc_endpoint_op_status { struct grpc_endpoint_vtable { grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb); + grpc_closure *cb); grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb); + grpc_closure *cb); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset); void (*shutdown)(grpc_endpoint *ep); @@ -70,7 +70,7 @@ struct grpc_endpoint_vtable { Valid slices may be placed into \a slices even on callback success == 0. */ grpc_endpoint_op_status grpc_endpoint_read( grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT; + grpc_closure *cb) GRPC_MUST_USE_RESULT; char *grpc_endpoint_get_peer(grpc_endpoint *ep); @@ -86,7 +86,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep); */ grpc_endpoint_op_status grpc_endpoint_write( grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT; + grpc_closure *cb) GRPC_MUST_USE_RESULT; /* Causes any pending read/write callbacks to run immediately with success==0 */ diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 5bdce0bfd8..dfebbcc2e1 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -218,8 +218,7 @@ static int has_watchers(grpc_fd *fd) { fd->inactive_watcher_root.next != &fd->inactive_watcher_root; } -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, - const char *reason) { +void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); gpr_mu_lock(&fd->watcher_mu); @@ -253,7 +252,7 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif -static void process_callback(grpc_iomgr_closure *closure, int success, +static void process_callback(grpc_closure *closure, int success, grpc_workqueue *optional_workqueue) { if (optional_workqueue == NULL) { closure->cb(closure->cb_arg, success); @@ -262,15 +261,15 @@ static void process_callback(grpc_iomgr_closure *closure, int success, } } -static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n, - int success, grpc_workqueue *optional_workqueue) { +static void process_callbacks(grpc_closure *callbacks, size_t n, int success, + grpc_workqueue *optional_workqueue) { size_t i; for (i = 0; i < n; i++) { process_callback(callbacks + i, success, optional_workqueue); } } -static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, +static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure, int allow_synchronous_callback) { switch (gpr_atm_acq_load(st)) { case NOT_READY: @@ -307,7 +306,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, abort(); } -static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks, +static void set_ready_locked(gpr_atm *st, grpc_closure **callbacks, size_t *ncallbacks) { gpr_intptr state = gpr_atm_acq_load(st); @@ -327,7 +326,7 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks, default: /* waiting */ GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY && gpr_atm_no_barrier_load(st) != NOT_READY); - callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state; + callbacks[(*ncallbacks)++] = (grpc_closure *)state; gpr_atm_rel_store(st, NOT_READY); return; } @@ -338,7 +337,7 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - grpc_iomgr_closure *closure; + grpc_closure *closure; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); @@ -365,11 +364,11 @@ void grpc_fd_shutdown(grpc_fd *fd) { 0 /* GPR_FALSE */); } -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure) { notify_on(fd, &fd->readst, closure, 0); } -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) { +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure) { notify_on(fd, &fd->writest, closure, 0); } diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index e5157ad342..bb85b6c16e 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -96,8 +96,8 @@ struct grpc_fd { struct grpc_fd *freelist_next; - grpc_iomgr_closure *on_done_closure; - grpc_iomgr_closure *shutdown_closures[2]; + grpc_closure *on_done_closure; + grpc_closure *shutdown_closures[2]; grpc_iomgr_object iomgr_object; }; @@ -113,8 +113,7 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name); Requires: *fd initialized; no outstanding notify_on_read or notify_on_write. MUST NOT be called with a pollset lock taken */ -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, - const char *reason); +void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason); /* Begin polling on an fd. Registers that the given pollset is interested in this fd - so that if read @@ -153,10 +152,10 @@ void grpc_fd_shutdown(grpc_fd *fd); underlying platform. This means that users must drain fd in read_cb before calling notify_on_read again. Users are also expected to handle spurious events, i.e read_cb is called while nothing can be readable from fd */ -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure); +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure); /* Exactly the same semantics as above, except based on writable events. */ -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure); +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure); /* Notification from the poller to an fd that it has become readable or writable. diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index ba8f73fe08..dd76044913 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -151,15 +151,15 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_free(obj->name); } -void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, - void *cb_arg) { +void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg) { closure->cb = cb; closure->cb_arg = cb_arg; closure->next = NULL; } -void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list, - grpc_iomgr_closure *closure, int success) { +void grpc_call_list_add(grpc_call_list *call_list, grpc_closure *closure, + int success) { if (!closure) return; closure->next = NULL; closure->success = success; @@ -171,21 +171,20 @@ void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list, call_list->tail = closure; } -void grpc_iomgr_call_list_run(grpc_iomgr_call_list call_list) { - grpc_iomgr_closure *c = call_list.head; +void grpc_call_list_run(grpc_call_list call_list) { + grpc_closure *c = call_list.head; while (c) { - grpc_iomgr_closure *next = c->next; + grpc_closure *next = c->next; c->cb(c->cb_arg, c->success); c = next; } } -int grpc_iomgr_call_list_empty(grpc_iomgr_call_list call_list) { +int grpc_call_list_empty(grpc_call_list call_list) { return call_list.head == NULL; } -void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src, - grpc_iomgr_call_list *dst) { +void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst) { if (dst->head == NULL) { *dst = *src; return; diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 58c31763d2..56f0195be9 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -42,7 +42,7 @@ typedef void (*grpc_iomgr_cb_func)(void *arg, int success); /** A closure over a grpc_iomgr_cb_func. */ -typedef struct grpc_iomgr_closure { +typedef struct grpc_closure { /** Bound callback. */ grpc_iomgr_cb_func cb; @@ -55,27 +55,26 @@ typedef struct grpc_iomgr_closure { int success; /**< Internal. Do not touch */ - struct grpc_iomgr_closure *next; -} grpc_iomgr_closure; + struct grpc_closure *next; +} grpc_closure; -typedef struct grpc_iomgr_call_list { - grpc_iomgr_closure *head; - grpc_iomgr_closure *tail; -} grpc_iomgr_call_list; +typedef struct grpc_call_list { + grpc_closure *head; + grpc_closure *tail; +} grpc_call_list; /** Initializes \a closure with \a cb and \a cb_arg. */ -void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, - void *cb_arg); +void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg); -#define GRPC_IOMGR_CALL_LIST_INIT \ +#define GRPC_CALL_LIST_INIT \ { NULL, NULL } -void grpc_iomgr_call_list_add(grpc_iomgr_call_list *list, - grpc_iomgr_closure *closure, int success); -void grpc_iomgr_call_list_run(grpc_iomgr_call_list list); -void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src, - grpc_iomgr_call_list *dst); -int grpc_iomgr_call_list_empty(grpc_iomgr_call_list list); +void grpc_call_list_add(grpc_call_list *list, grpc_closure *closure, + int success); +void grpc_call_list_run(grpc_call_list list); +void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst); +int grpc_call_list_empty(grpc_call_list list); /** Initializes the iomgr. */ void grpc_iomgr_init(void); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 253f3190f1..5ca957b8e1 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -53,7 +53,7 @@ typedef struct wakeup_fd_hdl { typedef struct { grpc_pollset *pollset; grpc_fd *fd; - grpc_iomgr_closure closure; + grpc_closure closure; } delayed_add; typedef struct { @@ -125,7 +125,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, da->pollset = pollset; da->fd = fd; GRPC_FD_REF(fd, "delayed_add"); - grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da); + grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; grpc_pollset_add_unlock_job(pollset, &da->closure); } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 107ffb0b5e..0fe3f80d44 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -175,30 +175,28 @@ static void finish_shutdown(grpc_pollset *pollset) { pollset->shutdown_done_cb(pollset->shutdown_done_arg); } -static void run_jobs(grpc_pollset *pollset, grpc_iomgr_closure **root) { - grpc_iomgr_closure *exec = *root; +static void run_jobs(grpc_pollset *pollset, grpc_closure **root) { + grpc_closure *exec = *root; *root = NULL; gpr_mu_unlock(&pollset->mu); while (exec != NULL) { - grpc_iomgr_closure *next = exec->next; + grpc_closure *next = exec->next; exec->cb(exec->cb_arg, 1); exec = next; } gpr_mu_lock(&pollset->mu); } -static void add_job(grpc_iomgr_closure **root, grpc_iomgr_closure *closure) { +static void add_job(grpc_closure **root, grpc_closure *closure) { closure->next = *root; *root = closure; } -void grpc_pollset_add_idle_job(grpc_pollset *pollset, - grpc_iomgr_closure *closure) { +void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure) { add_job(&pollset->idle_jobs, closure); } -void grpc_pollset_add_unlock_job(grpc_pollset *pollset, - grpc_iomgr_closure *closure) { +void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure) { add_job(&pollset->unlock_jobs, closure); } @@ -316,7 +314,7 @@ typedef struct grpc_unary_promote_args { const grpc_pollset_vtable *original_vtable; grpc_pollset *pollset; grpc_fd *fd; - grpc_iomgr_closure promotion_closure; + grpc_closure promotion_closure; } grpc_unary_promote_args; static void basic_do_promote(void *args, int success) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 3f89095d40..4f09f870f7 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -67,8 +67,8 @@ typedef struct grpc_pollset { int kicked_without_pollers; void (*shutdown_done_cb)(void *arg); void *shutdown_done_arg; - grpc_iomgr_closure *unlock_jobs; - grpc_iomgr_closure *idle_jobs; + grpc_closure *unlock_jobs; + grpc_closure *idle_jobs; union { int fd; void *ptr; @@ -128,10 +128,8 @@ typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; /** schedule a closure to be run next time there are no active workers */ -void grpc_pollset_add_idle_job(grpc_pollset *pollset, - grpc_iomgr_closure *closure); +void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure); /** schedule a closure to be run next time the pollset is unlocked */ -void grpc_pollset_add_unlock_job(grpc_pollset *pollset, - grpc_iomgr_closure *closure); +void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index 498921e0fd..45bc657225 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -91,7 +91,7 @@ typedef struct grpc_winsocket { This prevents that. */ int added_to_iocp; - grpc_iomgr_closure shutdown_closure; + grpc_closure shutdown_closure; /* A label for iomgr to track outstanding objects */ grpc_iomgr_object iomgr_object; diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 0df984c3e6..1ea2155060 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -64,7 +64,7 @@ typedef struct { gpr_timespec deadline; grpc_alarm alarm; int refs; - grpc_iomgr_closure write_closure; + grpc_closure write_closure; grpc_pollset_set *interested_parties; char *addr_str; } async_connect; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index c539cf2d34..374d2f3a40 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -85,11 +85,11 @@ typedef struct { /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ size_t outgoing_byte_idx; - grpc_iomgr_closure *read_cb; - grpc_iomgr_closure *write_cb; + grpc_closure *read_cb; + grpc_closure *write_cb; - grpc_iomgr_closure read_closure; - grpc_iomgr_closure write_closure; + grpc_closure read_closure; + grpc_closure write_closure; char *peer_string; } grpc_tcp; @@ -145,7 +145,7 @@ static void tcp_destroy(grpc_endpoint *ep) { } static void call_read_cb(grpc_tcp *tcp, int success) { - grpc_iomgr_closure *cb = tcp->read_cb; + grpc_closure *cb = tcp->read_cb; if (grpc_tcp_trace) { size_t i; @@ -250,7 +250,7 @@ static void tcp_handle_read(void *arg /* grpc_tcp */, int success) { static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, gpr_slice_buffer *incoming_buffer, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; @@ -350,7 +350,7 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { static void tcp_handle_write(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; grpc_endpoint_op_status status; - grpc_iomgr_closure *cb; + grpc_closure *cb; if (!success) { cb = tcp->write_cb; @@ -375,7 +375,7 @@ static void tcp_handle_write(void *arg /* grpc_tcp */, int success) { static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep, gpr_slice_buffer *buf, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_endpoint_op_status status; diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 0b79678e9d..213b2e1113 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -84,8 +84,8 @@ typedef struct { struct sockaddr_un un; } addr; size_t addr_len; - grpc_iomgr_closure read_closure; - grpc_iomgr_closure destroyed_closure; + grpc_closure read_closure; + grpc_closure destroyed_closure; } server_port; static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index fe3673c607..e361d22dc3 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -82,8 +82,8 @@ typedef struct grpc_tcp { /* Refcounting how many operations are in progress. */ gpr_refcount refcount; - grpc_iomgr_closure *read_cb; - grpc_iomgr_closure *write_cb; + grpc_closure *read_cb; + grpc_closure *write_cb; gpr_slice read_slice; gpr_slice_buffer *write_slices; gpr_slice_buffer *read_slices; @@ -169,7 +169,7 @@ static int on_read(grpc_tcp *tcp, int success) { static void on_read_cb(void *tcpp, int from_iocp) { grpc_tcp *tcp = tcpp; - grpc_iomgr_closure *cb = tcp->read_cb; + grpc_closure *cb = tcp->read_cb; int success = on_read(tcp, from_iocp); tcp->read_cb = NULL; TCP_UNREF(tcp, "read"); @@ -180,7 +180,7 @@ static void on_read_cb(void *tcpp, int from_iocp) { static grpc_endpoint_op_status win_read(grpc_endpoint *ep, gpr_slice_buffer *read_slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->read_info; @@ -241,7 +241,7 @@ static void on_write(void *tcpp, int success) { grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->write_info; - grpc_iomgr_closure *cb; + grpc_closure *cb; int do_abort = 0; gpr_mu_lock(&tcp->mu); @@ -269,7 +269,7 @@ static void on_write(void *tcpp, int success) { /* Initiates a write. */ static grpc_endpoint_op_status win_write(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *socket = tcp->socket; grpc_winsocket_callback_info *info = &socket->write_info; diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 30957f8dee..1a1d812050 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -79,8 +79,8 @@ typedef struct { struct sockaddr_un un; } addr; size_t addr_len; - grpc_iomgr_closure read_closure; - grpc_iomgr_closure destroyed_closure; + grpc_closure read_closure; + grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; } server_port; diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h index 124f294a23..6f09399b55 100644 --- a/src/core/iomgr/workqueue.h +++ b/src/core/iomgr/workqueue.h @@ -52,7 +52,7 @@ typedef struct grpc_workqueue grpc_workqueue; /** Create a work queue */ grpc_workqueue *grpc_workqueue_create(void); -void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously); +void grpc_workqueue_flush(grpc_workqueue *workqueue); #define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -76,7 +76,7 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, grpc_pollset *pollset); /** Add a work item to a workqueue */ -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure, +void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, int success); #endif diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index ead8c50d8e..85b541f4d2 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -41,7 +41,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" @@ -52,71 +52,19 @@ grpc_workqueue *grpc_workqueue_create(void) { grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); gpr_ref_init(&workqueue->refs, 1); gpr_mu_init(&workqueue->mu); - workqueue->head.next = NULL; - workqueue->tail = &workqueue->head; + workqueue->call_list.head = workqueue->call_list.tail = NULL; grpc_wakeup_fd_init(&workqueue->wakeup_fd); sprintf(name, "workqueue:%p", (void *)workqueue); workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */ workqueue->wakeup_read_fd = grpc_fd_create( GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name); - grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue); + grpc_closure_init(&workqueue->read_closure, on_readable, workqueue); grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); return workqueue; } -static void shutdown_thread(void *arg) { - grpc_iomgr_closure *todo = arg; - - while (todo) { - grpc_iomgr_closure *next = todo->next; - todo->cb(todo->cb_arg, todo->success); - todo = next; - } -} - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static size_t count_waiting(grpc_workqueue *workqueue) { - size_t i = 0; - grpc_iomgr_closure *c; - for (c = workqueue->head.next; c; c = c->next) { - i++; - } - return i; -} -#endif - -void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously) { - grpc_iomgr_closure *todo; - gpr_thd_id thd; - - gpr_mu_lock(&workqueue->mu); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG - if (workqueue->head.next) { - gpr_log(GPR_DEBUG, "WORKQUEUE:%p flush %d objects %s", workqueue, - count_waiting(workqueue), - asynchronously ? "asynchronously" : "synchronously"); - } -#endif - todo = workqueue->head.next; - workqueue->head.next = NULL; - workqueue->tail = &workqueue->head; - gpr_mu_unlock(&workqueue->mu); - - if (todo != NULL) { - if (asynchronously) { - gpr_thd_new(&thd, shutdown_thread, todo, NULL); - } else { - while (todo) { - grpc_iomgr_closure *next = todo->next; - todo->cb(todo->cb_arg, todo->success); - todo = next; - } - } - } -} - static void workqueue_destroy(grpc_workqueue *workqueue) { - GPR_ASSERT(workqueue->tail == &workqueue->head); + GPR_ASSERT(grpc_call_list_empty(workqueue->call_list)); grpc_fd_shutdown(workqueue->wakeup_read_fd); } @@ -151,9 +99,16 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd); } +void grpc_workqueue_flush(grpc_workqueue *workqueue) { + grpc_call_list todo = GRPC_CALL_LIST_INIT; + gpr_mu_lock(&workqueue->mu); + GPR_SWAP(grpc_call_list, todo, workqueue->call_list); + gpr_mu_unlock(&workqueue->mu); + grpc_call_list_run(todo); +} + static void on_readable(void *arg, int success) { grpc_workqueue *workqueue = arg; - grpc_iomgr_closure *todo; if (!success) { gpr_mu_destroy(&workqueue->mu); @@ -162,42 +117,26 @@ static void on_readable(void *arg, int success) { grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy"); gpr_free(workqueue); - return; } else { + grpc_call_list todo = GRPC_CALL_LIST_INIT; gpr_mu_lock(&workqueue->mu); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG - gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue, - count_waiting(workqueue)); -#endif - todo = workqueue->head.next; - workqueue->head.next = NULL; - workqueue->tail = &workqueue->head; + GPR_SWAP(grpc_call_list, todo, workqueue->call_list); grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); - - while (todo) { - grpc_iomgr_closure *next = todo->next; - todo->cb(todo->cb_arg, todo->success); - todo = next; - } + grpc_call_list_run(todo); } } -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure, +void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, int success) { closure->success = success; closure->next = NULL; gpr_mu_lock(&workqueue->mu); - if (workqueue->tail == &workqueue->head) { + if (grpc_call_list_empty(workqueue->call_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } - workqueue->tail->next = closure; - workqueue->tail = closure; -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG - gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue, - count_waiting(workqueue)); -#endif + grpc_call_list_add(&workqueue->call_list, closure, success); gpr_mu_unlock(&workqueue->mu); } diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 1b3a0e281b..22c48c4926 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -40,13 +40,12 @@ struct grpc_workqueue { gpr_refcount refs; gpr_mu mu; - grpc_iomgr_closure head; - grpc_iomgr_closure *tail; + grpc_call_list call_list; grpc_wakeup_fd wakeup_fd; struct grpc_fd *wakeup_read_fd; - grpc_iomgr_closure read_closure; + grpc_closure read_closure; }; #endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */ diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index b696e384fc..dcafc0fc0f 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -49,9 +49,9 @@ typedef struct { struct tsi_frame_protector *protector; gpr_mu protector_mu; /* saved upper level callbacks and user_data. */ - grpc_iomgr_closure *read_cb; - grpc_iomgr_closure *write_cb; - grpc_iomgr_closure on_read; + grpc_closure *read_cb; + grpc_closure *write_cb; + grpc_closure on_read; gpr_slice_buffer *read_buffer; gpr_slice_buffer source_buffer; /* saved handshaker leftover data to unprotect. */ @@ -214,7 +214,7 @@ static void on_read_cb(void *user_data, int success) { static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { secure_endpoint *ep = (secure_endpoint *)secure_ep; int immediate_read_success = -1; ep->read_cb = cb; @@ -257,7 +257,7 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { unsigned i; tsi_result result = TSI_OK; secure_endpoint *ep = (secure_endpoint *)secure_ep; @@ -386,7 +386,7 @@ grpc_endpoint *grpc_secure_endpoint_create( gpr_slice_buffer_init(&ep->output_buffer); gpr_slice_buffer_init(&ep->source_buffer); ep->read_buffer = NULL; - grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep); + grpc_closure_init(&ep->on_read, on_read_cb, ep); gpr_mu_init(&ep->protector_mu); gpr_ref_init(&ep->ref, 1); return &ep->base; diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index bf0079577e..181aa74ba9 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -54,8 +54,8 @@ typedef struct { gpr_slice_buffer outgoing; grpc_secure_transport_setup_done_cb cb; void *user_data; - grpc_iomgr_closure on_handshake_data_sent_to_peer; - grpc_iomgr_closure on_handshake_data_received_from_peer; + grpc_closure on_handshake_data_sent_to_peer; + grpc_closure on_handshake_data_received_from_peer; } grpc_secure_transport_setup; static void on_handshake_data_received_from_peer(void *setup, int success); @@ -301,10 +301,10 @@ void grpc_setup_secure_transport(grpc_security_connector *connector, s->wrapped_endpoint = nonsecure_endpoint; s->user_data = user_data; s->cb = cb; - grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer, - on_handshake_data_sent_to_peer, s); - grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer, - on_handshake_data_received_from_peer, s); + grpc_closure_init(&s->on_handshake_data_sent_to_peer, + on_handshake_data_sent_to_peer, s); + grpc_closure_init(&s->on_handshake_data_received_from_peer, + on_handshake_data_received_from_peer, s); gpr_slice_buffer_init(&s->left_overs); gpr_slice_buffer_init(&s->outgoing); gpr_slice_buffer_init(&s->incoming); diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index d134201e87..8468697472 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -44,11 +44,11 @@ typedef struct call_data { gpr_uint8 got_client_metadata; grpc_stream_op_buffer *recv_ops; /* Closure to call when finished with the auth_on_recv hook. */ - grpc_iomgr_closure *on_done_recv; + grpc_closure *on_done_recv; /* Receive closures are chained: we inject this closure as the on_done_recv up-call on transport_op, and remember to call our on_done_recv member after handling it. */ - grpc_iomgr_closure auth_on_recv; + grpc_closure auth_on_recv; grpc_transport_stream_op transport_op; grpc_metadata_array md; const grpc_metadata *consumed_md; @@ -202,7 +202,7 @@ static void init_call_elem(grpc_call_element *elem, /* initialize members */ memset(calld, 0, sizeof(*calld)); - grpc_iomgr_closure_init(&calld->auth_on_recv, auth_on_recv, elem); + grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem); GPR_ASSERT(initial_op && initial_op->context != NULL && initial_op->context[GRPC_CONTEXT_SECURITY].value == NULL); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 8f9e97a396..b5a4155e02 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -256,10 +256,10 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; gpr_uint32 incoming_message_flags; - grpc_iomgr_closure destroy_closure; - grpc_iomgr_closure on_done_recv; - grpc_iomgr_closure on_done_send; - grpc_iomgr_closure on_done_bind; + grpc_closure destroy_closure; + grpc_closure on_done_recv; + grpc_closure on_done_send; + grpc_closure on_done_bind; /** completion events - for completion queue use */ grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; @@ -333,9 +333,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, grpc_sopb_init(&call->send_ops); grpc_sopb_init(&call->recv_ops); gpr_slice_buffer_init(&call->incoming_message); - grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call); - grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call); - grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call); + grpc_closure_init(&call->on_done_recv, call_on_done_recv, call); + grpc_closure_init(&call->on_done_send, call_on_done_send, call); + grpc_closure_init(&call->on_done_bind, finished_loose_op, call); /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */ gpr_ref_init(&call->internal_refcount, 2); /* server hack: start reads immediately so we can get initial metadata. @@ -1353,7 +1353,7 @@ static void finished_loose_op(void *call, int success_ignored) { typedef struct { grpc_call *call; - grpc_iomgr_closure closure; + grpc_closure closure; } finished_loose_op_allocated_args; static void finished_loose_op_allocated(void *alloc, int success) { @@ -1373,8 +1373,7 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { } else { finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); args->call = call; - grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, - args); + grpc_closure_init(&args->closure, finished_loose_op_allocated, args); op->on_consumed = &args->closure; } } diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index ef47a28fda..4d0cf1ed8b 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -45,7 +45,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( /* forward through to the underlying client channel */ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_connectivity_state state; if (client_channel_elem->filter != &grpc_client_channel_filter) { gpr_log(GPR_ERROR, @@ -56,7 +56,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( } state = grpc_client_channel_check_connectivity_state( client_channel_elem, try_to_connect, &call_list); - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); return state; } @@ -71,7 +71,7 @@ typedef struct { gpr_mu mu; callback_phase phase; int success; - grpc_iomgr_closure on_complete; + grpc_closure on_complete; grpc_alarm alarm; grpc_connectivity_state state; grpc_completion_queue *cq; @@ -158,13 +158,13 @@ void grpc_channel_watch_connectivity_state( gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; state_watcher *w = gpr_malloc(sizeof(*w)); grpc_cq_begin_op(cq); gpr_mu_init(&w->mu); - grpc_iomgr_closure_init(&w->on_complete, watch_complete, w); + grpc_closure_init(&w->on_complete, watch_complete, w); w->phase = WAITING; w->state = last_observed_state; w->success = 0; @@ -181,7 +181,7 @@ void grpc_channel_watch_connectivity_state( "grpc_channel_watch_connectivity_state called on something that is " "not a client channel, but '%s'", client_channel_elem->filter->name); - grpc_iomgr_call_list_add(&call_list, &w->on_complete, 1); + grpc_call_list_add(&call_list, &w->on_complete, 1); } else { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); grpc_client_channel_add_interested_party(client_channel_elem, @@ -190,5 +190,5 @@ void grpc_channel_watch_connectivity_state( &w->on_complete, &call_list); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 908c07b752..f6f42b3d7a 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -52,7 +52,7 @@ typedef struct { grpc_connector base; gpr_refcount refs; - grpc_iomgr_closure *notify; + grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; } connector; @@ -71,7 +71,7 @@ static void connector_unref(grpc_connector *con) { static void connected(void *arg, grpc_endpoint *tcp) { connector *c = arg; - grpc_iomgr_closure *notify; + grpc_closure *notify; if (tcp != NULL) { c->result->transport = grpc_create_chttp2_transport( c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue, @@ -94,7 +94,7 @@ static void connector_shutdown(grpc_connector *con) {} static void connector_connect(grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, - grpc_iomgr_closure *notify) { + grpc_closure *notify) { connector *c = (connector *)con; GPR_ASSERT(c->notify == NULL); GPR_ASSERT(notify->cb); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 690612d164..d325b3f77d 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -58,7 +58,7 @@ typedef struct { grpc_channel_security_connector *security_connector; - grpc_iomgr_closure *notify; + grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; @@ -83,7 +83,7 @@ static void on_secure_transport_setup_done(void *arg, grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { connector *c = arg; - grpc_iomgr_closure *notify; + grpc_closure *notify; gpr_mu_lock(&c->mu); if (c->connecting_endpoint == NULL) { memset(c->result, 0, sizeof(*c->result)); @@ -114,7 +114,7 @@ static void on_secure_transport_setup_done(void *arg, static void connected(void *arg, grpc_endpoint *tcp) { connector *c = arg; - grpc_iomgr_closure *notify; + grpc_closure *notify; if (tcp != NULL) { gpr_mu_lock(&c->mu); GPR_ASSERT(c->connecting_endpoint == NULL); @@ -145,7 +145,7 @@ static void connector_shutdown(grpc_connector *con) { static void connector_connect(grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, - grpc_iomgr_closure *notify) { + grpc_closure *notify) { connector *c = (connector *)con; GPR_ASSERT(c->notify == NULL); GPR_ASSERT(notify->cb); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index efddb9ee47..75600027ed 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -113,8 +113,8 @@ struct channel_data { channel_registered_method *registered_methods; gpr_uint32 registered_method_slots; gpr_uint32 registered_method_max_probes; - grpc_iomgr_closure finish_destroy_channel_closure; - grpc_iomgr_closure channel_connectivity_changed; + grpc_closure finish_destroy_channel_closure; + grpc_closure channel_connectivity_changed; }; typedef struct shutdown_tag { @@ -153,10 +153,10 @@ struct call_data { grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; - grpc_iomgr_closure *on_done_recv; + grpc_closure *on_done_recv; - grpc_iomgr_closure server_on_recv; - grpc_iomgr_closure kill_zombie_closure; + grpc_closure server_on_recv; + grpc_closure kill_zombie_closure; call_data *pending_next; }; @@ -254,7 +254,7 @@ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { } struct shutdown_cleanup_args { - grpc_iomgr_closure closure; + grpc_closure closure; gpr_slice slice; }; @@ -277,7 +277,7 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, op.goaway_message = &sc->slice; op.goaway_status = GRPC_STATUS_OK; op.disconnect = send_disconnect; - grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc); + grpc_closure_init(&sc->closure, shutdown_cleanup, sc); op.on_consumed = &sc->closure; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); @@ -323,7 +323,7 @@ static void request_matcher_zombify_all_pending_calls( gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init( + grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1); @@ -420,7 +420,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); return; } @@ -603,7 +603,7 @@ static void server_on_recv(void *ptr, int success) { if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_workqueue_push(chand->server->workqueue, &calld->kill_zombie_closure, 1); } else { @@ -615,7 +615,7 @@ static void server_on_recv(void *ptr, int success) { if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_workqueue_push(chand->server->workqueue, &calld->kill_zombie_closure, 1); } else if (calld->state == PENDING) { @@ -689,7 +689,7 @@ static void init_call_elem(grpc_call_element *elem, calld->call = grpc_call_from_top_element(elem); gpr_mu_init(&calld->mu_state); - grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); + grpc_closure_init(&calld->server_on_recv, server_on_recv, elem); server_ref(chand->server); @@ -729,8 +729,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, chand->next = chand->prev = chand; chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; - grpc_iomgr_closure_init(&chand->channel_connectivity_changed, - channel_connectivity_changed, chand); + grpc_closure_init(&chand->channel_connectivity_changed, + channel_connectivity_changed, chand); } static void destroy_channel_elem(grpc_channel_element *elem) { @@ -1077,7 +1077,7 @@ void grpc_server_destroy(grpc_server *server) { gpr_mu_unlock(&server->mu_global); - grpc_workqueue_flush(server->workqueue, 0); + grpc_workqueue_flush(server->workqueue); server_unref(server); } @@ -1132,7 +1132,7 @@ static grpc_call_error queue_call_request(grpc_server *server, gpr_mu_lock(&calld->mu_state); if (calld->state == ZOMBIED) { gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init( + grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 9be1f2f7cf..386f2dd315 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -155,7 +155,7 @@ typedef enum { /* Outstanding ping request data */ typedef struct grpc_chttp2_outstanding_ping { gpr_uint8 id[8]; - grpc_iomgr_closure *on_recv; + grpc_closure *on_recv; struct grpc_chttp2_outstanding_ping *next; struct grpc_chttp2_outstanding_ping *prev; } grpc_chttp2_outstanding_ping; @@ -164,7 +164,7 @@ typedef struct { /** data to write next write */ gpr_slice_buffer qbuf; /** queued callbacks */ - grpc_iomgr_call_list run_at_unlock; + grpc_call_list run_at_unlock; /** window available for us to send to peer */ gpr_int64 outgoing_window; @@ -214,7 +214,7 @@ typedef struct { /** is this a client? */ gpr_uint8 is_client; /** callback for when writing is done */ - grpc_iomgr_closure done_cb; + grpc_closure done_cb; } grpc_chttp2_transport_writing; struct grpc_chttp2_transport_parsing { @@ -332,9 +332,9 @@ struct grpc_chttp2_transport { grpc_chttp2_stream_map new_stream_map; /** closure to execute writing */ - grpc_iomgr_closure writing_action; + grpc_closure writing_action; /** closure to finish reading from the endpoint */ - grpc_iomgr_closure recv_data; + grpc_closure recv_data; /** incoming read bytes */ gpr_slice_buffer read_buffer; @@ -359,8 +359,8 @@ typedef struct { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ gpr_uint32 id; - grpc_iomgr_closure *send_done_closure; - grpc_iomgr_closure *recv_done_closure; + grpc_closure *send_done_closure; + grpc_closure *recv_done_closure; /** window available for us to send to peer */ gpr_int64 outgoing_window; diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 1da3f85bde..db6715b43a 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -238,8 +238,8 @@ void grpc_chttp2_cleanup_writing( stream_global->outgoing_sopb->nops == 0) { GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE); stream_global->outgoing_sopb = NULL; - grpc_iomgr_call_list_add(&transport_global->run_at_unlock, - stream_global->send_done_closure, 1); + grpc_call_list_add(&transport_global->run_at_unlock, + stream_global->send_done_closure, 1); } } stream_global->writing_now = 0; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 50d5a3e8a8..bc056ac0b8 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -250,15 +250,15 @@ static void init_transport(grpc_chttp2_transport *t, gpr_slice_buffer_init(&t->writing.outbuf); grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx); - grpc_iomgr_closure_init(&t->writing_action, writing_action, t); + grpc_closure_init(&t->writing_action, writing_action, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); - grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, - &t->writing); - grpc_iomgr_closure_init(&t->recv_data, recv_data, t); + grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, + &t->writing); + grpc_closure_init(&t->recv_data, recv_data, t); gpr_slice_buffer_init(&t->read_buffer); if (is_client) { @@ -499,20 +499,20 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_chttp2_transport *t) { - grpc_iomgr_call_list run = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list run = GRPC_CALL_LIST_INIT; unlock_check_read_write_state(t); if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); - grpc_iomgr_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1); + grpc_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1); prevent_endpoint_shutdown(t); } - GPR_SWAP(grpc_iomgr_call_list, run, t->global.run_at_unlock); + GPR_SWAP(grpc_call_list, run, t->global.run_at_unlock); gpr_mu_unlock(&t->mu); - grpc_iomgr_call_list_run(run); + grpc_call_list_run(run); } /* @@ -664,8 +664,8 @@ static void perform_stream_op_locked( } } else { grpc_sopb_reset(op->send_ops); - grpc_iomgr_call_list_add(&transport_global->run_at_unlock, - stream_global->send_done_closure, 0); + grpc_call_list_add(&transport_global->run_at_unlock, + stream_global->send_done_closure, 0); } } @@ -703,8 +703,7 @@ static void perform_stream_op_locked( op->bind_pollset); } - grpc_iomgr_call_list_add(&transport_global->run_at_unlock, op->on_consumed, - 1); + grpc_call_list_add(&transport_global->run_at_unlock, op->on_consumed, 1); } static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, @@ -717,8 +716,7 @@ static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, unlock(t); } -static void send_ping_locked(grpc_chttp2_transport *t, - grpc_iomgr_closure *on_recv) { +static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); p->next = &t->global.pings; p->prev = p->next->prev; @@ -741,7 +739,7 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { lock(t); - grpc_iomgr_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1); + grpc_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1); if (op->on_connectivity_state_change) { grpc_connectivity_state_notify_on_state_change( @@ -868,8 +866,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { if (stream_global->outgoing_sopb != NULL) { grpc_sopb_reset(stream_global->outgoing_sopb); stream_global->outgoing_sopb = NULL; - grpc_iomgr_call_list_add(&transport_global->run_at_unlock, - stream_global->send_done_closure, 1); + grpc_call_list_add(&transport_global->run_at_unlock, + stream_global->send_done_closure, 1); } stream_global->read_closed = 1; if (!stream_global->published_cancelled) { @@ -919,8 +917,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { &stream_global->outstanding_metadata); grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); stream_global->published_state = *stream_global->publish_state = state; - grpc_iomgr_call_list_add(&transport_global->run_at_unlock, - stream_global->recv_done_closure, 1); + grpc_call_list_add(&transport_global->run_at_unlock, + stream_global->recv_done_closure, 1); stream_global->recv_done_closure = NULL; stream_global->publish_sopb = NULL; stream_global->publish_state = NULL; diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index b605da9dbf..8b2e1b9b02 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -91,7 +91,7 @@ grpc_connectivity_state grpc_connectivity_state_check( int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, - grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list) { + grpc_closure *notify, grpc_call_list *call_list) { if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p", tracker->name, grpc_connectivity_state_name(*current), @@ -99,7 +99,7 @@ int grpc_connectivity_state_notify_on_state_change( } if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_iomgr_call_list_add(call_list, notify, 1); + grpc_call_list_add(call_list, notify, 1); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -113,7 +113,7 @@ int grpc_connectivity_state_notify_on_state_change( void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, const char *reason, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { grpc_connectivity_state_watcher *w; if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name, @@ -128,7 +128,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; - grpc_iomgr_call_list_add(call_list, w->notify, 1); + grpc_call_list_add(call_list, w->notify, 1); gpr_free(w); } } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 49c2815266..a29c655b4e 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -42,7 +42,7 @@ typedef struct grpc_connectivity_state_watcher { /** we keep watchers in a linked list */ struct grpc_connectivity_state_watcher *next; /** closure to notify on change */ - grpc_iomgr_closure *notify; + grpc_closure *notify; /** the current state as believed by the watcher */ grpc_connectivity_state *current; } grpc_connectivity_state_watcher; @@ -67,8 +67,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); * external lock */ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - const char *reason, - grpc_iomgr_call_list *call_list); + const char *reason, grpc_call_list *call_list); grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); @@ -76,6 +75,6 @@ grpc_connectivity_state grpc_connectivity_state_check( /** Return 1 if the channel should start connecting, 0 otherwise */ int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, - grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list); + grpc_closure *notify, grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index c0d92cf93f..3f6b93c3e8 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -101,8 +101,8 @@ void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, typedef struct { gpr_slice message; - grpc_iomgr_closure *then_call; - grpc_iomgr_closure closure; + grpc_closure *then_call; + grpc_closure closure; } close_message_data; static void free_message(void *p, int iomgr_success) { @@ -130,7 +130,7 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, cmd = gpr_malloc(sizeof(*cmd)); cmd->message = *optional_message; cmd->then_call = op->on_consumed; - grpc_iomgr_closure_init(&cmd->closure, free_message, cmd); + grpc_closure_init(&cmd->closure, free_message, cmd); op->on_consumed = &cmd->closure; op->optional_close_message = &cmd->message; } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 6e1ec2f64c..271891d430 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -64,11 +64,11 @@ typedef enum grpc_stream_state { /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { - grpc_iomgr_closure *on_consumed; + grpc_closure *on_consumed; grpc_stream_op_buffer *send_ops; int is_last_send; - grpc_iomgr_closure *on_done_send; + grpc_closure *on_done_send; grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; @@ -76,7 +76,7 @@ typedef struct grpc_transport_stream_op { These bytes will be eventually used to replenish per-stream flow control windows. */ size_t max_recv_bytes; - grpc_iomgr_closure *on_done_recv; + grpc_closure *on_done_recv; grpc_pollset *bind_pollset; @@ -95,9 +95,9 @@ typedef struct grpc_transport_stream_op { /** Transport op: a set of operations to perform on a transport as a whole */ typedef struct grpc_transport_op { /** called when processing of this op is done */ - grpc_iomgr_closure *on_consumed; + grpc_closure *on_consumed; /** connectivity monitoring */ - grpc_iomgr_closure *on_connectivity_state_change; + grpc_closure *on_connectivity_state_change; grpc_connectivity_state *connectivity_state; /** should the transport be disconnected */ int disconnect; @@ -118,7 +118,7 @@ typedef struct grpc_transport_op { /** add this transport to a pollset_set */ grpc_pollset_set *bind_pollset_set; /** send a ping, call this back if not NULL */ - grpc_iomgr_closure *send_ping; + grpc_closure *send_ping; } grpc_transport_op; /* Returns the amount of memory required to store a grpc_stream for this @@ -181,7 +181,7 @@ void grpc_transport_perform_op(grpc_transport *transport, /* Send a ping on a transport Calls cb with user data when a response is received. */ -void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb); +void grpc_transport_ping(grpc_transport *transport, grpc_closure *cb); /* Advise peer of pending connection termination. */ void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, |