aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_channel/subchannel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/client_channel/subchannel.c')
-rw-r--r--src/core/ext/client_channel/subchannel.c185
1 files changed, 111 insertions, 74 deletions
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index ab6aede23a..f294e69392 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -100,7 +100,7 @@ struct grpc_subchannel {
grpc_subchannel_key *key;
/** initial string to send to peer */
- gpr_slice initial_connect_string;
+ grpc_slice initial_connect_string;
/** set during connection */
grpc_connect_out_args connecting_result;
@@ -119,9 +119,9 @@ struct grpc_subchannel {
gpr_mu mu;
/** have we seen a disconnection? */
- int disconnected;
+ bool disconnected;
/** are we connecting */
- int connecting;
+ bool connecting;
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
@@ -132,7 +132,9 @@ struct grpc_subchannel {
/** backoff state */
gpr_backoff backoff_state;
/** do we have an active alarm? */
- int have_alarm;
+ bool have_alarm;
+ /** have we started the backoff loop */
+ bool backoff_begun;
/** our alarm */
grpc_timer alarm;
};
@@ -183,9 +185,10 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(c);
}
-void grpc_connected_subchannel_ref(
+grpc_connected_subchannel *grpc_connected_subchannel_ref(
grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
+ return c;
}
void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
@@ -205,7 +208,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free((void *)c->filters);
grpc_channel_args_destroy(c->args);
gpr_free(c->addr);
- gpr_slice_unref(c->initial_connect_string);
+ grpc_slice_unref(c->initial_connect_string);
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector);
grpc_pollset_set_destroy(c->pollset_set);
@@ -263,7 +266,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_subchannel_index_unregister(exec_ctx, c->key, c);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
- c->disconnected = 1;
+ c->disconnected = true;
grpc_connector_shutdown(exec_ctx, c->connector);
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
@@ -333,16 +336,18 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
int initial_backoff_ms =
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ int min_backoff_ms = GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS * 1000;
bool fixed_reconnect_backoff = false;
if (c->args) {
for (size_t i = 0; i < c->args->num_args; i++) {
if (0 == strcmp(c->args->args[i].key,
- "grpc.testing.fixed_reconnect_backoff")) {
+ "grpc.testing.fixed_reconnect_backoff_ms")) {
GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
fixed_reconnect_backoff = true;
- initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i],
- (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
+ initial_backoff_ms = min_backoff_ms = max_backoff_ms =
+ grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
} else if (0 == strcmp(c->args->args[i].key,
GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
@@ -359,17 +364,18 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
}
}
gpr_backoff_init(
- &c->backoff_state,
+ &c->backoff_state, initial_backoff_ms,
fixed_reconnect_backoff ? 1.0
: GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
- initial_backoff_ms, max_backoff_ms);
+ min_backoff_ms, max_backoff_ms);
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(exec_ctx, key, c);
}
-static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
@@ -385,12 +391,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
&c->connected);
}
-static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- c->next_attempt =
- gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
-}
-
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
grpc_error **error) {
grpc_connectivity_state state;
@@ -417,6 +417,73 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_subchannel *c = arg;
+ gpr_mu_lock(&c->mu);
+ c->have_alarm = false;
+ if (c->disconnected) {
+ error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
+ c->next_attempt =
+ gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
+ continue_connect_locked(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ } else {
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
+ if (c->disconnected) {
+ /* Don't try to connect if we're already disconnected */
+ return;
+ }
+
+ if (c->connecting) {
+ /* Already connecting: don't restart */
+ return;
+ }
+
+ if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) {
+ /* Already connected: don't restart */
+ return;
+ }
+
+ if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
+ /* Nobody is interested in connecting: so don't just yet */
+ return;
+ }
+
+ c->connecting = true;
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!c->backoff_begun) {
+ c->backoff_begun = true;
+ c->next_attempt = gpr_backoff_begin(&c->backoff_state, now);
+ continue_connect_locked(exec_ctx, c);
+ } else {
+ GPR_ASSERT(!c->have_alarm);
+ c->have_alarm = true;
+ gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+ if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+ 0) {
+ gpr_log(GPR_INFO, "Retry immediately");
+ } else {
+ gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+ time_til_next.tv_sec, time_til_next.tv_nsec);
+ }
+ grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+ }
+}
+
void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
@@ -448,13 +515,9 @@ void grpc_subchannel_notify_on_state_change(
w->next = &c->root_external_state_watcher;
w->prev = w->next->prev;
w->next->prev = w->prev->next = w;
- if (grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &c->state_tracker, state, &w->closure)) {
- c->connecting = 1;
- /* released by connection */
- GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
- start_connect(exec_ctx, c);
- }
+ grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker,
+ state, &w->closure);
+ maybe_start_connecting_locked(exec_ctx, c);
gpr_mu_unlock(&c->mu);
}
}
@@ -541,14 +604,20 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder_set_transport(builder,
c->connecting_result.transport);
- if (grpc_channel_init_create_stack(exec_ctx, builder,
- GRPC_CLIENT_SUBCHANNEL)) {
- con = grpc_channel_stack_builder_finish(exec_ctx, builder, 0, 1,
- connection_destroy, NULL);
- } else {
+ if (!grpc_channel_init_create_stack(exec_ctx, builder,
+ GRPC_CLIENT_SUBCHANNEL)) {
grpc_channel_stack_builder_destroy(builder);
abort(); /* TODO(ctiller): what to do here (previously we just crashed) */
}
+ grpc_error *error = grpc_channel_stack_builder_finish(
+ exec_ctx, builder, 0, 1, connection_destroy, NULL, (void **)&con);
+ if (error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(error);
+ abort(); /* TODO(ctiller): what to do here? */
+ }
stk = CHANNEL_STACK_FROM_CONNECTION(con);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
@@ -574,12 +643,9 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
Re-evaluate if we really need this. */
gpr_atm_full_barrier();
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
- c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel
- ref
- for connecting is donated
- to the state watcher */
+ ref for connecting is donated to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change(
@@ -591,28 +657,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, "connected");
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_subchannel *c = arg;
- gpr_mu_lock(&c->mu);
- c->have_alarm = 0;
- if (c->disconnected) {
- error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
- } else {
- GRPC_ERROR_REF(error);
- }
- if (error == GRPC_ERROR_NONE) {
- gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
- c->next_attempt =
- gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
- gpr_mu_unlock(&c->mu);
- } else {
- gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
- }
- GRPC_ERROR_UNREF(error);
-}
-
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_subchannel *c = arg;
@@ -620,35 +664,28 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
gpr_mu_lock(&c->mu);
+ c->connecting = false;
if (c->connecting_result.transport != NULL) {
publish_transport_locked(exec_ctx, c);
} else if (c->disconnected) {
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- GPR_ASSERT(!c->have_alarm);
- c->have_alarm = 1;
grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
- gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+
const char *errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
- if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
- 0) {
- gpr_log(GPR_INFO, "Retry immediately");
- } else {
- gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
- time_til_next.tv_sec, time_til_next.tv_nsec);
- }
- grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
grpc_error_free_string(errmsg);
+
+ maybe_start_connecting_locked(exec_ctx, c);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected");
grpc_channel_args_destroy(delete_channel_args);
}
@@ -701,15 +738,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline,
- grpc_subchannel_call **call) {
+ grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time,
+ gpr_timespec deadline, grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
grpc_error *error =
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
- NULL, NULL, path, deadline, callstk);
+ NULL, NULL, path, start_time, deadline, callstk);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);