diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
4 files changed, 61 insertions, 87 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index ba4e90d4c2..ebc7fdac4c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -157,7 +157,7 @@ typedef struct wrapped_rr_closure_arg { /* the picked target, used to determine which LB token to add to the pick's * initial metadata */ - grpc_connected_subchannel** target; + grpc_core::ConnectedSubchannel** target; /* the context to be populated for the subchannel call */ grpc_call_context_element* context; @@ -242,7 +242,7 @@ typedef struct pending_pick { /* output argument where to store the pick()ed connected subchannel, or * nullptr upon error. */ - grpc_connected_subchannel** target; + grpc_core::ConnectedSubchannel** target; /* args for wrapped_on_complete */ wrapped_rr_closure_arg wrapped_on_complete_arg; @@ -250,7 +250,7 @@ typedef struct pending_pick { static void add_pending_pick(pending_pick** root, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, grpc_closure* on_complete) { pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp)); @@ -657,7 +657,7 @@ static void update_lb_connectivity_status_locked( * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args, - bool force_async, grpc_connected_subchannel** target, + bool force_async, grpc_core::ConnectedSubchannel** target, wrapped_rr_closure_arg* wc_arg) { // Check for drops if we are not using fallback backend addresses. if (glb_policy->serverlist != nullptr) { @@ -1090,7 +1090,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { // level (grpclb), inside the glb_policy->pending_picks list. To cancel these, // we invoke the completion closure and set *target to nullptr right here. static void glb_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; pending_pick* pp = glb_policy->pending_picks; @@ -1184,7 +1184,7 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) { static int glb_pick_locked(grpc_lb_policy* pol, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete) { if (pick_args->lb_token_mdelem_storage == nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index a3b05aacaf..e70f2a8c52 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -34,7 +34,7 @@ grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); typedef struct pending_pick { struct pending_pick* next; uint32_t initial_metadata_flags; - grpc_connected_subchannel** target; + grpc_core::ConnectedSubchannel** target; grpc_closure* on_complete; } pending_pick; @@ -102,7 +102,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol) { } static void pf_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; pending_pick* pp = p->pending_picks; @@ -174,7 +174,7 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) { static int pf_pick_locked(grpc_lb_policy* pol, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; @@ -396,6 +396,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; // Handle updates for the currently selected subchannel. if (p->selected == sd) { + gpr_log(GPR_INFO, "BAR selected. subchannel %p, conn subchannel %p", + sd->subchannel, p->selected->connected_subchannel); // If the new state is anything other than READY and there is a // pending update, switch to the pending update. if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && @@ -412,25 +414,13 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { - if (sd->curr_connectivity_state < GRPC_CHANNEL_TRANSIENT_FAILURE) { - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } else { // in transient failure or shutdown. Rely on re-resolution to - // recover. - p->selected = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel( - sd, "pf_selected_shutdown"); // Unrefs connected subchannel - } // TODO(juanlishen): we re-resolve when the selected subchannel goes to // TRANSIENT_FAILURE because we used to shut down in this case before // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || - sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -438,10 +428,20 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { p->started_picking = false; grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + // in transient failure. Rely on re-resolution to recover. + p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "pf_selected_shutdown"); + grpc_lb_subchannel_data_unref_subchannel( + sd, "pf_selected_shutdown"); // Unrefs connected subchannel + } else { grpc_connectivity_state_set(&p->state_tracker, sd->curr_connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(sd); } } return; @@ -459,6 +459,16 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to // p->subchannel_list. + grpc_core::ConnectedSubchannel* con = + grpc_subchannel_get_connected_subchannel(sd->subchannel); + if (con == nullptr) { + // The subchannel may have become disconnected by the time this callback + // is invoked. Simply ignore and resubscribe: ulterior connectivity + // states + // must be in the pipeline and will eventually be invoked. + grpc_lb_subchannel_data_start_connectivity_watch(sd); + break; + } if (sd->subchannel_list == p->latest_pending_subchannel_list) { GPR_ASSERT(p->subchannel_list != nullptr); grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, @@ -469,9 +479,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); + sd->connected_subchannel = + GRPC_CONNECTED_SUBCHANNEL_REF(con, "connected"); p->selected = sd; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, @@ -530,39 +539,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data_start_connectivity_watch(sd); break; } - case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown"); - // Advance to next subchannel and check its state. - grpc_lb_subchannel_data* original_sd = sd; - do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == nullptr && sd != original_sd); - if (sd == original_sd) { - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_exhausted_subchannels"); - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, - GRPC_ERROR_NONE, - "exhausted_subchannels+reresolve"); - p->started_picking = false; - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, - GRPC_ERROR_NONE); - } - } else { - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } - } + case GRPC_CHANNEL_SHUTDOWN: + GPR_UNREACHABLE_CODE(break); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 0836dad2f6..a6a8fbb3cf 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -58,7 +58,7 @@ typedef struct pending_pick { /* output argument where to store the pick()ed connected subchannel, or NULL * upon error. */ - grpc_connected_subchannel** target; + grpc_core::ConnectedSubchannel** target; /* to be invoked once the pick() has completed (regardless of success) */ grpc_closure* on_complete; @@ -199,7 +199,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol) { } static void rr_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_error* error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; pending_pick* pp = p->pending_picks; @@ -267,7 +267,7 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) { static int rr_pick_locked(grpc_lb_policy* pol, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; @@ -316,15 +316,14 @@ static int rr_pick_locked(grpc_lb_policy* pol, static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; + GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { GPR_ASSERT(subchannel_list->num_ready > 0); --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(subchannel_list->num_transient_failures > 0); --subchannel_list->num_transient_failures; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - GPR_ASSERT(subchannel_list->num_shutdown > 0); - --subchannel_list->num_shutdown; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(subchannel_list->num_idle > 0); --subchannel_list->num_idle; @@ -334,8 +333,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++subchannel_list->num_transient_failures; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - ++subchannel_list->num_shutdown; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { ++subchannel_list->num_idle; } @@ -401,6 +398,7 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg; + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); round_robin_lb_policy* p = (round_robin_lb_policy*)sd->subchannel_list->policy; if (grpc_lb_round_robin_trace.enabled()) { @@ -444,23 +442,16 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* // subchannel, if any. - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - if (sd->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, - "connected_subchannel_transient_failure"); - sd->connected_subchannel = nullptr; + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + if (sd->connected_subchannel != nullptr) { + GRPC_CONNECTED_SUBCHANNEL_UNREF( + sd->connected_subchannel, "connected_subchannel_transient_failure"); + sd->connected_subchannel = nullptr; + } + break; } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } - // If the sd's new state is SHUTDOWN, unref the subchannel. - else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "rr_connectivity_shutdown"); - } else { // sd not in SHUTDOWN - if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + case GRPC_CHANNEL_READY: { if (sd->connected_subchannel == nullptr) { sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(sd->subchannel), @@ -522,10 +513,15 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } + break; } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + case GRPC_CHANNEL_SHUTDOWN: + GPR_UNREACHABLE_CODE(); + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE:; // fallthrough } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(sd); } static grpc_connectivity_state rr_check_connectivity_locked( @@ -549,7 +545,7 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, if (next_ready_index < p->subchannel_list->num_subchannels) { grpc_lb_subchannel_data* selected = &p->subchannel_list->subchannels[next_ready_index]; - grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_core::ConnectedSubchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( selected->connected_subchannel, "rr_ping"); target->Ping(on_initiate, on_ack); GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping"); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 0f8cea9347..e4db3ef464 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -43,7 +43,7 @@ typedef struct { grpc_lb_subchannel_list* subchannel_list; /** subchannel itself */ grpc_subchannel* subchannel; - grpc_connected_subchannel* connected_subchannel; + grpc_core::ConnectedSubchannel* connected_subchannel; /** Is a connectivity notification pending? */ bool connectivity_notification_pending; /** notification that connectivity has changed on subchannel */ |