aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-27 21:45:11 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-27 21:45:11 -0800
commit50ec2670a45799b95f2910f26a5a9f79ab2e8404 (patch)
tree97991af0e6a88fe6e10bbccc9044df6b7ae499d4 /src
parent12ad5d6cd69d657c082704637d9cb6886a05dd4b (diff)
Most of the way to auto-cleanup subchannels
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/channel_stack.c3
-rw-r--r--src/core/channel/channel_stack.h1
-rw-r--r--src/core/channel/client_channel.c9
-rw-r--r--src/core/client_config/lb_policies/pick_first.c44
-rw-r--r--src/core/client_config/lb_policies/round_robin.c25
-rw-r--r--src/core/client_config/lb_policy.c6
-rw-r--r--src/core/client_config/lb_policy.h7
-rw-r--r--src/core/client_config/subchannel.c50
-rw-r--r--src/core/client_config/subchannel.h3
-rw-r--r--src/core/iomgr/fd_posix.c6
-rw-r--r--src/core/iomgr/pollset_set_posix.c13
-rw-r--r--src/core/surface/channel.c2
-rw-r--r--src/core/transport/transport.h2
13 files changed, 29 insertions, 142 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 559ad0a32c..d2f6a90ca8 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -106,6 +106,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
const grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *channel_args,
+ const char *name,
grpc_channel_stack *stack) {
size_t call_size =
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
@@ -117,7 +118,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
stack->count = filter_count;
GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg,
- "CHANNEL_STACK");
+ name);
elems = CHANNEL_ELEMS_FROM_STACK(stack);
user_data =
((char *)elems) +
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 766f543404..bb7081b2a2 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -183,6 +183,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
size_t filter_count, const grpc_channel_args *args,
+ const char *name,
grpc_channel_stack *stack);
/* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 5fec87c67c..5ad2e075c3 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -260,10 +260,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
lb_policy = chand->lb_policy;
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "broadcast");
- }
-
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
@@ -282,11 +278,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_resolver_shutdown(exec_ctx, destroy_resolver);
GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
}
-
- if (lb_policy) {
- grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
- }
}
typedef struct {
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index c0f1d3fd94..d83f3718c2 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -185,7 +185,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
size_t i;
- grpc_transport_op op;
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels;
grpc_connected_subchannel *exclude_subchannel;
@@ -199,12 +198,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
- if (grpc_subchannel_get_connected_subchannel(subchannels[i]) !=
- exclude_subchannel) {
- memset(&op, 0, sizeof(op));
- op.disconnect = 1;
- grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
- }
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
}
@@ -323,41 +316,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&p->mu);
}
-static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_transport_op *op) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- size_t i;
- size_t n;
- grpc_subchannel **subchannels;
- grpc_connected_subchannel *selected;
-
- gpr_mu_lock(&p->mu);
- n = p->num_subchannels;
- subchannels = gpr_malloc(n * sizeof(*subchannels));
- selected = p->selected;
- if (selected) {
- GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
- }
- for (i = 0; i < n; i++) {
- subchannels[i] = p->subchannels[i];
- GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
- }
- gpr_mu_unlock(&p->mu);
-
- for (i = 0; i < n; i++) {
- if (selected != grpc_subchannel_get_connected_subchannel(subchannels[i])) {
- grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
- }
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
- }
- if (p->selected) {
- grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected,
- "pf_broadcast_to_selected");
- }
- gpr_free(subchannels);
-}
-
static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -380,7 +338,7 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle,
- pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
+ pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index 10688b3fa5..16afd8c10e 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -451,29 +451,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
}
-static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_transport_op *op) {
- round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- size_t i;
- size_t n;
- grpc_subchannel **subchannels;
-
- gpr_mu_lock(&p->mu);
- n = p->num_subchannels;
- subchannels = gpr_malloc(n * sizeof(*subchannels));
- for (i = 0; i < n; i++) {
- subchannels[i] = p->subchannels[i];
- GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast");
- }
- gpr_mu_unlock(&p->mu);
-
- for (i = 0; i < n; i++) {
- grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast");
- }
- gpr_free(subchannels);
-}
-
static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -497,7 +474,7 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx,
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle,
- rr_broadcast, rr_check_connectivity, rr_notify_on_state_change};
+ rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 8d9287c36a..2b874daef6 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -61,6 +61,7 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy,
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
#endif
if (gpr_unref(&policy->refs)) {
+ grpc_pollset_set_destroy(&policy->interested_parties);
policy->vtable->destroy(exec_ctx, policy);
}
}
@@ -83,11 +84,6 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
-void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_transport_op *op) {
- policy->vtable->broadcast(exec_ctx, policy, op);
-}
-
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 985c96630f..96b2bdf4ca 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -66,10 +66,6 @@ struct grpc_lb_policy_vtable {
/** try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- /** broadcast a transport op to all subchannels */
- void (*broadcast)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_transport_op *op);
-
/** check the current connectivity of the lb_policy */
grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy);
@@ -118,9 +114,6 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
-void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_transport_op *op);
-
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 1c66a73146..434a37cf6b 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -238,7 +238,7 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
- if ((old_refs & STRONG_REF_MASK) == 0) {
+ if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
disconnect(exec_ctx, c);
}
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref");
@@ -351,7 +351,7 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
do_connect = 1;
c->connecting = 1;
/* released by connection */
- GRPC_SUBCHANNEL_REF(c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
}
gpr_mu_unlock(&c->mu);
@@ -369,40 +369,6 @@ void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&c->mu);
}
-void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_transport_op *op) {
- grpc_connected_subchannel *con;
- int cancel_alarm = 0;
- gpr_mu_lock(&c->mu);
- con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
- if (con != NULL) {
- GRPC_CONNECTED_SUBCHANNEL_REF(con, "transport-op");
- }
- if (op->disconnect) {
- c->disconnected = 1;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
- if (c->have_alarm) {
- cancel_alarm = 1;
- }
- }
- gpr_mu_unlock(&c->mu);
-
- if (con != NULL) {
- grpc_connected_subchannel_process_transport_op(exec_ctx, con, op);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "transport-op");
- }
-
- if (cancel_alarm) {
- grpc_timer_cancel(exec_ctx, &c->alarm);
- }
-
- if (op->disconnect) {
- grpc_connector_shutdown(exec_ctx, c->connector);
- }
-}
-
void grpc_connected_subchannel_process_transport_op(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_transport_op *op) {
@@ -488,7 +454,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
con = gpr_malloc(channel_stack_size);
stk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters,
- num_filters, c->args, stk);
+ num_filters, c->args, "CONNECTED_SUBCHANNEL", stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
gpr_free((void *)c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
@@ -507,7 +473,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
gpr_free(sw_subchannel);
gpr_free((void *)filters);
grpc_channel_stack_destroy(exec_ctx, stk);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
+ gpr_free(con);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
return;
}
@@ -519,7 +486,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
for connecting is donated
to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, con, &sw_subchannel->connectivity_state,
&sw_subchannel->closure);
@@ -588,17 +555,18 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
update_reconnect_parameters(c);
continue_connect(exec_ctx, c);
} else {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
grpc_subchannel *c = arg;
+
if (c->connecting_result.transport != NULL) {
publish_transport(exec_ctx, c);
} else if (c->disconnected) {
- /* do nothing */
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&c->mu);
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index b64a26561b..66c13990e9 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -105,9 +105,6 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_pollset *pollset);
/** process a transport level op */
-void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- grpc_transport_op *op);
void grpc_connected_subchannel_process_transport_op(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel,
grpc_transport_op *op);
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 7ff80e6cf8..81c19ca797 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -43,6 +43,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#define CLOSURE_NOT_READY ((grpc_closure *)0)
@@ -158,7 +159,10 @@ void grpc_fd_global_shutdown(void) {
grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
- grpc_iomgr_register_object(&r->iomgr_object, name);
+ char *name2;
+ gpr_asprintf(&name2, "%s fd=%d", name, fd);
+ grpc_iomgr_register_object(&r->iomgr_object, name2);
+ gpr_free(name2);
#ifdef GRPC_FD_REF_COUNT_DEBUG
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
#endif
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index f29ef7cdcf..7f0b34c36b 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -52,7 +52,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
size_t i;
gpr_mu_destroy(&pollset_set->mu);
for (i = 0; i < pollset_set->fd_count; i++) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
}
gpr_free(pollset_set->pollsets);
gpr_free(pollset_set->pollset_sets);
@@ -74,7 +74,7 @@ void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
if (grpc_fd_is_orphaned(pollset_set->fds[i])) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
} else {
grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
pollset_set->fds[j++] = pollset_set->fds[i];
@@ -107,12 +107,13 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&bag->mu);
if (bag->pollset_set_count == bag->pollset_set_capacity) {
bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
- bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
+ bag->pollset_sets = gpr_realloc(bag->pollset_sets,
+ bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
}
bag->pollset_sets[bag->pollset_set_count++] = item;
for (i = 0, j = 0; i < bag->fd_count; i++) {
if (grpc_fd_is_orphaned(bag->fds[i])) {
- GRPC_FD_UNREF(bag->fds[i], "pollset");
+ GRPC_FD_UNREF(bag->fds[i], "pollset_set");
} else {
grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
bag->fds[j++] = bag->fds[i];
@@ -130,7 +131,9 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
for (i = 0; i < bag->pollset_set_count; i++) {
if (bag->pollset_sets[i] == item) {
bag->pollset_set_count--;
- GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]);
+ GPR_SWAP(grpc_pollset_set *,
+ bag->pollset_sets[i],
+ bag->pollset_sets[bag->pollset_set_count]);
break;
}
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index a78fd0aae2..92fd3dadd7 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -152,7 +152,7 @@ grpc_channel *grpc_channel_create_from_filters(
}
grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters,
- num_filters, args,
+ num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL",
CHANNEL_STACK_FROM_CHANNEL(channel));
return channel;
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 08f34ff0aa..4de72d7d75 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -50,8 +50,6 @@ typedef struct grpc_transport grpc_transport;
for a stream. */
typedef struct grpc_stream grpc_stream;
-#define GRPC_STREAM_REFCOUNT_DEBUG
-
typedef struct grpc_stream_refcount {
gpr_refcount refs;
grpc_closure destroy;