diff options
author | Craig Tiller <ctiller@google.com> | 2017-08-31 12:24:15 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-08-31 12:24:15 -0700 |
commit | 890f542498a8af4c05f22e42d818f3b0eeafaea8 (patch) | |
tree | 4ccf813375c1744629ed43c5c5eaa62fbdffe7ac /src/core/ext/filters/http | |
parent | 5489d41c15926abbf12a5b8d27b24d1d605d7f0f (diff) | |
parent | ccad38227f63797318d7cffcba8a2df783394ccd (diff) |
Merge branch 'stats' into stats_histo
Diffstat (limited to 'src/core/ext/filters/http')
3 files changed, 169 insertions, 174 deletions
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c index 99ddd08e6a..3ca01a41b5 100644 --- a/src/core/ext/filters/http/client/http_client_filter.c +++ b/src/core/ext/filters/http/client/http_client_filter.c @@ -36,7 +36,6 @@ static const size_t kMaxPayloadSizeForGet = 2048; typedef struct call_data { - grpc_call_combiner *call_combiner; // State for handling send_initial_metadata ops. grpc_linked_mdelem method; grpc_linked_mdelem scheme; @@ -216,13 +215,13 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, call_data *calld = (call_data *)elem->call_data; if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error, calld->call_combiner); + exec_ctx, calld->send_message_batch, error); return; } error = pull_slice_from_send_message(exec_ctx, calld); if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error, calld->call_combiner); + exec_ctx, calld->send_message_batch, error); return; } // There may or may not be more to read, but we don't care. If we got @@ -415,7 +414,7 @@ static void hc_start_transport_stream_op_batch( done: if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error, calld->call_combiner); + exec_ctx, calld->send_message_batch, error); } else if (!batch_will_be_handled_asynchronously) { grpc_call_next_op(exec_ctx, elem, batch); } @@ -427,7 +426,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { call_data *calld = (call_data *)elem->call_data; - calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); @@ -567,5 +565,6 @@ const grpc_channel_filter grpc_http_client_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "http-client"}; diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 98a503cafc..a32819bfe4 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -35,29 +35,35 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" -typedef enum { - // Initial metadata not yet seen. - INITIAL_METADATA_UNSEEN = 0, - // Initial metadata seen; compression algorithm set. - HAS_COMPRESSION_ALGORITHM, - // Initial metadata seen; no compression algorithm set. - NO_COMPRESSION_ALGORITHM, -} initial_metadata_state; +#define INITIAL_METADATA_UNSEEN 0 +#define HAS_COMPRESSION_ALGORITHM 2 +#define NO_COMPRESSION_ALGORITHM 4 + +#define CANCELLED_BIT ((gpr_atm)1) typedef struct call_data { - grpc_call_combiner *call_combiner; + grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem stream_compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; grpc_linked_mdelem accept_stream_encoding_storage; + uint32_t remaining_slice_bytes; /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ grpc_compression_algorithm compression_algorithm; - initial_metadata_state send_initial_metadata_state; - grpc_error *cancel_error; - grpc_closure start_send_message_batch_in_call_combiner; + + /* Atomic recording the state of initial metadata; allowed values: + INITIAL_METADATA_UNSEEN - initial metadata op not seen + HAS_COMPRESSION_ALGORITHM - initial metadata seen; compression algorithm + set + NO_COMPRESSION_ALGORITHM - initial metadata seen; no compression algorithm + set + pointer - a stalled op containing a send_message that's waiting on initial + metadata + pointer | CANCELLED_BIT - request was cancelled with error pointed to */ + gpr_atm send_initial_metadata_state; + grpc_transport_stream_op_batch *send_message_batch; - grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_slice_buffer_stream replacement_stream; grpc_closure *original_send_message_on_complete; grpc_closure send_message_on_complete; @@ -86,13 +92,13 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags, channel_data *channeld = elem->channel_data; if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { - return true; + return 1; } if (has_compression_algorithm) { if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { - return true; + return 1; } - return false; /* we have an actual call-specific algorithm */ + return 0; /* we have an actual call-specific algorithm */ } /* no per-call compression override */ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; @@ -220,18 +226,6 @@ static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_REF(error)); } -static void send_message_batch_continue(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; - // Note: The call to grpc_call_next_op() results in yielding the - // call combiner, so we need to clear calld->send_message_batch - // before we do that. - grpc_transport_stream_op_batch *send_message_batch = - calld->send_message_batch; - calld->send_message_batch = NULL; - grpc_call_next_op(exec_ctx, elem, send_message_batch); -} - static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = (call_data *)elem->call_data; @@ -240,8 +234,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&tmp); uint32_t send_flags = calld->send_message_batch->payload->send_message.send_message->flags; - bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, - &calld->slices, &tmp); + const bool did_compress = grpc_msg_compress( + exec_ctx, calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; @@ -279,19 +273,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, calld->original_send_message_on_complete = calld->send_message_batch->on_complete; calld->send_message_batch->on_complete = &calld->send_message_on_complete; - send_message_batch_continue(exec_ctx, elem); -} - -static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error) { - call_data *calld = arg; - if (calld->send_message_batch != NULL) { - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error), - calld->call_combiner); - calld->send_message_batch = NULL; - } + grpc_call_next_op(exec_ctx, elem, calld->send_message_batch); } // Pulls a slice from the send_message byte stream and adds it to calld->slices. @@ -311,25 +293,21 @@ static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, // If all data has been read, invokes finish_send_message(). Otherwise, // an async call to grpc_byte_stream_next() has been started, which will // eventually result in calling on_send_message_next_done(). -static void continue_reading_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { call_data *calld = (call_data *)elem->call_data; while (grpc_byte_stream_next( exec_ctx, calld->send_message_batch->payload->send_message.send_message, ~(size_t)0, &calld->on_send_message_next_done)) { grpc_error *error = pull_slice_from_send_message(exec_ctx, calld); - if (error != GRPC_ERROR_NONE) { - // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); - GRPC_ERROR_UNREF(error); - return; - } + if (error != GRPC_ERROR_NONE) return error; if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { finish_send_message(exec_ctx, elem); break; } } + return GRPC_ERROR_NONE; } // Async callback for grpc_byte_stream_next(). @@ -337,37 +315,46 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = (grpc_call_element *)arg; call_data *calld = (call_data *)elem->call_data; - if (error != GRPC_ERROR_NONE) { - // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); - return; - } + if (error != GRPC_ERROR_NONE) goto fail; error = pull_slice_from_send_message(exec_ctx, calld); - if (error != GRPC_ERROR_NONE) { - // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); - GRPC_ERROR_UNREF(error); - return; - } + if (error != GRPC_ERROR_NONE) goto fail; if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { finish_send_message(exec_ctx, elem); } else { - continue_reading_send_message(exec_ctx, elem); + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // this function again. + error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) goto fail; } + return; +fail: + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); } -static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *unused) { - grpc_call_element *elem = (grpc_call_element *)arg; +static void start_send_message_batch(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *batch, + bool has_compression_algorithm) { call_data *calld = (call_data *)elem->call_data; - if (skip_compression( - elem, - calld->send_message_batch->payload->send_message.send_message->flags, - calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) { - send_message_batch_continue(exec_ctx, elem); + if (!skip_compression(elem, batch->payload->send_message.send_message->flags, + has_compression_algorithm)) { + calld->send_message_batch = batch; + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // on_send_message_next_done(). + grpc_error *error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + } } else { - continue_reading_send_message(exec_ctx, elem); + /* pass control down the stack */ + grpc_call_next_op(exec_ctx, elem, batch); } } @@ -375,80 +362,95 @@ static void compress_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; + GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); - // Handle cancel_stream. + if (batch->cancel_stream) { - GRPC_ERROR_UNREF(calld->cancel_error); - calld->cancel_error = - GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); - if (calld->send_message_batch != NULL) { - if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { - GRPC_CALL_COMBINER_START( - exec_ctx, calld->call_combiner, - GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); - } else { - grpc_byte_stream_shutdown( - exec_ctx, - calld->send_message_batch->payload->send_message.send_message, - GRPC_ERROR_REF(calld->cancel_error)); - } + // TODO(roth): As part of the upcoming call combiner work, change + // this to call grpc_byte_stream_shutdown() on the incoming byte + // stream, to cancel any in-flight calls to grpc_byte_stream_next(). + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); + gpr_atm cur = gpr_atm_full_xchg( + &calld->send_initial_metadata_state, + CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error); + switch (cur) { + case HAS_COMPRESSION_ALGORITHM: + case NO_COMPRESSION_ALGORITHM: + case INITIAL_METADATA_UNSEEN: + break; + default: + if ((cur & CANCELLED_BIT) == 0) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, (grpc_transport_stream_op_batch *)cur, + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error)); + } else { + GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT)); + } + break; } - } else if (calld->cancel_error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error), - calld->call_combiner); - goto done; } - // Handle send_initial_metadata. + if (batch->send_initial_metadata) { - GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN); bool has_compression_algorithm; grpc_error *error = process_send_initial_metadata( exec_ctx, elem, batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, - calld->call_combiner); - goto done; + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, + error); + return; } - calld->send_initial_metadata_state = has_compression_algorithm - ? HAS_COMPRESSION_ALGORITHM - : NO_COMPRESSION_ALGORITHM; - // If we had previously received a batch containing a send_message op, - // handle it now. Note that we need to re-enter the call combiner - // for this, since we can't send two batches down while holding the - // call combiner, since the connected_channel filter (at the bottom of - // the call stack) will release the call combiner for each batch it sees. - if (calld->send_message_batch != NULL) { - GRPC_CALL_COMBINER_START( - exec_ctx, calld->call_combiner, - &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE, - "starting send_message after send_initial_metadata"); + gpr_atm cur; + retry_send_im: + cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); + GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM && + cur != NO_COMPRESSION_ALGORITHM); + if ((cur & CANCELLED_BIT) == 0) { + if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, + has_compression_algorithm + ? HAS_COMPRESSION_ALGORITHM + : NO_COMPRESSION_ALGORITHM)) { + goto retry_send_im; + } + if (cur != INITIAL_METADATA_UNSEEN) { + start_send_message_batch(exec_ctx, elem, + (grpc_transport_stream_op_batch *)cur, + has_compression_algorithm); + } } } - // Handle send_message. if (batch->send_message) { - GPR_ASSERT(calld->send_message_batch == NULL); - calld->send_message_batch = batch; - // If we have not yet seen send_initial_metadata, then we have to - // wait. We save the batch in calld and then drop the call - // combiner, which we'll have to pick up again later when we get - // send_initial_metadata. - if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { - GRPC_CALL_COMBINER_STOP( - exec_ctx, calld->call_combiner, - "send_message batch pending send_initial_metadata"); - goto done; + gpr_atm cur; + retry_send: + cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); + switch (cur) { + case INITIAL_METADATA_UNSEEN: + if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, + (gpr_atm)batch)) { + goto retry_send; + } + break; + case HAS_COMPRESSION_ALGORITHM: + case NO_COMPRESSION_ALGORITHM: + start_send_message_batch(exec_ctx, elem, batch, + cur == HAS_COMPRESSION_ALGORITHM); + break; + default: + if (cur & CANCELLED_BIT) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, batch, + GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT))); + } else { + /* >1 send_message concurrently */ + GPR_UNREACHABLE_CODE(break); + } } - start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE); } else { - // Pass control down the stack. + /* pass control down the stack */ grpc_call_next_op(exec_ctx, elem, batch); } -done: + GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); } @@ -456,16 +458,16 @@ done: static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - call_data *calld = (call_data *)elem->call_data; - calld->call_combiner = args->call_combiner; - calld->cancel_error = GRPC_ERROR_NONE; + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + + /* initialize members */ grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->start_send_message_batch_in_call_combiner, - start_send_message_batch, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, elem, grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; } @@ -473,9 +475,14 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *ignored) { + /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); - GRPC_ERROR_UNREF(calld->cancel_error); + gpr_atm imstate = + gpr_atm_no_barrier_load(&calld->send_initial_metadata_state); + if (imstate & CANCELLED_BIT) { + GRPC_ERROR_UNREF((grpc_error *)(imstate & ~CANCELLED_BIT)); + } } /* Constructor for channel_data */ @@ -543,5 +550,6 @@ const grpc_channel_filter grpc_message_compress_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, - "message_compress"}; + "compress"}; diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index a10e69ba59..b145f12aff 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -32,8 +32,6 @@ #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 typedef struct call_data { - grpc_call_combiner *call_combiner; - grpc_linked_mdelem status; grpc_linked_mdelem content_type; @@ -283,11 +281,7 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, *calld->pp_recv_message = calld->payload_bin_delivered ? NULL : (grpc_byte_stream *)&calld->read_stream; - // Re-enter call combiner for recv_message_ready, since the surface - // code will release the call combiner for each callback it receives. - GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, - calld->recv_message_ready, GRPC_ERROR_REF(err), - "resuming recv_message_ready from on_complete"); + GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); calld->recv_message_ready = NULL; calld->payload_bin_delivered = true; } @@ -299,20 +293,15 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (calld->seen_path_with_query) { - // Do nothing. This is probably a GET request, and payload will be - // returned in hs_on_complete callback. - // Note that we release the call combiner here, so that other - // callbacks can run. - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, - "pausing recv_message_ready until on_complete"); + /* do nothing. This is probably a GET request, and payload will be returned + in hs_on_complete callback. */ } else { GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); } } -static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { +static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; @@ -334,7 +323,10 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata)); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + return; + } } if (op->recv_initial_metadata) { @@ -367,25 +359,21 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_error *error = server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_trailing_metadata.send_trailing_metadata); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + return; + } } - - return GRPC_ERROR_NONE; } -static void hs_start_transport_stream_op_batch( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - call_data *calld = elem->call_data; - GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0); - grpc_error *error = hs_mutate_op(exec_ctx, elem, op); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error, - calld->call_combiner); - } else { - grpc_call_next_op(exec_ctx, elem, op); - } - GPR_TIMER_END("hs_start_transport_stream_op_batch", 0); +static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + GPR_TIMER_BEGIN("hs_start_transport_op", 0); + hs_mutate_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, op); + GPR_TIMER_END("hs_start_transport_op", 0); } /* Constructor for call_data */ @@ -395,7 +383,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ - calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem, @@ -427,7 +414,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_stream_op_batch, + hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, @@ -436,5 +423,6 @@ const grpc_channel_filter grpc_http_server_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "http-server"}; |