diff options
author | Craig Tiller <ctiller@google.com> | 2015-07-17 23:12:34 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-07-17 23:12:34 -0700 |
commit | 03dc655d2e856d5809135dd021cf17fa7ace5021 (patch) | |
tree | 979375dee598ce8fd97a58bf31b025db66bb2e8e /src/core/client_config | |
parent | b5980be9a08678212e5dbd6549b923f545d83539 (diff) |
Fix state tracking, refcounting bugs
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 39 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 8 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 10 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 40 |
4 files changed, 63 insertions, 34 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 1a0f9d1790..5ae2e0ea52 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -116,7 +116,8 @@ void pf_shutdown(grpc_lb_policy *pol) { grpc_iomgr_add_delayed_callback(pp->on_complete, 0); gpr_free(pp); } - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE); + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, + "shutdown"); gpr_mu_unlock(&p->mu); } @@ -175,17 +176,20 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { if (p->shutdown) { unref = 1; } else if (p->selected != NULL) { - grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity); + grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, + "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_subchannel_notify_on_state_change(p->selected, &p->checking_connectivity, &p->connectivity_changed); + grpc_subchannel_notify_on_state_change( + p->selected, &p->checking_connectivity, &p->connectivity_changed); } else { unref = 1; } } else { -loop: + loop: switch (p->checking_connectivity) { case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY); + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, + "connecting_ready"); p->selected = p->subchannels[p->checking_subchannel]; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -194,10 +198,13 @@ loop: grpc_iomgr_add_delayed_callback(pp->on_complete, 1); gpr_free(pp); } - grpc_subchannel_notify_on_state_change(p->selected, &p->checking_connectivity, &p->connectivity_changed); + grpc_subchannel_notify_on_state_change( + p->selected, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE); + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connecting_transient_failure"); del_interested_parties_locked(p); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; @@ -205,14 +212,17 @@ loop: p->subchannels[p->checking_subchannel]); add_interested_parties_locked(p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); + grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed); } else { goto loop; } break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity); + grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, + "connecting_changed"); grpc_subchannel_notify_on_state_change( p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); @@ -224,7 +234,9 @@ loop: p->num_subchannels--; GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); if (p->num_subchannels == 0) { - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE); + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, + "no_more_channels"); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; @@ -233,7 +245,9 @@ loop: } unref = 1; } else { - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE); + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "subchannel_failed"); p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); @@ -308,7 +322,8 @@ grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); p->num_subchannels = num_subchannels; - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "pick_first"); memcpy(p->subchannels, subchannels, sizeof(grpc_subchannel *) * num_subchannels); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 7a425f9448..90ec44432f 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -82,11 +82,13 @@ void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) { policy->vtable->exit_idle(policy); } -void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, grpc_connectivity_state *state, - grpc_iomgr_closure *closure) { +void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure) { policy->vtable->notify_on_state_change(policy, state, closure); } -grpc_connectivity_state grpc_lb_policy_check_connectivity(grpc_lb_policy *policy) { +grpc_connectivity_state grpc_lb_policy_check_connectivity( + grpc_lb_policy *policy) { return policy->vtable->check_connectivity(policy); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 320f383371..d62419b457 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -75,6 +75,8 @@ struct grpc_lb_policy_vtable { grpc_iomgr_closure *closure); }; +#define GRPC_LB_POLICY_REFCOUNT_DEBUG + #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) @@ -111,9 +113,11 @@ void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op); void grpc_lb_policy_exit_idle(grpc_lb_policy *policy); -void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, grpc_connectivity_state *state, - grpc_iomgr_closure *closure); +void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure); -grpc_connectivity_state grpc_lb_policy_check_connectivity(grpc_lb_policy *policy); +grpc_connectivity_state grpc_lb_policy_check_connectivity( + grpc_lb_policy *policy); #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 5dd280a703..c455c0112b 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -96,7 +96,8 @@ struct grpc_subchannel { grpc_iomgr_closure connected; /** pollset_set tracking who's interested in a connection - being setup - owned by the master channel (in particular the client_channel + being setup - owned by the master channel (in particular the + client_channel filter there-in) */ grpc_pollset_set *pollset_set; @@ -135,7 +136,8 @@ struct grpc_subchannel_call { #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) static grpc_subchannel_call *create_call(connection *con); -static void connectivity_state_changed_locked(grpc_subchannel *c); +static void connectivity_state_changed_locked(grpc_subchannel *c, + const char *reason); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); @@ -265,7 +267,8 @@ void grpc_subchannel_del_interested_party(grpc_subchannel *c, grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args) { grpc_subchannel *c = gpr_malloc(sizeof(*c)); - grpc_channel_element *parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(args->master)); + grpc_channel_element *parent_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(args->master)); memset(c, 0, sizeof(*c)); c->refs = 1; c->connector = connector; @@ -283,7 +286,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); grpc_mdctx_ref(c->mdctx); grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); - grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); + grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, + "subchannel"); gpr_mu_init(&c->mu); return c; } @@ -345,7 +349,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, grpc_subchannel_add_interested_party(c, pollset); if (!c->connecting) { c->connecting = 1; - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "create_call"); /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); @@ -378,7 +382,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "state_change"); } gpr_mu_unlock(&c->mu); if (do_connect) { @@ -394,7 +398,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, gpr_mu_lock(&c->mu); if (op->disconnect) { c->disconnected = 1; - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "disconnect"); if (c->have_alarm) { cancel_alarm = 1; } @@ -462,13 +466,15 @@ static void on_state_changed(void *p, int iomgr_success) { destroy_connection = sw->subchannel->active; } sw->subchannel->active = NULL; - grpc_connectivity_state_set(&c->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE); + grpc_connectivity_state_set( + &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE + : GRPC_CHANNEL_TRANSIENT_FAILURE, + "connection_failed"); break; } done: - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "transport_state_changed"); destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); gpr_free(sw); gpr_mu_unlock(mu); @@ -555,7 +561,7 @@ static void publish_transport(grpc_subchannel *c) { elem->filter->start_transport_op(elem, &op); /* signal completion */ - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "connected"); while ((w4c = c->waiting)) { c->waiting = w4c->next; grpc_iomgr_add_callback(&w4c->continuation); @@ -579,7 +585,7 @@ static void on_alarm(void *arg, int iomgr_success) { if (c->disconnected) { iomgr_success = 0; } - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "alarm"); gpr_mu_unlock(&c->mu); if (iomgr_success) { continue_connect(c); @@ -598,9 +604,10 @@ static void subchannel_connected(void *arg, int iomgr_success) { gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "connect_failed"); c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta); - if (gpr_time_cmp(c->backoff_delta, gpr_time_from_seconds(60)) < 0) { + if (gpr_time_cmp(c->backoff_delta, + gpr_time_from_seconds(60, GPR_TIMESPAN)) < 0) { c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta); } gpr_log(GPR_DEBUG, "wait: %d.%09d %d.%09d %d.%09d", now.tv_sec, now.tv_nsec, @@ -631,9 +638,10 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { return GRPC_CHANNEL_IDLE; } -static void connectivity_state_changed_locked(grpc_subchannel *c) { +static void connectivity_state_changed_locked(grpc_subchannel *c, + const char *reason) { grpc_connectivity_state current = compute_connectivity_locked(c); - grpc_connectivity_state_set(&c->state_tracker, current); + grpc_connectivity_state_set(&c->state_tracker, current, reason); } /* |