diff options
author | David G. Quintas <dgq@google.com> | 2017-04-15 10:28:38 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-15 10:28:38 -0700 |
commit | a8b8aeac2067bf5efc0c261537578878285bf376 (patch) | |
tree | 4eac2633781fe86543207d1aa40bd1ba6c549980 /src/core/ext | |
parent | 644e05e79ba04fb0f424cbb8dffce6c3cd3913ec (diff) | |
parent | 3725128dc659829b69bb4eced8fec3f0621da890 (diff) |
Merge pull request #10645 from dgquintas/client_channel_high_freq_resolver_updates
Keep LB policy alive during high freq of resolver updates
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.c | 76 |
1 files changed, 65 insertions, 11 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 16be2c70e9..ce9abdad61 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -238,14 +238,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, grpc_connectivity_state state, grpc_error *error, const char *reason) { - if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE || - state == GRPC_CHANNEL_SHUTDOWN) && - chand->lb_policy != NULL) { - /* cancel picks with wait_for_ready=false */ - grpc_lb_policy_cancel_picks_locked( - exec_ctx, chand->lb_policy, - /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, - /* check= */ 0, GRPC_ERROR_REF(error)); + /* TODO: Improve failure handling: + * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. + * - Hand over pending picks from old policies during the switch that happens + * when resolver provides an update. */ + if (chand->lb_policy != NULL) { + if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + /* cancel picks with wait_for_ready=false */ + grpc_lb_policy_cancel_picks_locked( + exec_ctx, chand->lb_policy, + /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, + /* check= */ 0, GRPC_ERROR_REF(error)); + } else if (state == GRPC_CHANNEL_SHUTDOWN) { + /* cancel all picks */ + grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy, + /* mask= */ 0, /* check= */ 0, + GRPC_ERROR_REF(error)); + } } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, reason); @@ -348,6 +357,33 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { } } +// Wrap a closure associated with \a lb_policy. The associated callback (\a +// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after +// scheduling \a wrapped_closure. +typedef struct wrapped_on_pick_closure_arg { + /* the closure instance using this struct as argument */ + grpc_closure wrapper_closure; + + /* the original closure. Usually a on_complete/notify cb for pick() and ping() + * calls against the internal RR instance, respectively. */ + grpc_closure *wrapped_closure; + + /* The policy instance related to the closure */ + grpc_lb_policy *lb_policy; +} wrapped_on_pick_closure_arg; + +// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg. +static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + wrapped_on_pick_closure_arg *wc_arg = arg; + GPR_ASSERT(wc_arg != NULL); + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + GPR_ASSERT(wc_arg->lb_policy != NULL); + grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping"); + gpr_free(wc_arg); +} + static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; @@ -1037,11 +1073,29 @@ static bool pick_subchannel_locked( const grpc_lb_policy_pick_args inputs = { initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)}; - const bool result = grpc_lb_policy_pick_locked( - exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready); + + // Wrap the user-provided callback in order to hold a strong reference to + // the LB policy for the duration of the pick. + wrapped_on_pick_closure_arg *w_on_pick_arg = + gpr_zalloc(sizeof(*w_on_pick_arg)); + grpc_closure_init(&w_on_pick_arg->wrapper_closure, + wrapped_on_pick_closure_cb, w_on_pick_arg, + grpc_schedule_on_exec_ctx); + w_on_pick_arg->wrapped_closure = on_ready; + GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping"); + w_on_pick_arg->lb_policy = lb_policy; + const bool pick_done = grpc_lb_policy_pick_locked( + exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, + &w_on_pick_arg->wrapper_closure); + if (pick_done) { + /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ + GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy, + "pick_subchannel_wrapping"); + gpr_free(w_on_pick_arg); + } GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GPR_TIMER_END("pick_subchannel", 0); - return result; + return pick_done; } if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = true; |