Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 150 ++++++++++------------
 1 file changed, 63 insertions(+), 87 deletions(-)
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 53fa0fff04..1b19650b61 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -345,9 +345,6 @@ typedef struct glb_lb_policy {
/** are we currently updating lb_call? */
bool updating_lb_call;
- /** are we currently updating lb_channel? */
- bool updating_lb_channel;
-
/** are we already watching the LB channel's connectivity? */
bool watching_lb_channel;
@@ -360,9 +357,6 @@ typedef struct glb_lb_policy {
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
- /** args from the latest update received while already updating, or NULL */
- grpc_lb_policy_args *pending_update_args;
-
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
@@ -617,7 +611,6 @@ static void update_lb_connectivity_status_locked(
case GRPC_CHANNEL_SHUTDOWN:
GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
break;
- case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
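With GRPC_CHANNEL_INIT gone, grpc_connectivity_state is down to five values, so switches like the one above can enumerate every state explicitly. A minimal sketch of such an exhaustive switch (state_name_for_log is a hypothetical helper; the library's own grpc_connectivity_state_name serves the same purpose):

    static const char *state_name_for_log(grpc_connectivity_state state) {
      switch (state) {
        case GRPC_CHANNEL_IDLE: return "IDLE";
        case GRPC_CHANNEL_CONNECTING: return "CONNECTING";
        case GRPC_CHANNEL_READY: return "READY";
        case GRPC_CHANNEL_TRANSIENT_FAILURE: return "TRANSIENT_FAILURE";
        case GRPC_CHANNEL_SHUTDOWN: return "SHUTDOWN";
      }
      return "UNKNOWN"; /* unreachable once every enum value is covered */
    }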
@@ -982,10 +975,6 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
grpc_subchannel_index_unref();
- if (glb_policy->pending_update_args != NULL) {
- grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
- gpr_free(glb_policy->pending_update_args);
- }
gpr_free(glb_policy);
}
@@ -1037,15 +1026,19 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ gpr_free(pp);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ gpr_free(pping);
pping = next;
}
}
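This shutdown path now fails each pending pick and ping with an explicit "Channel Shutdown" error and frees the node as the list is walked, where the old code signaled GRPC_ERROR_NONE and did not free the nodes here. A minimal sketch of the drain pattern, assuming a hypothetical pending_node type in place of pending_pick/pending_ping:

    typedef struct pending_node {
      struct pending_node *next;
      grpc_closure *on_complete; /* wrapper closure owned by the node's arg */
    } pending_node;

    static void drain_pending_locked(grpc_exec_ctx *exec_ctx,
                                     pending_node *head) {
      while (head != NULL) {
        pending_node *next = head->next; /* save before freeing the node */
        GRPC_CLOSURE_SCHED(
            exec_ctx, head->on_complete,
            GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
        gpr_free(head);
        head = next;
      }
    }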
@@ -1170,36 +1163,52 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
"won't work without it. Failing"));
return 0;
}
-
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- bool pick_done;
-
+ bool pick_done = false;
if (glb_policy->rr_policy != NULL) {
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
- (void *)glb_policy, (void *)glb_policy->rr_policy);
+ const grpc_connectivity_state rr_connectivity_state =
+ grpc_lb_policy_check_connectivity_locked(exec_ctx,
+ glb_policy->rr_policy, NULL);
+ // The glb_policy->rr_policy may have transitioned to SHUTDOWN, but the
+ // callback registered to capture this event
+ // (glb_rr_connectivity_changed_locked) may not have been invoked yet. We
+ // need to make sure we aren't trying to pick from an RR policy instance
+ // that's in shutdown.
+ if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO,
+ "grpclb %p NOT picking from from RR %p: RR conn state=%s",
+ (void *)glb_policy, (void *)glb_policy->rr_policy,
+ grpc_connectivity_state_name(rr_connectivity_state));
+ }
+ add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
+ on_complete);
+ pick_done = false;
+ } else { // RR not in shutdown
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
+ (void *)glb_policy, (void *)glb_policy->rr_policy);
+ }
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
+ wrapped_rr_closure_arg *wc_arg =
+ (wrapped_rr_closure_arg *)gpr_zalloc(sizeof(wrapped_rr_closure_arg));
+ GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
+ grpc_schedule_on_exec_ctx);
+ wc_arg->rr_policy = glb_policy->rr_policy;
+ wc_arg->target = target;
+ wc_arg->context = context;
+ GPR_ASSERT(glb_policy->client_stats != NULL);
+ wc_arg->client_stats =
+ grpc_grpclb_client_stats_ref(glb_policy->client_stats);
+ wc_arg->wrapped_closure = on_complete;
+ wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+ wc_arg->initial_metadata = pick_args->initial_metadata;
+ wc_arg->free_when_done = wc_arg;
+ pick_done =
+ pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
+ false /* force_async */, target, wc_arg);
}
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
-
- wrapped_rr_closure_arg *wc_arg =
- (wrapped_rr_closure_arg *)gpr_zalloc(sizeof(wrapped_rr_closure_arg));
-
- GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
- grpc_schedule_on_exec_ctx);
- wc_arg->rr_policy = glb_policy->rr_policy;
- wc_arg->target = target;
- wc_arg->context = context;
- GPR_ASSERT(glb_policy->client_stats != NULL);
- wc_arg->client_stats =
- grpc_grpclb_client_stats_ref(glb_policy->client_stats);
- wc_arg->wrapped_closure = on_complete;
- wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
- wc_arg->initial_metadata = pick_args->initial_metadata;
- wc_arg->free_when_done = wc_arg;
- pick_done =
- pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
- false /* force_async */, target, wc_arg);
- } else {
+ } else { // glb_policy->rr_policy == NULL
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG,
"No RR policy in grpclb instance %p. Adding to grpclb's pending "
@@ -1208,7 +1217,6 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
on_complete);
-
if (!glb_policy->started_picking) {
start_picking_locked(exec_ctx, glb_policy);
}
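The reworked glb_pick_locked() boils down to a connectivity guard in front of the old delegation logic: ask the embedded RR policy for its state, queue the pick if that instance already reached SHUTDOWN, and otherwise hand the pick off as before. A condensed sketch of the guard (everything except the grpc_* calls mirrors the hunk above):

    const grpc_connectivity_state rr_state =
        grpc_lb_policy_check_connectivity_locked(exec_ctx,
                                                 glb_policy->rr_policy, NULL);
    if (rr_state == GRPC_CHANNEL_SHUTDOWN) {
      /* The RR policy is dying; park the pick until a fresh RR policy is
         swapped in, at which point pending picks are replayed. */
      add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
                       on_complete);
      return 0; /* completion will be signaled asynchronously */
    }
    /* Safe to delegate: wrap the completion closure and pick from RR. */
    return pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
                                        false /* force_async */, target,
                                        wc_arg);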
@@ -1273,7 +1281,8 @@ static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx,
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
grpc_millis next_try =
- grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state);
+ grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state)
+ .next_attempt_start_time;
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
(void *)glb_policy);
@@ -1438,7 +1447,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_backoff_init(&glb_policy->lb_call_backoff_state,
- GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
+ GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
GRPC_GRPCLB_RECONNECT_JITTER,
GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
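Taken together, these two backoff hunks track an API change in grpc_backoff: the initial backoff is now given in grpc_millis (hence the * 1000 on the seconds constant), and grpc_backoff_step() returns a result struct from which the caller selects next_attempt_start_time. A sketch of the adjusted usage; the max-backoff argument is assumed to follow the same seconds-to-millis pattern as the others:

    grpc_backoff backoff;
    grpc_backoff_init(&backoff,
                      GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
                      GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
                      GRPC_GRPCLB_RECONNECT_JITTER,
                      GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
                      GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
    /* The step result is a struct, not a bare deadline; the retry timer
       wants its next_attempt_start_time member. */
    grpc_millis next_try =
        grpc_backoff_step(exec_ctx, &backoff).next_attempt_start_time;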
@@ -1752,45 +1761,22 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
const grpc_lb_addresses *addresses =
(const grpc_lb_addresses *)arg->value.pointer.p;
-
+ // If a non-empty serverlist hasn't been received from the balancer,
+ // propagate the update to fallback_backend_addresses.
if (glb_policy->serverlist == NULL) {
- // If a non-empty serverlist hasn't been received from the balancer,
- // propagate the update to fallback_backend_addresses.
fallback_update_locked(exec_ctx, glb_policy, addresses);
- } else if (glb_policy->updating_lb_channel) {
- // If we have recieved serverlist from the balancer, we need to defer update
- // when there is an in-progress one.
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO,
- "Update already in progress for grpclb %p. Deferring update.",
- (void *)glb_policy);
- }
- if (glb_policy->pending_update_args != NULL) {
- grpc_channel_args_destroy(exec_ctx,
- glb_policy->pending_update_args->args);
- gpr_free(glb_policy->pending_update_args);
- }
- glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc(
- sizeof(*glb_policy->pending_update_args));
- glb_policy->pending_update_args->client_channel_factory =
- args->client_channel_factory;
- glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
- glb_policy->pending_update_args->combiner = args->combiner;
- return;
}
-
- glb_policy->updating_lb_channel = true;
GPR_ASSERT(glb_policy->lb_channel != NULL);
+ // Propagate updates to the LB channel (pick_first) through the fake
+ // resolver.
grpc_channel_args *lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
- /* Propagate updates to the LB channel (pick first) through the fake resolver
- */
grpc_fake_resolver_response_generator_set_response(
exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
-
+ // Start watching the LB channel connectivity for connection, if not
+ // already doing so.
if (!glb_policy->watching_lb_channel) {
- // Watch the LB channel connectivity for connection.
glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
glb_policy->lb_channel, true /* try to connect */);
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
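With the deferral machinery removed (updating_lb_channel and pending_update_args are gone from the struct), every address update takes the same short path: refresh the fallback addresses if no serverlist has arrived yet, push the new addresses into the LB channel through the fake resolver, and make sure the connectivity watch is installed exactly once. A compressed sketch of that flow; ensure_lb_channel_watch_locked is an illustrative stand-in for the watch setup in the hunk above:

    if (glb_policy->serverlist == NULL) {
      /* No serverlist from the balancer yet: keep fallback addresses fresh. */
      fallback_update_locked(exec_ctx, glb_policy, addresses);
    }
    /* Push the update into the LB channel via the fake resolver. */
    grpc_channel_args *lb_channel_args = build_lb_channel_args(
        exec_ctx, addresses, glb_policy->response_generator, args->args);
    grpc_fake_resolver_response_generator_set_response(
        exec_ctx, glb_policy->response_generator, lb_channel_args);
    grpc_channel_args_destroy(exec_ctx, lb_channel_args);
    if (!glb_policy->watching_lb_channel) {
      ensure_lb_channel_watch_locked(exec_ctx, glb_policy); /* illustrative */
    }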
@@ -1819,7 +1805,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
// embedded RR policy. Note that the current RR policy, if any, will stay in
// effect until an update from the new lb_call is received.
switch (glb_policy->lb_channel_connectivity) {
- case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
/* resub. */
@@ -1836,24 +1821,15 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
break;
}
case GRPC_CHANNEL_IDLE:
- // lb channel inactive (probably shutdown prior to update). Restart lb
- // call to kick the lb channel into gear.
- GPR_ASSERT(glb_policy->lb_call == NULL);
+ // lb channel inactive (probably shutdown prior to update). Restart lb
+ // call to kick the lb channel into gear.
/* fallthrough */
case GRPC_CHANNEL_READY:
if (glb_policy->lb_call != NULL) {
- glb_policy->updating_lb_channel = false;
glb_policy->updating_lb_call = true;
grpc_call_cancel(glb_policy->lb_call, NULL);
- // lb_on_server_status_received will pick up the cancel and reinit
+ // lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
- if (glb_policy->pending_update_args != NULL) {
- grpc_lb_policy_args *args = glb_policy->pending_update_args;
- glb_policy->pending_update_args = NULL;
- glb_update_locked(exec_ctx, &glb_policy->base, args);
- grpc_channel_args_destroy(exec_ctx, args->args);
- gpr_free(args);
- }
} else if (glb_policy->started_picking && !glb_policy->shutting_down) {
if (glb_policy->retry_timer_active) {
grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);