From 9d35a1f9ce98e446d9036d64227b3c9e9317177a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 2 Nov 2015 14:16:12 -0800 Subject: stream_op cleanup: transport changes --- src/core/transport/chttp2/frame_data.c | 90 +++++++++++++++++++++++++++++----- 1 file changed, 78 insertions(+), 12 deletions(-) (limited to 'src/core/transport/chttp2/frame_data.c') diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 07179a4571..7287f97aaa 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -45,12 +45,15 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser) { parser->state = GRPC_CHTTP2_DATA_FH_0; - grpc_sopb_init(&parser->incoming_sopb); return GRPC_CHTTP2_PARSE_OK; } void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) { - grpc_sopb_destroy(&parser->incoming_sopb); + grpc_byte_stream *bs; + while ( + (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) { + grpc_byte_stream_destroy(bs); + } } grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( @@ -69,6 +72,62 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( return GRPC_CHTTP2_PARSE_OK; } +void grpc_chttp2_incoming_frame_queue_merge( + grpc_chttp2_incoming_frame_queue *head_dst, + grpc_chttp2_incoming_frame_queue *tail_src) { + if (tail_src->head == NULL) { + return; + } + + if (head_dst->head == NULL) { + *head_dst = *tail_src; + memset(tail_src, 0, sizeof(*tail_src)); + return; + } + + head_dst->tail->next_message = tail_src->head; + head_dst->tail = tail_src->tail; + memset(tail_src, 0, sizeof(*tail_src)); +} + +grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( + grpc_chttp2_incoming_frame_queue *q) { + grpc_byte_stream *out; + if (q->head == NULL) { + return NULL; + } + out = &q->head->base; + if (q->head == q->tail) { + memset(q, 0, sizeof(*q)); + } else { + q->head = q->head->next_message; + } + return out; +} + +void grpc_chttp2_encode_data(gpr_uint32 id, gpr_slice_buffer *inbuf, + gpr_uint32 write_bytes, int is_eof, + gpr_slice_buffer *outbuf) { + gpr_slice hdr; + gpr_uint8 *p; + + hdr = gpr_slice_malloc(9); + p = GPR_SLICE_START_PTR(hdr); + GPR_ASSERT(write_bytes < 16777316); + *p++ = (gpr_uint8)(write_bytes >> 16); + *p++ = (gpr_uint8)(write_bytes >> 8); + *p++ = (gpr_uint8)(write_bytes); + *p++ = GRPC_CHTTP2_FRAME_DATA; + *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0; + *p++ = (gpr_uint8)(id >> 24); + *p++ = (gpr_uint8)(id >> 16); + *p++ = (gpr_uint8)(id >> 8); + *p++ = (gpr_uint8)(id); + gpr_slice_buffer_add(outbuf, hdr); + + gpr_slice_buffer_move_first(inbuf, write_bytes, outbuf); +} + grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport_parsing *transport_parsing, @@ -77,7 +136,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; grpc_chttp2_data_parser *p = parser; - gpr_uint32 message_flags = 0; + gpr_uint32 message_flags; + grpc_chttp2_incoming_byte_stream *incoming_byte_stream; if (is_last && p->is_last_frame) { stream_parsing->received_close = 1; @@ -132,11 +192,14 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( p->frame_size |= ((gpr_uint32)*cur); p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; + message_flags = 0; if (p->is_frame_compressed) { message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size, - message_flags); + p->parsing_frame = incoming_byte_stream = + grpc_chttp2_incoming_byte_stream_create( + transport_parsing, stream_parsing, p->frame_size, message_flags, + &p->incoming_frames); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { @@ -147,20 +210,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); if ((gpr_uint32)(end - cur) == p->frame_size) { - grpc_sopb_add_slice( - &p->incoming_sopb, + grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) > p->frame_size) { - grpc_sopb_add_slice(&p->incoming_sopb, - gpr_slice_sub(slice, (size_t)(cur - beg), - (size_t)(cur + p->frame_size - beg))); + grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, + gpr_slice_sub(slice, (size_t)(cur - beg), + (size_t)(cur + p->frame_size - beg))); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); cur += p->frame_size; goto fh_0; /* loop */ } else { - grpc_sopb_add_slice( - &p->incoming_sopb, + grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); GPR_ASSERT((size_t)(end - cur) <= p->frame_size); p->frame_size -= (gpr_uint32)(end - cur); -- cgit v1.2.3 From 20df14ef6d36c01239e4e1b52a3dee009a5962c3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 6 Nov 2015 09:10:49 -0800 Subject: Fix flow control for 0-byte messages --- src/core/surface/call.c | 2 +- src/core/transport/chttp2/frame_data.c | 4 +- src/core/transport/chttp2/internal.h | 2 +- src/core/transport/chttp2_transport.c | 83 ++++++++++++++++++++-------------- 4 files changed, 54 insertions(+), 37 deletions(-) (limited to 'src/core/transport/chttp2/frame_data.c') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 056d49064e..aa435d44d3 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -944,12 +944,12 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { + grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success); gpr_mu_lock(&call->mu); bctl->call->used_batches = (gpr_uint8)(bctl->call->used_batches & ~(gpr_uint8)(1 << (bctl - bctl->call->active_batches))); gpr_mu_unlock(&call->mu); - grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success, diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 7287f97aaa..eafffc2795 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -198,8 +198,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } p->parsing_frame = incoming_byte_stream = grpc_chttp2_incoming_byte_stream_create( - transport_parsing, stream_parsing, p->frame_size, message_flags, - &p->incoming_frames); + exec_ctx, transport_parsing, stream_parsing, p->frame_size, + message_flags, &p->incoming_frames); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index b53c9dee0b..2d0cb4abdb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -738,7 +738,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, #endif grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue); void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 3d98a4fb14..b44843a341 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1364,6 +1364,46 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * BYTE STREAM */ +static void incoming_byte_stream_update_flow_control( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, size_t max_size_hint, + size_t have_already) { + gpr_uint32 max_recv_bytes; + + /* clamp max recv hint to an allowable size */ + if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) { + max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD; + } else { + max_recv_bytes = (gpr_uint32)max_size_hint; + } + + /* account for bytes already received but unknown to higher layers */ + if (max_recv_bytes >= have_already) { + max_recv_bytes -= (gpr_uint32)have_already; + } else { + max_recv_bytes = 0; + } + + /* add some small lookahead to keep pipelines flowing */ + GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD); + max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD; + if (stream_global->max_recv_bytes < max_recv_bytes) { + gpr_uint32 add_max_recv_bytes = + max_recv_bytes - stream_global->max_recv_bytes; + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + max_recv_bytes, add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_parse, + add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_writing, + add_max_recv_bytes); + grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, + stream_global); + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + } +} + static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, gpr_slice *slice, size_t max_size_hint, @@ -1372,41 +1412,11 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_chttp2_transport_global *transport_global = &bs->transport->global; grpc_chttp2_stream_global *stream_global = &bs->stream->global; - gpr_uint32 max_recv_bytes; lock(bs->transport); if (bs->is_tail) { - /* clamp max recv hint to an allowable size */ - if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) { - max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD; - } else { - max_recv_bytes = (gpr_uint32)max_size_hint; - } - - /* account for bytes already received but unknown to higher layers */ - if (max_recv_bytes >= bs->slices.length) { - max_recv_bytes -= (gpr_uint32)bs->slices.length; - } else { - max_recv_bytes = 0; - } - /* add some small lookahead to keep pipelines flowing */ - GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD); - max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD; - if (stream_global->max_recv_bytes < max_recv_bytes) { - gpr_uint32 add_max_recv_bytes = - max_recv_bytes - stream_global->max_recv_bytes; - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - max_recv_bytes, add_max_recv_bytes); - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - unannounced_incoming_window_for_parse, - add_max_recv_bytes); - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - unannounced_incoming_window_for_writing, - add_max_recv_bytes); - grpc_chttp2_list_add_unannounced_incoming_window_available( - transport_global, stream_global); - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); - } + incoming_byte_stream_update_flow_control(transport_global, stream_global, + max_size_hint, bs->slices.length); } if (bs->slices.count > 0) { *slice = gpr_slice_buffer_take_first(&bs->slices); @@ -1451,7 +1461,7 @@ void grpc_chttp2_incoming_byte_stream_finished( } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) { grpc_chttp2_incoming_byte_stream *incoming_byte_stream = @@ -1474,6 +1484,13 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( add_to_queue->tail->next_message = incoming_byte_stream; } add_to_queue->tail = incoming_byte_stream; + if (frame_size == 0) { + lock(TRANSPORT_FROM_PARSING(transport_parsing)); + incoming_byte_stream_update_flow_control( + &TRANSPORT_FROM_PARSING(transport_parsing)->global, + &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0); + unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing)); + } return incoming_byte_stream; } -- cgit v1.2.3 From 7be556e72865991d816bae8d369a74889655398d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 6 Nov 2015 12:29:33 -0800 Subject: Transport memory management fixes --- src/core/transport/chttp2/frame_data.c | 9 ++++++++- src/core/transport/chttp2/frame_data.h | 3 ++- src/core/transport/chttp2_transport.c | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) (limited to 'src/core/transport/chttp2/frame_data.c') diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index eafffc2795..e07fbb2cc7 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -45,11 +45,16 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser) { parser->state = GRPC_CHTTP2_DATA_FH_0; + parser->parsing_frame = NULL; return GRPC_CHTTP2_PARSE_OK; } -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) { +void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *parser) { grpc_byte_stream *bs; + if (parser->parsing_frame) { + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame); + } while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) { grpc_byte_stream_destroy(bs); @@ -214,6 +219,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) > p->frame_size) { @@ -222,6 +228,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(cur + p->frame_size - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + p->parsing_frame = NULL; cur += p->frame_size; goto fh_0; /* loop */ } else { diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index bc32f29d97..472f9cebdb 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -80,7 +80,8 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser); -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser); +void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *parser); /* start processing a new data frame */ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index b44843a341..cbde3c5b86 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -512,7 +512,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL); GPR_ASSERT(s->global.recv_message_ready == NULL); GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL); - grpc_chttp2_data_parser_destroy(&s->parsing.data_parser); + grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser); grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]); grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]); grpc_chttp2_incoming_metadata_buffer_destroy( -- cgit v1.2.3