aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2016-01-22 20:01:55 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2016-01-22 20:01:55 -0800
commit8cdba6644a83333e53b6188ca95b5a6c88968257 (patch)
tree6eeec2864e8c5b2fb5170afe708216ab67a68440
parentd8e6f8d4f58cb9e9c492204dae1fbb2fac0b7349 (diff)
Subchannel index compiles
-rw-r--r--src/core/client_config/subchannel.c15
-rw-r--r--src/core/client_config/subchannel.h2
-rw-r--r--src/core/client_config/subchannel_index.c72
-rw-r--r--src/core/client_config/subchannel_index.h9
-rw-r--r--src/core/surface/channel_create.c2
-rw-r--r--src/core/surface/init.c5
-rw-r--r--test/core/end2end/fixtures/h2_uchannel.c2
7 files changed, 84 insertions, 23 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index bf2f15a444..c704595ec7 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -239,6 +239,21 @@ void grpc_subchannel_weak_ref(grpc_subchannel *c
GPR_ASSERT(old_refs != 0);
}
+grpc_subchannel *grpc_subchannel_ref_from_weak_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ if (!c) return NULL;
+ for (;;) {
+ gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair);
+ if (old_refs >= (1 << INTERNAL_REF_BITS)) {
+ gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
+ if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) {
+ return c;
+ }
+ } else {
+ return NULL;
+ }
+ }
+}
+
static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connected_subchannel *con;
grpc_subchannel_index_unregister(exec_ctx, c->key, c);
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 8fd976276b..0d470f593c 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -84,6 +84,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
void grpc_subchannel_ref(grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+grpc_subchannel *grpc_subchannel_ref_from_weak_ref(grpc_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
diff --git a/src/core/client_config/subchannel_index.c b/src/core/client_config/subchannel_index.c
index 9f6ecca295..575ccb4aad 100644
--- a/src/core/client_config/subchannel_index.c
+++ b/src/core/client_config/subchannel_index.c
@@ -33,6 +33,7 @@
#include "src/core/client_config/subchannel_index.h"
+#include <stdbool.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -104,7 +105,7 @@ static int subchannel_key_compare(grpc_subchannel_key *a, grpc_subchannel_key *b
return grpc_channel_args_compare(a->args.args, b->args.args);
}
-static void subchannel_key_destroy(grpc_subchannel_key *k) {
+void grpc_subchannel_key_destroy(grpc_subchannel_key *k) {
gpr_free(k->args.addr);
gpr_free(k->args.filters);
grpc_channel_args_destroy((grpc_channel_args*)k->args.args);
@@ -112,7 +113,7 @@ static void subchannel_key_destroy(grpc_subchannel_key *k) {
}
static void sck_avl_destroy(void *p) {
- subchannel_key_destroy(p);
+ grpc_subchannel_key_destroy(p);
}
static void *sck_avl_copy(void *p) {
@@ -141,33 +142,38 @@ static const gpr_avl_vtable subchannel_avl_vtable = {
.copy_value = scv_avl_copy
};
+void grpc_subchannel_index_init(void) {
+ g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable);
+ gpr_mu_init(&g_mu);
+}
+
+void grpc_subchannel_index_shutdown(void) {
+ gpr_mu_destroy(&g_mu);
+ gpr_avl_unref(g_subchannel_index);
+}
+
grpc_subchannel *grpc_subchannel_index_find(
grpc_exec_ctx *exec_ctx,
- grpc_connector *connector,
- grpc_subchannel_args *args) {
- enter_ctx(ctx);
+ grpc_subchannel_key *key) {
+ enter_ctx(exec_ctx);
gpr_mu_lock(&g_mu);
gpr_avl index = gpr_avl_ref(g_subchannel_index);
gpr_mu_unlock(&g_mu);
- subchannel_key *key = subchannel_key_create(connector, args);
- grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key));
- subchannel_key_destroy(key);
+ grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key), "index_find");
gpr_avl_unref(index);
- leave_ctx(ctx);
+ leave_ctx(exec_ctx);
return c;
}
grpc_subchannel *grpc_subchannel_index_register(
grpc_exec_ctx *exec_ctx,
- grpc_connector *connector,
- grpc_subchannel_args *args,
+ grpc_subchannel_key *key,
grpc_subchannel *constructed) {
- enter_ctx(ctx);
+ enter_ctx(exec_ctx);
- subchannel_key *key = subchannel_key_create(connector, args);
grpc_subchannel *c = NULL;
while (c == NULL) {
@@ -177,13 +183,13 @@ grpc_subchannel *grpc_subchannel_index_register(
c = gpr_avl_get(index, key);
if (c != NULL) {
- GRPC_SUBCHANNEL_WEAK_UNREF(constructed);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, constructed, "index_register");
} else {
gpr_avl updated = gpr_avl_add(index, key, constructed);
gpr_mu_lock(&g_mu);
if (index.root == g_subchannel_index.root) {
- GPR_SWAP(index, g_subchannel_index);
+ GPR_SWAP(gpr_avl, updated, g_subchannel_index);
c = constructed;
}
gpr_mu_unlock(&g_mu);
@@ -191,7 +197,41 @@ grpc_subchannel *grpc_subchannel_index_register(
gpr_avl_unref(index);
}
- leave_ctx(ctx);
+ leave_ctx(exec_ctx);
return c;
}
+
+void grpc_subchannel_index_unregister(
+ grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_key *key,
+ grpc_subchannel *constructed) {
+ enter_ctx(exec_ctx);
+
+ bool done = false;
+ while (!done) {
+ gpr_mu_lock(&g_mu);
+ gpr_avl index = gpr_avl_ref(g_subchannel_index);
+ gpr_mu_unlock(&g_mu);
+
+ grpc_subchannel *c = gpr_avl_get(index, key);
+ if (c != constructed) {
+ break;
+ }
+
+ gpr_avl updated = gpr_avl_remove(index, key);
+
+ gpr_mu_lock(&g_mu);
+ if (index.root == g_subchannel_index.root) {
+ GPR_SWAP(gpr_avl, updated, g_subchannel_index);
+ done = true;
+ } else {
+ GPR_SWAP(gpr_avl, updated, index);
+ }
+ gpr_mu_unlock(&g_mu);
+
+ gpr_avl_unref(index);
+ }
+
+ leave_ctx(exec_ctx);
+}
diff --git a/src/core/client_config/subchannel_index.h b/src/core/client_config/subchannel_index.h
index dfbc3228d9..69c19969a9 100644
--- a/src/core/client_config/subchannel_index.h
+++ b/src/core/client_config/subchannel_index.h
@@ -45,17 +45,20 @@ grpc_subchannel_key *grpc_subchannel_key_create(
void grpc_subchannel_key_destroy(grpc_subchannel_key *key);
grpc_subchannel *grpc_subchannel_index_find(
- grpc_exec_ctx *ctx,
+ grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key);
grpc_subchannel *grpc_subchannel_index_register(
- grpc_exec_ctx *ctx,
+ grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed);
void grpc_subchannel_index_unregister(
- grpc_exec_ctx *ctx,
+ grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed);
+void grpc_subchannel_index_init(void);
+void grpc_subchannel_index_shutdown(void);
+
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 49083f0870..031ae1b543 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -172,7 +172,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
args->args = final_args;
- s = grpc_subchannel_create(&c->base, args);
+ s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
grpc_channel_args_destroy(final_args);
return s;
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 8f1936227e..66c5a522a4 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -47,6 +47,7 @@
#include "src/core/client_config/resolvers/dns_resolver.h"
#include "src/core/client_config/resolvers/sockaddr_resolver.h"
#include "src/core/client_config/subchannel.h"
+#include "src/core/client_config/subchannel_index.h"
#include "src/core/debug/trace.h"
#include "src/core/iomgr/executor.h"
#include "src/core/iomgr/iomgr.h"
@@ -126,7 +127,7 @@ void grpc_init(void) {
}
gpr_timers_global_init();
grpc_cq_global_init();
- grpc_subchannel_global_init();
+ grpc_subchannel_index_init();
for (i = 0; i < g_number_of_plugins; i++) {
if (g_all_of_the_plugins[i].init != NULL) {
g_all_of_the_plugins[i].init();
@@ -145,7 +146,7 @@ void grpc_shutdown(void) {
grpc_executor_shutdown();
grpc_cq_global_shutdown();
grpc_iomgr_shutdown();
- grpc_subchannel_global_shutdown();
+ grpc_subchannel_index_shutdown();
census_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();
diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c
index 9b622e80d6..ee6aac796d 100644
--- a/test/core/end2end/fixtures/h2_uchannel.c
+++ b/test/core/end2end/fixtures/h2_uchannel.c
@@ -159,7 +159,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
args->args = final_args;
- s = grpc_subchannel_create(&c->base, args);
+ s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
grpc_channel_args_destroy(final_args);
if (*f->sniffed_subchannel) {