From 5634ef6e4a86d284d973c400f49b6370ad6035eb Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Thu, 9 Feb 2017 14:25:32 -0800
Subject: Make combiners refcounted, to facilitate sharing

---
 src/core/ext/transport/chttp2/transport/chttp2_transport.c |  2 +-
 src/core/lib/iomgr/combiner.c                              | 12 +++++++++++-
 src/core/lib/iomgr/combiner.h                              |  5 +++--
 src/core/lib/iomgr/resource_quota.c                        |  2 +-
 test/core/iomgr/combiner_test.c                            |  8 ++++----
 5 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 8a9eaa8b6a..d576f85dd2 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -168,7 +168,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
   grpc_chttp2_stream_map_destroy(&t->stream_map);
   grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
 
-  grpc_combiner_destroy(exec_ctx, t->combiner);
+  grpc_combiner_unref(exec_ctx, t->combiner);
 
   cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
 
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index ba6c7087a9..93d8c0bd70 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -72,6 +72,7 @@ struct grpc_combiner {
   bool final_list_covered_by_poller;
   grpc_closure_list final_list;
   grpc_closure offload;
+  gpr_refcount refs;
 };
 
 static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx,
@@ -126,6 +127,7 @@ static bool is_covered_by_poller(grpc_combiner *lock) {
 
 grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
   grpc_combiner *lock = gpr_malloc(sizeof(*lock));
+  gpr_ref_init(&lock->refs, 1);
   lock->next_combiner_on_this_exec_ctx = NULL;
   lock->time_to_execute_final_list = false;
   lock->optional_workqueue = optional_workqueue;
@@ -152,7 +154,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
   gpr_free(lock);
 }
 
-void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
   gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
   GRPC_COMBINER_TRACE(gpr_log(
       GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
@@ -161,6 +163,14 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
   }
 }
 
+void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+  if (gpr_unref(&lock->refs)) {
+    start_destroy(exec_ctx, lock);
+  }
+}
+
+void grpc_combiner_ref(grpc_combiner *lock) { gpr_ref(&lock->refs); }
+
 static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx,
                                   grpc_combiner *lock) {
   lock->next_combiner_on_this_exec_ctx = NULL;
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index 81dff85d40..352e87d050 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -48,8 +48,9 @@
 // Initialize the lock, with an optional workqueue to shift load to when
 // necessary
 grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
-// Destroy the lock
-void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
+// Ref/unref the lock, for when we're sharing the lock ownership
+void grpc_combiner_ref(grpc_combiner *lock);
+void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
 // Fetch a scheduler to schedule closures against
 grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
                                                 bool covered_by_poller);
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 2cc979467f..3c701164ad 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -599,7 +599,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
 void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
                                         grpc_resource_quota *resource_quota) {
   if (gpr_unref(&resource_quota->refs)) {
-    grpc_combiner_destroy(exec_ctx, resource_quota->combiner);
+    grpc_combiner_unref(exec_ctx, resource_quota->combiner);
     gpr_free(resource_quota->name);
     gpr_free(resource_quota);
   }
diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c
index 4c9275a673..08cd2c1e26 100644
--- a/test/core/iomgr/combiner_test.c
+++ b/test/core/iomgr/combiner_test.c
@@ -44,7 +44,7 @@
 static void test_no_op(void) {
   gpr_log(GPR_DEBUG, "test_no_op");
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  grpc_combiner_destroy(&exec_ctx, grpc_combiner_create(NULL));
+  grpc_combiner_unref(&exec_ctx, grpc_combiner_create(NULL));
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
@@ -65,7 +65,7 @@ static void test_execute_one(void) {
                      GRPC_ERROR_NONE);
   grpc_exec_ctx_flush(&exec_ctx);
   GPR_ASSERT(done);
-  grpc_combiner_destroy(&exec_ctx, lock);
+  grpc_combiner_unref(&exec_ctx, lock);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
@@ -125,7 +125,7 @@ static void test_execute_many(void) {
     gpr_thd_join(thds[i]);
   }
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  grpc_combiner_destroy(&exec_ctx, lock);
+  grpc_combiner_unref(&exec_ctx, lock);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
@@ -153,7 +153,7 @@ static void test_execute_finally(void) {
                      GRPC_ERROR_NONE);
   grpc_exec_ctx_flush(&exec_ctx);
   GPR_ASSERT(got_in_finally);
-  grpc_combiner_destroy(&exec_ctx, lock);
+  grpc_combiner_unref(&exec_ctx, lock);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
-- 
cgit v1.2.3