aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Muxi Yan <muxi@users.noreply.github.com>2017-07-26 17:00:53 -0700
committerGravatar GitHub <noreply@github.com>2017-07-26 17:00:53 -0700
commitddc0d374886f3db33db90c6c1be163214cc5147d (patch)
tree890dca6b3e5030ee313ab6ca6c6b32afa4f884e0
parentc44f036ee2bda2fb24b85b23eebc68213926fa2c (diff)
parentc7a94c5052a2409ac68b991ae116d5f9e60eb545 (diff)
Merge pull request #11780 from muxi/stream_compression_transport
Implement stream compression - transport layer
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c132
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h23
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c85
4 files changed, 206 insertions, 36 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index b1a7698811..dc35f4855c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -730,6 +730,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx,
&s->unprocessed_incoming_frames_buffer);
grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
+ if (s->compressed_data_buffer) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer);
+ gpr_free(s->compressed_data_buffer);
+ }
+ if (s->decompressed_data_buffer) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer);
+ gpr_free(s->decompressed_data_buffer);
+ }
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -780,6 +788,15 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+ if (s->stream_compression_ctx != NULL) {
+ grpc_stream_compression_context_destroy(s->stream_compression_ctx);
+ s->stream_compression_ctx = NULL;
+ }
+ if (s->stream_decompression_ctx != NULL) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+
s->destroy_stream_arg = then_schedule_closure;
GRPC_CLOSURE_SCHED(
exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
@@ -1367,8 +1384,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
"fetching_send_message_finished");
} else {
GPR_ASSERT(s->fetching_send_message == NULL);
- uint8_t *frame_hdr =
- grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
+ uint8_t *frame_hdr = grpc_slice_buffer_tiny_add(
+ &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
uint32_t flags = op_payload->send_message.send_message->flags;
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
size_t len = op_payload->send_message.send_message->length;
@@ -1459,14 +1476,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0) {
- if (s->pending_byte_stream) {
- already_received = s->frame_storage.length;
- } else {
- already_received = s->frame_storage.length +
- s->unprocessed_incoming_frames_buffer.length;
- }
- incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5,
- already_received);
+ already_received = s->frame_storage.length;
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1703,10 +1715,43 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
if (s->unprocessed_incoming_frames_buffer.length == 0) {
grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
&s->frame_storage);
+ s->unprocessed_incoming_frames_decompressed = false;
+ }
+ if (s->stream_compression_recv_enabled &&
+ !s->unprocessed_incoming_frames_decompressed) {
+ GPR_ASSERT(s->decompressed_data_buffer->length == 0);
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx =
+ grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_DECOMPRESS);
+ }
+ if (!grpc_stream_decompress(s->stream_decompression_ctx,
+ &s->unprocessed_incoming_frames_buffer,
+ s->decompressed_data_buffer, NULL,
+ GRPC_HEADER_SIZE_IN_BYTES,
+ &end_of_context)) {
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Stream decompression error.");
+ } else {
+ error = grpc_deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL,
+ s->recv_message);
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(
+ s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+ }
+ } else {
+ error = grpc_deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s,
+ &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
}
- error = grpc_deframe_unprocessed_incoming_frames(
- exec_ctx, &s->data_parser, s,
- &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
if (error != GRPC_ERROR_NONE) {
s->seen_error = true;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
@@ -1744,7 +1789,37 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
}
bool pending_data = s->pending_byte_stream ||
s->unprocessed_incoming_frames_buffer.length > 0;
+ if (s->stream_compression_recv_enabled && s->read_closed &&
+ s->frame_storage.length > 0 &&
+ s->unprocessed_incoming_frames_buffer.length == 0 && !pending_data &&
+ !s->seen_error && s->recv_trailing_metadata_finished != NULL) {
+ /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
+ * maybe decompress the next 5 bytes in the stream. */
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx = grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_DECOMPRESS);
+ }
+ if (!grpc_stream_decompress(s->stream_decompression_ctx,
+ &s->frame_storage,
+ &s->unprocessed_incoming_frames_buffer, NULL,
+ GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ s->seen_error = true;
+ } else {
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ s->unprocessed_incoming_frames_decompressed = true;
+ }
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+ }
+ }
if (s->read_closed && s->frame_storage.length == 0 &&
+ s->unprocessed_incoming_frames_buffer.length == 0 &&
(!pending_data || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -2612,6 +2687,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
if (s->frame_storage.length > 0) {
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
+ s->unprocessed_incoming_frames_decompressed = false;
GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete,
@@ -2673,17 +2749,41 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_stream *s = bs->stream;
+ grpc_error *error;
if (s->unprocessed_incoming_frames_buffer.length > 0) {
- grpc_error *error = grpc_deframe_unprocessed_incoming_frames(
+ if (s->stream_compression_recv_enabled &&
+ !s->unprocessed_incoming_frames_decompressed) {
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx = grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_DECOMPRESS);
+ }
+ if (!grpc_stream_decompress(s->stream_decompression_ctx,
+ &s->unprocessed_incoming_frames_buffer,
+ s->decompressed_data_buffer, NULL, MAX_SIZE_T,
+ &end_of_context)) {
+ error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
+ return error;
+ }
+ GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
+ grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
+ s->decompressed_data_buffer);
+ s->unprocessed_incoming_frames_decompressed = true;
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+ }
+ error = grpc_deframe_unprocessed_incoming_frames(
exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
slice, NULL);
if (error != GRPC_ERROR_NONE) {
return error;
}
} else {
- grpc_error *error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index dead6be77f..222d2177b2 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -293,7 +293,6 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_slice slice, int is_last) {
- /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
if (!s->pending_byte_stream) {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
@@ -304,6 +303,7 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_NONE);
s->on_next = NULL;
+ s->unprocessed_incoming_frames_decompressed = false;
} else {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 4563b78e75..b538d1df17 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -526,6 +526,26 @@ struct grpc_chttp2_stream {
grpc_chttp2_write_cb *on_write_finished_cbs;
grpc_chttp2_write_cb *finish_after_write;
size_t sending_bytes;
+
+ /** Whether stream compression send is enabled */
+ bool stream_compression_recv_enabled;
+ /** Whether stream compression recv is enabled */
+ bool stream_compression_send_enabled;
+ /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
+ */
+ bool unprocessed_incoming_frames_decompressed;
+ /** Stream compression decompress context */
+ grpc_stream_compression_context *stream_decompression_ctx;
+ /** Stream compression compress context */
+ grpc_stream_compression_context *stream_compression_ctx;
+
+ /** Buffer storing data that is compressed but not sent */
+ grpc_slice_buffer *compressed_data_buffer;
+ /** Amount of uncompressed bytes sent out when compressed_data_buffer is
+ * emptied */
+ size_t uncompressed_data_size;
+ /** Temporary buffer storing decompressed data */
+ grpc_slice_buffer *decompressed_data_buffer;
};
/** Transport writing call flow:
@@ -621,6 +641,9 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_closure **pclosure,
grpc_error *error, const char *desc);
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+#define MAX_SIZE_T (~(size_t)0)
+
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 315f2a67a2..c3ede08343 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -303,7 +303,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
- if (s->flow_controlled_buffer.length > 0) {
+ if (s->flow_controlled_buffer.length > 0 ||
+ (s->stream_compression_send_enabled &&
+ s->compressed_data_buffer->length > 0)) {
uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
0,
s->outgoing_window_delta +
@@ -314,21 +316,63 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
GPR_MIN(stream_outgoing_window, t->outgoing_window));
if (max_outgoing > 0) {
- uint32_t send_bytes =
- (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
- bool is_last_data_frame =
- s->fetching_send_message == NULL &&
- send_bytes == s->flow_controlled_buffer.length;
- bool is_last_frame =
- is_last_data_frame && s->send_trailing_metadata != NULL &&
- grpc_metadata_batch_is_empty(s->send_trailing_metadata);
- grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
- is_last_frame, &s->stats.outgoing,
- &t->outbuf);
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
- send_bytes);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
- send_bytes);
+ bool is_last_data_frame = false;
+ bool is_last_frame = false;
+ if (s->stream_compression_send_enabled) {
+ while ((s->flow_controlled_buffer.length > 0 ||
+ s->compressed_data_buffer->length > 0) &&
+ max_outgoing > 0) {
+ if (s->compressed_data_buffer->length > 0) {
+ uint32_t send_bytes = (uint32_t)GPR_MIN(
+ max_outgoing, s->compressed_data_buffer->length);
+ is_last_data_frame =
+ (send_bytes == s->compressed_data_buffer->length &&
+ s->flow_controlled_buffer.length == 0 &&
+ s->fetching_send_message == NULL);
+ is_last_frame =
+ is_last_data_frame && s->send_trailing_metadata != NULL &&
+ grpc_metadata_batch_is_empty(s->send_trailing_metadata);
+ grpc_chttp2_encode_data(s->id, s->compressed_data_buffer,
+ send_bytes, is_last_frame,
+ &s->stats.outgoing, &t->outbuf);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM(
+ "write", t, s, outgoing_window_delta, send_bytes);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
+ send_bytes);
+ max_outgoing -= send_bytes;
+ if (s->compressed_data_buffer->length == 0) {
+ s->sending_bytes += s->uncompressed_data_size;
+ }
+ } else {
+ if (s->stream_compression_ctx == NULL) {
+ s->stream_compression_ctx =
+ grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_COMPRESS);
+ }
+ s->uncompressed_data_size = s->flow_controlled_buffer.length;
+ GPR_ASSERT(grpc_stream_compress(
+ s->stream_compression_ctx, &s->flow_controlled_buffer,
+ s->compressed_data_buffer, NULL, MAX_SIZE_T,
+ GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
+ }
+ }
+ } else {
+ uint32_t send_bytes = (uint32_t)GPR_MIN(
+ max_outgoing, s->flow_controlled_buffer.length);
+ is_last_data_frame = s->fetching_send_message == NULL &&
+ send_bytes == s->flow_controlled_buffer.length;
+ is_last_frame =
+ is_last_data_frame && s->send_trailing_metadata != NULL &&
+ grpc_metadata_batch_is_empty(s->send_trailing_metadata);
+ grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer,
+ send_bytes, is_last_frame,
+ &s->stats.outgoing, &t->outbuf);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
+ send_bytes);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
+ send_bytes);
+ s->sending_bytes += send_bytes;
+ }
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
@@ -345,9 +389,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
&s->stats.outgoing));
}
}
- s->sending_bytes += send_bytes;
now_writing = true;
- if (s->flow_controlled_buffer.length > 0) {
+ if (s->flow_controlled_buffer.length > 0 ||
+ (s->stream_compression_send_enabled &&
+ s->compressed_data_buffer->length > 0)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s);
}
@@ -361,7 +406,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
if (s->send_trailing_metadata != NULL &&
s->fetching_send_message == NULL &&
- s->flow_controlled_buffer.length == 0) {
+ s->flow_controlled_buffer.length == 0 &&
+ (!s->stream_compression_send_enabled ||
+ s->compressed_data_buffer->length == 0)) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) {
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,