diff options
author | 2018-05-24 13:20:15 -0700 | |
---|---|---|
committer | 2018-05-24 13:20:15 -0700 | |
commit | 89736891e8bd89f5a7ec6a32604558e35bb9caee (patch) | |
tree | f41b0547450204fe5a149ec7e589dcd7943dfdbe /src/core/ext/filters/http | |
parent | f9035befc6c9b24f7111914d620102e58e80e94f (diff) |
Cleanup http_server_filter.
Diffstat (limited to 'src/core/ext/filters/http')
-rw-r--r-- | src/core/ext/filters/http/server/http_server_filter.cc | 135 |
1 files changed, 63 insertions, 72 deletions
diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index c202015875..83b8c291bb 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -35,39 +35,33 @@ #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 namespace { + struct call_data { grpc_call_combiner* call_combiner; + // Outgoing headers to add to send_initial_metadata. grpc_linked_mdelem status; grpc_linked_mdelem content_type; - /* did this request come with path query containing request payload */ - bool seen_path_with_query; - /* flag to ensure payload_bin is delivered only once */ - bool payload_bin_delivered; + // If we see the recv_message contents in the GET query string, we + // store it here. + grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> read_stream; + bool have_read_stream; + // State for intercepting recv_initial_metadata. + grpc_closure recv_initial_metadata_ready; + grpc_closure* original_recv_initial_metadata_ready; grpc_metadata_batch* recv_initial_metadata; uint32_t* recv_initial_metadata_flags; - /** Closure to call when finished with the hs_on_recv hook */ - grpc_closure* on_done_recv; - /** Closure to call when we retrieve read message from the path URI - */ - grpc_closure* recv_message_ready; - grpc_closure* on_complete; - grpc_core::OrphanablePtr<grpc_core::ByteStream>* pp_recv_message; - grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> read_stream; + bool seen_recv_initial_metadata_ready; - /** Receive closures are chained: we inject this closure as the on_done_recv - up-call on transport_op, and remember to call our on_done_recv member - after handling it. */ - grpc_closure hs_on_recv; - grpc_closure hs_on_complete; - grpc_closure hs_recv_message_ready; + // State for intercepting recv_message. + grpc_closure* original_recv_message_ready; + grpc_closure recv_message_ready; + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; + bool seen_recv_message_ready; }; -struct channel_data { - uint8_t unused; -}; } // namespace static grpc_error* server_filter_outgoing_metadata(grpc_call_element* elem, @@ -235,7 +229,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, GRPC_SLICE_LENGTH(query_slice), k_url_safe)); calld->read_stream.Init(&read_slice_buffer, 0); grpc_slice_buffer_destroy_internal(&read_slice_buffer); - calld->seen_path_with_query = true; + calld->have_read_stream = true; grpc_slice_unref_internal(query_slice); } else { gpr_log(GPR_ERROR, "GET request without QUERY"); @@ -266,49 +260,55 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, return error; } -static void hs_on_recv(void* user_data, grpc_error* err) { +static void recv_initial_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); + calld->seen_recv_initial_metadata_ready = true; if (err == GRPC_ERROR_NONE) { err = server_filter_incoming_metadata(elem, calld->recv_initial_metadata); + if (calld->seen_recv_message_ready) { + // We've already seen the recv_message callback, but we previously + // deferred it, so we need to return it here. + // Replace the recv_message byte stream if needed. + if (calld->have_read_stream) { + calld->recv_message->reset(calld->read_stream.get()); + calld->have_read_stream = false; + } + // Re-enter call combiner for original_recv_message_ready, since the + // surface code will release the call combiner for each callback it + // receives. + GRPC_CALL_COMBINER_START( + calld->call_combiner, calld->original_recv_message_ready, + GRPC_ERROR_REF(err), + "resuming recv_message_ready from recv_initial_metadata_ready"); + } } else { GRPC_ERROR_REF(err); } - GRPC_CLOSURE_RUN(calld->on_done_recv, err); -} - -static void hs_on_complete(void* user_data, grpc_error* err) { - grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - /* Call recv_message_ready if we got the payload via the path field */ - if (calld->seen_path_with_query && calld->recv_message_ready != nullptr) { - calld->pp_recv_message->reset( - calld->payload_bin_delivered ? nullptr - : reinterpret_cast<grpc_core::ByteStream*>( - calld->read_stream.get())); - // Re-enter call combiner for recv_message_ready, since the surface - // code will release the call combiner for each callback it receives. - GRPC_CALL_COMBINER_START(calld->call_combiner, calld->recv_message_ready, - GRPC_ERROR_REF(err), - "resuming recv_message_ready from on_complete"); - calld->recv_message_ready = nullptr; - calld->payload_bin_delivered = true; - } - GRPC_CLOSURE_RUN(calld->on_complete, GRPC_ERROR_REF(err)); + GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err); } -static void hs_recv_message_ready(void* user_data, grpc_error* err) { +static void recv_message_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); - if (calld->seen_path_with_query) { - // Do nothing. This is probably a GET request, and payload will be - // returned in hs_on_complete callback. + calld->seen_recv_message_ready = true; + if (calld->seen_recv_initial_metadata_ready) { + // We've already seen the recv_initial_metadata callback, so + // replace the recv_message byte stream if needed and invoke the + // original recv_message callback immediately. + if (calld->have_read_stream) { + calld->recv_message->reset(calld->read_stream.get()); + calld->have_read_stream = false; + } + GRPC_CLOSURE_RUN(calld->original_recv_message_ready, GRPC_ERROR_REF(err)); + } else { + // We have not yet seen the recv_initial_metadata callback, so we + // need to wait to see if this is a GET request. // Note that we release the call combiner here, so that other // callbacks can run. - GRPC_CALL_COMBINER_STOP(calld->call_combiner, - "pausing recv_message_ready until on_complete"); - } else { - GRPC_CLOSURE_RUN(calld->recv_message_ready, GRPC_ERROR_REF(err)); + GRPC_CALL_COMBINER_STOP( + calld->call_combiner, + "pausing recv_message_ready until recv_initial_metadata_ready"); } } @@ -343,23 +343,17 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, op->payload->recv_initial_metadata.recv_initial_metadata; calld->recv_initial_metadata_flags = op->payload->recv_initial_metadata.recv_flags; - calld->on_done_recv = + calld->original_recv_initial_metadata_ready = op->payload->recv_initial_metadata.recv_initial_metadata_ready; op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->hs_on_recv; + &calld->recv_initial_metadata_ready; } if (op->recv_message) { - calld->recv_message_ready = op->payload->recv_message.recv_message_ready; - calld->pp_recv_message = op->payload->recv_message.recv_message; - if (op->payload->recv_message.recv_message_ready) { - op->payload->recv_message.recv_message_ready = - &calld->hs_recv_message_ready; - } - if (op->on_complete) { - calld->on_complete = op->on_complete; - op->on_complete = &calld->hs_on_complete; - } + calld->recv_message = op->payload->recv_message.recv_message; + calld->original_recv_message_ready = + op->payload->recv_message.recv_message_ready; + op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } if (op->send_trailing_metadata) { @@ -387,15 +381,12 @@ static void hs_start_transport_stream_op_batch( /* Constructor for call_data */ static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - /* grab pointers to our data from the call element */ call_data* calld = static_cast<call_data*>(elem->call_data); - /* initialize members */ calld->call_combiner = args->call_combiner; - GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem, + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hs_recv_message_ready, hs_recv_message_ready, elem, + GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -405,7 +396,7 @@ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast<call_data*>(elem->call_data); - if (calld->seen_path_with_query && !calld->payload_bin_delivered) { + if (calld->have_read_stream) { calld->read_stream->Orphan(); } } @@ -427,7 +418,7 @@ const grpc_channel_filter grpc_http_server_filter = { init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set, destroy_call_elem, - sizeof(channel_data), + 0, init_channel_elem, destroy_channel_elem, grpc_channel_next_get_info, |