aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/client_channel/subchannel.c130
1 files changed, 80 insertions, 50 deletions
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 7364daf020..46c01127de 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -133,6 +133,8 @@ struct grpc_subchannel {
gpr_backoff backoff_state;
/** do we have an active alarm? */
bool have_alarm;
+ /** have we started the backoff loop */
+ bool backoff_begun;
/** our alarm */
grpc_timer alarm;
};
@@ -370,7 +372,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
return grpc_subchannel_index_register(exec_ctx, key, c);
}
-static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
@@ -386,12 +389,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
&c->connected);
}
-static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- c->next_attempt =
- gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
-}
-
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
grpc_error **error) {
grpc_connectivity_state state;
@@ -418,6 +415,73 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_subchannel *c = arg;
+ gpr_mu_lock(&c->mu);
+ c->have_alarm = false;
+ if (c->disconnected) {
+ error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
+ c->next_attempt =
+ gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
+ continue_connect_locked(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ } else {
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
+ if (c->disconnected) {
+ /* Don't try to connect if we're already disconnected */
+ return;
+ }
+
+ if (c->connecting) {
+ /* Already connecting: don't restart */
+ return;
+ }
+
+ if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) {
+ /* Already connected: don't restart */
+ return;
+ }
+
+ if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
+ /* Nobody is interested in connecting: so don't just yet */
+ return;
+ }
+
+ c->connecting = true;
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!c->backoff_begun) {
+ c->backoff_begun = true;
+ c->next_attempt = gpr_backoff_begin(&c->backoff_state, now);
+ continue_connect_locked(exec_ctx, c);
+ } else {
+ GPR_ASSERT(!c->have_alarm);
+ c->have_alarm = true;
+ gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+ if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+ 0) {
+ gpr_log(GPR_INFO, "Retry immediately");
+ } else {
+ gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+ time_til_next.tv_sec, time_til_next.tv_nsec);
+ }
+ grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+ }
+}
+
void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
@@ -449,13 +513,9 @@ void grpc_subchannel_notify_on_state_change(
w->next = &c->root_external_state_watcher;
w->prev = w->next->prev;
w->next->prev = w->prev->next = w;
- if (grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &c->state_tracker, state, &w->closure)) {
- c->connecting = true;
- /* released by connection */
- GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
- start_connect(exec_ctx, c);
- }
+ grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker,
+ state, &w->closure);
+ maybe_start_connecting_locked(exec_ctx, c);
gpr_mu_unlock(&c->mu);
}
}
@@ -575,7 +635,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
Re-evaluate if we really need this. */
gpr_atm_full_barrier();
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
- c->connecting = false;
/* setup subchannel watching connected subchannel for changes; subchannel
ref for connecting is donated to the state watcher */
@@ -590,28 +649,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, "connected");
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_subchannel *c = arg;
- gpr_mu_lock(&c->mu);
- c->have_alarm = false;
- if (c->disconnected) {
- error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
- } else {
- GRPC_ERROR_REF(error);
- }
- if (error == GRPC_ERROR_NONE) {
- gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
- c->next_attempt =
- gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
- gpr_mu_unlock(&c->mu);
- } else {
- gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
- }
- GRPC_ERROR_UNREF(error);
-}
-
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_subchannel *c = arg;
@@ -619,35 +656,28 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
gpr_mu_lock(&c->mu);
+ c->connecting = false;
if (c->connecting_result.transport != NULL) {
publish_transport_locked(exec_ctx, c);
} else if (c->disconnected) {
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- GPR_ASSERT(!c->have_alarm);
- c->have_alarm = true;
grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
- gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+
const char *errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
- if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
- 0) {
- gpr_log(GPR_INFO, "Retry immediately");
- } else {
- gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
- time_til_next.tv_sec, time_til_next.tv_nsec);
- }
- grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
grpc_error_free_string(errmsg);
+
+ maybe_start_connecting_locked(exec_ctx, c);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected");
grpc_channel_args_destroy(delete_channel_args);
}