aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/message_size
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2018-06-08 13:50:32 -0700
committerGravatar kpayson64 <kpayson@google.com>2018-06-08 13:50:32 -0700
commit87daf00f437b2bc9fb3c2ab662e7f7105e3dfccb (patch)
tree2bafe08c318afbf8e7ef4141718e80561259b62c /src/core/ext/filters/message_size
parent701a4b1c32a6da0c246bfe573ef3c96a9c652cc2 (diff)
Simplify call error/status aggregation
Diffstat (limited to 'src/core/ext/filters/message_size')
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.cc38
1 files changed, 37 insertions, 1 deletions
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index c7fc3f2e62..3cd569b5da 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -99,10 +99,15 @@ struct call_data {
// recv_message_ready up-call on transport_stream_op, and remember to
// call our next_recv_message_ready member after handling it.
grpc_closure recv_message_ready;
+ grpc_closure recv_trailing_metadata;
+ // The error caused by a message that is too large, or GRPC_ERROR_NONE
+ grpc_error* error;
// Used by recv_message_ready.
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
// Original recv_message_ready callback, invoked after our own.
grpc_closure* next_recv_message_ready;
+ // Original recv_trailing_metadata callback, invoked after our own.
+ grpc_closure* next_recv_trailing_metadata;
};
struct channel_data {
@@ -130,6 +135,8 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
grpc_error* new_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_ERROR_UNREF(calld->error);
+ calld->error = GRPC_ERROR_REF(new_error);
if (error == GRPC_ERROR_NONE) {
error = new_error;
} else {
@@ -144,6 +151,23 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error);
}
+// Callback invoked on completion of recv_trailing_metadata
+// Notifies the recv_trailing_metadata batch of any message size failures
+static void recv_trailing_metadata(void* user_data, grpc_error* error) {
+ grpc_call_element* elem = (grpc_call_element*)user_data;
+ call_data* calld = (call_data*)elem->call_data;
+ if (error == GRPC_ERROR_NONE) {
+ error = calld->error;
+ } else if (calld->error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_REF(error);
+ } else if (calld->error != error) {
+ error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error);
+ }
+ calld->error = GRPC_ERROR_NONE;
+ // Invoke the next callback.
+ GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata, error);
+}
+
// Start transport stream op.
static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
@@ -172,6 +196,11 @@ static void start_transport_stream_op_batch(
calld->recv_message = op->payload->recv_message.recv_message;
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
+ // Inject callback for receiving trailing metadata.
+ if (op->recv_trailing_metadata) {
+ calld->next_recv_trailing_metadata = op->on_complete;
+ op->on_complete = &calld->recv_trailing_metadata;
+ }
// Chain to the next filter.
grpc_call_next_op(elem, op);
}
@@ -183,8 +212,12 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->call_combiner = args->call_combiner;
calld->next_recv_message_ready = nullptr;
+ calld->next_recv_trailing_metadata = nullptr;
+ calld->error = GRPC_ERROR_NONE;
GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata, recv_trailing_metadata,
+ elem, grpc_schedule_on_exec_ctx);
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
@@ -213,7 +246,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
// Destructor for call_data.
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
- grpc_closure* ignored) {}
+ grpc_closure* ignored) {
+ call_data* calld = (call_data*)elem->call_data;
+ GRPC_ERROR_UNREF(calld->error);
+}
static int default_size(const grpc_channel_args* args,
int without_minimal_stack) {