diff options
Diffstat (limited to 'src/core/lib/surface/server.c')
-rw-r--r-- | src/core/lib/surface/server.c | 81 |
1 files changed, 47 insertions, 34 deletions
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index fe73aa375c..addb7c4fbc 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -45,6 +45,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/stack_lockfree.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" @@ -271,14 +272,15 @@ struct shutdown_cleanup_args { static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { struct shutdown_cleanup_args *a = arg; - grpc_slice_unref(a->slice); + grpc_slice_unref_internal(exec_ctx, a->slice); gpr_free(a); } static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, int send_goaway, grpc_error *send_disconnect) { struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc)); - grpc_closure_init(&sc->closure, shutdown_cleanup, sc); + grpc_closure_init(&sc->closure, shutdown_cleanup, sc, + grpc_schedule_on_exec_ctx); grpc_transport_op *op = grpc_make_transport_op(&sc->closure); grpc_channel_element *elem; @@ -346,9 +348,9 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); } } @@ -379,7 +381,7 @@ static void server_ref(grpc_server *server) { static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { registered_method *rm; size_t i; - grpc_channel_args_destroy(server->channel_args); + grpc_channel_args_destroy(exec_ctx, server->channel_args); gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); while ((rm = server->registered_methods) != NULL) { @@ -440,8 +442,8 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, orphan_channel(chand); server_ref(chand->server); maybe_finish_shutdown(exec_ctx, chand->server); - chand->finish_destroy_channel_closure.cb = finish_destroy_channel; - chand->finish_destroy_channel_closure.cb_arg = chand; + grpc_closure_init(&chand->finish_destroy_channel_closure, + finish_destroy_channel, chand, grpc_schedule_on_exec_ctx); if (grpc_server_channel_trace && error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(error); @@ -545,8 +547,9 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error); return; } @@ -590,9 +593,9 @@ static void finish_start_new_rpc( gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); return; } @@ -607,7 +610,8 @@ static void finish_start_new_rpc( memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; op.data.recv_message = &calld->payload; - grpc_closure_init(&calld->publish, publish_new_rpc, elem); + grpc_closure_init(&calld->publish, publish_new_rpc, elem, + grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, &calld->publish); break; @@ -740,7 +744,8 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, } } -static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem *server_filter(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (md->key == GRPC_MDSTR_PATH) { @@ -764,7 +769,8 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, gpr_timespec op_deadline; GRPC_ERROR_REF(error); - grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, elem); + grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata, + server_filter, elem); op_deadline = calld->recv_initial_metadata->deadline; if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { calld->deadline = op_deadline; @@ -813,9 +819,10 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE, NULL); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -838,7 +845,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, args.server_transport_data = transport_server_data; args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); grpc_call *call; - grpc_error *error = grpc_call_create(&args, &call); + grpc_error *error = grpc_call_create(exec_ctx, &args, &call); grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); if (error != GRPC_ERROR_NONE) { @@ -851,7 +858,8 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_INITIAL_METADATA; op.data.recv_initial_metadata = &calld->initial_metadata; - grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem); + grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem, + grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1, &calld->got_initial_metadata); } @@ -887,7 +895,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, gpr_mu_init(&calld->mu_state); grpc_closure_init(&calld->server_on_recv_initial_metadata, - server_on_recv_initial_metadata, elem); + server_on_recv_initial_metadata, elem, + grpc_schedule_on_exec_ctx); server_ref(chand->server); return GRPC_ERROR_NONE; @@ -902,10 +911,10 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(calld->state != PENDING); if (calld->host) { - GRPC_MDSTR_UNREF(calld->host); + GRPC_MDSTR_UNREF(exec_ctx, calld->host); } if (calld->path) { - GRPC_MDSTR_UNREF(calld->path); + GRPC_MDSTR_UNREF(exec_ctx, calld->path); } grpc_metadata_array_destroy(&calld->initial_metadata); @@ -914,9 +923,9 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, server_unref(exec_ctx, chand->server); } -static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { +static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; GPR_ASSERT(args->is_first); GPR_ASSERT(!args->is_last); @@ -926,7 +935,9 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; grpc_closure_init(&chand->channel_connectivity_changed, - channel_connectivity_changed, chand); + channel_connectivity_changed, chand, + grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; } static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, @@ -936,10 +947,10 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, if (chand->registered_methods) { for (i = 0; i < chand->registered_method_slots; i++) { if (chand->registered_methods[i].method) { - GRPC_MDSTR_UNREF(chand->registered_methods[i].method); + GRPC_MDSTR_UNREF(exec_ctx, chand->registered_methods[i].method); } if (chand->registered_methods[i].host) { - GRPC_MDSTR_UNREF(chand->registered_methods[i].host); + GRPC_MDSTR_UNREF(exec_ctx, chand->registered_methods[i].host); } } gpr_free(chand->registered_methods); @@ -1277,7 +1288,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { - grpc_closure_init(&l->destroy_done, listener_destroy_done, server); + grpc_closure_init(&l->destroy_done, listener_destroy_done, server, + grpc_schedule_on_exec_ctx); l->destroy(&exec_ctx, server, l->arg, &l->destroy_done); } @@ -1383,9 +1395,10 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE, NULL); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; |