diff options
author | yang-g <yangg@google.com> | 2015-07-21 16:11:55 -0700 |
---|---|---|
committer | yang-g <yangg@google.com> | 2015-07-21 16:11:55 -0700 |
commit | b4e262cb600055053c8932e585e7c8873a90a09b (patch) | |
tree | 5246173bd423feb9246f066af84eb76ffb30175d /src | |
parent | 5553eb3ee2a335328633598d54bb1e0484d13040 (diff) |
reconnect support
Diffstat (limited to 'src')
-rw-r--r-- | src/core/client_config/subchannel.c | 63 |
1 files changed, 53 insertions, 10 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 074e64dfc5..5cf56d33eb 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -44,6 +44,12 @@ #include "src/core/transport/connectivity_state.h" #include "src/core/surface/channel.h" +#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20 +#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 +#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 +#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 +#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 + typedef struct { /* all fields protected by subchannel->mu */ /** refcount */ @@ -125,6 +131,8 @@ struct grpc_subchannel { int have_alarm; /** our alarm */ grpc_alarm alarm; + /** current random value */ + gpr_int32 random; }; struct grpc_subchannel_call { @@ -264,6 +272,10 @@ void grpc_subchannel_del_interested_party(grpc_subchannel *c, grpc_pollset_set_del_pollset(c->pollset_set, pollset); } +static gpr_int32 random_seed() { + return gpr_time_to_millis(gpr_now(GPR_CLOCK_REALTIME)); +} + grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args) { grpc_subchannel *c = gpr_malloc(sizeof(*c)); @@ -284,6 +296,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->mdctx = args->mdctx; c->master = args->master; c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); + c->random = random_seed(); grpc_mdctx_ref(c->mdctx); grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, @@ -307,10 +320,9 @@ static void continue_connect(grpc_subchannel *c) { } static void start_connect(grpc_subchannel *c) { - gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); - c->next_attempt = now; - c->backoff_delta = gpr_time_from_seconds(1, GPR_TIMESPAN); - + c->backoff_delta = gpr_time_from_seconds( + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); + c->next_attempt = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c->backoff_delta); continue_connect(c); } @@ -576,6 +588,34 @@ static void publish_transport(grpc_subchannel *c) { } } +/* Generate a random number between 0 and 1. */ +static double generate_uniform_random_number(grpc_subchannel *c) { + c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31); + return c->random / (double)((gpr_uint32)1 << 31); +} + +/* Update backoff_delta and next_attempt in subchannel */ +static void update_reconnect_parameters(grpc_subchannel *c) { + gpr_int32 backoff_delta_millis, jitter; + gpr_int32 max_backoff_millis = + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + double jitter_range; + backoff_delta_millis = + (gpr_int32)(gpr_time_to_millis(c->backoff_delta) * + GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); + if (backoff_delta_millis > max_backoff_millis) { + backoff_delta_millis = max_backoff_millis; + } + c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN); + c->next_attempt = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c->backoff_delta); + + jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis; + jitter = + (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range); + c->next_attempt = + gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); +} + static void on_alarm(void *arg, int iomgr_success) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); @@ -586,6 +626,7 @@ static void on_alarm(void *arg, int iomgr_success) { connectivity_state_changed_locked(c, "alarm"); gpr_mu_unlock(&c->mu); if (iomgr_success) { + update_reconnect_parameters(c); continue_connect(c); } else { GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); @@ -603,18 +644,20 @@ static void subchannel_connected(void *arg, int iomgr_success) { GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; connectivity_state_changed_locked(c, "connect_failed"); - c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta); - if (gpr_time_cmp(c->backoff_delta, - gpr_time_from_seconds(60, GPR_TIMESPAN)) < 0) { - c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta); - } grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { - return gpr_time_add(c->next_attempt, c->backoff_delta); + gpr_timespec current_deadline = + gpr_time_add(c->next_attempt, c->backoff_delta); + gpr_timespec min_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, + GPR_TIMESPAN)); + return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline + : min_deadline; } static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { |