diff options
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/client_config.c | 6 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 2 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 57 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 14 |
4 files changed, 61 insertions, 18 deletions
diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c index 6ecffb3854..c500af25ee 100644 --- a/src/core/client_config/client_config.c +++ b/src/core/client_config/client_config.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -53,7 +53,9 @@ void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); } void grpc_client_config_unref(grpc_exec_ctx *exec_ctx, grpc_client_config *c) { if (gpr_unref(&c->refs)) { - GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config"); + if (c->lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config"); + } gpr_free(c); } } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 81167b31c8..8ed1223d39 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -387,8 +387,8 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + if (args->num_subchannels == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(args->num_subchannels > 0); memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 376b6b3d76..e28e4757a1 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -41,6 +41,7 @@ #include "src/core/client_config/lb_policy_registry.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" typedef struct { @@ -71,6 +72,9 @@ typedef struct { grpc_client_config **target_config; /** current (fully resolved) config */ grpc_client_config *resolved_config; + /** retry timer */ + bool have_retry_timer; + grpc_timer retry_timer; } dns_resolver; static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); @@ -91,6 +95,9 @@ static const grpc_resolver_vtable dns_resolver_vtable = { static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; gpr_mu_lock(&r->mu); + if (r->have_retry_timer) { + grpc_timer_cancel(exec_ctx, &r->retry_timer); + } if (r->next_completion != NULL) { *r->target_config = NULL; grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); @@ -125,6 +132,22 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, gpr_mu_unlock(&r->mu); } +static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, + bool success) { + dns_resolver *r = arg; + + gpr_mu_lock(&r->mu); + r->have_retry_timer = false; + if (success) { + if (!r->resolving) { + dns_start_resolving_locked(r); + } + } + gpr_mu_unlock(&r->mu); + + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); +} + static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { dns_resolver *r = arg; @@ -133,29 +156,47 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel_args args; grpc_lb_policy *lb_policy; size_t i; - if (addresses) { + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving); + r->resolving = 0; + if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); + size_t naddrs = 0; for (i = 0; i < addresses->naddrs; i++) { memset(&args, 0, sizeof(args)); args.addr = (struct sockaddr *)(addresses->addrs[i].addr); args.addr_len = (size_t)addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel( + grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( exec_ctx, r->subchannel_factory, &args); + if (subchannel != NULL) { + subchannels[naddrs++] = subchannel; + } } memset(&lb_policy_args, 0, sizeof(lb_policy_args)); lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy_args.num_subchannels = naddrs; lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + if (lb_policy != NULL) { + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + } grpc_resolved_addresses_destroy(addresses); gpr_free(subchannels); + } else { + int retry_seconds = 15; + gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d seconds", + retry_seconds); + GPR_ASSERT(!r->have_retry_timer); + r->have_retry_timer = true; + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + GRPC_RESOLVER_REF(&r->base, "retry-timer"); + grpc_timer_init( + exec_ctx, &r->retry_timer, + gpr_time_add(now, gpr_time_from_seconds(retry_seconds, GPR_TIMESPAN)), + dns_on_retry_timer, r, now); } - gpr_mu_lock(&r->mu); - GPR_ASSERT(r->resolving); - r->resolving = 0; if (r->resolved_config) { grpc_client_config_unref(exec_ctx, r->resolved_config); } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index d91dd116b8..ec9e47a6dd 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -505,7 +505,8 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, elem->filter->start_transport_op(exec_ctx, elem, &op); } -static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { +static void publish_transport_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c) { size_t channel_stack_size; grpc_connected_subchannel *con; grpc_channel_stack *stk; @@ -541,8 +542,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); - gpr_mu_lock(&c->mu); - if (c->disconnected) { gpr_mu_unlock(&c->mu); gpr_free(sw_subchannel); @@ -575,7 +574,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected"); - gpr_mu_unlock(&c->mu); gpr_free((void *)filters); } @@ -644,21 +642,23 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { grpc_subchannel *c = arg; + GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); + gpr_mu_lock(&c->mu); if (c->connecting_result.transport != NULL) { - publish_transport(exec_ctx, c); + publish_transport_locked(exec_ctx, c); } else if (c->disconnected) { GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed"); grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); - gpr_mu_unlock(&c->mu); } + gpr_mu_unlock(&c->mu); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { |