diff options
Diffstat (limited to 'src/core/ext/filters/message_size/message_size_filter.cc')
-rw-r--r-- | src/core/ext/filters/message_size/message_size_filter.cc | 141 |
1 files changed, 101 insertions, 40 deletions
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c7fc3f2e62..94d6942aa4 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -90,27 +90,70 @@ RefCountedPtr<MessageSizeLimits> MessageSizeLimits::CreateFromJson( } // namespace } // namespace grpc_core +static void recv_message_ready(void* user_data, grpc_error* error); +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error); + namespace { +struct channel_data { + message_size_limits limits; + // Maps path names to refcounted_message_size_limits structs. + grpc_core::RefCountedPtr<grpc_core::SliceHashTable< + grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits>>> + method_limit_table; +}; + struct call_data { + call_data(grpc_call_element* elem, const channel_data& chand, + const grpc_call_element_args& args) + : call_combiner(args.call_combiner), limits(chand.limits) { + GRPC_CLOSURE_INIT(&recv_message_ready, ::recv_message_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready, + ::recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + // Get max sizes from channel data, then merge in per-method config values. + // Note: Per-method config is only available on the client, so we + // apply the max request size to the send limit and the max response + // size to the receive limit. + if (chand.method_limit_table != nullptr) { + grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits> limits = + grpc_core::ServiceConfig::MethodConfigTableLookup( + *chand.method_limit_table, args.path); + if (limits != nullptr) { + if (limits->limits().max_send_size >= 0 && + (limits->limits().max_send_size < this->limits.max_send_size || + this->limits.max_send_size < 0)) { + this->limits.max_send_size = limits->limits().max_send_size; + } + if (limits->limits().max_recv_size >= 0 && + (limits->limits().max_recv_size < this->limits.max_recv_size || + this->limits.max_recv_size < 0)) { + this->limits.max_recv_size = limits->limits().max_recv_size; + } + } + } + } + + ~call_data() { GRPC_ERROR_UNREF(error); } + grpc_call_combiner* call_combiner; message_size_limits limits; // Receive closures are chained: we inject this closure as the // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; + grpc_closure recv_trailing_metadata_ready; + // The error caused by a message that is too large, or GRPC_ERROR_NONE + grpc_error* error = GRPC_ERROR_NONE; // Used by recv_message_ready. - grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message = nullptr; // Original recv_message_ready callback, invoked after our own. - grpc_closure* next_recv_message_ready; -}; - -struct channel_data { - message_size_limits limits; - // Maps path names to refcounted_message_size_limits structs. - grpc_core::RefCountedPtr<grpc_core::SliceHashTable< - grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits>>> - method_limit_table; + grpc_closure* next_recv_message_ready = nullptr; + // Original recv_trailing_metadata callback, invoked after our own. + grpc_closure* original_recv_trailing_metadata_ready; + bool seen_recv_trailing_metadata = false; + grpc_error* recv_trailing_metadata_error; }; } // namespace @@ -130,18 +173,52 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_ERROR_UNREF(calld->error); if (error == GRPC_ERROR_NONE) { error = new_error; } else { error = grpc_error_add_child(error, new_error); - GRPC_ERROR_UNREF(new_error); } + calld->error = GRPC_ERROR_REF(error); gpr_free(message_string); } else { GRPC_ERROR_REF(error); } // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); + grpc_closure* closure = calld->next_recv_message_ready; + calld->next_recv_message_ready = nullptr; + if (calld->seen_recv_trailing_metadata) { + /* We might potentially see another RECV_MESSAGE op. In that case, we do not + * want to run the recv_trailing_metadata_ready closure again. The newer + * RECV_MESSAGE op cannot cause any errors since the transport has already + * invoked the recv_trailing_metadata_ready closure and all further + * RECV_MESSAGE ops will get null payloads. */ + calld->seen_recv_trailing_metadata = false; + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, + "continue recv_trailing_metadata_ready"); + } + GRPC_CLOSURE_RUN(closure, error); +} + +// Callback invoked on completion of recv_trailing_metadata +// Notifies the recv_trailing_metadata batch of any message size failures +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->next_recv_message_ready != nullptr) { + calld->seen_recv_trailing_metadata = true; + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring recv_trailing_metadata_ready until " + "after recv_message_ready"); + return; + } + error = + grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); + // Invoke the next callback. + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } // Start transport stream op. @@ -172,6 +249,13 @@ static void start_transport_stream_op_batch( calld->recv_message = op->payload->recv_message.recv_message; op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + // Inject callback for receiving trailing metadata. + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } // Chain to the next filter. grpc_call_next_op(elem, op); } @@ -180,40 +264,17 @@ static void start_transport_stream_op_batch( static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - calld->call_combiner = args->call_combiner; - calld->next_recv_message_ready = nullptr; - GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, - grpc_schedule_on_exec_ctx); - // Get max sizes from channel data, then merge in per-method config values. - // Note: Per-method config is only available on the client, so we - // apply the max request size to the send limit and the max response - // size to the receive limit. - calld->limits = chand->limits; - if (chand->method_limit_table != nullptr) { - grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits> limits = - grpc_core::ServiceConfig::MethodConfigTableLookup( - *chand->method_limit_table, args->path); - if (limits != nullptr) { - if (limits->limits().max_send_size >= 0 && - (limits->limits().max_send_size < calld->limits.max_send_size || - calld->limits.max_send_size < 0)) { - calld->limits.max_send_size = limits->limits().max_send_size; - } - if (limits->limits().max_recv_size >= 0 && - (limits->limits().max_recv_size < calld->limits.max_recv_size || - calld->limits.max_recv_size < 0)) { - calld->limits.max_recv_size = limits->limits().max_recv_size; - } - } - } + new (elem->call_data) call_data(elem, *chand, *args); return GRPC_ERROR_NONE; } // Destructor for call_data. static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = (call_data*)elem->call_data; + calld->~call_data(); +} static int default_size(const grpc_channel_args* args, int without_minimal_stack) { |