From 092d8d1b7f4acd55a7bb89b10deb8e4c33ad6930 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sat, 4 Jul 2015 22:35:00 -0700 Subject: Remove ALL_CALLS list --- src/core/surface/server.c | 104 +++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 61 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index f29d47c17c..ee394bb33a 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -51,7 +51,7 @@ #include #include -typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; +typedef enum { PENDING_START, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; @@ -183,6 +183,10 @@ typedef enum { struct call_data { grpc_call *call; + /** is this call counted towards the channels total + number of calls? */ + gpr_uint8 active; + call_state state; grpc_mdstr *path; grpc_mdstr *host; @@ -280,15 +284,27 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, } static void channel_broadcaster_shutdown(channel_broadcaster *cb, - int send_goaway, int send_disconnect) { + int send_goaway, int force_disconnect) { size_t i; + if (send_goaway) { + for (i = 0; i < cb->num_channels; i++) { + send_shutdown(cb->channels[i], 1, 0); + } + } + if (force_disconnect) { + for (i = 0; i < cb->num_channels; i++) { + send_shutdown(cb->channels[i], 0, 1); + } + } else { + for (i = 0; i < cb->num_disconnects; i++) { + send_shutdown(cb->disconnects[i], 0, 1); + } + } for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], 1, 0); GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); } for (i = 0; i < cb->num_disconnects; i++) { - send_shutdown(cb->disconnects[i], 0, 1); GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect"); } gpr_free(cb->channels); @@ -501,15 +517,6 @@ static void maybe_finish_shutdown(grpc_server *server) { return; } - gpr_mu_lock(&server->mu_call); - if (server->lists[ALL_CALLS] != NULL) { - gpr_log(GPR_DEBUG, - "Waiting for all calls to finish before destroying server"); - gpr_mu_unlock(&server->mu_call); - return; - } - gpr_mu_unlock(&server->mu_call); - if (server->root_channel_data.next != &server->root_channel_data) { gpr_log(GPR_DEBUG, "Waiting for all channels to close before destroying server"); @@ -601,7 +608,8 @@ static void server_on_recv(void *ptr, int success) { grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); } - remove_res = call_list_remove(calld, ALL_CALLS); + remove_res = calld->active; + calld->active = 0; gpr_mu_unlock(&chand->server->mu_call); gpr_mu_lock(&chand->server->mu_global); if (remove_res) { @@ -676,13 +684,10 @@ static void init_call_elem(grpc_call_element *elem, memset(calld, 0, sizeof(call_data)); calld->deadline = gpr_inf_future; calld->call = grpc_call_from_top_element(elem); + calld->active = 1; grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); - gpr_mu_lock(&chand->server->mu_call); - call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); - gpr_mu_unlock(&chand->server->mu_call); - gpr_mu_lock(&chand->server->mu_global); chand->num_calls++; gpr_mu_unlock(&chand->server->mu_global); @@ -695,20 +700,31 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - int removed[CALL_LIST_COUNT]; + int disconnect = 0; + int active; size_t i; gpr_mu_lock(&chand->server->mu_call); for (i = 0; i < CALL_LIST_COUNT; i++) { - removed[i] = call_list_remove(elem->call_data, i); + call_list_remove(elem->call_data, i); } + active = calld->active; + calld->active = 0; gpr_mu_unlock(&chand->server->mu_call); - if (removed[ALL_CALLS]) { + if (active) { gpr_mu_lock(&chand->server->mu_global); - decrement_call_count(chand); + disconnect = decrement_call_count(chand); + if (disconnect) { + GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect"); + } gpr_mu_unlock(&chand->server->mu_global); } + if (disconnect) { + send_shutdown(chand->channel, 0, 1); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect"); + } + if (calld->host) { grpc_mdstr_unref(calld->host); } @@ -1049,47 +1065,13 @@ void grpc_server_listener_destroy_done(void *s) { } void grpc_server_cancel_all_calls(grpc_server *server) { - call_data *calld; - grpc_call **calls; - size_t call_count; - size_t call_capacity; - int is_first = 1; - size_t i; - - gpr_mu_lock(&server->mu_call); - - GPR_ASSERT(server->shutdown); - - if (!server->lists[ALL_CALLS]) { - gpr_mu_unlock(&server->mu_call); - return; - } - - call_capacity = 8; - call_count = 0; - calls = gpr_malloc(sizeof(grpc_call *) * call_capacity); - - for (calld = server->lists[ALL_CALLS]; - calld != server->lists[ALL_CALLS] || is_first; - calld = calld->links[ALL_CALLS].next) { - if (call_count == call_capacity) { - call_capacity *= 2; - calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity); - } - calls[call_count++] = calld->call; - GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all"); - is_first = 0; - } - - gpr_mu_unlock(&server->mu_call); + channel_broadcaster broadcaster; - for (i = 0; i < call_count; i++) { - grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, - "Unavailable"); - GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1); - } + gpr_mu_lock(&server->mu_global); + channel_broadcaster_init(server, &broadcaster); + gpr_mu_unlock(&server->mu_global); - gpr_free(calls); + channel_broadcaster_shutdown(&broadcaster, 0, 1); } void grpc_server_destroy(grpc_server *server) { -- cgit v1.2.3