aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-30 14:49:02 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-30 14:49:02 -0700
commit1191e218f705055f76d7344ad3acbef241d2f378 (patch)
tree3b3daefaccdc46f4758f496516fb5138c8aeaa88 /src/core
parent0613e5835b30367d7e60c934044a9a074df109d8 (diff)
More aggressively kill pending work
Diffstat (limited to 'src/core')
-rw-r--r--src/core/surface/server.c78
1 files changed, 28 insertions, 50 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 7031e63916..b46558df1b 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -327,6 +327,14 @@ static void request_matcher_zombify_all_pending_calls(
}
}
+static void request_matcher_kill_requests(grpc_server *server,
+ request_matcher *rm) {
+ int request_id;
+ while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
+ fail_call(server, &server->requested_calls[request_id]);
+ }
+}
+
/*
* server proper
*/
@@ -492,12 +500,25 @@ static int num_channels(grpc_server *server) {
return n;
}
+static void kill_pending_work_locked(grpc_server *server) {
+ registered_method *rm;
+ request_matcher_kill_requests(server, &server->unregistered_request_matcher);
+ request_matcher_zombify_all_pending_calls(
+ &server->unregistered_request_matcher);
+ for (rm = server->registered_methods; rm; rm = rm->next) {
+ request_matcher_kill_requests(server, &rm->request_matcher);
+ request_matcher_zombify_all_pending_calls(&rm->request_matcher);
+ }
+}
+
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
+ kill_pending_work_locked(server);
+
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
@@ -948,49 +969,11 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op(transport, &op);
}
-typedef struct {
- requested_call **requests;
- size_t count;
- size_t capacity;
-} request_killer;
-
-static void request_killer_init(request_killer *rk) {
- memset(rk, 0, sizeof(*rk));
-}
-
-static void request_killer_add(request_killer *rk, requested_call *rc) {
- if (rk->capacity == rk->count) {
- rk->capacity = GPR_MAX(8, rk->capacity * 2);
- rk->requests =
- gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests));
- }
- rk->requests[rk->count++] = rc;
-}
-
-static void request_killer_add_request_matcher(request_killer *rk,
- grpc_server *server,
- request_matcher *rm) {
- int request_id;
- while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
- request_killer_add(rk, &server->requested_calls[request_id]);
- }
-}
-
-static void request_killer_run(request_killer *rk, grpc_server *server) {
- size_t i;
- for (i = 0; i < rk->count; i++) {
- fail_call(server, rk->requests[i]);
- }
- gpr_free(rk->requests);
-}
-
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
- request_killer reqkill;
GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
@@ -1011,27 +994,16 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
channel_broadcaster_init(server, &broadcaster);
- request_killer_init(&reqkill);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- request_killer_add_request_matcher(&reqkill, server,
- &server->unregistered_request_matcher);
- request_matcher_zombify_all_pending_calls(
- &server->unregistered_request_matcher);
- for (rm = server->registered_methods; rm; rm = rm->next) {
- request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(&rm->request_matcher);
- }
+ kill_pending_work_locked(server);
gpr_mu_unlock(&server->mu_call);
gpr_atm_rel_store(&server->shutdown_flag, 1);
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
- /* terminate all the requested calls */
- request_killer_run(&reqkill, server);
-
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
l->destroy(server, l->arg);
@@ -1272,6 +1244,12 @@ static void done_request_event(void *req, grpc_cq_completion *c) {
gpr_free(req);
}
+ if (gpr_atm_acq_load(&server->shutdown_flag)) {
+ gpr_mu_lock(&server->mu_global);
+ maybe_finish_shutdown(server);
+ gpr_mu_unlock(&server->mu_global);
+ }
+
server_unref(server);
}