diff options
Diffstat (limited to 'src/core/ext/filters/http')
3 files changed, 174 insertions, 170 deletions
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c index 3ca01a41b5..2d7429c41e 100644 --- a/src/core/ext/filters/http/client/http_client_filter.c +++ b/src/core/ext/filters/http/client/http_client_filter.c @@ -36,6 +36,7 @@ static const size_t kMaxPayloadSizeForGet = 2048; typedef struct call_data { + grpc_call_combiner *call_combiner; // State for handling send_initial_metadata ops. grpc_linked_mdelem method; grpc_linked_mdelem scheme; @@ -215,13 +216,13 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, call_data *calld = (call_data *)elem->call_data; if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error); + exec_ctx, calld->send_message_batch, error, calld->call_combiner); return; } error = pull_slice_from_send_message(exec_ctx, calld); if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error); + exec_ctx, calld->send_message_batch, error, calld->call_combiner); return; } // There may or may not be more to read, but we don't care. If we got @@ -302,7 +303,6 @@ static void hc_start_transport_stream_op_batch( call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0); - GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); if (batch->recv_initial_metadata) { /* substitute our callback for the higher callback */ @@ -414,7 +414,7 @@ static void hc_start_transport_stream_op_batch( done: if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error); + exec_ctx, calld->send_message_batch, error, calld->call_combiner); } else if (!batch_will_be_handled_asynchronously) { grpc_call_next_op(exec_ctx, elem, batch); } @@ -426,6 +426,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { call_data *calld = (call_data *)elem->call_data; + calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); @@ -565,6 +566,5 @@ const grpc_channel_filter grpc_http_client_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "http-client"}; diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index a32819bfe4..98a503cafc 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -35,35 +35,29 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" -#define INITIAL_METADATA_UNSEEN 0 -#define HAS_COMPRESSION_ALGORITHM 2 -#define NO_COMPRESSION_ALGORITHM 4 - -#define CANCELLED_BIT ((gpr_atm)1) +typedef enum { + // Initial metadata not yet seen. + INITIAL_METADATA_UNSEEN = 0, + // Initial metadata seen; compression algorithm set. + HAS_COMPRESSION_ALGORITHM, + // Initial metadata seen; no compression algorithm set. + NO_COMPRESSION_ALGORITHM, +} initial_metadata_state; typedef struct call_data { - grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ + grpc_call_combiner *call_combiner; grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem stream_compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; grpc_linked_mdelem accept_stream_encoding_storage; - uint32_t remaining_slice_bytes; /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ grpc_compression_algorithm compression_algorithm; - - /* Atomic recording the state of initial metadata; allowed values: - INITIAL_METADATA_UNSEEN - initial metadata op not seen - HAS_COMPRESSION_ALGORITHM - initial metadata seen; compression algorithm - set - NO_COMPRESSION_ALGORITHM - initial metadata seen; no compression algorithm - set - pointer - a stalled op containing a send_message that's waiting on initial - metadata - pointer | CANCELLED_BIT - request was cancelled with error pointed to */ - gpr_atm send_initial_metadata_state; - + initial_metadata_state send_initial_metadata_state; + grpc_error *cancel_error; + grpc_closure start_send_message_batch_in_call_combiner; grpc_transport_stream_op_batch *send_message_batch; + grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_slice_buffer_stream replacement_stream; grpc_closure *original_send_message_on_complete; grpc_closure send_message_on_complete; @@ -92,13 +86,13 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags, channel_data *channeld = elem->channel_data; if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { - return 1; + return true; } if (has_compression_algorithm) { if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { - return 1; + return true; } - return 0; /* we have an actual call-specific algorithm */ + return false; /* we have an actual call-specific algorithm */ } /* no per-call compression override */ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; @@ -226,6 +220,18 @@ static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_REF(error)); } +static void send_message_batch_continue(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = (call_data *)elem->call_data; + // Note: The call to grpc_call_next_op() results in yielding the + // call combiner, so we need to clear calld->send_message_batch + // before we do that. + grpc_transport_stream_op_batch *send_message_batch = + calld->send_message_batch; + calld->send_message_batch = NULL; + grpc_call_next_op(exec_ctx, elem, send_message_batch); +} + static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = (call_data *)elem->call_data; @@ -234,8 +240,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&tmp); uint32_t send_flags = calld->send_message_batch->payload->send_message.send_message->flags; - const bool did_compress = grpc_msg_compress( - exec_ctx, calld->compression_algorithm, &calld->slices, &tmp); + bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, + &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; @@ -273,7 +279,19 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, calld->original_send_message_on_complete = calld->send_message_batch->on_complete; calld->send_message_batch->on_complete = &calld->send_message_on_complete; - grpc_call_next_op(exec_ctx, elem, calld->send_message_batch); + send_message_batch_continue(exec_ctx, elem); +} + +static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error) { + call_data *calld = arg; + if (calld->send_message_batch != NULL) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error), + calld->call_combiner); + calld->send_message_batch = NULL; + } } // Pulls a slice from the send_message byte stream and adds it to calld->slices. @@ -293,21 +311,25 @@ static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, // If all data has been read, invokes finish_send_message(). Otherwise, // an async call to grpc_byte_stream_next() has been started, which will // eventually result in calling on_send_message_next_done(). -static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void continue_reading_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { call_data *calld = (call_data *)elem->call_data; while (grpc_byte_stream_next( exec_ctx, calld->send_message_batch->payload->send_message.send_message, ~(size_t)0, &calld->on_send_message_next_done)) { grpc_error *error = pull_slice_from_send_message(exec_ctx, calld); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) { + // Closure callback; does not take ownership of error. + fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); + GRPC_ERROR_UNREF(error); + return; + } if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { finish_send_message(exec_ctx, elem); break; } } - return GRPC_ERROR_NONE; } // Async callback for grpc_byte_stream_next(). @@ -315,46 +337,37 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = (grpc_call_element *)arg; call_data *calld = (call_data *)elem->call_data; - if (error != GRPC_ERROR_NONE) goto fail; + if (error != GRPC_ERROR_NONE) { + // Closure callback; does not take ownership of error. + fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); + return; + } error = pull_slice_from_send_message(exec_ctx, calld); - if (error != GRPC_ERROR_NONE) goto fail; + if (error != GRPC_ERROR_NONE) { + // Closure callback; does not take ownership of error. + fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); + GRPC_ERROR_UNREF(error); + return; + } if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { finish_send_message(exec_ctx, elem); } else { - // This will either finish reading all of the data and invoke - // finish_send_message(), or else it will make an async call to - // grpc_byte_stream_next(), which will eventually result in calling - // this function again. - error = continue_reading_send_message(exec_ctx, elem); - if (error != GRPC_ERROR_NONE) goto fail; + continue_reading_send_message(exec_ctx, elem); } - return; -fail: - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error); } -static void start_send_message_batch(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *batch, - bool has_compression_algorithm) { +static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *unused) { + grpc_call_element *elem = (grpc_call_element *)arg; call_data *calld = (call_data *)elem->call_data; - if (!skip_compression(elem, batch->payload->send_message.send_message->flags, - has_compression_algorithm)) { - calld->send_message_batch = batch; - // This will either finish reading all of the data and invoke - // finish_send_message(), or else it will make an async call to - // grpc_byte_stream_next(), which will eventually result in calling - // on_send_message_next_done(). - grpc_error *error = continue_reading_send_message(exec_ctx, elem); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error); - } + if (skip_compression( + elem, + calld->send_message_batch->payload->send_message.send_message->flags, + calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) { + send_message_batch_continue(exec_ctx, elem); } else { - /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, batch); + continue_reading_send_message(exec_ctx, elem); } } @@ -362,95 +375,80 @@ static void compress_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; - GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); - + // Handle cancel_stream. if (batch->cancel_stream) { - // TODO(roth): As part of the upcoming call combiner work, change - // this to call grpc_byte_stream_shutdown() on the incoming byte - // stream, to cancel any in-flight calls to grpc_byte_stream_next(). - GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); - gpr_atm cur = gpr_atm_full_xchg( - &calld->send_initial_metadata_state, - CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error); - switch (cur) { - case HAS_COMPRESSION_ALGORITHM: - case NO_COMPRESSION_ALGORITHM: - case INITIAL_METADATA_UNSEEN: - break; - default: - if ((cur & CANCELLED_BIT) == 0) { - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, (grpc_transport_stream_op_batch *)cur, - GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error)); - } else { - GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT)); - } - break; + GRPC_ERROR_UNREF(calld->cancel_error); + calld->cancel_error = + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); + if (calld->send_message_batch != NULL) { + if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { + GRPC_CALL_COMBINER_START( + exec_ctx, calld->call_combiner, + GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); + } else { + grpc_byte_stream_shutdown( + exec_ctx, + calld->send_message_batch->payload->send_message.send_message, + GRPC_ERROR_REF(calld->cancel_error)); + } } + } else if (calld->cancel_error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error), + calld->call_combiner); + goto done; } - + // Handle send_initial_metadata. if (batch->send_initial_metadata) { + GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN); bool has_compression_algorithm; grpc_error *error = process_send_initial_metadata( exec_ctx, elem, batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, - error); - return; + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, + calld->call_combiner); + goto done; } - gpr_atm cur; - retry_send_im: - cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); - GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM && - cur != NO_COMPRESSION_ALGORITHM); - if ((cur & CANCELLED_BIT) == 0) { - if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, - has_compression_algorithm - ? HAS_COMPRESSION_ALGORITHM - : NO_COMPRESSION_ALGORITHM)) { - goto retry_send_im; - } - if (cur != INITIAL_METADATA_UNSEEN) { - start_send_message_batch(exec_ctx, elem, - (grpc_transport_stream_op_batch *)cur, - has_compression_algorithm); - } + calld->send_initial_metadata_state = has_compression_algorithm + ? HAS_COMPRESSION_ALGORITHM + : NO_COMPRESSION_ALGORITHM; + // If we had previously received a batch containing a send_message op, + // handle it now. Note that we need to re-enter the call combiner + // for this, since we can't send two batches down while holding the + // call combiner, since the connected_channel filter (at the bottom of + // the call stack) will release the call combiner for each batch it sees. + if (calld->send_message_batch != NULL) { + GRPC_CALL_COMBINER_START( + exec_ctx, calld->call_combiner, + &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE, + "starting send_message after send_initial_metadata"); } } + // Handle send_message. if (batch->send_message) { - gpr_atm cur; - retry_send: - cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); - switch (cur) { - case INITIAL_METADATA_UNSEEN: - if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, - (gpr_atm)batch)) { - goto retry_send; - } - break; - case HAS_COMPRESSION_ALGORITHM: - case NO_COMPRESSION_ALGORITHM: - start_send_message_batch(exec_ctx, elem, batch, - cur == HAS_COMPRESSION_ALGORITHM); - break; - default: - if (cur & CANCELLED_BIT) { - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, batch, - GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT))); - } else { - /* >1 send_message concurrently */ - GPR_UNREACHABLE_CODE(break); - } + GPR_ASSERT(calld->send_message_batch == NULL); + calld->send_message_batch = batch; + // If we have not yet seen send_initial_metadata, then we have to + // wait. We save the batch in calld and then drop the call + // combiner, which we'll have to pick up again later when we get + // send_initial_metadata. + if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { + GRPC_CALL_COMBINER_STOP( + exec_ctx, calld->call_combiner, + "send_message batch pending send_initial_metadata"); + goto done; } + start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE); } else { - /* pass control down the stack */ + // Pass control down the stack. grpc_call_next_op(exec_ctx, elem, batch); } - +done: GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); } @@ -458,16 +456,16 @@ static void compress_start_transport_stream_op_batch( static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - - /* initialize members */ + call_data *calld = (call_data *)elem->call_data; + calld->call_combiner = args->call_combiner; + calld->cancel_error = GRPC_ERROR_NONE; grpc_slice_buffer_init(&calld->slices); + GRPC_CLOSURE_INIT(&calld->start_send_message_batch_in_call_combiner, + start_send_message_batch, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, elem, grpc_schedule_on_exec_ctx); - return GRPC_ERROR_NONE; } @@ -475,14 +473,9 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *ignored) { - /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); - gpr_atm imstate = - gpr_atm_no_barrier_load(&calld->send_initial_metadata_state); - if (imstate & CANCELLED_BIT) { - GRPC_ERROR_UNREF((grpc_error *)(imstate & ~CANCELLED_BIT)); - } + GRPC_ERROR_UNREF(calld->cancel_error); } /* Constructor for channel_data */ @@ -550,6 +543,5 @@ const grpc_channel_filter grpc_message_compress_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, - "compress"}; + "message_compress"}; diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index b145f12aff..a10e69ba59 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -32,6 +32,8 @@ #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 typedef struct call_data { + grpc_call_combiner *call_combiner; + grpc_linked_mdelem status; grpc_linked_mdelem content_type; @@ -281,7 +283,11 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, *calld->pp_recv_message = calld->payload_bin_delivered ? NULL : (grpc_byte_stream *)&calld->read_stream; - GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); + // Re-enter call combiner for recv_message_ready, since the surface + // code will release the call combiner for each callback it receives. + GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, + calld->recv_message_ready, GRPC_ERROR_REF(err), + "resuming recv_message_ready from on_complete"); calld->recv_message_ready = NULL; calld->payload_bin_delivered = true; } @@ -293,15 +299,20 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (calld->seen_path_with_query) { - /* do nothing. This is probably a GET request, and payload will be returned - in hs_on_complete callback. */ + // Do nothing. This is probably a GET request, and payload will be + // returned in hs_on_complete callback. + // Note that we release the call combiner here, so that other + // callbacks can run. + GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, + "pausing recv_message_ready until on_complete"); } else { GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); } } -static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { +static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; @@ -323,10 +334,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata)); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - return; - } + if (error != GRPC_ERROR_NONE) return error; } if (op->recv_initial_metadata) { @@ -359,21 +367,25 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_error *error = server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_trailing_metadata.send_trailing_metadata); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - return; - } + if (error != GRPC_ERROR_NONE) return error; } + + return GRPC_ERROR_NONE; } -static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - GPR_TIMER_BEGIN("hs_start_transport_op", 0); - hs_mutate_op(exec_ctx, elem, op); - grpc_call_next_op(exec_ctx, elem, op); - GPR_TIMER_END("hs_start_transport_op", 0); +static void hs_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + call_data *calld = elem->call_data; + GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0); + grpc_error *error = hs_mutate_op(exec_ctx, elem, op); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error, + calld->call_combiner); + } else { + grpc_call_next_op(exec_ctx, elem, op); + } + GPR_TIMER_END("hs_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ @@ -383,6 +395,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ + calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem, @@ -414,7 +427,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_op, + hs_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, @@ -423,6 +436,5 @@ const grpc_channel_filter grpc_http_server_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "http-server"}; |