about | summary | refs | log | tree | commit | diff | homepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 16
-rw-r--r-- src/core/ext/filters/client_channel/subchannel.cc | 14
-rw-r--r-- src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 15
-rw-r--r-- src/core/ext/transport/chttp2/transport/internal.h | 13
-rw-r--r-- src/core/ext/transport/chttp2/transport/stream_lists.cc | 17
5 files changed, 55 insertions, 20 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 023281db97..d217dc0e63 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -451,6 +451,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
+ GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
if (grpc_lb_pick_first_trace.enabled()) {
@@ -480,14 +481,12 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
"update"),
"selected_not_ready+switch_to_update");
} else {
- // TODO(juanlishen): we re-resolve when the selected subchannel goes to
- // TRANSIENT_FAILURE because we used to shut down in this case before
- // re-resolution is introduced. But we need to investigate whether we
- // really want to take any action instead of waiting for the selected
- // subchannel reconnecting.
- GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- // If the selected channel goes bad, request a re-resolution.
+ // If the selected subchannel goes bad, request a re-resolution. We also
+ // set the channel state to IDLE and reset started_picking_. The reason
+ // is that if the new state is TRANSIENT_FAILURE due to a GOAWAY
+ // reception we don't want to connect to the re-resolved backends until
+ // we leave the IDLE state.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"selected_changed+reresolve");
@@ -568,9 +567,10 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
+ p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "connecting_transient_failure");
+ GRPC_ERROR_REF(error), "exhausted_subchannels");
}
sd->StartConnectivityWatchLocked();
break;
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 5894d52e6b..d7b64a900f 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -418,8 +418,6 @@ static void continue_connect_locked(grpc_subchannel* c) {
c->next_attempt_deadline = c->backoff->NextAttemptTime();
args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
- grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->on_connected);
}
@@ -475,27 +473,24 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
/* Don't try to connect if we're already disconnected */
return;
}
-
if (c->connecting) {
/* Already connecting: don't restart */
return;
}
-
if (c->connected_subchannel != nullptr) {
/* Already connected: don't restart */
return;
}
-
if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
/* Nobody is interested in connecting: so don't just yet */
return;
}
-
c->connecting = true;
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
-
if (!c->backoff_begun) {
c->backoff_begun = true;
+ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "connecting");
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
@@ -510,6 +505,11 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
+ // During backoff, we prefer the connectivity state of CONNECTING instead of
+ // TRANSIENT_FAILURE in order to prevent triggering re-resolution
+ // continuously in pick_first.
+ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "backoff");
}
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index bc6fa0d0eb..9ad271753c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -813,7 +813,11 @@ static void set_write_state(grpc_chttp2_transport* t,
write_state_name(st), reason));
t->write_state = st;
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
- GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
+ grpc_chttp2_stream* s;
+ while (grpc_chttp2_list_pop_waiting_for_write_stream(t, &s)) {
+ GRPC_CLOSURE_LIST_SCHED(&s->run_after_write);
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2:write_closure_sched");
+ }
if (t->close_transport_on_writes_finished != nullptr) {
grpc_error* err = t->close_transport_on_writes_finished;
t->close_transport_on_writes_finished = nullptr;
@@ -1208,7 +1212,10 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
GRPC_CLOSURE_RUN(closure, closure->error_data.error);
} else {
- grpc_closure_list_append(&t->run_after_write, closure,
+ if (grpc_chttp2_list_add_waiting_for_write_stream(t, s)) {
+ GRPC_CHTTP2_STREAM_REF(s, "chttp2:pending_write_closure");
+ }
+ grpc_closure_list_append(&s->run_after_write, closure,
closure->error_data.error);
}
}
@@ -2009,6 +2016,10 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error* due_to_error) {
+ GRPC_CLOSURE_LIST_SCHED(&s->run_after_write);
+ if (grpc_chttp2_list_remove_waiting_for_write_stream(t, s)) {
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2:pending_write_closure");
+ }
if (!t->is_client && !s->sent_trailing_metadata &&
grpc_error_has_clear_grpc_status(due_to_error)) {
close_from_api(t, s, due_to_error);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index ca6e715978..4f1a08d98b 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -54,6 +54,8 @@ typedef enum {
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
+ /** streams with closures waiting to be run on a write **/
+ GRPC_CHTTP2_LIST_WAITING_FOR_WRITE,
STREAM_LIST_COUNT /* must be last */
} grpc_chttp2_stream_list_id;
@@ -431,9 +433,6 @@ struct grpc_chttp2_transport {
*/
grpc_error* close_transport_on_writes_finished;
- /* a list of closures to run after writes are finished */
- grpc_closure_list run_after_write;
-
/* buffer pool state */
/** have we scheduled a benign cleanup? */
bool benign_reclaimer_registered;
@@ -584,6 +583,7 @@ struct grpc_chttp2_stream {
grpc_slice_buffer flow_controlled_buffer;
+ grpc_closure_list run_after_write;
grpc_chttp2_write_cb* on_flow_controlled_cbs;
grpc_chttp2_write_cb* on_write_finished_cbs;
grpc_chttp2_write_cb* finish_after_write;
@@ -686,6 +686,13 @@ bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s);
+bool grpc_chttp2_list_add_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s);
+bool grpc_chttp2_list_pop_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream** s);
+bool grpc_chttp2_list_remove_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s);
+
/********* Flow Control ***************/
// Takes in a flow control action and performs all the needed operations.
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.cc b/src/core/ext/transport/chttp2/transport/stream_lists.cc
index 6626170a7e..50bfe36a86 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.cc
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.cc
@@ -35,6 +35,8 @@ static const char* stream_list_id_string(grpc_chttp2_stream_list_id id) {
return "stalled_by_stream";
case GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY:
return "waiting_for_concurrency";
+ case GRPC_CHTTP2_LIST_WAITING_FOR_WRITE:
+ return "waiting_for_write";
case STREAM_LIST_COUNT:
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -214,3 +216,18 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
}
+
+bool grpc_chttp2_list_add_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ return stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE);
+}
+
+bool grpc_chttp2_list_pop_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream** s) {
+ return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE);
+}
+
+bool grpc_chttp2_list_remove_waiting_for_write_stream(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE);
+}