diff options
author | David Garcia Quintas <dgq@google.com> | 2015-07-06 23:01:39 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-07-06 23:01:39 -0700 |
commit | 20afd46db876d5b2e68a6ef0ad007565c7d16217 (patch) | |
tree | 78401943a03d28921528f8533a1927e6f41d0277 | |
parent | 4e4033650911582617049643fb02942a114be220 (diff) |
PR comments
-rw-r--r-- | src/core/channel/channel_args.c | 15 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 124 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 6 |
3 files changed, 92 insertions, 53 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 35ad93cc2f..5b331ded86 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -106,7 +106,7 @@ void grpc_channel_args_destroy(grpc_channel_args *a) { } int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { - unsigned i; + size_t i; if (a == NULL) return 0; for (i = 0; i < a->num_args; i++) { if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) { @@ -119,13 +119,12 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { grpc_compression_level grpc_channel_args_get_compression_level( const grpc_channel_args *a) { size_t i; - if (a) { - for (i = 0; a && i < a->num_args; ++i) { - if (a->args[i].type == GRPC_ARG_INTEGER && - !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) { - return a->args[i].value.integer; - break; - } + if (a == NULL) return 0; + for (i = 0; i < a->num_args; ++i) { + if (a->args[i].type == GRPC_ARG_INTEGER && + !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) { + return a->args[i].value.integer; + break; } } return GRPC_COMPRESS_LEVEL_NONE; diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 56e0a8141e..655c452ea9 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -34,13 +34,16 @@ #include <assert.h> #include <string.h> -#include "src/core/channel/compress_filter.h" -#include "src/core/channel/channel_args.h" -#include "src/core/compression/message_compress.h" #include <grpc/compression.h> #include <grpc/support/log.h> #include <grpc/support/slice_buffer.h> + +#include "src/core/channel/compress_filter.h" +#include "src/core/channel/channel_args.h" +#include "src/core/compression/message_compress.h" + + typedef struct call_data { gpr_slice_buffer slices; grpc_linked_mdelem compression_algorithm_storage; @@ -108,11 +111,81 @@ static int skip_compression(channel_data *channeld, call_data *calld) { return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; } +static void compressed_sopb(grpc_stream_op_buffer *send_ops, + grpc_call_element *elem) { + size_t i, j; + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + /* The following loop is akin to a selective reset + update */ + for (i = 0, j = 0; i < send_ops->nops; ++i) { + grpc_stream_op *sop = &send_ops->ops[i]; + switch (sop->type) { + case GRPC_OP_BEGIN_MESSAGE: + sop->data.begin_message.length = calld->slices.length; + sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + break; + case GRPC_OP_METADATA: + grpc_metadata_batch_add_head( + &(sop->data.metadata), &calld->compression_algorithm_storage, + grpc_mdelem_ref(channeld->mdelem_compression_algorithms + [calld->compression_algorithm])); + break; + case GRPC_OP_SLICE: + gpr_slice_unref(sop->data.slice); + /* replace only up to the number of available compressed slices */ + if (j < calld->slices.count) { + sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); + } + case GRPC_NO_OP: + ; /* fallthrough */ + } + } + + /* in case compressed slices remain to be added to the output */ + while (j < calld->slices.count) { + grpc_sopb_add_slice(send_ops, gpr_slice_ref(calld->slices.slices[j++])); + } +} + +/* even if the filter isn't producing compressed output, it may need to update + * the input. For example, compression may have een requested but somehow it was + * decided not to honor the request: the compression flags need to be reset and + * the fact that no compression was performed in the end signaled */ +static void not_compressed_sopb(grpc_stream_op_buffer *send_ops, + grpc_call_element *elem) { + size_t i; + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + for (i = 0; i < send_ops->nops; ++i) { + grpc_stream_op *sop = &send_ops->ops[i]; + switch (sop->type) { + case GRPC_OP_BEGIN_MESSAGE: + /* either because the user requested the exception or because + * compressing would have resulted in a larger output */ + calld->compression_algorithm = GRPC_COMPRESS_NONE; + /* reset the flag compression bit */ + sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS; + break; + case GRPC_OP_METADATA: + grpc_metadata_batch_add_head( + &(sop->data.metadata), &calld->compression_algorithm_storage, + grpc_mdelem_ref( + channeld->mdelem_compression_algorithms[GRPC_COMPRESS_NONE])); + break; + case GRPC_OP_SLICE: + case GRPC_NO_OP: + ; /* fallthrough */ + } + } +} + static void process_send_ops(grpc_call_element *elem, grpc_stream_op_buffer *send_ops) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - size_t i, j; + size_t i; int did_compress = 0; /* buffer up slices until we've processed all the expected ones (as given by @@ -159,46 +232,9 @@ static void process_send_ops(grpc_call_element *elem, } } - /* We need to: - * - (OP_SLICE) If compression happened, replace the input slices with the - * compressed ones. - * - (BEGIN_MESSAGE) Update the message info (size, flags). - * - (OP_METADATA) Convey the compression configuration */ - for (i = 0, j = 0; i < send_ops->nops; ++i) { - grpc_stream_op *sop = &send_ops->ops[i]; - switch (sop->type) { - case GRPC_OP_BEGIN_MESSAGE: - if (did_compress) { - sop->data.begin_message.length = calld->slices.length; - sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS; - } else { - /* either because the user requested the exception or because - * compressing would have resulted in a larger output */ - calld->compression_algorithm = GRPC_COMPRESS_NONE; - /* reset the flag compression bit */ - sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS; - } - break; - case GRPC_OP_METADATA: - grpc_metadata_batch_add_head( - &(sop->data.metadata), &calld->compression_algorithm_storage, - grpc_mdelem_ref(channeld->mdelem_compression_algorithms - [did_compress ? calld->compression_algorithm - : GRPC_COMPRESS_NONE])); - break; - case GRPC_OP_SLICE: - if (did_compress) { - if (j < calld->slices.count) { - /* swap the input slices for their compressed counterparts */ - gpr_slice_unref(sop->data.slice); - sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); - } - } - break; - case GRPC_NO_OP: - ; /* fallthrough */ - } - } + /* Modify the send_ops stream_op_buffer depending on whether compression was + * carried out */ + (did_compress ? compressed_sopb : not_compressed_sopb)(send_ops, elem); } /* Called either: diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 5788236ffb..d0a17af5da 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -476,6 +476,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, gpr_uint32 flow_controlled_bytes_taken = 0; gpr_uint32 curop = 0; gpr_uint8 *p; + int compressed_flag_set = 0; while (curop < *inops_count) { GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes); @@ -495,9 +496,12 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, case GRPC_OP_BEGIN_MESSAGE: /* begin op: for now we just convert the op to a slice and fall through - this lets us reuse the slice framing code below */ + compressed_flag_set = + !!(op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS); slice = gpr_slice_malloc(5); + p = GPR_SLICE_START_PTR(slice); - p[0] = !!(op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS); + p[0] = compressed_flag_set; p[1] = op->data.begin_message.length >> 24; p[2] = op->data.begin_message.length >> 16; p[3] = op->data.begin_message.length >> 8; |