aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2015-07-08 08:47:41 -0700
committerGravatar Vijay Pai <vpai@google.com>2015-07-08 08:47:41 -0700
commitc6964b1d980df3c8809809ef05c954626cdda95f (patch)
treeeead5cf3f63db6968e00b7e6921ea898b09b23f9
parent8311a0b548e0abd6f8fe94a59854ca34525ec9c8 (diff)
parent550342f70d8de704eb226bc8fff96422efe8d349 (diff)
Merge pull request #2304 from ctiller/just-say-goodbye-when-we-are-done
Change transport contract to automatically disconnect after sending a goaway
-rw-r--r--src/core/surface/server.c120
-rw-r--r--src/core/transport/chttp2/internal.h8
-rw-r--r--src/core/transport/chttp2/stream_lists.c9
-rw-r--r--src/core/transport/chttp2_transport.c11
-rw-r--r--src/core/transport/transport.h4
5 files changed, 35 insertions, 117 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index f29d47c17c..341ca2942c 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -51,7 +51,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
+typedef enum { PENDING_START, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
@@ -114,7 +114,6 @@ typedef struct channel_registered_method {
struct channel_data {
grpc_server *server;
- size_t num_calls;
grpc_connectivity_state connectivity_state;
grpc_channel *channel;
grpc_mdstr *path_key;
@@ -204,9 +203,7 @@ struct call_data {
typedef struct {
grpc_channel **channels;
- grpc_channel **disconnects;
size_t num_channels;
- size_t num_disconnects;
} channel_broadcaster;
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -225,26 +222,15 @@ static void maybe_finish_shutdown(grpc_server *server);
static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
channel_data *c;
size_t count = 0;
- size_t dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
count++;
- if (c->num_calls == 0) {
- dc_count++;
- }
}
cb->num_channels = count;
- cb->num_disconnects = dc_count;
cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
- cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects);
count = 0;
- dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
cb->channels[count++] = c->channel;
GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
- if (c->num_calls == 0) {
- cb->disconnects[dc_count++] = c->channel;
- GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect");
- }
}
}
@@ -280,19 +266,14 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
}
static void channel_broadcaster_shutdown(channel_broadcaster *cb,
- int send_goaway, int send_disconnect) {
+ int send_goaway, int force_disconnect) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
- send_shutdown(cb->channels[i], 1, 0);
+ send_shutdown(cb->channels[i], send_goaway, force_disconnect);
GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
}
- for (i = 0; i < cb->num_disconnects; i++) {
- send_shutdown(cb->disconnects[i], 0, 1);
- GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect");
- }
gpr_free(cb->channels);
- gpr_free(cb->disconnects);
}
/* call list */
@@ -501,15 +482,6 @@ static void maybe_finish_shutdown(grpc_server *server) {
return;
}
- gpr_mu_lock(&server->mu_call);
- if (server->lists[ALL_CALLS] != NULL) {
- gpr_log(GPR_DEBUG,
- "Waiting for all calls to finish before destroying server");
- gpr_mu_unlock(&server->mu_call);
- return;
- }
- gpr_mu_unlock(&server->mu_call);
-
if (server->root_channel_data.next != &server->root_channel_data) {
gpr_log(GPR_DEBUG,
"Waiting for all channels to close before destroying server");
@@ -541,22 +513,10 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static int decrement_call_count(channel_data *chand) {
- int disconnect = 0;
- chand->num_calls--;
- if (0 == chand->num_calls && chand->server->shutdown) {
- disconnect = 1;
- }
- maybe_finish_shutdown(chand->server);
- return disconnect;
-}
-
static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- int remove_res;
- int disconnect = 0;
if (success && !calld->got_initial_metadata) {
size_t i;
@@ -601,20 +561,7 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
- remove_res = call_list_remove(calld, ALL_CALLS);
gpr_mu_unlock(&chand->server->mu_call);
- gpr_mu_lock(&chand->server->mu_global);
- if (remove_res) {
- disconnect = decrement_call_count(chand);
- if (disconnect) {
- GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect");
- }
- }
- gpr_mu_unlock(&chand->server->mu_global);
- if (disconnect) {
- send_shutdown(chand->channel, 0, 1);
- GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect");
- }
break;
}
@@ -679,14 +626,6 @@ static void init_call_elem(grpc_call_element *elem,
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
- gpr_mu_lock(&chand->server->mu_call);
- call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
- gpr_mu_unlock(&chand->server->mu_call);
-
- gpr_mu_lock(&chand->server->mu_global);
- chand->num_calls++;
- gpr_mu_unlock(&chand->server->mu_global);
-
server_ref(chand->server);
if (initial_op) server_mutate_op(elem, initial_op);
@@ -695,19 +634,13 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- int removed[CALL_LIST_COUNT];
size_t i;
gpr_mu_lock(&chand->server->mu_call);
for (i = 0; i < CALL_LIST_COUNT; i++) {
- removed[i] = call_list_remove(elem->call_data, i);
+ call_list_remove(elem->call_data, i);
}
gpr_mu_unlock(&chand->server->mu_call);
- if (removed[ALL_CALLS]) {
- gpr_mu_lock(&chand->server->mu_global);
- decrement_call_count(chand);
- gpr_mu_unlock(&chand->server->mu_global);
- }
if (calld->host) {
grpc_mdstr_unref(calld->host);
@@ -727,7 +660,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GPR_ASSERT(is_first);
GPR_ASSERT(!is_last);
chand->server = NULL;
- chand->num_calls = 0;
chand->channel = NULL;
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
@@ -1049,47 +981,13 @@ void grpc_server_listener_destroy_done(void *s) {
}
void grpc_server_cancel_all_calls(grpc_server *server) {
- call_data *calld;
- grpc_call **calls;
- size_t call_count;
- size_t call_capacity;
- int is_first = 1;
- size_t i;
-
- gpr_mu_lock(&server->mu_call);
-
- GPR_ASSERT(server->shutdown);
-
- if (!server->lists[ALL_CALLS]) {
- gpr_mu_unlock(&server->mu_call);
- return;
- }
-
- call_capacity = 8;
- call_count = 0;
- calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
-
- for (calld = server->lists[ALL_CALLS];
- calld != server->lists[ALL_CALLS] || is_first;
- calld = calld->links[ALL_CALLS].next) {
- if (call_count == call_capacity) {
- call_capacity *= 2;
- calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
- }
- calls[call_count++] = calld->call;
- GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all");
- is_first = 0;
- }
-
- gpr_mu_unlock(&server->mu_call);
+ channel_broadcaster broadcaster;
- for (i = 0; i < call_count; i++) {
- grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
- "Unavailable");
- GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
- }
+ gpr_mu_lock(&server->mu_global);
+ channel_broadcaster_init(server, &broadcaster);
+ gpr_mu_unlock(&server->mu_global);
- gpr_free(calls);
+ channel_broadcaster_shutdown(&broadcaster, 0, 1);
}
void grpc_server_destroy(grpc_server *server) {
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 7f98a5bd71..bdd4b432eb 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -173,6 +173,8 @@ typedef struct {
/** have we seen a goaway */
gpr_uint8 seen_goaway;
+ /** have we sent a goaway */
+ gpr_uint8 sent_goaway;
/** is this transport a client? */
gpr_uint8 is_client;
@@ -557,8 +559,10 @@ void grpc_chttp2_add_incoming_goaway(
void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream *s);
+/* returns 1 if this is the last stream, 0 otherwise */
+int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT;
+int grpc_chttp2_has_streams(grpc_chttp2_transport *t);
void grpc_chttp2_for_all_streams(
grpc_chttp2_transport_global *transport_global, void *user_data,
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data,
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 85691b32d2..4fea058c19 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -354,9 +354,14 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
-void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
+int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
+ stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
+ return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS);
+}
+
+int grpc_chttp2_has_streams(grpc_chttp2_transport *t) {
+ return !stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
void grpc_chttp2_for_all_streams(
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 70df146239..47d0c548cb 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -382,7 +382,9 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map);
- grpc_chttp2_unregister_stream(t, s);
+ if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
+ close_transport_locked(t);
+ }
if (!t->parsing_active && s->global.id) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
s->global.id) == NULL);
@@ -680,10 +682,14 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
}
if (op->send_goaway) {
+ t->global.sent_goaway = 1;
grpc_chttp2_goaway_append(
t->global.last_incoming_stream_id,
grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
+ if (!grpc_chttp2_has_streams(t)) {
+ close_transport_locked(t);
+ }
}
if (op->set_accept_stream != NULL) {
@@ -732,6 +738,9 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
t->parsing.incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(&t->parsing);
}
+ if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
+ close_transport_locked(t);
+ }
new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
grpc_chttp2_stream_map_size(&t->new_stream_map);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 579bcc943f..1429737721 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -91,7 +91,9 @@ typedef struct grpc_transport_op {
grpc_connectivity_state *connectivity_state;
/** should the transport be disconnected */
int disconnect;
- /** should we send a goaway? */
+ /** should we send a goaway?
+ after a goaway is sent, once there are no more active calls on
+ the transport, the transport should disconnect */
int send_goaway;
/** what should the goaway contain? */
grpc_status_code goaway_status;