From 38edec66067fc737ffd61f66972dc29ccd68fce3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 14 Dec 2015 15:01:29 -0800 Subject: Fix server side handling of incoming partial requests in core --- src/core/surface/call.c | 18 ++++++++++---- src/core/transport/chttp2/frame_data.c | 9 ++++--- src/core/transport/chttp2/internal.h | 4 +++- src/core/transport/chttp2_transport.c | 43 ++++++++++++++++++++++++++++++++-- 4 files changed, 63 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index f857b66e5e..d170dbf68e 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -974,11 +974,19 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, batch_control *bctl = bctlp; grpc_call *call = bctl->call; - GPR_ASSERT(success); - gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); - - continue_receiving_slices(exec_ctx, bctl); + if (success) { + gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + continue_receiving_slices(exec_ctx, bctl); + } else { + grpc_byte_stream_destroy(call->receiving_stream); + call->receiving_stream = NULL; + grpc_byte_buffer_destroy(*call->receiving_buffer); + *call->receiving_buffer = NULL; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } } static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) { diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 732124b7c9..60a3ce23d5 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -53,7 +53,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *parser) { grpc_byte_stream *bs; if (parser->parsing_frame) { - grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame, + 0, 1); } while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) { @@ -218,7 +219,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, 1, + 1); p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; @@ -227,7 +229,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(cur + p->frame_size - beg))); - grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, 1, + 1); p->parsing_frame = NULL; cur += p->frame_size; goto fh_0; /* loop */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index fc35ea6f93..e7cdbc32ef 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -151,6 +151,7 @@ struct grpc_chttp2_incoming_byte_stream { grpc_byte_stream base; gpr_refcount refs; struct grpc_chttp2_incoming_byte_stream *next_message; + int failed; grpc_chttp2_transport *transport; grpc_chttp2_stream *stream; @@ -742,7 +743,8 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, gpr_slice slice); void grpc_chttp2_incoming_byte_stream_finished( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs); + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success, + int from_parsing_thread); void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *parsing, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 91c1a27cff..a5bb7018e3 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -115,7 +115,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_status_code status, gpr_slice *optional_message); -/** Fail any outstanding ops */ +/** Fail any outstanding ops: must be called under lock, whilst not parsing */ static void fail_all_outstanding_ops( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); @@ -756,6 +756,9 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, static void fail_all_outstanding_ops( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + grpc_chttp2_stream_parsing *stream_parsing; + GPR_ASSERT(!TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active); + stream_parsing = &STREAM_FROM_GLOBAL(stream_global)->parsing; grpc_chttp2_complete_closure_step( exec_ctx, &stream_global->send_initial_metadata_finished, 0); grpc_chttp2_complete_closure_step( @@ -766,6 +769,15 @@ static void fail_all_outstanding_ops( exec_ctx, &stream_global->recv_initial_metadata_finished, 0); grpc_chttp2_complete_closure_step( exec_ctx, &stream_global->recv_trailing_metadata_finished, 0); + if (stream_global->recv_message_ready != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 0); + stream_global->recv_message_ready = 0; + } + if (stream_parsing->data_parser.parsing_frame != NULL) { + grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, stream_parsing->data_parser.parsing_frame, 0, 0); + stream_parsing->data_parser.parsing_frame = NULL; + } } static int contains_non_ok_status( @@ -1502,6 +1514,10 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, *slice = gpr_slice_buffer_take_first(&bs->slices); unlock(exec_ctx, bs->transport); return 1; + } else if (bs->failed) { + grpc_exec_ctx_enqueue(exec_ctx, on_complete, 0); + unlock(exec_ctx, bs->transport); + return 0; } else { bs->on_next = on_complete; bs->next = slice; @@ -1536,7 +1552,29 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, } void grpc_chttp2_incoming_byte_stream_finished( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success, + int from_parsing_thread) { + if (!success) { + if (from_parsing_thread) { + gpr_mu_lock(&bs->transport->mu); + } + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 0); + bs->on_next = NULL; + bs->failed = 1; + if (from_parsing_thread) { + gpr_mu_unlock(&bs->transport->mu); + } + } else { +#ifndef NDEBUG + if (from_parsing_thread) { + gpr_mu_lock(&bs->transport->mu); + } + GPR_ASSERT(bs->on_next == NULL); + if (from_parsing_thread) { + gpr_mu_unlock(&bs->transport->mu); + } +#endif + } incoming_byte_stream_unref(bs); } @@ -1557,6 +1595,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( gpr_slice_buffer_init(&incoming_byte_stream->slices); incoming_byte_stream->on_next = NULL; incoming_byte_stream->is_tail = 1; + incoming_byte_stream->failed = 0; if (add_to_queue->head == NULL) { add_to_queue->head = incoming_byte_stream; } else { -- cgit v1.2.3 From 31d0d1edd0c8922592b4de585c752469681de290 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 14 Dec 2015 22:01:21 -0800 Subject: recv_message_ready was already taken care of --- src/core/transport/chttp2_transport.c | 4 ---- 1 file changed, 4 deletions(-) (limited to 'src') diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 015c156d94..38a07f59ae 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1040,10 +1040,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->parsing.incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing); } - if (s->global.recv_message_ready != NULL) { - grpc_exec_ctx_enqueue(exec_ctx, s->global.recv_message_ready, 0); - s->global.recv_message_ready = 0; - } if (s->parsing.data_parser.parsing_frame != NULL) { grpc_chttp2_incoming_byte_stream_finished( exec_ctx, s->parsing.data_parser.parsing_frame, 0, 0); -- cgit v1.2.3