diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2016-01-22 20:01:55 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2016-01-22 20:01:55 -0800 |
commit | 8cdba6644a83333e53b6188ca95b5a6c88968257 (patch) | |
tree | 6eeec2864e8c5b2fb5170afe708216ab67a68440 | |
parent | d8e6f8d4f58cb9e9c492204dae1fbb2fac0b7349 (diff) |
Subchannel index compiles
-rw-r--r-- | src/core/client_config/subchannel.c | 15 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 2 | ||||
-rw-r--r-- | src/core/client_config/subchannel_index.c | 72 | ||||
-rw-r--r-- | src/core/client_config/subchannel_index.h | 9 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 2 | ||||
-rw-r--r-- | src/core/surface/init.c | 5 | ||||
-rw-r--r-- | test/core/end2end/fixtures/h2_uchannel.c | 2 |
7 files changed, 84 insertions, 23 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index bf2f15a444..c704595ec7 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -239,6 +239,21 @@ void grpc_subchannel_weak_ref(grpc_subchannel *c GPR_ASSERT(old_refs != 0); } +grpc_subchannel *grpc_subchannel_ref_from_weak_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + if (!c) return NULL; + for (;;) { + gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair); + if (old_refs >= (1 << INTERNAL_REF_BITS)) { + gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); + if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) { + return c; + } + } else { + return NULL; + } + } +} + static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connected_subchannel *con; grpc_subchannel_index_unregister(exec_ctx, c->key, c); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 8fd976276b..0d470f593c 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -84,6 +84,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; void grpc_subchannel_ref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_subchannel *grpc_subchannel_ref_from_weak_ref(grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); diff --git a/src/core/client_config/subchannel_index.c b/src/core/client_config/subchannel_index.c index 9f6ecca295..575ccb4aad 100644 --- a/src/core/client_config/subchannel_index.c +++ b/src/core/client_config/subchannel_index.c @@ -33,6 +33,7 @@ #include "src/core/client_config/subchannel_index.h" +#include <stdbool.h> #include <string.h> #include <grpc/support/alloc.h> @@ -104,7 +105,7 @@ static int subchannel_key_compare(grpc_subchannel_key *a, grpc_subchannel_key *b return grpc_channel_args_compare(a->args.args, b->args.args); } -static void subchannel_key_destroy(grpc_subchannel_key *k) { +void grpc_subchannel_key_destroy(grpc_subchannel_key *k) { gpr_free(k->args.addr); gpr_free(k->args.filters); grpc_channel_args_destroy((grpc_channel_args*)k->args.args); @@ -112,7 +113,7 @@ static void subchannel_key_destroy(grpc_subchannel_key *k) { } static void sck_avl_destroy(void *p) { - subchannel_key_destroy(p); + grpc_subchannel_key_destroy(p); } static void *sck_avl_copy(void *p) { @@ -141,33 +142,38 @@ static const gpr_avl_vtable subchannel_avl_vtable = { .copy_value = scv_avl_copy }; +void grpc_subchannel_index_init(void) { + g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable); + gpr_mu_init(&g_mu); +} + +void grpc_subchannel_index_shutdown(void) { + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_subchannel_index); +} + grpc_subchannel *grpc_subchannel_index_find( grpc_exec_ctx *exec_ctx, - grpc_connector *connector, - grpc_subchannel_args *args) { - enter_ctx(ctx); + grpc_subchannel_key *key) { + enter_ctx(exec_ctx); gpr_mu_lock(&g_mu); gpr_avl index = gpr_avl_ref(g_subchannel_index); gpr_mu_unlock(&g_mu); - subchannel_key *key = subchannel_key_create(connector, args); - grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key)); - subchannel_key_destroy(key); + grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key), "index_find"); gpr_avl_unref(index); - leave_ctx(ctx); + leave_ctx(exec_ctx); return c; } grpc_subchannel *grpc_subchannel_index_register( grpc_exec_ctx *exec_ctx, - grpc_connector *connector, - grpc_subchannel_args *args, + grpc_subchannel_key *key, grpc_subchannel *constructed) { - enter_ctx(ctx); + enter_ctx(exec_ctx); - subchannel_key *key = subchannel_key_create(connector, args); grpc_subchannel *c = NULL; while (c == NULL) { @@ -177,13 +183,13 @@ grpc_subchannel *grpc_subchannel_index_register( c = gpr_avl_get(index, key); if (c != NULL) { - GRPC_SUBCHANNEL_WEAK_UNREF(constructed); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, constructed, "index_register"); } else { gpr_avl updated = gpr_avl_add(index, key, constructed); gpr_mu_lock(&g_mu); if (index.root == g_subchannel_index.root) { - GPR_SWAP(index, g_subchannel_index); + GPR_SWAP(gpr_avl, updated, g_subchannel_index); c = constructed; } gpr_mu_unlock(&g_mu); @@ -191,7 +197,41 @@ grpc_subchannel *grpc_subchannel_index_register( gpr_avl_unref(index); } - leave_ctx(ctx); + leave_ctx(exec_ctx); return c; } + +void grpc_subchannel_index_unregister( + grpc_exec_ctx *exec_ctx, + grpc_subchannel_key *key, + grpc_subchannel *constructed) { + enter_ctx(exec_ctx); + + bool done = false; + while (!done) { + gpr_mu_lock(&g_mu); + gpr_avl index = gpr_avl_ref(g_subchannel_index); + gpr_mu_unlock(&g_mu); + + grpc_subchannel *c = gpr_avl_get(index, key); + if (c != constructed) { + break; + } + + gpr_avl updated = gpr_avl_remove(index, key); + + gpr_mu_lock(&g_mu); + if (index.root == g_subchannel_index.root) { + GPR_SWAP(gpr_avl, updated, g_subchannel_index); + done = true; + } else { + GPR_SWAP(gpr_avl, updated, index); + } + gpr_mu_unlock(&g_mu); + + gpr_avl_unref(index); + } + + leave_ctx(exec_ctx); +} diff --git a/src/core/client_config/subchannel_index.h b/src/core/client_config/subchannel_index.h index dfbc3228d9..69c19969a9 100644 --- a/src/core/client_config/subchannel_index.h +++ b/src/core/client_config/subchannel_index.h @@ -45,17 +45,20 @@ grpc_subchannel_key *grpc_subchannel_key_create( void grpc_subchannel_key_destroy(grpc_subchannel_key *key); grpc_subchannel *grpc_subchannel_index_find( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key); grpc_subchannel *grpc_subchannel_index_register( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed); void grpc_subchannel_index_unregister( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed); +void grpc_subchannel_index_init(void); +void grpc_subchannel_index_shutdown(void); + #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 49083f0870..031ae1b543 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -172,7 +172,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - s = grpc_subchannel_create(&c->base, args); + s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); return s; diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 8f1936227e..66c5a522a4 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -47,6 +47,7 @@ #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" #include "src/core/client_config/subchannel.h" +#include "src/core/client_config/subchannel_index.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr.h" @@ -126,7 +127,7 @@ void grpc_init(void) { } gpr_timers_global_init(); grpc_cq_global_init(); - grpc_subchannel_global_init(); + grpc_subchannel_index_init(); for (i = 0; i < g_number_of_plugins; i++) { if (g_all_of_the_plugins[i].init != NULL) { g_all_of_the_plugins[i].init(); @@ -145,7 +146,7 @@ void grpc_shutdown(void) { grpc_executor_shutdown(); grpc_cq_global_shutdown(); grpc_iomgr_shutdown(); - grpc_subchannel_global_shutdown(); + grpc_subchannel_index_shutdown(); census_shutdown(); gpr_timers_global_destroy(); grpc_tracer_shutdown(); diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 9b622e80d6..ee6aac796d 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -159,7 +159,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - s = grpc_subchannel_create(&c->base, args); + s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); if (*f->sniffed_subchannel) { |