diff options
6 files changed, 39 insertions, 7 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 18979829bd..6741e9df33 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -101,6 +101,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" @@ -1046,6 +1047,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, gpr_free(glb_policy); return NULL; } + grpc_subchannel_index_ref(); GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, glb_lb_channel_on_connectivity_changed_cb, glb_policy, grpc_combiner_scheduler(args->combiner)); @@ -1072,6 +1074,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); + grpc_subchannel_index_unref(); if (glb_policy->pending_update_args != NULL) { grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); gpr_free(glb_policy->pending_update_args); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index fab3073eb9..50b02016a1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -89,6 +89,7 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { "picked_first_destroy"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_subchannel_index_unref(); if (p->pending_update_args != NULL) { grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); gpr_free(p->pending_update_args); @@ -686,6 +687,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); + grpc_subchannel_index_ref(); GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, grpc_combiner_scheduler(args->combiner)); return &p->base; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index be91d3d651..8ac1a46abd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -30,6 +30,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/combiner.h" @@ -310,6 +311,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { (void *)pol, (void *)pol); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_subchannel_index_unref(); gpr_free(p); } @@ -890,6 +892,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->client_channel_factory != NULL); round_robin_lb_policy *p = (round_robin_lb_policy *)gpr_zalloc(sizeof(*p)); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); + grpc_subchannel_index_ref(); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); rr_update_locked(exec_ctx, &p->base, args); diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index f57b631c41..000ee70be7 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -34,6 +34,8 @@ static gpr_avl g_subchannel_index; static gpr_mu g_mu; +static gpr_refcount g_refcount; + struct grpc_subchannel_key { grpc_subchannel_args args; }; @@ -119,15 +121,27 @@ static const gpr_avl_vtable subchannel_avl_vtable = { void grpc_subchannel_index_init(void) { g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable); gpr_mu_init(&g_mu); + gpr_ref_init(&g_refcount, 1); } void grpc_subchannel_index_shutdown(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_mu_destroy(&g_mu); - gpr_avl_unref(g_subchannel_index, &exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); + // TODO(juanlishen): This refcounting mechanism may lead to memory leackage. + // To solve that, we should force polling to flush any pending callbacks, then + // shutdown safely. + grpc_subchannel_index_unref(); +} + +void grpc_subchannel_index_unref(void) { + if (gpr_unref(&g_refcount)) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_subchannel_index, &exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + } } +void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); } + grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key) { // Lock, and take a reference to the subchannel index. diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index 98d882a453..92e36d5283 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -59,6 +59,13 @@ void grpc_subchannel_index_init(void); /** Shutdown the subchannel index (global) */ void grpc_subchannel_index_shutdown(void); +/** Increment the refcount (non-zero) of subchannel index (global). */ +void grpc_subchannel_index_ref(void); + +/** Decrement the refcount of subchannel index (global). If the refcount drops + to zero, unref the subchannel index and destroy its mutex. */ +void grpc_subchannel_index_unref(void); + /** \em TEST ONLY. * If \a force_creation is true, all key comparisons will be false, resulting in * new subchannels always being created. Otherwise, the keys will be compared as diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 280315036f..b089da2c54 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -36,6 +36,7 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/alarm_internal.h" @@ -179,14 +180,16 @@ void grpc_shutdown(void) { GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { - grpc_iomgr_shutdown(&exec_ctx); - gpr_timers_global_destroy(); - grpc_tracer_shutdown(); + grpc_executor_shutdown(&exec_ctx); + grpc_timer_manager_set_threading(false); // shutdown timer_manager thread for (i = g_number_of_plugins; i >= 0; i--) { if (g_all_of_the_plugins[i].destroy != NULL) { g_all_of_the_plugins[i].destroy(); } } + grpc_iomgr_shutdown(&exec_ctx); + gpr_timers_global_destroy(); + grpc_tracer_shutdown(); grpc_mdctx_global_shutdown(&exec_ctx); grpc_handshaker_factory_registry_shutdown(&exec_ctx); grpc_slice_intern_shutdown(); |