diff options
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r-- | src/core/client_config/subchannel.c | 86 |
1 files changed, 23 insertions, 63 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index de28437d0c..8f150a8d81 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -45,6 +45,7 @@ #include "src/core/client_config/subchannel_index.h" #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" +#include "src/core/support/backoff.h" #include "src/core/surface/channel.h" #include "src/core/surface/channel_init.h" #include "src/core/transport/connectivity_state.h" @@ -128,8 +129,8 @@ struct grpc_subchannel { /** next connect attempt time */ gpr_timespec next_attempt; - /** amount to backoff each failure */ - gpr_timespec backoff_delta; + /** backoff state */ + gpr_backoff backoff_state; /** do we have an active alarm? */ int have_alarm; /** our alarm */ @@ -147,7 +148,6 @@ struct grpc_subchannel_call { #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call *)(callstack)) - 1) -static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, bool iomgr_success); @@ -338,6 +338,22 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); + gpr_backoff_init(&c->backoff_state, + GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_SUBCHANNEL_RECONNECT_JITTER, + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + if (c->args) { + for (size_t i = 0; i < c->args->num_args; i++) { + if (0 == strcmp(c->args->args[i].key, + "grpc.testing.fixed_reconnect_backoff")) { + GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); + gpr_backoff_init(&c->backoff_state, 1.0, 0.0, + c->args->args[i].value.integer, + c->args->args[i].value.integer); + } + } + } gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -349,7 +365,7 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; - args.deadline = compute_connect_deadline(c); + args.deadline = c->next_attempt; args.channel_args = c->args; args.initial_connect_string = c->initial_connect_string; @@ -360,10 +376,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { } static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { - c->backoff_delta = gpr_time_from_seconds( - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); + gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); continue_connect(exec_ctx, c); } @@ -557,50 +571,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, "connected"); } -/* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(grpc_subchannel *c) { - c->random = (1103515245 * c->random + 12345) % ((uint32_t)1 << 31); - return c->random / (double)((uint32_t)1 << 31); -} - -/* Update backoff_delta and next_attempt in subchannel */ -static void update_reconnect_parameters(grpc_subchannel *c) { - size_t i; - int32_t backoff_delta_millis, jitter; - int32_t max_backoff_millis = - GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; - double jitter_range; - - if (c->args) { - for (i = 0; i < c->args->num_args; i++) { - if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff")) { - GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); - c->next_attempt = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN)); - return; - } - } - } - - backoff_delta_millis = - (int32_t)(gpr_time_to_millis(c->backoff_delta) * - GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); - if (backoff_delta_millis > max_backoff_millis) { - backoff_delta_millis = max_backoff_millis; - } - c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN); - c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); - - jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis; - jitter = - (int32_t)((2 * generate_uniform_random_number(c) - 1) * jitter_range); - c->next_attempt = - gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); -} - static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); @@ -609,7 +579,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { iomgr_success = 0; } if (iomgr_success) { - update_reconnect_parameters(c); + c->next_attempt = + gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); continue_connect(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -641,17 +612,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } -static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { - gpr_timespec current_deadline = - gpr_time_add(c->next_attempt, c->backoff_delta); - gpr_timespec min_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, - GPR_TIMESPAN)); - return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline - : min_deadline; -} - /* * grpc_subchannel_call implementation */ |