diff options
-rw-r--r-- | src/core/channel/client_channel.c | 2 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 8 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 16 |
3 files changed, 22 insertions, 4 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 3fce931284..835467d102 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -415,7 +415,7 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success) { /* check if the notification is for a stale policy */ if (w->lb_policy == w->chand->lb_policy) { grpc_connectivity_state_set(&w->chand->state_tracker, w->state); - start_new = 1; + start_new = (w->state != GRPC_CHANNEL_FATAL_FAILURE); } gpr_mu_unlock(&w->chand->mu_config); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 8409aab14e..1a0f9d1790 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -62,6 +62,8 @@ typedef struct { grpc_subchannel *selected; /** have we started picking? */ int started_picking; + /** are we shut down? */ + int shutdown; /** which subchannel are we watching? */ size_t checking_subchannel; /** what is the connectivity of that channel? */ @@ -107,12 +109,14 @@ void pf_shutdown(grpc_lb_policy *pol) { pending_pick *pp; gpr_mu_lock(&p->mu); del_interested_parties_locked(p); + p->shutdown = 1; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; grpc_iomgr_add_delayed_callback(pp->on_complete, 0); gpr_free(pp); } + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE); gpr_mu_unlock(&p->mu); } @@ -168,7 +172,9 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { gpr_mu_lock(&p->mu); - if (p->selected != NULL) { + if (p->shutdown) { + unref = 1; + } else if (p->selected != NULL) { grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_subchannel_notify_on_state_change(p->selected, &p->checking_connectivity, &p->connectivity_changed); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index be26e0bd1d..a2f672df1c 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -492,6 +492,8 @@ static void publish_transport(grpc_subchannel *c) { connection *destroy_connection = NULL; grpc_channel_element *elem; + gpr_log(GPR_DEBUG, "publish_transport: %p", c->master); + /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; filters = gpr_malloc(sizeof(*filters) * num_filters); @@ -525,6 +527,8 @@ static void publish_transport(grpc_subchannel *c) { gpr_free(sw); gpr_free(filters); grpc_channel_stack_destroy(stk); + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); + GRPC_SUBCHANNEL_UNREF(c, "connecting"); return; } @@ -569,6 +573,8 @@ static void publish_transport(grpc_subchannel *c) { static void on_alarm(void *arg, int iomgr_success) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); + gpr_log(GPR_DEBUG, "on_alarm:%d:%d:%d", c->have_alarm, iomgr_success, + c->disconnected); c->have_alarm = 0; if (c->disconnected) { iomgr_success = 0; @@ -588,13 +594,19 @@ static void subchannel_connected(void *arg, int iomgr_success) { if (c->connecting_result.transport != NULL) { publish_transport(c); } else { + gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; connectivity_state_changed_locked(c); c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta); - c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta); - grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now(GPR_CLOCK_REALTIME)); + if (gpr_time_cmp(c->backoff_delta, gpr_time_from_seconds(60)) < 0) { + c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta); + } + gpr_log(GPR_DEBUG, "wait: %d.%09d %d.%09d %d.%09d", now.tv_sec, now.tv_nsec, + c->next_attempt.tv_sec, c->next_attempt.tv_nsec, + c->backoff_delta.tv_sec, c->backoff_delta.tv_nsec); + grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } } |