diff options
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/channel_args.c | 10 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 2 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 48 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 18 |
4 files changed, 54 insertions, 24 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 54ee75af28..591135cd6f 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -132,7 +132,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( for (i = 0; i < a->num_args; ++i) { if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) { - return a->args[i].value.integer; + return (grpc_compression_algorithm)a->args[i].value.integer; break; } } @@ -177,9 +177,9 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( if (states_arg_found) { if (state != 0) { - GPR_BITSET(states_arg, algorithm); + GPR_BITSET((unsigned *)states_arg, algorithm); } else { - GPR_BITCLEAR(states_arg, algorithm); + GPR_BITCLEAR((unsigned *)states_arg, algorithm); } } else { /* create a new arg */ @@ -189,9 +189,9 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( /* all enabled by default */ tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; if (state != 0) { - GPR_BITSET(&tmp.value.integer, algorithm); + GPR_BITSET((unsigned *)&tmp.value.integer, algorithm); } else { - GPR_BITCLEAR(&tmp.value.integer, algorithm); + GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm); } result = grpc_channel_args_copy_and_add(*a, &tmp, 1); grpc_channel_args_destroy(*a); diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index cd7c182ef2..4eb5df5de3 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -57,7 +57,7 @@ int grpc_trace_channel = 0; /* Given a size, round up to the next multiple of sizeof(void*) */ #define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ - (((x) + GPR_MAX_ALIGNMENT - 1) & ~(GPR_MAX_ALIGNMENT - 1)) + (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u)) size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count) { diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 762a4edc73..7959603102 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -48,7 +48,8 @@ typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; - int remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */ + gpr_uint32 + remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */ int written_initial_metadata; /**< Already processed initial md? */ /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ @@ -70,6 +71,8 @@ typedef struct channel_data { grpc_mdelem *mdelem_accept_encoding; /** The default, channel-level, compression algorithm */ grpc_compression_algorithm default_compression_algorithm; + /** Compression options for the channel */ + grpc_compression_options compression_options; } channel_data; /** Compress \a slices in place using \a algorithm. Returns 1 if compression did @@ -102,7 +105,17 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), &calld->compression_algorithm)) { - gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.", + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (unknown). Ignoring.", + md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, calld->compression_algorithm) == + 0) { + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (previously disabled). " + "Ignoring.", md_c_str); calld->compression_algorithm = GRPC_COMPRESS_NONE; } @@ -141,8 +154,9 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, grpc_stream_op *sop = &send_ops->ops[i]; switch (sop->type) { case GRPC_OP_BEGIN_MESSAGE: + GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX); grpc_sopb_add_begin_message( - &new_send_ops, calld->slices.length, + &new_send_ops, (gpr_uint32)calld->slices.length, sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); break; case GRPC_OP_SLICE: @@ -228,7 +242,10 @@ static void process_send_ops(grpc_call_element *elem, GPR_ASSERT(calld->remaining_slice_bytes > 0); /* Increase input ref count, gpr_slice_buffer_add takes ownership. */ gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice)); - calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice); + GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) >= + calld->remaining_slice_bytes); + calld->remaining_slice_bytes -= + (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice); if (calld->remaining_slice_bytes == 0) { did_compress = compress_send_sb(calld->compression_algorithm, &calld->slices); @@ -294,11 +311,21 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; + size_t supported_algorithms_idx = 0; char *accept_encoding_str; size_t accept_encoding_str_len; + grpc_compression_options_init(&channeld->compression_options); + channeld->compression_options.enabled_algorithms_bitset = + (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args); + channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm(args); + /* Make sure the default isn't disabled. */ + GPR_ASSERT(grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, channeld->default_compression_algorithm)); + channeld->compression_options.default_compression_algorithm = + channeld->default_compression_algorithm; channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); @@ -311,6 +338,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { char *algorithm_name; + /* skip disabled algorithms */ + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, algo_idx) == 0) { + continue; + } GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings( @@ -318,15 +350,15 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string(mdctx, algorithm_name, 0)); if (algo_idx > 0) { - supported_algorithms_names[algo_idx - 1] = algorithm_name; + supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; } } /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated * arrays, as to avoid the heap allocs */ - accept_encoding_str = gpr_strjoin_sep( - supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names), - ", ", &accept_encoding_str_len); + accept_encoding_str = + gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",", + &accept_encoding_str_len); channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 2b61d33c29..ec832a0367 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -85,16 +85,14 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { static void hc_on_recv(void *user_data, int success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (success) { - size_t i; - size_t nops = calld->recv_ops->nops; - grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - calld->got_initial_metadata = 1; - grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); - } + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->got_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); } calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } |