diff options
Diffstat (limited to 'src/core/ext')
3 files changed, 61 insertions, 2 deletions
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 91fa163fec..cd459e47cd 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -58,6 +58,8 @@ struct call_data { grpc_metadata_batch* recv_trailing_metadata; grpc_closure* original_recv_trailing_metadata_ready; grpc_closure recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_error; + bool seen_recv_trailing_metadata_ready; // State for handling send_message ops. grpc_transport_stream_op_batch* send_message_batch; size_t send_message_bytes_read; @@ -157,12 +159,27 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error); + grpc_closure* closure = calld->original_recv_initial_metadata_ready; + calld->original_recv_initial_metadata_ready = nullptr; + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START( + calld->call_combiner, &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, "continue recv_trailing_metadata"); + } + GRPC_CLOSURE_RUN(closure, error); } static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->original_recv_initial_metadata_ready != nullptr) { + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring recv_trailing_metadata_ready until " + "after recv_initial_metadata_ready"); + return; + } if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_trailing_metadata); diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 1b3426b120..436ea09d94 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -63,8 +63,11 @@ struct call_data { grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; bool seen_recv_message_ready; + // State for intercepting recv_trailing_metadata grpc_closure recv_trailing_metadata_ready; grpc_closure* original_recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_ready_error; + bool seen_recv_trailing_metadata_ready; }; struct channel_data { @@ -301,6 +304,13 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { } else { GRPC_ERROR_REF(err); } + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_ready_error, + "resuming hs_recv_trailing_metadata_ready from " + "hs_recv_initial_metadata_ready"); + } GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err); } @@ -331,6 +341,14 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); + if (!calld->seen_recv_initial_metadata_ready) { + calld->recv_trailing_metadata_ready_error = GRPC_ERROR_REF(err); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring hs_recv_trailing_metadata_ready until " + "ater hs_recv_initial_metadata_ready"); + return; + } err = grpc_error_add_child( GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error)); diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c17df86f3d..2d3b16d992 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -108,6 +108,8 @@ struct call_data { grpc_closure* next_recv_message_ready; // Original recv_trailing_metadata callback, invoked after our own. grpc_closure* original_recv_trailing_metadata_ready; + bool seen_recv_trailing_metadata; + grpc_error* recv_trailing_metadata_error; }; struct channel_data { @@ -147,7 +149,21 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_ERROR_REF(error); } // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); + grpc_closure* closure = calld->next_recv_message_ready; + calld->next_recv_message_ready = nullptr; + if (calld->seen_recv_trailing_metadata) { + /* We might potentially see another RECV_MESSAGE op. In that case, we do not + * want to run the recv_trailing_metadata_ready closure again. The newer + * RECV_MESSAGE op cannot cause any errors since the transport has already + * invoked the recv_trailing_metadata_ready closure and all further + * RECV_MESSAGE ops will get null payloads. */ + calld->seen_recv_trailing_metadata = false; + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, + "continue recv_trailing_metadata_ready"); + } + GRPC_CLOSURE_RUN(closure, error); } // Callback invoked on completion of recv_trailing_metadata @@ -155,6 +171,14 @@ static void recv_message_ready(void* user_data, grpc_error* error) { static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->next_recv_message_ready != nullptr) { + calld->seen_recv_trailing_metadata = true; + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring recv_trailing_metadata_ready until " + "after recv_message_ready"); + return; + } error = grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); // Invoke the next callback. |