From 87cc0848ce279892cce562b3aa6cfae870d05e50 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 30 Jun 2015 08:15:55 -0700 Subject: client_config bugfixes --- src/core/client_config/lb_policies/pick_first.c | 17 +++++++++++------ src/core/client_config/subchannel.c | 7 ++++++- src/core/client_config/subchannel.h | 2 -- src/core/iomgr/fd_posix.c | 3 ++- src/core/iomgr/iomgr.c | 8 +++++--- src/core/surface/channel_create.c | 1 + 6 files changed, 25 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index a8e5b5cc4a..ffdae75b28 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -176,15 +176,20 @@ loop: del_interested_parties_locked(p); GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); - p->checking_subchannel %= p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel]); p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); - add_interested_parties_locked(p); if (p->num_subchannels == 0) { - abort(); + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_iomgr_add_delayed_callback(pp->on_complete, 1); + gpr_free(pp); + } } else { + p->checking_subchannel %= p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity( + p->subchannels[p->checking_subchannel]); + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); + add_interested_parties_locked(p); goto loop; } } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index cae6db0110..d16786a7ad 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -546,6 +546,7 @@ static void on_alarm(void *arg, int iomgr_success) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); c->have_alarm = 0; + connectivity_state_changed_locked(c); gpr_mu_unlock(&c->mu); if (iomgr_success) { continue_connect(c); @@ -560,6 +561,7 @@ static void subchannel_connected(void *arg, int iomgr_success) { publish_transport(c); } else { gpr_mu_lock(&c->mu); + connectivity_state_changed_locked(c); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta); @@ -570,7 +572,7 @@ static void subchannel_connected(void *arg, int iomgr_success) { } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { - return gpr_time_add(gpr_now(), gpr_time_from_seconds(60)); + return gpr_time_add(c->next_attempt, c->backoff_delta); } static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { @@ -578,6 +580,9 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { return GRPC_CHANNEL_FATAL_FAILURE; } if (c->connecting) { + if (c->have_alarm) { + return GRPC_CHANNEL_TRANSIENT_FAILURE; + } return GRPC_CHANNEL_CONNECTING; } if (c->active) { diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 03bd4f63e0..5435ef703b 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -43,8 +43,6 @@ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; -#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG - #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p), __FILE__, __LINE__, (r)) diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index d12974cf3c..446081954d 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -115,7 +115,7 @@ static void destroy(grpc_fd *fd) { #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { - gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, + gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); #else @@ -159,6 +159,7 @@ void grpc_fd_global_shutdown(void) { grpc_fd *grpc_fd_create(int fd, const char *name) { grpc_fd *r = alloc_fd(fd); + gpr_log(GPR_DEBUG, "FD %d %p create", r->fd, r); grpc_iomgr_register_object(&r->iomgr_object, name); return r; } diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 8fbddd73b0..e400601311 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -158,7 +158,7 @@ void grpc_iomgr_shutdown(void) { "memory leaks are likely", count_objects()); for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) { - gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s", obj->name); + gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s %p", obj->name, obj); } break; } @@ -177,8 +177,9 @@ void grpc_iomgr_shutdown(void) { } void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) { - gpr_mu_lock(&g_mu); obj->name = gpr_strdup(name); + gpr_log(GPR_DEBUG, "register: %s %p", obj->name, obj); + gpr_mu_lock(&g_mu); obj->next = &g_root_object; obj->prev = obj->next->prev; obj->next->prev = obj->prev->next = obj; @@ -186,12 +187,13 @@ void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) { } void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { + gpr_log(GPR_DEBUG, "unregister: %s %p", obj->name, obj); gpr_mu_lock(&g_mu); obj->next->prev = obj->prev; obj->prev->next = obj->next; - gpr_free(obj->name); gpr_cv_signal(&g_rcv); gpr_mu_unlock(&g_mu); + gpr_free(obj->name); } void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 6b559e1fb4..20da830388 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -73,6 +73,7 @@ static void connected(void *arg, grpc_endpoint *tcp) { if (tcp != NULL) { c->result->transport = grpc_create_chttp2_transport(c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1); + GPR_ASSERT(c->result->transport); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter*)); c->result->filters[0] = &grpc_http_client_filter; c->result->num_filters = 1; -- cgit v1.2.3