aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2015-08-05 11:40:47 -0700
committerGravatar Vijay Pai <vpai@google.com>2015-08-05 11:40:47 -0700
commit9f007f059764133a49fdf7210ce9881bc38e9c05 (patch)
tree5d84a84cad47e23ed722208bd0b292655469d58f
parent507bdb4c037f859a372a17f4d68e954b5e68c439 (diff)
parenta0cfd1cb611453608bddc0fc0561ab2324fbca86 (diff)
Merge pull request #2732 from ctiller/primary-goat-whisperer
Fix bugs leading to servers not shutting down
-rw-r--r--src/core/surface/server.c73
-rw-r--r--src/core/transport/chttp2_transport.c6
2 files changed, 29 insertions, 50 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index c370a9b8ab..29d893db71 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -327,6 +327,14 @@ static void request_matcher_zombify_all_pending_calls(
}
}
+static void request_matcher_kill_requests(grpc_server *server,
+ request_matcher *rm) {
+ int request_id;
+ while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
+ fail_call(server, &server->requested_calls[request_id]);
+ }
+}
+
/*
* server proper
*/
@@ -492,12 +500,25 @@ static int num_channels(grpc_server *server) {
return n;
}
+static void kill_pending_work_locked(grpc_server *server) {
+ registered_method *rm;
+ request_matcher_kill_requests(server, &server->unregistered_request_matcher);
+ request_matcher_zombify_all_pending_calls(
+ &server->unregistered_request_matcher);
+ for (rm = server->registered_methods; rm; rm = rm->next) {
+ request_matcher_kill_requests(server, &rm->request_matcher);
+ request_matcher_zombify_all_pending_calls(&rm->request_matcher);
+ }
+}
+
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
+ kill_pending_work_locked(server);
+
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
@@ -947,52 +968,15 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
op.set_accept_stream_user_data = chand;
op.on_connectivity_state_change = &chand->channel_connectivity_changed;
op.connectivity_state = &chand->connectivity_state;
+ op.disconnect = gpr_atm_acq_load(&s->shutdown_flag);
grpc_transport_perform_op(transport, &op);
}
-typedef struct {
- requested_call **requests;
- size_t count;
- size_t capacity;
-} request_killer;
-
-static void request_killer_init(request_killer *rk) {
- memset(rk, 0, sizeof(*rk));
-}
-
-static void request_killer_add(request_killer *rk, requested_call *rc) {
- if (rk->capacity == rk->count) {
- rk->capacity = GPR_MAX(8, rk->capacity * 2);
- rk->requests =
- gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests));
- }
- rk->requests[rk->count++] = rc;
-}
-
-static void request_killer_add_request_matcher(request_killer *rk,
- grpc_server *server,
- request_matcher *rm) {
- int request_id;
- while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
- request_killer_add(rk, &server->requested_calls[request_id]);
- }
-}
-
-static void request_killer_run(request_killer *rk, grpc_server *server) {
- size_t i;
- for (i = 0; i < rk->count; i++) {
- fail_call(server, rk->requests[i]);
- }
- gpr_free(rk->requests);
-}
-
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
- request_killer reqkill;
GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
@@ -1013,27 +997,16 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
channel_broadcaster_init(server, &broadcaster);
- request_killer_init(&reqkill);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- request_killer_add_request_matcher(&reqkill, server,
- &server->unregistered_request_matcher);
- request_matcher_zombify_all_pending_calls(
- &server->unregistered_request_matcher);
- for (rm = server->registered_methods; rm; rm = rm->next) {
- request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(&rm->request_matcher);
- }
+ kill_pending_work_locked(server);
gpr_mu_unlock(&server->mu_call);
gpr_atm_rel_store(&server->shutdown_flag, 1);
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
- /* terminate all the requested calls */
- request_killer_run(&reqkill, server);
-
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
l->destroy(server, l->arg);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1ea4a82c16..f05ec99256 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -823,6 +823,12 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
stream_global);
} else {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
+ if (stream_global->outgoing_sopb != NULL) {
+ grpc_sopb_reset(stream_global->outgoing_sopb);
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global,
+ stream_global->send_done_closure, 1);
+ }
stream_global->read_closed = 1;
if (!stream_global->published_cancelled) {
char buffer[GPR_LTOA_MIN_BUFSIZE];