diff options
70 files changed, 254 insertions, 234 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index ee1f62f8c2..db432ee552 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -91,7 +91,7 @@ static void client_start_transport_op(grpc_exec_ctx *exec_ctx, } static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr, - int success) { + bool success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index a92a6ecaf2..97282dbbcb 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -135,7 +135,7 @@ static void on_lb_policy_state_changed_locked( } static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { lb_policy_connectivity_watcher *w = arg; gpr_mu_lock(&w->chand->mu_config); @@ -161,7 +161,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, } static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; @@ -191,7 +191,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { - grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures); + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, + NULL); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -249,7 +250,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); GPR_ASSERT(op->set_accept_stream == NULL); if (op->bind_pollset != NULL) { @@ -268,7 +269,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL); } else { grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -310,15 +311,15 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); -static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) { continue_picking_args *cpa = arg; if (!success) { - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); } else if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->connected_subchannel, cpa->on_ready)) { - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1); + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL); } gpr_free(cpa); } @@ -346,7 +347,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); } } gpr_mu_unlock(&chand->mu_config); @@ -497,7 +498,7 @@ typedef struct { } external_connectivity_watcher; static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 2c0b07d8bf..edc70d40d3 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -79,7 +79,7 @@ typedef struct client_uchannel_channel_data { typedef grpc_subchannel_call_holder call_data; static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { channel_data *chand = arg; grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, chand->subchannel_connectivity, @@ -105,7 +105,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); GPR_ASSERT(op->set_accept_stream == NULL); GPR_ASSERT(op->bind_pollset == NULL); diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 3de1ef8a35..8847539bf6 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -153,7 +153,7 @@ static void process_send_initial_metadata( static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, int success) { +static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, bool success) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; gpr_slice_buffer_reset_and_unref(&calld->slices); @@ -183,7 +183,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_next_op(exec_ctx, elem, &calld->send_op); } -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, int success) { +static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, bool success) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 65cfb778bb..ace5416e7a 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -80,7 +80,7 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { return md; } -static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { +static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; client_recv_filter_args a; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index a10b105494..51e03e32d1 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -131,7 +131,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } } -static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { +static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (success) { diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 1e8f4ba1b6..63213d63ca 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -43,9 +43,9 @@ #define CANCELLED_CALL ((grpc_subchannel_call *)1) static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder, - int success); + bool success); static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args, - int success); + bool success); static void add_waiting_locked(grpc_subchannel_call_holder *holder, grpc_transport_stream_op *op); @@ -166,7 +166,7 @@ retry: GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); } -static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_subchannel_call_holder *holder = arg; grpc_subchannel_call *call; gpr_mu_lock(&holder->mu); @@ -209,10 +209,11 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), 1); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true, + NULL); } -static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, int success) { +static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) { retry_ops_args *a = args; size_t i; for (i = 0; i < a->nops; i++) { @@ -240,9 +241,10 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { size_t i; for (i = 0; i < holder->waiting_ops_count; i++) { - grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false, + NULL); grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready, - 0); + false, NULL); } holder->waiting_ops_count = 0; } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 5b10600ab5..459bbebb68 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -121,7 +121,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { *pp->target = NULL; grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); pp = next; } @@ -140,7 +140,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -209,7 +209,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, } static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { pick_first_lb_policy *p = arg; size_t i; size_t num_subchannels = p->num_subchannels; @@ -230,7 +230,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, } static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { pick_first_lb_policy *p = arg; grpc_subchannel *selected_subchannel; pending_pick *pp; @@ -272,15 +272,15 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); gpr_atm_rel_store(&p->selected, (gpr_atm)selected); - grpc_exec_ctx_enqueue(exec_ctx, - grpc_closure_create(destroy_subchannels, p), 1); + grpc_exec_ctx_enqueue( + exec_ctx, grpc_closure_create(destroy_subchannels, p), true, NULL); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( @@ -327,7 +327,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, @@ -374,7 +374,7 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (selected) { grpc_connected_subchannel_ping(exec_ctx, selected, closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); } } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index d487456363..ca5cf530c1 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -237,7 +237,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); } grpc_connectivity_state_set(exec_ctx, &p->state_tracker, @@ -263,7 +263,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -336,7 +336,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, } static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; @@ -376,7 +376,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( @@ -428,7 +428,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } } else { @@ -479,7 +479,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel_ping(exec_ctx, target, closure); } else { gpr_mu_unlock(&p->mu); - grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); } } diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 28ca30b946..3b3f50101b 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -93,7 +93,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -182,7 +182,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, if (r->resolved_config) { grpc_client_config_ref(r->resolved_config); } - grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); r->next_completion = NULL; r->published_version = r->resolved_version; } diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 6343259246..d2b6297c1c 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -98,7 +98,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -153,7 +153,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; *r->target_config = cfg; - grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); r->next_completion = NULL; } } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index fccc1dda54..0a996a1e8b 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -145,7 +145,7 @@ struct grpc_subchannel_call { static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, - int iomgr_success); + bool iomgr_success); #ifdef GRPC_STREAM_REFCOUNT_DEBUG #define REF_REASON reason @@ -175,7 +175,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, */ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { grpc_connected_subchannel *c = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); @@ -198,7 +198,7 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, */ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { grpc_subchannel *c = arg; gpr_free((void *)c->filters); grpc_channel_args_destroy(c->args); @@ -268,7 +268,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), - 1); + true, NULL); } } @@ -341,7 +341,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { } static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { @@ -413,7 +413,7 @@ void grpc_connected_subchannel_process_transport_op( } static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, - int iomgr_success) { + bool iomgr_success) { state_watcher *sw = p; grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; @@ -594,7 +594,7 @@ static void update_reconnect_parameters(grpc_subchannel *c) { gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); } -static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { +static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); c->have_alarm = 0; @@ -611,7 +611,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { } static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { grpc_subchannel *c = arg; if (c->connecting_result.transport != NULL) { @@ -647,7 +647,7 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { */ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, - int success) { + bool success) { grpc_subchannel_call *c = call; GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index b5cd8d8d2a..a7adb83093 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -114,13 +114,13 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, gpr_free(req); } -static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success); +static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success); static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) { grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, &req->on_read); } -static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { internal_request *req = user_data; size_t i; @@ -147,7 +147,7 @@ static void on_written(grpc_exec_ctx *exec_ctx, internal_request *req) { do_read(exec_ctx, req); } -static void done_write(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void done_write(grpc_exec_ctx *exec_ctx, void *arg, bool success) { internal_request *req = arg; if (success) { on_written(exec_ctx, req); @@ -175,7 +175,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, start_write(exec_ctx, req); } -static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { internal_request *req = arg; if (!req->ep) { diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c index 7646a88ac5..d439eaf32c 100644 --- a/src/core/iomgr/closure.c +++ b/src/core/iomgr/closure.c @@ -43,7 +43,7 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, } void grpc_closure_list_add(grpc_closure_list *closure_list, - grpc_closure *closure, int success) { + grpc_closure *closure, bool success) { if (closure == NULL) return; closure->final_data = (success != 0); if (closure_list->head == NULL) { @@ -54,7 +54,7 @@ void grpc_closure_list_add(grpc_closure_list *closure_list, closure_list->tail = closure; } -int grpc_closure_list_empty(grpc_closure_list closure_list) { +bool grpc_closure_list_empty(grpc_closure_list closure_list) { return closure_list.head == NULL; } @@ -77,7 +77,7 @@ typedef struct { grpc_closure wrapper; } wrapped_closure; -static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, bool success) { wrapped_closure *wc = arg; grpc_iomgr_cb_func cb = wc->cb; void *cb_arg = wc->cb_arg; diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h index 98ef91e1db..34c6f21eae 100644 --- a/src/core/iomgr/closure.h +++ b/src/core/iomgr/closure.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H #include <grpc/support/port_platform.h> +#include <stdbool.h> struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -54,7 +55,7 @@ typedef struct grpc_closure_list { * \param success An indication on the state of the iomgr. On false, cleanup * actions should be taken (eg, shutdown). */ typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg, - int success); + bool success); /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { @@ -83,13 +84,13 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); /** add \a closure to the end of \a list and set \a closure's success to \a * success */ void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, - int success); + bool success); /** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); /** return whether \a list is empty. */ -int grpc_closure_list_empty(grpc_closure_list list); +bool grpc_closure_list_empty(grpc_closure_list list); /** return the next pointer for a queued closure list */ grpc_closure *grpc_closure_next(grpc_closure *closure); diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index 6059a6031c..3ac8198f8a 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -37,16 +37,16 @@ #include "src/core/profiling/timers.h" -int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { - int did_something = 0; +bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { + bool did_something = 0; GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); while (!grpc_closure_list_empty(exec_ctx->closure_list)) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { - int success = (int)(c->final_data & 1); + bool success = (bool)(c->final_data & 1); grpc_closure *next = (grpc_closure *)(c->final_data & ~(uintptr_t)1); - did_something++; + did_something = true; GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); c->cb(exec_ctx, c->cb_arg, success); GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); @@ -62,11 +62,15 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { } void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - int success) { + bool success, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_add(&exec_ctx->closure_list, closure, success); } void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, - grpc_closure_list *list) { + grpc_closure_list *list, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index 43df488094..69908b0882 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -57,22 +57,29 @@ struct grpc_exec_ctx { grpc_closure_list closure_list; }; +/** A workqueue represents a list of work to be executed asynchronously. + Forward declared here to avoid a circular dependency with workqueue.h. */ +struct grpc_workqueue; +typedef struct grpc_workqueue grpc_workqueue; + #define GRPC_EXEC_CTX_INIT \ { GRPC_CLOSURE_LIST_INIT } /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. - * Returns 1 if work was performed, 0 otherwise. */ -int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); + * Returns true if work was performed, false otherwise. */ +bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); /** Add a closure to be executed at the next flush/finish point */ void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - int success); + bool success, + grpc_workqueue *offload_target_or_null); /** Add a list of closures to be executed at the next flush/finish point. * Leaves \a list empty. */ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, - grpc_closure_list *list); + grpc_closure_list *list, + grpc_workqueue *offload_target_or_null); #endif diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c index 00c68f7828..f603f5e42a 100644 --- a/src/core/iomgr/executor.c +++ b/src/core/iomgr/executor.c @@ -77,7 +77,7 @@ static void closure_exec_thread_func(void *ignored) { gpr_mu_unlock(&g_executor.mu); break; } else { - grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures); + grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL); } gpr_mu_unlock(&g_executor.mu); grpc_exec_ctx_flush(&exec_ctx); @@ -112,7 +112,7 @@ static void maybe_spawn_locked() { g_executor.pending_join = 1; } -void grpc_executor_enqueue(grpc_closure *closure, int success) { +void grpc_executor_enqueue(grpc_closure *closure, bool success) { gpr_mu_lock(&g_executor.mu); if (g_executor.shutting_down == 0) { grpc_closure_list_add(&g_executor.closures, closure, success); @@ -133,7 +133,7 @@ void grpc_executor_shutdown() { * list below because we aren't accepting new work */ /* Execute pending callbacks, some may be performing cleanups */ - grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures); + grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); if (pending_join) { diff --git a/src/core/iomgr/executor.h b/src/core/iomgr/executor.h index 6da446ae9c..d6a7b0a7ad 100644 --- a/src/core/iomgr/executor.h +++ b/src/core/iomgr/executor.h @@ -45,7 +45,7 @@ void grpc_executor_init(); /** Enqueue \a closure for its eventual execution of \a f(arg) on a separate * thread */ -void grpc_executor_enqueue(grpc_closure *closure, int success); +void grpc_executor_enqueue(grpc_closure *closure, bool success); /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(); diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 89c938bc04..85eadd754b 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -218,7 +218,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { } else { grpc_remove_fd_from_all_epoll_sets(fd->fd); } - grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); } int grpc_fd_wrapped_fd(grpc_fd *fd) { @@ -273,7 +273,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown); + grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -296,7 +296,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown); + grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); *st = CLOSURE_NOT_READY; return 1; } diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index d117485327..4acae2bb71 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -141,7 +141,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_status) { + bool iomgr_status) { delayed_add *da = arg; if (!grpc_fd_is_orphaned(da->fd)) { @@ -154,7 +154,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, /* We don't care about this pollset anymore. */ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { da->pollset->called_shutdown = 1; - grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1); + grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL); } } gpr_mu_unlock(&da->pollset->mu); @@ -178,7 +178,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, GRPC_FD_REF(fd, "delayed_add"); grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL); } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index c325b634ae..595a934752 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -243,7 +243,7 @@ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); pollset->vtable->finish_shutdown(pollset); - grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1); + grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -271,7 +271,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (!grpc_pollset_has_workers(pollset) && !grpc_closure_list_empty(pollset->idle_jobs)) { GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0); - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); goto done; } /* Check alarms - these are a global resource so we just ping @@ -365,7 +365,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ gpr_mu_lock(&pollset->mu); } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); gpr_mu_unlock(&pollset->mu); grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); @@ -381,7 +381,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutdown_done = closure; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!grpc_pollset_has_workers(pollset)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); } if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && !grpc_pollset_has_workers(pollset)) { @@ -419,7 +419,8 @@ typedef struct grpc_unary_promote_args { grpc_closure promotion_closure; } grpc_unary_promote_args; -static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) { +static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, + bool success) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index 555c74ce7e..9a5e48c12e 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -152,7 +152,7 @@ done: /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ -static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) { +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { request *r = rp; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index d9d24ee9a3..ae008e572b 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -91,7 +91,7 @@ error: return 0; } -static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int success) { +static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) { int done; async_connect *ac = acp; if (grpc_tcp_trace) { @@ -111,7 +111,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int success) { } } -static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) { +static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; @@ -206,7 +206,7 @@ finish: gpr_free(ac->addr_str); gpr_free(ac); } - grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL); + grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL); } void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, @@ -243,7 +243,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, addr_len = sizeof(addr4_copy); } if (!prepare_socket(addr, fd)) { - grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); return; } @@ -259,14 +259,14 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, if (err >= 0) { *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); - grpc_exec_ctx_enqueue(exec_ctx, closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno)); grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); goto done; } diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 4fa8ca8c71..048e907441 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -100,9 +100,9 @@ typedef struct { } grpc_tcp; static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - int success); + bool success); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - int success); + bool success); static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -247,7 +247,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - int success) { + bool success) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); @@ -273,7 +273,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = 0; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL); } } @@ -360,7 +360,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) { } static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - int success) { + bool success) { grpc_tcp *tcp = (grpc_tcp *)arg; flush_result status; grpc_closure *cb; @@ -407,7 +407,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_enqueue(exec_ctx, cb, 1); + grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL); return; } tcp->outgoing_buffer = buf; @@ -420,7 +420,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE); + grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL); } GPR_TIMER_END("tcp_write", 0); diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index adf14aeb59..5e07f8261c 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -160,7 +160,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); + grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL); } gpr_mu_destroy(&s->mu); @@ -174,7 +174,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(s); } -static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) { +static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, + bool success) { grpc_tcp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; @@ -317,7 +318,7 @@ error: } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_tcp_listener *sp = arg; grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, sp->fd_index}; @@ -602,7 +603,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&s->mu); - grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting); + grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL); gpr_mu_unlock(&s->mu); if (exec_ctx == NULL) { grpc_exec_ctx_flush(&local_exec_ctx); diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c index 24d6204e07..cd6ad6ece6 100644 --- a/src/core/iomgr/timer.c +++ b/src/core/iomgr/timer.c @@ -224,7 +224,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, 0); + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL); timer->triggered = 1; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -290,7 +290,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success); + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL); n++; } *new_min_deadline = compute_min_deadline(shard); diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h index 714536233c..14fe2fedd2 100644 --- a/src/core/iomgr/workqueue.h +++ b/src/core/iomgr/workqueue.h @@ -47,9 +47,7 @@ #include "src/core/iomgr/workqueue_windows.h" #endif -/** A workqueue represents a list of work to be executed asynchronously. */ -struct grpc_workqueue; -typedef struct grpc_workqueue grpc_workqueue; +/* grpc_workqueue is forward declared in exec_ctx.h */ /** Create a work queue */ grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx); diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index d2a1c34612..2d490259f7 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -45,7 +45,7 @@ #include "src/core/iomgr/fd_posix.h" -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success); +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) { char name[32]; @@ -110,7 +110,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_unlock(&workqueue->mu); } -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_workqueue *workqueue = arg; if (!success) { diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 8b56c57645..afba0079f5 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -793,7 +793,7 @@ static void md_only_test_destruct(grpc_call_credentials *creds) { } static void on_simulated_token_fetch_done(grpc_exec_ctx *exec_ctx, - void *user_data, int success) { + void *user_data, bool success) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds; @@ -812,7 +812,7 @@ static void md_only_test_get_request_metadata( grpc_credentials_metadata_request *cb_arg = grpc_credentials_metadata_request_create(creds, cb, user_data); grpc_executor_enqueue( - grpc_closure_create(on_simulated_token_fetch_done, cb_arg), 1); + grpc_closure_create(on_simulated_token_fetch_done, cb_arg), true); } else { cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK); } diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 5385e41130..ca4d75ee48 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -86,7 +86,7 @@ static void on_compute_engine_detection_http_response( gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset)); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int s) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) { grpc_pollset_destroy(p); } diff --git a/src/core/security/handshake.c b/src/core/security/handshake.c index 364b765396..c7187d6706 100644 --- a/src/core/security/handshake.c +++ b/src/core/security/handshake.c @@ -61,10 +61,10 @@ typedef struct { } grpc_security_handshake; static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, - void *setup, int success); + void *setup, bool success); static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup, - int success); + bool success); static void security_connector_remove_handshake(grpc_security_handshake *h) { grpc_security_connector_handshake_list *node; @@ -198,7 +198,8 @@ static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx, } static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, - void *handshake, int success) { + void *handshake, + bool success) { grpc_security_handshake *h = handshake; size_t consumed_slice_size = 0; tsi_result result = TSI_OK; @@ -265,7 +266,7 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, /* If handshake is NULL, the handshake is done. */ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, - void *handshake, int success) { + void *handshake, bool success) { grpc_security_handshake *h = handshake; /* Make sure that write is OK. */ diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 9b87e268c6..97249bb087 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -126,7 +126,7 @@ static void flush_read_staging_buffer(secure_endpoint *ep, uint8_t **cur, } static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep, - int success) { + bool success) { if (grpc_trace_secure_endpoint) { size_t i; for (i = 0; i < ep->read_buffer->count; i++) { @@ -137,11 +137,11 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep, } } ep->read_buffer = NULL; - grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success); + grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success, NULL); SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read"); } -static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { unsigned i; uint8_t keep_looping = 0; tsi_result result = TSI_OK; @@ -315,7 +315,7 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ gpr_slice_buffer_reset_and_unref(&ep->output_buffer); - grpc_exec_ctx_enqueue(exec_ctx, cb, 0); + grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL); return; } diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index d5c8c54369..04b4072f92 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -153,7 +153,7 @@ static void on_md_processing_done( } static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, - int success) { + bool success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 08713fceaf..ee3443c828 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -142,7 +142,7 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, on_accept, state); } -static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) { +static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) { grpc_server_secure_state *state = statep; if (state->destroy_callback != NULL) { state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 880666bb38..9495e748b5 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -229,9 +229,9 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, - int success); + bool success); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - int success); + bool success); grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, @@ -351,7 +351,7 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } -static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) { +static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { size_t i; int ii; grpc_call *c = call; @@ -688,13 +688,13 @@ typedef struct cancel_closure { grpc_status_code status; } cancel_closure; -static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) { +static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { cancel_closure *cc = ccp; GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel"); gpr_free(cc); } -static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) { +static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { grpc_transport_stream_op op; cancel_closure *cc = ccp; memset(&op, 0, sizeof(op)); @@ -721,7 +721,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, cc->call = c; cc->status = status; GRPC_CALL_INTERNAL_REF(c, "cancel"); - grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL); return GRPC_CALL_OK; } @@ -757,7 +757,7 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } -static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_call *call = arg; gpr_mu_lock(&call->mu); call->have_alarm = 0; @@ -934,7 +934,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { - grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success); + grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL); gpr_mu_lock(&call->mu); bctl->call->used_batches = (uint8_t)(bctl->call->used_batches & @@ -974,7 +974,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, } static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - int success) { + bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; @@ -993,7 +993,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } -static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) { +static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; grpc_call *child_call; @@ -1066,7 +1066,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) { } static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - int success) { + bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 001692376f..ffbc2ca971 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -80,7 +80,7 @@ struct grpc_channel { /* the protobuf library will (by default) start warning at 100megs */ #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) -static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, int success); +static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success); grpc_channel *grpc_channel_create_from_filters( grpc_exec_ctx *exec_ctx, const char *target, @@ -268,7 +268,7 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, } static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { + bool iomgr_success) { grpc_channel *channel = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel)); while (channel->registered_calls) { diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 10f5c4da4d..e2973bba6e 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -165,11 +165,11 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, } } -static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) { +static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) { partly_done(exec_ctx, pw, 1); } -static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) { +static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) { partly_done(exec_ctx, pw, 0); } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 49083f0870..b3ee203ddc 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -80,11 +80,11 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { connector_unref(exec_ctx, arg); } -static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { connector *c = arg; grpc_closure *notify; grpc_endpoint *tcp = c->tcp; diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c index b4ce282787..4a63f29c6a 100644 --- a/src/core/surface/channel_ping.c +++ b/src/core/surface/channel_ping.c @@ -53,7 +53,7 @@ static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(arg); } -static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, bool success) { ping_result *pr = arg; grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr, &pr->completion_storage); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index c0db9c508a..5ac02f5b94 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -86,7 +86,7 @@ static gpr_mu g_freelist_mu; grpc_completion_queue *g_freelist; static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, - int success); + bool success); void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); } @@ -169,7 +169,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { grpc_completion_queue *cc = arg; GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index a60e9d20da..b6d468ed26 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -78,8 +78,8 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } else if (op->recv_trailing_metadata != NULL) { fill_metadata(elem, op->recv_trailing_metadata); } - grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0); - grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 552a570713..0a7d51d97e 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -128,14 +128,14 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { connector *c = arg; grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base, c->connecting_endpoint, on_secure_handshake_done, c); } -static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { connector *c = arg; grpc_closure *notify; grpc_endpoint *tcp = c->newly_connecting_endpoint; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 0928f1e045..42cffccb4c 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -260,7 +260,7 @@ struct shutdown_cleanup_args { }; static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_status_ignored) { + bool iomgr_status_ignored) { struct shutdown_cleanup_args *a = arg; gpr_slice_unref(a->slice); gpr_free(a); @@ -313,7 +313,7 @@ static void request_matcher_destroy(request_matcher *rm) { gpr_stack_lockfree_destroy(rm->requests); } -static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) { +static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } @@ -328,7 +328,7 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); } } @@ -392,7 +392,7 @@ static void orphan_channel(channel_data *chand) { } static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, - int success) { + bool success) { channel_data *chand = cd; grpc_server *server = chand->server; GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); @@ -407,7 +407,8 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { maybe_finish_shutdown(exec_ctx, chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true, + NULL); } static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -420,7 +421,7 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); return; } @@ -569,7 +570,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - int success) { + bool success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; gpr_timespec op_deadline; @@ -609,7 +610,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - int success) { + bool success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; if (success) { @@ -620,7 +621,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -653,7 +654,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, } static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, - int iomgr_status_ignored) { + bool iomgr_status_ignored) { channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { @@ -985,7 +986,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, } static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s, - int success) { + bool success) { grpc_server *server = s; gpr_mu_lock(&server->mu_global); server->listeners_destroyed++; @@ -1140,7 +1141,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, + NULL); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; @@ -1229,7 +1231,7 @@ done: } static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, - void *user_data, int success); + void *user_data, bool success); static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { gpr_slice slice = value->slice; @@ -1315,7 +1317,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, } static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc, - int success) { + bool success) { requested_call *rc = prc; grpc_call *call = *rc->call; grpc_call_element *elem = diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 6e21d2dcd7..ce970dfe73 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -82,7 +82,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_enqueue(exec_ctx, destroy_done, 1); + grpc_exec_ctx_enqueue(exec_ctx, destroy_done, true, NULL); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index a8262b7af2..6823f8f551 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -488,7 +488,7 @@ void grpc_chttp2_perform_writes( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, - void *transport_writing, int success); + void *transport_writing, bool success); void grpc_chttp2_cleanup_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index fdad05b5fb..cfbe241409 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -188,7 +188,7 @@ void grpc_chttp2_perform_writes( grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, &transport_writing->done_cb); } else { - grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, 1); + grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, true, NULL); } } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 05b25fd8b0..9298573c7f 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -86,14 +86,14 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(grpc_exec_ctx *exec_ctx, void *t, - int iomgr_success_ignored); + bool iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value); /** Endpoint callback to process incoming data */ -static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success); +static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success); /** Start disconnection chain */ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); @@ -183,7 +183,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 0); + grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, false, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -602,7 +602,7 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { t->parsing_active)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); - grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1); + grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL); prevent_endpoint_shutdown(t); } check_read_ops(exec_ctx, &t->global); @@ -631,7 +631,7 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, - void *transport_writing_ptr, int success) { + void *transport_writing_ptr, bool success) { grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); grpc_chttp2_stream_global *stream_global; @@ -669,7 +669,7 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, } static void writing_action(grpc_exec_ctx *exec_ctx, void *gt, - int iomgr_success_ignored) { + bool iomgr_success_ignored) { grpc_chttp2_transport *t = gt; GPR_TIMER_BEGIN("writing_action", 0); grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep); @@ -759,7 +759,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, closure->final_data |= 1; } if (closure->final_data < 2) { - grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0); + grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0, NULL); } *pclosure = NULL; } @@ -777,7 +777,7 @@ static int contains_non_ok_status( return 0; } -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {} +static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {} static void perform_stream_op_locked( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -934,7 +934,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1); + grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, true, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -951,7 +951,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, lock(t); - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( @@ -1022,11 +1022,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); GPR_ASSERT(*stream_global->recv_message != NULL); - grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1); + grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true, + NULL); stream_global->recv_message_ready = NULL; } else if (stream_global->published_trailing_metadata) { *stream_global->recv_message = NULL; - grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1); + grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true, + NULL); stream_global->recv_message_ready = NULL; } } @@ -1336,7 +1338,7 @@ static void read_error_locked(grpc_exec_ctx *exec_ctx, } /* tcp read callback */ -static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { +static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { size_t i; int keep_reading = 0; grpc_chttp2_transport *t = tp; @@ -1523,7 +1525,7 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, unlock(exec_ctx, bs->transport); return 1; } else if (bs->failed) { - grpc_exec_ctx_enqueue(exec_ctx, on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, on_complete, false, NULL); unlock(exec_ctx, bs->transport); return 0; } else { @@ -1552,7 +1554,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&bs->transport->mu); if (bs->on_next != NULL) { *bs->next = slice; - grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 1); + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL); bs->on_next = NULL; } else { gpr_slice_buffer_add(&bs->slices, slice); @@ -1567,7 +1569,7 @@ void grpc_chttp2_incoming_byte_stream_finished( if (from_parsing_thread) { gpr_mu_lock(&bs->transport->mu); } - grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 0); + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL); bs->on_next = NULL; bs->failed = 1; if (from_parsing_thread) { diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 3c3fd4671d..8d10eb2599 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -78,7 +78,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, } else { success = 0; } - grpc_exec_ctx_enqueue(exec_ctx, w->notify, success); + grpc_exec_ctx_enqueue(exec_ctx, w->notify, success, NULL); gpr_free(w); } gpr_free(tracker->name); @@ -109,7 +109,7 @@ int grpc_connectivity_state_notify_on_state_change( if (current == NULL) { grpc_connectivity_state_watcher *w = tracker->watchers; if (w != NULL && w->notify == notify) { - grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL); tracker->watchers = w->next; gpr_free(w); return 0; @@ -117,7 +117,7 @@ int grpc_connectivity_state_notify_on_state_change( while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; if (rm_candidate != NULL && rm_candidate->notify == notify) { - grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL); w->next = w->next->next; gpr_free(rm_candidate); return 0; @@ -128,7 +128,7 @@ int grpc_connectivity_state_notify_on_state_change( } else { if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + grpc_exec_ctx_enqueue(exec_ctx, notify, true, NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -158,7 +158,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; - grpc_exec_ctx_enqueue(exec_ctx, w->notify, 1); + grpc_exec_ctx_enqueue(exec_ctx, w->notify, true, NULL); gpr_free(w); } } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 2ab978be46..3aca8f04ba 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -59,7 +59,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { #endif if (gpr_unref(&refcount->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, 1); + grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, true, NULL); } } @@ -125,8 +125,8 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { - grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0); - grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); + grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, @@ -150,7 +150,7 @@ typedef struct { grpc_closure closure; } close_message_data; -static void free_message(grpc_exec_ctx *exec_ctx, void *p, int iomgr_success) { +static void free_message(grpc_exec_ctx *exec_ctx, void *p, bool iomgr_success) { close_message_data *cmd = p; gpr_slice_unref(cmd->message); if (cmd->then_call != NULL) { diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 832570a81d..55fa93e2b4 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -60,7 +60,7 @@ static void thd_func(void *arg) { gpr_event_set(&a->done_thd, (void *)1); } -static void done_write(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void done_write(grpc_exec_ctx *exec_ctx, void *arg, bool success) { thd_args *a = arg; gpr_event_set(&a->done_write, (void *)1); } diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index f1bb37c0bf..daf68c2080 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -81,12 +81,12 @@ static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { return gpr_strdup("peer"); } -static void free_channel(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void free_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_channel_stack_destroy(exec_ctx, arg); gpr_free(arg); } -static void free_call(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void free_call(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_call_stack_destroy(exec_ctx, arg); gpr_free(arg); } diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index 33cab715b2..bcd1f26123 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -64,7 +64,7 @@ static int server_port; static struct rpc_state state; static grpc_closure on_read; -static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { GPR_ASSERT(success); gpr_slice_buffer_move_into(&state.temp_incoming_buffer, &state.incoming_buffer); diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 9b622e80d6..13c815b706 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -81,7 +81,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { } } -static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { connector *c = arg; grpc_closure *notify; grpc_endpoint *tcp = c->tcp; @@ -240,7 +240,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_micro_fullstack( grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE; grpc_pollset_set g_interested_parties; -static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, bool success) { if (g_state != GRPC_CHANNEL_READY) { grpc_subchannel_notify_on_state_change( exec_ctx, arg, &g_interested_parties, &g_state, @@ -248,7 +248,7 @@ static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { } } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_pollset_destroy(arg); } diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c index 612388c61d..6a0213fedf 100644 --- a/test/core/httpcli/httpcli_test.c +++ b/test/core/httpcli/httpcli_test.c @@ -131,7 +131,7 @@ static void test_post(int port) { gpr_free(host); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/core/httpcli/httpscli_test.c b/test/core/httpcli/httpscli_test.c index ba5660bd18..d9a22054bf 100644 --- a/test/core/httpcli/httpscli_test.c +++ b/test/core/httpcli/httpscli_test.c @@ -133,7 +133,7 @@ static void test_post(int port) { gpr_free(host); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c index ff590cf2d5..7bdb43aae6 100644 --- a/test/core/iomgr/endpoint_pair_test.c +++ b/test/core/iomgr/endpoint_pair_test.c @@ -65,7 +65,7 @@ static grpc_endpoint_test_config configs[] = { {"tcp/tcp_socketpair", create_fixture_endpoint_pair, clean_up}, }; -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index a66fe32fde..58e00a7848 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -127,7 +127,7 @@ struct read_and_write_test_state { }; static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, - void *data, int success) { + void *data, bool success) { struct read_and_write_test_state *state = data; state->bytes_read += count_slices( @@ -145,7 +145,7 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, } static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, - void *data, int success) { + void *data, bool success) { struct read_and_write_test_state *state = data; gpr_slice *slices = NULL; size_t nslices; diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index b186520729..d17c87130c 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -118,7 +118,7 @@ typedef struct { /* Called when an upload session can be safely shutdown. Close session FD and start to shutdown listen FD. */ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */ - int success) { + bool success) { session *se = arg; server *sv = se->sv; grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a"); @@ -129,7 +129,7 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */ /* Called when data become readable in a session. */ static void session_read_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */ - int success) { + bool success) { session *se = arg; int fd = se->em_fd->fd; @@ -187,7 +187,7 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */, /* Called when a new TCP connection request arrives in the listening port. */ static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/ - int success) { + bool success) { server *sv = arg; int fd; int flags; @@ -301,7 +301,7 @@ static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx, /* Write as much as possible, then register notify_on_write. */ static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */ - int success) { + bool success) { client *cl = arg; int fd = cl->em_fd->fd; ssize_t write_once = 0; @@ -399,7 +399,7 @@ static void test_grpc_fd(void) { } typedef struct fd_change_data { - void (*cb_that_ran)(grpc_exec_ctx *exec_ctx, void *, int success); + grpc_iomgr_cb_func cb_that_ran; } fd_change_data; void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; } @@ -407,7 +407,7 @@ void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; } void destroy_change_data(fd_change_data *fdc) {} static void first_read_callback(grpc_exec_ctx *exec_ctx, - void *arg /* fd_change_data */, int success) { + void *arg /* fd_change_data */, bool success) { fd_change_data *fdc = arg; gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -417,7 +417,7 @@ static void first_read_callback(grpc_exec_ctx *exec_ctx, } static void second_read_callback(grpc_exec_ctx *exec_ctx, - void *arg /* fd_change_data */, int success) { + void *arg /* fd_change_data */, bool success) { fd_change_data *fdc = arg; gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -510,7 +510,7 @@ static void test_grpc_fd_change(void) { close(sv[1]); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 833ceace54..059779880e 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -63,7 +63,7 @@ static void finish_connection() { gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } -static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, bool success) { GPR_ASSERT(g_connecting != NULL); GPR_ASSERT(success); grpc_endpoint_shutdown(exec_ctx, g_connecting); @@ -72,7 +72,7 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, int success) { finish_connection(); } -static void must_fail(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void must_fail(grpc_exec_ctx *exec_ctx, void *arg, bool success) { GPR_ASSERT(g_connecting == NULL); GPR_ASSERT(!success); finish_connection(); @@ -258,7 +258,7 @@ void test_times_out(void) { } } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index e0136b3cd7..d290c6bc3a 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -138,7 +138,7 @@ static size_t count_slices(gpr_slice *slices, size_t nslices, return num_bytes; } -static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, int success) { +static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { struct read_socket_state *state = (struct read_socket_state *)user_data; size_t read_bytes; int current_data; @@ -280,7 +280,7 @@ static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size, } static void write_done(grpc_exec_ctx *exec_ctx, - void *user_data /* write_socket_state */, int success) { + void *user_data /* write_socket_state */, bool success) { struct write_socket_state *state = (struct write_socket_state *)user_data; gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -383,7 +383,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_exec_ctx_finish(&exec_ctx); } -void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, int success) { +void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, bool success) { int *done = arg; *done = 1; grpc_pollset_kick(&g_pollset, NULL); @@ -503,7 +503,7 @@ static grpc_endpoint_test_config configs[] = { {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up}, }; -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index f7097ac904..272d97bfcb 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -86,7 +86,7 @@ static void on_connect_result_set(on_connect_result *result, } static void server_weak_ref_shutdown(grpc_exec_ctx *exec_ctx, void *arg, - int success) { + bool success) { server_weak_ref *weak_ref = arg; weak_ref->server = NULL; } @@ -303,7 +303,7 @@ static void test_connect(unsigned n) { grpc_exec_ctx_finish(&exec_ctx); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/core/iomgr/timer_list_test.c b/test/core/iomgr/timer_list_test.c index 63014c3939..09cc979845 100644 --- a/test/core/iomgr/timer_list_test.c +++ b/test/core/iomgr/timer_list_test.c @@ -43,7 +43,7 @@ static int cb_called[MAX_CB][2]; -static void cb(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) { cb_called[(intptr_t)arg][success]++; } diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c index d1f9dabc57..5add23cbdd 100644 --- a/test/core/iomgr/workqueue_test.c +++ b/test/core/iomgr/workqueue_test.c @@ -40,7 +40,7 @@ static grpc_pollset g_pollset; -static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, bool success) { GPR_ASSERT(success == 1); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); *(int *)p = 1; @@ -90,7 +90,7 @@ static void test_flush(void) { grpc_pollset_worker worker; grpc_closure_init(&c, must_succeed, &done); - grpc_exec_ctx_enqueue(&exec_ctx, &c, 1); + grpc_exec_ctx_enqueue(&exec_ctx, &c, true, NULL); grpc_workqueue_flush(&exec_ctx, wq); grpc_workqueue_add_to_pollset(&exec_ctx, wq, &g_pollset); @@ -106,7 +106,7 @@ static void test_flush(void) { grpc_exec_ctx_finish(&exec_ctx); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index fb62bf4134..58b42eeff7 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -73,7 +73,7 @@ static void on_oauth2_response(grpc_exec_ctx *exec_ctx, void *user_data, gpr_mu_unlock(GRPC_POLLSET_MU(&request->pollset)); } -static void do_nothing(grpc_exec_ctx *exec_ctx, void *unused, int success) {} +static void do_nothing(grpc_exec_ctx *exec_ctx, void *unused, bool success) {} char *grpc_test_fetch_oauth2_token_with_credentials( grpc_call_credentials *creds) { diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 240c4596b4..0ec46e187b 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -138,7 +138,7 @@ static grpc_endpoint_test_config configs[] = { secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up}, }; -static void inc_call_ctr(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void inc_call_ctr(grpc_exec_ctx *exec_ctx, void *arg, bool success) { ++*(int *)arg; } @@ -171,7 +171,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { clean_up(); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index e03cce1322..2cceaee48b 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -47,13 +47,13 @@ grpc_closure transport_op_cb; static void *tag(intptr_t x) { return (void *)x; } -void verify_connectivity(grpc_exec_ctx *exec_ctx, void *arg, int success) { +void verify_connectivity(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_transport_op *op = arg; GPR_ASSERT(GRPC_CHANNEL_FATAL_FAILURE == *op->connectivity_state); GPR_ASSERT(success); } -void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {} +void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {} void test_transport_op(grpc_channel *channel) { grpc_transport_op op; diff --git a/test/core/transport/connectivity_state_test.c b/test/core/transport/connectivity_state_test.c index 34ab45d260..fef8800add 100644 --- a/test/core/transport/connectivity_state_test.c +++ b/test/core/transport/connectivity_state_test.c @@ -43,13 +43,13 @@ int g_counter; -static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, bool success) { GPR_ASSERT(success); GPR_ASSERT(arg == THE_ARG); g_counter++; } -static void must_fail(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void must_fail(grpc_exec_ctx *exec_ctx, void *arg, bool success) { GPR_ASSERT(!success); GPR_ASSERT(arg == THE_ARG); g_counter++; diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index 11cefbf01c..f65a30b240 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -74,7 +74,7 @@ typedef struct freereq { } freereq; static void destroy_pollset_and_shutdown(grpc_exec_ctx *exec_ctx, void *p, - int success) { + bool success) { grpc_pollset_destroy(p); grpc_shutdown(); } diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index aaba7be356..66470c0288 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -46,7 +46,7 @@ #include "test/core/util/port.h" static void on_server_destroyed(grpc_exec_ctx *exec_ctx, void *data, - int success) { + bool success) { test_tcp_server *server = data; server->shutdown = 1; } @@ -97,7 +97,7 @@ void test_tcp_server_poll(test_tcp_server *server, int seconds) { grpc_exec_ctx_finish(&exec_ctx); } -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {} +static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {} void test_tcp_server_destroy(test_tcp_server *server) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |