aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/http
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/http')
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.c10
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.c276
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.c58
3 files changed, 174 insertions, 170 deletions
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
index 3ca01a41b5..2d7429c41e 100644
--- a/src/core/ext/filters/http/client/http_client_filter.c
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -36,6 +36,7 @@
static const size_t kMaxPayloadSizeForGet = 2048;
typedef struct call_data {
+ grpc_call_combiner *call_combiner;
// State for handling send_initial_metadata ops.
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
@@ -215,13 +216,13 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
call_data *calld = (call_data *)elem->call_data;
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
+ exec_ctx, calld->send_message_batch, error, calld->call_combiner);
return;
}
error = pull_slice_from_send_message(exec_ctx, calld);
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
+ exec_ctx, calld->send_message_batch, error, calld->call_combiner);
return;
}
// There may or may not be more to read, but we don't care. If we got
@@ -302,7 +303,6 @@ static void hc_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
if (batch->recv_initial_metadata) {
/* substitute our callback for the higher callback */
@@ -414,7 +414,7 @@ static void hc_start_transport_stream_op_batch(
done:
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
+ exec_ctx, calld->send_message_batch, error, calld->call_combiner);
} else if (!batch_will_be_handled_asynchronously) {
grpc_call_next_op(exec_ctx, elem, batch);
}
@@ -426,6 +426,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = (call_data *)elem->call_data;
+ calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -565,6 +566,5 @@ const grpc_channel_filter grpc_http_client_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"http-client"};
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index a32819bfe4..98a503cafc 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -35,35 +35,29 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/static_metadata.h"
-#define INITIAL_METADATA_UNSEEN 0
-#define HAS_COMPRESSION_ALGORITHM 2
-#define NO_COMPRESSION_ALGORITHM 4
-
-#define CANCELLED_BIT ((gpr_atm)1)
+typedef enum {
+ // Initial metadata not yet seen.
+ INITIAL_METADATA_UNSEEN = 0,
+ // Initial metadata seen; compression algorithm set.
+ HAS_COMPRESSION_ALGORITHM,
+ // Initial metadata seen; no compression algorithm set.
+ NO_COMPRESSION_ALGORITHM,
+} initial_metadata_state;
typedef struct call_data {
- grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
+ grpc_call_combiner *call_combiner;
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem stream_compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
grpc_linked_mdelem accept_stream_encoding_storage;
- uint32_t remaining_slice_bytes;
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
-
- /* Atomic recording the state of initial metadata; allowed values:
- INITIAL_METADATA_UNSEEN - initial metadata op not seen
- HAS_COMPRESSION_ALGORITHM - initial metadata seen; compression algorithm
- set
- NO_COMPRESSION_ALGORITHM - initial metadata seen; no compression algorithm
- set
- pointer - a stalled op containing a send_message that's waiting on initial
- metadata
- pointer | CANCELLED_BIT - request was cancelled with error pointed to */
- gpr_atm send_initial_metadata_state;
-
+ initial_metadata_state send_initial_metadata_state;
+ grpc_error *cancel_error;
+ grpc_closure start_send_message_batch_in_call_combiner;
grpc_transport_stream_op_batch *send_message_batch;
+ grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_slice_buffer_stream replacement_stream;
grpc_closure *original_send_message_on_complete;
grpc_closure send_message_on_complete;
@@ -92,13 +86,13 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags,
channel_data *channeld = elem->channel_data;
if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
- return 1;
+ return true;
}
if (has_compression_algorithm) {
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
- return 1;
+ return true;
}
- return 0; /* we have an actual call-specific algorithm */
+ return false; /* we have an actual call-specific algorithm */
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
@@ -226,6 +220,18 @@ static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_REF(error));
}
+static void send_message_batch_continue(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = (call_data *)elem->call_data;
+ // Note: The call to grpc_call_next_op() results in yielding the
+ // call combiner, so we need to clear calld->send_message_batch
+ // before we do that.
+ grpc_transport_stream_op_batch *send_message_batch =
+ calld->send_message_batch;
+ calld->send_message_batch = NULL;
+ grpc_call_next_op(exec_ctx, elem, send_message_batch);
+}
+
static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = (call_data *)elem->call_data;
@@ -234,8 +240,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_init(&tmp);
uint32_t send_flags =
calld->send_message_batch->payload->send_message.send_message->flags;
- const bool did_compress = grpc_msg_compress(
- exec_ctx, calld->compression_algorithm, &calld->slices, &tmp);
+ bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
+ &calld->slices, &tmp);
if (did_compress) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -273,7 +279,19 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
calld->original_send_message_on_complete =
calld->send_message_batch->on_complete;
calld->send_message_batch->on_complete = &calld->send_message_on_complete;
- grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
+ send_message_batch_continue(exec_ctx, elem);
+}
+
+static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error) {
+ call_data *calld = arg;
+ if (calld->send_message_batch != NULL) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error),
+ calld->call_combiner);
+ calld->send_message_batch = NULL;
+ }
}
// Pulls a slice from the send_message byte stream and adds it to calld->slices.
@@ -293,21 +311,25 @@ static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
// If all data has been read, invokes finish_send_message(). Otherwise,
// an async call to grpc_byte_stream_next() has been started, which will
// eventually result in calling on_send_message_next_done().
-static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void continue_reading_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
call_data *calld = (call_data *)elem->call_data;
while (grpc_byte_stream_next(
exec_ctx, calld->send_message_batch->payload->send_message.send_message,
~(size_t)0, &calld->on_send_message_next_done)) {
grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) {
+ // Closure callback; does not take ownership of error.
+ fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
break;
}
}
- return GRPC_ERROR_NONE;
}
// Async callback for grpc_byte_stream_next().
@@ -315,46 +337,37 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
- if (error != GRPC_ERROR_NONE) goto fail;
+ if (error != GRPC_ERROR_NONE) {
+ // Closure callback; does not take ownership of error.
+ fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
+ return;
+ }
error = pull_slice_from_send_message(exec_ctx, calld);
- if (error != GRPC_ERROR_NONE) goto fail;
+ if (error != GRPC_ERROR_NONE) {
+ // Closure callback; does not take ownership of error.
+ fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
} else {
- // This will either finish reading all of the data and invoke
- // finish_send_message(), or else it will make an async call to
- // grpc_byte_stream_next(), which will eventually result in calling
- // this function again.
- error = continue_reading_send_message(exec_ctx, elem);
- if (error != GRPC_ERROR_NONE) goto fail;
+ continue_reading_send_message(exec_ctx, elem);
}
- return;
-fail:
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
}
-static void start_send_message_batch(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *batch,
- bool has_compression_algorithm) {
+static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *unused) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
- if (!skip_compression(elem, batch->payload->send_message.send_message->flags,
- has_compression_algorithm)) {
- calld->send_message_batch = batch;
- // This will either finish reading all of the data and invoke
- // finish_send_message(), or else it will make an async call to
- // grpc_byte_stream_next(), which will eventually result in calling
- // on_send_message_next_done().
- grpc_error *error = continue_reading_send_message(exec_ctx, elem);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
- }
+ if (skip_compression(
+ elem,
+ calld->send_message_batch->payload->send_message.send_message->flags,
+ calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) {
+ send_message_batch_continue(exec_ctx, elem);
} else {
- /* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, batch);
+ continue_reading_send_message(exec_ctx, elem);
}
}
@@ -362,95 +375,80 @@ static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
-
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
-
+ // Handle cancel_stream.
if (batch->cancel_stream) {
- // TODO(roth): As part of the upcoming call combiner work, change
- // this to call grpc_byte_stream_shutdown() on the incoming byte
- // stream, to cancel any in-flight calls to grpc_byte_stream_next().
- GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
- gpr_atm cur = gpr_atm_full_xchg(
- &calld->send_initial_metadata_state,
- CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error);
- switch (cur) {
- case HAS_COMPRESSION_ALGORITHM:
- case NO_COMPRESSION_ALGORITHM:
- case INITIAL_METADATA_UNSEEN:
- break;
- default:
- if ((cur & CANCELLED_BIT) == 0) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, (grpc_transport_stream_op_batch *)cur,
- GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
- } else {
- GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
- }
- break;
+ GRPC_ERROR_UNREF(calld->cancel_error);
+ calld->cancel_error =
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+ if (calld->send_message_batch != NULL) {
+ if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) {
+ GRPC_CALL_COMBINER_START(
+ exec_ctx, calld->call_combiner,
+ GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
+ } else {
+ grpc_byte_stream_shutdown(
+ exec_ctx,
+ calld->send_message_batch->payload->send_message.send_message,
+ GRPC_ERROR_REF(calld->cancel_error));
+ }
}
+ } else if (calld->cancel_error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error),
+ calld->call_combiner);
+ goto done;
}
-
+ // Handle send_initial_metadata.
if (batch->send_initial_metadata) {
+ GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN);
bool has_compression_algorithm;
grpc_error *error = process_send_initial_metadata(
exec_ctx, elem,
batch->payload->send_initial_metadata.send_initial_metadata,
&has_compression_algorithm);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
- error);
- return;
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error,
+ calld->call_combiner);
+ goto done;
}
- gpr_atm cur;
- retry_send_im:
- cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
- GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM &&
- cur != NO_COMPRESSION_ALGORITHM);
- if ((cur & CANCELLED_BIT) == 0) {
- if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
- has_compression_algorithm
- ? HAS_COMPRESSION_ALGORITHM
- : NO_COMPRESSION_ALGORITHM)) {
- goto retry_send_im;
- }
- if (cur != INITIAL_METADATA_UNSEEN) {
- start_send_message_batch(exec_ctx, elem,
- (grpc_transport_stream_op_batch *)cur,
- has_compression_algorithm);
- }
+ calld->send_initial_metadata_state = has_compression_algorithm
+ ? HAS_COMPRESSION_ALGORITHM
+ : NO_COMPRESSION_ALGORITHM;
+ // If we had previously received a batch containing a send_message op,
+ // handle it now. Note that we need to re-enter the call combiner
+ // for this, since we can't send two batches down while holding the
+ // call combiner, since the connected_channel filter (at the bottom of
+ // the call stack) will release the call combiner for each batch it sees.
+ if (calld->send_message_batch != NULL) {
+ GRPC_CALL_COMBINER_START(
+ exec_ctx, calld->call_combiner,
+ &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE,
+ "starting send_message after send_initial_metadata");
}
}
+ // Handle send_message.
if (batch->send_message) {
- gpr_atm cur;
- retry_send:
- cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
- switch (cur) {
- case INITIAL_METADATA_UNSEEN:
- if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
- (gpr_atm)batch)) {
- goto retry_send;
- }
- break;
- case HAS_COMPRESSION_ALGORITHM:
- case NO_COMPRESSION_ALGORITHM:
- start_send_message_batch(exec_ctx, elem, batch,
- cur == HAS_COMPRESSION_ALGORITHM);
- break;
- default:
- if (cur & CANCELLED_BIT) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, batch,
- GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
- } else {
- /* >1 send_message concurrently */
- GPR_UNREACHABLE_CODE(break);
- }
+ GPR_ASSERT(calld->send_message_batch == NULL);
+ calld->send_message_batch = batch;
+ // If we have not yet seen send_initial_metadata, then we have to
+ // wait. We save the batch in calld and then drop the call
+ // combiner, which we'll have to pick up again later when we get
+ // send_initial_metadata.
+ if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) {
+ GRPC_CALL_COMBINER_STOP(
+ exec_ctx, calld->call_combiner,
+ "send_message batch pending send_initial_metadata");
+ goto done;
}
+ start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE);
} else {
- /* pass control down the stack */
+ // Pass control down the stack.
grpc_call_next_op(exec_ctx, elem, batch);
}
-
+done:
GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
}
@@ -458,16 +456,16 @@ static void compress_start_transport_stream_op_batch(
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
-
- /* initialize members */
+ call_data *calld = (call_data *)elem->call_data;
+ calld->call_combiner = args->call_combiner;
+ calld->cancel_error = GRPC_ERROR_NONE;
grpc_slice_buffer_init(&calld->slices);
+ GRPC_CLOSURE_INIT(&calld->start_send_message_batch_in_call_combiner,
+ start_send_message_batch, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);
-
return GRPC_ERROR_NONE;
}
@@ -475,14 +473,9 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
grpc_closure *ignored) {
- /* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
- gpr_atm imstate =
- gpr_atm_no_barrier_load(&calld->send_initial_metadata_state);
- if (imstate & CANCELLED_BIT) {
- GRPC_ERROR_UNREF((grpc_error *)(imstate & ~CANCELLED_BIT));
- }
+ GRPC_ERROR_UNREF(calld->cancel_error);
}
/* Constructor for channel_data */
@@ -550,6 +543,5 @@ const grpc_channel_filter grpc_message_compress_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
- "compress"};
+ "message_compress"};
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c
index b145f12aff..a10e69ba59 100644
--- a/src/core/ext/filters/http/server/http_server_filter.c
+++ b/src/core/ext/filters/http/server/http_server_filter.c
@@ -32,6 +32,8 @@
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
typedef struct call_data {
+ grpc_call_combiner *call_combiner;
+
grpc_linked_mdelem status;
grpc_linked_mdelem content_type;
@@ -281,7 +283,11 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
*calld->pp_recv_message = calld->payload_bin_delivered
? NULL
: (grpc_byte_stream *)&calld->read_stream;
- GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
+ // Re-enter call combiner for recv_message_ready, since the surface
+ // code will release the call combiner for each callback it receives.
+ GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
+ calld->recv_message_ready, GRPC_ERROR_REF(err),
+ "resuming recv_message_ready from on_complete");
calld->recv_message_ready = NULL;
calld->payload_bin_delivered = true;
}
@@ -293,15 +299,20 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (calld->seen_path_with_query) {
- /* do nothing. This is probably a GET request, and payload will be returned
- in hs_on_complete callback. */
+ // Do nothing. This is probably a GET request, and payload will be
+ // returned in hs_on_complete callback.
+ // Note that we release the call combiner here, so that other
+ // callbacks can run.
+ GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ "pausing recv_message_ready until on_complete");
} else {
GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
}
}
-static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -323,10 +334,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata));
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) return error;
}
if (op->recv_initial_metadata) {
@@ -359,21 +367,25 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_error *error = server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_trailing_metadata.send_trailing_metadata);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) return error;
}
+
+ return GRPC_ERROR_NONE;
}
-static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- GPR_TIMER_BEGIN("hs_start_transport_op", 0);
- hs_mutate_op(exec_ctx, elem, op);
- grpc_call_next_op(exec_ctx, elem, op);
- GPR_TIMER_END("hs_start_transport_op", 0);
+static void hs_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ call_data *calld = elem->call_data;
+ GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0);
+ grpc_error *error = hs_mutate_op(exec_ctx, elem, op);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error,
+ calld->call_combiner);
+ } else {
+ grpc_call_next_op(exec_ctx, elem, op);
+ }
+ GPR_TIMER_END("hs_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@@ -383,6 +395,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
+ calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem,
@@ -414,7 +427,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_op,
+ hs_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
@@ -423,6 +436,5 @@ const grpc_channel_filter grpc_http_server_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"http-server"};