diff options
Diffstat (limited to 'src/core/ext')
5 files changed, 55 insertions, 20 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 023281db97..d217dc0e63 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -451,6 +451,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // latest pending subchannel lists. GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || subchannel_list() == p->latest_pending_subchannel_list_.get()); + GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); // Handle updates for the currently selected subchannel. if (p->selected_ == this) { if (grpc_lb_pick_first_trace.enabled()) { @@ -480,14 +481,12 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( "update"), "selected_not_ready+switch_to_update"); } else { - // TODO(juanlishen): we re-resolve when the selected subchannel goes to - // TRANSIENT_FAILURE because we used to shut down in this case before - // re-resolution is introduced. But we need to investigate whether we - // really want to take any action instead of waiting for the selected - // subchannel reconnecting. - GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - // If the selected channel goes bad, request a re-resolution. + // If the selected subchannel goes bad, request a re-resolution. We also + // set the channel state to IDLE and reset started_picking_. The reason + // is that if the new state is TRANSIENT_FAILURE due to a GOAWAY + // reception we don't want to connect to the re-resolved backends until + // we leave the IDLE state. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "selected_changed+reresolve"); @@ -568,9 +567,10 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) { + p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE); grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "connecting_transient_failure"); + GRPC_ERROR_REF(error), "exhausted_subchannels"); } sd->StartConnectivityWatchLocked(); break; diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 5894d52e6b..d7b64a900f 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -418,8 +418,6 @@ static void continue_connect_locked(grpc_subchannel* c) { c->next_attempt_deadline = c->backoff->NextAttemptTime(); args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; - grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_NONE, "state_change"); grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->on_connected); } @@ -475,27 +473,24 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { /* Don't try to connect if we're already disconnected */ return; } - if (c->connecting) { /* Already connecting: don't restart */ return; } - if (c->connected_subchannel != nullptr) { /* Already connected: don't restart */ return; } - if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) { /* Nobody is interested in connecting: so don't just yet */ return; } - c->connecting = true; GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); - if (!c->backoff_begun) { c->backoff_begun = true; + grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_NONE, "connecting"); continue_connect_locked(c); } else { GPR_ASSERT(!c->have_alarm); @@ -510,6 +505,11 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); + // During backoff, we prefer the connectivity state of CONNECTING instead of + // TRANSIENT_FAILURE in order to prevent triggering re-resolution + // continuously in pick_first. + grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_NONE, "backoff"); } } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index bc6fa0d0eb..9ad271753c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -813,7 +813,11 @@ static void set_write_state(grpc_chttp2_transport* t, write_state_name(st), reason)); t->write_state = st; if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { - GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + grpc_chttp2_stream* s; + while (grpc_chttp2_list_pop_waiting_for_write_stream(t, &s)) { + GRPC_CLOSURE_LIST_SCHED(&s->run_after_write); + GRPC_CHTTP2_STREAM_UNREF(s, "chttp2:write_closure_sched"); + } if (t->close_transport_on_writes_finished != nullptr) { grpc_error* err = t->close_transport_on_writes_finished; t->close_transport_on_writes_finished = nullptr; @@ -1208,7 +1212,10 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) { GRPC_CLOSURE_RUN(closure, closure->error_data.error); } else { - grpc_closure_list_append(&t->run_after_write, closure, + if (grpc_chttp2_list_add_waiting_for_write_stream(t, s)) { + GRPC_CHTTP2_STREAM_REF(s, "chttp2:pending_write_closure"); + } + grpc_closure_list_append(&s->run_after_write, closure, closure->error_data.error); } } @@ -2009,6 +2016,10 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id, void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_error* due_to_error) { + GRPC_CLOSURE_LIST_SCHED(&s->run_after_write); + if (grpc_chttp2_list_remove_waiting_for_write_stream(t, s)) { + GRPC_CHTTP2_STREAM_UNREF(s, "chttp2:pending_write_closure"); + } if (!t->is_client && !s->sent_trailing_metadata && grpc_error_has_clear_grpc_status(due_to_error)) { close_from_api(t, s, due_to_error); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ca6e715978..4f1a08d98b 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -54,6 +54,8 @@ typedef enum { /** streams that are waiting to start because there are too many concurrent streams on the connection */ GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY, + /** streams with closures waiting to be run on a write **/ + GRPC_CHTTP2_LIST_WAITING_FOR_WRITE, STREAM_LIST_COUNT /* must be last */ } grpc_chttp2_stream_list_id; @@ -431,9 +433,6 @@ struct grpc_chttp2_transport { */ grpc_error* close_transport_on_writes_finished; - /* a list of closures to run after writes are finished */ - grpc_closure_list run_after_write; - /* buffer pool state */ /** have we scheduled a benign cleanup? */ bool benign_reclaimer_registered; @@ -584,6 +583,7 @@ struct grpc_chttp2_stream { grpc_slice_buffer flow_controlled_buffer; + grpc_closure_list run_after_write; grpc_chttp2_write_cb* on_flow_controlled_cbs; grpc_chttp2_write_cb* on_write_finished_cbs; grpc_chttp2_write_cb* finish_after_write; @@ -686,6 +686,13 @@ bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t, bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s); +bool grpc_chttp2_list_add_waiting_for_write_stream(grpc_chttp2_transport* t, + grpc_chttp2_stream* s); +bool grpc_chttp2_list_pop_waiting_for_write_stream(grpc_chttp2_transport* t, + grpc_chttp2_stream** s); +bool grpc_chttp2_list_remove_waiting_for_write_stream(grpc_chttp2_transport* t, + grpc_chttp2_stream* s); + /********* Flow Control ***************/ // Takes in a flow control action and performs all the needed operations. diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.cc b/src/core/ext/transport/chttp2/transport/stream_lists.cc index 6626170a7e..50bfe36a86 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.cc +++ b/src/core/ext/transport/chttp2/transport/stream_lists.cc @@ -35,6 +35,8 @@ static const char* stream_list_id_string(grpc_chttp2_stream_list_id id) { return "stalled_by_stream"; case GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY: return "waiting_for_concurrency"; + case GRPC_CHTTP2_LIST_WAITING_FOR_WRITE: + return "waiting_for_write"; case STREAM_LIST_COUNT: GPR_UNREACHABLE_CODE(return "unknown"); } @@ -214,3 +216,18 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s) { return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); } + +bool grpc_chttp2_list_add_waiting_for_write_stream(grpc_chttp2_transport* t, + grpc_chttp2_stream* s) { + return stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE); +} + +bool grpc_chttp2_list_pop_waiting_for_write_stream(grpc_chttp2_transport* t, + grpc_chttp2_stream** s) { + return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE); +} + +bool grpc_chttp2_list_remove_waiting_for_write_stream(grpc_chttp2_transport* t, + grpc_chttp2_stream* s) { + return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE); +} |