aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_config/subchannel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/client_config/subchannel.c')
-rw-r--r--src/core/ext/client_config/subchannel.c99
1 files changed, 60 insertions, 39 deletions
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index bd45d3825c..d089cd4399 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -54,7 +54,7 @@
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
-#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 2
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
@@ -147,7 +147,7 @@ struct grpc_subchannel_call {
(((grpc_subchannel_call *)(callstack)) - 1)
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
- bool iomgr_success);
+ grpc_error *error);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define REF_REASON reason
@@ -177,7 +177,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
*/
static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
@@ -200,7 +200,7 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
*/
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
grpc_channel_args_destroy(c->args);
@@ -290,8 +290,8 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
gpr_atm old_refs;
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
- true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c),
+ GRPC_ERROR_NONE, NULL);
}
}
@@ -320,7 +320,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
c->filters = NULL;
}
c->addr = gpr_malloc(args->addr_len);
- memcpy(c->addr, args->addr, args->addr_len);
+ if (args->addr_len) memcpy(c->addr, args->addr, args->addr_len);
c->pollset_set = grpc_pollset_set_create();
c->addr_len = args->addr_len;
grpc_set_initial_connect_string(&c->addr, &c->addr_len,
@@ -382,7 +382,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
args.initial_connect_string = c->initial_connect_string;
grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- GRPC_CHANNEL_CONNECTING, "state_change");
+ GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+ "state_change");
grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
&c->connected);
}
@@ -393,16 +394,17 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
continue_connect(exec_ctx, c);
}
-grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
+grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
+ grpc_error **error) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
- state = grpc_connectivity_state_check(&c->state_tracker);
+ state = grpc_connectivity_state_check(&c->state_tracker, error);
gpr_mu_unlock(&c->mu);
return state;
}
static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
@@ -415,7 +417,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&w->subchannel->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
gpr_free(w);
- follow_up->cb(exec_ctx, follow_up->cb_arg, success);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
void grpc_subchannel_notify_on_state_change(
@@ -469,7 +471,7 @@ void grpc_connected_subchannel_process_transport_op(
}
static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
- bool iomgr_success) {
+ grpc_error *error) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
@@ -477,20 +479,19 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
gpr_mu_lock(mu);
/* if we failed just leave this closure */
- if (iomgr_success) {
- if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* any errors on a subchannel ==> we're done, create a new one */
- sw->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
- }
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- sw->connectivity_state, "reflect_child");
- if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
- &sw->connectivity_state, &sw->closure);
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- sw = NULL;
- }
+ if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* any errors on a subchannel ==> we're done, create a new one */
+ sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
+ }
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ sw->connectivity_state, GRPC_ERROR_REF(error),
+ "reflect_child");
+ if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
+ &sw->connectivity_state, &sw->closure);
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ sw = NULL;
}
gpr_mu_unlock(mu);
@@ -592,17 +593,20 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
/* signal completion */
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
- "connected");
+ GRPC_ERROR_NONE, "connected");
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_subchannel *c = arg;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
- iomgr_success = 0;
+ error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+ } else {
+ GRPC_ERROR_REF(error);
}
- if (iomgr_success) {
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
c->next_attempt =
gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
continue_connect(exec_ctx, c);
@@ -611,11 +615,13 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
gpr_mu_unlock(&c->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
+ GRPC_ERROR_UNREF(error);
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
grpc_subchannel *c = arg;
+ grpc_channel_args *delete_channel_args = c->connecting_result.channel_args;
GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
gpr_mu_lock(&c->mu);
@@ -627,13 +633,26 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connect_failed");
+ grpc_connectivity_state_set(
+ exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ "connect_failed");
+ gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+ const char *errmsg = grpc_error_string(error);
+ gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
+ if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+ 0) {
+ gpr_log(GPR_INFO, "Retry immediately");
+ } else {
+ gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+ time_til_next.tv_sec, time_til_next.tv_nsec);
+ }
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+ grpc_error_free_string(errmsg);
}
gpr_mu_unlock(&c->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ grpc_channel_args_destroy(delete_channel_args);
}
/*
@@ -641,11 +660,11 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
*/
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
- bool success) {
+ grpc_error *error) {
grpc_subchannel_call *c = call;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_connected_subchannel *connection = c->connection;
- grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), c);
+ grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
@@ -671,9 +690,11 @@ char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call,
grpc_transport_stream_op *op) {
+ GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0);
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
+ GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
@@ -683,7 +704,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_pollset *pollset) {
+ grpc_polling_entity *pollent) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
@@ -692,7 +713,7 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call(
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
NULL, NULL, callstk);
- grpc_call_stack_set_pollset(exec_ctx, callstk, pollset);
+ grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
return call;
}