aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/subchannel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r--src/core/client_config/subchannel.c114
1 files changed, 27 insertions, 87 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index ec9e47a6dd..8f150a8d81 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -45,7 +45,9 @@
#include "src/core/client_config/subchannel_index.h"
#include "src/core/iomgr/timer.h"
#include "src/core/profiling/timers.h"
+#include "src/core/support/backoff.h"
#include "src/core/surface/channel.h"
+#include "src/core/surface/channel_init.h"
#include "src/core/transport/connectivity_state.h"
#define INTERNAL_REF_BITS 16
@@ -127,8 +129,8 @@ struct grpc_subchannel {
/** next connect attempt time */
gpr_timespec next_attempt;
- /** amount to backoff each failure */
- gpr_timespec backoff_delta;
+ /** backoff state */
+ gpr_backoff backoff_state;
/** do we have an active alarm? */
int have_alarm;
/** our alarm */
@@ -146,7 +148,6 @@ struct grpc_subchannel_call {
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
(((grpc_subchannel_call *)(callstack)) - 1)
-static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
bool iomgr_success);
@@ -337,6 +338,22 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
+ gpr_backoff_init(&c->backoff_state,
+ GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
+ GRPC_SUBCHANNEL_RECONNECT_JITTER,
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
+ GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ if (c->args) {
+ for (size_t i = 0; i < c->args->num_args; i++) {
+ if (0 == strcmp(c->args->args[i].key,
+ "grpc.testing.fixed_reconnect_backoff")) {
+ GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
+ gpr_backoff_init(&c->backoff_state, 1.0, 0.0,
+ c->args->args[i].value.integer,
+ c->args->args[i].value.integer);
+ }
+ }
+ }
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(exec_ctx, key, c);
@@ -348,7 +365,7 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
args.interested_parties = c->pollset_set;
args.addr = c->addr;
args.addr_len = c->addr_len;
- args.deadline = compute_connect_deadline(c);
+ args.deadline = c->next_attempt;
args.channel_args = c->args;
args.initial_connect_string = c->initial_connect_string;
@@ -359,10 +376,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
}
static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- c->backoff_delta = gpr_time_from_seconds(
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
c->next_attempt =
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
+ gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
continue_connect(exec_ctx, c);
}
@@ -507,32 +522,15 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c) {
- size_t channel_stack_size;
grpc_connected_subchannel *con;
grpc_channel_stack *stk;
- size_t num_filters;
- const grpc_channel_filter **filters;
state_watcher *sw_subchannel;
- /* build final filter list */
- num_filters = c->num_filters + c->connecting_result.num_filters + 1;
- filters = gpr_malloc(sizeof(*filters) * num_filters);
- if (c->num_filters > 0) {
- memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
- }
- memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
- sizeof(*filters) * c->connecting_result.num_filters);
- filters[num_filters - 1] = &grpc_connected_channel_filter;
-
/* construct channel stack */
- channel_stack_size = grpc_channel_stack_size(filters, num_filters);
- con = gpr_malloc(channel_stack_size);
+ con = grpc_channel_init_create_stack(
+ exec_ctx, GRPC_CLIENT_SUBCHANNEL, 0, c->connecting_result.channel_args, 1,
+ connection_destroy, NULL, c->connecting_result.transport);
stk = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters,
- num_filters, c->connecting_result.channel_args,
- "CONNECTED_SUBCHANNEL", stk);
- grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
- gpr_free((void *)c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
@@ -543,9 +541,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
sw_subchannel);
if (c->disconnected) {
- gpr_mu_unlock(&c->mu);
gpr_free(sw_subchannel);
- gpr_free((void *)filters);
grpc_channel_stack_destroy(exec_ctx, stk);
gpr_free(con);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
@@ -573,52 +569,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
/* signal completion */
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
"connected");
-
- gpr_free((void *)filters);
-}
-
-/* Generate a random number between 0 and 1. */
-static double generate_uniform_random_number(grpc_subchannel *c) {
- c->random = (1103515245 * c->random + 12345) % ((uint32_t)1 << 31);
- return c->random / (double)((uint32_t)1 << 31);
-}
-
-/* Update backoff_delta and next_attempt in subchannel */
-static void update_reconnect_parameters(grpc_subchannel *c) {
- size_t i;
- int32_t backoff_delta_millis, jitter;
- int32_t max_backoff_millis =
- GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
- double jitter_range;
-
- if (c->args) {
- for (i = 0; i < c->args->num_args; i++) {
- if (0 == strcmp(c->args->args[i].key,
- "grpc.testing.fixed_reconnect_backoff")) {
- GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
- c->next_attempt = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN));
- return;
- }
- }
- }
-
- backoff_delta_millis =
- (int32_t)(gpr_time_to_millis(c->backoff_delta) *
- GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
- if (backoff_delta_millis > max_backoff_millis) {
- backoff_delta_millis = max_backoff_millis;
- }
- c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN);
- c->next_attempt =
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
-
- jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
- jitter =
- (int32_t)((2 * generate_uniform_random_number(c) - 1) * jitter_range);
- c->next_attempt =
- gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
}
static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
@@ -629,7 +579,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
iomgr_success = 0;
}
if (iomgr_success) {
- update_reconnect_parameters(c);
+ c->next_attempt =
+ gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
continue_connect(exec_ctx, c);
gpr_mu_unlock(&c->mu);
} else {
@@ -661,17 +612,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
-static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
- gpr_timespec current_deadline =
- gpr_time_add(c->next_attempt, c->backoff_delta);
- gpr_timespec min_deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
- GPR_TIMESPAN));
- return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline
- : min_deadline;
-}
-
/*
* grpc_subchannel_call implementation
*/