diff options
author | Sree Kuchibhotla <sreecha@users.noreply.github.com> | 2015-12-15 18:22:53 -0800 |
---|---|---|
committer | Sree Kuchibhotla <sreecha@users.noreply.github.com> | 2015-12-15 18:22:53 -0800 |
commit | e5cdbea1530a99a95fd3d032e7d69a19c61a0d16 (patch) | |
tree | 73ba64f398cbd0d7ecf993a02f51eee775ab5d9a /src/core | |
parent | 8ccebc403fe4be49eb13aab12bf97f36afee511c (diff) | |
parent | ef1d424cc363f0ddda7ed810348d28436e12be90 (diff) |
Merge pull request #4453 from ctiller/server_stall
Server stall fix
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/surface/call.c | 18 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.c | 9 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 35 |
4 files changed, 56 insertions, 10 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index a162d99193..55aad9e8ac 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -974,11 +974,19 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, batch_control *bctl = bctlp; grpc_call *call = bctl->call; - GPR_ASSERT(success); - gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); - - continue_receiving_slices(exec_ctx, bctl); + if (success) { + gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + continue_receiving_slices(exec_ctx, bctl); + } else { + grpc_byte_stream_destroy(call->receiving_stream); + call->receiving_stream = NULL; + grpc_byte_buffer_destroy(*call->receiving_buffer); + *call->receiving_buffer = NULL; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } } static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) { diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 732124b7c9..60a3ce23d5 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -53,7 +53,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *parser) { grpc_byte_stream *bs; if (parser->parsing_frame) { - grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame, + 0, 1); } while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) { @@ -218,7 +219,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, 1, + 1); p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; @@ -227,7 +229,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(cur + p->frame_size - beg))); - grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, 1, + 1); p->parsing_frame = NULL; cur += p->frame_size; goto fh_0; /* loop */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 43b3adb9d3..4ad900378b 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -152,6 +152,7 @@ struct grpc_chttp2_incoming_byte_stream { grpc_byte_stream base; gpr_refcount refs; struct grpc_chttp2_incoming_byte_stream *next_message; + int failed; grpc_chttp2_transport *transport; grpc_chttp2_stream *stream; @@ -748,7 +749,8 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, gpr_slice slice); void grpc_chttp2_incoming_byte_stream_finished( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs); + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success, + int from_parsing_thread); void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *parsing, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 7793f7c9e4..c3847bb6c5 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1049,6 +1049,12 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->parsing.incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing); } + if (s->parsing.data_parser.parsing_frame != NULL) { + grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, s->parsing.data_parser.parsing_frame, 0, 0); + s->parsing.data_parser.parsing_frame = NULL; + } + if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { close_transport_locked(exec_ctx, t); } @@ -1503,6 +1509,10 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, *slice = gpr_slice_buffer_take_first(&bs->slices); unlock(exec_ctx, bs->transport); return 1; + } else if (bs->failed) { + grpc_exec_ctx_enqueue(exec_ctx, on_complete, 0); + unlock(exec_ctx, bs->transport); + return 0; } else { bs->on_next = on_complete; bs->next = slice; @@ -1537,7 +1547,29 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, } void grpc_chttp2_incoming_byte_stream_finished( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success, + int from_parsing_thread) { + if (!success) { + if (from_parsing_thread) { + gpr_mu_lock(&bs->transport->mu); + } + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 0); + bs->on_next = NULL; + bs->failed = 1; + if (from_parsing_thread) { + gpr_mu_unlock(&bs->transport->mu); + } + } else { +#ifndef NDEBUG + if (from_parsing_thread) { + gpr_mu_lock(&bs->transport->mu); + } + GPR_ASSERT(bs->on_next == NULL); + if (from_parsing_thread) { + gpr_mu_unlock(&bs->transport->mu); + } +#endif + } incoming_byte_stream_unref(bs); } @@ -1558,6 +1590,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( gpr_slice_buffer_init(&incoming_byte_stream->slices); incoming_byte_stream->on_next = NULL; incoming_byte_stream->is_tail = 1; + incoming_byte_stream->failed = 0; if (add_to_queue->head == NULL) { add_to_queue->head = incoming_byte_stream; } else { |