diff options
author | 2018-06-08 13:50:32 -0700 | |
---|---|---|
committer | 2018-06-08 13:50:32 -0700 | |
commit | 87daf00f437b2bc9fb3c2ab662e7f7105e3dfccb (patch) | |
tree | 2bafe08c318afbf8e7ef4141718e80561259b62c /src/core/ext/filters/message_size | |
parent | 701a4b1c32a6da0c246bfe573ef3c96a9c652cc2 (diff) |
Simplify call error/status aggregation
Diffstat (limited to 'src/core/ext/filters/message_size')
-rw-r--r-- | src/core/ext/filters/message_size/message_size_filter.cc | 38 |
1 files changed, 37 insertions, 1 deletions
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c7fc3f2e62..3cd569b5da 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -99,10 +99,15 @@ struct call_data { // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; + grpc_closure recv_trailing_metadata; + // The error caused by a message that is too large, or GRPC_ERROR_NONE + grpc_error* error; // Used by recv_message_ready. grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; + // Original recv_trailing_metadata callback, invoked after our own. + grpc_closure* next_recv_trailing_metadata; }; struct channel_data { @@ -130,6 +135,8 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_ERROR_UNREF(calld->error); + calld->error = GRPC_ERROR_REF(new_error); if (error == GRPC_ERROR_NONE) { error = new_error; } else { @@ -144,6 +151,23 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); } +// Callback invoked on completion of recv_trailing_metadata +// Notifies the recv_trailing_metadata batch of any message size failures +static void recv_trailing_metadata(void* user_data, grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)user_data; + call_data* calld = (call_data*)elem->call_data; + if (error == GRPC_ERROR_NONE) { + error = calld->error; + } else if (calld->error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_REF(error); + } else if (calld->error != error) { + error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error); + } + calld->error = GRPC_ERROR_NONE; + // Invoke the next callback. + GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata, error); +} + // Start transport stream op. static void start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { @@ -172,6 +196,11 @@ static void start_transport_stream_op_batch( calld->recv_message = op->payload->recv_message.recv_message; op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + // Inject callback for receiving trailing metadata. + if (op->recv_trailing_metadata) { + calld->next_recv_trailing_metadata = op->on_complete; + op->on_complete = &calld->recv_trailing_metadata; + } // Chain to the next filter. grpc_call_next_op(elem, op); } @@ -183,8 +212,12 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = nullptr; + calld->next_recv_trailing_metadata = nullptr; + calld->error = GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata, recv_trailing_metadata, + elem, grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response @@ -213,7 +246,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, // Destructor for call_data. static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = (call_data*)elem->call_data; + GRPC_ERROR_UNREF(calld->error); +} static int default_size(const grpc_channel_args* args, int without_minimal_stack) { |