aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c3
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.c22
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.h7
-rw-r--r--src/core/lib/surface/init.c9
6 files changed, 39 insertions, 7 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 18979829bd..6741e9df33 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -101,6 +101,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -1046,6 +1047,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
gpr_free(glb_policy);
return NULL;
}
+ grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
@@ -1072,6 +1074,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
+ grpc_subchannel_index_unref();
if (glb_policy->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
gpr_free(glb_policy->pending_update_args);
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index fab3073eb9..50b02016a1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -89,6 +89,7 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
"picked_first_destroy");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_subchannel_index_unref();
if (p->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
gpr_free(p->pending_update_args);
@@ -686,6 +687,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
+ grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,
grpc_combiner_scheduler(args->combiner));
return &p->base;
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index be91d3d651..8ac1a46abd 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -30,6 +30,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -310,6 +311,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
(void *)pol, (void *)pol);
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_subchannel_index_unref();
gpr_free(p);
}
@@ -890,6 +892,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->client_channel_factory != NULL);
round_robin_lb_policy *p = (round_robin_lb_policy *)gpr_zalloc(sizeof(*p));
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
+ grpc_subchannel_index_ref();
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
rr_update_locked(exec_ctx, &p->base, args);
diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c
index f57b631c41..000ee70be7 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.c
+++ b/src/core/ext/filters/client_channel/subchannel_index.c
@@ -34,6 +34,8 @@ static gpr_avl g_subchannel_index;
static gpr_mu g_mu;
+static gpr_refcount g_refcount;
+
struct grpc_subchannel_key {
grpc_subchannel_args args;
};
@@ -119,15 +121,27 @@ static const gpr_avl_vtable subchannel_avl_vtable = {
void grpc_subchannel_index_init(void) {
g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable);
gpr_mu_init(&g_mu);
+ gpr_ref_init(&g_refcount, 1);
}
void grpc_subchannel_index_shutdown(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- gpr_mu_destroy(&g_mu);
- gpr_avl_unref(g_subchannel_index, &exec_ctx);
- grpc_exec_ctx_finish(&exec_ctx);
+ // TODO(juanlishen): This refcounting mechanism may lead to memory leackage.
+ // To solve that, we should force polling to flush any pending callbacks, then
+ // shutdown safely.
+ grpc_subchannel_index_unref();
+}
+
+void grpc_subchannel_index_unref(void) {
+ if (gpr_unref(&g_refcount)) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_destroy(&g_mu);
+ gpr_avl_unref(g_subchannel_index, &exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
}
+void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); }
+
grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key) {
// Lock, and take a reference to the subchannel index.
diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h
index 98d882a453..92e36d5283 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.h
+++ b/src/core/ext/filters/client_channel/subchannel_index.h
@@ -59,6 +59,13 @@ void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
void grpc_subchannel_index_shutdown(void);
+/** Increment the refcount (non-zero) of subchannel index (global). */
+void grpc_subchannel_index_ref(void);
+
+/** Decrement the refcount of subchannel index (global). If the refcount drops
+ to zero, unref the subchannel index and destroy its mutex. */
+void grpc_subchannel_index_unref(void);
+
/** \em TEST ONLY.
* If \a force_creation is true, all key comparisons will be false, resulting in
* new subchannels always being created. Otherwise, the keys will be compared as
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 280315036f..b089da2c54 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -36,6 +36,7 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/resource_quota.h"
+#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/alarm_internal.h"
@@ -179,14 +180,16 @@ void grpc_shutdown(void) {
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
- grpc_iomgr_shutdown(&exec_ctx);
- gpr_timers_global_destroy();
- grpc_tracer_shutdown();
+ grpc_executor_shutdown(&exec_ctx);
+ grpc_timer_manager_set_threading(false); // shutdown timer_manager thread
for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != NULL) {
g_all_of_the_plugins[i].destroy();
}
}
+ grpc_iomgr_shutdown(&exec_ctx);
+ gpr_timers_global_destroy();
+ grpc_tracer_shutdown();
grpc_mdctx_global_shutdown(&exec_ctx);
grpc_handshaker_factory_registry_shutdown(&exec_ctx);
grpc_slice_intern_shutdown();