aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel/message_size_filter.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel/message_size_filter.c')
-rw-r--r--src/core/lib/channel/message_size_filter.c97
1 files changed, 87 insertions, 10 deletions
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index f067a3a51c..1331fe1c65 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -39,12 +39,53 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/transport/method_config.h"
#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited.
// The protobuf library will (by default) start warning at 100 megs.
#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024)
+typedef struct message_size_limits {
+ int max_send_size;
+ int max_recv_size;
+} message_size_limits;
+
+static void* message_size_limits_copy(void* value) {
+ void* new_value = gpr_malloc(sizeof(message_size_limits));
+ memcpy(new_value, value, sizeof(message_size_limits));
+ return new_value;
+}
+
+static int message_size_limits_cmp(void* value1, void* value2) {
+ const message_size_limits* v1 = value1;
+ const message_size_limits* v2 = value2;
+ if (v1->max_send_size > v2->max_send_size) return 1;
+ if (v1->max_send_size < v2->max_send_size) return -1;
+ if (v1->max_recv_size > v2->max_recv_size) return 1;
+ if (v1->max_recv_size < v2->max_recv_size) return -1;
+ return 0;
+}
+
+static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = {
+ gpr_free, message_size_limits_copy, message_size_limits_cmp};
+
+static void* method_config_convert_value(
+ const grpc_method_config* method_config) {
+ message_size_limits* value = gpr_malloc(sizeof(message_size_limits));
+ const int32_t* max_request_message_bytes =
+ grpc_method_config_get_max_request_message_bytes(method_config);
+ value->max_send_size =
+ max_request_message_bytes != NULL ? *max_request_message_bytes : -1;
+ const int32_t* max_response_message_bytes =
+ grpc_method_config_get_max_response_message_bytes(method_config);
+ value->max_recv_size =
+ max_response_message_bytes != NULL ? *max_response_message_bytes : -1;
+ return value;
+}
+
typedef struct call_data {
+ int max_send_size;
+ int max_recv_size;
// Receive closures are chained: we inject this closure as the
// recv_message_ready up-call on transport_stream_op, and remember to
// call our next_recv_message_ready member after handling it.
@@ -58,6 +99,8 @@ typedef struct call_data {
typedef struct channel_data {
int max_send_size;
int max_recv_size;
+ // Maps path names to message_size_limits structs.
+ grpc_mdstr_hash_table* method_limit_table;
} channel_data;
// Callback invoked when we receive a message. Here we check the max
@@ -66,13 +109,12 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
grpc_error* error) {
grpc_call_element* elem = user_data;
call_data* calld = elem->call_data;
- channel_data* chand = elem->channel_data;
- if (*calld->recv_message != NULL && chand->max_recv_size >= 0 &&
- (*calld->recv_message)->length > (size_t)chand->max_recv_size) {
+ if (*calld->recv_message != NULL && calld->max_recv_size >= 0 &&
+ (*calld->recv_message)->length > (size_t)calld->max_recv_size) {
char* message_string;
gpr_asprintf(&message_string,
"Received message larger than max (%u vs. %d)",
- (*calld->recv_message)->length, chand->max_recv_size);
+ (*calld->recv_message)->length, calld->max_recv_size);
grpc_error* new_error = grpc_error_set_int(
GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_INVALID_ARGUMENT);
@@ -93,14 +135,13 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
- channel_data* chand = elem->channel_data;
// Check max send message size.
- if (op->send_message != NULL && chand->max_send_size >= 0 &&
- op->send_message->length > (size_t)chand->max_send_size) {
+ if (op->send_message != NULL && calld->max_send_size >= 0 &&
+ op->send_message->length > (size_t)calld->max_send_size) {
char* message_string;
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
- op->send_message->length, chand->max_send_size);
- gpr_slice message = gpr_slice_from_copied_string(message_string);
+ op->send_message->length, calld->max_send_size);
+ grpc_slice message = grpc_slice_from_copied_string(message_string);
gpr_free(message_string);
grpc_call_element_send_close_with_message(
exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
@@ -119,9 +160,32 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_call_element_args* args) {
+ channel_data* chand = elem->channel_data;
call_data* calld = elem->call_data;
calld->next_recv_message_ready = NULL;
grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem);
+ // Get max sizes from channel data, then merge in per-method config values.
+ // Note: Per-method config is only available on the client, so we
+ // apply the max request size to the send limit and the max response
+ // size to the receive limit.
+ calld->max_send_size = chand->max_send_size;
+ calld->max_recv_size = chand->max_recv_size;
+ if (chand->method_limit_table != NULL) {
+ message_size_limits* limits =
+ grpc_method_config_table_get(chand->method_limit_table, args->path);
+ if (limits != NULL) {
+ if (limits->max_send_size >= 0 &&
+ (limits->max_send_size < calld->max_send_size ||
+ calld->max_send_size < 0)) {
+ calld->max_send_size = limits->max_send_size;
+ }
+ if (limits->max_recv_size >= 0 &&
+ (limits->max_recv_size < calld->max_recv_size ||
+ calld->max_recv_size < 0)) {
+ calld->max_recv_size = limits->max_recv_size;
+ }
+ }
+ }
return GRPC_ERROR_NONE;
}
@@ -155,11 +219,23 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
}
+ // Get method config table from channel args.
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
+ chand->method_limit_table = grpc_method_config_table_convert(
+ (grpc_method_config_table*)channel_arg->value.pointer.p,
+ method_config_convert_value, &message_size_limits_vtable);
+ }
}
// Destructor for channel_data.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem) {}
+ grpc_channel_element* elem) {
+ channel_data* chand = elem->channel_data;
+ grpc_mdstr_hash_table_unref(chand->method_limit_table);
+}
const grpc_channel_filter grpc_message_size_filter = {
start_transport_stream_op,
@@ -172,4 +248,5 @@ const grpc_channel_filter grpc_message_size_filter = {
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
"message_size"};