diff options
author | David Garcia Quintas <dgq@google.com> | 2016-05-18 20:25:57 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-05-18 20:25:57 -0700 |
commit | ac0944701ab93d6746f81a2a9ab38ee411e663d8 (patch) | |
tree | 95af27fc2cd5cb4f893d60ea8bc2aa2e1fe62498 /src | |
parent | ddefbb82270d97d8b87cac4959993ae61e204d1c (diff) | |
parent | 824f83758d80717f910c2cf973ff41fd98f8d81e (diff) |
Merge branch 'compression_incoming_checks' into compression_md_level_bis
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/channel/channel_args.c | 12 | ||||
-rw-r--r-- | src/core/lib/compression/compression.c (renamed from src/core/lib/compression/compression_algorithm.c) | 78 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 251 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 45 | ||||
-rw-r--r-- | src/core/lib/surface/channel.h | 7 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi | 14 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi | 5 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi | 26 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/imports.generated.c | 10 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/imports.generated.h | 15 | ||||
-rw-r--r-- | src/python/grpcio/grpc_core_dependencies.py | 2 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.c | 10 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.h | 15 |
13 files changed, 362 insertions, 128 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index a171b41b35..c14b9aa422 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -35,6 +35,7 @@ #include <grpc/grpc.h> #include "src/core/lib/support/string.h" +#include <grpc/compression.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -180,6 +181,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( grpc_channel_args *grpc_channel_args_set_compression_algorithm( grpc_channel_args *a, grpc_compression_algorithm algorithm) { + GPR_ASSERT(algorithm < GRPC_COMPRESS_ALGORITHMS_COUNT); grpc_arg tmp; tmp.type = GRPC_ARG_INTEGER; tmp.key = GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM; @@ -214,7 +216,15 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( const int states_arg_found = find_compression_algorithm_states_bitset(*a, &states_arg); - if (states_arg_found) { + if (grpc_channel_args_get_compression_algorithm(*a) == algorithm && + state == 0) { + char *algo_name = NULL; + GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name) != 0); + gpr_log(GPR_ERROR, + "Tried to disable default compression algorithm '%s'. The " + "operation has been ignored.", + algo_name); + } else if (states_arg_found) { if (state != 0) { GPR_BITSET((unsigned *)states_arg, algorithm); } else if (algorithm != GRPC_COMPRESS_NONE) { diff --git a/src/core/lib/compression/compression_algorithm.c b/src/core/lib/compression/compression.c index 6cbdbd81b0..54efb5e855 100644 --- a/src/core/lib/compression/compression_algorithm.c +++ b/src/core/lib/compression/compression.c @@ -124,3 +124,81 @@ grpc_mdelem *grpc_compression_encoding_mdelem( } return NULL; } + +void grpc_compression_options_init(grpc_compression_options *opts) { + memset(opts, 0, sizeof(*opts)); + /* all enabled by default */ + opts->enabled_algorithms_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; +} + +void grpc_compression_options_enable_algorithm( + grpc_compression_options *opts, grpc_compression_algorithm algorithm) { + GPR_BITSET(&opts->enabled_algorithms_bitset, algorithm); +} + +void grpc_compression_options_disable_algorithm( + grpc_compression_options *opts, grpc_compression_algorithm algorithm) { + GPR_BITCLEAR(&opts->enabled_algorithms_bitset, algorithm); +} + +int grpc_compression_options_is_algorithm_enabled( + const grpc_compression_options *opts, + grpc_compression_algorithm algorithm) { + return GPR_BITGET(opts->enabled_algorithms_bitset, algorithm); +} + +/* TODO(dgq): Add the ability to specify parameters to the individual + * compression algorithms */ +grpc_compression_algorithm grpc_compression_algorithm_for_level( + grpc_compression_level level, uint32_t accepted_encodings) { + GRPC_API_TRACE("grpc_compression_algorithm_for_level(level=%d)", 1, + ((int)level)); + if (level > GRPC_COMPRESS_LEVEL_HIGH) { + gpr_log(GPR_ERROR, "Unknown compression level %d.", (int)level); + abort(); + } + + const size_t num_supported = + GPR_BITCOUNT(accepted_encodings) - 1; /* discard NONE */ + if (level == GRPC_COMPRESS_LEVEL_NONE || num_supported == 0) { + return GRPC_COMPRESS_NONE; + } + + GPR_ASSERT(level > 0); + + /* Establish a "ranking" or compression algorithms in increasing order of + * compression. + * This is simplistic and we will probably want to introduce other dimensions + * in the future (cpu/memory cost, etc). */ + const grpc_compression_algorithm algos_ranking[] = {GRPC_COMPRESS_GZIP, + GRPC_COMPRESS_DEFLATE}; + + /* intersect algos_ranking with the supported ones keeping the ranked order */ + grpc_compression_algorithm + sorted_supported_algos[GRPC_COMPRESS_ALGORITHMS_COUNT]; + size_t algos_supported_idx = 0; + for (size_t i = 0; i < GPR_ARRAY_SIZE(algos_ranking); i++) { + const grpc_compression_algorithm alg = algos_ranking[i]; + for (size_t j = 0; j < num_supported; j++) { + if (GPR_BITGET(accepted_encodings, alg) == 1) { + /* if \a alg in supported */ + sorted_supported_algos[algos_supported_idx++] = alg; + break; + } + } + if (algos_supported_idx == num_supported) break; + } + + switch (level) { + case GRPC_COMPRESS_LEVEL_NONE: + abort(); /* should have been handled already */ + case GRPC_COMPRESS_LEVEL_LOW: + return sorted_supported_algos[0]; + case GRPC_COMPRESS_LEVEL_MED: + return sorted_supported_algos[num_supported / 2]; + case GRPC_COMPRESS_LEVEL_HIGH: + return sorted_supported_algos[num_supported - 1]; + default: + abort(); + }; +} diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 7b19a5b93a..d081e03e2c 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -40,6 +40,7 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/slice.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> @@ -52,7 +53,9 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" +#include "src/core/lib/transport/transport.h" /** The maximum number of concurrent batches possible. Based upon the maximum number of individually queueable ops in the batch @@ -238,6 +241,9 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); +static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, bool success); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, @@ -408,8 +414,8 @@ static void set_status_code(grpc_call *call, status_source source, /* TODO(ctiller): what to do about the flush that was previously here */ } -static void set_compression_algorithm(grpc_call *call, - grpc_compression_algorithm algo) { +static void set_incoming_compression_algorithm( + grpc_call *call, grpc_compression_algorithm algo) { GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT); call->incoming_compression_algorithm = algo; } @@ -425,61 +431,8 @@ grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( static grpc_compression_algorithm compression_algorithm_for_level_locked( grpc_call *call, grpc_compression_level level) { - /* Establish a "ranking" or compression algorithms in increasing order of - * compression. - * This is simplistic and we will probably want to introduce other - * dimensions - * in the future (cpu/memory cost, etc). */ - const grpc_compression_algorithm algos_ranking[] = {GRPC_COMPRESS_GZIP, - GRPC_COMPRESS_DEFLATE}; - const uint32_t accepted_encodings = call->encodings_accepted_by_peer; - if (level > GRPC_COMPRESS_LEVEL_HIGH) { - extern int grpc_compression_trace; - if (grpc_compression_trace) { - gpr_log(GPR_ERROR, - "Unknown compression level %d. Compression will be disabled.", - (int)level); - } - return GRPC_COMPRESS_NONE; - } - - const size_t num_supported = - GPR_BITCOUNT(accepted_encodings) - 1; /* discard NONE */ - if (level == GRPC_COMPRESS_LEVEL_NONE || num_supported == 0) { - return GRPC_COMPRESS_NONE; - } - - GPR_ASSERT(level > 0); - - /* intersect algos_ranking with the supported ones keeping the ranked order - */ - grpc_compression_algorithm - sorted_supported_algos[GRPC_COMPRESS_ALGORITHMS_COUNT]; - size_t algos_supported_idx = 0; - for (size_t i = 0; i < GPR_ARRAY_SIZE(algos_ranking); i++) { - const grpc_compression_algorithm alg = algos_ranking[i]; - for (size_t j = 0; j < num_supported; j++) { - if (GPR_BITGET(accepted_encodings, alg) == 1) { - /* if \a alg in supported */ - sorted_supported_algos[algos_supported_idx++] = alg; - break; - } - } - if (algos_supported_idx == num_supported) break; - } - - switch (level) { - case GRPC_COMPRESS_LEVEL_NONE: - abort(); /* should have been handled already */ - case GRPC_COMPRESS_LEVEL_LOW: - return sorted_supported_algos[0]; - case GRPC_COMPRESS_LEVEL_MED: - return sorted_supported_algos[num_supported / 2]; - case GRPC_COMPRESS_LEVEL_HIGH: - return sorted_supported_algos[num_supported - 1]; - default: - abort(); - }; + return grpc_compression_algorithm_for_level(level, + call->encodings_accepted_by_peer); } uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { @@ -793,48 +746,102 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return r; } -typedef struct cancel_closure { +typedef struct termination_closure { grpc_closure closure; grpc_call *call; grpc_status_code status; -} cancel_closure; + gpr_slice optional_message; + grpc_closure *op_closure; + enum { TC_CANCEL, TC_CLOSE } type; +} termination_closure; + +static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { + termination_closure *tc = tcp; + if (tc->type == TC_CANCEL) { + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "cancel"); + } + if (tc->type == TC_CLOSE) { + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); + } + gpr_slice_unref(tc->optional_message); + if (tc->op_closure != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, tc->op_closure, true, NULL); + } + gpr_free(tc); +} -static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { - cancel_closure *cc = ccp; - GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel"); - gpr_free(cc); +static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { + grpc_transport_stream_op op; + termination_closure *tc = tcp; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = tc->status; + /* reuse closure to catch completion */ + grpc_closure_init(&tc->closure, done_termination, tc); + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); } -static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { +static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { grpc_transport_stream_op op; - cancel_closure *cc = ccp; + termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); - op.cancel_with_status = cc->status; + tc->optional_message = gpr_slice_ref(tc->optional_message); + grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message); /* reuse closure to catch completion */ - grpc_closure_init(&cc->closure, done_cancel, cc); - op.on_complete = &cc->closure; - execute_op(exec_ctx, cc->call, &op); + grpc_closure_init(&tc->closure, done_termination, tc); + tc->op_closure = op.on_complete; + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); +} + +static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, + termination_closure *tc) { + grpc_mdstr *details = NULL; + if (GPR_SLICE_LENGTH(tc->optional_message) > 0) { + tc->optional_message = gpr_slice_ref(tc->optional_message); + details = grpc_mdstr_from_slice(tc->optional_message); + } + + set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status); + set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details); + + if (tc->type == TC_CANCEL) { + grpc_closure_init(&tc->closure, send_cancel, tc); + GRPC_CALL_INTERNAL_REF(tc->call, "cancel"); + } else if (tc->type == TC_CLOSE) { + grpc_closure_init(&tc->closure, send_close, tc); + GRPC_CALL_INTERNAL_REF(tc->call, "close"); + } + grpc_exec_ctx_enqueue(exec_ctx, &tc->closure, true, NULL); + return GRPC_CALL_OK; } static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { - grpc_mdstr *details = - description ? grpc_mdstr_from_string(description) : NULL; - cancel_closure *cc = gpr_malloc(sizeof(*cc)); - + termination_closure *tc = gpr_malloc(sizeof(*tc)); + memset(tc, 0, sizeof(termination_closure)); + tc->type = TC_CANCEL; + tc->call = c; + tc->optional_message = gpr_slice_from_copied_string(description); GPR_ASSERT(status != GRPC_STATUS_OK); + tc->status = status; - set_status_code(c, STATUS_FROM_API_OVERRIDE, (uint32_t)status); - set_status_details(c, STATUS_FROM_API_OVERRIDE, details); + return terminate_with_status(exec_ctx, tc); +} - grpc_closure_init(&cc->closure, send_cancel, cc); - cc->call = c; - cc->status = status; - GRPC_CALL_INTERNAL_REF(c, "cancel"); - grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL); +static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description) { + termination_closure *tc = gpr_malloc(sizeof(*tc)); + memset(tc, 0, sizeof(termination_closure)); + tc->type = TC_CLOSE; + tc->call = c; + tc->optional_message = gpr_slice_from_copied_string(description); + GPR_ASSERT(status != GRPC_STATUS_OK); + tc->status = status; - return GRPC_CALL_OK; + return terminate_with_status(exec_ctx, tc); } static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, @@ -976,7 +983,7 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) { return NULL; } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) { GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); - set_compression_algorithm(call, decode_compression(elem)); + set_incoming_compression_algorithm(call, decode_compression(elem)); GPR_TIMER_END("incoming_compression_algorithm", 0); return NULL; } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) { @@ -1170,6 +1177,56 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } +static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, + batch_control *bctl) { + grpc_call *call = bctl->call; + /* validate call->incoming_compression_algorithm */ + if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) { + const grpc_compression_algorithm algo = + call->incoming_compression_algorithm; + char *error_msg = NULL; + const grpc_compression_options compression_options = + grpc_channel_compression_options(call->channel); + /* check if algorithm is known */ + if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) { + gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", + algo); + gpr_log(GPR_ERROR, error_msg); + close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + } else if (grpc_compression_options_is_algorithm_enabled( + &compression_options, algo) == 0) { + /* check if algorithm is supported by current channel config */ + char *algo_name; + grpc_compression_algorithm_name(algo, &algo_name); + gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", + algo_name); + gpr_log(GPR_ERROR, error_msg); + close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + } else { + call->incoming_compression_algorithm = algo; + } + gpr_free(error_msg); + } + + /* make sure the received grpc-encoding is amongst the ones listed in + * grpc-accept-encoding */ + GPR_ASSERT(call->encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->encodings_accepted_by_peer, + call->incoming_compression_algorithm)) { + extern int grpc_compression_trace; + if (grpc_compression_trace) { + char *algo_name; + grpc_compression_algorithm_name(call->incoming_compression_algorithm, + &algo_name); + gpr_log(GPR_ERROR, + "Compression algorithm (grpc-encoding = '%s') not present in " + "the bitset of accepted encodings (grpc-accept-encodings: " + "'0x%x')", + algo_name, call->encodings_accepted_by_peer); + } + } +} + static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { batch_control *bctl = bctlp; @@ -1184,24 +1241,10 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; grpc_metadata_batch_filter(md, recv_initial_filter, call); - /* make sure the received grpc-encoding is amongst the ones listed in - * grpc-accept-encoding */ - - GPR_ASSERT(call->encodings_accepted_by_peer != 0); - if (!GPR_BITGET(call->encodings_accepted_by_peer, - call->incoming_compression_algorithm)) { - extern int grpc_compression_trace; - if (grpc_compression_trace) { - char *algo_name; - grpc_compression_algorithm_name(call->incoming_compression_algorithm, - &algo_name); - gpr_log(GPR_ERROR, - "Compression algorithm (grpc-encoding = '%s') not present in " - "the bitset of accepted encodings (grpc-accept-encodings: " - "'0x%x')", - algo_name, call->encodings_accepted_by_peer); - } - } + GPR_TIMER_BEGIN("validate_filtered_metadata", 0); + validate_filtered_metadata(exec_ctx, bctl); + GPR_TIMER_END("validate_filtered_metadata", 0); + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { @@ -1356,8 +1399,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, .compression_level; level_set = true; } else { - level_set = grpc_channel_default_compression_level( - call->channel, &effective_compression_level); + const grpc_compression_options copts = + grpc_channel_compression_options(call->channel); + level_set = copts.default_level.is_set; + if (level_set) { + effective_compression_level = copts.default_level.level; + } } if (level_set) { const grpc_compression_algorithm calgo = diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index a7aa9365a0..a71190f3c0 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -36,16 +36,17 @@ #include <stdlib.h> #include <string.h> +#include <grpc/compression.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_init.h" -#include "src/core/lib/surface/init.h" #include "src/core/lib/transport/static_metadata.h" /** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. @@ -64,17 +65,13 @@ typedef struct registered_call { struct grpc_channel { int is_client; uint32_t max_message_length; + grpc_compression_options compression_options; grpc_mdelem *default_authority; gpr_mu registered_call_mu; registered_call *registered_calls; char *target; - - struct { - bool is_set; - grpc_compression_level default_compression_level; - } maybe_default_compression_level; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -117,6 +114,7 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, channel->registered_calls = NULL; channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + grpc_compression_options_init(&channel->compression_options); if (args) { for (size_t i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { @@ -159,11 +157,25 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, } } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) { - channel->maybe_default_compression_level.is_set = true; + channel->compression_options.default_level.is_set = true; GPR_ASSERT(args->args[i].value.integer >= 0 && args->args[i].value.integer < GRPC_COMPRESS_LEVEL_COUNT); - channel->maybe_default_compression_level.default_compression_level = + channel->compression_options.default_level.level = (grpc_compression_level)args->args[i].value.integer; + } else if (0 == strcmp(args->args[i].key, + GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) { + channel->compression_options.default_algorithm.is_set = true; + GPR_ASSERT(args->args[i].value.integer >= 0 && + args->args[i].value.integer < + GRPC_COMPRESS_ALGORITHMS_COUNT); + channel->compression_options.default_algorithm.algorithm = + (grpc_compression_algorithm)args->args[i].value.integer; + } else if (0 == + strcmp(args->args[i].key, + GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) { + channel->compression_options.enabled_algorithms_bitset = + (uint32_t)args->args[i].value.integer | + 0x1; /* always support no compression */ } } grpc_channel_args_destroy(args); @@ -319,6 +331,11 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { return CHANNEL_STACK_FROM_CHANNEL(channel); } +grpc_compression_options grpc_channel_compression_options( + const grpc_channel *channel) { + return channel->compression_options; +} + grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { char tmp[GPR_LTOA_MIN_BUFSIZE]; switch (i) { @@ -337,15 +354,3 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { uint32_t grpc_channel_get_max_message_length(grpc_channel *channel) { return channel->max_message_length; } - -bool grpc_channel_default_compression_level(grpc_channel *channel, - grpc_compression_level *level) { - if (channel->maybe_default_compression_level.is_set) { - *level = channel->maybe_default_compression_level.default_compression_level; - return true; - } - return false; -} - -bool grpc_channel_default_compression_algorithm( - grpc_channel *channel, grpc_compression_algorithm *algorithm); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index a8631eea87..c3279fef41 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -71,9 +71,8 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel_internal_unref(exec_ctx, channel) #endif -/** If the channel has an associated default compression level, return it in \a - * level and return true. Otherwise return false. */ -bool grpc_channel_default_compression_level(grpc_channel *channel, - grpc_compression_level *level); +/** Return the channel's compression options. */ +grpc_compression_options grpc_channel_compression_options( + const grpc_channel *channel); #endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_H */ diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 2fced6cf47..66e6e6b549 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -442,6 +442,10 @@ cdef extern from "grpc/_cython/loader.h": GRPC_COMPRESS_LEVEL_HIGH GRPC_COMPRESS_LEVEL_COUNT + ctypedef struct grpc_compression_options: + uint32_t enabled_algorithms_bitset + grpc_compression_algorithm default_compression_algorithm + int grpc_compression_algorithm_parse( const char *name, size_t name_length, grpc_compression_algorithm *algorithm) nogil @@ -449,3 +453,13 @@ cdef extern from "grpc/_cython/loader.h": char **name) nogil grpc_compression_algorithm grpc_compression_algorithm_for_level( grpc_compression_level level, uint32_t accepted_encodings) nogil + void grpc_compression_options_init(grpc_compression_options *opts) nogil + void grpc_compression_options_enable_algorithm( + grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil + void grpc_compression_options_disable_algorithm( + grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil + int grpc_compression_options_is_algorithm_enabled( + const grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 8ac18f0c3e..0474697af8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -123,3 +123,8 @@ cdef class Operations: cdef grpc_op *c_ops cdef size_t c_nops cdef list operations + + +cdef class CompressionOptions: + + cdef grpc_compression_options c_options diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index fda317b9a1..c7539f0d49 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -718,6 +718,32 @@ cdef class Operations: return _OperationsIterator(self) +cdef class CompressionOptions: + + def __cinit__(self): + with nogil: + grpc_compression_options_init(&self.c_options) + + def enable_algorithm(self, grpc_compression_algorithm algorithm): + with nogil: + grpc_compression_options_enable_algorithm(&self.c_options, algorithm) + + def disable_algorithm(self, grpc_compression_algorithm algorithm): + with nogil: + grpc_compression_options_disable_algorithm(&self.c_options, algorithm) + + def is_algorithm_enabled(self, grpc_compression_algorithm algorithm): + cdef int result + with nogil: + result = grpc_compression_options_is_algorithm_enabled( + &self.c_options, algorithm) + return result + + def to_channel_arg(self): + return ChannelArg(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, + self.c_options.enabled_algorithms_bitset) + + def compression_algorithm_name(grpc_compression_algorithm algorithm): cdef char* name with nogil: diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c index c557d9e964..09551472b5 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.c +++ b/src/python/grpcio/grpc/_cython/imports.generated.c @@ -72,6 +72,11 @@ census_view_get_data_type census_view_get_data_import; census_view_reset_type census_view_reset_import; grpc_compression_algorithm_parse_type grpc_compression_algorithm_parse_import; grpc_compression_algorithm_name_type grpc_compression_algorithm_name_import; +grpc_compression_algorithm_for_level_type grpc_compression_algorithm_for_level_import; +grpc_compression_options_init_type grpc_compression_options_init_import; +grpc_compression_options_enable_algorithm_type grpc_compression_options_enable_algorithm_import; +grpc_compression_options_disable_algorithm_type grpc_compression_options_disable_algorithm_import; +grpc_compression_options_is_algorithm_enabled_type grpc_compression_options_is_algorithm_enabled_import; grpc_metadata_array_init_type grpc_metadata_array_init_import; grpc_metadata_array_destroy_type grpc_metadata_array_destroy_import; grpc_call_details_init_type grpc_call_details_init_import; @@ -338,6 +343,11 @@ void pygrpc_load_imports(HMODULE library) { census_view_reset_import = (census_view_reset_type) GetProcAddress(library, "census_view_reset"); grpc_compression_algorithm_parse_import = (grpc_compression_algorithm_parse_type) GetProcAddress(library, "grpc_compression_algorithm_parse"); grpc_compression_algorithm_name_import = (grpc_compression_algorithm_name_type) GetProcAddress(library, "grpc_compression_algorithm_name"); + grpc_compression_algorithm_for_level_import = (grpc_compression_algorithm_for_level_type) GetProcAddress(library, "grpc_compression_algorithm_for_level"); + grpc_compression_options_init_import = (grpc_compression_options_init_type) GetProcAddress(library, "grpc_compression_options_init"); + grpc_compression_options_enable_algorithm_import = (grpc_compression_options_enable_algorithm_type) GetProcAddress(library, "grpc_compression_options_enable_algorithm"); + grpc_compression_options_disable_algorithm_import = (grpc_compression_options_disable_algorithm_type) GetProcAddress(library, "grpc_compression_options_disable_algorithm"); + grpc_compression_options_is_algorithm_enabled_import = (grpc_compression_options_is_algorithm_enabled_type) GetProcAddress(library, "grpc_compression_options_is_algorithm_enabled"); grpc_metadata_array_init_import = (grpc_metadata_array_init_type) GetProcAddress(library, "grpc_metadata_array_init"); grpc_metadata_array_destroy_import = (grpc_metadata_array_destroy_type) GetProcAddress(library, "grpc_metadata_array_destroy"); grpc_call_details_init_import = (grpc_call_details_init_type) GetProcAddress(library, "grpc_call_details_init"); diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index 01097a08d4..6de295414a 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -167,6 +167,21 @@ extern grpc_compression_algorithm_parse_type grpc_compression_algorithm_parse_im typedef int(*grpc_compression_algorithm_name_type)(grpc_compression_algorithm algorithm, char **name); extern grpc_compression_algorithm_name_type grpc_compression_algorithm_name_import; #define grpc_compression_algorithm_name grpc_compression_algorithm_name_import +typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level, uint32_t accepted_encodings); +extern grpc_compression_algorithm_for_level_type grpc_compression_algorithm_for_level_import; +#define grpc_compression_algorithm_for_level grpc_compression_algorithm_for_level_import +typedef void(*grpc_compression_options_init_type)(grpc_compression_options *opts); +extern grpc_compression_options_init_type grpc_compression_options_init_import; +#define grpc_compression_options_init grpc_compression_options_init_import +typedef void(*grpc_compression_options_enable_algorithm_type)(grpc_compression_options *opts, grpc_compression_algorithm algorithm); +extern grpc_compression_options_enable_algorithm_type grpc_compression_options_enable_algorithm_import; +#define grpc_compression_options_enable_algorithm grpc_compression_options_enable_algorithm_import +typedef void(*grpc_compression_options_disable_algorithm_type)(grpc_compression_options *opts, grpc_compression_algorithm algorithm); +extern grpc_compression_options_disable_algorithm_type grpc_compression_options_disable_algorithm_import; +#define grpc_compression_options_disable_algorithm grpc_compression_options_disable_algorithm_import +typedef int(*grpc_compression_options_is_algorithm_enabled_type)(const grpc_compression_options *opts, grpc_compression_algorithm algorithm); +extern grpc_compression_options_is_algorithm_enabled_type grpc_compression_options_is_algorithm_enabled_import; +#define grpc_compression_options_is_algorithm_enabled grpc_compression_options_is_algorithm_enabled_import typedef void(*grpc_metadata_array_init_type)(grpc_metadata_array *array); extern grpc_metadata_array_init_type grpc_metadata_array_init_import; #define grpc_metadata_array_init grpc_metadata_array_init_import diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 162191b06d..907b7b4a5c 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -84,7 +84,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/connected_channel.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', - 'src/core/lib/compression/compression_algorithm.c', + 'src/core/lib/compression/compression.c', 'src/core/lib/compression/message_compress.c', 'src/core/lib/debug/trace.c', 'src/core/lib/http/format_request.c', diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 7994822a06..cebbe8c40f 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -72,6 +72,11 @@ census_view_get_data_type census_view_get_data_import; census_view_reset_type census_view_reset_import; grpc_compression_algorithm_parse_type grpc_compression_algorithm_parse_import; grpc_compression_algorithm_name_type grpc_compression_algorithm_name_import; +grpc_compression_algorithm_for_level_type grpc_compression_algorithm_for_level_import; +grpc_compression_options_init_type grpc_compression_options_init_import; +grpc_compression_options_enable_algorithm_type grpc_compression_options_enable_algorithm_import; +grpc_compression_options_disable_algorithm_type grpc_compression_options_disable_algorithm_import; +grpc_compression_options_is_algorithm_enabled_type grpc_compression_options_is_algorithm_enabled_import; grpc_metadata_array_init_type grpc_metadata_array_init_import; grpc_metadata_array_destroy_type grpc_metadata_array_destroy_import; grpc_call_details_init_type grpc_call_details_init_import; @@ -334,6 +339,11 @@ void grpc_rb_load_imports(HMODULE library) { census_view_reset_import = (census_view_reset_type) GetProcAddress(library, "census_view_reset"); grpc_compression_algorithm_parse_import = (grpc_compression_algorithm_parse_type) GetProcAddress(library, "grpc_compression_algorithm_parse"); grpc_compression_algorithm_name_import = (grpc_compression_algorithm_name_type) GetProcAddress(library, "grpc_compression_algorithm_name"); + grpc_compression_algorithm_for_level_import = (grpc_compression_algorithm_for_level_type) GetProcAddress(library, "grpc_compression_algorithm_for_level"); + grpc_compression_options_init_import = (grpc_compression_options_init_type) GetProcAddress(library, "grpc_compression_options_init"); + grpc_compression_options_enable_algorithm_import = (grpc_compression_options_enable_algorithm_type) GetProcAddress(library, "grpc_compression_options_enable_algorithm"); + grpc_compression_options_disable_algorithm_import = (grpc_compression_options_disable_algorithm_type) GetProcAddress(library, "grpc_compression_options_disable_algorithm"); + grpc_compression_options_is_algorithm_enabled_import = (grpc_compression_options_is_algorithm_enabled_type) GetProcAddress(library, "grpc_compression_options_is_algorithm_enabled"); grpc_metadata_array_init_import = (grpc_metadata_array_init_type) GetProcAddress(library, "grpc_metadata_array_init"); grpc_metadata_array_destroy_import = (grpc_metadata_array_destroy_type) GetProcAddress(library, "grpc_metadata_array_destroy"); grpc_call_details_init_import = (grpc_call_details_init_type) GetProcAddress(library, "grpc_call_details_init"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index fd573808d8..d7ea6c574c 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -167,6 +167,21 @@ extern grpc_compression_algorithm_parse_type grpc_compression_algorithm_parse_im typedef int(*grpc_compression_algorithm_name_type)(grpc_compression_algorithm algorithm, char **name); extern grpc_compression_algorithm_name_type grpc_compression_algorithm_name_import; #define grpc_compression_algorithm_name grpc_compression_algorithm_name_import +typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level, uint32_t accepted_encodings); +extern grpc_compression_algorithm_for_level_type grpc_compression_algorithm_for_level_import; +#define grpc_compression_algorithm_for_level grpc_compression_algorithm_for_level_import +typedef void(*grpc_compression_options_init_type)(grpc_compression_options *opts); +extern grpc_compression_options_init_type grpc_compression_options_init_import; +#define grpc_compression_options_init grpc_compression_options_init_import +typedef void(*grpc_compression_options_enable_algorithm_type)(grpc_compression_options *opts, grpc_compression_algorithm algorithm); +extern grpc_compression_options_enable_algorithm_type grpc_compression_options_enable_algorithm_import; +#define grpc_compression_options_enable_algorithm grpc_compression_options_enable_algorithm_import +typedef void(*grpc_compression_options_disable_algorithm_type)(grpc_compression_options *opts, grpc_compression_algorithm algorithm); +extern grpc_compression_options_disable_algorithm_type grpc_compression_options_disable_algorithm_import; +#define grpc_compression_options_disable_algorithm grpc_compression_options_disable_algorithm_import +typedef int(*grpc_compression_options_is_algorithm_enabled_type)(const grpc_compression_options *opts, grpc_compression_algorithm algorithm); +extern grpc_compression_options_is_algorithm_enabled_type grpc_compression_options_is_algorithm_enabled_import; +#define grpc_compression_options_is_algorithm_enabled grpc_compression_options_is_algorithm_enabled_import typedef void(*grpc_metadata_array_init_type)(grpc_metadata_array *array); extern grpc_metadata_array_init_type grpc_metadata_array_init_import; #define grpc_metadata_array_init grpc_metadata_array_init_import |