diff options
author | Craig Tiller <ctiller@google.com> | 2017-09-07 15:51:35 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-09-07 15:51:35 -0700 |
commit | 23af48d9721c8c3409e03c8620ba717a12921814 (patch) | |
tree | cbc2d82cf2ce907cc6be2c380454726d9e9cba93 /src/core/ext/filters/http/server/http_server_filter.c | |
parent | 561dc32247534d9204d9f68e92259759875c6d93 (diff) | |
parent | 41630a29507c8dd5b6110f0397b346b7feab442b (diff) |
Merge github.com:grpc/grpc into server_stats
Diffstat (limited to 'src/core/ext/filters/http/server/http_server_filter.c')
-rw-r--r-- | src/core/ext/filters/http/server/http_server_filter.c | 58 |
1 files changed, 35 insertions, 23 deletions
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index b145f12aff..a10e69ba59 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -32,6 +32,8 @@ #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 typedef struct call_data { + grpc_call_combiner *call_combiner; + grpc_linked_mdelem status; grpc_linked_mdelem content_type; @@ -281,7 +283,11 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, *calld->pp_recv_message = calld->payload_bin_delivered ? NULL : (grpc_byte_stream *)&calld->read_stream; - GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); + // Re-enter call combiner for recv_message_ready, since the surface + // code will release the call combiner for each callback it receives. + GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, + calld->recv_message_ready, GRPC_ERROR_REF(err), + "resuming recv_message_ready from on_complete"); calld->recv_message_ready = NULL; calld->payload_bin_delivered = true; } @@ -293,15 +299,20 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (calld->seen_path_with_query) { - /* do nothing. This is probably a GET request, and payload will be returned - in hs_on_complete callback. */ + // Do nothing. This is probably a GET request, and payload will be + // returned in hs_on_complete callback. + // Note that we release the call combiner here, so that other + // callbacks can run. + GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, + "pausing recv_message_ready until on_complete"); } else { GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); } } -static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { +static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; @@ -323,10 +334,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata)); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - return; - } + if (error != GRPC_ERROR_NONE) return error; } if (op->recv_initial_metadata) { @@ -359,21 +367,25 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_error *error = server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_trailing_metadata.send_trailing_metadata); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - return; - } + if (error != GRPC_ERROR_NONE) return error; } + + return GRPC_ERROR_NONE; } -static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - GPR_TIMER_BEGIN("hs_start_transport_op", 0); - hs_mutate_op(exec_ctx, elem, op); - grpc_call_next_op(exec_ctx, elem, op); - GPR_TIMER_END("hs_start_transport_op", 0); +static void hs_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + call_data *calld = elem->call_data; + GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0); + grpc_error *error = hs_mutate_op(exec_ctx, elem, op); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error, + calld->call_combiner); + } else { + grpc_call_next_op(exec_ctx, elem, op); + } + GPR_TIMER_END("hs_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ @@ -383,6 +395,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ + calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem, @@ -414,7 +427,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_op, + hs_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, @@ -423,6 +436,5 @@ const grpc_channel_filter grpc_http_server_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "http-server"}; |