From 12ad5d6cd69d657c082704637d9cb6886a05dd4b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 27 Nov 2015 12:17:30 -0800 Subject: Strong/weak refcounting for subchannel --- src/core/client_config/subchannel.c | 94 ++++++++++++++++++++++++------------- src/core/client_config/subchannel.h | 11 +++++ 2 files changed, 73 insertions(+), 32 deletions(-) (limited to 'src/core/client_config') diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 3872cacfa0..1c66a73146 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -47,6 +47,9 @@ #include "src/core/transport/connectivity_state.h" #include "src/core/transport/connectivity_state.h" +#define INTERNAL_REF_BITS 16 +#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) + #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 @@ -69,8 +72,12 @@ typedef struct { struct grpc_subchannel { grpc_connector *connector; - /** refcount */ - gpr_refcount refs; + /** refcount + - lower INTERNAL_REF_BITS bits are for internal references: + these do not keep the subchannel open. + - upper remaining bits are for public references: these do + keep the subchannel open */ + gpr_atm ref_pair; /** non-transport related channel filters */ const grpc_channel_filter **filters; @@ -133,16 +140,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, int iomgr_success); #ifdef GRPC_STREAM_REFCOUNT_DEBUG -#define SUBCHANNEL_REF_LOCKED(p, r) \ - subchannel_ref_locked((p), __FILE__, __LINE__, (r)) -#define SUBCHANNEL_UNREF_LOCKED(p, r) \ - subchannel_unref_locked((p), __FILE__, __LINE__, (r)) -#define CONNECTION_REF_LOCKED(p, r) \ - connection_ref_locked((p), __FILE__, __LINE__, (r)) -#define CONNECTION_UNREF_LOCKED(cl, p, r) \ - connection_unref_locked((cl), (p), __FILE__, __LINE__, (r)) -#define REF_PASS_ARGS , file, line, reason -#define REF_PASS_REASON , reason #define REF_REASON reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ @@ -150,13 +147,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define UNREF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ (name), (p), (p)->refs.count, (p)->refs.count - 1, reason) +#define REF_MUTATE_EXTRA_ARGS GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose +#define REF_MUTATE_PURPOSE(x) , file, line, reason, x #else -#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p)) -#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p)) -#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p)) -#define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p)) -#define REF_PASS_ARGS -#define REF_PASS_REASON #define REF_REASON "" #define REF_LOG(name, p) \ do { \ @@ -164,6 +157,8 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define UNREF_LOG(name, p) \ do { \ } while (0) +#define REF_MUTATE_EXTRA_ARGS +#define REF_MUTATE_PURPOSE(x) #endif /* @@ -196,10 +191,6 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) { grpc_subchannel *c = arg; - grpc_connected_subchannel *con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); - if (con != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); - } gpr_free((void *)c->filters); grpc_channel_args_destroy(c->args); gpr_free(c->addr); @@ -210,15 +201,54 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } +gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); +#endif + return old_val; +} + void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - REF_LOG("SUBCHANNEL", c); - gpr_ref(&c->refs); + gpr_atm old_refs; + old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); + GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); +} + +void grpc_subchannel_weak_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); + GPR_ASSERT(old_refs != 0); +} + +static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { + grpc_connected_subchannel *con; + gpr_mu_lock(&c->mu); + GPR_ASSERT(!c->disconnected); + c->disconnected = 1; + con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); + if (con != NULL) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); + gpr_atm_no_barrier_store(&c->connected_subchannel, 0xdeadbeef); + } + gpr_mu_unlock(&c->mu); } void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - UNREF_LOG("SUBCHANNEL", c); - if (gpr_unref(&c->refs)) { + gpr_atm old_refs; + old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); + if ((old_refs & STRONG_REF_MASK) == 0) { + disconnect(exec_ctx, c); + } + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref"); +} + +void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); + if (old_refs == 0) { grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), 1); } @@ -232,7 +262,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args) { grpc_subchannel *c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); - gpr_ref_init(&c->refs, 1); + gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; grpc_connector_ref(c->connector); c->num_filters = args->filter_count; @@ -297,7 +327,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, i external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); - GRPC_SUBCHANNEL_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); gpr_free(w); follow_up->cb(exec_ctx, follow_up->cb_arg, success); } @@ -314,7 +344,7 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, w->notify = notify; grpc_closure_init(&w->closure, on_external_state_watcher_done, w); grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); - GRPC_SUBCHANNEL_REF(c, "external_state_watcher"); + GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); gpr_mu_lock(&c->mu); if (grpc_connectivity_state_notify_on_state_change( exec_ctx, &c->state_tracker, state, &w->closure)) { @@ -401,13 +431,13 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, grpc_connected_subchannel_notify_on_state_change( exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), &sw->connectivity_state, &sw->closure); - GRPC_SUBCHANNEL_REF(c, "state_watcher"); + GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); sw = NULL; } } gpr_mu_unlock(mu); - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "state_watcher"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "state_watcher"); gpr_free(sw); } @@ -488,7 +518,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated to the state watcher */ - GRPC_SUBCHANNEL_REF(c, "state_watcher"); + GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( exec_ctx, con, &sw_subchannel->connectivity_state, diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 20d74e9f10..b64a26561b 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -50,6 +50,10 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) \ grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_WEAK_REF(p, r) \ + grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \ + grpc_subchannel_weak_unref((cl), (p), __FILE__, __LINE__, (r)) #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \ grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r)) #define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ @@ -63,6 +67,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #else #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) +#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) +#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) grpc_subchannel_weak_unref((cl), (p)) #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) #define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ grpc_connected_subchannel_unref((cl), (p)) @@ -77,6 +83,11 @@ void grpc_subchannel_ref(grpc_subchannel *channel void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_weak_ref(grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, + grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, -- cgit v1.2.3