From 86f1c7a5df2eb71e653ab8f227582e40c73a4c5c Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 7 Sep 2018 16:54:31 -0700 Subject: Be cautious and wait for possible error causing callbacks before we treat trailing metadata --- .../ext/filters/http/client/http_client_filter.cc | 18 ++++++++++++++- .../ext/filters/http/server/http_server_filter.cc | 20 +++++++++++++++- .../filters/message_size/message_size_filter.cc | 18 ++++++++++++++- .../lib/security/transport/server_auth_filter.cc | 27 +++++++++++++++++++--- 4 files changed, 77 insertions(+), 6 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 91fa163fec..c7d7f333a5 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -67,6 +67,8 @@ struct call_data { grpc_closure on_send_message_next_done; grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; + grpc_error* recv_trailing_metadata_err; + bool seen_recv_trailing_metadata_ready; }; struct channel_data { @@ -157,12 +159,25 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error); + grpc_closure* closure = calld->original_recv_initial_metadata_ready; + calld->original_recv_initial_metadata_ready = nullptr; + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START( + calld->call_combiner, &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_err, "continue recv trailing metadata"); + } + GRPC_CLOSURE_RUN(closure, error); } static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); + if (calld->original_recv_initial_metadata_ready != nullptr) { + calld->recv_trailing_metadata_err = GRPC_ERROR_REF(error); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata"); + return; + } if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_trailing_metadata); @@ -427,6 +442,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = static_cast(elem->call_data); calld->call_combiner = args->call_combiner; + calld->seen_recv_trailing_metadata_ready = false; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 926afeec84..484ce9c22f 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -64,6 +64,9 @@ struct call_data { grpc_closure recv_trailing_metadata_ready; grpc_closure* original_recv_trailing_metadata_ready; + + grpc_error* recv_trailing_metadata_ready_error; + bool seen_recv_trailing_metadata_ready; }; } // namespace @@ -291,7 +294,15 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { } else { GRPC_ERROR_REF(err); } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err); + grpc_closure* closure = calld->original_recv_initial_metadata_ready; + calld->original_recv_initial_metadata_ready = nullptr; + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_ready_error, + "continue recv trailing metadata"); + } + GRPC_CLOSURE_RUN(closure, err); } static void hs_recv_message_ready(void* user_data, grpc_error* err) { @@ -321,6 +332,12 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); + if (calld->original_recv_initial_metadata_ready) { + calld->recv_trailing_metadata_ready_error = GRPC_ERROR_REF(err); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata"); + return; + } err = grpc_error_add_child( GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error)); @@ -405,6 +422,7 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = static_cast(elem->call_data); calld->call_combiner = args->call_combiner; + calld->seen_recv_trailing_metadata_ready = false; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, hs_recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c17df86f3d..01d483f45e 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -108,6 +108,8 @@ struct call_data { grpc_closure* next_recv_message_ready; // Original recv_trailing_metadata callback, invoked after our own. grpc_closure* original_recv_trailing_metadata_ready; + bool seen_recv_trailing_metadata; + grpc_error* recv_trailing_metadata_error; }; struct channel_data { @@ -147,7 +149,14 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_ERROR_REF(error); } // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); + grpc_closure* closure = calld->next_recv_message_ready; + calld->next_recv_message_ready = nullptr; + if (calld->seen_recv_trailing_metadata) { + GRPC_CALL_COMBINER_START( + calld->call_combiner, &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, "continue recv trailing metadata"); + } + GRPC_CLOSURE_RUN(closure, error); } // Callback invoked on completion of recv_trailing_metadata @@ -155,6 +164,12 @@ static void recv_message_ready(void* user_data, grpc_error* error) { static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); + if (calld->next_recv_message_ready) { + calld->seen_recv_trailing_metadata = true; + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for recv message"); + return; + } error = grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); // Invoke the next callback. @@ -209,6 +224,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem, calld->next_recv_message_ready = nullptr; calld->original_recv_trailing_metadata_ready = nullptr; calld->error = GRPC_ERROR_NONE; + calld->seen_recv_trailing_metadata = false; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 552e70130a..05dfd09ffb 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -49,6 +49,8 @@ struct call_data { size_t num_consumed_md; grpc_closure cancel_closure; gpr_atm state; // async_state + grpc_error* recv_trailing_metadata_error; + bool seen_recv_trailing_ready; }; struct channel_data { @@ -115,7 +117,14 @@ static void on_md_processing_done_inner(grpc_call_element* elem, remove_consumed_md, elem, "Response metadata filtering error"); } calld->error = GRPC_ERROR_REF(error); - GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error); + grpc_closure* closure = calld->original_recv_initial_metadata_ready; + calld->original_recv_initial_metadata_ready = nullptr; + if (calld->seen_recv_trailing_ready) { + GRPC_CALL_COMBINER_START( + calld->call_combiner, &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, "continue recv trailing metadata"); + } + GRPC_CLOSURE_SCHED(closure, error); } // Called from application code. @@ -184,13 +193,24 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { return; } } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); + grpc_closure* closure = calld->original_recv_initial_metadata_ready; + calld->original_recv_initial_metadata_ready = nullptr; + if (calld->seen_recv_trailing_ready) { + GRPC_CALL_COMBINER_START( + calld->call_combiner, &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, "continue recv trailing metadata"); + } + GRPC_CLOSURE_RUN(closure, GRPC_ERROR_REF(error)); } static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); + if (calld->original_recv_initial_metadata_ready) { + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(err); + calld->seen_recv_trailing_ready = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata"); + } err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); } @@ -228,6 +248,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem, GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, recv_trailing_metadata_ready, elem, grpc_schedule_on_exec_ctx); + calld->seen_recv_trailing_ready = false; // Create server security context. Set its auth context from channel // data and save it in the call context. grpc_server_security_context* server_ctx = -- cgit v1.2.3