aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--doc/environment_variables.md2
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c365
-rw-r--r--src/core/ext/filters/client_channel/client_channel.h2
-rw-r--r--src/core/ext/filters/client_channel/client_channel_plugin.c1
4 files changed, 260 insertions, 110 deletions
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index 0a289ac94d..339b705252 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -42,6 +42,8 @@ some configuration as environment variables that can be set.
- bdp_estimator - traces behavior of bdp estimation logic
- call_error - traces the possible errors contributing to final call status
- channel - traces operations on the C core channel stack
+ - client_channel - traces client channel activity, including resolver
+ and load balancing policy interaction
- combiner - traces combiner lock state
- compression - traces compression operations
- connectivity_state - traces connectivity state changes to channels
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index de516ab4c9..7add432589 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -52,6 +52,8 @@
/* Client channel implementation */
+grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false);
+
/*************************************************************************
* METHOD-CONFIG TABLE
*/
@@ -241,6 +243,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_REF(error));
}
}
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
+ grpc_connectivity_state_name(state));
+ }
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
}
@@ -251,6 +257,10 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state publish_state = w->state;
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
+ w->lb_policy, grpc_connectivity_state_name(w->state));
+ }
if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
@@ -263,7 +273,6 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
}
}
-
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
gpr_free(w);
}
@@ -273,7 +282,6 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
-
w->chand = chand;
GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
grpc_combiner_scheduler(chand->combiner));
@@ -283,6 +291,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
&w->on_changed);
}
+static void start_resolving_locked(grpc_exec_ctx *exec_ctx,
+ channel_data *chand) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
+ }
+ GPR_ASSERT(!chand->started_resolving);
+ chand->started_resolving = true;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+}
+
typedef struct {
char *server_name;
grpc_server_retry_throttle_data *retry_throttle_data;
@@ -345,8 +365,13 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
+ grpc_error_string(error));
+ }
// Extract the following fields from the resolver result, if non-NULL.
char *lb_policy_name = NULL;
+ bool lb_policy_name_changed = false;
grpc_lb_policy *new_lb_policy = NULL;
char *service_config_json = NULL;
grpc_server_retry_throttle_data *retry_throttle_data = NULL;
@@ -394,10 +419,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
// once at any given time.
- const bool lb_policy_type_changed =
+ lb_policy_name_changed =
chand->info_lb_policy_name == NULL ||
strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
- if (chand->lb_policy != NULL && !lb_policy_type_changed) {
+ if (chand->lb_policy != NULL && !lb_policy_name_changed) {
// Continue using the same LB policy. Update with new addresses.
grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
} else {
@@ -445,6 +470,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
chand->resolver_result = NULL;
}
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
+ "service_config=\"%s\"",
+ chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "",
+ service_config_json);
+ }
// Now swap out fields in chand. Note that the new values may still
// be NULL if (e.g.) the resolver failed to return results or the
// results did not contain the necessary data.
@@ -479,6 +511,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
chand->resolver == NULL) {
if (chand->lb_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
+ chand->lb_policy);
+ }
grpc_pollset_set_del_pollset_set(exec_ctx,
chand->lb_policy->interested_parties,
chand->interested_parties);
@@ -489,7 +525,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// Now that we've swapped out the relevant fields of chand, check for
// error or shutdown.
if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
+ }
if (chand->resolver != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
+ }
grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@@ -510,6 +552,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_error *state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
if (new_lb_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
+ }
GRPC_ERROR_UNREF(state_error);
state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
&state_error);
@@ -772,7 +817,9 @@ typedef struct client_channel_call_data {
gpr_atm subchannel_call_or_error;
gpr_arena *arena;
- bool pick_pending;
+ grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
+ grpc_closure lb_pick_closure;
+
grpc_connected_subchannel *connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity *pollent;
@@ -837,8 +884,15 @@ static void waiting_for_pick_batches_add_locked(
}
static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
- call_data *calld,
+ grpc_call_element *elem,
grpc_error *error) {
+ call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
+ elem->channel_data, calld, calld->waiting_for_pick_batches_count,
+ grpc_error_string(error));
+ }
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
@@ -848,14 +902,21 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
}
static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
- call_data *calld) {
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
if (calld->waiting_for_pick_batches_count == 0) return;
call_or_error coe = get_call_or_error(calld);
if (coe.error != GRPC_ERROR_NONE) {
- waiting_for_pick_batches_fail_locked(exec_ctx, calld,
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem,
GRPC_ERROR_REF(coe.error));
return;
}
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
+ " pending batches to subchannel_call=%p",
+ elem->channel_data, calld, calld->waiting_for_pick_batches_count,
+ coe.subchannel_call);
+ }
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
calld->waiting_for_pick_batches[i]);
@@ -869,6 +930,10 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
+ chand, calld);
+ }
if (chand->retry_throttle_data != NULL) {
calld->retry_throttle_data =
grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
@@ -895,7 +960,9 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
}
static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
- call_data *calld, grpc_error *error) {
+ grpc_call_element *elem,
+ grpc_error *error) {
+ call_data *calld = elem->call_data;
grpc_subchannel_call *subchannel_call = NULL;
const grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
@@ -906,13 +973,18 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
.context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
+ elem->channel_data, calld, subchannel_call,
+ grpc_error_string(new_error));
+ }
GPR_ASSERT(set_call_or_error(
calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error);
} else {
- waiting_for_pick_batches_resume_locked(exec_ctx, calld);
+ waiting_for_pick_batches_resume_locked(exec_ctx, elem);
}
GRPC_ERROR_UNREF(error);
}
@@ -922,8 +994,6 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GPR_ASSERT(calld->pick_pending);
- calld->pick_pending = false;
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
call_or_error coe = get_call_or_error(calld);
@@ -935,8 +1005,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
"Call dropped by load balancing policy")
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to create subchannel", &error, 1);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: failed to create subchannel: error=%s", chand,
+ calld, grpc_error_string(failure));
+ }
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure);
} else if (coe.error != GRPC_ERROR_NONE) {
/* already cancelled before subchannel became ready */
grpc_error *child_errors[] = {error, coe.error};
@@ -950,10 +1025,15 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_DEADLINE_EXCEEDED);
}
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: cancelled before subchannel became ready: %s",
+ chand, calld, grpc_error_string(cancellation_error));
+ }
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error);
} else {
/* Create call on subchannel. */
- create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
+ create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
}
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
GRPC_ERROR_UNREF(error);
@@ -983,41 +1063,77 @@ typedef struct {
grpc_closure closure;
} pick_after_resolver_result_args;
-static void continue_picking_after_resolver_result_locked(
- grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error) {
pick_after_resolver_result_args *args = arg;
if (args->cancelled) {
/* cancelled, do nothing */
- } else if (error != GRPC_ERROR_NONE) {
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "call cancelled before resolver result");
+ }
} else {
- if (pick_subchannel_locked(exec_ctx, args->elem)) {
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
+ channel_data *chand = args->elem->channel_data;
+ call_data *calld = args->elem->call_data;
+ if (error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
+ chand, calld);
+ }
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
+ } else {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
+ chand, calld);
+ }
+ if (pick_subchannel_locked(exec_ctx, args->elem)) {
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
+ }
}
}
gpr_free(args);
}
-static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_error *error) {
+static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: deferring pick pending resolver result", chand,
+ calld);
+ }
+ pick_after_resolver_result_args *args =
+ (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
+ args->elem = elem;
+ GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
+ args, grpc_combiner_scheduler(chand->combiner));
+ grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
+ &args->closure, GRPC_ERROR_NONE);
+}
+
+static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- if (chand->lb_policy != NULL) {
- grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
- &calld->connected_subchannel,
- GRPC_ERROR_REF(error));
- }
// If we don't yet have a resolver result, then a closure for
- // continue_picking_after_resolver_result_locked() will have been added to
+ // pick_after_resolver_result_done_locked() will have been added to
// chand->waiting_for_resolver_result_closures, and it may not be invoked
// until after this call has been destroyed. We mark the operation as
- // cancelled, so that when continue_picking_after_resolver_result_locked()
+ // cancelled, so that when pick_after_resolver_result_done_locked()
// is called, it will be a no-op. We also immediately invoke
// subchannel_ready_locked() to propagate the error back to the caller.
for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
closure != NULL; closure = closure->next_data.next) {
pick_after_resolver_result_args *args = closure->cb_arg;
if (!args->cancelled && args->elem == elem) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: "
+ "cancelling pick waiting for resolver result",
+ chand, calld);
+ }
args->cancelled = true;
subchannel_ready_locked(exec_ctx, elem,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
@@ -1027,24 +1143,21 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
GRPC_ERROR_UNREF(error);
}
-// State for pick callback that holds a reference to the LB policy
-// from which the pick was requested.
-typedef struct {
- grpc_lb_policy *lb_policy;
- grpc_call_element *elem;
- grpc_closure closure;
-} pick_callback_args;
-
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Unrefs the LB policy after invoking subchannel_ready_locked().
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- pick_callback_args *args = arg;
- GPR_ASSERT(args != NULL);
- GPR_ASSERT(args->lb_policy != NULL);
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
- GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel");
- gpr_free(args);
+ grpc_call_element *elem = arg;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
+ chand, calld);
+ }
+ GPR_ASSERT(calld->lb_policy != NULL);
+ GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
+ calld->lb_policy = NULL;
+ subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
}
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
@@ -1055,23 +1168,44 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
const grpc_lb_policy_pick_args *inputs) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args));
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
+ chand, calld, chand->lb_policy);
+ }
+ // Keep a ref to the LB policy in calld while the pick is pending.
GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
- pick_args->lb_policy = chand->lb_policy;
- pick_args->elem = elem;
- GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args,
+ calld->lb_policy = chand->lb_policy;
+ GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
grpc_combiner_scheduler(chand->combiner));
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
- calld->subchannel_call_context, NULL, &pick_args->closure);
+ calld->subchannel_call_context, NULL, &calld->lb_pick_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
- GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel");
- gpr_free(pick_args);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
+ chand, calld);
+ }
+ GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
+ calld->lb_policy = NULL;
}
return pick_done;
}
+static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ GPR_ASSERT(calld->lb_policy != NULL);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
+ chand, calld, calld->lb_policy);
+ }
+ grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
+ &calld->connected_subchannel, error);
+}
+
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
@@ -1107,20 +1241,9 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
} else if (chand->resolver != NULL) {
if (!chand->started_resolving) {
- chand->started_resolving = true;
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next_locked(exec_ctx, chand->resolver,
- &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ start_resolving_locked(exec_ctx, chand);
}
- pick_after_resolver_result_args *args =
- (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
- args->elem = elem;
- GRPC_CLOSURE_INIT(&args->closure,
- continue_picking_after_resolver_result_locked, args,
- grpc_combiner_scheduler(chand->combiner));
- grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
- &args->closure, GRPC_ERROR_NONE);
+ pick_after_resolver_result_start_locked(exec_ctx, elem);
} else {
subchannel_ready_locked(
exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
@@ -1133,63 +1256,77 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
- grpc_transport_stream_op_batch *op = arg;
- grpc_call_element *elem = op->handler_private.extra_arg;
+ grpc_transport_stream_op_batch *batch = arg;
+ grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
/* need to recheck that another thread hasn't set the call */
call_or_error coe = get_call_or_error(calld);
if (coe.error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(coe.error));
+ }
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(coe.error));
+ exec_ctx, batch, GRPC_ERROR_REF(coe.error));
goto done;
}
if (coe.subchannel_call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
+ calld, coe.subchannel_call);
+ }
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
goto done;
}
// Add to waiting-for-pick list. If we succeed in getting a
// subchannel call below, we'll handle this batch (along with any
// other waiting batches) in waiting_for_pick_batches_resume_locked().
- waiting_for_pick_batches_add_locked(calld, op);
- /* if this is a cancellation, then we can raise our cancelled flag */
- if (op->cancel_stream) {
- grpc_error *error = op->payload->cancel_stream.cancel_error;
+ waiting_for_pick_batches_add_locked(calld, batch);
+ // If this is a cancellation, cancel the pending pick (if any) and
+ // fail any pending batches.
+ if (batch->cancel_stream) {
+ grpc_error *error = batch->payload->cancel_stream.cancel_error;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
+ calld, grpc_error_string(error));
+ }
/* Stash a copy of cancel_error in our call data, so that we can use
it for subsequent operations. This ensures that if the call is
- cancelled before any ops are passed down (e.g., if the deadline
+ cancelled before any batches are passed down (e.g., if the deadline
is in the past when the call starts), we can return the right
- error to the caller when the first op does get passed down. */
+ error to the caller when the first batch does get passed down. */
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
- if (calld->pick_pending) {
- cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ if (calld->lb_policy != NULL) {
+ pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ } else {
+ pick_after_resolver_result_cancel_locked(exec_ctx, elem,
+ GRPC_ERROR_REF(error));
}
- waiting_for_pick_batches_fail_locked(exec_ctx, calld,
- GRPC_ERROR_REF(error));
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
goto done;
}
/* if we don't have a subchannel, try to get one */
- if (!calld->pick_pending && calld->connected_subchannel == NULL &&
- op->send_initial_metadata) {
- calld->initial_metadata_payload = op->payload;
- calld->pick_pending = true;
+ if (batch->send_initial_metadata) {
+ GPR_ASSERT(calld->connected_subchannel == NULL);
+ calld->initial_metadata_payload = batch->payload;
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
that IO of the lb_policy and resolver could be done under it. */
if (pick_subchannel_locked(exec_ctx, elem)) {
// Pick was returned synchronously.
- calld->pick_pending = false;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
if (calld->connected_subchannel == NULL) {
grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
set_call_or_error(calld,
(call_or_error){.error = GRPC_ERROR_REF(error)});
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, error);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, error);
} else {
// Create subchannel call.
- create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE);
+ create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
}
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
@@ -1232,47 +1369,59 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
If it has, we proceed on the fast path. */
static void cc_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace) ||
+ GRPC_TRACER_ON(grpc_trace_channel)) {
+ grpc_call_log_op(GPR_INFO, elem, batch);
+ }
if (chand->deadline_checking_enabled) {
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
- op);
+ batch);
}
// Intercept on_complete for recv_trailing_metadata so that we can
// check retry throttle status.
- if (op->recv_trailing_metadata) {
- GPR_ASSERT(op->on_complete != NULL);
- calld->original_on_complete = op->on_complete;
+ if (batch->recv_trailing_metadata) {
+ GPR_ASSERT(batch->on_complete != NULL);
+ calld->original_on_complete = batch->on_complete;
GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
grpc_schedule_on_exec_ctx);
- op->on_complete = &calld->on_complete;
+ batch->on_complete = &calld->on_complete;
}
/* try to (atomically) get the call */
call_or_error coe = get_call_or_error(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
if (coe.error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(coe.error));
+ }
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(coe.error));
- GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
- /* early out */
- return;
+ exec_ctx, batch, GRPC_ERROR_REF(coe.error));
+ goto done;
}
if (coe.subchannel_call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
- GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
- /* early out */
- return;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
+ calld, coe.subchannel_call);
+ }
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
+ goto done;
}
/* we failed; lock and figure out what to do */
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
+ }
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
- op->handler_private.extra_arg = elem;
+ batch->handler_private.extra_arg = elem;
GRPC_CLOSURE_SCHED(
- exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure,
- start_transport_stream_op_batch_locked, op,
+ exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ start_transport_stream_op_batch_locked, batch,
grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
+done:
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
@@ -1317,7 +1466,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
"client_channel_destroy_call");
}
- GPR_ASSERT(!calld->pick_pending);
+ GPR_ASSERT(calld->lb_policy == NULL);
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->connected_subchannel != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
@@ -1366,11 +1515,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else {
chand->exit_idle_when_lb_policy_arrives = true;
if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- chand->started_resolving = true;
- grpc_resolver_next_locked(exec_ctx, chand->resolver,
- &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ start_resolving_locked(exec_ctx, chand);
}
}
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 63f7c29940..c99f0092e9 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -23,6 +23,8 @@
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
+extern grpc_tracer_flag grpc_client_channel_trace;
+
// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"
diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c
index 60e77d6268..6f133a648b 100644
--- a/src/core/ext/filters/client_channel/client_channel_plugin.c
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.c
@@ -78,6 +78,7 @@ void grpc_client_channel_init(void) {
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter,
(void *)&grpc_client_channel_filter);
grpc_http_connect_register_handshaker_factory();
+ grpc_register_tracer("client_channel", &grpc_client_channel_trace);
#ifndef NDEBUG
grpc_register_tracer("resolver_refcount", &grpc_trace_resolver_refcount);
#endif