aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-09-07 16:54:31 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-09-07 16:54:31 -0700
commit86f1c7a5df2eb71e653ab8f227582e40c73a4c5c (patch)
tree7085d1c5faced206c4b3da2cb1799b898b8c1788 /src/core
parent11622c0648f8b4181b46ad8272d48a34acb2d593 (diff)
Be cautious and wait for possible error causing callbacks before we treat trailing metadata
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.cc18
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.cc20
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.cc18
-rw-r--r--src/core/lib/security/transport/server_auth_filter.cc27
4 files changed, 77 insertions, 6 deletions
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index 91fa163fec..c7d7f333a5 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -67,6 +67,8 @@ struct call_data {
grpc_closure on_send_message_next_done;
grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
+ grpc_error* recv_trailing_metadata_err;
+ bool seen_recv_trailing_metadata_ready;
};
struct channel_data {
@@ -157,12 +159,25 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
} else {
GRPC_ERROR_REF(error);
}
- GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
+ grpc_closure* closure = calld->original_recv_initial_metadata_ready;
+ calld->original_recv_initial_metadata_ready = nullptr;
+ if (calld->seen_recv_trailing_metadata_ready) {
+ GRPC_CALL_COMBINER_START(
+ calld->call_combiner, &calld->recv_trailing_metadata_ready,
+ calld->recv_trailing_metadata_err, "continue recv trailing metadata");
+ }
+ GRPC_CLOSURE_RUN(closure, error);
}
static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (calld->original_recv_initial_metadata_ready != nullptr) {
+ calld->recv_trailing_metadata_err = GRPC_ERROR_REF(error);
+ calld->seen_recv_trailing_metadata_ready = true;
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata");
+ return;
+ }
if (error == GRPC_ERROR_NONE) {
error =
client_filter_incoming_metadata(elem, calld->recv_trailing_metadata);
@@ -427,6 +442,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->call_combiner = args->call_combiner;
+ calld->seen_recv_trailing_metadata_ready = false;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc
index 926afeec84..484ce9c22f 100644
--- a/src/core/ext/filters/http/server/http_server_filter.cc
+++ b/src/core/ext/filters/http/server/http_server_filter.cc
@@ -64,6 +64,9 @@ struct call_data {
grpc_closure recv_trailing_metadata_ready;
grpc_closure* original_recv_trailing_metadata_ready;
+
+ grpc_error* recv_trailing_metadata_ready_error;
+ bool seen_recv_trailing_metadata_ready;
};
} // namespace
@@ -291,7 +294,15 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) {
} else {
GRPC_ERROR_REF(err);
}
- GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err);
+ grpc_closure* closure = calld->original_recv_initial_metadata_ready;
+ calld->original_recv_initial_metadata_ready = nullptr;
+ if (calld->seen_recv_trailing_metadata_ready) {
+ GRPC_CALL_COMBINER_START(calld->call_combiner,
+ &calld->recv_trailing_metadata_ready,
+ calld->recv_trailing_metadata_ready_error,
+ "continue recv trailing metadata");
+ }
+ GRPC_CLOSURE_RUN(closure, err);
}
static void hs_recv_message_ready(void* user_data, grpc_error* err) {
@@ -321,6 +332,12 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) {
static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (calld->original_recv_initial_metadata_ready) {
+ calld->recv_trailing_metadata_ready_error = GRPC_ERROR_REF(err);
+ calld->seen_recv_trailing_metadata_ready = true;
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata");
+ return;
+ }
err = grpc_error_add_child(
GRPC_ERROR_REF(err),
GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error));
@@ -405,6 +422,7 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->call_combiner = args->call_combiner;
+ calld->seen_recv_trailing_metadata_ready = false;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
hs_recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index c17df86f3d..01d483f45e 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -108,6 +108,8 @@ struct call_data {
grpc_closure* next_recv_message_ready;
// Original recv_trailing_metadata callback, invoked after our own.
grpc_closure* original_recv_trailing_metadata_ready;
+ bool seen_recv_trailing_metadata;
+ grpc_error* recv_trailing_metadata_error;
};
struct channel_data {
@@ -147,7 +149,14 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
GRPC_ERROR_REF(error);
}
// Invoke the next callback.
- GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error);
+ grpc_closure* closure = calld->next_recv_message_ready;
+ calld->next_recv_message_ready = nullptr;
+ if (calld->seen_recv_trailing_metadata) {
+ GRPC_CALL_COMBINER_START(
+ calld->call_combiner, &calld->recv_trailing_metadata_ready,
+ calld->recv_trailing_metadata_error, "continue recv trailing metadata");
+ }
+ GRPC_CLOSURE_RUN(closure, error);
}
// Callback invoked on completion of recv_trailing_metadata
@@ -155,6 +164,12 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (calld->next_recv_message_ready) {
+ calld->seen_recv_trailing_metadata = true;
+ calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for recv message");
+ return;
+ }
error =
grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error));
// Invoke the next callback.
@@ -209,6 +224,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
calld->next_recv_message_ready = nullptr;
calld->original_recv_trailing_metadata_ready = nullptr;
calld->error = GRPC_ERROR_NONE;
+ calld->seen_recv_trailing_metadata = false;
GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc
index 552e70130a..05dfd09ffb 100644
--- a/src/core/lib/security/transport/server_auth_filter.cc
+++ b/src/core/lib/security/transport/server_auth_filter.cc
@@ -49,6 +49,8 @@ struct call_data {
size_t num_consumed_md;
grpc_closure cancel_closure;
gpr_atm state; // async_state
+ grpc_error* recv_trailing_metadata_error;
+ bool seen_recv_trailing_ready;
};
struct channel_data {
@@ -115,7 +117,14 @@ static void on_md_processing_done_inner(grpc_call_element* elem,
remove_consumed_md, elem, "Response metadata filtering error");
}
calld->error = GRPC_ERROR_REF(error);
- GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error);
+ grpc_closure* closure = calld->original_recv_initial_metadata_ready;
+ calld->original_recv_initial_metadata_ready = nullptr;
+ if (calld->seen_recv_trailing_ready) {
+ GRPC_CALL_COMBINER_START(
+ calld->call_combiner, &calld->recv_trailing_metadata_ready,
+ calld->recv_trailing_metadata_error, "continue recv trailing metadata");
+ }
+ GRPC_CLOSURE_SCHED(closure, error);
}
// Called from application code.
@@ -184,13 +193,24 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
return;
}
}
- GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready,
- GRPC_ERROR_REF(error));
+ grpc_closure* closure = calld->original_recv_initial_metadata_ready;
+ calld->original_recv_initial_metadata_ready = nullptr;
+ if (calld->seen_recv_trailing_ready) {
+ GRPC_CALL_COMBINER_START(
+ calld->call_combiner, &calld->recv_trailing_metadata_ready,
+ calld->recv_trailing_metadata_error, "continue recv trailing metadata");
+ }
+ GRPC_CLOSURE_RUN(closure, GRPC_ERROR_REF(error));
}
static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (calld->original_recv_initial_metadata_ready) {
+ calld->recv_trailing_metadata_error = GRPC_ERROR_REF(err);
+ calld->seen_recv_trailing_ready = true;
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata");
+ }
err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
}
@@ -228,6 +248,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
+ calld->seen_recv_trailing_ready = false;
// Create server security context. Set its auth context from channel
// data and save it in the call context.
grpc_server_security_context* server_ctx =