diff options
author | 2017-07-28 15:47:32 -0700 | |
---|---|---|
committer | 2017-07-28 15:47:32 -0700 | |
commit | fb193a1e2fd0093103101b2551dd3e7b9af8833d (patch) | |
tree | b1d7ba010e35797e7c02c0056c89a517cb66b8bc /src/core/ext/transport | |
parent | 020dbf2ef5d01f22791e20182ec0509767264ffe (diff) | |
parent | 433f7bce2d5a6683e9d60a147b528bb9be98baff (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into flow-control-v3
Diffstat (limited to 'src/core/ext/transport')
6 files changed, 277 insertions, 62 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index c48156b069..e62f7777ec 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -727,6 +727,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); + if (s->compressed_data_buffer) { + grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer); + gpr_free(s->compressed_data_buffer); + } + if (s->decompressed_data_buffer) { + grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer); + gpr_free(s->decompressed_data_buffer); + } grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -771,6 +779,15 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + if (s->stream_compression_ctx != NULL) { + grpc_stream_compression_context_destroy(s->stream_compression_ctx); + s->stream_compression_ctx = NULL; + } + if (s->stream_decompression_ctx != NULL) { + grpc_stream_compression_context_destroy(s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + s->destroy_stream_arg = then_schedule_closure; GRPC_CLOSURE_SCHED( exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, @@ -1164,6 +1181,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } if (s->fetched_send_message_length == s->fetching_send_message->length) { + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); int64_t notify_offset = s->next_message_end_offset; if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( @@ -1186,9 +1204,14 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, UINT32_MAX, &s->complete_fetch_locked)) { - grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, - &s->fetching_slice); - add_fetched_slice_locked(exec_ctx, t, s); + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, s->fetching_send_message, &s->fetching_slice); + if (error != GRPC_ERROR_NONE) { + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + } else { + add_fetched_slice_locked(exec_ctx, t, s); + } } } } @@ -1205,10 +1228,9 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, continue_fetching_send_locked(exec_ctx, t, s); } } - if (error != GRPC_ERROR_NONE) { - /* TODO(ctiller): what to do here */ - abort(); + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); } } @@ -1353,8 +1375,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, "fetching_send_message_finished"); } else { GPR_ASSERT(s->fetching_send_message == NULL); - uint8_t *frame_hdr = - grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); + uint8_t *frame_hdr = grpc_slice_buffer_tiny_add( + &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES); uint32_t flags = op_payload->send_message.send_message->flags; frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; size_t len = op_payload->send_message.send_message->length; @@ -1445,15 +1467,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->recv_message_ready = op_payload->recv_message.recv_message_ready; s->recv_message = op_payload->recv_message.recv_message; if (s->id != 0) { - if (s->pending_byte_stream) { - already_received = s->frame_storage.length; - } else { - already_received = s->frame_storage.length + - s->unprocessed_incoming_frames_buffer.length; - } if (!s->read_closed) { + already_received = s->frame_storage.length; grpc_chttp2_flowctl_incoming_bs_update( - &t->flow_control, &s->flow_control, 5, already_received); + &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES, already_received); grpc_chttp2_act_on_flowctl_action( exec_ctx, grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), @@ -1695,10 +1712,43 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, if (s->unprocessed_incoming_frames_buffer.length == 0) { grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, &s->frame_storage); + s->unprocessed_incoming_frames_decompressed = false; + } + if (s->stream_compression_recv_enabled && + !s->unprocessed_incoming_frames_decompressed) { + GPR_ASSERT(s->decompressed_data_buffer->length == 0); + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = + grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_DECOMPRESS); + } + if (!grpc_stream_decompress(s->stream_decompression_ctx, + &s->unprocessed_incoming_frames_buffer, + s->decompressed_data_buffer, NULL, + GRPC_HEADER_SIZE_IN_BYTES, + &end_of_context)) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Stream decompression error."); + } else { + error = grpc_deframe_unprocessed_incoming_frames( + exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL, + s->recv_message); + if (end_of_context) { + grpc_stream_compression_context_destroy( + s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + } + } else { + error = grpc_deframe_unprocessed_incoming_frames( + exec_ctx, &s->data_parser, s, + &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); } - error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, - &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); if (error != GRPC_ERROR_NONE) { s->seen_error = true; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, @@ -1736,7 +1786,37 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } bool pending_data = s->pending_byte_stream || s->unprocessed_incoming_frames_buffer.length > 0; + if (s->stream_compression_recv_enabled && s->read_closed && + s->frame_storage.length > 0 && + s->unprocessed_incoming_frames_buffer.length == 0 && !pending_data && + !s->seen_error && s->recv_trailing_metadata_finished != NULL) { + /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and + * maybe decompress the next 5 bytes in the stream. */ + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_DECOMPRESS); + } + if (!grpc_stream_decompress(s->stream_decompression_ctx, + &s->frame_storage, + &s->unprocessed_incoming_frames_buffer, NULL, + GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); + s->seen_error = true; + } else { + if (s->unprocessed_incoming_frames_buffer.length > 0) { + s->unprocessed_incoming_frames_decompressed = true; + } + if (end_of_context) { + grpc_stream_compression_context_destroy(s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + } + } if (s->read_closed && s->frame_storage.length == 0 && + s->unprocessed_incoming_frames_buffer.length == 0 && (!pending_data || s->seen_error) && s->recv_trailing_metadata_finished != NULL) { grpc_chttp2_incoming_metadata_buffer_publish( @@ -2594,6 +2674,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, if (s->frame_storage.length > 0) { grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); + s->unprocessed_incoming_frames_decompressed = false; GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); } else if (s->byte_stream_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, @@ -2655,17 +2736,41 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_chttp2_stream *s = bs->stream; + grpc_error *error; if (s->unprocessed_incoming_frames_buffer.length > 0) { - grpc_error *error = grpc_deframe_unprocessed_incoming_frames( + if (s->stream_compression_recv_enabled && + !s->unprocessed_incoming_frames_decompressed) { + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_DECOMPRESS); + } + if (!grpc_stream_decompress(s->stream_decompression_ctx, + &s->unprocessed_incoming_frames_buffer, + s->decompressed_data_buffer, NULL, MAX_SIZE_T, + &end_of_context)) { + error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); + return error; + } + GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); + grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, + s->decompressed_data_buffer); + s->unprocessed_incoming_frames_decompressed = true; + if (end_of_context) { + grpc_stream_compression_context_destroy(s->stream_decompression_ctx); + s->stream_decompression_ctx = NULL; + } + } + error = grpc_deframe_unprocessed_incoming_frames( exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice, NULL); if (error != GRPC_ERROR_NONE) { return error; } } else { - grpc_error *error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); return error; } @@ -2673,22 +2778,9 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream); - static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, - grpc_error *error_ignored) { - grpc_chttp2_incoming_byte_stream *bs = byte_stream; - grpc_chttp2_stream *s = bs->stream; - grpc_chttp2_transport *t = s->t; - - GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); - incoming_byte_stream_unref(exec_ctx, bs); - s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); -} + grpc_error *error_ignored); static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { @@ -2755,6 +2847,33 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished( return error; } +static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, bs, error, true /* reset_on_error */)); +} + +static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { + incoming_byte_stream_next, incoming_byte_stream_pull, + incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; + +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, + void *byte_stream, + grpc_error *error_ignored) { + grpc_chttp2_incoming_byte_stream *bs = byte_stream; + grpc_chttp2_stream *s = bs->stream; + grpc_chttp2_transport *t = s->t; + + GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); + incoming_byte_stream_unref(exec_ctx, bs); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); +} + grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags) { @@ -2763,9 +2882,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->base.length = frame_size; incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->base.flags = flags; - incoming_byte_stream->base.next = incoming_byte_stream_next; - incoming_byte_stream->base.pull = incoming_byte_stream_pull; - incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; + incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable; gpr_ref_init(&incoming_byte_stream->refs, 2); incoming_byte_stream->transport = t; incoming_byte_stream->stream = s; diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index dead6be77f..222d2177b2 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -293,7 +293,6 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice slice, int is_last) { - /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */ if (!s->pending_byte_stream) { grpc_slice_ref_internal(slice); grpc_slice_buffer_add(&s->frame_storage, slice); @@ -304,6 +303,7 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_NONE); s->on_next = NULL; + s->unprocessed_incoming_frames_decompressed = false; } else { grpc_slice_ref_internal(slice); grpc_slice_buffer_add(&s->frame_storage, slice); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index bf0988bdd3..f26f14dbec 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -556,6 +556,26 @@ struct grpc_chttp2_stream { grpc_chttp2_write_cb *on_write_finished_cbs; grpc_chttp2_write_cb *finish_after_write; size_t sending_bytes; + + /** Whether stream compression send is enabled */ + bool stream_compression_recv_enabled; + /** Whether stream compression recv is enabled */ + bool stream_compression_send_enabled; + /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed + */ + bool unprocessed_incoming_frames_decompressed; + /** Stream compression decompress context */ + grpc_stream_compression_context *stream_decompression_ctx; + /** Stream compression compress context */ + grpc_stream_compression_context *stream_compression_ctx; + + /** Buffer storing data that is compressed but not sent */ + grpc_slice_buffer *compressed_data_buffer; + /** Amount of uncompressed bytes sent out when compressed_data_buffer is + * emptied */ + size_t uncompressed_data_size; + /** Temporary buffer storing decompressed data */ + grpc_slice_buffer *decompressed_data_buffer; }; /** Transport writing call flow: @@ -720,6 +740,9 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, grpc_closure **pclosure, grpc_error *error, const char *desc); +#define GRPC_HEADER_SIZE_IN_BYTES 5 +#define MAX_SIZE_T (~(size_t)0) + #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 082541d6c3..18d163ee98 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -589,6 +589,10 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx, "ignoring grpc_chttp2_stream with non-client generated index %d", t->incoming_stream_id)); return init_skip_frame_parser(exec_ctx, t, 1); + } else if (grpc_chttp2_stream_map_size(&t->stream_map) >= + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Max stream count exceeded"); } t->last_new_stream_id = t->incoming_stream_id; s = t->incoming_stream = diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 726af39a2e..a9f8efe312 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -296,7 +296,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ - if (s->flow_controlled_buffer.length > 0) { + if (s->flow_controlled_buffer.length > 0 || + (s->stream_compression_send_enabled && + s->compressed_data_buffer->length > 0)) { uint32_t stream_remote_window = (uint32_t)GPR_MAX( 0, s->flow_control.remote_window_delta + @@ -307,19 +309,59 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], GPR_MIN(stream_remote_window, t->flow_control.remote_window)); if (max_outgoing > 0) { - uint32_t send_bytes = - (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); - bool is_last_data_frame = - s->fetching_send_message == NULL && - send_bytes == s->flow_controlled_buffer.length; - bool is_last_frame = - is_last_data_frame && s->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, - is_last_frame, &s->stats.outgoing, - &t->outbuf); - grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, + bool is_last_data_frame = false; + bool is_last_frame = false; + if (s->stream_compression_send_enabled) { + while ((s->flow_controlled_buffer.length > 0 || + s->compressed_data_buffer->length > 0) && + max_outgoing > 0) { + if (s->compressed_data_buffer->length > 0) { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->compressed_data_buffer->length); + is_last_data_frame = + (send_bytes == s->compressed_data_buffer->length && + s->flow_controlled_buffer.length == 0 && + s->fetching_send_message == NULL); + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, s->compressed_data_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, send_bytes); + max_outgoing -= send_bytes; + if (s->compressed_data_buffer->length == 0) { + s->sending_bytes += s->uncompressed_data_size; + } + } else { + if (s->stream_compression_ctx == NULL) { + s->stream_compression_ctx = + grpc_stream_compression_context_create( + GRPC_STREAM_COMPRESSION_COMPRESS); + } + s->uncompressed_data_size = s->flow_controlled_buffer.length; + GPR_ASSERT(grpc_stream_compress( + s->stream_compression_ctx, &s->flow_controlled_buffer, + s->compressed_data_buffer, NULL, MAX_SIZE_T, + GRPC_STREAM_COMPRESSION_FLUSH_SYNC)); + } + } + } else { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->flow_controlled_buffer.length); + is_last_data_frame = s->fetching_send_message == NULL && + send_bytes == s->flow_controlled_buffer.length; + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, + send_bytes); + s->sending_bytes += send_bytes; + } t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { @@ -336,9 +378,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( &s->stats.outgoing)); } } - s->sending_bytes += send_bytes; now_writing = true; - if (s->flow_controlled_buffer.length > 0) { + if (s->flow_controlled_buffer.length > 0 || + (s->stream_compression_send_enabled && + s->compressed_data_buffer->length > 0)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } @@ -352,7 +395,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } if (s->send_trailing_metadata != NULL && s->fetching_send_message == NULL && - s->flow_controlled_buffer.length == 0) { + s->flow_controlled_buffer.length == 0 && + (!s->stream_compression_send_enabled || + s->compressed_data_buffer->length == 0)) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index 4df64d81e2..6f4b429ee2 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -72,6 +72,7 @@ typedef struct sb_list_entry { typedef struct { grpc_byte_stream base; sb_list_entry *le; + grpc_error *shutdown_error; } inproc_slice_byte_stream; typedef struct { @@ -190,32 +191,50 @@ typedef struct inproc_stream { static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs, size_t max, grpc_closure *on_complete) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - return (stream->le->sb.count != 0); + // Because inproc transport always provides the entire message atomically, + // the byte stream always has data available when this function is called. + // Thus, this function always returns true (unlike other transports) and + // there is never any need to schedule a closure + return true; } static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs, grpc_slice *slice) { inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; + if (stream->shutdown_error != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(stream->shutdown_error); + } *slice = grpc_slice_buffer_take_first(&stream->le->sb); return GRPC_ERROR_NONE; } +static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *bs, + grpc_error *error) { + inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; + GRPC_ERROR_UNREF(stream->shutdown_error); + stream->shutdown_error = error; +} + static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs) { inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; sb_list_entry_destroy(exec_ctx, stream->le); + GRPC_ERROR_UNREF(stream->shutdown_error); } +static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = { + inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull, + inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy}; + void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s, sb_list_entry *le) { s->base.length = (uint32_t)le->sb.length; s->base.flags = 0; - s->base.next = inproc_slice_byte_stream_next; - s->base.pull = inproc_slice_byte_stream_pull; - s->base.destroy = inproc_slice_byte_stream_destroy; + s->base.vtable = &inproc_slice_byte_stream_vtable; s->le = le; + s->shutdown_error = GRPC_ERROR_NONE; } static void ref_transport(inproc_transport *t) { @@ -953,11 +972,18 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(grpc_byte_stream_next(exec_ctx, op->payload->send_message.send_message, SIZE_MAX, &unused)); - grpc_byte_stream_pull(exec_ctx, op->payload->send_message.send_message, - &message_slice); + error = grpc_byte_stream_pull( + exec_ctx, op->payload->send_message.send_message, &message_slice); + if (error != GRPC_ERROR_NONE) { + cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error)); + break; + } + GPR_ASSERT(error == GRPC_ERROR_NONE); remaining -= GRPC_SLICE_LENGTH(message_slice); grpc_slice_buffer_add(dest, message_slice); } while (remaining != 0); + grpc_byte_stream_destroy(exec_ctx, + op->payload->send_message.send_message); } if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) { grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md |