diff options
author | David Garcia Quintas <dgq@google.com> | 2015-07-15 00:09:27 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-07-15 00:09:55 -0700 |
commit | d317e7570bd400ed0ae7ae11fe18674fac3ec734 (patch) | |
tree | ccb4a09df22b239e5978f0ea8085e732cf81b4f9 /src/core/channel | |
parent | 20a3538ddcdc194c14ad7c04b237a2ec7f022df3 (diff) |
Fixes to compression, to be merged back to the appropriate branch.
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/compress_filter.c | 112 |
1 files changed, 35 insertions, 77 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 3e76030dfc..69911cbda3 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -46,7 +46,7 @@ typedef struct call_data { gpr_slice_buffer slices; grpc_linked_mdelem compression_algorithm_storage; int remaining_slice_bytes; - int seen_initial_metadata; + int written_initial_metadata; grpc_compression_algorithm compression_algorithm; gpr_uint8 has_compression_algorithm; } call_data; @@ -115,13 +115,10 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, size_t i; grpc_stream_op_buffer new_send_ops; call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; int new_slices_added = 0; /* GPR_FALSE */ - grpc_metadata_batch metadata; grpc_sopb_init(&new_send_ops); - /* The following loop is akin to a selective reset + update */ for (i = 0; i < send_ops->nops; i++) { grpc_stream_op *sop = &send_ops->ops[i]; switch (sop->type) { @@ -130,17 +127,6 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, &new_send_ops, calld->slices.length, sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); break; - case GRPC_OP_METADATA: - grpc_metadata_batch_move(&metadata, &sop->data.metadata); - if (!calld->seen_initial_metadata) { - grpc_metadata_batch_add_head( - &metadata, &calld->compression_algorithm_storage, - grpc_mdelem_ref(channeld->mdelem_compression_algorithms - [calld->compression_algorithm])); - calld->seen_initial_metadata = 1; /* GPR_TRUE */ - } - grpc_sopb_add_metadata(&new_send_ops, metadata); - break; case GRPC_OP_SLICE: if (!new_slices_added) { size_t j; @@ -151,49 +137,16 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, new_slices_added = 1; /* GPR_TRUE */ } break; - case GRPC_NO_OP: - break; - } - } - grpc_sopb_swap(send_ops, &new_send_ops); - grpc_sopb_destroy(&new_send_ops); -} - -/* even if the filter isn't producing compressed output, it may need to update - * the input. For example, compression may have een requested but somehow it was - * decided not to honor the request: the compression flags need to be reset and - * the fact that no compression was performed in the end signaled */ -static void finish_not_compressed_sopb(grpc_stream_op_buffer *send_ops, - grpc_call_element *elem) { - size_t i; - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - for (i = 0; i < send_ops->nops; ++i) { - grpc_stream_op *sop = &send_ops->ops[i]; - switch (sop->type) { - case GRPC_OP_BEGIN_MESSAGE: - /* either because the user requested the exception or because - * compressing would have resulted in a larger output */ - calld->compression_algorithm = GRPC_COMPRESS_NONE; - /* reset the flag compression bit */ - sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS; - break; case GRPC_OP_METADATA: - if (!calld->seen_initial_metadata) { - grpc_metadata_batch_add_head( - &(sop->data.metadata), &calld->compression_algorithm_storage, - grpc_mdelem_ref( - channeld->mdelem_compression_algorithms[GRPC_COMPRESS_NONE])); - calld->seen_initial_metadata = 1; /* GPR_TRUE */ - } - break; - case GRPC_OP_SLICE: + grpc_sopb_add_metadata(&new_send_ops, sop->data.metadata); + memset(&(sop->data.metadata), 0, sizeof(grpc_metadata_batch)); break; case GRPC_NO_OP: break; } } + grpc_sopb_swap(send_ops, &new_send_ops); + grpc_sopb_destroy(&new_send_ops); } static void process_send_ops(grpc_call_element *elem, @@ -216,21 +169,28 @@ static void process_send_ops(grpc_call_element *elem, } break; case GRPC_OP_METADATA: - /* Parse incoming request for compression. If any, it'll be available at - * calld->compression_algorithm */ - grpc_metadata_batch_filter(&(sop->data.metadata), compression_md_filter, - elem); - if (!calld->has_compression_algorithm) { - /* If no algorithm was found in the metadata and we aren't - * exceptionally skipping compression, fall back to the channel - * default */ - calld->compression_algorithm = - channeld->default_compression_algorithm; - calld->has_compression_algorithm = 1; /* GPR_TRUE */ + if (!calld->written_initial_metadata) { + /* Parse incoming request for compression. If any, it'll be available + * at calld->compression_algorithm */ + grpc_metadata_batch_filter(&(sop->data.metadata), + compression_md_filter, elem); + if (!calld->has_compression_algorithm) { + /* If no algorithm was found in the metadata and we aren't + * exceptionally skipping compression, fall back to the channel + * default */ + calld->compression_algorithm = + channeld->default_compression_algorithm; + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + } + grpc_metadata_batch_add_head( + &(sop->data.metadata), &calld->compression_algorithm_storage, + grpc_mdelem_ref(channeld->mdelem_compression_algorithms + [calld->compression_algorithm])); + calld->written_initial_metadata = 1; /* GPR_TRUE */ } break; case GRPC_OP_SLICE: - if (skip_compression(channeld, calld)) goto done; + if (skip_compression(channeld, calld)) continue; GPR_ASSERT(calld->remaining_slice_bytes > 0); /* We need to copy the input because gpr_slice_buffer_add takes * ownership. However, we don't own sop->data.slice, the caller does. */ @@ -247,13 +207,10 @@ static void process_send_ops(grpc_call_element *elem, } } -done: /* Modify the send_ops stream_op_buffer depending on whether compression was * carried out */ if (did_compress) { finish_compressed_sopb(send_ops, elem); - } else { - finish_not_compressed_sopb(send_ops, elem); } } @@ -282,7 +239,7 @@ static void init_call_elem(grpc_call_element *elem, /* initialize members */ gpr_slice_buffer_init(&calld->slices); calld->has_compression_algorithm = 0; - calld->seen_initial_metadata = 0; /* GPR_FALSE */ + calld->written_initial_metadata = 0; /* GPR_FALSE */ if (initial_op) { if (initial_op->send_ops && initial_op->send_ops->nops > 0) { @@ -342,12 +299,13 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } } -const grpc_channel_filter grpc_compress_filter = {compress_start_transport_stream_op, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - "compress"}; +const grpc_channel_filter grpc_compress_filter = { + compress_start_transport_stream_op, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "compress"}; |