diff options
author | 2016-04-23 00:17:05 -0700 | |
---|---|---|
committer | 2016-04-23 00:17:05 -0700 | |
commit | 73dcbda5b006fac4651480f7692b30db993b79b9 (patch) | |
tree | ab5ce0d90300dedfdbc1954679f5cf3c4bcd1583 /src/core/lib/surface | |
parent | e52be8c8a1e35e5fd709bfe99f95a12f90029aed (diff) |
Validation for incoming compressed data
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 133 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 16 | ||||
-rw-r--r-- | src/core/lib/surface/channel.h | 4 |
3 files changed, 127 insertions, 26 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 6581bbd3d1..2462adc26a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -40,6 +40,7 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/slice.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> @@ -52,7 +53,9 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" +#include "src/core/lib/transport/transport.h" /** The maximum number of concurrent batches possible. Based upon the maximum number of individually queueable ops in the batch @@ -240,6 +243,9 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); +static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, bool success); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, @@ -410,7 +416,30 @@ static void set_status_code(grpc_call *call, status_source source, static void set_compression_algorithm(grpc_call *call, grpc_compression_algorithm algo) { - call->compression_algorithm = algo; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + char *error_msg = NULL; + const grpc_compression_options compression_options = + grpc_channel_get_compression_options(call->channel); + + /* check if algorithm is known */ + if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) { + gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", algo); + gpr_log(GPR_ERROR, error_msg); + close_with_status(&exec_ctx, call, GRPC_STATUS_INTERNAL, error_msg); + } else if (grpc_compression_options_is_algorithm_enabled(&compression_options, + algo) == 0) { + /* check if algorithm is supported by current channel config */ + char *algo_name; + grpc_compression_algorithm_name(algo, &algo_name); + gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", + algo_name); + gpr_log(GPR_ERROR, error_msg); + close_with_status(&exec_ctx, call, GRPC_STATUS_INTERNAL, error_msg); + } else { + call->compression_algorithm = algo; + } + gpr_free(error_msg); + grpc_exec_ctx_finish(&exec_ctx); } grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( @@ -694,48 +723,102 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return r; } -typedef struct cancel_closure { +typedef struct termination_closure { grpc_closure closure; grpc_call *call; grpc_status_code status; -} cancel_closure; + gpr_slice optional_message; + grpc_closure *op_closure; + enum { TC_CANCEL, TC_CLOSE } type; +} termination_closure; + +static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { + termination_closure *tc = tcp; + if (tc->type == TC_CANCEL) { + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "cancel"); + } + if (tc->type == TC_CLOSE) { + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); + } + gpr_slice_unref(tc->optional_message); + if (tc->op_closure != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, tc->op_closure, false, NULL); + } + gpr_free(tc); +} -static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { - cancel_closure *cc = ccp; - GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel"); - gpr_free(cc); +static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { + grpc_transport_stream_op op; + termination_closure *tc = tcp; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = tc->status; + /* reuse closure to catch completion */ + grpc_closure_init(&tc->closure, done_termination, tc); + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); } -static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { +static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { grpc_transport_stream_op op; - cancel_closure *cc = ccp; + termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); - op.cancel_with_status = cc->status; + tc->optional_message = gpr_slice_ref(tc->optional_message); + grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message); /* reuse closure to catch completion */ - grpc_closure_init(&cc->closure, done_cancel, cc); - op.on_complete = &cc->closure; - execute_op(exec_ctx, cc->call, &op); + grpc_closure_init(&tc->closure, done_termination, tc); + tc->op_closure = op.on_complete; + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); +} + +static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, + termination_closure *tc) { + grpc_mdstr *details = NULL; + if (GPR_SLICE_LENGTH(tc->optional_message) > 0) { + tc->optional_message = gpr_slice_ref(tc->optional_message); + details = grpc_mdstr_from_slice(tc->optional_message); + } + + set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status); + set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details); + + if (tc->type == TC_CANCEL) { + grpc_closure_init(&tc->closure, send_cancel, tc); + GRPC_CALL_INTERNAL_REF(tc->call, "cancel"); + } else if (tc->type == TC_CLOSE) { + grpc_closure_init(&tc->closure, send_close, tc); + GRPC_CALL_INTERNAL_REF(tc->call, "close"); + } + grpc_exec_ctx_enqueue(exec_ctx, &tc->closure, true, NULL); + return GRPC_CALL_OK; } static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { - grpc_mdstr *details = - description ? grpc_mdstr_from_string(description) : NULL; - cancel_closure *cc = gpr_malloc(sizeof(*cc)); - + termination_closure *tc = gpr_malloc(sizeof(*tc)); + memset(tc, 0, sizeof(termination_closure)); + tc->type = TC_CANCEL; + tc->call = c; + tc->optional_message = gpr_slice_from_copied_string(description); GPR_ASSERT(status != GRPC_STATUS_OK); + tc->status = status; - set_status_code(c, STATUS_FROM_API_OVERRIDE, (uint32_t)status); - set_status_details(c, STATUS_FROM_API_OVERRIDE, details); + return terminate_with_status(exec_ctx, tc); +} - grpc_closure_init(&cc->closure, send_cancel, cc); - cc->call = c; - cc->status = status; - GRPC_CALL_INTERNAL_REF(c, "cancel"); - grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL); +static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description) { + termination_closure *tc = gpr_malloc(sizeof(*tc)); + memset(tc, 0, sizeof(termination_closure)); + tc->type = TC_CLOSE; + tc->call = c; + tc->optional_message = gpr_slice_from_copied_string(description); + GPR_ASSERT(status != GRPC_STATUS_OK); + tc->status = status; - return GRPC_CALL_OK; + return terminate_with_status(exec_ctx, tc); } static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index b6b760b5d8..72d0cfab2b 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -36,16 +36,17 @@ #include <stdlib.h> #include <string.h> +#include <grpc/compression.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_init.h" -#include "src/core/lib/surface/init.h" #include "src/core/lib/transport/static_metadata.h" /** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. @@ -64,6 +65,7 @@ typedef struct registered_call { struct grpc_channel { int is_client; uint32_t max_message_length; + grpc_compression_options compression_options; grpc_mdelem *default_authority; gpr_mu registered_call_mu; @@ -111,6 +113,7 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, channel->registered_calls = NULL; channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + grpc_compression_options_init(&channel->compression_options); if (args) { for (size_t i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { @@ -153,6 +156,12 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, } } } + /* extract compression options */ + channel->compression_options.enabled_algorithms_bitset = + (uint32_t)grpc_channel_args_compression_algorithm_get_states(args); + channel->compression_options.default_compression_algorithm = + grpc_channel_args_get_compression_algorithm(args); + grpc_channel_args_destroy(args); } @@ -306,6 +315,11 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { return CHANNEL_STACK_FROM_CHANNEL(channel); } +grpc_compression_options grpc_channel_get_compression_options( + const grpc_channel *channel) { + return channel->compression_options; +} + grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { char tmp[GPR_LTOA_MIN_BUFSIZE]; switch (i) { diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 22dae930e4..8f153fbc55 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -45,6 +45,10 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, /** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); +/** Return the compression options for \a channel */ +grpc_compression_options grpc_channel_get_compression_options( + const grpc_channel *channel); + /** Get a grpc_mdelem of grpc-status: X where X is the numeric value of status_code. |