diff options
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r-- | src/core/lib/channel/channel_args.c | 67 | ||||
-rw-r--r-- | src/core/lib/channel/channel_args.h | 15 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.c | 15 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 16 | ||||
-rw-r--r-- | src/core/lib/channel/compress_filter.c | 35 | ||||
-rw-r--r-- | src/core/lib/channel/connected_channel.c | 8 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 82 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.h | 33 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker.c | 10 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker.h | 6 | ||||
-rw-r--r-- | src/core/lib/channel/http_client_filter.c | 23 | ||||
-rw-r--r-- | src/core/lib/channel/http_server_filter.c | 21 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 97 |
13 files changed, 332 insertions, 96 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 3a56b1ff20..cfc072c0b5 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -66,22 +66,59 @@ static grpc_arg copy_arg(const grpc_arg *src) { grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add) { + return grpc_channel_args_copy_and_add_and_remove(src, NULL, 0, to_add, + num_to_add); +} + +grpc_channel_args *grpc_channel_args_copy_and_remove( + const grpc_channel_args *src, const char **to_remove, + size_t num_to_remove) { + return grpc_channel_args_copy_and_add_and_remove(src, to_remove, + num_to_remove, NULL, 0); +} + +static bool should_remove_arg(const grpc_arg *arg, const char **to_remove, + size_t num_to_remove) { + for (size_t i = 0; i < num_to_remove; ++i) { + if (strcmp(arg->key, to_remove[i]) == 0) return true; + } + return false; +} + +grpc_channel_args *grpc_channel_args_copy_and_add_and_remove( + const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, + const grpc_arg *to_add, size_t num_to_add) { + // Figure out how many args we'll be copying. + size_t num_args_to_copy = 0; + if (src != NULL) { + for (size_t i = 0; i < src->num_args; ++i) { + if (!should_remove_arg(&src->args[i], to_remove, num_to_remove)) { + ++num_args_to_copy; + } + } + } + // Create result. grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args)); - size_t i; - size_t src_num_args = (src == NULL) ? 0 : src->num_args; - if (!src && !to_add) { - dst->num_args = 0; + dst->num_args = num_args_to_copy + num_to_add; + if (dst->num_args == 0) { dst->args = NULL; return dst; } - dst->num_args = src_num_args + num_to_add; dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args); - for (i = 0; i < src_num_args; i++) { - dst->args[i] = copy_arg(&src->args[i]); + // Copy args from src that are not being removed. + size_t dst_idx = 0; + if (src != NULL) { + for (size_t i = 0; i < src->num_args; ++i) { + if (!should_remove_arg(&src->args[i], to_remove, num_to_remove)) { + dst->args[dst_idx++] = copy_arg(&src->args[i]); + } + } } - for (i = 0; i < num_to_add; i++) { - dst->args[i + src_num_args] = copy_arg(&to_add[i]); + // Add args from to_add. + for (size_t i = 0; i < num_to_add; ++i) { + dst->args[dst_idx++] = copy_arg(&to_add[i]); } + GPR_ASSERT(dst_idx == dst->num_args); return dst; } @@ -272,6 +309,18 @@ int grpc_channel_args_compare(const grpc_channel_args *a, return 0; } +const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, + const char *name) { + if (args != NULL) { + for (size_t i = 0; i < args->num_args; ++i) { + if (strcmp(args->args[i].key, name) == 0) { + return &args->args[i]; + } + } + } + return NULL; +} + int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) { if (arg->type != GRPC_ARG_INTEGER) { gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key); diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 586a296d1f..1e05303471 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -51,6 +51,17 @@ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add); +/** Copies the arguments in \a src except for those whose keys are in + \a to_remove. */ +grpc_channel_args *grpc_channel_args_copy_and_remove( + const grpc_channel_args *src, const char **to_remove, size_t num_to_remove); + +/** Copies the arguments from \a src except for those whose keys are in + \a to_remove and appends the arguments in \a to_add. */ +grpc_channel_args *grpc_channel_args_copy_and_add_and_remove( + const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, + const grpc_arg *to_add, size_t num_to_add); + /** Concatenate args from \a a and \a b into a new instance */ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, const grpc_channel_args *b); @@ -89,6 +100,10 @@ uint32_t grpc_channel_args_compression_algorithm_get_states( int grpc_channel_args_compare(const grpc_channel_args *a, const grpc_channel_args *b); +/** Returns the value of argument \a name from \a args, or NULL if not found. */ +const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, + const char *name); + typedef struct grpc_integer_options { int default_value; // Return this if value is outside of expected bounds. int min_value; diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 57d34d9e9a..98ee03417f 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -162,7 +162,7 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - gpr_timespec deadline, grpc_call_stack *call_stack) { + grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_call_element_args args; size_t count = channel_stack->count; @@ -179,10 +179,12 @@ grpc_error *grpc_call_stack_init( /* init per-filter data */ grpc_error *first_error = GRPC_ERROR_NONE; + args.start_time = gpr_now(GPR_CLOCK_MONOTONIC); for (i = 0; i < count; i++) { args.call_stack = call_stack; args.server_transport_data = transport_server_data; args.context = context; + args.path = path; args.deadline = deadline; call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; @@ -253,6 +255,13 @@ char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, return next_elem->filter->get_peer(exec_ctx, next_elem); } +void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info) { + grpc_channel_element *next_elem = elem + 1; + next_elem->filter->get_channel_info(exec_ctx, next_elem, channel_info); +} + void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { grpc_channel_element *next_elem = elem + 1; @@ -286,7 +295,7 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_status_code status, - gpr_slice *optional_message) { + grpc_slice *optional_message) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); op->on_complete = grpc_closure_create(destroy_op, op); @@ -298,7 +307,7 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_status_code status, - gpr_slice *optional_message) { + grpc_slice *optional_message) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); op->on_complete = grpc_closure_create(destroy_op, op); diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 1cfe2885d8..c3b662c969 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -74,6 +74,8 @@ typedef struct { grpc_call_stack *call_stack; const void *server_transport_data; grpc_call_context_element *context; + grpc_mdstr *path; + gpr_timespec start_time; gpr_timespec deadline; } grpc_call_element_args; @@ -154,6 +156,10 @@ typedef struct { /* Implement grpc_call_get_peer() */ char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + /* Implement grpc_channel_get_info() */ + void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + const grpc_channel_info *channel_info); + /* The name of this filter */ const char *name; } grpc_channel_filter; @@ -225,7 +231,7 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - gpr_timespec deadline, grpc_call_stack *call_stack); + grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, @@ -271,6 +277,10 @@ void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op); /* Pass through a request to get_peer to the next child element */ char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); +/* Pass through a request to get_channel_info() to the next child element */ +void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info); /* Given the top element of a channel stack, get the channel stack itself */ grpc_channel_stack *grpc_channel_stack_from_top_element( @@ -287,12 +297,12 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem, grpc_status_code status, - gpr_slice *optional_message); + grpc_slice *optional_message); void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem, grpc_status_code status, - gpr_slice *optional_message); + grpc_slice *optional_message); extern int grpc_trace_channel; diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 0981d59f63..2874d63fc7 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -35,9 +35,9 @@ #include <string.h> #include <grpc/compression.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/compress_filter.h" @@ -50,7 +50,7 @@ int grpc_compression_trace = 0; typedef struct call_data { - gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ + grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; uint32_t remaining_slice_bytes; @@ -63,7 +63,7 @@ typedef struct call_data { grpc_transport_stream_op *send_op; uint32_t send_length; uint32_t send_flags; - gpr_slice incoming_slice; + grpc_slice incoming_slice; grpc_slice_buffer_stream replacement_stream; grpc_closure *post_send; grpc_closure send_done; @@ -111,9 +111,13 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { return md; } -static int skip_compression(grpc_call_element *elem) { +static int skip_compression(grpc_call_element *elem, uint32_t flags) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + + if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { + return 1; + } if (calld->has_compression_algorithm) { if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { return 1; @@ -157,7 +161,7 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - gpr_slice_buffer_reset_and_unref(&calld->slices); + grpc_slice_buffer_reset_and_unref(&calld->slices); calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); } @@ -165,8 +169,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; int did_compress; - gpr_slice_buffer tmp; - gpr_slice_buffer_init(&tmp); + grpc_slice_buffer tmp; + grpc_slice_buffer_init(&tmp); did_compress = grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { @@ -181,7 +185,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, " bytes (%.2f%% savings)", algo_name, before_size, after_size, 100 * savings_ratio); } - gpr_slice_buffer_swap(&calld->slices, &tmp); + grpc_slice_buffer_swap(&calld->slices, &tmp); calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (grpc_compression_trace) { @@ -195,7 +199,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, } } - gpr_slice_buffer_destroy(&tmp); + grpc_slice_buffer_destroy(&tmp); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, calld->send_flags); @@ -209,7 +213,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); } else { @@ -223,7 +227,7 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message, &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { - gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); break; @@ -241,8 +245,8 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, if (op->send_initial_metadata) { process_send_initial_metadata(elem, op->send_initial_metadata); } - if (op->send_message != NULL && !skip_compression(elem) && - 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) { + if (op->send_message != NULL && + !skip_compression(elem, op->send_message->flags)) { calld->send_op = op; calld->send_length = op->send_message->length; calld->send_flags = op->send_message->flags; @@ -263,7 +267,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; /* initialize members */ - gpr_slice_buffer_init(&calld->slices); + grpc_slice_buffer_init(&calld->slices); calld->has_compression_algorithm = 0; grpc_closure_init(&calld->got_slice, got_slice, elem); grpc_closure_init(&calld->send_done, send_done, elem); @@ -277,7 +281,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, void *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; - gpr_slice_buffer_destroy(&calld->slices); + grpc_slice_buffer_destroy(&calld->slices); } /* Constructor for channel_data */ @@ -328,4 +332,5 @@ const grpc_channel_filter grpc_compress_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "compress"}; diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 918379c845..038e819f72 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -38,9 +38,9 @@ #include <string.h> #include <grpc/byte_buffer.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport.h" @@ -134,6 +134,11 @@ static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { return grpc_transport_get_peer(exec_ctx, chand->transport); } +/* No-op. */ +static void con_get_channel_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info) {} + static const grpc_channel_filter connected_channel_filter = { con_start_transport_stream_op, con_start_transport_op, @@ -145,6 +150,7 @@ static const grpc_channel_filter connected_channel_filter = { init_channel_elem, destroy_channel_elem, con_get_peer, + con_get_channel_info, "connected", }; diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 079b98a2f8..0e703d8d27 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -55,39 +55,58 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, deadline_state->timer_pending = false; gpr_mu_unlock(&deadline_state->timer_mu); if (error != GRPC_ERROR_CANCELLED) { - gpr_slice msg = gpr_slice_from_static_string("Deadline Exceeded"); + grpc_slice msg = grpc_slice_from_static_string("Deadline Exceeded"); grpc_call_element_send_cancel_with_message( exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg); - gpr_slice_unref(msg); + grpc_slice_unref(msg); } GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); } // Starts the deadline timer. -static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - gpr_timespec deadline) { +static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + gpr_timespec deadline) { grpc_deadline_state* deadline_state = elem->call_data; deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + // Note: We do not start the timer if there is already a timer + // pending. This should be okay, because this is only called from two + // functions exported by this module: grpc_deadline_state_start(), which + // starts the initial timer, and grpc_deadline_state_reset(), which + // cancels any pre-existing timer before starting a new one. In + // particular, we want to ensure that if grpc_deadline_state_start() + // winds up trying to start the timer after grpc_deadline_state_reset() + // has already done so, we ignore the value from the former. + if (!deadline_state->timer_pending && + gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { // Take a reference to the call stack, to be owned by the timer. GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); - gpr_mu_lock(&deadline_state->timer_mu); deadline_state->timer_pending = true; grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback, elem, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(&deadline_state->timer_mu); } } +static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + gpr_timespec deadline) { + grpc_deadline_state* deadline_state = elem->call_data; + gpr_mu_lock(&deadline_state->timer_mu); + start_timer_if_needed_locked(exec_ctx, elem, deadline); + gpr_mu_unlock(&deadline_state->timer_mu); +} // Cancels the deadline timer. -static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { - gpr_mu_lock(&deadline_state->timer_mu); +static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, + grpc_deadline_state* deadline_state) { if (deadline_state->timer_pending) { grpc_timer_cancel(exec_ctx, &deadline_state->timer); deadline_state->timer_pending = false; } +} +static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, + grpc_deadline_state* deadline_state) { + gpr_mu_lock(&deadline_state->timer_mu); + cancel_timer_if_needed_locked(exec_ctx, deadline_state); gpr_mu_unlock(&deadline_state->timer_mu); } @@ -108,6 +127,21 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state, op->on_complete = &deadline_state->on_complete; } +void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_call_stack* call_stack) { + grpc_deadline_state* deadline_state = elem->call_data; + memset(deadline_state, 0, sizeof(*deadline_state)); + deadline_state->call_stack = call_stack; + gpr_mu_init(&deadline_state->timer_mu); +} + +void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + grpc_deadline_state* deadline_state = elem->call_data; + cancel_timer_if_needed(exec_ctx, deadline_state); + gpr_mu_destroy(&deadline_state->timer_mu); +} + // Callback and associated state for starting the timer after call stack // initialization has been completed. struct start_timer_after_init_state { @@ -122,16 +156,11 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, gpr_free(state); } -void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args) { - grpc_deadline_state* deadline_state = elem->call_data; - memset(deadline_state, 0, sizeof(*deadline_state)); - deadline_state->call_stack = args->call_stack; - gpr_mu_init(&deadline_state->timer_mu); +void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec deadline) { // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. - const gpr_timespec deadline = - gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { // When the deadline passes, we indicate the failure by sending down // an op with cancel_error set. However, we can't send down any ops @@ -148,11 +177,13 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, } } -void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec new_deadline) { grpc_deadline_state* deadline_state = elem->call_data; - cancel_timer_if_needed(exec_ctx, deadline_state); - gpr_mu_destroy(&deadline_state->timer_mu); + gpr_mu_lock(&deadline_state->timer_mu); + cancel_timer_if_needed_locked(exec_ctx, deadline_state); + start_timer_if_needed_locked(exec_ctx, elem, new_deadline); + gpr_mu_unlock(&deadline_state->timer_mu); } void grpc_deadline_state_client_start_transport_stream_op( @@ -209,7 +240,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element_args* args) { // Note: size of call data is different between client and server. memset(elem->call_data, 0, elem->filter->sizeof_call_data); - grpc_deadline_state_init(exec_ctx, elem, args); + grpc_deadline_state_init(exec_ctx, elem, args->call_stack); + grpc_deadline_state_start(exec_ctx, elem, args->deadline); return GRPC_ERROR_NONE; } @@ -284,6 +316,7 @@ const grpc_channel_filter grpc_client_deadline_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "deadline", }; @@ -298,5 +331,6 @@ const grpc_channel_filter grpc_server_deadline_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "deadline", }; diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 685df87761..716a852565 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -54,18 +54,37 @@ typedef struct grpc_deadline_state { grpc_closure* next_on_complete; } grpc_deadline_state; -// To be used in a filter's init_call_elem(), destroy_call_elem(), and -// start_transport_stream_op() methods to enforce call deadlines. // -// REQUIRES: The first field in elem->call_data is a grpc_deadline_state. +// NOTE: All of these functions require that the first field in +// elem->call_data is a grpc_deadline_state. // -// For grpc_deadline_state_client_start_transport_stream_op(), it is the -// caller's responsibility to chain to the next filter if necessary -// after the function returns. + void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args); + grpc_call_stack* call_stack); void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem); + +// Starts the timer with the specified deadline. +// Should be called from the filter's init_call_elem() method. +void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec deadline); + +// Cancels the existing timer and starts a new one with new_deadline. +// +// Note: It is generally safe to call this with an earlier deadline +// value than the current one, but not the reverse. No checks are done +// to ensure that the timer callback is not invoked while it is in the +// process of being reset, which means that attempting to increase the +// deadline may result in the timer being called twice. +void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec new_deadline); + +// To be called from the client-side filter's start_transport_stream_op() +// method. Ensures that the deadline timer is cancelled when the call +// is completed. +// +// Note: It is the caller's responsibility to chain to the next filter if +// necessary after this function returns. void grpc_deadline_state_client_start_transport_stream_op( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op); diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 8f9fb17a31..a45a39981c 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -62,7 +62,7 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, + grpc_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data) { @@ -146,8 +146,8 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, static void call_next_handshaker(grpc_exec_ctx* exec_ctx, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, void* user_data, - grpc_error* error) { + grpc_slice_buffer* read_buffer, + void* user_data, grpc_error* error) { grpc_handshake_manager* mgr = user_data; GPR_ASSERT(mgr->state != NULL); GPR_ASSERT(mgr->state->index < mgr->count); @@ -183,8 +183,8 @@ void grpc_handshake_manager_do_handshake( gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data) { grpc_channel_args* args_copy = grpc_channel_args_copy(args); - gpr_slice_buffer* read_buffer = malloc(sizeof(*read_buffer)); - gpr_slice_buffer_init(read_buffer); + grpc_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer)); + grpc_slice_buffer_init(read_buffer); if (mgr->count == 0) { // No handshakers registered, so we just immediately call the done // callback with the passed-in endpoint. diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index d574b46242..f8a36c6473 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -59,7 +59,7 @@ typedef struct grpc_handshaker grpc_handshaker; typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, + grpc_slice_buffer* read_buffer, void* user_data, grpc_error* error); struct grpc_handshaker_vtable { @@ -77,7 +77,7 @@ struct grpc_handshaker_vtable { /// \a acceptor will be NULL for client-side handshakers. void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, gpr_timespec deadline, + grpc_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data); }; @@ -103,7 +103,7 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, + grpc_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data); diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 1dc05fb20d..f57d7c2453 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -62,9 +62,9 @@ typedef struct call_data { grpc_transport_stream_op send_op; uint32_t send_length; uint32_t send_flags; - gpr_slice incoming_slice; + grpc_slice incoming_slice; grpc_slice_buffer_stream replacement_stream; - gpr_slice_buffer slices; + grpc_slice_buffer slices; /* flag that indicates that all slices of send_messages aren't availble */ bool send_message_blocked; @@ -101,7 +101,7 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { char *message_string; gpr_asprintf(&message_string, "Received http2 header with status: %s", grpc_mdstr_as_c_string(md->value)); - gpr_slice message = gpr_slice_from_copied_string(message_string); + grpc_slice message = grpc_slice_from_copied_string(message_string); gpr_free(message_string); grpc_call_element_send_close_with_message(a->exec_ctx, a->elem, GRPC_STATUS_CANCELLED, &message); @@ -155,7 +155,7 @@ static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - gpr_slice_buffer_reset_and_unref(&calld->slices); + grpc_slice_buffer_reset_and_unref(&calld->slices); calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); } @@ -176,10 +176,10 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { - memcpy(wrptr, GPR_SLICE_START_PTR(calld->incoming_slice), - GPR_SLICE_LENGTH(calld->incoming_slice)); - wrptr += GPR_SLICE_LENGTH(calld->incoming_slice); - gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), + GRPC_SLICE_LENGTH(calld->incoming_slice)); + wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); + grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { calld->send_message_blocked = false; break; @@ -191,7 +191,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; calld->send_message_blocked = false; - gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { /* Pass down the original send_message op that was blocked.*/ grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, @@ -311,7 +311,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, calld->on_done_recv = NULL; calld->on_complete = NULL; calld->payload_bytes = NULL; - gpr_slice_buffer_init(&calld->slices); + grpc_slice_buffer_init(&calld->slices); grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem); grpc_closure_init(&calld->got_slice, got_slice, elem); @@ -324,7 +324,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, void *ignored) { call_data *calld = elem->call_data; - gpr_slice_buffer_destroy(&calld->slices); + grpc_slice_buffer_destroy(&calld->slices); } static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { @@ -448,4 +448,5 @@ const grpc_channel_filter grpc_http_client_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "http-client"}; diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index 0f2bf97824..6a33689fec 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -42,6 +42,8 @@ #define EXPECTED_CONTENT_TYPE "application/grpc" #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 +extern int grpc_http_trace; + typedef struct call_data { uint8_t seen_path; uint8_t seen_method; @@ -66,7 +68,7 @@ typedef struct call_data { grpc_closure *recv_message_ready; grpc_closure *on_complete; grpc_byte_stream **pp_recv_message; - gpr_slice_buffer read_slice_buffer; + grpc_slice_buffer read_slice_buffer; grpc_slice_buffer_stream read_stream; /** Receive closures are chained: we inject this closure as the on_done_recv @@ -160,9 +162,8 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* Retrieve the payload from the value of the 'grpc-internal-payload-bin' header field */ calld->seen_payload_bin = 1; - gpr_slice_buffer_init(&calld->read_slice_buffer); - gpr_slice_buffer_add(&calld->read_slice_buffer, - gpr_slice_ref(md->value->slice)); + grpc_slice_buffer_add(&calld->read_slice_buffer, + grpc_slice_ref(md->value->slice)); grpc_slice_buffer_stream_init(&calld->read_stream, &calld->read_slice_buffer, 0); return NULL; @@ -209,6 +210,11 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, err, GRPC_ERROR_CREATE("Missing te: trailers header")); } /* Error this call out */ + if (grpc_http_trace) { + const char *error_str = grpc_error_string(err); + gpr_log(GPR_ERROR, "Invalid http2 headers: %s", error_str); + grpc_error_free_string(error_str); + } grpc_call_element_send_cancel(exec_ctx, elem); } } else { @@ -307,13 +313,17 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem); grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem); + grpc_slice_buffer_init(&calld->read_slice_buffer); return GRPC_ERROR_NONE; } /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) {} + void *ignored) { + call_data *calld = elem->call_data; + grpc_slice_buffer_destroy(&calld->read_slice_buffer); +} /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, @@ -337,4 +347,5 @@ const grpc_channel_filter grpc_http_server_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "http-server"}; diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index f067a3a51c..1331fe1c65 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -39,12 +39,53 @@ #include <grpc/support/string_util.h> #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/transport/method_config.h" #define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited. // The protobuf library will (by default) start warning at 100 megs. #define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024) +typedef struct message_size_limits { + int max_send_size; + int max_recv_size; +} message_size_limits; + +static void* message_size_limits_copy(void* value) { + void* new_value = gpr_malloc(sizeof(message_size_limits)); + memcpy(new_value, value, sizeof(message_size_limits)); + return new_value; +} + +static int message_size_limits_cmp(void* value1, void* value2) { + const message_size_limits* v1 = value1; + const message_size_limits* v2 = value2; + if (v1->max_send_size > v2->max_send_size) return 1; + if (v1->max_send_size < v2->max_send_size) return -1; + if (v1->max_recv_size > v2->max_recv_size) return 1; + if (v1->max_recv_size < v2->max_recv_size) return -1; + return 0; +} + +static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = { + gpr_free, message_size_limits_copy, message_size_limits_cmp}; + +static void* method_config_convert_value( + const grpc_method_config* method_config) { + message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); + const int32_t* max_request_message_bytes = + grpc_method_config_get_max_request_message_bytes(method_config); + value->max_send_size = + max_request_message_bytes != NULL ? *max_request_message_bytes : -1; + const int32_t* max_response_message_bytes = + grpc_method_config_get_max_response_message_bytes(method_config); + value->max_recv_size = + max_response_message_bytes != NULL ? *max_response_message_bytes : -1; + return value; +} + typedef struct call_data { + int max_send_size; + int max_recv_size; // Receive closures are chained: we inject this closure as the // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. @@ -58,6 +99,8 @@ typedef struct call_data { typedef struct channel_data { int max_send_size; int max_recv_size; + // Maps path names to message_size_limits structs. + grpc_mdstr_hash_table* method_limit_table; } channel_data; // Callback invoked when we receive a message. Here we check the max @@ -66,13 +109,12 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_error* error) { grpc_call_element* elem = user_data; call_data* calld = elem->call_data; - channel_data* chand = elem->channel_data; - if (*calld->recv_message != NULL && chand->max_recv_size >= 0 && - (*calld->recv_message)->length > (size_t)chand->max_recv_size) { + if (*calld->recv_message != NULL && calld->max_recv_size >= 0 && + (*calld->recv_message)->length > (size_t)calld->max_recv_size) { char* message_string; gpr_asprintf(&message_string, "Received message larger than max (%u vs. %d)", - (*calld->recv_message)->length, chand->max_recv_size); + (*calld->recv_message)->length, calld->max_recv_size); grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INVALID_ARGUMENT); @@ -93,14 +135,13 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) { call_data* calld = elem->call_data; - channel_data* chand = elem->channel_data; // Check max send message size. - if (op->send_message != NULL && chand->max_send_size >= 0 && - op->send_message->length > (size_t)chand->max_send_size) { + if (op->send_message != NULL && calld->max_send_size >= 0 && + op->send_message->length > (size_t)calld->max_send_size) { char* message_string; gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", - op->send_message->length, chand->max_send_size); - gpr_slice message = gpr_slice_from_copied_string(message_string); + op->send_message->length, calld->max_send_size); + grpc_slice message = grpc_slice_from_copied_string(message_string); gpr_free(message_string); grpc_call_element_send_close_with_message( exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); @@ -119,9 +160,32 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_element_args* args) { + channel_data* chand = elem->channel_data; call_data* calld = elem->call_data; calld->next_recv_message_ready = NULL; grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem); + // Get max sizes from channel data, then merge in per-method config values. + // Note: Per-method config is only available on the client, so we + // apply the max request size to the send limit and the max response + // size to the receive limit. + calld->max_send_size = chand->max_send_size; + calld->max_recv_size = chand->max_recv_size; + if (chand->method_limit_table != NULL) { + message_size_limits* limits = + grpc_method_config_table_get(chand->method_limit_table, args->path); + if (limits != NULL) { + if (limits->max_send_size >= 0 && + (limits->max_send_size < calld->max_send_size || + calld->max_send_size < 0)) { + calld->max_send_size = limits->max_send_size; + } + if (limits->max_recv_size >= 0 && + (limits->max_recv_size < calld->max_recv_size || + calld->max_recv_size < 0)) { + calld->max_recv_size = limits->max_recv_size; + } + } + } return GRPC_ERROR_NONE; } @@ -155,11 +219,23 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } } + // Get method config table from channel args. + const grpc_arg* channel_arg = + grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); + if (channel_arg != NULL) { + GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); + chand->method_limit_table = grpc_method_config_table_convert( + (grpc_method_config_table*)channel_arg->value.pointer.p, + method_config_convert_value, &message_size_limits_vtable); + } } // Destructor for channel_data. static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) {} + grpc_channel_element* elem) { + channel_data* chand = elem->channel_data; + grpc_mdstr_hash_table_unref(chand->method_limit_table); +} const grpc_channel_filter grpc_message_size_filter = { start_transport_stream_op, @@ -172,4 +248,5 @@ const grpc_channel_filter grpc_message_size_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "message_size"}; |