diff options
Diffstat (limited to 'src/core/lib/channel')
22 files changed, 728 insertions, 313 deletions
diff --git a/src/core/lib/channel/README.md b/src/core/lib/channel/README.md new file mode 100644 index 0000000000..2dfcfe6e66 --- /dev/null +++ b/src/core/lib/channel/README.md @@ -0,0 +1,4 @@ +# Channel + +Provides channel/call stack implementation, and implementation of common filters +for that implementation. diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 401a2ad4fe..1a099ac437 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -184,7 +184,7 @@ grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a) { return b; } -void grpc_channel_args_destroy(grpc_channel_args *a) { +void grpc_channel_args_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_args *a) { size_t i; if (!a) return; for (i = 0; i < a->num_args; i++) { @@ -195,7 +195,8 @@ void grpc_channel_args_destroy(grpc_channel_args *a) { case GRPC_ARG_INTEGER: break; case GRPC_ARG_POINTER: - a->args[i].value.pointer.vtable->destroy(a->args[i].value.pointer.p); + a->args[i].value.pointer.vtable->destroy(exec_ctx, + a->args[i].value.pointer.p); break; } gpr_free(a->args[i].key); @@ -249,7 +250,8 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a, } grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( - grpc_channel_args **a, grpc_compression_algorithm algorithm, int state) { + grpc_exec_ctx *exec_ctx, grpc_channel_args **a, + grpc_compression_algorithm algorithm, int state) { int *states_arg = NULL; grpc_channel_args *result = *a; const int states_arg_found = @@ -282,7 +284,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm); } result = grpc_channel_args_copy_and_add(*a, &tmp, 1); - grpc_channel_args_destroy(*a); + grpc_channel_args_destroy(exec_ctx, *a); *a = result; } return result; diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 88fc0e37a3..5c7d31f8bb 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -68,7 +68,7 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, const grpc_channel_args *b); /** Destroy arguments created by \a grpc_channel_args_copy */ -void grpc_channel_args_destroy(grpc_channel_args *a); +void grpc_channel_args_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_args *a); /** Returns the compression algorithm set in \a a. */ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( @@ -88,7 +88,8 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm( * modified to point to the returned instance (which may be different from the * input value of \a a). */ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( - grpc_channel_args **a, grpc_compression_algorithm algorithm, int enabled); + grpc_exec_ctx *exec_ctx, grpc_channel_args **a, + grpc_compression_algorithm algorithm, int enabled); /** Returns the bitset representing the support state (true for enabled, false * for disabled) for compression algorithms. diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 999ad5f507..8f08b427fb 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -102,13 +102,11 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, return CALL_ELEMS_FROM_STACK(call_stack) + index; } -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, - grpc_iomgr_cb_func destroy, void *destroy_arg, - const grpc_channel_filter **filters, - size_t filter_count, - const grpc_channel_args *channel_args, - grpc_transport *optional_transport, - const char *name, grpc_channel_stack *stack) { +grpc_error *grpc_channel_stack_init( + grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count, + const grpc_channel_args *channel_args, grpc_transport *optional_transport, + const char *name, grpc_channel_stack *stack) { size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); @@ -126,6 +124,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element)); /* init per-filter data */ + grpc_error *first_error = GRPC_ERROR_NONE; for (i = 0; i < filter_count; i++) { args.channel_stack = stack; args.channel_args = channel_args; @@ -134,7 +133,15 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, args.is_last = i == (filter_count - 1); elems[i].filter = filters[i]; elems[i].channel_data = user_data; - elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args); + grpc_error *error = + elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args); + if (error != GRPC_ERROR_NONE) { + if (first_error == GRPC_ERROR_NONE) { + first_error = error; + } else { + GRPC_ERROR_UNREF(error); + } + } user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data); } @@ -144,6 +151,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, grpc_channel_stack_size(filters, filter_count)); stack->call_stack_size = call_size; + return first_error; } void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, @@ -289,7 +297,8 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); op->cancel_error = GRPC_ERROR_CANCELLED; - op->on_complete = grpc_closure_create(destroy_op, op); + op->on_complete = + grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); elem->filter->start_transport_stream_op(exec_ctx, elem, op); } @@ -299,8 +308,9 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, grpc_slice *optional_message) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); - op->on_complete = grpc_closure_create(destroy_op, op); - grpc_transport_stream_op_add_cancellation_with_message(op, status, + op->on_complete = + grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); + grpc_transport_stream_op_add_cancellation_with_message(exec_ctx, op, status, optional_message); elem->filter->start_transport_stream_op(exec_ctx, elem, op); } @@ -311,7 +321,8 @@ void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, grpc_slice *optional_message) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); - op->on_complete = grpc_closure_create(destroy_op, op); - grpc_transport_stream_op_add_close(op, status, optional_message); + op->on_complete = + grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); + grpc_transport_stream_op_add_close(exec_ctx, op, status, optional_message); elem->filter->start_transport_stream_op(exec_ctx, elem, op); } diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 004643d45f..d9d3a85233 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -34,6 +34,13 @@ #ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H #define GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H +////////////////////////////////////////////////////////////////////////////// +// IMPORTANT NOTE: +// +// When you update this API, please make the corresponding changes to +// the C++ API in src/cpp/common/channel_filter.{h,cc} +////////////////////////////////////////////////////////////////////////////// + /* A channel filter defines how operations on a channel are implemented. Channel filters are chained together to create full channels, and if those chains are linear, then channel stacks provide a mechanism to minimize @@ -146,8 +153,9 @@ typedef struct { is_first, is_last designate this elements position in the stack, and are useful for asserting correct configuration by upper layer code. The filter does not need to do any chaining */ - void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_channel_element_args *args); + grpc_error *(*init_channel_elem)(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args); /* Destroy per channel data. The filter does not need to do any chaining */ void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx, @@ -214,12 +222,11 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count); /* Initialize a channel stack given some filters */ -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, - grpc_iomgr_cb_func destroy, void *destroy_arg, - const grpc_channel_filter **filters, - size_t filter_count, const grpc_channel_args *args, - grpc_transport *optional_transport, - const char *name, grpc_channel_stack *stack); +grpc_error *grpc_channel_stack_init( + grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count, + const grpc_channel_args *args, grpc_transport *optional_transport, + const char *name, grpc_channel_stack *stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_stack *stack); diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c index eda4968f48..5f9e3b4539 100644 --- a/src/core/lib/channel/channel_stack_builder.c +++ b/src/core/lib/channel/channel_stack_builder.c @@ -138,9 +138,10 @@ void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder, } void grpc_channel_stack_builder_set_channel_arguments( - grpc_channel_stack_builder *builder, const grpc_channel_args *args) { + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + const grpc_channel_args *args) { if (builder->args != NULL) { - grpc_channel_args_destroy(builder->args); + grpc_channel_args_destroy(exec_ctx, builder->args); } builder->args = grpc_channel_args_copy(args); } @@ -213,7 +214,8 @@ bool grpc_channel_stack_builder_add_filter_after( return true; } -void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder) { +void grpc_channel_stack_builder_destroy(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder) { filter_node *p = builder->begin.next; while (p != &builder->end) { filter_node *next = p->next; @@ -221,17 +223,16 @@ void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder) { p = next; } if (builder->args != NULL) { - grpc_channel_args_destroy(builder->args); + grpc_channel_args_destroy(exec_ctx, builder->args); } gpr_free(builder->target); gpr_free(builder); } -void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx, - grpc_channel_stack_builder *builder, - size_t prefix_bytes, int initial_refs, - grpc_iomgr_cb_func destroy, - void *destroy_arg) { +grpc_error *grpc_channel_stack_builder_finish( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + size_t prefix_bytes, int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, void **result) { // count the number of filters size_t num_filters = 0; for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) { @@ -250,28 +251,35 @@ void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx, size_t channel_stack_size = grpc_channel_stack_size(filters, num_filters); // allocate memory, with prefix_bytes followed by channel_stack_size - char *result = gpr_malloc(prefix_bytes + channel_stack_size); + *result = gpr_malloc(prefix_bytes + channel_stack_size); // fetch a pointer to the channel stack grpc_channel_stack *channel_stack = - (grpc_channel_stack *)(result + prefix_bytes); + (grpc_channel_stack *)((char *)(*result) + prefix_bytes); // and initialize it - grpc_channel_stack_init(exec_ctx, initial_refs, destroy, - destroy_arg == NULL ? result : destroy_arg, filters, - num_filters, builder->args, builder->transport, - builder->name, channel_stack); - - // run post-initialization functions - i = 0; - for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) { - if (p->init != NULL) { - p->init(channel_stack, grpc_channel_stack_element(channel_stack, i), - p->init_arg); + grpc_error *error = grpc_channel_stack_init( + exec_ctx, initial_refs, destroy, + destroy_arg == NULL ? *result : destroy_arg, filters, num_filters, + builder->args, builder->transport, builder->name, channel_stack); + + if (error != GRPC_ERROR_NONE) { + grpc_channel_stack_destroy(exec_ctx, channel_stack); + gpr_free(*result); + *result = NULL; + } else { + // run post-initialization functions + i = 0; + for (filter_node *p = builder->begin.next; p != &builder->end; + p = p->next) { + if (p->init != NULL) { + p->init(channel_stack, grpc_channel_stack_element(channel_stack, i), + p->init_arg); + } + i++; } - i++; } - grpc_channel_stack_builder_destroy(builder); + grpc_channel_stack_builder_destroy(exec_ctx, builder); gpr_free((grpc_channel_filter **)filters); - return result; + return error; } diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index 4a00f7bfdb..8adf38e27b 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -73,7 +73,8 @@ grpc_transport *grpc_channel_stack_builder_get_transport( /// Set channel arguments: copies args void grpc_channel_stack_builder_set_channel_arguments( - grpc_channel_stack_builder *builder, const grpc_channel_args *args); + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + const grpc_channel_args *args); /// Return a borrowed pointer to the channel arguments const grpc_channel_args *grpc_channel_stack_builder_get_channel_arguments( @@ -146,19 +147,19 @@ bool grpc_channel_stack_builder_append_filter( void grpc_channel_stack_builder_iterator_destroy( grpc_channel_stack_builder_iterator *iterator); -/// Destroy the builder, return the freshly minted channel stack +/// Destroy the builder, return the freshly minted channel stack in \a result. /// Allocates \a prefix_bytes bytes before the channel stack /// Returns the base pointer of the allocated block /// \a initial_refs, \a destroy, \a destroy_arg are as per /// grpc_channel_stack_init -void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx, - grpc_channel_stack_builder *builder, - size_t prefix_bytes, int initial_refs, - grpc_iomgr_cb_func destroy, - void *destroy_arg); +grpc_error *grpc_channel_stack_builder_finish( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + size_t prefix_bytes, int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, void **result); /// Destroy the builder without creating a channel stack -void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder); +void grpc_channel_stack_builder_destroy(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder); extern int grpc_trace_channel_stack_builder; diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 2874d63fc7..337c194b79 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -44,6 +44,7 @@ #include "src/core/lib/compression/algorithm_metadata.h" #include "src/core/lib/compression/message_compress.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -82,7 +83,8 @@ typedef struct channel_data { /** For each \a md element from the incoming metadata, filter out the entry for * "grpc-encoding", using its value to populate the call data's * compression_algorithm field. */ -static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem *compression_md_filter(grpc_exec_ctx *exec_ctx, + void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -130,12 +132,14 @@ static int skip_compression(grpc_call_element *elem, uint32_t flags) { /** Filter initial metadata */ static void process_send_initial_metadata( - grpc_call_element *elem, grpc_metadata_batch *initial_metadata) { + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; /* Parse incoming request for compression. If any, it'll be available * at calld->compression_algorithm */ - grpc_metadata_batch_filter(initial_metadata, compression_md_filter, elem); + grpc_metadata_batch_filter(exec_ctx, initial_metadata, compression_md_filter, + elem); if (!calld->has_compression_algorithm) { /* If no algorithm was found in the metadata and we aren't * exceptionally skipping compression, fall back to the channel @@ -161,7 +165,7 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - grpc_slice_buffer_reset_and_unref(&calld->slices); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); } @@ -171,8 +175,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, int did_compress; grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); - did_compress = - grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); + did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, + &calld->slices, &tmp); if (did_compress) { if (grpc_compression_trace) { char *algo_name; @@ -199,7 +203,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, } } - grpc_slice_buffer_destroy(&tmp); + grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, calld->send_flags); @@ -243,7 +247,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0); if (op->send_initial_metadata) { - process_send_initial_metadata(elem, op->send_initial_metadata); + process_send_initial_metadata(exec_ctx, elem, op->send_initial_metadata); } if (op->send_message != NULL && !skip_compression(elem, op->send_message->flags)) { @@ -269,8 +273,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* initialize members */ grpc_slice_buffer_init(&calld->slices); calld->has_compression_algorithm = 0; - grpc_closure_init(&calld->got_slice, got_slice, elem); - grpc_closure_init(&calld->send_done, send_done, elem); + grpc_closure_init(&calld->got_slice, got_slice, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->send_done, send_done, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -281,13 +287,13 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, void *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; - grpc_slice_buffer_destroy(&calld->slices); + grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); } /* Constructor for channel_data */ -static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { +static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *channeld = elem->channel_data; channeld->enabled_algorithms_bitset = @@ -315,6 +321,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, } GPR_ASSERT(!args->is_last); + return GRPC_ERROR_NONE; } /* Destructor for channel data */ diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 038e819f72..ccc0619e1c 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -114,12 +114,13 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Constructor for channel_data */ -static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { +static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *cd = (channel_data *)elem->channel_data; GPR_ASSERT(args->is_last); cd->transport = NULL; + return GRPC_ERROR_NONE; } /* Destructor for channel_data */ @@ -170,7 +171,8 @@ static void bind_transport(grpc_channel_stack *channel_stack, channel_stack->call_stack_size += grpc_transport_stream_size(t); } -bool grpc_add_connected_filter(grpc_channel_stack_builder *builder, +bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg_must_be_null) { GPR_ASSERT(arg_must_be_null == NULL); grpc_transport *t = grpc_channel_stack_builder_get_transport(builder); diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h index 3142d647b7..3585c0ecbc 100644 --- a/src/core/lib/channel/connected_channel.h +++ b/src/core/lib/channel/connected_channel.h @@ -36,7 +36,8 @@ #include "src/core/lib/channel/channel_stack_builder.h" -bool grpc_add_connected_filter(grpc_channel_stack_builder *builder, +bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg_must_be_null); #endif /* GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index 071c5f695c..6c931ad28a 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -47,6 +47,9 @@ typedef enum { /// Value is a \a census_context. GRPC_CONTEXT_TRACING, + /// Reserved for traffic_class_context. + GRPC_CONTEXT_TRAFFIC, + GRPC_CONTEXT_COUNT } grpc_context_index; diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 0e703d8d27..a45a4d4b82 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -41,6 +41,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/slice/slice_internal.h" // // grpc_deadline_state @@ -58,7 +59,7 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice msg = grpc_slice_from_static_string("Deadline Exceeded"); grpc_call_element_send_cancel_with_message( exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg); - grpc_slice_unref(msg); + grpc_slice_unref_internal(exec_ctx, msg); } GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); } @@ -82,8 +83,11 @@ static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, // Take a reference to the call stack, to be owned by the timer. GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); deadline_state->timer_pending = true; - grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback, - elem, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem, + grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, + &deadline_state->timer_callback, + gpr_now(GPR_CLOCK_MONOTONIC)); } } static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, @@ -123,7 +127,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { static void inject_on_complete_cb(grpc_deadline_state* deadline_state, grpc_transport_stream_op* op) { deadline_state->next_on_complete = op->on_complete; - grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state); + grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state, + grpc_schedule_on_exec_ctx); op->on_complete = &deadline_state->on_complete; } @@ -172,8 +177,9 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state)); state->elem = elem; state->deadline = deadline; - grpc_closure_init(&state->closure, start_timer_after_init, state); - grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL); + grpc_closure_init(&state->closure, start_timer_after_init, state, + grpc_schedule_on_exec_ctx); + grpc_closure_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE); } } @@ -207,10 +213,11 @@ void grpc_deadline_state_client_start_transport_stream_op( // // Constructor for channel_data. Used for both client and server filters. -static void init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, - grpc_channel_element_args* args) { +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); + return GRPC_ERROR_NONE; } // Destructor for channel_data. Used for both client and server filters. @@ -289,7 +296,8 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready; calld->recv_initial_metadata = op->recv_initial_metadata; grpc_closure_init(&calld->recv_initial_metadata_ready, - recv_initial_metadata_ready, elem); + recv_initial_metadata_ready, elem, + grpc_schedule_on_exec_ctx); op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; } // Make sure we know when the call is complete, so that we can cancel diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 716a852565..bd2b84f79e 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -46,6 +46,7 @@ typedef struct grpc_deadline_state { bool timer_pending; // The deadline timer. grpc_timer timer; + grpc_closure timer_callback; // Closure to invoke when the call is complete. // We use this to cancel the timer. grpc_closure on_complete; diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index a45a39981c..c052ca5385 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -38,12 +38,13 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/timer.h" // // grpc_handshaker // -void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable, +void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, grpc_handshaker* handshaker) { handshaker->vtable = vtable; } @@ -60,45 +61,44 @@ void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, - grpc_endpoint* endpoint, - grpc_channel_args* args, - grpc_slice_buffer* read_buffer, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data) { - handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args, - read_buffer, deadline, acceptor, cb, - user_data); + grpc_closure* on_handshake_done, + grpc_handshaker_args* args) { + handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor, + on_handshake_done, args); } // // grpc_handshake_manager // -// State used while chaining handshakers. -struct grpc_handshaker_state { - // The index of the handshaker to invoke next. - size_t index; - // The deadline for all handshakers. - gpr_timespec deadline; - // The acceptor to call the handshakers with. - grpc_tcp_server_acceptor* acceptor; - // The final callback and user_data to invoke after the last handshaker. - grpc_handshaker_done_cb final_cb; - void* final_user_data; -}; - struct grpc_handshake_manager { + gpr_mu mu; + gpr_refcount refs; + bool shutdown; // An array of handshakers added via grpc_handshake_manager_add(). size_t count; grpc_handshaker** handshakers; - // State used while chaining handshakers. - struct grpc_handshaker_state* state; + // The index of the handshaker to invoke next and closure to invoke it. + size_t index; + grpc_closure call_next_handshaker; + // The acceptor to call the handshakers with. + grpc_tcp_server_acceptor* acceptor; + // Deadline timer across all handshakers. + grpc_timer deadline_timer; + grpc_closure on_timeout; + // The final callback and user_data to invoke after the last handshaker. + grpc_closure on_handshake_done; + void* user_data; + // Handshaker args. + grpc_handshaker_args args; }; grpc_handshake_manager* grpc_handshake_manager_create() { grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager)); memset(mgr, 0, sizeof(*mgr)); + gpr_mu_init(&mgr->mu); + gpr_ref_init(&mgr->refs, 1); return mgr; } @@ -106,6 +106,7 @@ static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; } void grpc_handshake_manager_add(grpc_handshake_manager* mgr, grpc_handshaker* handshaker) { + gpr_mu_lock(&mgr->mu); // To avoid allocating memory for each handshaker we add, we double // the number of elements every time we need more. size_t realloc_count = 0; @@ -119,85 +120,121 @@ void grpc_handshake_manager_add(grpc_handshake_manager* mgr, gpr_realloc(mgr->handshakers, realloc_count * sizeof(grpc_handshaker*)); } mgr->handshakers[mgr->count++] = handshaker; + gpr_mu_unlock(&mgr->mu); +} + +static void grpc_handshake_manager_unref(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr) { + if (gpr_unref(&mgr->refs)) { + for (size_t i = 0; i < mgr->count; ++i) { + grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]); + } + gpr_free(mgr->handshakers); + gpr_mu_destroy(&mgr->mu); + gpr_free(mgr); + } } void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr) { - for (size_t i = 0; i < mgr->count; ++i) { - grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]); - } - gpr_free(mgr->handshakers); - gpr_free(mgr); + grpc_handshake_manager_unref(exec_ctx, mgr); } void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr) { - for (size_t i = 0; i < mgr->count; ++i) { - grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]); + gpr_mu_lock(&mgr->mu); + // Shutdown the handshaker that's currently in progress, if any. + if (!mgr->shutdown && mgr->index > 0) { + mgr->shutdown = true; + grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]); } - if (mgr->state != NULL) { - gpr_free(mgr->state); - mgr->state = NULL; + gpr_mu_unlock(&mgr->mu); +} + +// Helper function to call either the next handshaker or the +// on_handshake_done callback. +// Returns true if we've scheduled the on_handshake_done callback. +static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr, + grpc_error* error) { + GPR_ASSERT(mgr->index <= mgr->count); + // If we got an error or we've been shut down or we're exiting early or + // we've finished the last handshaker, invoke the on_handshake_done + // callback. Otherwise, call the next handshaker. + if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->args.exit_early || + mgr->index == mgr->count) { + // Cancel deadline timer, since we're invoking the on_handshake_done + // callback now. + grpc_timer_cancel(exec_ctx, &mgr->deadline_timer); + grpc_closure_sched(exec_ctx, &mgr->on_handshake_done, error); + mgr->shutdown = true; + } else { + grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index], + mgr->acceptor, &mgr->call_next_handshaker, + &mgr->args); } + ++mgr->index; + return mgr->shutdown; } // A function used as the handshaker-done callback when chaining // handshakers together. -static void call_next_handshaker(grpc_exec_ctx* exec_ctx, - grpc_endpoint* endpoint, - grpc_channel_args* args, - grpc_slice_buffer* read_buffer, - void* user_data, grpc_error* error) { - grpc_handshake_manager* mgr = user_data; - GPR_ASSERT(mgr->state != NULL); - GPR_ASSERT(mgr->state->index < mgr->count); - // If we got an error, skip all remaining handshakers and invoke the - // caller-supplied callback immediately. - if (error != GRPC_ERROR_NONE) { - mgr->state->final_cb(exec_ctx, endpoint, args, read_buffer, - mgr->state->final_user_data, error); - return; +static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_handshake_manager* mgr = arg; + gpr_mu_lock(&mgr->mu); + bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_REF(error)); + gpr_mu_unlock(&mgr->mu); + // If we're invoked the final callback, we won't be coming back + // to this function, so we can release our reference to the + // handshake manager. + if (done) { + grpc_handshake_manager_unref(exec_ctx, mgr); } - grpc_handshaker_done_cb cb = call_next_handshaker; - // If this is the last handshaker, use the caller-supplied callback - // and user_data instead of chaining back to this function again. - if (mgr->state->index == mgr->count - 1) { - cb = mgr->state->final_cb; - user_data = mgr->state->final_user_data; - } - // Invoke handshaker. - grpc_handshaker_do_handshake( - exec_ctx, mgr->handshakers[mgr->state->index], endpoint, args, - read_buffer, mgr->state->deadline, mgr->state->acceptor, cb, user_data); - ++mgr->state->index; - // If this is the last handshaker, clean up state. - if (mgr->state->index == mgr->count) { - gpr_free(mgr->state); - mgr->state = NULL; +} + +// Callback invoked when deadline is exceeded. +static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_handshake_manager* mgr = arg; + if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. + grpc_handshake_manager_shutdown(exec_ctx, mgr); } + grpc_handshake_manager_unref(exec_ctx, mgr); } void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, - grpc_endpoint* endpoint, const grpc_channel_args* args, + grpc_endpoint* endpoint, const grpc_channel_args* channel_args, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data) { - grpc_channel_args* args_copy = grpc_channel_args_copy(args); - grpc_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer)); - grpc_slice_buffer_init(read_buffer); - if (mgr->count == 0) { - // No handshakers registered, so we just immediately call the done - // callback with the passed-in endpoint. - cb(exec_ctx, endpoint, args_copy, read_buffer, user_data, GRPC_ERROR_NONE); - } else { - GPR_ASSERT(mgr->state == NULL); - mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state)); - memset(mgr->state, 0, sizeof(*mgr->state)); - mgr->state->deadline = deadline; - mgr->state->acceptor = acceptor; - mgr->state->final_cb = cb; - mgr->state->final_user_data = user_data; - call_next_handshaker(exec_ctx, endpoint, args_copy, read_buffer, mgr, - GRPC_ERROR_NONE); + grpc_iomgr_cb_func on_handshake_done, void* user_data) { + gpr_mu_lock(&mgr->mu); + GPR_ASSERT(mgr->index == 0); + GPR_ASSERT(!mgr->shutdown); + // Construct handshaker args. These will be passed through all + // handshakers and eventually be freed by the on_handshake_done callback. + mgr->args.endpoint = endpoint; + mgr->args.args = grpc_channel_args_copy(channel_args); + mgr->args.user_data = user_data; + mgr->args.read_buffer = gpr_malloc(sizeof(*mgr->args.read_buffer)); + grpc_slice_buffer_init(mgr->args.read_buffer); + // Initialize state needed for calling handshakers. + mgr->acceptor = acceptor; + grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args, + grpc_schedule_on_exec_ctx); + // Start deadline timer, which owns a ref. + gpr_ref(&mgr->refs); + grpc_closure_init(&mgr->on_timeout, on_timeout, mgr, + grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &mgr->deadline_timer, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + &mgr->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + // Start first handshaker, which also owns a ref. + gpr_ref(&mgr->refs); + bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE); + gpr_mu_unlock(&mgr->mu); + if (done) { + grpc_handshake_manager_unref(exec_ctx, mgr); } } diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index f8a36c6473..450b7adaee 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -54,15 +54,33 @@ typedef struct grpc_handshaker grpc_handshaker; -/// Callback type invoked when a handshaker is done. -/// Takes ownership of \a args and \a read_buffer. -typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx, - grpc_endpoint* endpoint, - grpc_channel_args* args, - grpc_slice_buffer* read_buffer, - void* user_data, grpc_error* error); - -struct grpc_handshaker_vtable { +/// Arguments passed through handshakers and to the on_handshake_done callback. +/// +/// For handshakers, all members are input/output parameters; for +/// example, a handshaker may read from or write to \a endpoint and +/// then later replace it with a wrapped endpoint. Similarly, a +/// handshaker may modify \a args. +/// +/// A handshaker takes ownership of the members while a handshake is in +/// progress. Upon failure or shutdown of an in-progress handshaker, +/// the handshaker is responsible for destroying the members and setting +/// them to NULL before invoking the on_handshake_done callback. +/// +/// For the on_handshake_done callback, all members are input arguments, +/// which the callback takes ownership of. +typedef struct { + grpc_endpoint* endpoint; + grpc_channel_args* args; + grpc_slice_buffer* read_buffer; + // A handshaker may set this to true before invoking on_handshake_done + // to indicate that subsequent handshakers should be skipped. + bool exit_early; + // User data passed through the handshake manager. Not used by + // individual handshakers. + void* user_data; +} grpc_handshaker_args; + +typedef struct { /// Destroys the handshaker. void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); @@ -70,43 +88,35 @@ struct grpc_handshaker_vtable { /// aborted in the middle). void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); - /// Performs handshaking. When finished, calls \a cb with \a user_data. - /// Takes ownership of \a args. - /// Takes ownership of \a read_buffer, which contains leftover bytes read - /// from the endpoint by the previous handshaker. + /// Performs handshaking, modifying \a args as needed (e.g., to + /// replace \a endpoint with a wrapped endpoint). + /// When finished, invokes \a on_handshake_done. /// \a acceptor will be NULL for client-side handshakers. void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, - grpc_endpoint* endpoint, grpc_channel_args* args, - grpc_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data); -}; + grpc_closure* on_handshake_done, + grpc_handshaker_args* args); +} grpc_handshaker_vtable; /// Base struct. To subclass, make this the first member of the /// implementation struct. struct grpc_handshaker { - const struct grpc_handshaker_vtable* vtable; + const grpc_handshaker_vtable* vtable; }; /// Called by concrete implementations to initialize the base struct. -void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable, +void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, grpc_handshaker* handshaker); -/// Convenient wrappers for invoking methods via the vtable. -/// These probably do not need to be called from anywhere but -/// grpc_handshake_manager. void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, - grpc_endpoint* endpoint, - grpc_channel_args* args, - grpc_slice_buffer* read_buffer, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data); + grpc_closure* on_handshake_done, + grpc_handshaker_args* args); /// /// grpc_handshake_manager @@ -134,15 +144,21 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr); /// Invokes handshakers in the order they were added. -/// Does NOT take ownership of \a args. Instead, makes a copy before +/// Takes ownership of \a endpoint, and then passes that ownership to +/// the \a on_handshake_done callback. +/// Does NOT take ownership of \a channel_args. Instead, makes a copy before /// invoking the first handshaker. /// \a acceptor will be NULL for client-side handshakers. -/// Invokes \a cb with \a user_data after either a handshaker fails or -/// all handshakers have completed successfully. +/// +/// When done, invokes \a on_handshake_done with a grpc_handshaker_args +/// object as its argument. If the callback is invoked with error != +/// GRPC_ERROR_NONE, then handshaking failed and the handshaker has done +/// the necessary clean-up. Otherwise, the callback takes ownership of +/// the arguments. void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, - grpc_endpoint* endpoint, const grpc_channel_args* args, + grpc_endpoint* endpoint, const grpc_channel_args* channel_args, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data); + grpc_iomgr_cb_func on_handshake_done, void* user_data); #endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */ diff --git a/src/core/lib/channel/handshaker_factory.c b/src/core/lib/channel/handshaker_factory.c new file mode 100644 index 0000000000..3c30a4e1d2 --- /dev/null +++ b/src/core/lib/channel/handshaker_factory.c @@ -0,0 +1,54 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/handshaker_factory.h" + +#include <grpc/support/log.h> + +void grpc_handshaker_factory_add_handshakers( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory, + const grpc_channel_args *args, grpc_handshake_manager *handshake_mgr) { + if (handshaker_factory != NULL) { + GPR_ASSERT(handshaker_factory->vtable != NULL); + handshaker_factory->vtable->add_handshakers(exec_ctx, handshaker_factory, + args, handshake_mgr); + } +} + +void grpc_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory) { + if (handshaker_factory != NULL) { + GPR_ASSERT(handshaker_factory->vtable != NULL); + handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory); + } +} diff --git a/src/core/lib/channel/handshaker_factory.h b/src/core/lib/channel/handshaker_factory.h new file mode 100644 index 0000000000..1984546104 --- /dev/null +++ b/src/core/lib/channel/handshaker_factory.h @@ -0,0 +1,66 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_HANDSHAKER_FACTORY_H +#define GRPC_CORE_LIB_CHANNEL_HANDSHAKER_FACTORY_H + +#include <grpc/impl/codegen/grpc_types.h> + +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +// A handshaker factory is used to create handshakers. + +typedef struct grpc_handshaker_factory grpc_handshaker_factory; + +typedef struct { + void (*add_handshakers)(grpc_exec_ctx *exec_ctx, + grpc_handshaker_factory *handshaker_factory, + const grpc_channel_args *args, + grpc_handshake_manager *handshake_mgr); + void (*destroy)(grpc_exec_ctx *exec_ctx, + grpc_handshaker_factory *handshaker_factory); +} grpc_handshaker_factory_vtable; + +struct grpc_handshaker_factory { + const grpc_handshaker_factory_vtable *vtable; +}; + +void grpc_handshaker_factory_add_handshakers( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory, + const grpc_channel_args *args, grpc_handshake_manager *handshake_mgr); + +void grpc_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory); + +#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_FACTORY_H */ diff --git a/src/core/lib/channel/handshaker_registry.c b/src/core/lib/channel/handshaker_registry.c new file mode 100644 index 0000000000..2e5f04064c --- /dev/null +++ b/src/core/lib/channel/handshaker_registry.c @@ -0,0 +1,113 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/handshaker_registry.h" + +#include <string.h> + +#include <grpc/support/alloc.h> + +// +// grpc_handshaker_factory_list +// + +typedef struct { + grpc_handshaker_factory** list; + size_t num_factories; +} grpc_handshaker_factory_list; + +static void grpc_handshaker_factory_list_register( + grpc_handshaker_factory_list* list, bool at_start, + grpc_handshaker_factory* factory) { + list->list = gpr_realloc( + list->list, (list->num_factories + 1) * sizeof(grpc_handshaker_factory*)); + if (at_start) { + memmove(list->list + 1, list->list, + sizeof(grpc_handshaker_factory*) * list->num_factories); + list->list[0] = factory; + } else { + list->list[list->num_factories] = factory; + } + ++list->num_factories; +} + +static void grpc_handshaker_factory_list_add_handshakers( + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory_list* list, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { + for (size_t i = 0; i < list->num_factories; ++i) { + grpc_handshaker_factory_add_handshakers(exec_ctx, list->list[i], args, + handshake_mgr); + } +} + +static void grpc_handshaker_factory_list_destroy( + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory_list* list) { + for (size_t i = 0; i < list->num_factories; ++i) { + grpc_handshaker_factory_destroy(exec_ctx, list->list[i]); + } + gpr_free(list->list); +} + +// +// plugin +// + +static grpc_handshaker_factory_list + g_handshaker_factory_lists[NUM_HANDSHAKER_TYPES]; + +void grpc_handshaker_factory_registry_init() { + memset(g_handshaker_factory_lists, 0, sizeof(g_handshaker_factory_lists)); +} + +void grpc_handshaker_factory_registry_shutdown(grpc_exec_ctx* exec_ctx) { + for (size_t i = 0; i < NUM_HANDSHAKER_TYPES; ++i) { + grpc_handshaker_factory_list_destroy(exec_ctx, + &g_handshaker_factory_lists[i]); + } +} + +void grpc_handshaker_factory_register(bool at_start, + grpc_handshaker_type handshaker_type, + grpc_handshaker_factory* factory) { + grpc_handshaker_factory_list_register( + &g_handshaker_factory_lists[handshaker_type], at_start, factory); +} + +void grpc_handshakers_add(grpc_exec_ctx* exec_ctx, + grpc_handshaker_type handshaker_type, + const grpc_channel_args* args, + grpc_handshake_manager* handshake_mgr) { + grpc_handshaker_factory_list_add_handshakers( + exec_ctx, &g_handshaker_factory_lists[handshaker_type], args, + handshake_mgr); +} diff --git a/src/core/lib/channel/handshaker_registry.h b/src/core/lib/channel/handshaker_registry.h new file mode 100644 index 0000000000..53c1b173af --- /dev/null +++ b/src/core/lib/channel/handshaker_registry.h @@ -0,0 +1,63 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_HANDSHAKER_REGISTRY_H +#define GRPC_CORE_LIB_CHANNEL_HANDSHAKER_REGISTRY_H + +#include <grpc/impl/codegen/grpc_types.h> + +#include "src/core/lib/channel/handshaker_factory.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +typedef enum { + HANDSHAKER_CLIENT = 0, + HANDSHAKER_SERVER, + NUM_HANDSHAKER_TYPES, // Must be last. +} grpc_handshaker_type; + +void grpc_handshaker_factory_registry_init(); +void grpc_handshaker_factory_registry_shutdown(grpc_exec_ctx* exec_ctx); + +/// Registers a new handshaker factory. Takes ownership. +/// If \a at_start is true, the new handshaker will be at the beginning of +/// the list. Otherwise, it will be added to the end. +void grpc_handshaker_factory_register(bool at_start, + grpc_handshaker_type handshaker_type, + grpc_handshaker_factory* factory); + +void grpc_handshakers_add(grpc_exec_ctx* exec_ctx, + grpc_handshaker_type handshaker_type, + const grpc_channel_args* args, + grpc_handshake_manager* handshake_mgr); + +#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_REGISTRY_H */ diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index dbe0d25211..d154450988 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -37,6 +37,7 @@ #include <string.h> #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/percent_encoding.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport_impl.h" @@ -92,13 +93,9 @@ typedef struct channel_data { size_t max_payload_size_for_get; } channel_data; -typedef struct { - grpc_call_element *elem; - grpc_exec_ctx *exec_ctx; -} client_recv_filter_args; - -static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { - client_recv_filter_args *a = user_data; +static grpc_mdelem *client_recv_filter(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_mdelem *md) { + grpc_call_element *elem = user_data; if (md == GRPC_MDELEM_STATUS_200) { return NULL; } else if (md->key == GRPC_MDSTR_STATUS) { @@ -107,18 +104,19 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { grpc_mdstr_as_c_string(md->value)); grpc_slice message = grpc_slice_from_copied_string(message_string); gpr_free(message_string); - grpc_call_element_send_close_with_message(a->exec_ctx, a->elem, + grpc_call_element_send_close_with_message(exec_ctx, elem, GRPC_STATUS_CANCELLED, &message); return NULL; } else if (md->key == GRPC_MDSTR_GRPC_MESSAGE) { grpc_slice pct_decoded_msg = grpc_permissive_percent_decode_slice(md->value->slice); if (grpc_slice_is_equivalent(pct_decoded_msg, md->value->slice)) { - grpc_slice_unref(pct_decoded_msg); + grpc_slice_unref_internal(exec_ctx, pct_decoded_msg); return md; } else { return grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(pct_decoded_msg)); + exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_mdstr_from_slice(exec_ctx, pct_decoded_msg)); } } else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) { return NULL; @@ -147,11 +145,8 @@ static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - client_recv_filter_args a; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter, - &a); + grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata, + client_recv_filter, elem); grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, GRPC_ERROR_REF(error)); } @@ -160,11 +155,8 @@ static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - client_recv_filter_args a; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(calld->recv_trailing_metadata, client_recv_filter, - &a); + grpc_metadata_batch_filter(exec_ctx, calld->recv_trailing_metadata, + client_recv_filter, elem); grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata, GRPC_ERROR_REF(error)); } @@ -183,11 +175,12 @@ static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - grpc_slice_buffer_reset_and_unref(&calld->slices); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); } -static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem *client_strip_filter(grpc_exec_ctx *exec_ctx, + void *user_data, grpc_mdelem *md) { /* eat the things we'd like to set ourselves */ if (md->key == GRPC_MDSTR_METHOD) return NULL; if (md->key == GRPC_MDSTR_SCHEME) return NULL; @@ -245,12 +238,15 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, message, and the payload is below the size threshold, and all the data for this request is immediately available. */ grpc_mdelem *method = GRPC_MDELEM_METHOD_POST; - calld->send_message_blocked = false; if ((op->send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && op->send_message != NULL && op->send_message->length < channeld->max_payload_size_for_get) { method = GRPC_MDELEM_METHOD_GET; + /* The following write to calld->send_message_blocked isn't racy with + reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because + being here means ops->send_message is not NULL, which is primarily + guarding the read there. */ calld->send_message_blocked = true; } else if (op->send_initial_metadata_flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { @@ -272,7 +268,7 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, /* when all the send_message data is available, then create a MDELEM and append to headers */ grpc_mdelem *payload_bin = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_PAYLOAD_BIN, + exec_ctx, GRPC_MDSTR_GRPC_PAYLOAD_BIN, grpc_mdstr_from_buffer(calld->payload_bytes, op->send_message->length)); grpc_metadata_batch_add_tail(op->send_initial_metadata, @@ -289,8 +285,8 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } } - grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter, - elem); + grpc_metadata_batch_filter(exec_ctx, op->send_initial_metadata, + client_strip_filter, elem); /* Send : prefixed headers, which have to be before any application layer headers. */ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method, @@ -331,8 +327,7 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; if (op->send_message != NULL && calld->send_message_blocked) { /* Don't forward the op. send_message contains slices that aren't ready - yet. The call will be forwarded by the op_complete of slice read call. - */ + yet. The call will be forwarded by the op_complete of slice read call. */ } else { grpc_call_next_op(exec_ctx, elem, op); } @@ -347,14 +342,20 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, calld->on_done_recv_trailing_metadata = NULL; calld->on_complete = NULL; calld->payload_bytes = NULL; + calld->send_message_blocked = false; grpc_slice_buffer_init(&calld->slices); grpc_closure_init(&calld->hc_on_recv_initial_metadata, - hc_on_recv_initial_metadata, elem); + hc_on_recv_initial_metadata, elem, + grpc_schedule_on_exec_ctx); grpc_closure_init(&calld->hc_on_recv_trailing_metadata, - hc_on_recv_trailing_metadata, elem); - grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem); - grpc_closure_init(&calld->got_slice, got_slice, elem); - grpc_closure_init(&calld->send_done, send_done, elem); + hc_on_recv_trailing_metadata, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->got_slice, got_slice, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->send_done, send_done, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -363,7 +364,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, void *ignored) { call_data *calld = elem->call_data; - grpc_slice_buffer_destroy(&calld->slices); + grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); } static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { @@ -454,9 +455,9 @@ static grpc_mdstr *user_agent_from_args(const grpc_channel_args *args, } /* Constructor for channel_data */ -static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { +static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; GPR_ASSERT(!args->is_last); GPR_ASSERT(args->optional_transport != NULL); @@ -464,16 +465,17 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, chand->max_payload_size_for_get = max_payload_size_from_args(args->channel_args); chand->user_agent = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_USER_AGENT, + exec_ctx, GRPC_MDSTR_USER_AGENT, user_agent_from_args(args->channel_args, args->optional_transport->vtable->name)); + return GRPC_ERROR_NONE; } /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - GRPC_MDELEM_UNREF(chand->user_agent); + GRPC_MDELEM_UNREF(exec_ctx, chand->user_agent); } const grpc_channel_filter grpc_http_client_filter = { diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index b42ff06039..f508231238 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -38,6 +38,7 @@ #include <string.h> #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/percent_encoding.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/static_metadata.h" #define EXPECTED_CONTENT_TYPE "application/grpc" @@ -82,31 +83,28 @@ typedef struct call_data { typedef struct channel_data { uint8_t unused; } channel_data; -typedef struct { - grpc_call_element *elem; - grpc_exec_ctx *exec_ctx; -} server_filter_args; - -static grpc_mdelem *server_filter_outgoing_metadata(void *user_data, +static grpc_mdelem *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx, + void *user_data, grpc_mdelem *md) { if (md->key == GRPC_MDSTR_GRPC_MESSAGE) { grpc_slice pct_encoded_msg = grpc_percent_encode_slice( md->value->slice, grpc_compatible_percent_encoding_unreserved_bytes); if (grpc_slice_is_equivalent(pct_encoded_msg, md->value->slice)) { - grpc_slice_unref(pct_encoded_msg); + grpc_slice_unref_internal(exec_ctx, pct_encoded_msg); return md; } else { return grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(pct_encoded_msg)); + exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_mdstr_from_slice(exec_ctx, pct_encoded_msg)); } } else { return md; } } -static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { - server_filter_args *a = user_data; - grpc_call_element *elem = a->elem; +static grpc_mdelem *server_filter(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_mdelem *md) { + grpc_call_element *elem = user_data; call_data *calld = elem->call_data; /* Check if it is one of the headers we care about. */ @@ -157,7 +155,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* swallow it and error everything out. */ /* TODO(klempner): We ought to generate more descriptive error messages on the wire here. */ - grpc_call_element_send_cancel(a->exec_ctx, elem); + grpc_call_element_send_cancel(exec_ctx, elem); return NULL; } else if (md->key == GRPC_MDSTR_PATH) { if (calld->seen_path) { @@ -173,7 +171,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* translate host to :authority since :authority may be omitted */ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value)); + exec_ctx, GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value)); calld->seen_authority = 1; return authority; } else if (md->key == GRPC_MDSTR_GRPC_PAYLOAD_BIN) { @@ -181,7 +179,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { header field */ calld->seen_payload_bin = 1; grpc_slice_buffer_add(&calld->read_slice_buffer, - grpc_slice_ref(md->value->slice)); + grpc_slice_ref_internal(md->value->slice)); grpc_slice_buffer_stream_init(&calld->read_stream, &calld->read_slice_buffer, 0); return NULL; @@ -195,10 +193,8 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (err == GRPC_ERROR_NONE) { - server_filter_args a; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, &a); + grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata, + server_filter, elem); /* Have we seen the required http2 transport headers? (:method, :scheme, content-type, with :path and :authority covered at the channel level right now) */ @@ -310,9 +306,8 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } if (op->send_trailing_metadata) { - server_filter_args a = {elem, exec_ctx}; - grpc_metadata_batch_filter(op->send_trailing_metadata, - server_filter_outgoing_metadata, &a); + grpc_metadata_batch_filter(exec_ctx, op->send_trailing_metadata, + server_filter_outgoing_metadata, elem); } } @@ -334,9 +329,12 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; /* initialize members */ memset(calld, 0, sizeof(*calld)); - grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); - grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem); - grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem); + grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem, + grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&calld->read_slice_buffer); return GRPC_ERROR_NONE; } @@ -346,14 +344,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, void *ignored) { call_data *calld = elem->call_data; - grpc_slice_buffer_destroy(&calld->read_slice_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer); } /* Constructor for channel_data */ -static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { +static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { GPR_ASSERT(!args->is_last); + return GRPC_ERROR_NONE; } /* Destructor for channel data */ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 1331fe1c65..862090b371 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -34,16 +34,14 @@ #include <limits.h> #include <string.h> +#include <grpc/impl/codegen/grpc_types.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/transport/method_config.h" - -#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited. -// The protobuf library will (by default) start warning at 100 megs. -#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024) +#include "src/core/lib/support/string.h" +#include "src/core/lib/transport/service_config.h" typedef struct message_size_limits { int max_send_size; @@ -56,30 +54,33 @@ static void* message_size_limits_copy(void* value) { return new_value; } -static int message_size_limits_cmp(void* value1, void* value2) { - const message_size_limits* v1 = value1; - const message_size_limits* v2 = value2; - if (v1->max_send_size > v2->max_send_size) return 1; - if (v1->max_send_size < v2->max_send_size) return -1; - if (v1->max_recv_size > v2->max_recv_size) return 1; - if (v1->max_recv_size < v2->max_recv_size) return -1; - return 0; +static void message_size_limits_free(grpc_exec_ctx* exec_ctx, void* value) { + gpr_free(value); } static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = { - gpr_free, message_size_limits_copy, message_size_limits_cmp}; + message_size_limits_free, message_size_limits_copy}; -static void* method_config_convert_value( - const grpc_method_config* method_config) { +static void* message_size_limits_create_from_json(const grpc_json* json) { + int max_request_message_bytes = -1; + int max_response_message_bytes = -1; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) continue; + if (strcmp(field->key, "maxRequestMessageBytes") == 0) { + if (max_request_message_bytes >= 0) return NULL; // Duplicate. + if (field->type != GRPC_JSON_STRING) return NULL; + max_request_message_bytes = gpr_parse_nonnegative_int(field->value); + if (max_request_message_bytes == -1) return NULL; + } else if (strcmp(field->key, "maxResponseMessageBytes") == 0) { + if (max_response_message_bytes >= 0) return NULL; // Duplicate. + if (field->type != GRPC_JSON_STRING) return NULL; + max_response_message_bytes = gpr_parse_nonnegative_int(field->value); + if (max_response_message_bytes == -1) return NULL; + } + } message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); - const int32_t* max_request_message_bytes = - grpc_method_config_get_max_request_message_bytes(method_config); - value->max_send_size = - max_request_message_bytes != NULL ? *max_request_message_bytes : -1; - const int32_t* max_response_message_bytes = - grpc_method_config_get_max_response_message_bytes(method_config); - value->max_recv_size = - max_response_message_bytes != NULL ? *max_response_message_bytes : -1; + value->max_send_size = max_request_message_bytes; + value->max_recv_size = max_response_message_bytes; return value; } @@ -127,7 +128,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, gpr_free(message_string); } // Invoke the next callback. - grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL); + grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error); } // Start transport stream op. @@ -163,7 +164,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, channel_data* chand = elem->channel_data; call_data* calld = elem->call_data; calld->next_recv_message_ready = NULL; - grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem); + grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem, + grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response @@ -171,8 +173,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, calld->max_send_size = chand->max_send_size; calld->max_recv_size = chand->max_recv_size; if (chand->method_limit_table != NULL) { - message_size_limits* limits = - grpc_method_config_table_get(chand->method_limit_table, args->path); + message_size_limits* limits = grpc_method_config_table_get( + exec_ctx, chand->method_limit_table, args->path); if (limits != NULL) { if (limits->max_send_size >= 0 && (limits->max_send_size < calld->max_send_size || @@ -195,26 +197,26 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void* ignored) {} // Constructor for channel_data. -static void init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, - grpc_channel_element_args* args) { +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); channel_data* chand = elem->channel_data; memset(chand, 0, sizeof(*chand)); - chand->max_send_size = DEFAULT_MAX_SEND_MESSAGE_LENGTH; - chand->max_recv_size = DEFAULT_MAX_RECV_MESSAGE_LENGTH; + chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH; + chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH; for (size_t i = 0; i < args->channel_args->num_args; ++i) { if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = {DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, - INT_MAX}; + const grpc_integer_options options = { + GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, INT_MAX}; chand->max_send_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = {DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, - INT_MAX}; + const grpc_integer_options options = { + GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, INT_MAX}; chand->max_recv_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } @@ -223,18 +225,25 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, const grpc_arg* channel_arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); if (channel_arg != NULL) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); - chand->method_limit_table = grpc_method_config_table_convert( - (grpc_method_config_table*)channel_arg->value.pointer.p, - method_config_convert_value, &message_size_limits_vtable); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_service_config* service_config = + grpc_service_config_create(channel_arg->value.string); + if (service_config != NULL) { + chand->method_limit_table = + grpc_service_config_create_method_config_table( + exec_ctx, service_config, message_size_limits_create_from_json, + &message_size_limits_vtable); + grpc_service_config_destroy(service_config); + } } + return GRPC_ERROR_NONE; } // Destructor for channel_data. static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem) { channel_data* chand = elem->channel_data; - grpc_mdstr_hash_table_unref(chand->method_limit_table); + grpc_mdstr_hash_table_unref(exec_ctx, chand->method_limit_table); } const grpc_channel_filter grpc_message_size_filter = { |