diff options
author | Vijay Pai <vpai@google.com> | 2015-04-28 09:19:43 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2015-04-28 09:19:43 -0700 |
commit | 20ecfc81d0fa88bfbf9b833d818ac6cb0e75fd37 (patch) | |
tree | 04d3aa5f72ea1d26d7c4515a1f1a214f2f31bb62 /src/core/surface | |
parent | 809bcffb02604f67c6babe5b24ee0e5e3f05feff (diff) | |
parent | 5717a9801e4184a833e04c62b0b20318e09add50 (diff) |
Merge pull request #1377 from ctiller/server-cq-own
Have server hold a reference to completion queues
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/completion_queue.c | 29 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 3 | ||||
-rw-r--r-- | src/core/surface/server.c | 5 |
3 files changed, 29 insertions, 8 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 24f4a05071..e0135d9fb9 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -67,6 +67,8 @@ struct grpc_completion_queue { /* When refs drops to zero, we are in shutdown mode, and will be destroyable once all queued events are drained */ gpr_refcount refs; + /* Once owning_refs drops to zero, we will destroy the cq */ + gpr_refcount owning_refs; /* the set of low level i/o things that concern this cq */ grpc_pollset pollset; /* 0 initially, 1 once we've begun shutting down */ @@ -91,11 +93,29 @@ grpc_completion_queue *grpc_completion_queue_create(void) { memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->refs, 1); + gpr_ref_init(&cc->owning_refs, 1); grpc_pollset_init(&cc->pollset); cc->allow_polling = 1; return cc; } +void grpc_cq_internal_ref(grpc_completion_queue *cc) { + gpr_ref(&cc->owning_refs); +} + +static void on_pollset_destroy_done(void *arg) { + grpc_completion_queue *cc = arg; + grpc_pollset_destroy(&cc->pollset); + gpr_free(cc); +} + +void grpc_cq_internal_unref(grpc_completion_queue *cc) { + if (gpr_unref(&cc->owning_refs)) { + GPR_ASSERT(cc->queue == NULL); + grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + } +} + void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { cc->allow_polling = 0; } @@ -394,15 +414,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { } } -static void on_pollset_destroy_done(void *arg) { - grpc_completion_queue *cc = arg; - grpc_pollset_destroy(&cc->pollset); - gpr_free(cc); -} - void grpc_completion_queue_destroy(grpc_completion_queue *cc) { - GPR_ASSERT(cc->queue == NULL); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_cq_internal_unref(cc); } void grpc_event_finish(grpc_event *base) { diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 3a7cc99dda..41024cda14 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -43,6 +43,9 @@ grpc_event_finish */ typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error); +void grpc_cq_internal_ref(grpc_completion_queue *cc); +void grpc_cq_internal_unref(grpc_completion_queue *cc); + /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made */ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 3b129039bb..a2e94d5598 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -262,6 +262,7 @@ static void server_ref(grpc_server *server) { static void server_unref(grpc_server *server) { registered_method *rm; + size_t i; if (gpr_unref(&server->internal_refcount)) { grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu); @@ -275,6 +276,9 @@ static void server_unref(grpc_server *server) { requested_call_array_destroy(&rm->requested); gpr_free(rm); } + for (i = 0; i < server->cq_count; i++) { + grpc_cq_internal_unref(server->cqs[i]); + } gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -601,6 +605,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } + grpc_cq_internal_ref(cq); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); |