diff options
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r-- | src/core/lib/channel/channel_args.c | 10 | ||||
-rw-r--r-- | src/core/lib/channel/channel_args.h | 5 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.c | 4 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack_builder.c | 12 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack_builder.h | 6 | ||||
-rw-r--r-- | src/core/lib/channel/compress_filter.c | 22 | ||||
-rw-r--r-- | src/core/lib/channel/connected_channel.c | 3 | ||||
-rw-r--r-- | src/core/lib/channel/connected_channel.h | 3 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 3 | ||||
-rw-r--r-- | src/core/lib/channel/http_client_filter.c | 49 | ||||
-rw-r--r-- | src/core/lib/channel/http_server_filter.c | 39 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 14 |
12 files changed, 88 insertions, 82 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 401a2ad4fe..1a099ac437 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -184,7 +184,7 @@ grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a) { return b; } -void grpc_channel_args_destroy(grpc_channel_args *a) { +void grpc_channel_args_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_args *a) { size_t i; if (!a) return; for (i = 0; i < a->num_args; i++) { @@ -195,7 +195,8 @@ void grpc_channel_args_destroy(grpc_channel_args *a) { case GRPC_ARG_INTEGER: break; case GRPC_ARG_POINTER: - a->args[i].value.pointer.vtable->destroy(a->args[i].value.pointer.p); + a->args[i].value.pointer.vtable->destroy(exec_ctx, + a->args[i].value.pointer.p); break; } gpr_free(a->args[i].key); @@ -249,7 +250,8 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a, } grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( - grpc_channel_args **a, grpc_compression_algorithm algorithm, int state) { + grpc_exec_ctx *exec_ctx, grpc_channel_args **a, + grpc_compression_algorithm algorithm, int state) { int *states_arg = NULL; grpc_channel_args *result = *a; const int states_arg_found = @@ -282,7 +284,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm); } result = grpc_channel_args_copy_and_add(*a, &tmp, 1); - grpc_channel_args_destroy(*a); + grpc_channel_args_destroy(exec_ctx, *a); *a = result; } return result; diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 88fc0e37a3..5c7d31f8bb 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -68,7 +68,7 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, const grpc_channel_args *b); /** Destroy arguments created by \a grpc_channel_args_copy */ -void grpc_channel_args_destroy(grpc_channel_args *a); +void grpc_channel_args_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_args *a); /** Returns the compression algorithm set in \a a. */ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( @@ -88,7 +88,8 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm( * modified to point to the returned instance (which may be different from the * input value of \a a). */ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( - grpc_channel_args **a, grpc_compression_algorithm algorithm, int enabled); + grpc_exec_ctx *exec_ctx, grpc_channel_args **a, + grpc_compression_algorithm algorithm, int enabled); /** Returns the bitset representing the support state (true for enabled, false * for disabled) for compression algorithms. diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index cddd84fcca..8f08b427fb 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -310,7 +310,7 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, memset(op, 0, sizeof(*op)); op->on_complete = grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); - grpc_transport_stream_op_add_cancellation_with_message(op, status, + grpc_transport_stream_op_add_cancellation_with_message(exec_ctx, op, status, optional_message); elem->filter->start_transport_stream_op(exec_ctx, elem, op); } @@ -323,6 +323,6 @@ void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, memset(op, 0, sizeof(*op)); op->on_complete = grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); - grpc_transport_stream_op_add_close(op, status, optional_message); + grpc_transport_stream_op_add_close(exec_ctx, op, status, optional_message); elem->filter->start_transport_stream_op(exec_ctx, elem, op); } diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c index b959517afb..5f9e3b4539 100644 --- a/src/core/lib/channel/channel_stack_builder.c +++ b/src/core/lib/channel/channel_stack_builder.c @@ -138,9 +138,10 @@ void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder, } void grpc_channel_stack_builder_set_channel_arguments( - grpc_channel_stack_builder *builder, const grpc_channel_args *args) { + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + const grpc_channel_args *args) { if (builder->args != NULL) { - grpc_channel_args_destroy(builder->args); + grpc_channel_args_destroy(exec_ctx, builder->args); } builder->args = grpc_channel_args_copy(args); } @@ -213,7 +214,8 @@ bool grpc_channel_stack_builder_add_filter_after( return true; } -void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder) { +void grpc_channel_stack_builder_destroy(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder) { filter_node *p = builder->begin.next; while (p != &builder->end) { filter_node *next = p->next; @@ -221,7 +223,7 @@ void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder) { p = next; } if (builder->args != NULL) { - grpc_channel_args_destroy(builder->args); + grpc_channel_args_destroy(exec_ctx, builder->args); } gpr_free(builder->target); gpr_free(builder); @@ -276,7 +278,7 @@ grpc_error *grpc_channel_stack_builder_finish( } } - grpc_channel_stack_builder_destroy(builder); + grpc_channel_stack_builder_destroy(exec_ctx, builder); gpr_free((grpc_channel_filter **)filters); return error; diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index 65bfebcabc..8adf38e27b 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -73,7 +73,8 @@ grpc_transport *grpc_channel_stack_builder_get_transport( /// Set channel arguments: copies args void grpc_channel_stack_builder_set_channel_arguments( - grpc_channel_stack_builder *builder, const grpc_channel_args *args); + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, + const grpc_channel_args *args); /// Return a borrowed pointer to the channel arguments const grpc_channel_args *grpc_channel_stack_builder_get_channel_arguments( @@ -157,7 +158,8 @@ grpc_error *grpc_channel_stack_builder_finish( void *destroy_arg, void **result); /// Destroy the builder without creating a channel stack -void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder); +void grpc_channel_stack_builder_destroy(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder); extern int grpc_trace_channel_stack_builder; diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 35455d4908..337c194b79 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -44,6 +44,7 @@ #include "src/core/lib/compression/algorithm_metadata.h" #include "src/core/lib/compression/message_compress.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -82,7 +83,8 @@ typedef struct channel_data { /** For each \a md element from the incoming metadata, filter out the entry for * "grpc-encoding", using its value to populate the call data's * compression_algorithm field. */ -static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem *compression_md_filter(grpc_exec_ctx *exec_ctx, + void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -130,12 +132,14 @@ static int skip_compression(grpc_call_element *elem, uint32_t flags) { /** Filter initial metadata */ static void process_send_initial_metadata( - grpc_call_element *elem, grpc_metadata_batch *initial_metadata) { + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; /* Parse incoming request for compression. If any, it'll be available * at calld->compression_algorithm */ - grpc_metadata_batch_filter(initial_metadata, compression_md_filter, elem); + grpc_metadata_batch_filter(exec_ctx, initial_metadata, compression_md_filter, + elem); if (!calld->has_compression_algorithm) { /* If no algorithm was found in the metadata and we aren't * exceptionally skipping compression, fall back to the channel @@ -161,7 +165,7 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - grpc_slice_buffer_reset_and_unref(&calld->slices); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); } @@ -171,8 +175,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, int did_compress; grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); - did_compress = - grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); + did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, + &calld->slices, &tmp); if (did_compress) { if (grpc_compression_trace) { char *algo_name; @@ -199,7 +203,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, } } - grpc_slice_buffer_destroy(&tmp); + grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, calld->send_flags); @@ -243,7 +247,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0); if (op->send_initial_metadata) { - process_send_initial_metadata(elem, op->send_initial_metadata); + process_send_initial_metadata(exec_ctx, elem, op->send_initial_metadata); } if (op->send_message != NULL && !skip_compression(elem, op->send_message->flags)) { @@ -283,7 +287,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, void *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; - grpc_slice_buffer_destroy(&calld->slices); + grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); } /* Constructor for channel_data */ diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index c2a36b5558..ccc0619e1c 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -171,7 +171,8 @@ static void bind_transport(grpc_channel_stack *channel_stack, channel_stack->call_stack_size += grpc_transport_stream_size(t); } -bool grpc_add_connected_filter(grpc_channel_stack_builder *builder, +bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg_must_be_null) { GPR_ASSERT(arg_must_be_null == NULL); grpc_transport *t = grpc_channel_stack_builder_get_transport(builder); diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h index 3142d647b7..3585c0ecbc 100644 --- a/src/core/lib/channel/connected_channel.h +++ b/src/core/lib/channel/connected_channel.h @@ -36,7 +36,8 @@ #include "src/core/lib/channel/channel_stack_builder.h" -bool grpc_add_connected_filter(grpc_channel_stack_builder *builder, +bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg_must_be_null); #endif /* GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 902ebf1955..8dd6d099e1 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -41,6 +41,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/slice/slice_internal.h" // // grpc_deadline_state @@ -58,7 +59,7 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, grpc_slice msg = grpc_slice_from_static_string("Deadline Exceeded"); grpc_call_element_send_cancel_with_message( exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg); - grpc_slice_unref(msg); + grpc_slice_unref_internal(exec_ctx, msg); } GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); } diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index c37cab3939..d154450988 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -37,6 +37,7 @@ #include <string.h> #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/percent_encoding.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport_impl.h" @@ -92,13 +93,9 @@ typedef struct channel_data { size_t max_payload_size_for_get; } channel_data; -typedef struct { - grpc_call_element *elem; - grpc_exec_ctx *exec_ctx; -} client_recv_filter_args; - -static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { - client_recv_filter_args *a = user_data; +static grpc_mdelem *client_recv_filter(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_mdelem *md) { + grpc_call_element *elem = user_data; if (md == GRPC_MDELEM_STATUS_200) { return NULL; } else if (md->key == GRPC_MDSTR_STATUS) { @@ -107,18 +104,19 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { grpc_mdstr_as_c_string(md->value)); grpc_slice message = grpc_slice_from_copied_string(message_string); gpr_free(message_string); - grpc_call_element_send_close_with_message(a->exec_ctx, a->elem, + grpc_call_element_send_close_with_message(exec_ctx, elem, GRPC_STATUS_CANCELLED, &message); return NULL; } else if (md->key == GRPC_MDSTR_GRPC_MESSAGE) { grpc_slice pct_decoded_msg = grpc_permissive_percent_decode_slice(md->value->slice); if (grpc_slice_is_equivalent(pct_decoded_msg, md->value->slice)) { - grpc_slice_unref(pct_decoded_msg); + grpc_slice_unref_internal(exec_ctx, pct_decoded_msg); return md; } else { return grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(pct_decoded_msg)); + exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_mdstr_from_slice(exec_ctx, pct_decoded_msg)); } } else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) { return NULL; @@ -147,11 +145,8 @@ static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - client_recv_filter_args a; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter, - &a); + grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata, + client_recv_filter, elem); grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, GRPC_ERROR_REF(error)); } @@ -160,11 +155,8 @@ static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - client_recv_filter_args a; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(calld->recv_trailing_metadata, client_recv_filter, - &a); + grpc_metadata_batch_filter(exec_ctx, calld->recv_trailing_metadata, + client_recv_filter, elem); grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata, GRPC_ERROR_REF(error)); } @@ -183,11 +175,12 @@ static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - grpc_slice_buffer_reset_and_unref(&calld->slices); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); } -static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem *client_strip_filter(grpc_exec_ctx *exec_ctx, + void *user_data, grpc_mdelem *md) { /* eat the things we'd like to set ourselves */ if (md->key == GRPC_MDSTR_METHOD) return NULL; if (md->key == GRPC_MDSTR_SCHEME) return NULL; @@ -275,7 +268,7 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, /* when all the send_message data is available, then create a MDELEM and append to headers */ grpc_mdelem *payload_bin = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_PAYLOAD_BIN, + exec_ctx, GRPC_MDSTR_GRPC_PAYLOAD_BIN, grpc_mdstr_from_buffer(calld->payload_bytes, op->send_message->length)); grpc_metadata_batch_add_tail(op->send_initial_metadata, @@ -292,8 +285,8 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } } - grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter, - elem); + grpc_metadata_batch_filter(exec_ctx, op->send_initial_metadata, + client_strip_filter, elem); /* Send : prefixed headers, which have to be before any application layer headers. */ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method, @@ -371,7 +364,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, void *ignored) { call_data *calld = elem->call_data; - grpc_slice_buffer_destroy(&calld->slices); + grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); } static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { @@ -472,7 +465,7 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, chand->max_payload_size_for_get = max_payload_size_from_args(args->channel_args); chand->user_agent = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_USER_AGENT, + exec_ctx, GRPC_MDSTR_USER_AGENT, user_agent_from_args(args->channel_args, args->optional_transport->vtable->name)); return GRPC_ERROR_NONE; @@ -482,7 +475,7 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - GRPC_MDELEM_UNREF(chand->user_agent); + GRPC_MDELEM_UNREF(exec_ctx, chand->user_agent); } const grpc_channel_filter grpc_http_client_filter = { diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index a6d720530a..f508231238 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -38,6 +38,7 @@ #include <string.h> #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/percent_encoding.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/static_metadata.h" #define EXPECTED_CONTENT_TYPE "application/grpc" @@ -82,31 +83,28 @@ typedef struct call_data { typedef struct channel_data { uint8_t unused; } channel_data; -typedef struct { - grpc_call_element *elem; - grpc_exec_ctx *exec_ctx; -} server_filter_args; - -static grpc_mdelem *server_filter_outgoing_metadata(void *user_data, +static grpc_mdelem *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx, + void *user_data, grpc_mdelem *md) { if (md->key == GRPC_MDSTR_GRPC_MESSAGE) { grpc_slice pct_encoded_msg = grpc_percent_encode_slice( md->value->slice, grpc_compatible_percent_encoding_unreserved_bytes); if (grpc_slice_is_equivalent(pct_encoded_msg, md->value->slice)) { - grpc_slice_unref(pct_encoded_msg); + grpc_slice_unref_internal(exec_ctx, pct_encoded_msg); return md; } else { return grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(pct_encoded_msg)); + exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_mdstr_from_slice(exec_ctx, pct_encoded_msg)); } } else { return md; } } -static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { - server_filter_args *a = user_data; - grpc_call_element *elem = a->elem; +static grpc_mdelem *server_filter(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_mdelem *md) { + grpc_call_element *elem = user_data; call_data *calld = elem->call_data; /* Check if it is one of the headers we care about. */ @@ -157,7 +155,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* swallow it and error everything out. */ /* TODO(klempner): We ought to generate more descriptive error messages on the wire here. */ - grpc_call_element_send_cancel(a->exec_ctx, elem); + grpc_call_element_send_cancel(exec_ctx, elem); return NULL; } else if (md->key == GRPC_MDSTR_PATH) { if (calld->seen_path) { @@ -173,7 +171,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* translate host to :authority since :authority may be omitted */ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value)); + exec_ctx, GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value)); calld->seen_authority = 1; return authority; } else if (md->key == GRPC_MDSTR_GRPC_PAYLOAD_BIN) { @@ -181,7 +179,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { header field */ calld->seen_payload_bin = 1; grpc_slice_buffer_add(&calld->read_slice_buffer, - grpc_slice_ref(md->value->slice)); + grpc_slice_ref_internal(md->value->slice)); grpc_slice_buffer_stream_init(&calld->read_stream, &calld->read_slice_buffer, 0); return NULL; @@ -195,10 +193,8 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (err == GRPC_ERROR_NONE) { - server_filter_args a; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, &a); + grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata, + server_filter, elem); /* Have we seen the required http2 transport headers? (:method, :scheme, content-type, with :path and :authority covered at the channel level right now) */ @@ -310,9 +306,8 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } if (op->send_trailing_metadata) { - server_filter_args a = {elem, exec_ctx}; - grpc_metadata_batch_filter(op->send_trailing_metadata, - server_filter_outgoing_metadata, &a); + grpc_metadata_batch_filter(exec_ctx, op->send_trailing_metadata, + server_filter_outgoing_metadata, elem); } } @@ -349,7 +344,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, void *ignored) { call_data *calld = elem->call_data; - grpc_slice_buffer_destroy(&calld->read_slice_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer); } /* Constructor for channel_data */ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index b5e882de52..862090b371 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -54,8 +54,12 @@ static void* message_size_limits_copy(void* value) { return new_value; } +static void message_size_limits_free(grpc_exec_ctx* exec_ctx, void* value) { + gpr_free(value); +} + static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = { - gpr_free, message_size_limits_copy}; + message_size_limits_free, message_size_limits_copy}; static void* message_size_limits_create_from_json(const grpc_json* json) { int max_request_message_bytes = -1; @@ -169,8 +173,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, calld->max_send_size = chand->max_send_size; calld->max_recv_size = chand->max_recv_size; if (chand->method_limit_table != NULL) { - message_size_limits* limits = - grpc_method_config_table_get(chand->method_limit_table, args->path); + message_size_limits* limits = grpc_method_config_table_get( + exec_ctx, chand->method_limit_table, args->path); if (limits != NULL) { if (limits->max_send_size >= 0 && (limits->max_send_size < calld->max_send_size || @@ -227,7 +231,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, if (service_config != NULL) { chand->method_limit_table = grpc_service_config_create_method_config_table( - service_config, message_size_limits_create_from_json, + exec_ctx, service_config, message_size_limits_create_from_json, &message_size_limits_vtable); grpc_service_config_destroy(service_config); } @@ -239,7 +243,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem) { channel_data* chand = elem->channel_data; - grpc_mdstr_hash_table_unref(chand->method_limit_table); + grpc_mdstr_hash_table_unref(exec_ctx, chand->method_limit_table); } const grpc_channel_filter grpc_message_size_filter = { |