aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreecha@users.noreply.github.com>2015-12-15 18:22:53 -0800
committerGravatar Sree Kuchibhotla <sreecha@users.noreply.github.com>2015-12-15 18:22:53 -0800
commite5cdbea1530a99a95fd3d032e7d69a19c61a0d16 (patch)
tree73ba64f398cbd0d7ecf993a02f51eee775ab5d9a /src/core
parent8ccebc403fe4be49eb13aab12bf97f36afee511c (diff)
parentef1d424cc363f0ddda7ed810348d28436e12be90 (diff)
Merge pull request #4453 from ctiller/server_stall
Server stall fix
Diffstat (limited to 'src/core')
-rw-r--r--src/core/surface/call.c18
-rw-r--r--src/core/transport/chttp2/frame_data.c9
-rw-r--r--src/core/transport/chttp2/internal.h4
-rw-r--r--src/core/transport/chttp2_transport.c35
4 files changed, 56 insertions, 10 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index a162d99193..55aad9e8ac 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -974,11 +974,19 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- GPR_ASSERT(success);
- gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
- call->receiving_slice);
-
- continue_receiving_slices(exec_ctx, bctl);
+ if (success) {
+ gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+ call->receiving_slice);
+ continue_receiving_slices(exec_ctx, bctl);
+ } else {
+ grpc_byte_stream_destroy(call->receiving_stream);
+ call->receiving_stream = NULL;
+ grpc_byte_buffer_destroy(*call->receiving_buffer);
+ *call->receiving_buffer = NULL;
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+ }
}
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) {
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 732124b7c9..60a3ce23d5 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -53,7 +53,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) {
grpc_byte_stream *bs;
if (parser->parsing_frame) {
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame);
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame,
+ 0, 1);
}
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
@@ -218,7 +219,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, 1,
+ 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
@@ -227,7 +229,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)));
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, 1,
+ 1);
p->parsing_frame = NULL;
cur += p->frame_size;
goto fh_0; /* loop */
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 43b3adb9d3..4ad900378b 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -152,6 +152,7 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base;
gpr_refcount refs;
struct grpc_chttp2_incoming_byte_stream *next_message;
+ int failed;
grpc_chttp2_transport *transport;
grpc_chttp2_stream *stream;
@@ -748,7 +749,8 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs,
gpr_slice slice);
void grpc_chttp2_incoming_byte_stream_finished(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success,
+ int from_parsing_thread);
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *parsing,
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 7793f7c9e4..c3847bb6c5 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1049,6 +1049,12 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->parsing.incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing);
}
+ if (s->parsing.data_parser.parsing_frame != NULL) {
+ grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, s->parsing.data_parser.parsing_frame, 0, 0);
+ s->parsing.data_parser.parsing_frame = NULL;
+ }
+
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(exec_ctx, t);
}
@@ -1503,6 +1509,10 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
*slice = gpr_slice_buffer_take_first(&bs->slices);
unlock(exec_ctx, bs->transport);
return 1;
+ } else if (bs->failed) {
+ grpc_exec_ctx_enqueue(exec_ctx, on_complete, 0);
+ unlock(exec_ctx, bs->transport);
+ return 0;
} else {
bs->on_next = on_complete;
bs->next = slice;
@@ -1537,7 +1547,29 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
}
void grpc_chttp2_incoming_byte_stream_finished(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success,
+ int from_parsing_thread) {
+ if (!success) {
+ if (from_parsing_thread) {
+ gpr_mu_lock(&bs->transport->mu);
+ }
+ grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 0);
+ bs->on_next = NULL;
+ bs->failed = 1;
+ if (from_parsing_thread) {
+ gpr_mu_unlock(&bs->transport->mu);
+ }
+ } else {
+#ifndef NDEBUG
+ if (from_parsing_thread) {
+ gpr_mu_lock(&bs->transport->mu);
+ }
+ GPR_ASSERT(bs->on_next == NULL);
+ if (from_parsing_thread) {
+ gpr_mu_unlock(&bs->transport->mu);
+ }
+#endif
+ }
incoming_byte_stream_unref(bs);
}
@@ -1558,6 +1590,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
gpr_slice_buffer_init(&incoming_byte_stream->slices);
incoming_byte_stream->on_next = NULL;
incoming_byte_stream->is_tail = 1;
+ incoming_byte_stream->failed = 0;
if (add_to_queue->head == NULL) {
add_to_queue->head = incoming_byte_stream;
} else {