aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-05-26 11:49:10 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-05-26 11:49:10 -0700
commit916a5003a8046cf36a0140fc8cbaac04ffda5f71 (patch)
treeaa9e14fced4366ff32ef585913e7aa0dfc7d97d4
parent5d2766bbd373bfc7fbbe92cf49c4b8d3deef6b02 (diff)
Add silent events to completion queue - to allow server shutdown to complete
-rw-r--r--src/core/surface/completion_queue.c20
-rw-r--r--src/core/surface/completion_queue.h6
-rw-r--r--src/core/surface/server.c14
3 files changed, 36 insertions, 4 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index b48fbace31..bc7c15178c 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -167,6 +167,26 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
}
}
+void grpc_cq_begin_silent_op(grpc_completion_queue *cc) {
+ gpr_ref(&cc->refs);
+}
+
+void grpc_cq_end_silent_op(grpc_completion_queue *cc) {
+ int shutdown = 0;
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ if (gpr_unref(&cc->refs)) {
+ GPR_ASSERT(!cc->shutdown);
+ GPR_ASSERT(cc->shutdown_called);
+ cc->shutdown = 1;
+ gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+ shutdown = 1;
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ if (shutdown) {
+ grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ }
+}
+
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
static event *create_shutdown_event(void) {
event *ev = gpr_malloc(sizeof(event));
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 7b6fad98fd..896b7f708b 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -50,6 +50,12 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success);
+/* Begin a 'silent operation' - one that blocks completion queue shutdown
+ until it is ended */
+void grpc_cq_begin_silent_op(grpc_completion_queue *cc);
+/* End such an operation */
+void grpc_cq_end_silent_op(grpc_completion_queue *cc);
+
/* disable polling for some tests */
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 90d5acc569..2802671bdc 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -533,8 +533,9 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_list_remove(elem->call_data, i);
}
if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
- for (i = 0; i < chand->server->num_shutdown_tags; i++) {
- for (j = 0; j < chand->server->cq_count; j++) {
+ for (j = 0; j < chand->server->cq_count; j++) {
+ grpc_cq_end_silent_op(chand->server->cqs[j]);
+ for (i = 0; i < chand->server->num_shutdown_tags; i++) {
grpc_cq_end_op(chand->server->cqs[j], chand->server->shutdown_tags[i],
NULL, 1);
}
@@ -821,6 +822,10 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
return;
}
+ for (i = 0; i < server->cq_count; i++) {
+ grpc_cq_begin_silent_op(server->cqs[i]);
+ }
+
nchannels = 0;
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
@@ -857,8 +862,9 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
server->shutdown = 1;
if (server->lists[ALL_CALLS] == NULL) {
- for (i = 0; i < server->num_shutdown_tags; i++) {
- for (j = 0; j < server->cq_count; j++) {
+ for (j = 0; j < server->cq_count; j++) {
+ grpc_cq_end_silent_op(server->cqs[j]);
+ for (i = 0; i < server->num_shutdown_tags; i++) {
grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1);
}
}