diff options
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r-- | src/core/client_config/subchannel.c | 48 |
1 files changed, 41 insertions, 7 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 0a996a1e8b..6599c75dba 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -36,16 +36,17 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/avl.h> #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/client_config/initial_connect_string.h" +#include "src/core/client_config/subchannel_index.h" #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" #include "src/core/surface/channel.h" #include "src/core/transport/connectivity_state.h" -#include "src/core/transport/connectivity_state.h" #define INTERNAL_REF_BITS 16 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) @@ -94,6 +95,8 @@ struct grpc_subchannel { struct sockaddr *addr; size_t addr_len; + grpc_subchannel_key *key; + /** initial string to send to peer */ gpr_slice initial_connect_string; @@ -207,6 +210,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); grpc_pollset_set_destroy(&c->pollset_set); + grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -222,22 +226,42 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, return old_val; } -void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); + return c; } -void grpc_subchannel_weak_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); + return c; +} + +grpc_subchannel *grpc_subchannel_ref_from_weak_ref( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + if (!c) return NULL; + for (;;) { + gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair); + if (old_refs >= (1 << INTERNAL_REF_BITS)) { + gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); + if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) { + return c; + } + } else { + return NULL; + } + } } static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connected_subchannel *con; + grpc_subchannel_index_unregister(exec_ctx, c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = 1; @@ -276,10 +300,19 @@ static uint32_t random_seed() { return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); } -grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, +grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, + grpc_connector *connector, grpc_subchannel_args *args) { - grpc_subchannel *c = gpr_malloc(sizeof(*c)); + grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args); + grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key); + if (c) { + grpc_subchannel_key_destroy(exec_ctx, key); + return c; + } + + c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); + c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; grpc_connector_ref(c->connector); @@ -305,7 +338,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); gpr_mu_init(&c->mu); - return c; + + return grpc_subchannel_index_register(exec_ctx, key, c); } static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { |