diff options
Diffstat (limited to 'src/core/ext/filters/http/server')
-rw-r--r-- | src/core/ext/filters/http/server/http_server_filter.c | 58 |
1 files changed, 23 insertions, 35 deletions
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index a10e69ba59..b145f12aff 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -32,8 +32,6 @@ #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 typedef struct call_data { - grpc_call_combiner *call_combiner; - grpc_linked_mdelem status; grpc_linked_mdelem content_type; @@ -283,11 +281,7 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, *calld->pp_recv_message = calld->payload_bin_delivered ? NULL : (grpc_byte_stream *)&calld->read_stream; - // Re-enter call combiner for recv_message_ready, since the surface - // code will release the call combiner for each callback it receives. - GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, - calld->recv_message_ready, GRPC_ERROR_REF(err), - "resuming recv_message_ready from on_complete"); + GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); calld->recv_message_ready = NULL; calld->payload_bin_delivered = true; } @@ -299,20 +293,15 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (calld->seen_path_with_query) { - // Do nothing. This is probably a GET request, and payload will be - // returned in hs_on_complete callback. - // Note that we release the call combiner here, so that other - // callbacks can run. - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, - "pausing recv_message_ready until on_complete"); + /* do nothing. This is probably a GET request, and payload will be returned + in hs_on_complete callback. */ } else { GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); } } -static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { +static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; @@ -334,7 +323,10 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata)); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + return; + } } if (op->recv_initial_metadata) { @@ -367,25 +359,21 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_error *error = server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_trailing_metadata.send_trailing_metadata); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + return; + } } - - return GRPC_ERROR_NONE; } -static void hs_start_transport_stream_op_batch( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - call_data *calld = elem->call_data; - GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0); - grpc_error *error = hs_mutate_op(exec_ctx, elem, op); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error, - calld->call_combiner); - } else { - grpc_call_next_op(exec_ctx, elem, op); - } - GPR_TIMER_END("hs_start_transport_stream_op_batch", 0); +static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + GPR_TIMER_BEGIN("hs_start_transport_op", 0); + hs_mutate_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, op); + GPR_TIMER_END("hs_start_transport_op", 0); } /* Constructor for call_data */ @@ -395,7 +383,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ - calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem, @@ -427,7 +414,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_stream_op_batch, + hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, @@ -436,5 +423,6 @@ const grpc_channel_filter grpc_http_server_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "http-server"}; |