diff options
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 338 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/frame_data.c | 365 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/frame_data.h | 24 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 51 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/parsing.c | 5 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 17 | ||||
-rw-r--r-- | src/core/lib/channel/compress_filter.c | 14 | ||||
-rw-r--r-- | src/core/lib/channel/http_client_filter.c | 14 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 44 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.c | 32 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.h | 21 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_chttp2_transport.cc | 9 |
12 files changed, 350 insertions, 584 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 29ed4bf90e..a7d047d6e7 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -44,7 +44,6 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> -#include "src/core/ext/transport/chttp2/transport/frame_data.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" @@ -130,11 +129,6 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored); -static void incoming_byte_stream_publish_error( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error); -static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs); static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); @@ -180,9 +174,6 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -664,6 +655,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, /* We reserve one 'active stream' that's dropped when the stream is read-closed. The others are for incoming_byte_streams that are actively reading */ + gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); @@ -673,11 +665,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s, grpc_schedule_on_exec_ctx); - grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); - grpc_slice_buffer_init(&s->frame_storage); - s->pending_byte_stream = false; - grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s, - grpc_combiner_scheduler(t->combiner, false)); GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); @@ -695,6 +682,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_error *error) { + grpc_byte_stream *bs; grpc_chttp2_stream *s = sp; grpc_chttp2_transport *t = s->t; @@ -705,9 +693,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL); } - grpc_slice_buffer_destroy_internal(exec_ctx, - &s->unprocessed_incoming_frames_buffer); - grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); + while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) { + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); + } grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -734,7 +722,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer); GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); - GRPC_ERROR_UNREF(s->byte_stream_error); if (s->incoming_window_delta > 0) { GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( @@ -1188,9 +1175,8 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, s->fetching_send_message = NULL; return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, - UINT32_MAX, &s->complete_fetch_locked)) { - grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, - &s->fetching_slice); + &s->fetching_slice, UINT32_MAX, + &s->complete_fetch_locked)) { add_fetched_slice_locked(exec_ctx, t, s); } } @@ -1201,15 +1187,9 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_chttp2_stream *s = gs; grpc_chttp2_transport *t = s->t; if (error == GRPC_ERROR_NONE) { - error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, - &s->fetching_slice); - if (error == GRPC_ERROR_NONE) { - add_fetched_slice_locked(exec_ctx, t, s); - continue_fetching_send_locked(exec_ctx, t, s); - } - } - - if (error != GRPC_ERROR_NONE) { + add_fetched_slice_locked(exec_ctx, t, s); + continue_fetching_send_locked(exec_ctx, t, s); + } else { /* TODO(ctiller): what to do here */ abort(); } @@ -1444,7 +1424,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_ASSERT(s->recv_message_ready == NULL); s->recv_message_ready = op_payload->recv_message.recv_message_ready; s->recv_message = op_payload->recv_message.recv_message; - if (s->id != 0 && s->frame_storage.length == 0) { + if (s->id != 0 && + (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) { incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0); } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); @@ -1633,13 +1614,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { + grpc_byte_stream *bs; if (s->recv_initial_metadata_ready != NULL && s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { if (s->seen_error) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); - if (!s->pending_byte_stream) { - grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); + while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != + NULL) { + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } } grpc_chttp2_incoming_metadata_buffer_publish( @@ -1652,65 +1633,39 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - grpc_error *error = GRPC_ERROR_NONE; + grpc_byte_stream *bs; if (s->recv_message_ready != NULL) { - *s->recv_message = NULL; - if (s->final_metadata_requested && s->seen_error) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); - if (!s->pending_byte_stream) { - grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); - } + while (s->final_metadata_requested && s->seen_error && + (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != + NULL) { + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } - if (!s->pending_byte_stream) { - while (s->unprocessed_incoming_frames_buffer.length > 0 || - s->frame_storage.length > 0) { - if (s->unprocessed_incoming_frames_buffer.length == 0) { - grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, - &s->frame_storage); - } - error = deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, - &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); - if (error != GRPC_ERROR_NONE) { - s->seen_error = true; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, - &s->frame_storage); - grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); - break; - } else if (*s->recv_message != NULL) { - break; - } - } - } - if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) { + if (s->incoming_frames.head != NULL) { + *s->recv_message = + grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames); + GPR_ASSERT(*s->recv_message != NULL); null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { *s->recv_message = NULL; null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); } - GRPC_ERROR_UNREF(error); } } void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { + grpc_byte_stream *bs; grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); if (s->recv_trailing_metadata_finished != NULL && s->read_closed && s->write_closed) { if (s->seen_error) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); - if (!s->pending_byte_stream) { - grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); + while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != + NULL) { + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } } - bool pending_data = s->pending_byte_stream || - s->unprocessed_incoming_frames_buffer.length > 0; - if (s->read_closed && s->frame_storage.length == 0 && - (!pending_data || s->seen_error) && + if (s->all_incoming_byte_streams_finished && s->recv_trailing_metadata_finished != NULL) { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata); @@ -1721,6 +1676,14 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } } +static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) { + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); + } +} + static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t id, grpc_error *error) { grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id); @@ -1729,19 +1692,10 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); } - if (s->pending_byte_stream) { - if (s->on_next != NULL) { - grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame; - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - } - incoming_byte_stream_publish_error(exec_ctx, bs, error); - incoming_byte_stream_unref(exec_ctx, bs); - s->data_parser.parsing_frame = NULL; - } else { - GRPC_ERROR_UNREF(s->byte_stream_error); - s->byte_stream_error = GRPC_ERROR_REF(error); - } + if (s->data_parser.parsing_frame != NULL) { + grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error)); + s->data_parser.parsing_frame = NULL; } if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { @@ -1927,6 +1881,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE; } } + decrement_active_streams_locked(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -2464,28 +2419,12 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * BYTE STREAM */ -static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg; - - s->pending_byte_stream = false; - if (error == GRPC_ERROR_NONE) { - grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s); - } else { - GPR_ASSERT(error != GRPC_ERROR_NONE); - grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); - s->on_next = NULL; - GRPC_ERROR_UNREF(s->byte_stream_error); - s->byte_stream_error = GRPC_ERROR_NONE; - grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error)); - s->byte_stream_error = error; - } -} - static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { + GRPC_ERROR_UNREF(bs->error); + grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices); + gpr_mu_destroy(&bs->slice_mu); gpr_free(bs); } } @@ -2545,90 +2484,47 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t = bs->transport; grpc_chttp2_stream *s = bs->stream; - size_t cur_length = s->frame_storage.length; - incoming_byte_stream_update_flow_control( - exec_ctx, t, s, bs->next_action.max_size_hint, cur_length); - - GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); - if (s->frame_storage.length > 0) { - grpc_slice_buffer_swap(&s->frame_storage, - &s->unprocessed_incoming_frames_buffer); - grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); - } else if (s->byte_stream_error != GRPC_ERROR_NONE) { - grpc_closure_sched(exec_ctx, bs->next_action.on_complete, - GRPC_ERROR_REF(s->byte_stream_error)); - if (s->data_parser.parsing_frame != NULL) { - incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); - s->data_parser.parsing_frame = NULL; - } - } else if (s->read_closed) { - if (bs->remaining_bytes != 0) { - s->byte_stream_error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - grpc_closure_sched(exec_ctx, bs->next_action.on_complete, - GRPC_ERROR_REF(s->byte_stream_error)); - if (s->data_parser.parsing_frame != NULL) { - incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); - s->data_parser.parsing_frame = NULL; - } - } else { - /* Should never reach here. */ - GPR_ASSERT(false); - } + if (bs->is_tail) { + gpr_mu_lock(&bs->slice_mu); + size_t cur_length = bs->slices.length; + gpr_mu_unlock(&bs->slice_mu); + incoming_byte_stream_update_flow_control( + exec_ctx, t, s, bs->next_action.max_size_hint, cur_length); + } + gpr_mu_lock(&bs->slice_mu); + if (bs->slices.count > 0) { + *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices); + grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); + } else if (bs->error != GRPC_ERROR_NONE) { + grpc_closure_run(exec_ctx, bs->next_action.on_complete, + GRPC_ERROR_REF(bs->error)); } else { - s->on_next = bs->next_action.on_complete; + bs->on_next = bs->next_action.on_complete; + bs->next = bs->next_action.slice; } + gpr_mu_unlock(&bs->slice_mu); incoming_byte_stream_unref(exec_ctx, bs); } -static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - size_t max_size_hint, - grpc_closure *on_complete) { +static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice, size_t max_size_hint, + grpc_closure *on_complete) { GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; - grpc_chttp2_stream *s = bs->stream; - if (s->unprocessed_incoming_frames_buffer.length > 0) { - return true; - } else { - gpr_ref(&bs->refs); - bs->next_action.max_size_hint = max_size_hint; - bs->next_action.on_complete = on_complete; - grpc_closure_sched( - exec_ctx, - grpc_closure_init( - &bs->next_action.closure, incoming_byte_stream_next_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner, false)), - GRPC_ERROR_NONE); - GPR_TIMER_END("incoming_byte_stream_next", 0); - return false; - } -} - -static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - grpc_slice *slice) { - GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0); - grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; - grpc_chttp2_stream *s = bs->stream; - - if (s->unprocessed_incoming_frames_buffer.length > 0) { - grpc_error *error = deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, - slice, NULL); - if (error != GRPC_ERROR_NONE) { - return error; - } - } else { - grpc_error *error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); - return error; - } - GPR_TIMER_END("incoming_byte_stream_pull", 0); - return GRPC_ERROR_NONE; + gpr_ref(&bs->refs); + bs->next_action.slice = slice; + bs->next_action.max_size_hint = max_size_hint; + bs->next_action.on_complete = on_complete; + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &bs->next_action.closure, incoming_byte_stream_next_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner, false)), + GRPC_ERROR_NONE); + GPR_TIMER_END("incoming_byte_stream_next", 0); + return 0; } static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -2638,14 +2534,9 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored) { grpc_chttp2_incoming_byte_stream *bs = byte_stream; - grpc_chttp2_stream *s = bs->stream; - grpc_chttp2_transport *t = s->t; - GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); + decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream); incoming_byte_stream_unref(exec_ctx, bs); - s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -2665,53 +2556,50 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_publish_error( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error) { - grpc_chttp2_stream *s = bs->stream; - GPR_ASSERT(error != GRPC_ERROR_NONE); - grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); - s->on_next = NULL; - GRPC_ERROR_UNREF(s->byte_stream_error); - s->byte_stream_error = GRPC_ERROR_REF(error); + grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error)); + bs->on_next = NULL; + GRPC_ERROR_UNREF(bs->error); grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream, GRPC_ERROR_REF(error)); + bs->error = error; } -grpc_error *grpc_chttp2_incoming_byte_stream_push( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_slice slice, grpc_slice *slice_out) { - grpc_chttp2_stream *s = bs->stream; - +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + grpc_slice slice) { + gpr_mu_lock(&bs->slice_mu); if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { - grpc_error *error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); - - grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); - grpc_slice_unref_internal(exec_ctx, slice); - return error; + incoming_byte_stream_publish_error( + exec_ctx, bs, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream")); } else { bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); - if (slice_out != NULL) { - *slice_out = slice; + if (bs->on_next != NULL) { + *bs->next = slice; + grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE); + bs->on_next = NULL; + } else { + grpc_slice_buffer_add(&bs->slices, slice); } - return GRPC_ERROR_NONE; } + gpr_mu_unlock(&bs->slice_mu); } -grpc_error *grpc_chttp2_incoming_byte_stream_finished( +void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error, bool reset_on_error) { - grpc_chttp2_stream *s = bs->stream; - + grpc_error *error) { if (error == GRPC_ERROR_NONE) { + gpr_mu_lock(&bs->slice_mu); if (bs->remaining_bytes != 0) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } + gpr_mu_unlock(&bs->slice_mu); } - if (error != GRPC_ERROR_NONE && reset_on_error) { - grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); + if (error != GRPC_ERROR_NONE) { + incoming_byte_stream_publish_error(exec_ctx, bs, error); } incoming_byte_stream_unref(exec_ctx, bs); - return error; } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( @@ -2723,12 +2611,26 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->base.flags = flags; incoming_byte_stream->base.next = incoming_byte_stream_next; - incoming_byte_stream->base.pull = incoming_byte_stream_pull; incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; + gpr_mu_init(&incoming_byte_stream->slice_mu); gpr_ref_init(&incoming_byte_stream->refs, 2); + incoming_byte_stream->next_message = NULL; incoming_byte_stream->transport = t; incoming_byte_stream->stream = s; - s->byte_stream_error = GRPC_ERROR_NONE; + gpr_ref(&incoming_byte_stream->stream->active_streams); + grpc_slice_buffer_init(&incoming_byte_stream->slices); + incoming_byte_stream->on_next = NULL; + incoming_byte_stream->is_tail = 1; + incoming_byte_stream->error = GRPC_ERROR_NONE; + grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames; + if (q->head == NULL) { + q->head = incoming_byte_stream; + } else { + q->tail->is_tail = 0; + q->tail->next_message = incoming_byte_stream; + } + q->tail = incoming_byte_stream; + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); return incoming_byte_stream; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index 5d382d80a8..6e9258ee7e 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -40,7 +40,6 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/internal.h" -#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport.h" @@ -54,17 +53,16 @@ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser) { void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *parser) { if (parser->parsing_frame != NULL) { - GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( + grpc_chttp2_incoming_byte_stream_finished( exec_ctx, parser->parsing_frame, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false)); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed")); } GRPC_ERROR_UNREF(parser->error); } grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, uint8_t flags, - uint32_t stream_id, - grpc_chttp2_stream *s) { + uint32_t stream_id) { if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) { char *msg; gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags); @@ -76,14 +74,47 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, } if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { - s->received_last_frame = true; + parser->is_last_frame = 1; } else { - s->received_last_frame = false; + parser->is_last_frame = 0; } return GRPC_ERROR_NONE; } +void grpc_chttp2_incoming_frame_queue_merge( + grpc_chttp2_incoming_frame_queue *head_dst, + grpc_chttp2_incoming_frame_queue *tail_src) { + if (tail_src->head == NULL) { + return; + } + + if (head_dst->head == NULL) { + *head_dst = *tail_src; + memset(tail_src, 0, sizeof(*tail_src)); + return; + } + + head_dst->tail->next_message = tail_src->head; + head_dst->tail = tail_src->tail; + memset(tail_src, 0, sizeof(*tail_src)); +} + +grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( + grpc_chttp2_incoming_frame_queue *q) { + grpc_byte_stream *out; + if (q->head == NULL) { + return NULL; + } + out = &q->head->base; + if (q->head == q->tail) { + memset(q, 0, sizeof(*q)); + } else { + q->head = q->head->next_message; + } + return out; +} + void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, uint32_t write_bytes, int is_eof, grpc_transport_one_way_stats *stats, @@ -112,217 +143,145 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, stats->data_bytes += write_bytes; } -grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx, - grpc_chttp2_data_parser *p, - grpc_chttp2_stream *s, - grpc_slice_buffer *slices, - grpc_slice *slice_out, - grpc_byte_stream **stream_out) { - grpc_error *error = GRPC_ERROR_NONE; - grpc_chttp2_transport *t = s->t; - - while (slices->count > 0) { - uint8_t *beg = NULL; - uint8_t *end = NULL; - uint8_t *cur = NULL; - - grpc_slice slice = grpc_slice_buffer_take_first(slices); +static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *p, + grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_slice slice) { + uint8_t *const beg = GRPC_SLICE_START_PTR(slice); + uint8_t *const end = GRPC_SLICE_END_PTR(slice); + uint8_t *cur = beg; + uint32_t message_flags; + grpc_chttp2_incoming_byte_stream *incoming_byte_stream; + char *msg; - beg = GRPC_SLICE_START_PTR(slice); - end = GRPC_SLICE_END_PTR(slice); - cur = beg; - uint32_t message_flags; - char *msg; - - if (cur == end) { - grpc_slice_unref_internal(exec_ctx, slice); - continue; - } - - switch (p->state) { - case GRPC_CHTTP2_DATA_ERROR: - p->state = GRPC_CHTTP2_DATA_ERROR; - grpc_slice_unref_internal(exec_ctx, slice); - return GRPC_ERROR_REF(p->error); - case GRPC_CHTTP2_DATA_FH_0: - p->frame_type = *cur; - switch (p->frame_type) { - case 0: - p->is_frame_compressed = false; /* GPR_FALSE */ - break; - case 1: - p->is_frame_compressed = true; /* GPR_TRUE */ - break; - default: - gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type); - p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, - (intptr_t)s->id); - gpr_free(msg); - msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); - p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, - grpc_slice_from_copied_string(msg)); - gpr_free(msg); - p->error = - grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg); - p->state = GRPC_CHTTP2_DATA_ERROR; - grpc_slice_unref_internal(exec_ctx, slice); - return GRPC_ERROR_REF(p->error); - } - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_1; - grpc_slice_unref_internal(exec_ctx, slice); - continue; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_1: - p->frame_size = ((uint32_t)*cur) << 24; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_2; - grpc_slice_unref_internal(exec_ctx, slice); - continue; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_2: - p->frame_size |= ((uint32_t)*cur) << 16; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_3; - grpc_slice_unref_internal(exec_ctx, slice); - continue; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_3: - p->frame_size |= ((uint32_t)*cur) << 8; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_4; - grpc_slice_unref_internal(exec_ctx, slice); - continue; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_4: - GPR_ASSERT(stream_out != NULL); - GPR_ASSERT(p->parsing_frame == NULL); - p->frame_size |= ((uint32_t)*cur); - p->state = GRPC_CHTTP2_DATA_FRAME; - ++cur; - message_flags = 0; - if (p->is_frame_compressed) { - message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; - } - p->parsing_frame = grpc_chttp2_incoming_byte_stream_create( - exec_ctx, t, s, p->frame_size, message_flags); - *stream_out = &p->parsing_frame->base; - if (p->parsing_frame->remaining_bytes == 0) { - GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true)); - p->parsing_frame = NULL; - p->state = GRPC_CHTTP2_DATA_FH_0; - } - s->pending_byte_stream = true; + if (cur == end) { + return GRPC_ERROR_NONE; + } - if (cur != end) { - grpc_slice_buffer_undo_take_first( - &s->unprocessed_incoming_frames_buffer, - grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - } - grpc_slice_unref_internal(exec_ctx, slice); + switch (p->state) { + case GRPC_CHTTP2_DATA_ERROR: + p->state = GRPC_CHTTP2_DATA_ERROR; + return GRPC_ERROR_REF(p->error); + fh_0: + case GRPC_CHTTP2_DATA_FH_0: + s->stats.incoming.framing_bytes++; + p->frame_type = *cur; + switch (p->frame_type) { + case 0: + p->is_frame_compressed = 0; /* GPR_FALSE */ + break; + case 1: + p->is_frame_compressed = 1; /* GPR_TRUE */ + break; + default: + gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type); + p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, + (intptr_t)s->id); + gpr_free(msg); + msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); + p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, + grpc_slice_from_copied_string(msg)); + gpr_free(msg); + p->error = + grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg); + p->state = GRPC_CHTTP2_DATA_ERROR; + return GRPC_ERROR_REF(p->error); + } + if (++cur == end) { + p->state = GRPC_CHTTP2_DATA_FH_1; + return GRPC_ERROR_NONE; + } + /* fallthrough */ + case GRPC_CHTTP2_DATA_FH_1: + s->stats.incoming.framing_bytes++; + p->frame_size = ((uint32_t)*cur) << 24; + if (++cur == end) { + p->state = GRPC_CHTTP2_DATA_FH_2; + return GRPC_ERROR_NONE; + } + /* fallthrough */ + case GRPC_CHTTP2_DATA_FH_2: + s->stats.incoming.framing_bytes++; + p->frame_size |= ((uint32_t)*cur) << 16; + if (++cur == end) { + p->state = GRPC_CHTTP2_DATA_FH_3; + return GRPC_ERROR_NONE; + } + /* fallthrough */ + case GRPC_CHTTP2_DATA_FH_3: + s->stats.incoming.framing_bytes++; + p->frame_size |= ((uint32_t)*cur) << 8; + if (++cur == end) { + p->state = GRPC_CHTTP2_DATA_FH_4; + return GRPC_ERROR_NONE; + } + /* fallthrough */ + case GRPC_CHTTP2_DATA_FH_4: + s->stats.incoming.framing_bytes++; + p->frame_size |= ((uint32_t)*cur); + p->state = GRPC_CHTTP2_DATA_FRAME; + ++cur; + message_flags = 0; + if (p->is_frame_compressed) { + message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } + p->parsing_frame = incoming_byte_stream = + grpc_chttp2_incoming_byte_stream_create(exec_ctx, t, s, p->frame_size, + message_flags); + /* fallthrough */ + case GRPC_CHTTP2_DATA_FRAME: + if (cur == end) { + return GRPC_ERROR_NONE; + } + uint32_t remaining = (uint32_t)(end - cur); + if (remaining == p->frame_size) { + s->stats.incoming.data_bytes += p->frame_size; + grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, + grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, + GRPC_ERROR_NONE); + p->parsing_frame = NULL; + p->state = GRPC_CHTTP2_DATA_FH_0; + return GRPC_ERROR_NONE; + } else if (remaining > p->frame_size) { + s->stats.incoming.data_bytes += p->frame_size; + grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, + grpc_slice_sub(slice, (size_t)(cur - beg), + (size_t)(cur + p->frame_size - beg))); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, + GRPC_ERROR_NONE); + p->parsing_frame = NULL; + cur += p->frame_size; + goto fh_0; /* loop */ + } else { + GPR_ASSERT(remaining <= p->frame_size); + grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, + grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + p->frame_size -= remaining; + s->stats.incoming.data_bytes += remaining; return GRPC_ERROR_NONE; - case GRPC_CHTTP2_DATA_FRAME: { - GPR_ASSERT(p->parsing_frame != NULL); - GPR_ASSERT(slice_out != NULL); - if (cur == end) { - grpc_slice_unref_internal(exec_ctx, slice); - continue; - } - uint32_t remaining = (uint32_t)(end - cur); - if (remaining == p->frame_size) { - if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push( - exec_ctx, p->parsing_frame, - grpc_slice_sub(slice, (size_t)(cur - beg), - (size_t)(end - beg)), - slice_out))) { - grpc_slice_unref_internal(exec_ctx, slice); - return error; - } - if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) { - grpc_slice_unref_internal(exec_ctx, slice); - return error; - } - p->parsing_frame = NULL; - p->state = GRPC_CHTTP2_DATA_FH_0; - grpc_slice_unref_internal(exec_ctx, slice); - return GRPC_ERROR_NONE; - } else if (remaining < p->frame_size) { - if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push( - exec_ctx, p->parsing_frame, - grpc_slice_sub(slice, (size_t)(cur - beg), - (size_t)(end - beg)), - slice_out))) { - return error; - } - p->frame_size -= remaining; - grpc_slice_unref_internal(exec_ctx, slice); - return GRPC_ERROR_NONE; - } else { - GPR_ASSERT(remaining > p->frame_size); - if (GRPC_ERROR_NONE != - (grpc_chttp2_incoming_byte_stream_push( - exec_ctx, p->parsing_frame, - grpc_slice_sub(slice, (size_t)(cur - beg), - (size_t)(cur + p->frame_size - beg)), - slice_out))) { - grpc_slice_unref_internal(exec_ctx, slice); - return error; - } - if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) { - grpc_slice_unref_internal(exec_ctx, slice); - return error; - } - p->parsing_frame = NULL; - p->state = GRPC_CHTTP2_DATA_FH_0; - cur += p->frame_size; - grpc_slice_buffer_undo_take_first( - &s->unprocessed_incoming_frames_buffer, - grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - grpc_slice_unref_internal(exec_ctx, slice); - return GRPC_ERROR_NONE; - } } - } } - return GRPC_ERROR_NONE; + GPR_UNREACHABLE_CODE( + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here")); } grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice slice, int is_last) { - /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */ - s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice); - if (!s->pending_byte_stream) { - grpc_slice_ref_internal(slice); - grpc_slice_buffer_add(&s->frame_storage, slice); - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); - } else if (s->on_next) { - GPR_ASSERT(s->frame_storage.length == 0); - grpc_slice_ref_internal(slice); - grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); - grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE); - s->on_next = NULL; - } else { - grpc_slice_ref_internal(slice); - grpc_slice_buffer_add(&s->frame_storage, slice); - } + grpc_chttp2_data_parser *p = parser; + grpc_error *error = parse_inner(exec_ctx, p, t, s, slice); - if (is_last && s->received_last_frame) { + if (is_last && p->is_last_frame) { grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, GRPC_ERROR_NONE); } - return GRPC_ERROR_NONE; + return error; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index 2fb8983c38..264ad14608 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -56,16 +56,28 @@ typedef enum { typedef struct grpc_chttp2_incoming_byte_stream grpc_chttp2_incoming_byte_stream; +typedef struct grpc_chttp2_incoming_frame_queue { + grpc_chttp2_incoming_byte_stream *head; + grpc_chttp2_incoming_byte_stream *tail; +} grpc_chttp2_incoming_frame_queue; + typedef struct { grpc_chttp2_stream_state state; + uint8_t is_last_frame; uint8_t frame_type; uint32_t frame_size; grpc_error *error; - bool is_frame_compressed; + int is_frame_compressed; grpc_chttp2_incoming_byte_stream *parsing_frame; } grpc_chttp2_data_parser; +void grpc_chttp2_incoming_frame_queue_merge( + grpc_chttp2_incoming_frame_queue *head_dst, + grpc_chttp2_incoming_frame_queue *tail_src); +grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( + grpc_chttp2_incoming_frame_queue *q); + /* initialize per-stream state for data frame parsing */ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser); @@ -75,8 +87,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, /* start processing a new data frame */ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, uint8_t flags, - uint32_t stream_id, - grpc_chttp2_stream *s); + uint32_t stream_id); /* handle a slice of a data frame - is_last indicates the last slice of a frame */ @@ -90,11 +101,4 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, grpc_transport_one_way_stats *stats, grpc_slice_buffer *outbuf); -grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx, - grpc_chttp2_data_parser *p, - grpc_chttp2_stream *s, - grpc_slice_buffer *slices, - grpc_slice *slice_out, - grpc_byte_stream **stream_out); - #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index a10e3886ea..6eb848b8d7 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -195,20 +195,22 @@ typedef struct grpc_chttp2_write_cb { struct grpc_chttp2_incoming_byte_stream { grpc_byte_stream base; gpr_refcount refs; + struct grpc_chttp2_incoming_byte_stream *next_message; + grpc_error *error; - grpc_chttp2_transport *transport; /* immutable */ - grpc_chttp2_stream *stream; /* immutable */ + grpc_chttp2_transport *transport; + grpc_chttp2_stream *stream; + bool is_tail; - /* Accessed only by transport thread when stream->pending_byte_stream == false - * Accessed only by application thread when stream->pending_byte_stream == - * true */ + gpr_mu slice_mu; // protects slices, on_next + grpc_slice_buffer slices; + grpc_closure *on_next; + grpc_slice *next; uint32_t remaining_bytes; - /* Accessed only by transport thread when stream->pending_byte_stream == false - * Accessed only by application thread when stream->pending_byte_stream == - * true */ struct { grpc_closure closure; + grpc_slice *slice; size_t max_size_hint; grpc_closure *on_complete; } next_action; @@ -443,8 +445,8 @@ struct grpc_chttp2_stream { uint32_t id; /** window available for us to send to peer, over or under the initial window - * size of the transport... ie: - * outgoing_window = outgoing_window_delta + transport.initial_window_size */ + * size of the transport... ie: + * outgoing_window = outgoing_window_delta + transport.initial_window_size */ int64_t outgoing_window_delta; /** things the upper layers would like to send */ grpc_metadata_batch *send_initial_metadata; @@ -471,6 +473,9 @@ struct grpc_chttp2_stream { grpc_transport_stream_stats *collecting_stats; grpc_transport_stream_stats stats; + /** number of streams that are currently being read */ + gpr_refcount active_streams; + /** Is this stream closed for writing. */ bool write_closed; /** Is this stream reading half-closed. */ @@ -494,17 +499,7 @@ struct grpc_chttp2_stream { grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; - grpc_slice_buffer frame_storage; /* protected by t combiner */ - - /* Accessed only by transport thread when stream->pending_byte_stream == false - * Accessed only by application thread when stream->pending_byte_stream == - * true */ - grpc_slice_buffer unprocessed_incoming_frames_buffer; - grpc_closure *on_next; /* protected by t combiner */ - bool pending_byte_stream; /* protected by t combiner */ - grpc_closure reset_byte_stream; - grpc_error *byte_stream_error; /* protected by t combiner */ - bool received_last_frame; /* protected by t combiner */ + grpc_chttp2_incoming_frame_queue incoming_frames; gpr_timespec deadline; @@ -517,9 +512,6 @@ struct grpc_chttp2_stream { * incoming_window = incoming_window_delta + transport.initial_window_size */ int64_t incoming_window_delta; /** parsing state for data frames */ - /* Accessed only by transport thread when stream->pending_byte_stream == false - * Accessed only by application thread when stream->pending_byte_stream == - * true */ grpc_chttp2_data_parser data_parser; /** number of bytes received - reset at end of parse thread execution */ int64_t received_bytes; @@ -798,13 +790,10 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t); grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags); -grpc_error *grpc_chttp2_incoming_byte_stream_push( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_slice slice, grpc_slice *slice_out); -grpc_error *grpc_chttp2_incoming_byte_stream_finished( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error, bool reset_on_error); -void grpc_chttp2_incoming_byte_stream_notify( +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + grpc_slice slice); +void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 638b137316..7e457ced27 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -458,13 +458,12 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx, return init_skip_frame_parser(exec_ctx, t, 0); } if (err == GRPC_ERROR_NONE) { - err = grpc_chttp2_data_parser_begin_frame( - &s->data_parser, t->incoming_frame_flags, s->id, s); + err = grpc_chttp2_data_parser_begin_frame(&s->data_parser, + t->incoming_frame_flags, s->id); } error_handler: if (err == GRPC_ERROR_NONE) { t->incoming_stream = s; - /* t->parser = grpc_chttp2_data_parser_parse;*/ t->parser = grpc_chttp2_data_parser_parse; t->parser_data = &s->data_parser; return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 7896c70f9e..88335ecd0b 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -973,20 +973,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer write_slice_buffer; grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); - if (1 != grpc_byte_stream_next( - exec_ctx, stream_op->payload->send_message.send_message, - stream_op->payload->send_message.send_message->length, - NULL)) { - /* Should never reach here */ - GPR_ASSERT(false); - } - if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - stream_op->payload->send_message.send_message, - &slice)) { - /* Should never reach here */ - GPR_ASSERT(false); - } + grpc_byte_stream_next( + NULL, stream_op->payload->send_message.send_message, &slice, + stream_op->payload->send_message.send_message->length, NULL); grpc_slice_buffer_add(&write_slice_buffer, slice); if (write_slice_buffer.count != 1) { /* Empty request not handled yet */ diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 764524b24d..4625cba0d2 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -221,13 +221,6 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice)) { - /* Should never reach here */ - abort(); - } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); @@ -240,11 +233,8 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, - &calld->got_slice)) { - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice); + exec_ctx, calld->send_op->payload->send_message.send_message, + &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 151fb9885d..4e47c5c658 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -220,11 +220,8 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; uint8_t *wrptr = calld->payload_bytes; while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, - &calld->got_slice)) { - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice); + exec_ctx, calld->send_op->payload->send_message.send_message, + &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), GRPC_SLICE_LENGTH(calld->incoming_slice)); wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); @@ -240,13 +237,6 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; calld->send_message_blocked = false; - if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice)) { - /* Should never reach here */ - abort(); - } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { /* Pass down the original send_message op that was blocked.*/ diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 3e96d09798..97d50a91be 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1187,7 +1187,6 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, batch_control *bctl) { - grpc_error *error; grpc_call *call = bctl->call; for (;;) { size_t remaining = call->receiving_stream->length - @@ -1199,22 +1198,11 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, finish_batch_step(exec_ctx, bctl); return; } - if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining, + if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, + &call->receiving_slice, remaining, &call->receiving_slice_ready)) { - error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream, - &call->receiving_slice); - if (error == GRPC_ERROR_NONE) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); - } else { - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); - call->receiving_stream = NULL; - grpc_byte_buffer_destroy(*call->receiving_buffer); - *call->receiving_buffer = NULL; - call->receiving_message = 0; - finish_batch_step(exec_ctx, bctl); - return; - } + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); } else { return; } @@ -1225,24 +1213,12 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; - grpc_byte_stream *bs = call->receiving_stream; - bool release_error = false; if (error == GRPC_ERROR_NONE) { - grpc_slice slice; - error = grpc_byte_stream_pull(exec_ctx, bs, &slice); - if (error == GRPC_ERROR_NONE) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - slice); - continue_receiving_slices(exec_ctx, bctl); - } else { - /* Error returned by grpc_byte_stream_pull needs to be released manually - */ - release_error = true; - } - } - - if (error != GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + continue_receiving_slices(exec_ctx, bctl); + } else { if (grpc_trace_operation_failures) { GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); } @@ -1250,11 +1226,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; - call->receiving_message = 0; finish_batch_step(exec_ctx, bctl); - if (release_error) { - GRPC_ERROR_UNREF(error); - } } } diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index 5800c70ef4..4d4206189e 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -40,15 +40,10 @@ #include "src/core/lib/slice/slice_internal.h" int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, size_t max_size_hint, - grpc_closure *on_complete) { - return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete); -} - -grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - grpc_slice *slice) { - return byte_stream->pull(exec_ctx, byte_stream, slice); + grpc_byte_stream *byte_stream, grpc_slice *slice, + size_t max_size_hint, grpc_closure *on_complete) { + return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint, + on_complete); } void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -58,24 +53,16 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, /* slice_buffer_stream */ -static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - size_t max_size_hint, - grpc_closure *on_complete) { - grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; - GPR_ASSERT(stream->cursor < stream->backing_buffer->count); - return true; -} - -static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - grpc_slice *slice) { +static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice, size_t max_size_hint, + grpc_closure *on_complete) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; GPR_ASSERT(stream->cursor < stream->backing_buffer->count); *slice = grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); stream->cursor++; - return GRPC_ERROR_NONE; + return 1; } static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -88,7 +75,6 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, stream->base.length = (uint32_t)slice_buffer->length; stream->base.flags = flags; stream->base.next = slice_buffer_stream_next; - stream->base.pull = slice_buffer_stream_pull; stream->base.destroy = slice_buffer_stream_destroy; stream->backing_buffer = slice_buffer; stream->cursor = 0; diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 381c65fb04..1fdd5b4d77 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -49,10 +49,9 @@ typedef struct grpc_byte_stream grpc_byte_stream; struct grpc_byte_stream { uint32_t length; uint32_t flags; - bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - size_t max_size_hint, grpc_closure *on_complete); - grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - grpc_slice *slice); + int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + grpc_slice *slice, size_t max_size_hint, + grpc_closure *on_complete); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); }; @@ -62,20 +61,12 @@ struct grpc_byte_stream { * * max_size_hint can be set as a hint as to the maximum number * of bytes that would be acceptable to read. - */ -int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, size_t max_size_hint, - grpc_closure *on_complete); - -/* returns the next slice in the byte stream when it is ready (indicated by - * either grpc_byte_stream_next returning 1 or on_complete passed to - * grpc_byte_stream_next is called). * * once a slice is returned into *slice, it is owned by the caller. */ -grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - grpc_slice *slice); +int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, grpc_slice *slice, + size_t max_size_hint, grpc_closure *on_complete); void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 8c5413b5fd..c89f349ca7 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -569,17 +569,12 @@ static void BM_TransportStreamRecv(benchmark::State &state) { grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE); return; } - } while (grpc_byte_stream_next(exec_ctx, recv_stream, + } while (grpc_byte_stream_next(exec_ctx, recv_stream, &recv_slice, recv_stream->length - received, - drain_continue.get()) && - GRPC_ERROR_NONE == - grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice) && - (received += GRPC_SLICE_LENGTH(recv_slice), - grpc_slice_unref_internal(exec_ctx, recv_slice), true)); + drain_continue.get())); }); drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { - grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice); received += GRPC_SLICE_LENGTH(recv_slice); grpc_slice_unref_internal(exec_ctx, recv_slice); grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE); |