aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-12-05 13:48:20 -0800
committerGravatar GitHub <noreply@github.com>2016-12-05 13:48:20 -0800
commit1d3ad2b2491cd62abdb25434b8e418993b32d77c (patch)
tree0f975fb69ab620e0eb0b8ddade04db85aa445ac2
parentb23c86bcd7f63ef0d0455ab9e9c87b7e93f3eb35 (diff)
parent73c4f8fce1eec86638d989e395ca2f004024b33d (diff)
Merge pull request #8938 from ctiller/fixit23
Reliably handle failing streams during writes
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c78
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h9
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c2
3 files changed, 37 insertions, 52 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 3e7c078d3c..3b84898fee 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -425,7 +425,6 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
@@ -521,10 +520,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
}
- if (s->fail_pending_writes_on_writes_finished_error != NULL) {
- GRPC_ERROR_UNREF(s->fail_pending_writes_on_writes_finished_error);
- }
-
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
GPR_ASSERT(s->fetching_send_message == NULL);
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
@@ -604,11 +599,13 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
write_state_name(t->write_state),
write_state_name(st), reason));
t->write_state = st;
- if (st == GRPC_CHTTP2_WRITE_STATE_IDLE &&
- t->close_transport_on_writes_finished != NULL) {
- grpc_error *err = t->close_transport_on_writes_finished;
- t->close_transport_on_writes_finished = NULL;
- close_transport_locked(exec_ctx, t, err);
+ if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL);
+ if (t->close_transport_on_writes_finished != NULL) {
+ grpc_error *err = t->close_transport_on_writes_finished;
+ t->close_transport_on_writes_finished = NULL;
+ close_transport_locked(exec_ctx, t, err);
+ }
}
}
@@ -825,7 +822,14 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
}
}
+/* Flag that this closure barrier wants stats to be updated before finishing */
#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
+/* Flag that this closure barrier may be covering a write in a pollset, and so
+ we should not complete this closure until we can prove that the write got
+ scheduled */
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
+/* First bit of the reference count, stored in the high order bits (with the low
+ bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure *add_closure_barrier(grpc_closure *closure) {
@@ -852,6 +856,16 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
return;
}
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
+ if (grpc_http_trace) {
+ const char *errstr = grpc_error_string(error);
+ gpr_log(GPR_DEBUG,
+ "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s",
+ closure,
+ (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
+ (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT),
+ desc, errstr);
+ grpc_error_free_string(errstr);
+ }
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
closure->error_data.error =
@@ -868,7 +882,13 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = NULL;
}
- grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
+ !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
+ grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ } else {
+ grpc_closure_list_append(&t->run_after_write, closure,
+ closure->error_data.error);
+ }
}
}
@@ -1013,6 +1033,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_initial_metadata != NULL) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata = op->send_initial_metadata;
const size_t metadata_size =
@@ -1066,6 +1087,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (op->send_message != NULL) {
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
grpc_chttp2_complete_closure_step(
@@ -1103,6 +1125,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_trailing_metadata != NULL) {
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata = op->send_trailing_metadata;
const size_t metadata_size =
@@ -1406,7 +1429,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
}
@@ -1537,41 +1559,9 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error;
}
-void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- if (s->need_fail_pending_writes_on_writes_finished) {
- grpc_error *error = s->fail_pending_writes_on_writes_finished_error;
- s->fail_pending_writes_on_writes_finished_error = NULL;
- s->need_fail_pending_writes_on_writes_finished = false;
- grpc_chttp2_fail_pending_writes(exec_ctx, t, s, error);
- }
-}
-
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
- if (s->need_fail_pending_writes_on_writes_finished ||
- (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE &&
- (s->included[GRPC_CHTTP2_LIST_WRITABLE] ||
- s->included[GRPC_CHTTP2_LIST_WRITING]))) {
- /* If a write is in progress, and it involves this stream, wait for the
- * write to complete before cancelling things out. If we don't do this, then
- * our combiner lock might think that some operation on its queue might be
- * covering a completion even though there is none, in which case we might
- * offload to another thread, which isn't guarateed to exist */
- if (error != GRPC_ERROR_NONE) {
- if (s->fail_pending_writes_on_writes_finished_error == GRPC_ERROR_NONE) {
- s->fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE(
- "Post-poned fail writes due to in-progress write");
- }
- s->fail_pending_writes_on_writes_finished_error = grpc_error_add_child(
- s->fail_pending_writes_on_writes_finished_error, error);
- }
- s->need_fail_pending_writes_on_writes_finished = true;
- return; /* early out */
- }
-
error =
removal_error(error, s, "Pending writes failed due to stream closure");
s->send_initial_metadata = NULL;
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 6cba1e7fd2..31eb1e01ac 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -327,6 +327,9 @@ struct grpc_chttp2_transport {
*/
grpc_error *close_transport_on_writes_finished;
+ /* a list of closures to run after writes are finished */
+ grpc_closure_list run_after_write;
+
/* buffer pool state */
/** have we scheduled a benign cleanup? */
bool benign_reclaimer_registered;
@@ -409,9 +412,6 @@ struct grpc_chttp2_stream {
grpc_error *read_closed_error;
/** the error that resulted in this stream being write-closed */
grpc_error *write_closed_error;
- /** should any writes be cleared once this stream becomes non-writable */
- bool need_fail_pending_writes_on_writes_finished;
- grpc_error *fail_pending_writes_on_writes_finished_error;
grpc_published_metadata_method published_metadata[2];
bool final_metadata_requested;
@@ -692,9 +692,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s);
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 769b229a0d..139e7387c4 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -208,7 +208,6 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing");
}
} else {
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write");
}
}
@@ -253,7 +252,6 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1,
GRPC_ERROR_REF(error));
}
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end");
}
grpc_slice_buffer_reset_and_unref(&t->outbuf);