From 0fc9999fba0b7869206026e7e592b7b7199d997b Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Wed, 4 Jan 2017 23:50:42 +0100 Subject: Putting message size filter back. --- src/core/ext/client_channel/message_size_filter.c | 257 ---------------------- src/core/ext/client_channel/message_size_filter.h | 39 ---- src/core/lib/channel/message_size_filter.c | 257 ++++++++++++++++++++++ src/core/lib/channel/message_size_filter.h | 39 ++++ src/core/lib/surface/init.c | 10 + src/python/grpcio/grpc_core_dependencies.py | 1 + 6 files changed, 307 insertions(+), 296 deletions(-) delete mode 100644 src/core/ext/client_channel/message_size_filter.c delete mode 100644 src/core/ext/client_channel/message_size_filter.h create mode 100644 src/core/lib/channel/message_size_filter.c create mode 100644 src/core/lib/channel/message_size_filter.h (limited to 'src') diff --git a/src/core/ext/client_channel/message_size_filter.c b/src/core/ext/client_channel/message_size_filter.c deleted file mode 100644 index c6d94cb6d0..0000000000 --- a/src/core/ext/client_channel/message_size_filter.c +++ /dev/null @@ -1,257 +0,0 @@ -// -// Copyright 2016, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - -#include "src/core/ext/client_channel/message_size_filter.h" - -#include -#include - -#include -#include -#include -#include - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/support/string.h" -#include "src/core/lib/transport/service_config.h" - -typedef struct message_size_limits { - int max_send_size; - int max_recv_size; -} message_size_limits; - -static void* message_size_limits_copy(void* value) { - void* new_value = gpr_malloc(sizeof(message_size_limits)); - memcpy(new_value, value, sizeof(message_size_limits)); - return new_value; -} - -static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = { - gpr_free, message_size_limits_copy}; - -static void* message_size_limits_create_from_json(const grpc_json* json) { - int max_request_message_bytes = -1; - int max_response_message_bytes = -1; - for (grpc_json* field = json->child; field != NULL; field = field->next) { - if (field->key == NULL) continue; - if (strcmp(field->key, "maxRequestMessageBytes") == 0) { - if (max_request_message_bytes >= 0) return NULL; // Duplicate. - if (field->type != GRPC_JSON_STRING) return NULL; - max_request_message_bytes = gpr_parse_nonnegative_int(field->value); - if (max_request_message_bytes == -1) return NULL; - } else if (strcmp(field->key, "maxResponseMessageBytes") == 0) { - if (max_response_message_bytes >= 0) return NULL; // Duplicate. - if (field->type != GRPC_JSON_STRING) return NULL; - max_response_message_bytes = gpr_parse_nonnegative_int(field->value); - if (max_response_message_bytes == -1) return NULL; - } - } - message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); - value->max_send_size = max_request_message_bytes; - value->max_recv_size = max_response_message_bytes; - return value; -} - -typedef struct call_data { - int max_send_size; - int max_recv_size; - // Receive closures are chained: we inject this closure as the - // recv_message_ready up-call on transport_stream_op, and remember to - // call our next_recv_message_ready member after handling it. - grpc_closure recv_message_ready; - // Used by recv_message_ready. - grpc_byte_stream** recv_message; - // Original recv_message_ready callback, invoked after our own. - grpc_closure* next_recv_message_ready; -} call_data; - -typedef struct channel_data { - int max_send_size; - int max_recv_size; - // Maps path names to message_size_limits structs. - grpc_mdstr_hash_table* method_limit_table; -} channel_data; - -// Callback invoked when we receive a message. Here we check the max -// receive message size. -static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, - grpc_error* error) { - grpc_call_element* elem = user_data; - call_data* calld = elem->call_data; - if (*calld->recv_message != NULL && calld->max_recv_size >= 0 && - (*calld->recv_message)->length > (size_t)calld->max_recv_size) { - char* message_string; - gpr_asprintf(&message_string, - "Received message larger than max (%u vs. %d)", - (*calld->recv_message)->length, calld->max_recv_size); - grpc_error* new_error = grpc_error_set_int( - GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_INVALID_ARGUMENT); - if (error == GRPC_ERROR_NONE) { - error = new_error; - } else { - error = grpc_error_add_child(error, new_error); - GRPC_ERROR_UNREF(new_error); - } - gpr_free(message_string); - } - // Invoke the next callback. - grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error); -} - -// Start transport stream op. -static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - grpc_transport_stream_op* op) { - call_data* calld = elem->call_data; - // Check max send message size. - if (op->send_message != NULL && calld->max_send_size >= 0 && - op->send_message->length > (size_t)calld->max_send_size) { - char* message_string; - gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", - op->send_message->length, calld->max_send_size); - grpc_slice message = grpc_slice_from_copied_string(message_string); - gpr_free(message_string); - grpc_call_element_send_close_with_message( - exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); - } - // Inject callback for receiving a message. - if (op->recv_message_ready != NULL) { - calld->next_recv_message_ready = op->recv_message_ready; - calld->recv_message = op->recv_message; - op->recv_message_ready = &calld->recv_message_ready; - } - // Chain to the next filter. - grpc_call_next_op(exec_ctx, elem, op); -} - -// Constructor for call_data. -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - grpc_call_element_args* args) { - channel_data* chand = elem->channel_data; - call_data* calld = elem->call_data; - calld->next_recv_message_ready = NULL; - grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem, - grpc_schedule_on_exec_ctx); - // Get max sizes from channel data, then merge in per-method config values. - // Note: Per-method config is only available on the client, so we - // apply the max request size to the send limit and the max response - // size to the receive limit. - calld->max_send_size = chand->max_send_size; - calld->max_recv_size = chand->max_recv_size; - if (chand->method_limit_table != NULL) { - message_size_limits* limits = - grpc_method_config_table_get(chand->method_limit_table, args->path); - if (limits != NULL) { - if (limits->max_send_size >= 0 && - (limits->max_send_size < calld->max_send_size || - calld->max_send_size < 0)) { - calld->max_send_size = limits->max_send_size; - } - if (limits->max_recv_size >= 0 && - (limits->max_recv_size < calld->max_recv_size || - calld->max_recv_size < 0)) { - calld->max_recv_size = limits->max_recv_size; - } - } - } - return GRPC_ERROR_NONE; -} - -// Destructor for call_data. -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - const grpc_call_final_info* final_info, - void* ignored) {} - -// Constructor for channel_data. -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, - grpc_channel_element_args* args) { - GPR_ASSERT(!args->is_last); - channel_data* chand = elem->channel_data; - memset(chand, 0, sizeof(*chand)); - chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH; - chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH; - for (size_t i = 0; i < args->channel_args->num_args; ++i) { - if (strcmp(args->channel_args->args[i].key, - GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = { - GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, INT_MAX}; - chand->max_send_size = - grpc_channel_arg_get_integer(&args->channel_args->args[i], options); - } - if (strcmp(args->channel_args->args[i].key, - GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = { - GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, INT_MAX}; - chand->max_recv_size = - grpc_channel_arg_get_integer(&args->channel_args->args[i], options); - } - } - // Get method config table from channel args. - const grpc_arg* channel_arg = - grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); - if (channel_arg != NULL) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_service_config* service_config = - grpc_service_config_create(channel_arg->value.string); - if (service_config != NULL) { - chand->method_limit_table = - grpc_service_config_create_method_config_table( - service_config, message_size_limits_create_from_json, - &message_size_limits_vtable); - grpc_service_config_destroy(service_config); - } - } - return GRPC_ERROR_NONE; -} - -// Destructor for channel_data. -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) { - channel_data* chand = elem->channel_data; - grpc_mdstr_hash_table_unref(chand->method_limit_table); -} - -const grpc_channel_filter grpc_message_size_filter = { - start_transport_stream_op, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - grpc_channel_next_get_info, - "message_size"}; diff --git a/src/core/ext/client_channel/message_size_filter.h b/src/core/ext/client_channel/message_size_filter.h deleted file mode 100644 index 8ee4f4cca4..0000000000 --- a/src/core/ext/client_channel/message_size_filter.h +++ /dev/null @@ -1,39 +0,0 @@ -// -// Copyright 2016, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - -#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_MESSAGE_SIZE_FILTER_H -#define GRPC_CORE_EXT_CLIENT_CHANNEL_MESSAGE_SIZE_FILTER_H - -#include "src/core/lib/channel/channel_stack.h" - -extern const grpc_channel_filter grpc_message_size_filter; - -#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_MESSAGE_SIZE_FILTER_H */ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c new file mode 100644 index 0000000000..b5e882de52 --- /dev/null +++ b/src/core/lib/channel/message_size_filter.c @@ -0,0 +1,257 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#include "src/core/lib/channel/message_size_filter.h" + +#include +#include + +#include +#include +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/transport/service_config.h" + +typedef struct message_size_limits { + int max_send_size; + int max_recv_size; +} message_size_limits; + +static void* message_size_limits_copy(void* value) { + void* new_value = gpr_malloc(sizeof(message_size_limits)); + memcpy(new_value, value, sizeof(message_size_limits)); + return new_value; +} + +static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = { + gpr_free, message_size_limits_copy}; + +static void* message_size_limits_create_from_json(const grpc_json* json) { + int max_request_message_bytes = -1; + int max_response_message_bytes = -1; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) continue; + if (strcmp(field->key, "maxRequestMessageBytes") == 0) { + if (max_request_message_bytes >= 0) return NULL; // Duplicate. + if (field->type != GRPC_JSON_STRING) return NULL; + max_request_message_bytes = gpr_parse_nonnegative_int(field->value); + if (max_request_message_bytes == -1) return NULL; + } else if (strcmp(field->key, "maxResponseMessageBytes") == 0) { + if (max_response_message_bytes >= 0) return NULL; // Duplicate. + if (field->type != GRPC_JSON_STRING) return NULL; + max_response_message_bytes = gpr_parse_nonnegative_int(field->value); + if (max_response_message_bytes == -1) return NULL; + } + } + message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); + value->max_send_size = max_request_message_bytes; + value->max_recv_size = max_response_message_bytes; + return value; +} + +typedef struct call_data { + int max_send_size; + int max_recv_size; + // Receive closures are chained: we inject this closure as the + // recv_message_ready up-call on transport_stream_op, and remember to + // call our next_recv_message_ready member after handling it. + grpc_closure recv_message_ready; + // Used by recv_message_ready. + grpc_byte_stream** recv_message; + // Original recv_message_ready callback, invoked after our own. + grpc_closure* next_recv_message_ready; +} call_data; + +typedef struct channel_data { + int max_send_size; + int max_recv_size; + // Maps path names to message_size_limits structs. + grpc_mdstr_hash_table* method_limit_table; +} channel_data; + +// Callback invoked when we receive a message. Here we check the max +// receive message size. +static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_error* error) { + grpc_call_element* elem = user_data; + call_data* calld = elem->call_data; + if (*calld->recv_message != NULL && calld->max_recv_size >= 0 && + (*calld->recv_message)->length > (size_t)calld->max_recv_size) { + char* message_string; + gpr_asprintf(&message_string, + "Received message larger than max (%u vs. %d)", + (*calld->recv_message)->length, calld->max_recv_size); + grpc_error* new_error = grpc_error_set_int( + GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_INVALID_ARGUMENT); + if (error == GRPC_ERROR_NONE) { + error = new_error; + } else { + error = grpc_error_add_child(error, new_error); + GRPC_ERROR_UNREF(new_error); + } + gpr_free(message_string); + } + // Invoke the next callback. + grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error); +} + +// Start transport stream op. +static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_transport_stream_op* op) { + call_data* calld = elem->call_data; + // Check max send message size. + if (op->send_message != NULL && calld->max_send_size >= 0 && + op->send_message->length > (size_t)calld->max_send_size) { + char* message_string; + gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", + op->send_message->length, calld->max_send_size); + grpc_slice message = grpc_slice_from_copied_string(message_string); + gpr_free(message_string); + grpc_call_element_send_close_with_message( + exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); + } + // Inject callback for receiving a message. + if (op->recv_message_ready != NULL) { + calld->next_recv_message_ready = op->recv_message_ready; + calld->recv_message = op->recv_message; + op->recv_message_ready = &calld->recv_message_ready; + } + // Chain to the next filter. + grpc_call_next_op(exec_ctx, elem, op); +} + +// Constructor for call_data. +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_call_element_args* args) { + channel_data* chand = elem->channel_data; + call_data* calld = elem->call_data; + calld->next_recv_message_ready = NULL; + grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem, + grpc_schedule_on_exec_ctx); + // Get max sizes from channel data, then merge in per-method config values. + // Note: Per-method config is only available on the client, so we + // apply the max request size to the send limit and the max response + // size to the receive limit. + calld->max_send_size = chand->max_send_size; + calld->max_recv_size = chand->max_recv_size; + if (chand->method_limit_table != NULL) { + message_size_limits* limits = + grpc_method_config_table_get(chand->method_limit_table, args->path); + if (limits != NULL) { + if (limits->max_send_size >= 0 && + (limits->max_send_size < calld->max_send_size || + calld->max_send_size < 0)) { + calld->max_send_size = limits->max_send_size; + } + if (limits->max_recv_size >= 0 && + (limits->max_recv_size < calld->max_recv_size || + calld->max_recv_size < 0)) { + calld->max_recv_size = limits->max_recv_size; + } + } + } + return GRPC_ERROR_NONE; +} + +// Destructor for call_data. +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + const grpc_call_final_info* final_info, + void* ignored) {} + +// Constructor for channel_data. +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { + GPR_ASSERT(!args->is_last); + channel_data* chand = elem->channel_data; + memset(chand, 0, sizeof(*chand)); + chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH; + chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH; + for (size_t i = 0; i < args->channel_args->num_args; ++i) { + if (strcmp(args->channel_args->args[i].key, + GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { + const grpc_integer_options options = { + GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, INT_MAX}; + chand->max_send_size = + grpc_channel_arg_get_integer(&args->channel_args->args[i], options); + } + if (strcmp(args->channel_args->args[i].key, + GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { + const grpc_integer_options options = { + GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, INT_MAX}; + chand->max_recv_size = + grpc_channel_arg_get_integer(&args->channel_args->args[i], options); + } + } + // Get method config table from channel args. + const grpc_arg* channel_arg = + grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); + if (channel_arg != NULL) { + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_service_config* service_config = + grpc_service_config_create(channel_arg->value.string); + if (service_config != NULL) { + chand->method_limit_table = + grpc_service_config_create_method_config_table( + service_config, message_size_limits_create_from_json, + &message_size_limits_vtable); + grpc_service_config_destroy(service_config); + } + } + return GRPC_ERROR_NONE; +} + +// Destructor for channel_data. +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) { + channel_data* chand = elem->channel_data; + grpc_mdstr_hash_table_unref(chand->method_limit_table); +} + +const grpc_channel_filter grpc_message_size_filter = { + start_transport_stream_op, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "message_size"}; diff --git a/src/core/lib/channel/message_size_filter.h b/src/core/lib/channel/message_size_filter.h new file mode 100644 index 0000000000..a88ff7f81a --- /dev/null +++ b/src/core/lib/channel/message_size_filter.h @@ -0,0 +1,39 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H +#define GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_message_size_filter; + +#endif /* GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H */ diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 64aa0e3361..7903f57a68 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -46,6 +46,7 @@ #include "src/core/lib/channel/deadline_filter.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" +#include "src/core/lib/channel/message_size_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/combiner.h" @@ -106,6 +107,15 @@ static void register_builtin_channel_init() { grpc_channel_init_register_stage( GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, (void *)&grpc_server_deadline_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + prepend_filter, (void *)&grpc_message_size_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + prepend_filter, (void *)&grpc_message_size_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, + (void *)&grpc_message_size_filter); grpc_channel_init_register_stage( GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, (void *)&grpc_compress_filter); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 804c30f798..179df7bb14 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -84,6 +84,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/handshaker.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', + 'src/core/lib/channel/message_size_filter.c', 'src/core/lib/compression/compression.c', 'src/core/lib/compression/message_compress.c', 'src/core/lib/debug/trace.c', -- cgit v1.2.3