Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.c')
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.c | 103
1 file changed, 85 insertions(+), 18 deletions(-)
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 83e3b8f118..ce9abdad61 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -49,9 +49,9 @@
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
@@ -183,6 +183,8 @@ typedef struct client_channel_channel_data {
grpc_resolver *resolver;
/** have we started resolving this channel */
bool started_resolving;
+ /** is deadline checking enabled? */
+ bool deadline_checking_enabled;
/** client channel factory */
grpc_client_channel_factory *client_channel_factory;
@@ -236,14 +238,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state state,
grpc_error *error,
const char *reason) {
- if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
- state == GRPC_CHANNEL_SHUTDOWN) &&
- chand->lb_policy != NULL) {
- /* cancel picks with wait_for_ready=false */
- grpc_lb_policy_cancel_picks_locked(
- exec_ctx, chand->lb_policy,
- /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
- /* check= */ 0, GRPC_ERROR_REF(error));
+ /* TODO: Improve failure handling:
+ * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
+ * - Hand over pending picks from old policies during the switch that happens
+ * when resolver provides an update. */
+ if (chand->lb_policy != NULL) {
+ if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* cancel picks with wait_for_ready=false */
+ grpc_lb_policy_cancel_picks_locked(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
+ /* check= */ 0, GRPC_ERROR_REF(error));
+ } else if (state == GRPC_CHANNEL_SHUTDOWN) {
+ /* cancel all picks */
+ grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
+ /* mask= */ 0, /* check= */ 0,
+ GRPC_ERROR_REF(error));
+ }
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
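
The mask/check pair passed to grpc_lb_policy_cancel_picks_locked selects which pending picks to cancel: a pick matches when its initial metadata flags, ANDed with the mask, equal the check value. A minimal sketch of that predicate (pick_matches_cancel_filter is a hypothetical name, not part of this change):

    /* With mask = GRPC_INITIAL_METADATA_WAIT_FOR_READY and check = 0, only
     * picks whose wait_for_ready flag is unset match; with mask = 0 and
     * check = 0 the predicate is always true, so every pending pick is
     * cancelled. */
    static bool pick_matches_cancel_filter(uint32_t initial_metadata_flags,
                                           uint32_t mask, uint32_t check) {
      return (initial_metadata_flags & mask) == check;
    }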
@@ -346,6 +357,33 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
}
}
+// Wrap a closure associated with \a lb_policy. The associated callback (\a
+// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
+// running \a wrapped_closure.
+typedef struct wrapped_on_pick_closure_arg {
+ /* the closure instance using this struct as argument */
+ grpc_closure wrapper_closure;
+
+ /* the original closure. Usually an on_complete/notify cb for pick() and
+  * ping() calls against the internal RR instance, respectively. */
+ grpc_closure *wrapped_closure;
+
+ /* The policy instance related to the closure */
+ grpc_lb_policy *lb_policy;
+} wrapped_on_pick_closure_arg;
+
+// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
+static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ wrapped_on_pick_closure_arg *wc_arg = arg;
+ GPR_ASSERT(wc_arg != NULL);
+ GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+ GPR_ASSERT(wc_arg->lb_policy != NULL);
+ grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
+ GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
+ gpr_free(wc_arg);
+}
+
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
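
The wrap/ref sequence set up by this struct is used in pick_subchannel_locked below. Factored out on its own, the pattern looks roughly like this (wrap_pick_closure is a hypothetical helper, not part of this change):

    /* The wrapper takes one ref on the LB policy when the pick starts;
     * wrapped_on_pick_closure_cb drops it after running the original closure,
     * so the policy cannot be destroyed while a pick is in flight. */
    static grpc_closure *wrap_pick_closure(grpc_lb_policy *lb_policy,
                                           grpc_closure *on_ready) {
      wrapped_on_pick_closure_arg *w = gpr_zalloc(sizeof(*w));
      grpc_closure_init(&w->wrapper_closure, wrapped_on_pick_closure_cb, w,
                        grpc_schedule_on_exec_ctx);
      w->wrapped_closure = on_ready;
      GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
      w->lb_policy = lb_policy;
      return &w->wrapper_closure;
    }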
@@ -676,6 +714,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
if (chand->resolver == NULL) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
}
+ chand->deadline_checking_enabled =
+ grpc_deadline_checking_enabled(args->channel_args);
return GRPC_ERROR_NONE;
}
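
grpc_deadline_checking_enabled reads the answer from the channel args, so deadline filtering can be toggled per channel. As a sketch, a client could disable it roughly like this (assuming GRPC_ARG_ENABLE_DEADLINE_CHECKS, i.e. "grpc.enable_deadline_checking", is the controlling argument; verify against grpc/impl/codegen/grpc_types.h in your tree):

    /* Sketch: disable client-side deadline checking on one channel.
     * Assumes grpc_init() has already run. */
    grpc_arg arg;
    arg.type = GRPC_ARG_INTEGER;
    arg.key = GRPC_ARG_ENABLE_DEADLINE_CHECKS; /* assumed key */
    arg.value.integer = 0;
    grpc_channel_args channel_args = {1, &arg};
    grpc_channel *channel =
        grpc_insecure_channel_create("localhost:50051", &channel_args, NULL);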
@@ -864,12 +904,14 @@ static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
/* apply service-config level configuration to the call (now that we're
* certain it exists) */
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
gpr_timespec per_method_deadline;
if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
&per_method_deadline)) {
// If the deadline from the service config is shorter than the one
// from the client API, reset the deadline timer.
- if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
+ if (chand->deadline_checking_enabled &&
+ gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
calld->deadline = per_method_deadline;
grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
}
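
The comparison implements an earliest-deadline-wins rule: gpr_time_cmp returns a negative value when its first argument is earlier. Equivalently (effective_deadline is a hypothetical helper, not part of this change):

    /* The call runs under the earlier of the client-supplied deadline and
     * the service config's per-method deadline. */
    static gpr_timespec effective_deadline(gpr_timespec client_deadline,
                                           gpr_timespec per_method_deadline) {
      return gpr_time_cmp(per_method_deadline, client_deadline) < 0
                 ? per_method_deadline
                 : client_deadline;
    }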
@@ -1031,11 +1073,29 @@ static bool pick_subchannel_locked(
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)};
- const bool result = grpc_lb_policy_pick_locked(
- exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
+
+ // Wrap the user-provided callback in order to hold a strong reference to
+ // the LB policy for the duration of the pick.
+ wrapped_on_pick_closure_arg *w_on_pick_arg =
+ gpr_zalloc(sizeof(*w_on_pick_arg));
+ grpc_closure_init(&w_on_pick_arg->wrapper_closure,
+ wrapped_on_pick_closure_cb, w_on_pick_arg,
+ grpc_schedule_on_exec_ctx);
+ w_on_pick_arg->wrapped_closure = on_ready;
+ GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
+ w_on_pick_arg->lb_policy = lb_policy;
+ const bool pick_done = grpc_lb_policy_pick_locked(
+ exec_ctx, lb_policy, &inputs, connected_subchannel, NULL,
+ &w_on_pick_arg->wrapper_closure);
+ if (pick_done) {
+ /* synchronous grpc_lb_policy_pick_locked call. Unref the LB policy. */
+ GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
+ "pick_subchannel_wrapping");
+ gpr_free(w_on_pick_arg);
+ }
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
- return result;
+ return pick_done;
}
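
Note the two completion paths: when grpc_lb_policy_pick_locked returns true the pick finished synchronously and the wrapper closure will never be scheduled, so that branch releases the ref and frees the wrapper inline; on the asynchronous path wrapped_on_pick_closure_cb does both after running the original on_ready. Holding the ref for the pick's duration presumably guards against the LB policy being unreffed out from under a pending pick, e.g. when a resolver update swaps in a new policy (see the TODO about handing over pending picks above).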
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
@@ -1227,8 +1287,10 @@ static void cc_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
- op);
+ if (chand->deadline_checking_enabled) {
+ grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
+ op);
+ }
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
@@ -1262,14 +1324,16 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
// Initialize data members.
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
calld->path = grpc_slice_ref_internal(args->path);
calld->call_start_time = args->start_time;
calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
calld->owning_call = args->call_stack;
calld->arena = args->arena;
- grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
+ if (chand->deadline_checking_enabled) {
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
+ }
return GRPC_ERROR_NONE;
}
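
The call site above implies that grpc_deadline_state_init now takes the deadline directly, folding in what the separate grpc_deadline_state_start call used to do. The assumed post-change prototype, reconstructed from the call site (verify against deadline_filter.h):

    /* Assumed signature after this change; the old code paired
     * grpc_deadline_state_init() with a separate
     * grpc_deadline_state_start(). */
    void grpc_deadline_state_init(grpc_exec_ctx *exec_ctx,
                                  grpc_call_element *elem,
                                  grpc_call_stack *call_stack,
                                  gpr_timespec deadline);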
@@ -1279,7 +1343,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_final_info *final_info,
grpc_closure *then_schedule_closure) {
call_data *calld = elem->call_data;
- grpc_deadline_state_destroy(exec_ctx, elem);
+ channel_data *chand = elem->channel_data;
+ if (chand->deadline_checking_enabled) {
+ grpc_deadline_state_destroy(exec_ctx, elem);
+ }
grpc_slice_unref_internal(exec_ctx, calld->path);
if (calld->method_params != NULL) {
method_parameters_unref(calld->method_params);