aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2015-04-28 09:19:43 -0700
committerGravatar Vijay Pai <vpai@google.com>2015-04-28 09:19:43 -0700
commit20ecfc81d0fa88bfbf9b833d818ac6cb0e75fd37 (patch)
tree04d3aa5f72ea1d26d7c4515a1f1a214f2f31bb62 /src/core/surface
parent809bcffb02604f67c6babe5b24ee0e5e3f05feff (diff)
parent5717a9801e4184a833e04c62b0b20318e09add50 (diff)
Merge pull request #1377 from ctiller/server-cq-own
Have server hold a reference to completion queues
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/completion_queue.c29
-rw-r--r--src/core/surface/completion_queue.h3
-rw-r--r--src/core/surface/server.c5
3 files changed, 29 insertions, 8 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 24f4a05071..e0135d9fb9 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -67,6 +67,8 @@ struct grpc_completion_queue {
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
once all queued events are drained */
gpr_refcount refs;
+ /* Once owning_refs drops to zero, we will destroy the cq */
+ gpr_refcount owning_refs;
/* the set of low level i/o things that concern this cq */
grpc_pollset pollset;
/* 0 initially, 1 once we've begun shutting down */
@@ -91,11 +93,29 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1);
+ gpr_ref_init(&cc->owning_refs, 1);
grpc_pollset_init(&cc->pollset);
cc->allow_polling = 1;
return cc;
}
+void grpc_cq_internal_ref(grpc_completion_queue *cc) {
+ gpr_ref(&cc->owning_refs);
+}
+
+static void on_pollset_destroy_done(void *arg) {
+ grpc_completion_queue *cc = arg;
+ grpc_pollset_destroy(&cc->pollset);
+ gpr_free(cc);
+}
+
+void grpc_cq_internal_unref(grpc_completion_queue *cc) {
+ if (gpr_unref(&cc->owning_refs)) {
+ GPR_ASSERT(cc->queue == NULL);
+ grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ }
+}
+
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
cc->allow_polling = 0;
}
@@ -394,15 +414,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
}
}
-static void on_pollset_destroy_done(void *arg) {
- grpc_completion_queue *cc = arg;
- grpc_pollset_destroy(&cc->pollset);
- gpr_free(cc);
-}
-
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
- GPR_ASSERT(cc->queue == NULL);
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ grpc_cq_internal_unref(cc);
}
void grpc_event_finish(grpc_event *base) {
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 3a7cc99dda..41024cda14 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -43,6 +43,9 @@
grpc_event_finish */
typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error);
+void grpc_cq_internal_ref(grpc_completion_queue *cc);
+void grpc_cq_internal_unref(grpc_completion_queue *cc);
+
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 3b129039bb..a2e94d5598 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -262,6 +262,7 @@ static void server_ref(grpc_server *server) {
static void server_unref(grpc_server *server) {
registered_method *rm;
+ size_t i;
if (gpr_unref(&server->internal_refcount)) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
@@ -275,6 +276,9 @@ static void server_unref(grpc_server *server) {
requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
+ for (i = 0; i < server->cq_count; i++) {
+ grpc_cq_internal_unref(server->cqs[i]);
+ }
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -601,6 +605,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) {
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
+ grpc_cq_internal_ref(cq);
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
server->cq_count * sizeof(grpc_completion_queue *));