diff options
Diffstat (limited to 'src/core/lib/surface/server.c')
-rw-r--r-- | src/core/lib/surface/server.c | 116 |
1 files changed, 71 insertions, 45 deletions
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 88d898bc1c..def6e5068b 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -230,9 +230,10 @@ struct grpc_server { #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) -static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success); +static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, + grpc_error *error); static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - size_t cq_idx, requested_call *rc); + size_t cq_idx, requested_call *rc, grpc_error *error); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server); @@ -263,14 +264,14 @@ struct shutdown_cleanup_args { }; static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_status_ignored) { + grpc_error *error) { struct shutdown_cleanup_args *a = arg; gpr_slice_unref(a->slice); gpr_free(a); } static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - int send_goaway, int send_disconnect) { + int send_goaway, grpc_error *send_disconnect) { grpc_transport_op op; struct shutdown_cleanup_args *sc; grpc_channel_element *elem; @@ -281,7 +282,7 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, sc->slice = gpr_slice_from_copied_string("Server shutdown"); op.goaway_message = &sc->slice; op.goaway_status = GRPC_STATUS_OK; - op.disconnect = send_disconnect; + op.disconnect_with_error = send_disconnect; grpc_closure_init(&sc->closure, shutdown_cleanup, sc); op.on_consumed = &sc->closure; @@ -292,14 +293,16 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, channel_broadcaster *cb, int send_goaway, - int force_disconnect) { + grpc_error *force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { - send_shutdown(exec_ctx, cb->channels[i], send_goaway, force_disconnect); + send_shutdown(exec_ctx, cb->channels[i], send_goaway, + GRPC_ERROR_REF(force_disconnect)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, cb->channels[i], "broadcast"); } gpr_free(cb->channels); + GRPC_ERROR_UNREF(force_disconnect); } /* @@ -325,7 +328,8 @@ static void request_matcher_destroy(request_matcher *rm) { gpr_free(rm->requests_per_cq); } -static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) { +static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, + grpc_error *error) { grpc_call_destroy(grpc_call_from_top_element(elem)); } @@ -340,20 +344,24 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); } } static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, - request_matcher *rm) { + request_matcher *rm, + grpc_error *error) { int request_id; for (size_t i = 0; i < server->cq_count; i++) { while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != -1) { - fail_call(exec_ctx, server, i, &server->requested_calls[request_id]); + fail_call(exec_ctx, server, i, &server->requested_calls[request_id], + GRPC_ERROR_REF(error)); } } + GRPC_ERROR_UNREF(error); } /* @@ -410,7 +418,7 @@ static void orphan_channel(channel_data *chand) { } static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, - bool success) { + grpc_error *error) { channel_data *chand = cd; grpc_server *server = chand->server; GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); @@ -499,25 +507,26 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_stack_element(grpc_call_get_call_stack(call), 0); channel_data *chand = elem->channel_data; server_ref(chand->server); - grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, true, done_request_event, rc, - &rc->completion); + grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, + done_request_event, rc, &rc->completion); } -static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *call_elem = arg; call_data *calld = call_elem->call_data; channel_data *chand = call_elem->channel_data; request_matcher *rm = calld->request_matcher; grpc_server *server = rm->server; - if (!success || gpr_atm_acq_load(&server->shutdown_flag)) { + if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL); return; } @@ -562,7 +571,8 @@ static void finish_start_new_rpc( calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); return; } @@ -570,7 +580,7 @@ static void finish_start_new_rpc( switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: - publish_new_rpc(exec_ctx, elem, true); + publish_new_rpc(exec_ctx, elem, GRPC_ERROR_NONE); break; case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { grpc_op op; @@ -658,18 +668,21 @@ static int num_channels(grpc_server *server) { } static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, - grpc_server *server) { + grpc_server *server, grpc_error *error) { if (server->started) { request_matcher_kill_requests(exec_ctx, server, - &server->unregistered_request_matcher); + &server->unregistered_request_matcher, + GRPC_ERROR_REF(error)); request_matcher_zombify_all_pending_calls( exec_ctx, &server->unregistered_request_matcher); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher); + request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher, + GRPC_ERROR_REF(error)); request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); } } + GRPC_ERROR_UNREF(error); } static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, @@ -679,7 +692,8 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, return; } - kill_pending_work_locked(exec_ctx, server); + kill_pending_work_locked(exec_ctx, server, + GRPC_ERROR_CREATE("Server Shutdown")); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { @@ -700,7 +714,8 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, for (i = 0; i < server->num_shutdown_tags; i++) { server_ref(server); grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq, - server->shutdown_tags[i].tag, 1, done_shutdown_event, server, + server->shutdown_tags[i].tag, GRPC_ERROR_NONE, + done_shutdown_event, server, &server->shutdown_tags[i].completion); } } @@ -723,11 +738,12 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - bool success) { + grpc_error *error) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; gpr_timespec op_deadline; + GRPC_ERROR_REF(error); grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, elem); op_deadline = calld->recv_initial_metadata->deadline; if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { @@ -736,11 +752,13 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, if (calld->host && calld->path) { /* do nothing */ } else { - success = 0; + GRPC_ERROR_UNREF(error); + error = + GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1); } - calld->on_done_recv_initial_metadata->cb( - exec_ctx, calld->on_done_recv_initial_metadata->cb_arg, success); + grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv_initial_metadata, error, + NULL); } static void server_mutate_op(grpc_call_element *elem, @@ -765,10 +783,10 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - bool success) { + grpc_error *error) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - if (success) { + if (error == GRPC_ERROR_NONE) { start_new_rpc(exec_ctx, elem); } else { gpr_mu_lock(&calld->mu_state); @@ -776,7 +794,8 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -809,7 +828,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, } static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, - bool iomgr_status_ignored) { + grpc_error *error) { channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { @@ -1148,7 +1167,9 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; - op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0; + if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { + op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); + } grpc_transport_perform_op(exec_ctx, transport, &op); } @@ -1159,7 +1180,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, } static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s, - bool success) { + grpc_error *error) { grpc_server *server = s; gpr_mu_lock(&server->mu_global); server->listeners_destroyed++; @@ -1181,8 +1202,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, gpr_mu_lock(&server->mu_global); grpc_cq_begin_op(cq, tag); if (server->shutdown_published) { - grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL, - gpr_malloc(sizeof(grpc_cq_completion))); + grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, + NULL, gpr_malloc(sizeof(grpc_cq_completion))); gpr_mu_unlock(&server->mu_global); goto done; } @@ -1205,7 +1226,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - kill_pending_work_locked(&exec_ctx, server); + kill_pending_work_locked(&exec_ctx, server, + GRPC_ERROR_CREATE("Server Shutdown")); gpr_mu_unlock(&server->mu_call); maybe_finish_shutdown(&exec_ctx, server); @@ -1233,7 +1255,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) { channel_broadcaster_init(server, &broadcaster); gpr_mu_unlock(&server->mu_global); - channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, 1); + channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, + GRPC_ERROR_CREATE("Cancelling all calls")); grpc_exec_ctx_finish(&exec_ctx); } @@ -1280,13 +1303,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, request_matcher *rm = NULL; int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(exec_ctx, server, cq_idx, rc); + fail_call(exec_ctx, server, cq_idx, rc, + GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist); if (request_id == -1) { /* out of request ids: just fail this one */ - fail_call(exec_ctx, server, cq_idx, rc); + fail_call(exec_ctx, server, cq_idx, rc, + GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } switch (rc->type) { @@ -1314,8 +1339,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; @@ -1421,13 +1446,14 @@ done: } static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - size_t cq_idx, requested_call *rc) { + size_t cq_idx, requested_call *rc, grpc_error *error) { *rc->call = NULL; rc->initial_metadata->count = 0; + GPR_ASSERT(error != GRPC_ERROR_NONE); server_ref(server); - grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, 0, done_request_event, - rc, &rc->completion); + grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, + done_request_event, rc, &rc->completion); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { |