diff options
author | 2018-09-24 09:34:52 -0700 | |
---|---|---|
committer | 2018-09-24 09:34:52 -0700 | |
commit | f23fb4cf3114787806c330985c8fb3213597a09b (patch) | |
tree | 1a645472c06011577f374553032a0ae7c5546c16 /src | |
parent | 670a1096c4edfe6d9cbb9b8425cf7a7dfee8c504 (diff) | |
parent | 4c1243478c591438194403d907b8d6971792144a (diff) |
Merge pull request #16582 from yashykt/statuscaution
Be cautious and wait for recv initial metadata and recv message callbacks to complete if they cause errors
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/filters/http/client/http_client_filter.cc | 19 | ||||
-rw-r--r-- | src/core/ext/filters/http/server/http_server_filter.cc | 18 | ||||
-rw-r--r-- | src/core/ext/filters/message_size/message_size_filter.cc | 26 | ||||
-rw-r--r-- | src/core/lib/security/transport/server_auth_filter.cc | 40 | ||||
-rw-r--r-- | src/core/lib/surface/server.cc | 42 |
5 files changed, 128 insertions, 17 deletions
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 91fa163fec..cd459e47cd 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -58,6 +58,8 @@ struct call_data { grpc_metadata_batch* recv_trailing_metadata; grpc_closure* original_recv_trailing_metadata_ready; grpc_closure recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_error; + bool seen_recv_trailing_metadata_ready; // State for handling send_message ops. grpc_transport_stream_op_batch* send_message_batch; size_t send_message_bytes_read; @@ -157,12 +159,27 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error); + grpc_closure* closure = calld->original_recv_initial_metadata_ready; + calld->original_recv_initial_metadata_ready = nullptr; + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START( + calld->call_combiner, &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, "continue recv_trailing_metadata"); + } + GRPC_CLOSURE_RUN(closure, error); } static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->original_recv_initial_metadata_ready != nullptr) { + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring recv_trailing_metadata_ready until " + "after recv_initial_metadata_ready"); + return; + } if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_trailing_metadata); diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 1b3426b120..436ea09d94 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -63,8 +63,11 @@ struct call_data { grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; bool seen_recv_message_ready; + // State for intercepting recv_trailing_metadata grpc_closure recv_trailing_metadata_ready; grpc_closure* original_recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_ready_error; + bool seen_recv_trailing_metadata_ready; }; struct channel_data { @@ -301,6 +304,13 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { } else { GRPC_ERROR_REF(err); } + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_ready_error, + "resuming hs_recv_trailing_metadata_ready from " + "hs_recv_initial_metadata_ready"); + } GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err); } @@ -331,6 +341,14 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); + if (!calld->seen_recv_initial_metadata_ready) { + calld->recv_trailing_metadata_ready_error = GRPC_ERROR_REF(err); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring hs_recv_trailing_metadata_ready until " + "ater hs_recv_initial_metadata_ready"); + return; + } err = grpc_error_add_child( GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error)); diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c17df86f3d..2d3b16d992 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -108,6 +108,8 @@ struct call_data { grpc_closure* next_recv_message_ready; // Original recv_trailing_metadata callback, invoked after our own. grpc_closure* original_recv_trailing_metadata_ready; + bool seen_recv_trailing_metadata; + grpc_error* recv_trailing_metadata_error; }; struct channel_data { @@ -147,7 +149,21 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_ERROR_REF(error); } // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); + grpc_closure* closure = calld->next_recv_message_ready; + calld->next_recv_message_ready = nullptr; + if (calld->seen_recv_trailing_metadata) { + /* We might potentially see another RECV_MESSAGE op. In that case, we do not + * want to run the recv_trailing_metadata_ready closure again. The newer + * RECV_MESSAGE op cannot cause any errors since the transport has already + * invoked the recv_trailing_metadata_ready closure and all further + * RECV_MESSAGE ops will get null payloads. */ + calld->seen_recv_trailing_metadata = false; + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, + "continue recv_trailing_metadata_ready"); + } + GRPC_CLOSURE_RUN(closure, error); } // Callback invoked on completion of recv_trailing_metadata @@ -155,6 +171,14 @@ static void recv_message_ready(void* user_data, grpc_error* error) { static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->next_recv_message_ready != nullptr) { + calld->seen_recv_trailing_metadata = true; + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring recv_trailing_metadata_ready until " + "after recv_message_ready"); + return; + } error = grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); // Invoke the next callback. diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 552e70130a..b99fc5e178 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -41,9 +41,11 @@ struct call_data { grpc_transport_stream_op_batch* recv_initial_metadata_batch; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; - grpc_error* error; + grpc_error* recv_initial_metadata_error; grpc_closure recv_trailing_metadata_ready; grpc_closure* original_recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_error; + bool seen_recv_trailing_metadata_ready; grpc_metadata_array md; const grpc_metadata* consumed_md; size_t num_consumed_md; @@ -114,8 +116,16 @@ static void on_md_processing_done_inner(grpc_call_element* elem, batch->payload->recv_initial_metadata.recv_initial_metadata, remove_consumed_md, elem, "Response metadata filtering error"); } - calld->error = GRPC_ERROR_REF(error); - GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error); + calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); + grpc_closure* closure = calld->original_recv_initial_metadata_ready; + calld->original_recv_initial_metadata_ready = nullptr; + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, + "continue recv_trailing_metadata_ready"); + } + GRPC_CLOSURE_SCHED(closure, error); } // Called from application code. @@ -184,14 +194,30 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { return; } } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); + grpc_closure* closure = calld->original_recv_initial_metadata_ready; + calld->original_recv_initial_metadata_ready = nullptr; + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, + "continue recv_trailing_metadata_ready"); + } + GRPC_CLOSURE_RUN(closure, GRPC_ERROR_REF(error)); } static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); - err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); + if (calld->original_recv_initial_metadata_ready != nullptr) { + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(err); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring recv_trailing_metadata_ready until " + "after recv_initial_metadata_ready"); + return; + } + err = grpc_error_add_child( + GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->recv_initial_metadata_error)); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); } @@ -249,7 +275,7 @@ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast<call_data*>(elem->call_data); - GRPC_ERROR_UNREF(calld->error); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); } /* Constructor for channel_data */ diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 5fa58ffdec..72ddc2648d 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -150,12 +150,15 @@ struct call_data { grpc_closure kill_zombie_closure; grpc_closure* on_done_recv_initial_metadata; grpc_closure recv_trailing_metadata_ready; - grpc_error* error; + grpc_error* recv_initial_metadata_error; grpc_closure* original_recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_error; + bool seen_recv_trailing_metadata_ready; grpc_closure publish; call_data* pending_next; + grpc_call_combiner* call_combiner; }; struct request_matcher { @@ -727,21 +730,43 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { if (calld->host_set && calld->path_set) { /* do nothing */ } else { + /* Pass the error reference to calld->recv_initial_metadata_error */ grpc_error* src_error = error; error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Missing :authority or :path", &error, 1); + "Missing :authority or :path", &src_error, 1); GRPC_ERROR_UNREF(src_error); + calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); } - - GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error); + grpc_closure* closure = calld->on_done_recv_initial_metadata; + calld->on_done_recv_initial_metadata = nullptr; + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, + "continue server_recv_trailing_metadata_ready"); + } + GRPC_CLOSURE_RUN(closure, error); } static void server_recv_trailing_metadata_ready(void* user_data, - grpc_error* err) { + grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); - err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); + if (calld->on_done_recv_initial_metadata != nullptr) { + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + server_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring server_recv_trailing_metadata_ready " + "until after server_on_recv_initial_metadata"); + return; + } + error = + grpc_error_add_child(GRPC_ERROR_REF(error), + GRPC_ERROR_REF(calld->recv_initial_metadata_error)); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } static void server_mutate_op(grpc_call_element* elem, @@ -845,6 +870,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem, memset(calld, 0, sizeof(call_data)); calld->deadline = GRPC_MILLIS_INF_FUTURE; calld->call = grpc_call_from_top_element(elem); + calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata, server_on_recv_initial_metadata, elem, @@ -863,7 +889,7 @@ static void destroy_call_elem(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); GPR_ASSERT(calld->state != PENDING); - GRPC_ERROR_UNREF(calld->error); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); if (calld->host_set) { grpc_slice_unref_internal(calld->host); } |