diff options
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r-- | src/core/lib/channel/channel_args.c | 28 | ||||
-rw-r--r-- | src/core/lib/channel/channel_args.h | 5 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack_builder.c | 11 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack_builder.h | 4 | ||||
-rw-r--r-- | src/core/lib/channel/compress_filter.c | 353 | ||||
-rw-r--r-- | src/core/lib/channel/compress_filter.h | 67 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 348 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.h | 102 | ||||
-rw-r--r-- | src/core/lib/channel/http_client_filter.c | 601 | ||||
-rw-r--r-- | src/core/lib/channel/http_client_filter.h | 47 | ||||
-rw-r--r-- | src/core/lib/channel/http_server_filter.c | 445 | ||||
-rw-r--r-- | src/core/lib/channel/http_server_filter.h | 42 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 270 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.h | 39 |
14 files changed, 42 insertions, 2320 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index a6d124c719..3de31d99da 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -329,7 +329,9 @@ const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, return NULL; } -int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) { +int grpc_channel_arg_get_integer(const grpc_arg *arg, + grpc_integer_options options) { + if (arg == NULL) return options.default_value; if (arg->type != GRPC_ARG_INTEGER) { gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key); return options.default_value; @@ -347,9 +349,25 @@ int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) { return arg->value.integer; } +bool grpc_channel_arg_get_bool(const grpc_arg *arg, bool default_value) { + if (arg == NULL) return default_value; + if (arg->type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key); + return default_value; + } + switch (arg->value.integer) { + case 0: + return false; + case 1: + return true; + default: + gpr_log(GPR_ERROR, "%s treated as bool but set to %d (assuming true)", + arg->key, arg->value.integer); + return true; + } +} + bool grpc_channel_args_want_minimal_stack(const grpc_channel_args *args) { - const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_MINIMAL_STACK); - if (arg == NULL) return false; - if (arg->type == GRPC_ARG_INTEGER && arg->value.integer == 0) return false; - return true; + return grpc_channel_arg_get_bool( + grpc_channel_args_find(args, GRPC_ARG_MINIMAL_STACK), false); } diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 158cda5b21..5ffcacb7fd 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -121,6 +121,9 @@ typedef struct grpc_integer_options { int max_value; } grpc_integer_options; /** Returns the value of \a arg, subject to the contraints in \a options. */ -int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options); +int grpc_channel_arg_get_integer(const grpc_arg *arg, + grpc_integer_options options); + +bool grpc_channel_arg_get_bool(const grpc_arg *arg, bool default_value); #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c index b515b7321a..88c02edb70 100644 --- a/src/core/lib/channel/channel_stack_builder.c +++ b/src/core/lib/channel/channel_stack_builder.c @@ -113,6 +113,17 @@ grpc_channel_stack_builder_create_iterator_at_last( return create_iterator_at_filter_node(builder, &builder->end); } +bool grpc_channel_stack_builder_iterator_is_end( + grpc_channel_stack_builder_iterator *iterator) { + return iterator->node == &iterator->builder->end; +} + +const char *grpc_channel_stack_builder_iterator_filter_name( + grpc_channel_stack_builder_iterator *iterator) { + if (iterator->node->filter == NULL) return NULL; + return iterator->node->filter->name; +} + bool grpc_channel_stack_builder_move_next( grpc_channel_stack_builder_iterator *iterator) { if (iterator->node == &iterator->builder->end) return false; diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index 8adf38e27b..c78111b00d 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -98,6 +98,10 @@ bool grpc_channel_stack_builder_iterator_is_first( bool grpc_channel_stack_builder_iterator_is_end( grpc_channel_stack_builder_iterator *iterator); +/// What is the name of the filter at this iterator position? +const char *grpc_channel_stack_builder_iterator_filter_name( + grpc_channel_stack_builder_iterator *iterator); + /// Move an iterator to the next item bool grpc_channel_stack_builder_move_next( grpc_channel_stack_builder_iterator *iterator); diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c deleted file mode 100644 index 4625cba0d2..0000000000 --- a/src/core/lib/channel/compress_filter.c +++ /dev/null @@ -1,353 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <assert.h> -#include <string.h> - -#include <grpc/compression.h> -#include <grpc/slice_buffer.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/compress_filter.h" -#include "src/core/lib/compression/algorithm_metadata.h" -#include "src/core/lib/compression/message_compress.h" -#include "src/core/lib/profiling/timers.h" -#include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/string.h" -#include "src/core/lib/transport/static_metadata.h" - -int grpc_compression_trace = 0; - -typedef struct call_data { - grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ - grpc_linked_mdelem compression_algorithm_storage; - grpc_linked_mdelem accept_encoding_storage; - uint32_t remaining_slice_bytes; - /** Compression algorithm we'll try to use. It may be given by incoming - * metadata, or by the channel's default compression settings. */ - grpc_compression_algorithm compression_algorithm; - /** If true, contents of \a compression_algorithm are authoritative */ - int has_compression_algorithm; - - grpc_transport_stream_op_batch *send_op; - uint32_t send_length; - uint32_t send_flags; - grpc_slice incoming_slice; - grpc_slice_buffer_stream replacement_stream; - grpc_closure *post_send; - grpc_closure send_done; - grpc_closure got_slice; -} call_data; - -typedef struct channel_data { - /** The default, channel-level, compression algorithm */ - grpc_compression_algorithm default_compression_algorithm; - /** Bitset of enabled algorithms */ - uint32_t enabled_algorithms_bitset; - /** Supported compression algorithms */ - uint32_t supported_compression_algorithms; -} channel_data; - -static int skip_compression(grpc_call_element *elem, uint32_t flags) { - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { - return 1; - } - if (calld->has_compression_algorithm) { - if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { - return 1; - } - return 0; /* we have an actual call-specific algorithm */ - } - /* no per-call compression override */ - return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; -} - -/** Filter initial metadata */ -static grpc_error *process_send_initial_metadata( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata) GRPC_MUST_USE_RESULT; -static grpc_error *process_send_initial_metadata( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata) { - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - /* Parse incoming request for compression. If any, it'll be available - * at calld->compression_algorithm */ - if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) { - grpc_mdelem md = - initial_metadata->idx.named.grpc_internal_encoding_request->md; - if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md), - &calld->compression_algorithm)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); - gpr_log(GPR_ERROR, - "Invalid compression algorithm: '%s' (unknown). Ignoring.", val); - gpr_free(val); - calld->compression_algorithm = GRPC_COMPRESS_NONE; - } - if (!GPR_BITGET(channeld->enabled_algorithms_bitset, - calld->compression_algorithm)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); - gpr_log(GPR_ERROR, - "Invalid compression algorithm: '%s' (previously disabled). " - "Ignoring.", - val); - gpr_free(val); - calld->compression_algorithm = GRPC_COMPRESS_NONE; - } - calld->has_compression_algorithm = 1; - - grpc_metadata_batch_remove( - exec_ctx, initial_metadata, - initial_metadata->idx.named.grpc_internal_encoding_request); - } else { - /* If no algorithm was found in the metadata and we aren't - * exceptionally skipping compression, fall back to the channel - * default */ - calld->compression_algorithm = channeld->default_compression_algorithm; - calld->has_compression_algorithm = 1; /* GPR_TRUE */ - } - - grpc_error *error = GRPC_ERROR_NONE; - /* hint compression algorithm */ - if (calld->compression_algorithm != GRPC_COMPRESS_NONE) { - error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, &calld->compression_algorithm_storage, - grpc_compression_encoding_mdelem(calld->compression_algorithm)); - } - - if (error != GRPC_ERROR_NONE) return error; - - /* convey supported compression algorithms */ - error = grpc_metadata_batch_add_tail( - exec_ctx, initial_metadata, &calld->accept_encoding_storage, - GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( - channeld->supported_compression_algorithms)); - - return error; -} - -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem); - -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); - calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); -} - -static void finish_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = elem->call_data; - int did_compress; - grpc_slice_buffer tmp; - grpc_slice_buffer_init(&tmp); - did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, - &calld->slices, &tmp); - if (did_compress) { - if (grpc_compression_trace) { - char *algo_name; - const size_t before_size = calld->slices.length; - const size_t after_size = tmp.length; - const float savings_ratio = 1.0f - (float)after_size / (float)before_size; - GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, - &algo_name)); - gpr_log(GPR_DEBUG, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR - " bytes (%.2f%% savings)", - algo_name, before_size, after_size, 100 * savings_ratio); - } - grpc_slice_buffer_swap(&calld->slices, &tmp); - calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; - } else { - if (grpc_compression_trace) { - char *algo_name; - GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, - &algo_name)); - gpr_log(GPR_DEBUG, - "Algorithm '%s' enabled but decided not to compress. Input size: " - "%" PRIuPTR, - algo_name, calld->slices.length); - } - } - - grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); - - grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - calld->send_flags); - calld->send_op->payload->send_message.send_message = - &calld->replacement_stream.base; - calld->post_send = calld->send_op->on_complete; - calld->send_op->on_complete = &calld->send_done; - - grpc_call_next_op(exec_ctx, elem, calld->send_op); -} - -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - finish_send_message(exec_ctx, elem); - } else { - continue_send_message(exec_ctx, elem); - } -} - -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = elem->call_data; - while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, - &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - finish_send_message(exec_ctx, elem); - break; - } - } -} - -static void compress_start_transport_stream_op_batch( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - call_data *calld = elem->call_data; - - GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); - - if (op->send_initial_metadata) { - grpc_error *error = process_send_initial_metadata( - exec_ctx, elem, - op->payload->send_initial_metadata.send_initial_metadata); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - return; - } - } - if (op->send_message && - !skip_compression(elem, op->payload->send_message.send_message->flags)) { - calld->send_op = op; - calld->send_length = op->payload->send_message.send_message->length; - calld->send_flags = op->payload->send_message.send_message->flags; - continue_send_message(exec_ctx, elem); - } else { - /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); - } - - GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); -} - -/* Constructor for call_data */ -static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_call_element_args *args) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - - /* initialize members */ - grpc_slice_buffer_init(&calld->slices); - calld->has_compression_algorithm = 0; - grpc_closure_init(&calld->got_slice, got_slice, elem, - grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->send_done, send_done, elem, - grpc_schedule_on_exec_ctx); - - return GRPC_ERROR_NONE; -} - -/* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const grpc_call_final_info *final_info, - grpc_closure *ignored) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); -} - -/* Constructor for channel_data */ -static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { - channel_data *channeld = elem->channel_data; - - channeld->enabled_algorithms_bitset = - grpc_channel_args_compression_algorithm_get_states(args->channel_args); - - channeld->default_compression_algorithm = - grpc_channel_args_get_compression_algorithm(args->channel_args); - /* Make sure the default isn't disabled. */ - if (!GPR_BITGET(channeld->enabled_algorithms_bitset, - channeld->default_compression_algorithm)) { - gpr_log(GPR_DEBUG, - "compression algorithm %d not enabled: switching to none", - channeld->default_compression_algorithm); - channeld->default_compression_algorithm = GRPC_COMPRESS_NONE; - } - - channeld->supported_compression_algorithms = 1; /* always support identity */ - for (grpc_compression_algorithm algo_idx = 1; - algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - /* skip disabled algorithms */ - if (!GPR_BITGET(channeld->enabled_algorithms_bitset, algo_idx)) { - continue; - } - channeld->supported_compression_algorithms |= 1u << algo_idx; - } - - GPR_ASSERT(!args->is_last); - return GRPC_ERROR_NONE; -} - -/* Destructor for channel data */ -static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) {} - -const grpc_channel_filter grpc_compress_filter = { - compress_start_transport_stream_op_batch, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - grpc_channel_next_get_info, - "compress"}; diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h deleted file mode 100644 index e4a2a829d5..0000000000 --- a/src/core/lib/channel/compress_filter.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H -#define GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H - -#include <grpc/impl/codegen/compression_types.h> - -#include "src/core/lib/channel/channel_stack.h" - -extern int grpc_compression_trace; - -/** Compression filter for outgoing data. - * - * See <grpc/compression.h> for the available compression settings. - * - * Compression settings may come from: - * - Channel configuration, as established at channel creation time. - * - The metadata accompanying the outgoing data to be compressed. This is - * taken as a request only. We may choose not to honor it. The metadata key - * is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY. - * - * Compression can be disabled for concrete messages (for instance in order to - * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in - * the BEGIN_MESSAGE flags. - * - * The attempted compression mechanism is added to the resulting initial - * metadata under the'grpc-encoding' key. - * - * If compression is actually performed, BEGIN_MESSAGE's flag is modified to - * incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the - * aforementioned 'grpc-encoding' metadata value, data will pass through - * uncompressed. */ - -extern const grpc_channel_filter grpc_compress_filter; - -#endif /* GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H */ diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c deleted file mode 100644 index fda099b021..0000000000 --- a/src/core/lib/channel/deadline_filter.c +++ /dev/null @@ -1,348 +0,0 @@ -// -// Copyright 2016, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - -#include "src/core/lib/channel/deadline_filter.h" - -#include <stdbool.h> -#include <string.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/sync.h> -#include <grpc/support/time.h> - -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/slice/slice_internal.h" - -// -// grpc_deadline_state -// - -// Timer callback. -static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - grpc_call_element* elem = arg; - grpc_deadline_state* deadline_state = elem->call_data; - if (error != GRPC_ERROR_CANCELLED) { - grpc_call_element_signal_error( - exec_ctx, elem, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED)); - } - GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); -} - -// Starts the deadline timer. -static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - gpr_timespec deadline) { - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) { - return; - } - grpc_deadline_state* deadline_state = elem->call_data; - grpc_deadline_timer_state cur_state; - grpc_closure* closure = NULL; -retry: - cur_state = - (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state); - switch (cur_state) { - case GRPC_DEADLINE_STATE_PENDING: - // Note: We do not start the timer if there is already a timer - return; - case GRPC_DEADLINE_STATE_FINISHED: - if (gpr_atm_rel_cas(&deadline_state->timer_state, - GRPC_DEADLINE_STATE_FINISHED, - GRPC_DEADLINE_STATE_PENDING)) { - // If we've already created and destroyed a timer, we always create a - // new closure: we have no other guarantee that the inlined closure is - // not in use (it may hold a pending call to timer_callback) - closure = grpc_closure_create(timer_callback, elem, - grpc_schedule_on_exec_ctx); - } else { - goto retry; - } - break; - case GRPC_DEADLINE_STATE_INITIAL: - if (gpr_atm_rel_cas(&deadline_state->timer_state, - GRPC_DEADLINE_STATE_INITIAL, - GRPC_DEADLINE_STATE_PENDING)) { - closure = - grpc_closure_init(&deadline_state->timer_callback, timer_callback, - elem, grpc_schedule_on_exec_ctx); - } else { - goto retry; - } - break; - } - GPR_ASSERT(closure); - GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); - grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure, - gpr_now(GPR_CLOCK_MONOTONIC)); -} - -// Cancels the deadline timer. -static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { - if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING, - GRPC_DEADLINE_STATE_FINISHED)) { - grpc_timer_cancel(exec_ctx, &deadline_state->timer); - } else { - // timer was either in STATE_INITAL (nothing to cancel) - // OR in STATE_FINISHED (again nothing to cancel) - } -} - -// Callback run when the call is complete. -static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_deadline_state* deadline_state = arg; - cancel_timer_if_needed(exec_ctx, deadline_state); - // Invoke the next callback. - grpc_closure_run(exec_ctx, deadline_state->next_on_complete, - GRPC_ERROR_REF(error)); -} - -// Inject our own on_complete callback into op. -static void inject_on_complete_cb(grpc_deadline_state* deadline_state, - grpc_transport_stream_op_batch* op) { - deadline_state->next_on_complete = op->on_complete; - grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state, - grpc_schedule_on_exec_ctx); - op->on_complete = &deadline_state->on_complete; -} - -void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_stack* call_stack) { - grpc_deadline_state* deadline_state = elem->call_data; - deadline_state->call_stack = call_stack; -} - -void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { - grpc_deadline_state* deadline_state = elem->call_data; - cancel_timer_if_needed(exec_ctx, deadline_state); -} - -// Callback and associated state for starting the timer after call stack -// initialization has been completed. -struct start_timer_after_init_state { - grpc_call_element* elem; - gpr_timespec deadline; - grpc_closure closure; -}; -static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - struct start_timer_after_init_state* state = arg; - start_timer_if_needed(exec_ctx, state->elem, state->deadline); - gpr_free(state); -} - -void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - gpr_timespec deadline) { - // Deadline will always be infinite on servers, so the timer will only be - // set on clients with a finite deadline. - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { - // When the deadline passes, we indicate the failure by sending down - // an op with cancel_error set. However, we can't send down any ops - // until after the call stack is fully initialized. If we start the - // timer here, we have no guarantee that the timer won't pop before - // call stack initialization is finished. To avoid that problem, we - // create a closure to start the timer, and we schedule that closure - // to be run after call stack initialization is done. - struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state)); - state->elem = elem; - state->deadline = deadline; - grpc_closure_init(&state->closure, start_timer_after_init, state, - grpc_schedule_on_exec_ctx); - grpc_closure_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE); - } -} - -void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - gpr_timespec new_deadline) { - grpc_deadline_state* deadline_state = elem->call_data; - cancel_timer_if_needed(exec_ctx, deadline_state); - start_timer_if_needed(exec_ctx, elem, new_deadline); -} - -void grpc_deadline_state_client_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { - grpc_deadline_state* deadline_state = elem->call_data; - if (op->cancel_stream) { - cancel_timer_if_needed(exec_ctx, deadline_state); - } else { - // Make sure we know when the call is complete, so that we can cancel - // the timer. - if (op->recv_trailing_metadata) { - inject_on_complete_cb(deadline_state, op); - } - } -} - -// -// filter code -// - -// Constructor for channel_data. Used for both client and server filters. -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, - grpc_channel_element_args* args) { - GPR_ASSERT(!args->is_last); - return GRPC_ERROR_NONE; -} - -// Destructor for channel_data. Used for both client and server filters. -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) {} - -// Call data used for both client and server filter. -typedef struct base_call_data { - grpc_deadline_state deadline_state; -} base_call_data; - -// Additional call data used only for the server filter. -typedef struct server_call_data { - base_call_data base; // Must be first. - // The closure for receiving initial metadata. - grpc_closure recv_initial_metadata_ready; - // Received initial metadata batch. - grpc_metadata_batch* recv_initial_metadata; - // The original recv_initial_metadata_ready closure, which we chain to - // after our own closure is invoked. - grpc_closure* next_recv_initial_metadata_ready; -} server_call_data; - -// Constructor for call_data. Used for both client and server filters. -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - const grpc_call_element_args* args) { - grpc_deadline_state_init(exec_ctx, elem, args->call_stack); - grpc_deadline_state_start(exec_ctx, elem, args->deadline); - return GRPC_ERROR_NONE; -} - -// Destructor for call_data. Used for both client and server filters. -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - const grpc_call_final_info* final_info, - grpc_closure* ignored) { - grpc_deadline_state_destroy(exec_ctx, elem); -} - -// Method for starting a call op for client filter. -static void client_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { - grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, - op); - // Chain to next filter. - grpc_call_next_op(exec_ctx, elem, op); -} - -// Callback for receiving initial metadata on the server. -static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - grpc_call_element* elem = arg; - server_call_data* calld = elem->call_data; - // Get deadline from metadata and start the timer if needed. - start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline); - // Invoke the next callback. - calld->next_recv_initial_metadata_ready->cb( - exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error); -} - -// Method for starting a call op for server filter. -static void server_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { - server_call_data* calld = elem->call_data; - if (op->cancel_stream) { - cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state); - } else { - // If we're receiving initial metadata, we need to get the deadline - // from the recv_initial_metadata_ready callback. So we inject our - // own callback into that hook. - if (op->recv_initial_metadata) { - calld->next_recv_initial_metadata_ready = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - grpc_closure_init(&calld->recv_initial_metadata_ready, - recv_initial_metadata_ready, elem, - grpc_schedule_on_exec_ctx); - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->recv_initial_metadata_ready; - } - // Make sure we know when the call is complete, so that we can cancel - // the timer. - // Note that we trigger this on recv_trailing_metadata, even though - // the client never sends trailing metadata, because this is the - // hook that tells us when the call is complete on the server side. - if (op->recv_trailing_metadata) { - inject_on_complete_cb(&calld->base.deadline_state, op); - } - } - // Chain to next filter. - grpc_call_next_op(exec_ctx, elem, op); -} - -const grpc_channel_filter grpc_client_deadline_filter = { - client_start_transport_stream_op_batch, - grpc_channel_next_op, - sizeof(base_call_data), - init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - 0, // sizeof(channel_data) - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - grpc_channel_next_get_info, - "deadline", -}; - -const grpc_channel_filter grpc_server_deadline_filter = { - server_start_transport_stream_op_batch, - grpc_channel_next_op, - sizeof(server_call_data), - init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - 0, // sizeof(channel_data) - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - grpc_channel_next_get_info, - "deadline", -}; diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h deleted file mode 100644 index d8db9a9f97..0000000000 --- a/src/core/lib/channel/deadline_filter.h +++ /dev/null @@ -1,102 +0,0 @@ -// -// Copyright 2016, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - -#ifndef GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H -#define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H - -#include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/iomgr/timer.h" - -typedef enum grpc_deadline_timer_state { - GRPC_DEADLINE_STATE_INITIAL, - GRPC_DEADLINE_STATE_PENDING, - GRPC_DEADLINE_STATE_FINISHED -} grpc_deadline_timer_state; - -// State used for filters that enforce call deadlines. -// Must be the first field in the filter's call_data. -typedef struct grpc_deadline_state { - // We take a reference to the call stack for the timer callback. - grpc_call_stack* call_stack; - gpr_atm timer_state; - grpc_timer timer; - grpc_closure timer_callback; - // Closure to invoke when the call is complete. - // We use this to cancel the timer. - grpc_closure on_complete; - // The original on_complete closure, which we chain to after our own - // closure is invoked. - grpc_closure* next_on_complete; -} grpc_deadline_state; - -// -// NOTE: All of these functions require that the first field in -// elem->call_data is a grpc_deadline_state. -// - -// assumes elem->call_data is zero'd -void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_stack* call_stack); -void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem); - -// Starts the timer with the specified deadline. -// Should be called from the filter's init_call_elem() method. -void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - gpr_timespec deadline); - -// Cancels the existing timer and starts a new one with new_deadline. -// -// Note: It is generally safe to call this with an earlier deadline -// value than the current one, but not the reverse. No checks are done -// to ensure that the timer callback is not invoked while it is in the -// process of being reset, which means that attempting to increase the -// deadline may result in the timer being called twice. -void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - gpr_timespec new_deadline); - -// To be called from the client-side filter's start_transport_stream_op_batch() -// method. Ensures that the deadline timer is cancelled when the call -// is completed. -// -// Note: It is the caller's responsibility to chain to the next filter if -// necessary after this function returns. -void grpc_deadline_state_client_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* op); - -// Deadline filters for direct client channels and server channels. -// Note: Deadlines for non-direct client channels are handled by the -// client_channel filter. -extern const grpc_channel_filter grpc_client_deadline_filter; -extern const grpc_channel_filter grpc_server_deadline_filter; - -#endif /* GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H */ diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c deleted file mode 100644 index 4e47c5c658..0000000000 --- a/src/core/lib/channel/http_client_filter.c +++ /dev/null @@ -1,601 +0,0 @@ -/* - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/channel/http_client_filter.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <string.h> -#include "src/core/lib/profiling/timers.h" -#include "src/core/lib/slice/b64.h" -#include "src/core/lib/slice/percent_encoding.h" -#include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/string.h" -#include "src/core/lib/transport/static_metadata.h" -#include "src/core/lib/transport/transport_impl.h" - -#define EXPECTED_CONTENT_TYPE "application/grpc" -#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 - -/* default maximum size of payload eligable for GET request */ -static const size_t kMaxPayloadSizeForGet = 2048; - -typedef struct call_data { - grpc_linked_mdelem method; - grpc_linked_mdelem scheme; - grpc_linked_mdelem authority; - grpc_linked_mdelem te_trailers; - grpc_linked_mdelem content_type; - grpc_linked_mdelem user_agent; - - grpc_metadata_batch *recv_initial_metadata; - grpc_metadata_batch *recv_trailing_metadata; - uint8_t *payload_bytes; - - /* Vars to read data off of send_message */ - grpc_transport_stream_op_batch *send_op; - uint32_t send_length; - uint32_t send_flags; - grpc_slice incoming_slice; - grpc_slice_buffer_stream replacement_stream; - grpc_slice_buffer slices; - /* flag that indicates that all slices of send_messages aren't availble */ - bool send_message_blocked; - - /** Closure to call when finished with the hc_on_recv hook */ - grpc_closure *on_done_recv_initial_metadata; - grpc_closure *on_done_recv_trailing_metadata; - grpc_closure *on_complete; - grpc_closure *post_send; - - /** Receive closures are chained: we inject this closure as the on_done_recv - up-call on transport_op, and remember to call our on_done_recv member - after handling it. */ - grpc_closure hc_on_recv_initial_metadata; - grpc_closure hc_on_recv_trailing_metadata; - grpc_closure hc_on_complete; - grpc_closure got_slice; - grpc_closure send_done; -} call_data; - -typedef struct channel_data { - grpc_mdelem static_scheme; - grpc_mdelem user_agent; - size_t max_payload_size_for_get; -} channel_data; - -static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_metadata_batch *b) { - if (b->idx.named.status != NULL) { - if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.status); - } else { - char *val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md), - GPR_DUMP_ASCII); - char *msg; - gpr_asprintf(&msg, "Received http2 header with status: %s", val); - grpc_error *e = grpc_error_set_str( - grpc_error_set_int( - grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Received http2 :status header with non-200 OK status"), - GRPC_ERROR_STR_VALUE, grpc_slice_from_copied_string(val)), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED), - GRPC_ERROR_STR_GRPC_MESSAGE, grpc_slice_from_copied_string(msg)); - gpr_free(val); - gpr_free(msg); - return e; - } - } - - if (b->idx.named.grpc_message != NULL) { - grpc_slice pct_decoded_msg = grpc_permissive_percent_decode_slice( - GRPC_MDVALUE(b->idx.named.grpc_message->md)); - if (grpc_slice_is_equivalent(pct_decoded_msg, - GRPC_MDVALUE(b->idx.named.grpc_message->md))) { - grpc_slice_unref_internal(exec_ctx, pct_decoded_msg); - } else { - grpc_metadata_batch_set_value(exec_ctx, b->idx.named.grpc_message, - pct_decoded_msg); - } - } - - if (b->idx.named.content_type != NULL) { - if (!grpc_mdelem_eq(b->idx.named.content_type->md, - GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)) { - if (grpc_slice_buf_start_eq(GRPC_MDVALUE(b->idx.named.content_type->md), - EXPECTED_CONTENT_TYPE, - EXPECTED_CONTENT_TYPE_LENGTH) && - (GRPC_SLICE_START_PTR(GRPC_MDVALUE( - b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] == - '+' || - GRPC_SLICE_START_PTR(GRPC_MDVALUE( - b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] == - ';')) { - /* Although the C implementation doesn't (currently) generate them, - any custom +-suffix is explicitly valid. */ - /* TODO(klempner): We should consider preallocating common values such - as +proto or +json, or at least stashing them if we see them. */ - /* TODO(klempner): Should we be surfacing this to application code? */ - } else { - /* TODO(klempner): We're currently allowing this, but we shouldn't - see it without a proxy so log for now. */ - char *val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.content_type->md), - GPR_DUMP_ASCII); - gpr_log(GPR_INFO, "Unexpected content-type '%s'", val); - gpr_free(val); - } - } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_type); - } - - return GRPC_ERROR_NONE; -} - -static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, - void *user_data, grpc_error *error) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; - if (error == GRPC_ERROR_NONE) { - error = client_filter_incoming_metadata(exec_ctx, elem, - calld->recv_initial_metadata); - } else { - GRPC_ERROR_REF(error); - } - grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, error); -} - -static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, - void *user_data, grpc_error *error) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; - if (error == GRPC_ERROR_NONE) { - error = client_filter_incoming_metadata(exec_ctx, elem, - calld->recv_trailing_metadata); - } else { - GRPC_ERROR_REF(error); - } - grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata, error); -} - -static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *error) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; - if (calld->payload_bytes) { - gpr_free(calld->payload_bytes); - calld->payload_bytes = NULL; - } - calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error); -} - -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); - calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); -} - -static void remove_if_present(grpc_exec_ctx *exec_ctx, - grpc_metadata_batch *batch, - grpc_metadata_batch_callouts_index idx) { - if (batch->idx.array[idx] != NULL) { - grpc_metadata_batch_remove(exec_ctx, batch, batch->idx.array[idx]); - } -} - -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = elem->call_data; - uint8_t *wrptr = calld->payload_bytes; - while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, - &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { - memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), - GRPC_SLICE_LENGTH(calld->incoming_slice)); - wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - calld->send_message_blocked = false; - break; - } - } -} - -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - calld->send_message_blocked = false; - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - /* Pass down the original send_message op that was blocked.*/ - grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - calld->send_flags); - calld->send_op->payload->send_message.send_message = - &calld->replacement_stream.base; - calld->post_send = calld->send_op->on_complete; - calld->send_op->on_complete = &calld->send_done; - grpc_call_next_op(exec_ctx, elem, calld->send_op); - } else { - continue_send_message(exec_ctx, elem); - } -} - -static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - grpc_error *error; - - if (op->send_initial_metadata) { - /* Decide which HTTP VERB to use. We use GET if the request is marked - cacheable, and the operation contains both initial metadata and send - message, and the payload is below the size threshold, and all the data - for this request is immediately available. */ - grpc_mdelem method = GRPC_MDELEM_METHOD_POST; - if (op->send_message && - (op->payload->send_initial_metadata.send_initial_metadata_flags & - GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && - op->payload->send_message.send_message->length < - channeld->max_payload_size_for_get) { - method = GRPC_MDELEM_METHOD_GET; - /* The following write to calld->send_message_blocked isn't racy with - reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because - being here means ops->send_message is not NULL, which is primarily - guarding the read there. */ - calld->send_message_blocked = true; - } else if (op->payload->send_initial_metadata.send_initial_metadata_flags & - GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { - method = GRPC_MDELEM_METHOD_PUT; - } - - /* Attempt to read the data from send_message and create a header field. */ - if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) { - /* allocate memory to hold the entire payload */ - calld->payload_bytes = - gpr_malloc(op->payload->send_message.send_message->length); - - /* read slices of send_message and copy into payload_bytes */ - calld->send_op = op; - calld->send_length = op->payload->send_message.send_message->length; - calld->send_flags = op->payload->send_message.send_message->flags; - continue_send_message(exec_ctx, elem); - - if (calld->send_message_blocked == false) { - /* when all the send_message data is available, then modify the path - * MDELEM by appending base64 encoded query to the path */ - const int k_url_safe = 1; - const int k_multi_line = 0; - const unsigned char k_query_separator = '?'; - - grpc_slice path_slice = - GRPC_MDVALUE(op->payload->send_initial_metadata - .send_initial_metadata->idx.named.path->md); - /* sum up individual component's lengths and allocate enough memory to - * hold combined path+query */ - size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); - estimated_len++; /* for the '?' */ - estimated_len += grpc_base64_estimate_encoded_size( - op->payload->send_message.send_message->length, k_url_safe, - k_multi_line); - estimated_len += 1; /* for the trailing 0 */ - grpc_slice path_with_query_slice = grpc_slice_malloc(estimated_len); - - /* memcopy individual pieces into this slice */ - uint8_t *write_ptr = - (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice); - uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice); - memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice)); - write_ptr += GRPC_SLICE_LENGTH(path_slice); - - *write_ptr = k_query_separator; - write_ptr++; /* for the '?' */ - - grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes, - op->payload->send_message.send_message->length, - k_url_safe, k_multi_line); - - /* remove trailing unused memory and add trailing 0 to terminate string - */ - char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); - /* safe to use strlen since base64_encode will always add '\0' */ - size_t path_length = strlen(t) + 1; - *(t + path_length) = '\0'; - path_with_query_slice = - grpc_slice_sub(path_with_query_slice, 0, path_length); - - /* substitute previous path with the new path+query */ - grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); - grpc_metadata_batch *b = - op->payload->send_initial_metadata.send_initial_metadata; - error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, - mdelem_path_and_query); - if (error != GRPC_ERROR_NONE) return error; - - calld->on_complete = op->on_complete; - op->on_complete = &calld->hc_on_complete; - op->send_message = false; - grpc_slice_unref_internal(exec_ctx, path_with_query_slice); - } else { - /* Not all data is available. Fall back to POST. */ - gpr_log(GPR_DEBUG, - "Request is marked Cacheable but not all data is available.\ - Falling back to POST"); - method = GRPC_MDELEM_METHOD_POST; - } - } - - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_METHOD); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_SCHEME); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_TE); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_CONTENT_TYPE); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_USER_AGENT); - - /* Send : prefixed headers, which have to be before any application - layer headers. */ - error = grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->method, method); - if (error != GRPC_ERROR_NONE) return error; - error = grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->scheme, channeld->static_scheme); - if (error != GRPC_ERROR_NONE) return error; - error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS); - if (error != GRPC_ERROR_NONE) return error; - error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); - if (error != GRPC_ERROR_NONE) return error; - error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); - if (error != GRPC_ERROR_NONE) return error; - } - - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->on_done_recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->hc_on_recv_initial_metadata; - } - - if (op->recv_trailing_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_trailing_metadata = - op->payload->recv_trailing_metadata.recv_trailing_metadata; - calld->on_done_recv_trailing_metadata = op->on_complete; - op->on_complete = &calld->hc_on_recv_trailing_metadata; - } - - return GRPC_ERROR_NONE; -} - -static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GPR_TIMER_BEGIN("hc_start_transport_op", 0); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_error *error = hc_mutate_op(exec_ctx, elem, op); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - } else { - call_data *calld = elem->call_data; - if (op->send_message && calld->send_message_blocked) { - /* Don't forward the op. send_message contains slices that aren't ready - yet. The call will be forwarded by the op_complete of slice read call. - */ - } else { - grpc_call_next_op(exec_ctx, elem, op); - } - } - GPR_TIMER_END("hc_start_transport_op", 0); -} - -/* Constructor for call_data */ -static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_call_element_args *args) { - call_data *calld = elem->call_data; - calld->on_done_recv_initial_metadata = NULL; - calld->on_done_recv_trailing_metadata = NULL; - calld->on_complete = NULL; - calld->payload_bytes = NULL; - calld->send_message_blocked = false; - grpc_slice_buffer_init(&calld->slices); - grpc_closure_init(&calld->hc_on_recv_initial_metadata, - hc_on_recv_initial_metadata, elem, - grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->hc_on_recv_trailing_metadata, - hc_on_recv_trailing_metadata, elem, - grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem, - grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->got_slice, got_slice, elem, - grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->send_done, send_done, elem, - grpc_schedule_on_exec_ctx); - return GRPC_ERROR_NONE; -} - -/* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const grpc_call_final_info *final_info, - grpc_closure *ignored) { - call_data *calld = elem->call_data; - grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); -} - -static grpc_mdelem scheme_from_args(const grpc_channel_args *args) { - unsigned i; - size_t j; - grpc_mdelem valid_schemes[] = {GRPC_MDELEM_SCHEME_HTTP, - GRPC_MDELEM_SCHEME_HTTPS}; - if (args != NULL) { - for (i = 0; i < args->num_args; ++i) { - if (args->args[i].type == GRPC_ARG_STRING && - strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) { - for (j = 0; j < GPR_ARRAY_SIZE(valid_schemes); j++) { - if (0 == grpc_slice_str_cmp(GRPC_MDVALUE(valid_schemes[j]), - args->args[i].value.string)) { - return valid_schemes[j]; - } - } - } - } - } - return GRPC_MDELEM_SCHEME_HTTP; -} - -static size_t max_payload_size_from_args(const grpc_channel_args *args) { - if (args != NULL) { - for (size_t i = 0; i < args->num_args; ++i) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET)) { - if (args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET); - } else { - return (size_t)args->args[i].value.integer; - } - } - } - } - return kMaxPayloadSizeForGet; -} - -static grpc_slice user_agent_from_args(const grpc_channel_args *args, - const char *transport_name) { - gpr_strvec v; - size_t i; - int is_first = 1; - char *tmp; - grpc_slice result; - - gpr_strvec_init(&v); - - for (i = 0; args && i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) { - if (args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", - GRPC_ARG_PRIMARY_USER_AGENT_STRING); - } else { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); - is_first = 0; - gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); - } - } - } - - gpr_asprintf(&tmp, "%sgrpc-c/%s (%s; %s; %s)", is_first ? "" : " ", - grpc_version_string(), GPR_PLATFORM_STRING, transport_name, - grpc_g_stands_for()); - is_first = 0; - gpr_strvec_add(&v, tmp); - - for (i = 0; args && i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) { - if (args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", - GRPC_ARG_SECONDARY_USER_AGENT_STRING); - } else { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); - is_first = 0; - gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); - } - } - } - - tmp = gpr_strvec_flatten(&v, NULL); - gpr_strvec_destroy(&v); - result = grpc_slice_intern(grpc_slice_from_static_string(tmp)); - gpr_free(tmp); - - return result; -} - -/* Constructor for channel_data */ -static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { - channel_data *chand = elem->channel_data; - GPR_ASSERT(!args->is_last); - GPR_ASSERT(args->optional_transport != NULL); - chand->static_scheme = scheme_from_args(args->channel_args); - chand->max_payload_size_for_get = - max_payload_size_from_args(args->channel_args); - chand->user_agent = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_USER_AGENT, - user_agent_from_args(args->channel_args, - args->optional_transport->vtable->name)); - return GRPC_ERROR_NONE; -} - -/* Destructor for channel data */ -static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - GRPC_MDELEM_UNREF(exec_ctx, chand->user_agent); -} - -const grpc_channel_filter grpc_http_client_filter = { - hc_start_transport_op, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - grpc_channel_next_get_info, - "http-client"}; diff --git a/src/core/lib/channel/http_client_filter.h b/src/core/lib/channel/http_client_filter.h deleted file mode 100644 index 9e6e106e9c..0000000000 --- a/src/core/lib/channel/http_client_filter.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H -#define GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H - -#include "src/core/lib/channel/channel_stack.h" - -/* Processes metadata on the client side for HTTP2 transports */ -extern const grpc_channel_filter grpc_http_client_filter; - -/* Channel arg to override the http2 :scheme header */ -#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme" - -/* Channel arg to determine maximum size of payload eligable for GET request */ -#define GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET "grpc.max_payload_size_for_get" - -#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H */ diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c deleted file mode 100644 index c1e49ffacc..0000000000 --- a/src/core/lib/channel/http_server_filter.c +++ /dev/null @@ -1,445 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/channel/http_server_filter.h" - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <string.h> -#include "src/core/lib/profiling/timers.h" -#include "src/core/lib/slice/b64.h" -#include "src/core/lib/slice/percent_encoding.h" -#include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/transport/static_metadata.h" - -#define EXPECTED_CONTENT_TYPE "application/grpc" -#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 - -extern int grpc_http_trace; - -typedef struct call_data { - grpc_linked_mdelem status; - grpc_linked_mdelem content_type; - - /* did this request come with path query containing request payload */ - bool seen_path_with_query; - /* flag to ensure payload_bin is delivered only once */ - bool payload_bin_delivered; - - grpc_metadata_batch *recv_initial_metadata; - uint32_t *recv_initial_metadata_flags; - /** Closure to call when finished with the hs_on_recv hook */ - grpc_closure *on_done_recv; - /** Closure to call when we retrieve read message from the path URI - */ - grpc_closure *recv_message_ready; - grpc_closure *on_complete; - grpc_byte_stream **pp_recv_message; - grpc_slice_buffer read_slice_buffer; - grpc_slice_buffer_stream read_stream; - - /** Receive closures are chained: we inject this closure as the on_done_recv - up-call on transport_op, and remember to call our on_done_recv member - after handling it. */ - grpc_closure hs_on_recv; - grpc_closure hs_on_complete; - grpc_closure hs_recv_message_ready; -} call_data; - -typedef struct channel_data { uint8_t unused; } channel_data; - -static grpc_error *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_metadata_batch *b) { - if (b->idx.named.grpc_message != NULL) { - grpc_slice pct_encoded_msg = grpc_percent_encode_slice( - GRPC_MDVALUE(b->idx.named.grpc_message->md), - grpc_compatible_percent_encoding_unreserved_bytes); - if (grpc_slice_is_equivalent(pct_encoded_msg, - GRPC_MDVALUE(b->idx.named.grpc_message->md))) { - grpc_slice_unref_internal(exec_ctx, pct_encoded_msg); - } else { - grpc_metadata_batch_set_value(exec_ctx, b->idx.named.grpc_message, - pct_encoded_msg); - } - } - return GRPC_ERROR_NONE; -} - -static void add_error(const char *error_name, grpc_error **cumulative, - grpc_error *new) { - if (new == GRPC_ERROR_NONE) return; - if (*cumulative == GRPC_ERROR_NONE) { - *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name); - } - *cumulative = grpc_error_add_child(*cumulative, new); -} - -static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_metadata_batch *b) { - call_data *calld = elem->call_data; - grpc_error *error = GRPC_ERROR_NONE; - static const char *error_name = "Failed processing incoming headers"; - - if (b->idx.named.method != NULL) { - if (grpc_mdelem_eq(b->idx.named.method->md, GRPC_MDELEM_METHOD_POST)) { - *calld->recv_initial_metadata_flags &= - ~(GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | - GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST); - } else if (grpc_mdelem_eq(b->idx.named.method->md, - GRPC_MDELEM_METHOD_PUT)) { - *calld->recv_initial_metadata_flags &= - ~GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; - *calld->recv_initial_metadata_flags |= - GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; - } else if (grpc_mdelem_eq(b->idx.named.method->md, - GRPC_MDELEM_METHOD_GET)) { - *calld->recv_initial_metadata_flags |= - GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; - *calld->recv_initial_metadata_flags &= - ~GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; - } else { - add_error(error_name, &error, - grpc_attach_md_to_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), - b->idx.named.method->md)); - } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.method); - } else { - add_error( - error_name, &error, - grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), - GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":method"))); - } - - if (b->idx.named.te != NULL) { - if (!grpc_mdelem_eq(b->idx.named.te->md, GRPC_MDELEM_TE_TRAILERS)) { - add_error(error_name, &error, - grpc_attach_md_to_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), - b->idx.named.te->md)); - } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.te); - } else { - add_error(error_name, &error, - grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), - GRPC_ERROR_STR_KEY, grpc_slice_from_static_string("te"))); - } - - if (b->idx.named.scheme != NULL) { - if (!grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTP) && - !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTPS) && - !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_GRPC)) { - add_error(error_name, &error, - grpc_attach_md_to_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"), - b->idx.named.scheme->md)); - } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.scheme); - } else { - add_error( - error_name, &error, - grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), - GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":scheme"))); - } - - if (b->idx.named.content_type != NULL) { - if (!grpc_mdelem_eq(b->idx.named.content_type->md, - GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)) { - if (grpc_slice_buf_start_eq(GRPC_MDVALUE(b->idx.named.content_type->md), - EXPECTED_CONTENT_TYPE, - EXPECTED_CONTENT_TYPE_LENGTH) && - (GRPC_SLICE_START_PTR(GRPC_MDVALUE( - b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] == - '+' || - GRPC_SLICE_START_PTR(GRPC_MDVALUE( - b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] == - ';')) { - /* Although the C implementation doesn't (currently) generate them, - any custom +-suffix is explicitly valid. */ - /* TODO(klempner): We should consider preallocating common values such - as +proto or +json, or at least stashing them if we see them. */ - /* TODO(klempner): Should we be surfacing this to application code? */ - } else { - /* TODO(klempner): We're currently allowing this, but we shouldn't - see it without a proxy so log for now. */ - char *val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.content_type->md), - GPR_DUMP_ASCII); - gpr_log(GPR_INFO, "Unexpected content-type '%s'", val); - gpr_free(val); - } - } - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_type); - } - - if (b->idx.named.path == NULL) { - add_error(error_name, &error, - grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), - GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":path"))); - } else if (*calld->recv_initial_metadata_flags & - GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) { - /* We have a cacheable request made with GET verb. The path contains the - * query parameter which is base64 encoded request payload. */ - const char k_query_separator = '?'; - grpc_slice path_slice = GRPC_MDVALUE(b->idx.named.path->md); - uint8_t *path_ptr = (uint8_t *)GRPC_SLICE_START_PTR(path_slice); - size_t path_length = GRPC_SLICE_LENGTH(path_slice); - /* offset of the character '?' */ - size_t offset = 0; - for (offset = 0; offset < path_length && *path_ptr != k_query_separator; - path_ptr++, offset++) - ; - if (offset < path_length) { - grpc_slice query_slice = - grpc_slice_sub(path_slice, offset + 1, path_length); - - /* substitute path metadata with just the path (not query) */ - grpc_mdelem mdelem_path_without_query = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_PATH, grpc_slice_sub(path_slice, 0, offset)); - - grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, - mdelem_path_without_query); - - /* decode payload from query and add to the slice buffer to be returned */ - const int k_url_safe = 1; - grpc_slice_buffer_add( - &calld->read_slice_buffer, - grpc_base64_decode(exec_ctx, - (const char *)GRPC_SLICE_START_PTR(query_slice), - k_url_safe)); - grpc_slice_buffer_stream_init(&calld->read_stream, - &calld->read_slice_buffer, 0); - calld->seen_path_with_query = true; - grpc_slice_unref_internal(exec_ctx, query_slice); - } else { - gpr_log(GPR_ERROR, "GET request without QUERY"); - } - } - - if (b->idx.named.host != NULL && b->idx.named.authority == NULL) { - grpc_linked_mdelem *el = b->idx.named.host; - grpc_mdelem md = GRPC_MDELEM_REF(el->md); - grpc_metadata_batch_remove(exec_ctx, b, el); - add_error( - error_name, &error, - grpc_metadata_batch_add_head( - exec_ctx, b, el, grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_AUTHORITY, - grpc_slice_ref_internal(GRPC_MDVALUE(md))))); - GRPC_MDELEM_UNREF(exec_ctx, md); - } - - if (b->idx.named.authority == NULL) { - add_error( - error_name, &error, - grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"), - GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":authority"))); - } - - return error; -} - -static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *err) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; - if (err == GRPC_ERROR_NONE) { - err = server_filter_incoming_metadata(exec_ctx, elem, - calld->recv_initial_metadata); - } else { - GRPC_ERROR_REF(err); - } - grpc_closure_run(exec_ctx, calld->on_done_recv, err); -} - -static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *err) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; - /* Call recv_message_ready if we got the payload via the path field */ - if (calld->seen_path_with_query && calld->recv_message_ready != NULL) { - *calld->pp_recv_message = calld->payload_bin_delivered - ? NULL - : (grpc_byte_stream *)&calld->read_stream; - grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); - calld->recv_message_ready = NULL; - calld->payload_bin_delivered = true; - } - grpc_closure_run(exec_ctx, calld->on_complete, GRPC_ERROR_REF(err)); -} - -static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *err) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; - if (calld->seen_path_with_query) { - /* do nothing. This is probably a GET request, and payload will be returned - in hs_on_complete callback. */ - } else { - grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); - } -} - -static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - - if (op->send_initial_metadata) { - grpc_error *error = GRPC_ERROR_NONE; - static const char *error_name = "Failed sending initial metadata"; - add_error( - error_name, &error, - grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->status, GRPC_MDELEM_STATUS_200)); - add_error( - error_name, &error, - grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, - &calld->content_type, - GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)); - add_error(error_name, &error, - server_filter_outgoing_metadata( - exec_ctx, elem, - op->payload->send_initial_metadata.send_initial_metadata)); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - return; - } - } - - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags != NULL); - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->recv_initial_metadata_flags = - op->payload->recv_initial_metadata.recv_flags; - calld->on_done_recv = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->hs_on_recv; - } - - if (op->recv_message) { - calld->recv_message_ready = op->payload->recv_message.recv_message_ready; - calld->pp_recv_message = op->payload->recv_message.recv_message; - if (op->payload->recv_message.recv_message_ready) { - op->payload->recv_message.recv_message_ready = - &calld->hs_recv_message_ready; - } - if (op->on_complete) { - calld->on_complete = op->on_complete; - op->on_complete = &calld->hs_on_complete; - } - } - - if (op->send_trailing_metadata) { - grpc_error *error = server_filter_outgoing_metadata( - exec_ctx, elem, - op->payload->send_trailing_metadata.send_trailing_metadata); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - return; - } - } -} - -static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - GPR_TIMER_BEGIN("hs_start_transport_op", 0); - hs_mutate_op(exec_ctx, elem, op); - grpc_call_next_op(exec_ctx, elem, op); - GPR_TIMER_END("hs_start_transport_op", 0); -} - -/* Constructor for call_data */ -static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_call_element_args *args) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - /* initialize members */ - grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem, - grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem, - grpc_schedule_on_exec_ctx); - grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem, - grpc_schedule_on_exec_ctx); - grpc_slice_buffer_init(&calld->read_slice_buffer); - return GRPC_ERROR_NONE; -} - -/* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const grpc_call_final_info *final_info, - grpc_closure *ignored) { - call_data *calld = elem->call_data; - grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer); -} - -/* Constructor for channel_data */ -static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { - GPR_ASSERT(!args->is_last); - return GRPC_ERROR_NONE; -} - -/* Destructor for channel data */ -static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) {} - -const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_op, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - grpc_channel_next_get_info, - "http-server"}; diff --git a/src/core/lib/channel/http_server_filter.h b/src/core/lib/channel/http_server_filter.h deleted file mode 100644 index 77ba2d263d..0000000000 --- a/src/core/lib/channel/http_server_filter.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H -#define GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H - -#include "src/core/lib/channel/channel_stack.h" - -/* Processes metadata on the client side for HTTP2 transports */ -extern const grpc_channel_filter grpc_http_server_filter; - -#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H */ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c deleted file mode 100644 index 57726c8476..0000000000 --- a/src/core/lib/channel/message_size_filter.c +++ /dev/null @@ -1,270 +0,0 @@ -// -// Copyright 2016, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - -#include "src/core/lib/channel/message_size_filter.h" - -#include <limits.h> -#include <string.h> - -#include <grpc/impl/codegen/grpc_types.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/support/string.h" -#include "src/core/lib/transport/service_config.h" - -typedef struct message_size_limits { - int max_send_size; - int max_recv_size; -} message_size_limits; - -static void* message_size_limits_copy(void* value) { - void* new_value = gpr_malloc(sizeof(message_size_limits)); - memcpy(new_value, value, sizeof(message_size_limits)); - return new_value; -} - -static void message_size_limits_free(grpc_exec_ctx* exec_ctx, void* value) { - gpr_free(value); -} - -static const grpc_slice_hash_table_vtable message_size_limits_vtable = { - message_size_limits_free, message_size_limits_copy}; - -static void* message_size_limits_create_from_json(const grpc_json* json) { - int max_request_message_bytes = -1; - int max_response_message_bytes = -1; - for (grpc_json* field = json->child; field != NULL; field = field->next) { - if (field->key == NULL) continue; - if (strcmp(field->key, "maxRequestMessageBytes") == 0) { - if (max_request_message_bytes >= 0) return NULL; // Duplicate. - if (field->type != GRPC_JSON_STRING && field->type != GRPC_JSON_NUMBER) { - return NULL; - } - max_request_message_bytes = gpr_parse_nonnegative_int(field->value); - if (max_request_message_bytes == -1) return NULL; - } else if (strcmp(field->key, "maxResponseMessageBytes") == 0) { - if (max_response_message_bytes >= 0) return NULL; // Duplicate. - if (field->type != GRPC_JSON_STRING && field->type != GRPC_JSON_NUMBER) { - return NULL; - } - max_response_message_bytes = gpr_parse_nonnegative_int(field->value); - if (max_response_message_bytes == -1) return NULL; - } - } - message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); - value->max_send_size = max_request_message_bytes; - value->max_recv_size = max_response_message_bytes; - return value; -} - -typedef struct call_data { - int max_send_size; - int max_recv_size; - // Receive closures are chained: we inject this closure as the - // recv_message_ready up-call on transport_stream_op, and remember to - // call our next_recv_message_ready member after handling it. - grpc_closure recv_message_ready; - // Used by recv_message_ready. - grpc_byte_stream** recv_message; - // Original recv_message_ready callback, invoked after our own. - grpc_closure* next_recv_message_ready; -} call_data; - -typedef struct channel_data { - int max_send_size; - int max_recv_size; - // Maps path names to message_size_limits structs. - grpc_slice_hash_table* method_limit_table; -} channel_data; - -// Callback invoked when we receive a message. Here we check the max -// receive message size. -static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, - grpc_error* error) { - grpc_call_element* elem = user_data; - call_data* calld = elem->call_data; - if (*calld->recv_message != NULL && calld->max_recv_size >= 0 && - (*calld->recv_message)->length > (size_t)calld->max_recv_size) { - char* message_string; - gpr_asprintf(&message_string, - "Received message larger than max (%u vs. %d)", - (*calld->recv_message)->length, calld->max_recv_size); - grpc_error* new_error = grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INVALID_ARGUMENT); - if (error == GRPC_ERROR_NONE) { - error = new_error; - } else { - error = grpc_error_add_child(error, new_error); - GRPC_ERROR_UNREF(new_error); - } - gpr_free(message_string); - } - // Invoke the next callback. - grpc_closure_run(exec_ctx, calld->next_recv_message_ready, error); -} - -// Start transport stream op. -static void start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { - call_data* calld = elem->call_data; - // Check max send message size. - if (op->send_message && calld->max_send_size >= 0 && - op->payload->send_message.send_message->length > - (size_t)calld->max_send_size) { - char* message_string; - gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", - op->payload->send_message.send_message->length, - calld->max_send_size); - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_INVALID_ARGUMENT)); - gpr_free(message_string); - return; - } - // Inject callback for receiving a message. - if (op->recv_message) { - calld->next_recv_message_ready = - op->payload->recv_message.recv_message_ready; - calld->recv_message = op->payload->recv_message.recv_message; - op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; - } - // Chain to the next filter. - grpc_call_next_op(exec_ctx, elem, op); -} - -// Constructor for call_data. -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - const grpc_call_element_args* args) { - channel_data* chand = elem->channel_data; - call_data* calld = elem->call_data; - calld->next_recv_message_ready = NULL; - grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem, - grpc_schedule_on_exec_ctx); - // Get max sizes from channel data, then merge in per-method config values. - // Note: Per-method config is only available on the client, so we - // apply the max request size to the send limit and the max response - // size to the receive limit. - calld->max_send_size = chand->max_send_size; - calld->max_recv_size = chand->max_recv_size; - if (chand->method_limit_table != NULL) { - message_size_limits* limits = grpc_method_config_table_get( - exec_ctx, chand->method_limit_table, args->path); - if (limits != NULL) { - if (limits->max_send_size >= 0 && - (limits->max_send_size < calld->max_send_size || - calld->max_send_size < 0)) { - calld->max_send_size = limits->max_send_size; - } - if (limits->max_recv_size >= 0 && - (limits->max_recv_size < calld->max_recv_size || - calld->max_recv_size < 0)) { - calld->max_recv_size = limits->max_recv_size; - } - } - } - return GRPC_ERROR_NONE; -} - -// Destructor for call_data. -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - const grpc_call_final_info* final_info, - grpc_closure* ignored) {} - -// Constructor for channel_data. -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, - grpc_channel_element_args* args) { - GPR_ASSERT(!args->is_last); - channel_data* chand = elem->channel_data; - chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH; - chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH; - for (size_t i = 0; i < args->channel_args->num_args; ++i) { - if (strcmp(args->channel_args->args[i].key, - GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = { - GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, INT_MAX}; - chand->max_send_size = - grpc_channel_arg_get_integer(&args->channel_args->args[i], options); - } - if (strcmp(args->channel_args->args[i].key, - GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = { - GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, INT_MAX}; - chand->max_recv_size = - grpc_channel_arg_get_integer(&args->channel_args->args[i], options); - } - } - // Get method config table from channel args. - const grpc_arg* channel_arg = - grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); - if (channel_arg != NULL) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_service_config* service_config = - grpc_service_config_create(channel_arg->value.string); - if (service_config != NULL) { - chand->method_limit_table = - grpc_service_config_create_method_config_table( - exec_ctx, service_config, message_size_limits_create_from_json, - &message_size_limits_vtable); - grpc_service_config_destroy(service_config); - } - } - return GRPC_ERROR_NONE; -} - -// Destructor for channel_data. -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) { - channel_data* chand = elem->channel_data; - grpc_slice_hash_table_unref(exec_ctx, chand->method_limit_table); -} - -const grpc_channel_filter grpc_message_size_filter = { - start_transport_stream_op_batch, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - grpc_channel_next_get_info, - "message_size"}; diff --git a/src/core/lib/channel/message_size_filter.h b/src/core/lib/channel/message_size_filter.h deleted file mode 100644 index a88ff7f81a..0000000000 --- a/src/core/lib/channel/message_size_filter.h +++ /dev/null @@ -1,39 +0,0 @@ -// -// Copyright 2016, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - -#ifndef GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H -#define GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H - -#include "src/core/lib/channel/channel_stack.h" - -extern const grpc_channel_filter grpc_message_size_filter; - -#endif /* GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H */ |