diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-12-06 09:05:05 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-06 09:05:05 -0800 |
commit | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch) | |
tree | 6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/surface/server.cc | |
parent | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff) |
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/surface/server.cc')
-rw-r--r-- | src/core/lib/surface/server.cc | 322 |
1 files changed, 187 insertions, 135 deletions
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 4f07183180..0f8a057f31 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -46,9 +46,10 @@ typedef struct listener { void* arg; - void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets, - size_t pollset_count); - void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure); + void (*start)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg, + grpc_pollset** pollsets, size_t pollset_count); + void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg, + grpc_closure* closure); struct listener* next; grpc_closure destroy_done; } listener; @@ -223,12 +224,13 @@ struct grpc_server { #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data*)(elem)->channel_data)->server) -static void publish_new_rpc(void* calld, grpc_error* error); -static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc, - grpc_error* error); +static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* calld, + grpc_error* error); +static void fail_call(grpc_exec_ctx* exec_ctx, grpc_server* server, + size_t cq_idx, requested_call* rc, grpc_error* error); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ -static void maybe_finish_shutdown(grpc_server* server); +static void maybe_finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_server* server); /* * channel broadcaster @@ -256,14 +258,15 @@ struct shutdown_cleanup_args { grpc_slice slice; }; -static void shutdown_cleanup(void* arg, grpc_error* error) { +static void shutdown_cleanup(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { struct shutdown_cleanup_args* a = (struct shutdown_cleanup_args*)arg; - grpc_slice_unref_internal(a->slice); + grpc_slice_unref_internal(exec_ctx, a->slice); gpr_free(a); } -static void send_shutdown(grpc_channel* channel, bool send_goaway, - grpc_error* send_disconnect) { +static void send_shutdown(grpc_exec_ctx* exec_ctx, grpc_channel* channel, + bool send_goaway, grpc_error* send_disconnect) { struct shutdown_cleanup_args* sc = (struct shutdown_cleanup_args*)gpr_malloc(sizeof(*sc)); GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc, @@ -281,18 +284,19 @@ static void send_shutdown(grpc_channel* channel, bool send_goaway, op->disconnect_with_error = send_disconnect; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(elem, op); + elem->filter->start_transport_op(exec_ctx, elem, op); } -static void channel_broadcaster_shutdown(channel_broadcaster* cb, +static void channel_broadcaster_shutdown(grpc_exec_ctx* exec_ctx, + channel_broadcaster* cb, bool send_goaway, grpc_error* force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], send_goaway, + send_shutdown(exec_ctx, cb->channels[i], send_goaway, GRPC_ERROR_REF(force_disconnect)); - GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, cb->channels[i], "broadcast"); } gpr_free(cb->channels); GRPC_ERROR_UNREF(force_disconnect); @@ -320,11 +324,13 @@ static void request_matcher_destroy(request_matcher* rm) { gpr_free(rm->requests_per_cq); } -static void kill_zombie(void* elem, grpc_error* error) { +static void kill_zombie(grpc_exec_ctx* exec_ctx, void* elem, + grpc_error* error) { grpc_call_unref(grpc_call_from_top_element((grpc_call_element*)elem)); } -static void request_matcher_zombify_all_pending_calls(request_matcher* rm) { +static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx* exec_ctx, + request_matcher* rm) { while (rm->pending_head) { call_data* calld = rm->pending_head; rm->pending_head = calld->pending_next; @@ -333,18 +339,19 @@ static void request_matcher_zombify_all_pending_calls(request_matcher* rm) { &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); } } -static void request_matcher_kill_requests(grpc_server* server, +static void request_matcher_kill_requests(grpc_exec_ctx* exec_ctx, + grpc_server* server, request_matcher* rm, grpc_error* error) { requested_call* rc; for (size_t i = 0; i < server->cq_count; i++) { while ((rc = (requested_call*)gpr_locked_mpscq_pop( &rm->requests_per_cq[i])) != nullptr) { - fail_call(server, i, rc, GRPC_ERROR_REF(error)); + fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); @@ -358,10 +365,10 @@ static void server_ref(grpc_server* server) { gpr_ref(&server->internal_refcount); } -static void server_delete(grpc_server* server) { +static void server_delete(grpc_exec_ctx* exec_ctx, grpc_server* server) { registered_method* rm; size_t i; - grpc_channel_args_destroy(server->channel_args); + grpc_channel_args_destroy(exec_ctx, server->channel_args); gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); gpr_cv_destroy(&server->starting_cv); @@ -378,7 +385,7 @@ static void server_delete(grpc_server* server) { request_matcher_destroy(&server->unregistered_request_matcher); } for (i = 0; i < server->cq_count; i++) { - GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); } gpr_free(server->cqs); gpr_free(server->pollsets); @@ -386,9 +393,9 @@ static void server_delete(grpc_server* server) { gpr_free(server); } -static void server_unref(grpc_server* server) { +static void server_unref(grpc_exec_ctx* exec_ctx, grpc_server* server) { if (gpr_unref(&server->internal_refcount)) { - server_delete(server); + server_delete(exec_ctx, server); } } @@ -402,19 +409,21 @@ static void orphan_channel(channel_data* chand) { chand->next = chand->prev = chand; } -static void finish_destroy_channel(void* cd, grpc_error* error) { +static void finish_destroy_channel(grpc_exec_ctx* exec_ctx, void* cd, + grpc_error* error) { channel_data* chand = (channel_data*)cd; grpc_server* server = chand->server; - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server"); - server_unref(server); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); + server_unref(exec_ctx, server); } -static void destroy_channel(channel_data* chand, grpc_error* error) { +static void destroy_channel(grpc_exec_ctx* exec_ctx, channel_data* chand, + grpc_error* error) { if (is_channel_orphaned(chand)) return; GPR_ASSERT(chand->server != nullptr); orphan_channel(chand); server_ref(chand->server); - maybe_finish_shutdown(chand->server); + maybe_finish_shutdown(exec_ctx, chand->server); GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure, finish_destroy_channel, chand, grpc_schedule_on_exec_ctx); @@ -427,18 +436,20 @@ static void destroy_channel(channel_data* chand, grpc_error* error) { grpc_transport_op* op = grpc_make_transport_op(&chand->finish_destroy_channel_closure); op->set_accept_stream = true; - grpc_channel_next_op(grpc_channel_stack_element( + grpc_channel_next_op(exec_ctx, + grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), op); } -static void done_request_event(void* req, grpc_cq_completion* c) { +static void done_request_event(grpc_exec_ctx* exec_ctx, void* req, + grpc_cq_completion* c) { gpr_free(req); } -static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, - requested_call* rc) { - grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); +static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server, + call_data* calld, size_t cq_idx, requested_call* rc) { + grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); grpc_call* call = calld->call; *rc->call = call; calld->cq_new = server->cqs[cq_idx]; @@ -465,11 +476,12 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, GPR_UNREACHABLE_CODE(return ); } - grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, - rc, &rc->completion); + grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, + done_request_event, rc, &rc->completion); } -static void publish_new_rpc(void* arg, grpc_error* error) { +static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_call_element* call_elem = (grpc_call_element*)arg; call_data* calld = (call_data*)call_elem->call_data; channel_data* chand = (channel_data*)call_elem->channel_data; @@ -482,7 +494,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) { &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_REF(error)); return; } @@ -493,15 +506,15 @@ static void publish_new_rpc(void* arg, grpc_error* error) { if (rc == nullptr) { continue; } else { - GRPC_STATS_INC_SERVER_CQS_CHECKED(i); + GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i); gpr_atm_no_barrier_store(&calld->state, ACTIVATED); - publish_call(server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, rc); return; /* early out */ } } /* no cq to take the request found: queue it on the slow list */ - GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(); + GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx); gpr_mu_lock(&server->mu_call); // We need to ensure that all the queues are empty. We do this under @@ -516,9 +529,9 @@ static void publish_new_rpc(void* arg, grpc_error* error) { continue; } else { gpr_mu_unlock(&server->mu_call); - GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count); + GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i + server->cq_count); gpr_atm_no_barrier_store(&calld->state, ACTIVATED); - publish_call(server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, rc); return; /* early out */ } } @@ -535,7 +548,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) { } static void finish_start_new_rpc( - grpc_server* server, grpc_call_element* elem, request_matcher* rm, + grpc_exec_ctx* exec_ctx, grpc_server* server, grpc_call_element* elem, + request_matcher* rm, grpc_server_register_method_payload_handling payload_handling) { call_data* calld = (call_data*)elem->call_data; @@ -543,7 +557,7 @@ static void finish_start_new_rpc( gpr_atm_no_barrier_store(&calld->state, ZOMBIED); GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); return; } @@ -551,7 +565,7 @@ static void finish_start_new_rpc( switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: - publish_new_rpc(elem, GRPC_ERROR_NONE); + publish_new_rpc(exec_ctx, elem, GRPC_ERROR_NONE); break; case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { grpc_op op; @@ -560,13 +574,14 @@ static void finish_start_new_rpc( op.data.recv_message.recv_message = &calld->payload; GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem, grpc_schedule_on_exec_ctx); - grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish); + grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, + &calld->publish); break; } } } -static void start_new_rpc(grpc_call_element* elem) { +static void start_new_rpc(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; grpc_server* server = chand->server; @@ -591,7 +606,8 @@ static void start_new_rpc(grpc_call_element* elem) { GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } - finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher, + finish_start_new_rpc(exec_ctx, server, elem, + &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } @@ -608,12 +624,14 @@ static void start_new_rpc(grpc_call_element* elem) { GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } - finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher, + finish_start_new_rpc(exec_ctx, server, elem, + &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } } - finish_start_new_rpc(server, elem, &server->unregistered_request_matcher, + finish_start_new_rpc(exec_ctx, server, elem, + &server->unregistered_request_matcher, GRPC_SRM_PAYLOAD_NONE); } @@ -626,8 +644,9 @@ static int num_listeners(grpc_server* server) { return n; } -static void done_shutdown_event(void* server, grpc_cq_completion* completion) { - server_unref((grpc_server*)server); +static void done_shutdown_event(grpc_exec_ctx* exec_ctx, void* server, + grpc_cq_completion* completion) { + server_unref(exec_ctx, (grpc_server*)server); } static int num_channels(grpc_server* server) { @@ -640,30 +659,34 @@ static int num_channels(grpc_server* server) { return n; } -static void kill_pending_work_locked(grpc_server* server, grpc_error* error) { +static void kill_pending_work_locked(grpc_exec_ctx* exec_ctx, + grpc_server* server, grpc_error* error) { if (server->started) { - request_matcher_kill_requests(server, &server->unregistered_request_matcher, + request_matcher_kill_requests(exec_ctx, server, + &server->unregistered_request_matcher, GRPC_ERROR_REF(error)); request_matcher_zombify_all_pending_calls( - &server->unregistered_request_matcher); + exec_ctx, &server->unregistered_request_matcher); for (registered_method* rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(server, &rm->matcher, + request_matcher_kill_requests(exec_ctx, server, &rm->matcher, GRPC_ERROR_REF(error)); - request_matcher_zombify_all_pending_calls(&rm->matcher); + request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher); } } GRPC_ERROR_UNREF(error); } -static void maybe_finish_shutdown(grpc_server* server) { +static void maybe_finish_shutdown(grpc_exec_ctx* exec_ctx, + grpc_server* server) { size_t i; if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; } kill_pending_work_locked( - server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); + exec_ctx, server, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { @@ -683,13 +706,15 @@ static void maybe_finish_shutdown(grpc_server* server) { server->shutdown_published = 1; for (i = 0; i < server->num_shutdown_tags; i++) { server_ref(server); - grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, - GRPC_ERROR_NONE, done_shutdown_event, server, + grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq, + server->shutdown_tags[i].tag, GRPC_ERROR_NONE, + done_shutdown_event, server, &server->shutdown_tags[i].completion); } } -static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { +static void server_on_recv_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr, + grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)ptr; call_data* calld = (call_data*)elem->call_data; grpc_millis op_deadline; @@ -703,10 +728,10 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md)); calld->path_set = true; calld->host_set = true; - grpc_metadata_batch_remove(calld->recv_initial_metadata, + grpc_metadata_batch_remove(exec_ctx, calld->recv_initial_metadata, calld->recv_initial_metadata->idx.named.path); grpc_metadata_batch_remove( - calld->recv_initial_metadata, + exec_ctx, calld->recv_initial_metadata, calld->recv_initial_metadata->idx.named.authority); } else { GRPC_ERROR_REF(error); @@ -724,7 +749,7 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { GRPC_ERROR_UNREF(src_error); } - GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error); } static void server_mutate_op(grpc_call_element* elem, @@ -745,21 +770,24 @@ static void server_mutate_op(grpc_call_element* elem, } static void server_start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* op) { + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { server_mutate_op(elem, op); - grpc_call_next_op(elem, op); + grpc_call_next_op(exec_ctx, elem, op); } -static void got_initial_metadata(void* ptr, grpc_error* error) { +static void got_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr, + grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)ptr; call_data* calld = (call_data*)elem->call_data; if (error == GRPC_ERROR_NONE) { - start_new_rpc(elem); + start_new_rpc(exec_ctx, elem); } else { if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) { GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) { /* zombied call will be destroyed when it's removed from the pending queue... later */ @@ -767,7 +795,8 @@ static void got_initial_metadata(void* ptr, grpc_error* error) { } } -static void accept_stream(void* cd, grpc_transport* transport, +static void accept_stream(grpc_exec_ctx* exec_ctx, void* cd, + grpc_transport* transport, const void* transport_server_data) { channel_data* chand = (channel_data*)cd; /* create a call */ @@ -777,11 +806,11 @@ static void accept_stream(void* cd, grpc_transport* transport, args.server_transport_data = transport_server_data; args.send_deadline = GRPC_MILLIS_INF_FUTURE; grpc_call* call; - grpc_error* error = grpc_call_create(&args, &call); + grpc_error* error = grpc_call_create(exec_ctx, &args, &call); grpc_call_element* elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); if (error != GRPC_ERROR_NONE) { - got_initial_metadata(elem, error); + got_initial_metadata(exec_ctx, elem, error); GRPC_ERROR_UNREF(error); return; } @@ -793,28 +822,32 @@ static void accept_stream(void* cd, grpc_transport* transport, &calld->initial_metadata; GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem, grpc_schedule_on_exec_ctx); - grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata); + grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1, + &calld->got_initial_metadata); } -static void channel_connectivity_changed(void* cd, grpc_error* error) { +static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* cd, + grpc_error* error) { channel_data* chand = (channel_data*)cd; grpc_server* server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_transport_op* op = grpc_make_transport_op(nullptr); op->on_connectivity_state_change = &chand->channel_connectivity_changed; op->connectivity_state = &chand->connectivity_state; - grpc_channel_next_op(grpc_channel_stack_element( + grpc_channel_next_op(exec_ctx, + grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), op); } else { gpr_mu_lock(&server->mu_global); - destroy_channel(chand, GRPC_ERROR_REF(error)); + destroy_channel(exec_ctx, chand, GRPC_ERROR_REF(error)); gpr_mu_unlock(&server->mu_global); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity"); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity"); } } -static grpc_error* init_call_elem(grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; @@ -830,7 +863,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem, return GRPC_ERROR_NONE; } -static void destroy_call_elem(grpc_call_element* elem, +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { channel_data* chand = (channel_data*)elem->channel_data; @@ -839,18 +872,19 @@ static void destroy_call_elem(grpc_call_element* elem, GPR_ASSERT(calld->state != PENDING); if (calld->host_set) { - grpc_slice_unref_internal(calld->host); + grpc_slice_unref_internal(exec_ctx, calld->host); } if (calld->path_set) { - grpc_slice_unref_internal(calld->path); + grpc_slice_unref_internal(exec_ctx, calld->path); } grpc_metadata_array_destroy(&calld->initial_metadata); grpc_byte_buffer_destroy(calld->payload); - server_unref(chand->server); + server_unref(exec_ctx, chand->server); } -static grpc_error* init_channel_elem(grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(args->is_first); @@ -866,14 +900,15 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, return GRPC_ERROR_NONE; } -static void destroy_channel_elem(grpc_channel_element* elem) { +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) { size_t i; channel_data* chand = (channel_data*)elem->channel_data; if (chand->registered_methods) { for (i = 0; i < chand->registered_method_slots; i++) { - grpc_slice_unref_internal(chand->registered_methods[i].method); + grpc_slice_unref_internal(exec_ctx, chand->registered_methods[i].method); if (chand->registered_methods[i].has_host) { - grpc_slice_unref_internal(chand->registered_methods[i].host); + grpc_slice_unref_internal(exec_ctx, chand->registered_methods[i].host); } } gpr_free(chand->registered_methods); @@ -883,9 +918,9 @@ static void destroy_channel_elem(grpc_channel_element* elem) { chand->next->prev = chand->prev; chand->prev->next = chand->next; chand->next = chand->prev = chand; - maybe_finish_shutdown(chand->server); + maybe_finish_shutdown(exec_ctx, chand->server); gpr_mu_unlock(&chand->server->mu_global); - server_unref(chand->server); + server_unref(exec_ctx, chand->server); } } @@ -999,10 +1034,11 @@ void* grpc_server_register_method( return m; } -static void start_listeners(void* s, grpc_error* error) { +static void start_listeners(grpc_exec_ctx* exec_ctx, void* s, + grpc_error* error) { grpc_server* server = (grpc_server*)s; for (listener* l = server->listeners; l; l = l->next) { - l->start(server, l->arg, server->pollsets, server->pollset_count); + l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count); } gpr_mu_lock(&server->mu_global); @@ -1010,12 +1046,12 @@ static void start_listeners(void* s, grpc_error* error) { gpr_cv_signal(&server->starting_cv); gpr_mu_unlock(&server->mu_global); - server_unref(server); + server_unref(exec_ctx, server); } void grpc_server_start(grpc_server* server) { size_t i; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server)); @@ -1037,9 +1073,12 @@ void grpc_server_start(grpc_server* server) { server_ref(server); server->starting = true; GRPC_CLOSURE_SCHED( + &exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server, grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), GRPC_ERROR_NONE); + + grpc_exec_ctx_finish(&exec_ctx); } void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets, @@ -1048,7 +1087,8 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets, *pollsets = server->pollsets; } -void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, +void grpc_server_setup_transport(grpc_exec_ctx* exec_ctx, grpc_server* s, + grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args) { size_t num_registered_methods; @@ -1063,7 +1103,8 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, uint32_t max_probes = 0; grpc_transport_op* op = nullptr; - channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport); + channel = grpc_channel_create(exec_ctx, nullptr, args, GRPC_SERVER_CHANNEL, + transport); chand = (channel_data*)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; @@ -1140,19 +1181,21 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"); } - grpc_transport_perform_op(transport, op); + grpc_transport_perform_op(exec_ctx, transport, op); } -void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) { +void done_published_shutdown(grpc_exec_ctx* exec_ctx, void* done_arg, + grpc_cq_completion* storage) { (void)done_arg; gpr_free(storage); } -static void listener_destroy_done(void* s, grpc_error* error) { +static void listener_destroy_done(grpc_exec_ctx* exec_ctx, void* s, + grpc_error* error) { grpc_server* server = (grpc_server*)s; gpr_mu_lock(&server->mu_global); server->listeners_destroyed++; - maybe_finish_shutdown(server); + maybe_finish_shutdown(exec_ctx, server); gpr_mu_unlock(&server->mu_global); } @@ -1161,7 +1204,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server, listener* l; shutdown_tag* sdt; channel_broadcaster broadcaster; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3, (server, cq, tag)); @@ -1176,10 +1219,11 @@ void grpc_server_shutdown_and_notify(grpc_server* server, /* stay locked, and gather up some stuff to do */ GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { - grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr, + grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, + nullptr, (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion))); gpr_mu_unlock(&server->mu_global); - return; + goto done; } server->shutdown_tags = (shutdown_tag*)gpr_realloc( server->shutdown_tags, @@ -1189,7 +1233,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server, sdt->cq = cq; if (gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_unlock(&server->mu_global); - return; + goto done; } server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); @@ -1201,26 +1245,30 @@ void grpc_server_shutdown_and_notify(grpc_server* server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); kill_pending_work_locked( - server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); + &exec_ctx, server, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); gpr_mu_unlock(&server->mu_call); - maybe_finish_shutdown(server); + maybe_finish_shutdown(&exec_ctx, server); gpr_mu_unlock(&server->mu_global); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server, grpc_schedule_on_exec_ctx); - l->destroy(server, l->arg, &l->destroy_done); + l->destroy(&exec_ctx, server, l->arg, &l->destroy_done); } - channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */, + channel_broadcaster_shutdown(&exec_ctx, &broadcaster, true /* send_goaway */, GRPC_ERROR_NONE); + +done: + grpc_exec_ctx_finish(&exec_ctx); } void grpc_server_cancel_all_calls(grpc_server* server) { channel_broadcaster broadcaster; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server)); @@ -1229,13 +1277,14 @@ void grpc_server_cancel_all_calls(grpc_server* server) { gpr_mu_unlock(&server->mu_global); channel_broadcaster_shutdown( - &broadcaster, false /* send_goaway */, + &exec_ctx, &broadcaster, false /* send_goaway */, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls")); + grpc_exec_ctx_finish(&exec_ctx); } void grpc_server_destroy(grpc_server* server) { listener* l; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server)); @@ -1251,15 +1300,16 @@ void grpc_server_destroy(grpc_server* server) { gpr_mu_unlock(&server->mu_global); - server_unref(server); + server_unref(&exec_ctx, server); + grpc_exec_ctx_finish(&exec_ctx); } -void grpc_server_add_listener(grpc_server* server, void* arg, - void (*start)(grpc_server* server, void* arg, - grpc_pollset** pollsets, - size_t pollset_count), - void (*destroy)(grpc_server* server, void* arg, - grpc_closure* on_done)) { +void grpc_server_add_listener( + grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg, + void (*start)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg, + grpc_pollset** pollsets, size_t pollset_count), + void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg, + grpc_closure* on_done)) { listener* l = (listener*)gpr_malloc(sizeof(listener)); l->arg = arg; l->start = start; @@ -1268,12 +1318,13 @@ void grpc_server_add_listener(grpc_server* server, void* arg, server->listeners = l; } -static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, +static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx, + grpc_server* server, size_t cq_idx, requested_call* rc) { call_data* calld = nullptr; request_matcher* rm = nullptr; if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(server, cq_idx, rc, + fail_call(exec_ctx, server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } @@ -1300,9 +1351,10 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else { - publish_call(server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, rc); } gpr_mu_lock(&server->mu_call); } @@ -1317,9 +1369,9 @@ grpc_call_error grpc_server_request_call( grpc_completion_queue* cq_bound_to_call, grpc_completion_queue* cq_for_notification, void* tag) { grpc_call_error error; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; requested_call* rc = (requested_call*)gpr_malloc(sizeof(*rc)); - GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); + GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx); GRPC_API_TRACE( "grpc_server_request_call(" "server=%p, call=%p, details=%p, initial_metadata=%p, " @@ -1352,9 +1404,9 @@ grpc_call_error grpc_server_request_call( rc->call = call; rc->data.batch.details = details; rc->initial_metadata = initial_metadata; - error = queue_call_request(server, cq_idx, rc); + error = queue_call_request(&exec_ctx, server, cq_idx, rc); done: - + grpc_exec_ctx_finish(&exec_ctx); return error; } @@ -1364,10 +1416,10 @@ grpc_call_error grpc_server_request_registered_call( grpc_completion_queue* cq_bound_to_call, grpc_completion_queue* cq_for_notification, void* tag) { grpc_call_error error; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; requested_call* rc = (requested_call*)gpr_malloc(sizeof(*rc)); registered_method* rm = (registered_method*)rmp; - GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); + GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx); GRPC_API_TRACE( "grpc_server_request_registered_call(" "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, " @@ -1409,20 +1461,20 @@ grpc_call_error grpc_server_request_registered_call( rc->data.registered.deadline = deadline; rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; - error = queue_call_request(server, cq_idx, rc); + error = queue_call_request(&exec_ctx, server, cq_idx, rc); done: - + grpc_exec_ctx_finish(&exec_ctx); return error; } -static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc, - grpc_error* error) { +static void fail_call(grpc_exec_ctx* exec_ctx, grpc_server* server, + size_t cq_idx, requested_call* rc, grpc_error* error) { *rc->call = nullptr; rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); - grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc, - &rc->completion); + grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, + done_request_event, rc, &rc->completion); } const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) { |