diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-11-27 21:45:11 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-11-27 21:45:11 -0800 |
commit | 50ec2670a45799b95f2910f26a5a9f79ab2e8404 (patch) | |
tree | 97991af0e6a88fe6e10bbccc9044df6b7ae499d4 /src | |
parent | 12ad5d6cd69d657c082704637d9cb6886a05dd4b (diff) |
Most of the way to auto-cleanup subchannels
Diffstat (limited to 'src')
-rw-r--r-- | src/core/channel/channel_stack.c | 3 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 1 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 9 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 44 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.c | 25 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 6 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 7 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 50 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.c | 13 | ||||
-rw-r--r-- | src/core/surface/channel.c | 2 | ||||
-rw-r--r-- | src/core/transport/transport.h | 2 |
13 files changed, 29 insertions, 142 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 559ad0a32c..d2f6a90ca8 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -106,6 +106,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *channel_args, + const char *name, grpc_channel_stack *stack) { size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + @@ -117,7 +118,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, stack->count = filter_count; GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg, - "CHANNEL_STACK"); + name); elems = CHANNEL_ELEMS_FROM_STACK(stack); user_data = ((char *)elems) + diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 766f543404..bb7081b2a2 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -183,6 +183,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args, + const char *name, grpc_channel_stack *stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 5fec87c67c..5ad2e075c3 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -260,10 +260,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } lb_policy = chand->lb_policy; - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "broadcast"); - } - if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); @@ -282,11 +278,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, destroy_resolver); GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel"); } - - if (lb_policy) { - grpc_lb_policy_broadcast(exec_ctx, lb_policy, op); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast"); - } } typedef struct { diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index c0f1d3fd94..d83f3718c2 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -185,7 +185,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; size_t i; - grpc_transport_op op; size_t num_subchannels = p->num_subchannels; grpc_subchannel **subchannels; grpc_connected_subchannel *exclude_subchannel; @@ -199,12 +198,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { - if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != - exclude_subchannel) { - memset(&op, 0, sizeof(op)); - op.disconnect = 1; - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); - } GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); } @@ -323,41 +316,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&p->mu); } -static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_transport_op *op) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - size_t i; - size_t n; - grpc_subchannel **subchannels; - grpc_connected_subchannel *selected; - - gpr_mu_lock(&p->mu); - n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - selected = p->selected; - if (selected) { - GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); - } - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); - } - gpr_mu_unlock(&p->mu); - - for (i = 0; i < n; i++) { - if (selected != grpc_subchannel_get_connected_subchannel(subchannels[i])) { - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); - } - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); - } - if (p->selected) { - grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, - "pf_broadcast_to_selected"); - } - gpr_free(subchannels); -} - static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; @@ -380,7 +338,7 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle, - pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; + pf_check_connectivity, pf_notify_on_state_change}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 10688b3fa5..16afd8c10e 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -451,29 +451,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } } -static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_transport_op *op) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - size_t i; - size_t n; - grpc_subchannel **subchannels; - - gpr_mu_lock(&p->mu); - n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); - } - gpr_mu_unlock(&p->mu); - - for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast"); - } - gpr_free(subchannels); -} - static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; @@ -497,7 +474,7 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle, - rr_broadcast, rr_check_connectivity, rr_notify_on_state_change}; + rr_check_connectivity, rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 8d9287c36a..2b874daef6 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -61,6 +61,7 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy, void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { #endif if (gpr_unref(&policy->refs)) { + grpc_pollset_set_destroy(&policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } @@ -83,11 +84,6 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, policy->vtable->cancel_pick(exec_ctx, policy, target); } -void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_transport_op *op) { - policy->vtable->broadcast(exec_ctx, policy, op); -} - void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { policy->vtable->exit_idle(exec_ctx, policy); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 985c96630f..96b2bdf4ca 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -66,10 +66,6 @@ struct grpc_lb_policy_vtable { /** try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - /** broadcast a transport op to all subchannels */ - void (*broadcast)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_transport_op *op); - /** check the current connectivity of the lb_policy */ grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); @@ -118,9 +114,6 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target); -void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_transport_op *op); - void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 1c66a73146..434a37cf6b 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -238,7 +238,7 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); - if ((old_refs & STRONG_REF_MASK) == 0) { + if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { disconnect(exec_ctx, c); } GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref"); @@ -351,7 +351,7 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, do_connect = 1; c->connecting = 1; /* released by connection */ - GRPC_SUBCHANNEL_REF(c, "connecting"); + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); } gpr_mu_unlock(&c->mu); @@ -369,40 +369,6 @@ void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&c->mu); } -void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_transport_op *op) { - grpc_connected_subchannel *con; - int cancel_alarm = 0; - gpr_mu_lock(&c->mu); - con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); - if (con != NULL) { - GRPC_CONNECTED_SUBCHANNEL_REF(con, "transport-op"); - } - if (op->disconnect) { - c->disconnected = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); - if (c->have_alarm) { - cancel_alarm = 1; - } - } - gpr_mu_unlock(&c->mu); - - if (con != NULL) { - grpc_connected_subchannel_process_transport_op(exec_ctx, con, op); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "transport-op"); - } - - if (cancel_alarm) { - grpc_timer_cancel(exec_ctx, &c->alarm); - } - - if (op->disconnect) { - grpc_connector_shutdown(exec_ctx, c->connector); - } -} - void grpc_connected_subchannel_process_transport_op( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { @@ -488,7 +454,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { con = gpr_malloc(channel_stack_size); stk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, - num_filters, c->args, stk); + num_filters, c->args, "CONNECTED_SUBCHANNEL", stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free((void *)c->connecting_result.filters); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -507,7 +473,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_free(sw_subchannel); gpr_free((void *)filters); grpc_channel_stack_destroy(exec_ctx, stk); - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + gpr_free(con); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); return; } @@ -519,7 +486,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure); @@ -588,17 +555,18 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } } static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { grpc_subchannel *c = arg; + if (c->connecting_result.transport != NULL) { publish_transport(exec_ctx, c); } else if (c->disconnected) { - /* do nothing */ + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_mu_lock(&c->mu); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b64a26561b..66c13990e9 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -105,9 +105,6 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call( grpc_pollset *pollset); /** process a transport level op */ -void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - grpc_transport_op *op); void grpc_connected_subchannel_process_transport_op( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel, grpc_transport_op *op); diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 7ff80e6cf8..81c19ca797 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -43,6 +43,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/useful.h> #define CLOSURE_NOT_READY ((grpc_closure *)0) @@ -158,7 +159,10 @@ void grpc_fd_global_shutdown(void) { grpc_fd *grpc_fd_create(int fd, const char *name) { grpc_fd *r = alloc_fd(fd); - grpc_iomgr_register_object(&r->iomgr_object, name); + char *name2; + gpr_asprintf(&name2, "%s fd=%d", name, fd); + grpc_iomgr_register_object(&r->iomgr_object, name2); + gpr_free(name2); #ifdef GRPC_FD_REF_COUNT_DEBUG gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name); #endif diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index f29ef7cdcf..7f0b34c36b 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -52,7 +52,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { size_t i; gpr_mu_destroy(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); @@ -74,7 +74,7 @@ void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, pollset_set->pollsets[pollset_set->pollset_count++] = pollset; for (i = 0, j = 0; i < pollset_set->fd_count; i++) { if (grpc_fd_is_orphaned(pollset_set->fds[i])) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } else { grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); pollset_set->fds[j++] = pollset_set->fds[i]; @@ -107,12 +107,13 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&bag->mu); if (bag->pollset_set_count == bag->pollset_set_capacity) { bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + bag->pollset_sets = gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); } bag->pollset_sets[bag->pollset_set_count++] = item; for (i = 0, j = 0; i < bag->fd_count; i++) { if (grpc_fd_is_orphaned(bag->fds[i])) { - GRPC_FD_UNREF(bag->fds[i], "pollset"); + GRPC_FD_UNREF(bag->fds[i], "pollset_set"); } else { grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); bag->fds[j++] = bag->fds[i]; @@ -130,7 +131,9 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, for (i = 0; i < bag->pollset_set_count; i++) { if (bag->pollset_sets[i] == item) { bag->pollset_set_count--; - GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]); + GPR_SWAP(grpc_pollset_set *, + bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); break; } } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a78fd0aae2..92fd3dadd7 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -152,7 +152,7 @@ grpc_channel *grpc_channel_create_from_filters( } grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters, - num_filters, args, + num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 08f34ff0aa..4de72d7d75 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,8 +50,6 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -#define GRPC_STREAM_REFCOUNT_DEBUG - typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy; |