From 94eae043c723919442b5f837bc97e9c0763cc25b Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 9 Nov 2017 17:27:44 -0800 Subject: PF: don't unref errors when about to loop in pf_conn cb --- .../client_channel/lb_policy/pick_first/pick_first.cc | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 125a4186aa..e87911bc94 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -444,6 +444,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); } + bool updated_error = false; while (true) { switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_READY: { @@ -486,6 +487,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (updated_error) GRPC_ERROR_UNREF(error); return; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { @@ -506,11 +508,13 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); - GRPC_ERROR_UNREF(error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GRPC_ERROR_UNREF(error); // Reuses the connectivity refs from the previous watch. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); return; + } else { + updated_error = true; } break; // Go back to top of loop. } @@ -524,6 +528,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (updated_error) GRPC_ERROR_UNREF(error); return; } case GRPC_CHANNEL_SHUTDOWN: { @@ -544,6 +549,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, shutdown_locked(exec_ctx, p, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick first exhausted channels", &error, 1)); + if (updated_error) GRPC_ERROR_UNREF(error); return; } if (sd->subchannel_list == p->subchannel_list) { @@ -553,11 +559,13 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); - GRPC_ERROR_UNREF(error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GRPC_ERROR_UNREF(error); // Reuses the connectivity refs from the previous watch. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); return; + } else { + updated_error = true; } // For any other state, go back to top of loop. // We will reuse the connectivity refs from the previous watch. -- cgit v1.2.3 From b07d7a8051eaa7aef58cd45733b54f072a9e80f3 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 10 Nov 2017 10:37:09 -0800 Subject: Removed call to grpc_subchannel_check_connectivity and loop in pf_conn cb --- .../lb_policy/pick_first/pick_first.cc | 217 ++++++++++----------- 1 file changed, 99 insertions(+), 118 deletions(-) (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index e87911bc94..c4a2fa0bff 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -445,131 +445,112 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); } bool updated_error = false; - while (true) { - switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_READY: { - // Case 2. Promote p->latest_pending_subchannel_list to - // p->subchannel_list. - if (sd->subchannel_list == p->latest_pending_subchannel_list) { - GPR_ASSERT(p->subchannel_list != NULL); - grpc_lb_subchannel_list_shutdown_and_unref( - exec_ctx, p->subchannel_list, "finish_update"); - p->subchannel_list = p->latest_pending_subchannel_list; - p->latest_pending_subchannel_list = NULL; - } - // Cases 1 and 2. - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_READY, GRPC_ERROR_NONE, - "connecting_ready"); - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); - p->selected = sd; + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_READY: { + // Case 2. Promote p->latest_pending_subchannel_list to + // p->subchannel_list. + if (sd->subchannel_list == p->latest_pending_subchannel_list) { + GPR_ASSERT(p->subchannel_list != NULL); + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "finish_update"); + p->subchannel_list = p->latest_pending_subchannel_list; + p->latest_pending_subchannel_list = NULL; + } + // Cases 1 and 2. + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_READY, GRPC_ERROR_NONE, + "connecting_ready"); + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "connected"); + p->selected = sd; + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, + (void*)sd->subchannel); + } + // Drop all other subchannels, since we are now connected. + destroy_unselected_subchannels_locked(exec_ctx, p); + // Update any calls that were waiting for a pick. + pending_pick* pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "picked"); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, - (void*)sd->subchannel); - } - // Drop all other subchannels, since we are now connected. - destroy_unselected_subchannels_locked(exec_ctx, p); - // Update any calls that were waiting for a pick. - pending_pick* pp; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "Servicing pending pick with selected subchannel %p", - (void*)p->selected); - } - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); + gpr_log(GPR_INFO, + "Servicing pending pick with selected subchannel %p", + (void*)p->selected); } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - if (updated_error) GRPC_ERROR_UNREF(error); - return; + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + gpr_free(pp); } - case GRPC_CHANNEL_TRANSIENT_FAILURE: { - do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == NULL); - // Case 1: Only set state to TRANSIENT_FAILURE if we've tried - // all subchannels. - if (sd->subchannel_list->checking_subchannel == 0 && - sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "connecting_transient_failure"); - } - sd->curr_connectivity_state = - grpc_subchannel_check_connectivity(sd->subchannel, &error); - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GRPC_ERROR_UNREF(error); - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - return; - } else { - updated_error = true; - } - break; // Go back to top of loop. + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (updated_error) GRPC_ERROR_UNREF(error); + break; + } + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL); + // Case 1: Only set state to TRANSIENT_FAILURE if we've tried + // all subchannels. + if (sd->subchannel_list->checking_subchannel == 0 && + sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "connecting_transient_failure"); } - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: { - // Only update connectivity state in case 1. - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_REF(error), "connecting_changed"); - } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + break; + } + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: { + // Only update connectivity state in case 1. + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_REF(error), "connecting_changed"); + } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (updated_error) GRPC_ERROR_UNREF(error); + break; + } + case GRPC_CHANNEL_SHUTDOWN: { + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "pf_candidate_shutdown"); + // Advance to next subchannel and check its state. + grpc_lb_subchannel_data* original_sd = sd; + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL && sd != original_sd); + if (sd == original_sd) { + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); + shutdown_locked(exec_ctx, p, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick first exhausted channels", &error, 1)); if (updated_error) GRPC_ERROR_UNREF(error); - return; + break; } - case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, - "pf_candidate_shutdown"); - // Advance to next subchannel and check its state. - grpc_lb_subchannel_data* original_sd = sd; - do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == NULL && sd != original_sd); - if (sd == original_sd) { - grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); - shutdown_locked(exec_ctx, p, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick first exhausted channels", &error, 1)); - if (updated_error) GRPC_ERROR_UNREF(error); - return; - } - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - sd->curr_connectivity_state = - grpc_subchannel_check_connectivity(sd->subchannel, &error); - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GRPC_ERROR_UNREF(error); - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - return; - } else { - updated_error = true; - } - // For any other state, go back to top of loop. - // We will reuse the connectivity refs from the previous watch. + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "subchannel_failed"); } + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + break; } } } -- cgit v1.2.3 From 0fa2c6bba8ac6ef9a299eb71d432a7995fa2753a Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 10 Nov 2017 11:28:33 -0800 Subject: Moar cleanups! --- .../filters/client_channel/lb_policy/pick_first/pick_first.cc | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first') diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index c4a2fa0bff..c79ee5687d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -440,11 +440,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, // for a subchannel in p->latest_pending_subchannel_list. The // goal here is to find a subchannel from the update that we can // select in place of the current one. - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE || - sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - } - bool updated_error = false; switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to @@ -486,10 +481,10 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - if (updated_error) GRPC_ERROR_UNREF(error); break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); do { sd->subchannel_list->checking_subchannel = (sd->subchannel_list->checking_subchannel + 1) % @@ -519,10 +514,10 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - if (updated_error) GRPC_ERROR_UNREF(error); break; } case GRPC_CHANNEL_SHUTDOWN: { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_candidate_shutdown"); // Advance to next subchannel and check its state. @@ -540,7 +535,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, shutdown_locked(exec_ctx, p, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick first exhausted channels", &error, 1)); - if (updated_error) GRPC_ERROR_UNREF(error); break; } if (sd->subchannel_list == p->subchannel_list) { -- cgit v1.2.3