diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-09-21 18:56:08 -0700 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2017-10-02 16:29:41 -0700 |
commit | 83062842c3601faeddcae8f901c515e3c78f3661 (patch) | |
tree | 58536b25350a5f2b3123161ab4ca9b8ce01a43a5 /src/core/ext/filters/client_channel/subchannel_index.cc | |
parent | 1c9b584a153ff592c92b6cf6baa620d8602a37cd (diff) |
Changes for C to C++. Adding extern C to header files for compatibility.
Also converting to .cc
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel_index.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel_index.cc | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc new file mode 100644 index 0000000000..d7a51f3899 --- /dev/null +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -0,0 +1,251 @@ +// +// +// Copyright 2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include "src/core/ext/filters/client_channel/subchannel_index.h" + +#include <stdbool.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/avl.h> +#include <grpc/support/string_util.h> +#include <grpc/support/tls.h> + +#include "src/core/lib/channel/channel_args.h" + +// a map of subchannel_key --> subchannel, used for detecting connections +// to the same destination in order to share them +static gpr_avl g_subchannel_index; + +static gpr_mu g_mu; + +static gpr_refcount g_refcount; + +struct grpc_subchannel_key { + grpc_subchannel_args args; +}; + +static bool g_force_creation = false; + +static grpc_subchannel_key *create_key( + const grpc_subchannel_args *args, + grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) { + grpc_subchannel_key *k = (grpc_subchannel_key *)gpr_malloc(sizeof(*k)); + k->args.filter_count = args->filter_count; + if (k->args.filter_count > 0) { + k->args.filters = (const grpc_channel_filter **)gpr_malloc( + sizeof(*k->args.filters) * k->args.filter_count); + memcpy((grpc_channel_filter *)k->args.filters, args->filters, + sizeof(*k->args.filters) * k->args.filter_count); + } else { + k->args.filters = NULL; + } + k->args.args = copy_channel_args(args->args); + return k; +} + +grpc_subchannel_key *grpc_subchannel_key_create( + const grpc_subchannel_args *args) { + return create_key(args, grpc_channel_args_normalize); +} + +static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) { + return create_key(&k->args, grpc_channel_args_copy); +} + +int grpc_subchannel_key_compare(const grpc_subchannel_key *a, + const grpc_subchannel_key *b) { + if (g_force_creation) return false; + int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); + if (c != 0) return c; + if (a->args.filter_count > 0) { + c = memcmp(a->args.filters, b->args.filters, + a->args.filter_count * sizeof(*a->args.filters)); + if (c != 0) return c; + } + return grpc_channel_args_compare(a->args.args, b->args.args); +} + +void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, + grpc_subchannel_key *k) { + gpr_free((grpc_channel_args *)k->args.filters); + grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args); + gpr_free(k); +} + +static void sck_avl_destroy(void *p, void *user_data) { + grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; + grpc_subchannel_key_destroy(exec_ctx, (grpc_subchannel_key *)p); +} + +static void *sck_avl_copy(void *p, void *unused) { + return subchannel_key_copy((grpc_subchannel_key *)p); +} + +static long sck_avl_compare(void *a, void *b, void *unused) { + return grpc_subchannel_key_compare((grpc_subchannel_key *)a, + (grpc_subchannel_key *)b); +} + +static void scv_avl_destroy(void *p, void *user_data) { + grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, (grpc_subchannel *)p, + "subchannel_index"); +} + +static void *scv_avl_copy(void *p, void *unused) { + GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel *)p, "subchannel_index"); + return p; +} + +static const gpr_avl_vtable subchannel_avl_vtable = { + .destroy_key = sck_avl_destroy, + .copy_key = sck_avl_copy, + .compare_keys = sck_avl_compare, + .destroy_value = scv_avl_destroy, + .copy_value = scv_avl_copy}; + +void grpc_subchannel_index_init(void) { + g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable); + gpr_mu_init(&g_mu); + gpr_ref_init(&g_refcount, 1); +} + +void grpc_subchannel_index_shutdown(void) { + // TODO(juanlishen): This refcounting mechanism may lead to memory leackage. + // To solve that, we should force polling to flush any pending callbacks, then + // shutdown safely. + grpc_subchannel_index_unref(); +} + +void grpc_subchannel_index_unref(void) { + if (gpr_unref(&g_refcount)) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_subchannel_index, &exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); } + +grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx, + grpc_subchannel_key *key) { + // Lock, and take a reference to the subchannel index. + // We don't need to do the search under a lock as avl's are immutable. + gpr_mu_lock(&g_mu); + gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx); + gpr_mu_unlock(&g_mu); + + grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF( + (grpc_subchannel *)gpr_avl_get(index, key, exec_ctx), "index_find"); + gpr_avl_unref(index, exec_ctx); + + return c; +} + +grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, + grpc_subchannel_key *key, + grpc_subchannel *constructed) { + grpc_subchannel *c = NULL; + bool need_to_unref_constructed; + + while (c == NULL) { + need_to_unref_constructed = false; + + // Compare and swap loop: + // - take a reference to the current index + gpr_mu_lock(&g_mu); + gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx); + gpr_mu_unlock(&g_mu); + + // - Check to see if a subchannel already exists + c = (grpc_subchannel *)gpr_avl_get(index, key, exec_ctx); + if (c != NULL) { + c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register"); + } + if (c != NULL) { + // yes -> we're done + need_to_unref_constructed = true; + } else { + // no -> update the avl and compare/swap + gpr_avl updated = gpr_avl_add( + gpr_avl_ref(index, exec_ctx), subchannel_key_copy(key), + GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), exec_ctx); + + // it may happen (but it's expected to be unlikely) + // that some other thread has changed the index: + // compare/swap here to check that, and retry as necessary + gpr_mu_lock(&g_mu); + if (index.root == g_subchannel_index.root) { + GPR_SWAP(gpr_avl, updated, g_subchannel_index); + c = constructed; + } + gpr_mu_unlock(&g_mu); + + gpr_avl_unref(updated, exec_ctx); + } + gpr_avl_unref(index, exec_ctx); + } + + if (need_to_unref_constructed) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, constructed, "index_register"); + } + + return c; +} + +void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx, + grpc_subchannel_key *key, + grpc_subchannel *constructed) { + bool done = false; + while (!done) { + // Compare and swap loop: + // - take a reference to the current index + gpr_mu_lock(&g_mu); + gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx); + gpr_mu_unlock(&g_mu); + + // Check to see if this key still refers to the previously + // registered subchannel + grpc_subchannel *c = (grpc_subchannel *)gpr_avl_get(index, key, exec_ctx); + if (c != constructed) { + gpr_avl_unref(index, exec_ctx); + break; + } + + // compare and swap the update (some other thread may have + // mutated the index behind us) + gpr_avl updated = + gpr_avl_remove(gpr_avl_ref(index, exec_ctx), key, exec_ctx); + + gpr_mu_lock(&g_mu); + if (index.root == g_subchannel_index.root) { + GPR_SWAP(gpr_avl, updated, g_subchannel_index); + done = true; + } + gpr_mu_unlock(&g_mu); + + gpr_avl_unref(updated, exec_ctx); + gpr_avl_unref(index, exec_ctx); + } +} + +void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) { + g_force_creation = force_creation; +} |