aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-02-22 22:07:31 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-02-22 22:07:31 -0800
commit914a2e7217e3376dfbd69fa37008d6d60f797689 (patch)
tree0fec82e8b667bbcc9a4b45ac47749fcc9673e440
parentc8d34d50a2daaa7014d800273ba3d8d5ae021825 (diff)
parent276e32d0fbbfab490fd26dcbfb4e65c3c87f31ae (diff)
Merge pull request #5350 from yang-g/stalled_by_transport_race
Fix race between add_writing_stalled and destroy stream
-rw-r--r--src/core/transport/chttp2/internal.h9
-rw-r--r--src/core/transport/chttp2/stream_lists.c22
-rw-r--r--src/core/transport/chttp2/writing.c10
-rw-r--r--src/core/transport/chttp2_transport.c2
4 files changed, 31 insertions, 12 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 0e1e2c4265..d76d31be23 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -485,7 +485,8 @@ struct grpc_chttp2_stream {
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
-int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global,
+int grpc_chttp2_unlocking_check_writes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing,
int is_parsing);
void grpc_chttp2_perform_writes(
@@ -568,8 +569,12 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_chttp2_transport_writing *transport_writing, bool is_window_available);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
+ bool is_window_available);
+void grpc_chttp2_list_add_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 2f31a47cb3..b284c78818 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -316,13 +316,16 @@ int grpc_chttp2_list_pop_check_read_ops(
void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
- stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
- STREAM_FROM_WRITING(stream_writing),
+ grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing);
+ if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) {
+ GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled");
+ }
+ stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
}
void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_chttp2_transport_writing *transport_writing,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
bool is_window_available) {
grpc_chttp2_stream *stream;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
@@ -331,11 +334,22 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport(
if (is_window_available) {
grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global);
} else {
- stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ &stream->writing);
}
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global,
+ "chttp2_writing_stalled");
}
}
+void grpc_chttp2_list_add_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_writing *stream_writing) {
+ stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
+ STREAM_FROM_WRITING(stream_writing),
+ GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
+}
+
int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index cafecf1046..356fd8174a 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -44,7 +44,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_unlocking_check_writes(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing, int is_parsing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
@@ -76,8 +76,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
bool is_window_available = transport_writing->outgoing_window > 0;
- grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing,
- is_window_available);
+ grpc_chttp2_list_flush_writing_stalled_by_transport(
+ exec_ctx, transport_writing, is_window_available);
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
@@ -133,8 +133,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
} else {
- grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
- stream_writing);
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ stream_writing);
}
}
if (stream_global->send_trailing_metadata) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index c3efc36cc5..b9f511e946 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -598,7 +598,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_BEGIN("unlock", 0);
if (!t->writing_active && !t->closed &&
- grpc_chttp2_unlocking_check_writes(&t->global, &t->writing,
+ grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing,
t->parsing_active)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");