aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar David G. Quintas <dgq@google.com>2017-04-15 10:28:38 -0700
committerGravatar GitHub <noreply@github.com>2017-04-15 10:28:38 -0700
commita8b8aeac2067bf5efc0c261537578878285bf376 (patch)
tree4eac2633781fe86543207d1aa40bd1ba6c549980 /src/core/ext
parent644e05e79ba04fb0f424cbb8dffce6c3cd3913ec (diff)
parent3725128dc659829b69bb4eced8fec3f0621da890 (diff)
Merge pull request #10645 from dgquintas/client_channel_high_freq_resolver_updates
Keep LB policy alive during high freq of resolver updates
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c76
1 files changed, 65 insertions, 11 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 16be2c70e9..ce9abdad61 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -238,14 +238,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state state,
grpc_error *error,
const char *reason) {
- if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
- state == GRPC_CHANNEL_SHUTDOWN) &&
- chand->lb_policy != NULL) {
- /* cancel picks with wait_for_ready=false */
- grpc_lb_policy_cancel_picks_locked(
- exec_ctx, chand->lb_policy,
- /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
- /* check= */ 0, GRPC_ERROR_REF(error));
+ /* TODO: Improve failure handling:
+ * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
+ * - Hand over pending picks from old policies during the switch that happens
+ * when resolver provides an update. */
+ if (chand->lb_policy != NULL) {
+ if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* cancel picks with wait_for_ready=false */
+ grpc_lb_policy_cancel_picks_locked(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
+ /* check= */ 0, GRPC_ERROR_REF(error));
+ } else if (state == GRPC_CHANNEL_SHUTDOWN) {
+ /* cancel all picks */
+ grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
+ /* mask= */ 0, /* check= */ 0,
+ GRPC_ERROR_REF(error));
+ }
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
@@ -348,6 +357,33 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
}
}
+// Wrap a closure associated with \a lb_policy. The associated callback (\a
+// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
+// scheduling \a wrapped_closure.
+typedef struct wrapped_on_pick_closure_arg {
+ /* the closure instance using this struct as argument */
+ grpc_closure wrapper_closure;
+
+ /* the original closure. Usually a on_complete/notify cb for pick() and ping()
+ * calls against the internal RR instance, respectively. */
+ grpc_closure *wrapped_closure;
+
+ /* The policy instance related to the closure */
+ grpc_lb_policy *lb_policy;
+} wrapped_on_pick_closure_arg;
+
+// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
+static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ wrapped_on_pick_closure_arg *wc_arg = arg;
+ GPR_ASSERT(wc_arg != NULL);
+ GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+ GPR_ASSERT(wc_arg->lb_policy != NULL);
+ grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
+ GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
+ gpr_free(wc_arg);
+}
+
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
@@ -1037,11 +1073,29 @@ static bool pick_subchannel_locked(
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)};
- const bool result = grpc_lb_policy_pick_locked(
- exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
+
+ // Wrap the user-provided callback in order to hold a strong reference to
+ // the LB policy for the duration of the pick.
+ wrapped_on_pick_closure_arg *w_on_pick_arg =
+ gpr_zalloc(sizeof(*w_on_pick_arg));
+ grpc_closure_init(&w_on_pick_arg->wrapper_closure,
+ wrapped_on_pick_closure_cb, w_on_pick_arg,
+ grpc_schedule_on_exec_ctx);
+ w_on_pick_arg->wrapped_closure = on_ready;
+ GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
+ w_on_pick_arg->lb_policy = lb_policy;
+ const bool pick_done = grpc_lb_policy_pick_locked(
+ exec_ctx, lb_policy, &inputs, connected_subchannel, NULL,
+ &w_on_pick_arg->wrapper_closure);
+ if (pick_done) {
+ /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
+ GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
+ "pick_subchannel_wrapping");
+ gpr_free(w_on_pick_arg);
+ }
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
- return result;
+ return pick_done;
}
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;