aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/http/server/http_server_filter.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/http/server/http_server_filter.c')
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.c58
1 files changed, 35 insertions, 23 deletions
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c
index b145f12aff..a10e69ba59 100644
--- a/src/core/ext/filters/http/server/http_server_filter.c
+++ b/src/core/ext/filters/http/server/http_server_filter.c
@@ -32,6 +32,8 @@
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
typedef struct call_data {
+ grpc_call_combiner *call_combiner;
+
grpc_linked_mdelem status;
grpc_linked_mdelem content_type;
@@ -281,7 +283,11 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
*calld->pp_recv_message = calld->payload_bin_delivered
? NULL
: (grpc_byte_stream *)&calld->read_stream;
- GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
+ // Re-enter call combiner for recv_message_ready, since the surface
+ // code will release the call combiner for each callback it receives.
+ GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
+ calld->recv_message_ready, GRPC_ERROR_REF(err),
+ "resuming recv_message_ready from on_complete");
calld->recv_message_ready = NULL;
calld->payload_bin_delivered = true;
}
@@ -293,15 +299,20 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (calld->seen_path_with_query) {
- /* do nothing. This is probably a GET request, and payload will be returned
- in hs_on_complete callback. */
+ // Do nothing. This is probably a GET request, and payload will be
+ // returned in hs_on_complete callback.
+ // Note that we release the call combiner here, so that other
+ // callbacks can run.
+ GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ "pausing recv_message_ready until on_complete");
} else {
GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
}
}
-static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -323,10 +334,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata));
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) return error;
}
if (op->recv_initial_metadata) {
@@ -359,21 +367,25 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_error *error = server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_trailing_metadata.send_trailing_metadata);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) return error;
}
+
+ return GRPC_ERROR_NONE;
}
-static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- GPR_TIMER_BEGIN("hs_start_transport_op", 0);
- hs_mutate_op(exec_ctx, elem, op);
- grpc_call_next_op(exec_ctx, elem, op);
- GPR_TIMER_END("hs_start_transport_op", 0);
+static void hs_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ call_data *calld = elem->call_data;
+ GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0);
+ grpc_error *error = hs_mutate_op(exec_ctx, elem, op);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error,
+ calld->call_combiner);
+ } else {
+ grpc_call_next_op(exec_ctx, elem, op);
+ }
+ GPR_TIMER_END("hs_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@@ -383,6 +395,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
+ calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem,
@@ -414,7 +427,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_op,
+ hs_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
@@ -423,6 +436,5 @@ const grpc_channel_filter grpc_http_server_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"http-server"};