diff options
author | Vijay Pai <vpai@google.com> | 2015-07-08 08:47:41 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2015-07-08 08:47:41 -0700 |
commit | c6964b1d980df3c8809809ef05c954626cdda95f (patch) | |
tree | eead5cf3f63db6968e00b7e6921ea898b09b23f9 | |
parent | 8311a0b548e0abd6f8fe94a59854ca34525ec9c8 (diff) | |
parent | 550342f70d8de704eb226bc8fff96422efe8d349 (diff) |
Merge pull request #2304 from ctiller/just-say-goodbye-when-we-are-done
Change transport contract to automatically disconnect after sending a goaway
-rw-r--r-- | src/core/surface/server.c | 120 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 8 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 9 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 11 | ||||
-rw-r--r-- | src/core/transport/transport.h | 4 |
5 files changed, 35 insertions, 117 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c index f29d47c17c..341ca2942c 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -51,7 +51,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> -typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; +typedef enum { PENDING_START, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; @@ -114,7 +114,6 @@ typedef struct channel_registered_method { struct channel_data { grpc_server *server; - size_t num_calls; grpc_connectivity_state connectivity_state; grpc_channel *channel; grpc_mdstr *path_key; @@ -204,9 +203,7 @@ struct call_data { typedef struct { grpc_channel **channels; - grpc_channel **disconnects; size_t num_channels; - size_t num_disconnects; } channel_broadcaster; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -225,26 +222,15 @@ static void maybe_finish_shutdown(grpc_server *server); static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { channel_data *c; size_t count = 0; - size_t dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { count++; - if (c->num_calls == 0) { - dc_count++; - } } cb->num_channels = count; - cb->num_disconnects = dc_count; cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels); - cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects); count = 0; - dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { cb->channels[count++] = c->channel; GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast"); - if (c->num_calls == 0) { - cb->disconnects[dc_count++] = c->channel; - GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect"); - } } } @@ -280,19 +266,14 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, } static void channel_broadcaster_shutdown(channel_broadcaster *cb, - int send_goaway, int send_disconnect) { + int send_goaway, int force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], 1, 0); + send_shutdown(cb->channels[i], send_goaway, force_disconnect); GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); } - for (i = 0; i < cb->num_disconnects; i++) { - send_shutdown(cb->disconnects[i], 0, 1); - GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect"); - } gpr_free(cb->channels); - gpr_free(cb->disconnects); } /* call list */ @@ -501,15 +482,6 @@ static void maybe_finish_shutdown(grpc_server *server) { return; } - gpr_mu_lock(&server->mu_call); - if (server->lists[ALL_CALLS] != NULL) { - gpr_log(GPR_DEBUG, - "Waiting for all calls to finish before destroying server"); - gpr_mu_unlock(&server->mu_call); - return; - } - gpr_mu_unlock(&server->mu_call); - if (server->root_channel_data.next != &server->root_channel_data) { gpr_log(GPR_DEBUG, "Waiting for all channels to close before destroying server"); @@ -541,22 +513,10 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static int decrement_call_count(channel_data *chand) { - int disconnect = 0; - chand->num_calls--; - if (0 == chand->num_calls && chand->server->shutdown) { - disconnect = 1; - } - maybe_finish_shutdown(chand->server); - return disconnect; -} - static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - int remove_res; - int disconnect = 0; if (success && !calld->got_initial_metadata) { size_t i; @@ -601,20 +561,7 @@ static void server_on_recv(void *ptr, int success) { grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); } - remove_res = call_list_remove(calld, ALL_CALLS); gpr_mu_unlock(&chand->server->mu_call); - gpr_mu_lock(&chand->server->mu_global); - if (remove_res) { - disconnect = decrement_call_count(chand); - if (disconnect) { - GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect"); - } - } - gpr_mu_unlock(&chand->server->mu_global); - if (disconnect) { - send_shutdown(chand->channel, 0, 1); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect"); - } break; } @@ -679,14 +626,6 @@ static void init_call_elem(grpc_call_element *elem, grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); - gpr_mu_lock(&chand->server->mu_call); - call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); - gpr_mu_unlock(&chand->server->mu_call); - - gpr_mu_lock(&chand->server->mu_global); - chand->num_calls++; - gpr_mu_unlock(&chand->server->mu_global); - server_ref(chand->server); if (initial_op) server_mutate_op(elem, initial_op); @@ -695,19 +634,13 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - int removed[CALL_LIST_COUNT]; size_t i; gpr_mu_lock(&chand->server->mu_call); for (i = 0; i < CALL_LIST_COUNT; i++) { - removed[i] = call_list_remove(elem->call_data, i); + call_list_remove(elem->call_data, i); } gpr_mu_unlock(&chand->server->mu_call); - if (removed[ALL_CALLS]) { - gpr_mu_lock(&chand->server->mu_global); - decrement_call_count(chand); - gpr_mu_unlock(&chand->server->mu_global); - } if (calld->host) { grpc_mdstr_unref(calld->host); @@ -727,7 +660,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GPR_ASSERT(is_first); GPR_ASSERT(!is_last); chand->server = NULL; - chand->num_calls = 0; chand->channel = NULL; chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); @@ -1049,47 +981,13 @@ void grpc_server_listener_destroy_done(void *s) { } void grpc_server_cancel_all_calls(grpc_server *server) { - call_data *calld; - grpc_call **calls; - size_t call_count; - size_t call_capacity; - int is_first = 1; - size_t i; - - gpr_mu_lock(&server->mu_call); - - GPR_ASSERT(server->shutdown); - - if (!server->lists[ALL_CALLS]) { - gpr_mu_unlock(&server->mu_call); - return; - } - - call_capacity = 8; - call_count = 0; - calls = gpr_malloc(sizeof(grpc_call *) * call_capacity); - - for (calld = server->lists[ALL_CALLS]; - calld != server->lists[ALL_CALLS] || is_first; - calld = calld->links[ALL_CALLS].next) { - if (call_count == call_capacity) { - call_capacity *= 2; - calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity); - } - calls[call_count++] = calld->call; - GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all"); - is_first = 0; - } - - gpr_mu_unlock(&server->mu_call); + channel_broadcaster broadcaster; - for (i = 0; i < call_count; i++) { - grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, - "Unavailable"); - GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1); - } + gpr_mu_lock(&server->mu_global); + channel_broadcaster_init(server, &broadcaster); + gpr_mu_unlock(&server->mu_global); - gpr_free(calls); + channel_broadcaster_shutdown(&broadcaster, 0, 1); } void grpc_server_destroy(grpc_server *server) { diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 7f98a5bd71..bdd4b432eb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -173,6 +173,8 @@ typedef struct { /** have we seen a goaway */ gpr_uint8 seen_goaway; + /** have we sent a goaway */ + gpr_uint8 sent_goaway; /** is this transport a client? */ gpr_uint8 is_client; @@ -557,8 +559,10 @@ void grpc_chttp2_add_incoming_goaway( void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream *s); +/* returns 1 if this is the last stream, 0 otherwise */ +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT; +int grpc_chttp2_has_streams(grpc_chttp2_transport *t); void grpc_chttp2_for_all_streams( grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 85691b32d2..4fea058c19 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -354,9 +354,14 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t, stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); } -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); +} + +int grpc_chttp2_has_streams(grpc_chttp2_transport *t) { + return !stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); } void grpc_chttp2_for_all_streams( diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 70df146239..47d0c548cb 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -382,7 +382,9 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); - grpc_chttp2_unregister_stream(t, s); + if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { + close_transport_locked(t); + } if (!t->parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL); @@ -680,10 +682,14 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } if (op->send_goaway) { + t->global.sent_goaway = 1; grpc_chttp2_goaway_append( t->global.last_incoming_stream_id, grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), gpr_slice_ref(*op->goaway_message), &t->global.qbuf); + if (!grpc_chttp2_has_streams(t)) { + close_transport_locked(t); + } } if (op->set_accept_stream != NULL) { @@ -732,6 +738,9 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { t->parsing.incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(&t->parsing); } + if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { + close_transport_locked(t); + } new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + grpc_chttp2_stream_map_size(&t->new_stream_map); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 579bcc943f..1429737721 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -91,7 +91,9 @@ typedef struct grpc_transport_op { grpc_connectivity_state *connectivity_state; /** should the transport be disconnected */ int disconnect; - /** should we send a goaway? */ + /** should we send a goaway? + after a goaway is sent, once there are no more active calls on + the transport, the transport should disconnect */ int send_goaway; /** what should the goaway contain? */ grpc_status_code goaway_status; |