From 5634ef6e4a86d284d973c400f49b6370ad6035eb Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Thu, 9 Feb 2017 14:25:32 -0800 Subject: Make combiners refcounted, to facilitate sharing --- src/core/ext/transport/chttp2/transport/chttp2_transport.c | 2 +- src/core/lib/iomgr/combiner.c | 12 +++++++++++- src/core/lib/iomgr/combiner.h | 5 +++-- src/core/lib/iomgr/resource_quota.c | 2 +- test/core/iomgr/combiner_test.c | 8 ++++---- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8a9eaa8b6a..d576f85dd2 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -168,7 +168,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - grpc_combiner_destroy(exec_ctx, t->combiner); + grpc_combiner_unref(exec_ctx, t->combiner); cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index ba6c7087a9..93d8c0bd70 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -72,6 +72,7 @@ struct grpc_combiner { bool final_list_covered_by_poller; grpc_closure_list final_list; grpc_closure offload; + gpr_refcount refs; }; static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, @@ -126,6 +127,7 @@ static bool is_covered_by_poller(grpc_combiner *lock) { grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); + gpr_ref_init(&lock->refs, 1); lock->next_combiner_on_this_exec_ctx = NULL; lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; @@ -152,7 +154,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { gpr_free(lock); } -void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); @@ -161,6 +163,14 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } } +void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + if (gpr_unref(&lock->refs)) { + start_destroy(exec_ctx, lock); + } +} + +void grpc_combiner_ref(grpc_combiner *lock) { gpr_ref(&lock->refs); } + static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { lock->next_combiner_on_this_exec_ctx = NULL; diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 81dff85d40..352e87d050 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -48,8 +48,9 @@ // Initialize the lock, with an optional workqueue to shift load to when // necessary grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); -// Destroy the lock -void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); +// Ref/unref the lock, for when we're sharing the lock ownership +void grpc_combiner_ref(grpc_combiner *lock); +void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); // Fetch a scheduler to schedule closures against grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock, bool covered_by_poller); diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 2cc979467f..3c701164ad 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -599,7 +599,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { if (gpr_unref(&resource_quota->refs)) { - grpc_combiner_destroy(exec_ctx, resource_quota->combiner); + grpc_combiner_unref(exec_ctx, resource_quota->combiner); gpr_free(resource_quota->name); gpr_free(resource_quota); } diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c index 4c9275a673..08cd2c1e26 100644 --- a/test/core/iomgr/combiner_test.c +++ b/test/core/iomgr/combiner_test.c @@ -44,7 +44,7 @@ static void test_no_op(void) { gpr_log(GPR_DEBUG, "test_no_op"); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_combiner_destroy(&exec_ctx, grpc_combiner_create(NULL)); + grpc_combiner_unref(&exec_ctx, grpc_combiner_create(NULL)); grpc_exec_ctx_finish(&exec_ctx); } @@ -65,7 +65,7 @@ static void test_execute_one(void) { GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(done); - grpc_combiner_destroy(&exec_ctx, lock); + grpc_combiner_unref(&exec_ctx, lock); grpc_exec_ctx_finish(&exec_ctx); } @@ -125,7 +125,7 @@ static void test_execute_many(void) { gpr_thd_join(thds[i]); } grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_combiner_destroy(&exec_ctx, lock); + grpc_combiner_unref(&exec_ctx, lock); grpc_exec_ctx_finish(&exec_ctx); } @@ -153,7 +153,7 @@ static void test_execute_finally(void) { GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(got_in_finally); - grpc_combiner_destroy(&exec_ctx, lock); + grpc_combiner_unref(&exec_ctx, lock); grpc_exec_ctx_finish(&exec_ctx); } -- cgit v1.2.3