aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-07-15 00:09:27 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-07-15 00:09:55 -0700
commitd317e7570bd400ed0ae7ae11fe18674fac3ec734 (patch)
treeccb4a09df22b239e5978f0ea8085e732cf81b4f9 /src/core/channel
parent20a3538ddcdc194c14ad7c04b237a2ec7f022df3 (diff)
Fixes to compression, to be merged back to the appropriate branch.
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/compress_filter.c112
1 files changed, 35 insertions, 77 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 3e76030dfc..69911cbda3 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -46,7 +46,7 @@ typedef struct call_data {
gpr_slice_buffer slices;
grpc_linked_mdelem compression_algorithm_storage;
int remaining_slice_bytes;
- int seen_initial_metadata;
+ int written_initial_metadata;
grpc_compression_algorithm compression_algorithm;
gpr_uint8 has_compression_algorithm;
} call_data;
@@ -115,13 +115,10 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
size_t i;
grpc_stream_op_buffer new_send_ops;
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
int new_slices_added = 0; /* GPR_FALSE */
- grpc_metadata_batch metadata;
grpc_sopb_init(&new_send_ops);
- /* The following loop is akin to a selective reset + update */
for (i = 0; i < send_ops->nops; i++) {
grpc_stream_op *sop = &send_ops->ops[i];
switch (sop->type) {
@@ -130,17 +127,6 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
&new_send_ops, calld->slices.length,
sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
break;
- case GRPC_OP_METADATA:
- grpc_metadata_batch_move(&metadata, &sop->data.metadata);
- if (!calld->seen_initial_metadata) {
- grpc_metadata_batch_add_head(
- &metadata, &calld->compression_algorithm_storage,
- grpc_mdelem_ref(channeld->mdelem_compression_algorithms
- [calld->compression_algorithm]));
- calld->seen_initial_metadata = 1; /* GPR_TRUE */
- }
- grpc_sopb_add_metadata(&new_send_ops, metadata);
- break;
case GRPC_OP_SLICE:
if (!new_slices_added) {
size_t j;
@@ -151,49 +137,16 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
new_slices_added = 1; /* GPR_TRUE */
}
break;
- case GRPC_NO_OP:
- break;
- }
- }
- grpc_sopb_swap(send_ops, &new_send_ops);
- grpc_sopb_destroy(&new_send_ops);
-}
-
-/* even if the filter isn't producing compressed output, it may need to update
- * the input. For example, compression may have een requested but somehow it was
- * decided not to honor the request: the compression flags need to be reset and
- * the fact that no compression was performed in the end signaled */
-static void finish_not_compressed_sopb(grpc_stream_op_buffer *send_ops,
- grpc_call_element *elem) {
- size_t i;
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- for (i = 0; i < send_ops->nops; ++i) {
- grpc_stream_op *sop = &send_ops->ops[i];
- switch (sop->type) {
- case GRPC_OP_BEGIN_MESSAGE:
- /* either because the user requested the exception or because
- * compressing would have resulted in a larger output */
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- /* reset the flag compression bit */
- sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS;
- break;
case GRPC_OP_METADATA:
- if (!calld->seen_initial_metadata) {
- grpc_metadata_batch_add_head(
- &(sop->data.metadata), &calld->compression_algorithm_storage,
- grpc_mdelem_ref(
- channeld->mdelem_compression_algorithms[GRPC_COMPRESS_NONE]));
- calld->seen_initial_metadata = 1; /* GPR_TRUE */
- }
- break;
- case GRPC_OP_SLICE:
+ grpc_sopb_add_metadata(&new_send_ops, sop->data.metadata);
+ memset(&(sop->data.metadata), 0, sizeof(grpc_metadata_batch));
break;
case GRPC_NO_OP:
break;
}
}
+ grpc_sopb_swap(send_ops, &new_send_ops);
+ grpc_sopb_destroy(&new_send_ops);
}
static void process_send_ops(grpc_call_element *elem,
@@ -216,21 +169,28 @@ static void process_send_ops(grpc_call_element *elem,
}
break;
case GRPC_OP_METADATA:
- /* Parse incoming request for compression. If any, it'll be available at
- * calld->compression_algorithm */
- grpc_metadata_batch_filter(&(sop->data.metadata), compression_md_filter,
- elem);
- if (!calld->has_compression_algorithm) {
- /* If no algorithm was found in the metadata and we aren't
- * exceptionally skipping compression, fall back to the channel
- * default */
- calld->compression_algorithm =
- channeld->default_compression_algorithm;
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ if (!calld->written_initial_metadata) {
+ /* Parse incoming request for compression. If any, it'll be available
+ * at calld->compression_algorithm */
+ grpc_metadata_batch_filter(&(sop->data.metadata),
+ compression_md_filter, elem);
+ if (!calld->has_compression_algorithm) {
+ /* If no algorithm was found in the metadata and we aren't
+ * exceptionally skipping compression, fall back to the channel
+ * default */
+ calld->compression_algorithm =
+ channeld->default_compression_algorithm;
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ }
+ grpc_metadata_batch_add_head(
+ &(sop->data.metadata), &calld->compression_algorithm_storage,
+ grpc_mdelem_ref(channeld->mdelem_compression_algorithms
+ [calld->compression_algorithm]));
+ calld->written_initial_metadata = 1; /* GPR_TRUE */
}
break;
case GRPC_OP_SLICE:
- if (skip_compression(channeld, calld)) goto done;
+ if (skip_compression(channeld, calld)) continue;
GPR_ASSERT(calld->remaining_slice_bytes > 0);
/* We need to copy the input because gpr_slice_buffer_add takes
* ownership. However, we don't own sop->data.slice, the caller does. */
@@ -247,13 +207,10 @@ static void process_send_ops(grpc_call_element *elem,
}
}
-done:
/* Modify the send_ops stream_op_buffer depending on whether compression was
* carried out */
if (did_compress) {
finish_compressed_sopb(send_ops, elem);
- } else {
- finish_not_compressed_sopb(send_ops, elem);
}
}
@@ -282,7 +239,7 @@ static void init_call_elem(grpc_call_element *elem,
/* initialize members */
gpr_slice_buffer_init(&calld->slices);
calld->has_compression_algorithm = 0;
- calld->seen_initial_metadata = 0; /* GPR_FALSE */
+ calld->written_initial_metadata = 0; /* GPR_FALSE */
if (initial_op) {
if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
@@ -342,12 +299,13 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
}
-const grpc_channel_filter grpc_compress_filter = {compress_start_transport_stream_op,
- grpc_channel_next_op,
- sizeof(call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- "compress"};
+const grpc_channel_filter grpc_compress_filter = {
+ compress_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "compress"};