From 536fc539cc36be41a1db2d16458a00d2e5df1c55 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 11 Mar 2016 12:36:56 -0800 Subject: Integrate backoff library with subchannel --- src/core/client_config/subchannel.c | 102 +++++++++++------------------------- 1 file changed, 31 insertions(+), 71 deletions(-) (limited to 'src') diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index d91dd116b8..8bd61a7e22 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -45,6 +45,7 @@ #include "src/core/client_config/subchannel_index.h" #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" +#include "src/core/support/backoff.h" #include "src/core/surface/channel.h" #include "src/core/transport/connectivity_state.h" @@ -127,8 +128,8 @@ struct grpc_subchannel { /** next connect attempt time */ gpr_timespec next_attempt; - /** amount to backoff each failure */ - gpr_timespec backoff_delta; + /** backoff state */ + gpr_backoff backoff_state; /** do we have an active alarm? */ int have_alarm; /** our alarm */ @@ -146,7 +147,6 @@ struct grpc_subchannel_call { #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call *)(callstack)) - 1) -static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, bool iomgr_success); @@ -184,8 +184,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -void grpc_connected_subchannel_ref(grpc_connected_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_ref( + grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } @@ -226,8 +226,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, return old_val; } -grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_ref( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); @@ -235,8 +235,8 @@ grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c return c; } -grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_weak_ref( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); @@ -337,6 +337,22 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); + gpr_backoff_init(&c->backoff_state, + GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_SUBCHANNEL_RECONNECT_JITTER, + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + if (c->args) { + for (size_t i = 0; i < c->args->num_args; i++) { + if (0 == strcmp(c->args->args[i].key, + "grpc.testing.fixed_reconnect_backoff")) { + GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); + gpr_backoff_init(&c->backoff_state, 1.0, 0.0, + c->args->args[i].value.integer, + c->args->args[i].value.integer); + } + } + } gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -348,7 +364,7 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; - args.deadline = compute_connect_deadline(c); + args.deadline = c->next_attempt; args.channel_args = c->args; args.initial_connect_string = c->initial_connect_string; @@ -359,10 +375,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { } static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { - c->backoff_delta = gpr_time_from_seconds( - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); + gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); continue_connect(exec_ctx, c); } @@ -579,50 +593,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_free((void *)filters); } -/* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(grpc_subchannel *c) { - c->random = (1103515245 * c->random + 12345) % ((uint32_t)1 << 31); - return c->random / (double)((uint32_t)1 << 31); -} - -/* Update backoff_delta and next_attempt in subchannel */ -static void update_reconnect_parameters(grpc_subchannel *c) { - size_t i; - int32_t backoff_delta_millis, jitter; - int32_t max_backoff_millis = - GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; - double jitter_range; - - if (c->args) { - for (i = 0; i < c->args->num_args; i++) { - if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff")) { - GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); - c->next_attempt = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN)); - return; - } - } - } - - backoff_delta_millis = - (int32_t)(gpr_time_to_millis(c->backoff_delta) * - GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); - if (backoff_delta_millis > max_backoff_millis) { - backoff_delta_millis = max_backoff_millis; - } - c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN); - c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); - - jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis; - jitter = - (int32_t)((2 * generate_uniform_random_number(c) - 1) * jitter_range); - c->next_attempt = - gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); -} - static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); @@ -631,7 +601,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { iomgr_success = 0; } if (iomgr_success) { - update_reconnect_parameters(c); + c->next_attempt = + gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); continue_connect(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -661,17 +632,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, } } -static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { - gpr_timespec current_deadline = - gpr_time_add(c->next_attempt, c->backoff_delta); - gpr_timespec min_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, - GPR_TIMESPAN)); - return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline - : min_deadline; -} - /* * grpc_subchannel_call implementation */ @@ -686,8 +646,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } -void grpc_subchannel_call_ref(grpc_subchannel_call *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_call_ref( + grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } -- cgit v1.2.3