diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/surface/server.c | 73 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 6 |
2 files changed, 29 insertions, 50 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c index c370a9b8ab..29d893db71 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -327,6 +327,14 @@ static void request_matcher_zombify_all_pending_calls( } } +static void request_matcher_kill_requests(grpc_server *server, + request_matcher *rm) { + int request_id; + while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { + fail_call(server, &server->requested_calls[request_id]); + } +} + /* * server proper */ @@ -492,12 +500,25 @@ static int num_channels(grpc_server *server) { return n; } +static void kill_pending_work_locked(grpc_server *server) { + registered_method *rm; + request_matcher_kill_requests(server, &server->unregistered_request_matcher); + request_matcher_zombify_all_pending_calls( + &server->unregistered_request_matcher); + for (rm = server->registered_methods; rm; rm = rm->next) { + request_matcher_kill_requests(server, &rm->request_matcher); + request_matcher_zombify_all_pending_calls(&rm->request_matcher); + } +} + static void maybe_finish_shutdown(grpc_server *server) { size_t i; if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; } + kill_pending_work_locked(server); + if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), @@ -947,52 +968,15 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; + op.disconnect = gpr_atm_acq_load(&s->shutdown_flag); grpc_transport_perform_op(transport, &op); } -typedef struct { - requested_call **requests; - size_t count; - size_t capacity; -} request_killer; - -static void request_killer_init(request_killer *rk) { - memset(rk, 0, sizeof(*rk)); -} - -static void request_killer_add(request_killer *rk, requested_call *rc) { - if (rk->capacity == rk->count) { - rk->capacity = GPR_MAX(8, rk->capacity * 2); - rk->requests = - gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests)); - } - rk->requests[rk->count++] = rc; -} - -static void request_killer_add_request_matcher(request_killer *rk, - grpc_server *server, - request_matcher *rm) { - int request_id; - while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { - request_killer_add(rk, &server->requested_calls[request_id]); - } -} - -static void request_killer_run(request_killer *rk, grpc_server *server) { - size_t i; - for (i = 0; i < rk->count; i++) { - fail_call(server, rk->requests[i]); - } - gpr_free(rk->requests); -} - void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; - registered_method *rm; shutdown_tag *sdt; channel_broadcaster broadcaster; - request_killer reqkill; GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag); @@ -1013,27 +997,16 @@ void grpc_server_shutdown_and_notify(grpc_server *server, server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); channel_broadcaster_init(server, &broadcaster); - request_killer_init(&reqkill); /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - request_killer_add_request_matcher(&reqkill, server, - &server->unregistered_request_matcher); - request_matcher_zombify_all_pending_calls( - &server->unregistered_request_matcher); - for (rm = server->registered_methods; rm; rm = rm->next) { - request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher); - request_matcher_zombify_all_pending_calls(&rm->request_matcher); - } + kill_pending_work_locked(server); gpr_mu_unlock(&server->mu_call); gpr_atm_rel_store(&server->shutdown_flag, 1); maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu_global); - /* terminate all the requested calls */ - request_killer_run(&reqkill, server); - /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { l->destroy(server, l->arg); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 1ea4a82c16..f05ec99256 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -823,6 +823,12 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { stream_global); } else { stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; + if (stream_global->outgoing_sopb != NULL) { + grpc_sopb_reset(stream_global->outgoing_sopb); + stream_global->outgoing_sopb = NULL; + grpc_chttp2_schedule_closure(transport_global, + stream_global->send_done_closure, 1); + } stream_global->read_closed = 1; if (!stream_global->published_cancelled) { char buffer[GPR_LTOA_MIN_BUFSIZE]; |