aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-07-26 14:23:08 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-07-26 14:23:08 -0700
commit3c902ab943368e9664c5be135f1e57a7250e42fc (patch)
tree9c34ed79f5ffaa2982e2f2a124a88b52b7abf915
parent68a0fd54169d165060b09efadb1e2c0df4a294cb (diff)
parentc7a94c5052a2409ac68b991ae116d5f9e60eb545 (diff)
Merge changes in transport from #11780
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c26
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c23
3 files changed, 30 insertions, 23 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 41516ed91a..212704a121 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -668,8 +668,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena);
grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer);
- grpc_slice_buffer_init(&s->compressed_data_buffer);
- grpc_slice_buffer_init(&s->decompressed_data_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx);
@@ -708,8 +706,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx,
&s->unprocessed_incoming_frames_buffer);
grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
- grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer);
- grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer);
+ if (s->compressed_data_buffer) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer);
+ gpr_free(s->compressed_data_buffer);
+ }
+ if (s->decompressed_data_buffer) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer);
+ gpr_free(s->decompressed_data_buffer);
+ }
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -1671,7 +1675,7 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
}
if (s->stream_compression_recv_enabled &&
!s->unprocessed_incoming_frames_decompressed) {
- GPR_ASSERT(s->decompressed_data_buffer.length == 0);
+ GPR_ASSERT(s->decompressed_data_buffer->length == 0);
bool end_of_context;
if (!s->stream_decompression_ctx) {
s->stream_decompression_ctx =
@@ -1680,7 +1684,7 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
}
if (!grpc_stream_decompress(s->stream_decompression_ctx,
&s->unprocessed_incoming_frames_buffer,
- &s->decompressed_data_buffer, NULL,
+ s->decompressed_data_buffer, NULL,
GRPC_HEADER_SIZE_IN_BYTES,
&end_of_context)) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
@@ -1691,8 +1695,8 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
"Stream decompression error.");
} else {
error = grpc_deframe_unprocessed_incoming_frames(
- exec_ctx, &s->data_parser, s, &s->decompressed_data_buffer,
- NULL, s->recv_message);
+ exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL,
+ s->recv_message);
if (end_of_context) {
grpc_stream_compression_context_destroy(
s->stream_decompression_ctx);
@@ -2713,15 +2717,15 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
}
if (!grpc_stream_decompress(s->stream_decompression_ctx,
&s->unprocessed_incoming_frames_buffer,
- &s->decompressed_data_buffer, NULL,
- MAX_SIZE_T, &end_of_context)) {
+ s->decompressed_data_buffer, NULL, MAX_SIZE_T,
+ &end_of_context)) {
error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
return error;
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
- &s->decompressed_data_buffer);
+ s->decompressed_data_buffer);
s->unprocessed_incoming_frames_decompressed = true;
if (end_of_context) {
grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 9fad683516..eb1acc0f13 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -533,12 +533,12 @@ struct grpc_chttp2_stream {
grpc_stream_compression_context *stream_compression_ctx;
/** Buffer storing data that is compressed but not sent */
- grpc_slice_buffer compressed_data_buffer;
+ grpc_slice_buffer *compressed_data_buffer;
/** Amount of uncompressed bytes sent out when compressed_data_buffer is
* emptied */
size_t uncompressed_data_size;
/** Temporary buffer storing decompressed data */
- grpc_slice_buffer decompressed_data_buffer;
+ grpc_slice_buffer *decompressed_data_buffer;
};
/** Transport writing call flow:
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 08babb7369..c3ede08343 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -304,7 +304,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
if (s->flow_controlled_buffer.length > 0 ||
- s->compressed_data_buffer.length > 0) {
+ (s->stream_compression_send_enabled &&
+ s->compressed_data_buffer->length > 0)) {
uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
0,
s->outgoing_window_delta +
@@ -319,19 +320,19 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
bool is_last_frame = false;
if (s->stream_compression_send_enabled) {
while ((s->flow_controlled_buffer.length > 0 ||
- s->compressed_data_buffer.length > 0) &&
+ s->compressed_data_buffer->length > 0) &&
max_outgoing > 0) {
- if (s->compressed_data_buffer.length > 0) {
+ if (s->compressed_data_buffer->length > 0) {
uint32_t send_bytes = (uint32_t)GPR_MIN(
- max_outgoing, s->compressed_data_buffer.length);
+ max_outgoing, s->compressed_data_buffer->length);
is_last_data_frame =
- (send_bytes == s->compressed_data_buffer.length &&
+ (send_bytes == s->compressed_data_buffer->length &&
s->flow_controlled_buffer.length == 0 &&
s->fetching_send_message == NULL);
is_last_frame =
is_last_data_frame && s->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(s->send_trailing_metadata);
- grpc_chttp2_encode_data(s->id, &s->compressed_data_buffer,
+ grpc_chttp2_encode_data(s->id, s->compressed_data_buffer,
send_bytes, is_last_frame,
&s->stats.outgoing, &t->outbuf);
GRPC_CHTTP2_FLOW_DEBIT_STREAM(
@@ -339,7 +340,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
max_outgoing -= send_bytes;
- if (s->compressed_data_buffer.length == 0) {
+ if (s->compressed_data_buffer->length == 0) {
s->sending_bytes += s->uncompressed_data_size;
}
} else {
@@ -351,7 +352,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
s->uncompressed_data_size = s->flow_controlled_buffer.length;
GPR_ASSERT(grpc_stream_compress(
s->stream_compression_ctx, &s->flow_controlled_buffer,
- &s->compressed_data_buffer, NULL, MAX_SIZE_T,
+ s->compressed_data_buffer, NULL, MAX_SIZE_T,
GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
}
}
@@ -390,7 +391,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
now_writing = true;
if (s->flow_controlled_buffer.length > 0 ||
- s->compressed_data_buffer.length > 0) {
+ (s->stream_compression_send_enabled &&
+ s->compressed_data_buffer->length > 0)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s);
}
@@ -405,7 +407,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (s->send_trailing_metadata != NULL &&
s->fetching_send_message == NULL &&
s->flow_controlled_buffer.length == 0 &&
- s->compressed_data_buffer.length == 0) {
+ (!s->stream_compression_send_enabled ||
+ s->compressed_data_buffer->length == 0)) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) {
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,