path: root/src/core/ext/filters/http
author    Mark D. Roth <roth@google.com>  2017-08-29 16:59:07 -0700
committer GitHub <noreply@github.com>  2017-08-29 16:59:07 -0700
commit    bf19961d0a49b43cb528392efeb4880eeebb9b5e (patch)
tree      1d4c96db4d3bdc05c634e5d386c14a77845a1758  /src/core/ext/filters/http
parent    9811915ba3fa1ccdf44b6a70fe1b1dd4782cd508 (diff)
Revert "Implement call combiner"
Diffstat (limited to 'src/core/ext/filters/http')
-rw-r--r--  src/core/ext/filters/http/client/http_client_filter.c                |   9
-rw-r--r--  src/core/ext/filters/http/message_compress/message_compress_filter.c | 276
-rw-r--r--  src/core/ext/filters/http/server/http_server_filter.c                |  58
3 files changed, 169 insertions, 174 deletions
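
For orientation: the change being reverted serialized all of a call's batches through a per-call "call combiner", so the filters below could assume only one operation ran at a time and had to explicitly start/stop it (the GRPC_CALL_COMBINER_START/STOP calls removed in the hunks that follow). The sketch below only illustrates that serialization idea in portable C; toy_combiner, toy_closure and the mutex-based draining loop are hypothetical stand-ins, not gRPC's lock-free grpc_call_combiner API.

/* Illustrative only: a mutex-based stand-in for the serializing behaviour
   of a per-call combiner.  Names and layout are hypothetical. */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct toy_closure {
  void (*cb)(void *arg);
  void *arg;
  struct toy_closure *next;
} toy_closure;

typedef struct {
  pthread_mutex_t mu;
  bool active;          /* true while some thread is draining the queue */
  toy_closure *head;    /* FIFO of pending closures */
  toy_closure **tail;
} toy_combiner;

static void toy_combiner_init(toy_combiner *c) {
  pthread_mutex_init(&c->mu, NULL);
  c->active = false;
  c->head = NULL;
  c->tail = &c->head;
}

/* Enqueue a closure; if nobody is draining yet, this caller drains, so
   closures for the same combiner (i.e. the same call) never run concurrently. */
static void toy_combiner_start(toy_combiner *c, toy_closure *cl) {
  pthread_mutex_lock(&c->mu);
  cl->next = NULL;
  *c->tail = cl;
  c->tail = &cl->next;
  if (c->active) {                /* the current drainer will pick it up */
    pthread_mutex_unlock(&c->mu);
    return;
  }
  c->active = true;
  while (c->head != NULL) {
    toy_closure *next = c->head;
    c->head = next->next;
    if (c->head == NULL) c->tail = &c->head;
    pthread_mutex_unlock(&c->mu);
    next->cb(next->arg);          /* run the closure outside the lock */
    pthread_mutex_lock(&c->mu);
  }
  c->active = false;
  pthread_mutex_unlock(&c->mu);
}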
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
index 99ddd08e6a..3ca01a41b5 100644
--- a/src/core/ext/filters/http/client/http_client_filter.c
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -36,7 +36,6 @@
static const size_t kMaxPayloadSizeForGet = 2048;
typedef struct call_data {
- grpc_call_combiner *call_combiner;
// State for handling send_initial_metadata ops.
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
@@ -216,13 +215,13 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
call_data *calld = (call_data *)elem->call_data;
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error, calld->call_combiner);
+ exec_ctx, calld->send_message_batch, error);
return;
}
error = pull_slice_from_send_message(exec_ctx, calld);
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error, calld->call_combiner);
+ exec_ctx, calld->send_message_batch, error);
return;
}
// There may or may not be more to read, but we don't care. If we got
@@ -415,7 +414,7 @@ static void hc_start_transport_stream_op_batch(
done:
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error, calld->call_combiner);
+ exec_ctx, calld->send_message_batch, error);
} else if (!batch_will_be_handled_asynchronously) {
grpc_call_next_op(exec_ctx, elem, batch);
}
@@ -427,7 +426,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = (call_data *)elem->call_data;
- calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -567,5 +565,6 @@ const grpc_channel_filter grpc_http_client_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"http-client"};
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 98a503cafc..a32819bfe4 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -35,29 +35,35 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef enum {
- // Initial metadata not yet seen.
- INITIAL_METADATA_UNSEEN = 0,
- // Initial metadata seen; compression algorithm set.
- HAS_COMPRESSION_ALGORITHM,
- // Initial metadata seen; no compression algorithm set.
- NO_COMPRESSION_ALGORITHM,
-} initial_metadata_state;
+#define INITIAL_METADATA_UNSEEN 0
+#define HAS_COMPRESSION_ALGORITHM 2
+#define NO_COMPRESSION_ALGORITHM 4
+
+#define CANCELLED_BIT ((gpr_atm)1)
typedef struct call_data {
- grpc_call_combiner *call_combiner;
+ grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem stream_compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
grpc_linked_mdelem accept_stream_encoding_storage;
+ uint32_t remaining_slice_bytes;
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
- initial_metadata_state send_initial_metadata_state;
- grpc_error *cancel_error;
- grpc_closure start_send_message_batch_in_call_combiner;
+
+ /* Atomic recording the state of initial metadata; allowed values:
+ INITIAL_METADATA_UNSEEN - initial metadata op not seen
+ HAS_COMPRESSION_ALGORITHM - initial metadata seen; compression algorithm
+ set
+ NO_COMPRESSION_ALGORITHM - initial metadata seen; no compression algorithm
+ set
+ pointer - a stalled op containing a send_message that's waiting on initial
+ metadata
+ pointer | CANCELLED_BIT - request was cancelled with error pointed to */
+ gpr_atm send_initial_metadata_state;
+
grpc_transport_stream_op_batch *send_message_batch;
- grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_slice_buffer_stream replacement_stream;
grpc_closure *original_send_message_on_complete;
grpc_closure send_message_on_complete;
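
The comment above describes how the revert packs this coordination state into a single atomic word: small even sentinels for "unseen" / "has algorithm" / "no algorithm", a batch pointer for a parked send_message, or an error pointer with its low bit set once the call is cancelled. A minimal decoding sketch, assuming the batch and error pointers are at least 2-byte aligned (so the low bit is free), is shown here in plain C; the names mirror the #defines above, but nothing below is gRPC code.

/* Illustrative decoding of the send_initial_metadata_state word described in
   the comment above; plain C, not gpr_atm.  Pointer alignment is assumed. */
#include <stdint.h>
#include <stdio.h>

#define STATE_UNSEEN          ((uintptr_t)0)
#define STATE_HAS_ALGORITHM   ((uintptr_t)2)
#define STATE_NO_ALGORITHM    ((uintptr_t)4)
#define STATE_CANCELLED_BIT   ((uintptr_t)1)

static void classify(uintptr_t state) {
  if (state == STATE_UNSEEN) {
    puts("initial metadata not seen; no send_message parked");
  } else if (state == STATE_HAS_ALGORITHM || state == STATE_NO_ALGORITHM) {
    puts("initial metadata seen; a send_message can start immediately");
  } else if (state & STATE_CANCELLED_BIT) {
    printf("cancelled; stored error = %p\n",
           (void *)(state & ~STATE_CANCELLED_BIT));
  } else {
    printf("send_message batch %p parked until initial metadata arrives\n",
           (void *)state);
  }
}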
@@ -86,13 +92,13 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags,
channel_data *channeld = elem->channel_data;
if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
- return true;
+ return 1;
}
if (has_compression_algorithm) {
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
- return true;
+ return 1;
}
- return false; /* we have an actual call-specific algorithm */
+ return 0; /* we have an actual call-specific algorithm */
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
@@ -220,18 +226,6 @@ static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_REF(error));
}
-static void send_message_batch_continue(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = (call_data *)elem->call_data;
- // Note: The call to grpc_call_next_op() results in yielding the
- // call combiner, so we need to clear calld->send_message_batch
- // before we do that.
- grpc_transport_stream_op_batch *send_message_batch =
- calld->send_message_batch;
- calld->send_message_batch = NULL;
- grpc_call_next_op(exec_ctx, elem, send_message_batch);
-}
-
static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = (call_data *)elem->call_data;
@@ -240,8 +234,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_init(&tmp);
uint32_t send_flags =
calld->send_message_batch->payload->send_message.send_message->flags;
- bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
- &calld->slices, &tmp);
+ const bool did_compress = grpc_msg_compress(
+ exec_ctx, calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -279,19 +273,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
calld->original_send_message_on_complete =
calld->send_message_batch->on_complete;
calld->send_message_batch->on_complete = &calld->send_message_on_complete;
- send_message_batch_continue(exec_ctx, elem);
-}
-
-static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error) {
- call_data *calld = arg;
- if (calld->send_message_batch != NULL) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error),
- calld->call_combiner);
- calld->send_message_batch = NULL;
- }
+ grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
}
// Pulls a slice from the send_message byte stream and adds it to calld->slices.
@@ -311,25 +293,21 @@ static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
// If all data has been read, invokes finish_send_message(). Otherwise,
// an async call to grpc_byte_stream_next() has been started, which will
// eventually result in calling on_send_message_next_done().
-static void continue_reading_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
call_data *calld = (call_data *)elem->call_data;
while (grpc_byte_stream_next(
exec_ctx, calld->send_message_batch->payload->send_message.send_message,
~(size_t)0, &calld->on_send_message_next_done)) {
grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
- if (error != GRPC_ERROR_NONE) {
- // Closure callback; does not take ownership of error.
- fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
- GRPC_ERROR_UNREF(error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) return error;
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
break;
}
}
+ return GRPC_ERROR_NONE;
}
// Async callback for grpc_byte_stream_next().
@@ -337,37 +315,46 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
- if (error != GRPC_ERROR_NONE) {
- // Closure callback; does not take ownership of error.
- fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) goto fail;
error = pull_slice_from_send_message(exec_ctx, calld);
- if (error != GRPC_ERROR_NONE) {
- // Closure callback; does not take ownership of error.
- fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
- GRPC_ERROR_UNREF(error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) goto fail;
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
} else {
- continue_reading_send_message(exec_ctx, elem);
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // this function again.
+ error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) goto fail;
}
+ return;
+fail:
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
}
-static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *unused) {
- grpc_call_element *elem = (grpc_call_element *)arg;
+static void start_send_message_batch(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch,
+ bool has_compression_algorithm) {
call_data *calld = (call_data *)elem->call_data;
- if (skip_compression(
- elem,
- calld->send_message_batch->payload->send_message.send_message->flags,
- calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) {
- send_message_batch_continue(exec_ctx, elem);
+ if (!skip_compression(elem, batch->payload->send_message.send_message->flags,
+ has_compression_algorithm)) {
+ calld->send_message_batch = batch;
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // on_send_message_next_done().
+ grpc_error *error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ }
} else {
- continue_reading_send_message(exec_ctx, elem);
+ /* pass control down the stack */
+ grpc_call_next_op(exec_ctx, elem, batch);
}
}
@@ -375,80 +362,95 @@ static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
+
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
- // Handle cancel_stream.
+
if (batch->cancel_stream) {
- GRPC_ERROR_UNREF(calld->cancel_error);
- calld->cancel_error =
- GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
- if (calld->send_message_batch != NULL) {
- if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) {
- GRPC_CALL_COMBINER_START(
- exec_ctx, calld->call_combiner,
- GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
- } else {
- grpc_byte_stream_shutdown(
- exec_ctx,
- calld->send_message_batch->payload->send_message.send_message,
- GRPC_ERROR_REF(calld->cancel_error));
- }
+ // TODO(roth): As part of the upcoming call combiner work, change
+ // this to call grpc_byte_stream_shutdown() on the incoming byte
+ // stream, to cancel any in-flight calls to grpc_byte_stream_next().
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+ gpr_atm cur = gpr_atm_full_xchg(
+ &calld->send_initial_metadata_state,
+ CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error);
+ switch (cur) {
+ case HAS_COMPRESSION_ALGORITHM:
+ case NO_COMPRESSION_ALGORITHM:
+ case INITIAL_METADATA_UNSEEN:
+ break;
+ default:
+ if ((cur & CANCELLED_BIT) == 0) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, (grpc_transport_stream_op_batch *)cur,
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
+ } else {
+ GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
+ }
+ break;
}
- } else if (calld->cancel_error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error),
- calld->call_combiner);
- goto done;
}
- // Handle send_initial_metadata.
+
if (batch->send_initial_metadata) {
- GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN);
bool has_compression_algorithm;
grpc_error *error = process_send_initial_metadata(
exec_ctx, elem,
batch->payload->send_initial_metadata.send_initial_metadata,
&has_compression_algorithm);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error,
- calld->call_combiner);
- goto done;
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ error);
+ return;
}
- calld->send_initial_metadata_state = has_compression_algorithm
- ? HAS_COMPRESSION_ALGORITHM
- : NO_COMPRESSION_ALGORITHM;
- // If we had previously received a batch containing a send_message op,
- // handle it now. Note that we need to re-enter the call combiner
- // for this, since we can't send two batches down while holding the
- // call combiner, since the connected_channel filter (at the bottom of
- // the call stack) will release the call combiner for each batch it sees.
- if (calld->send_message_batch != NULL) {
- GRPC_CALL_COMBINER_START(
- exec_ctx, calld->call_combiner,
- &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE,
- "starting send_message after send_initial_metadata");
+ gpr_atm cur;
+ retry_send_im:
+ cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
+ GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM &&
+ cur != NO_COMPRESSION_ALGORITHM);
+ if ((cur & CANCELLED_BIT) == 0) {
+ if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
+ has_compression_algorithm
+ ? HAS_COMPRESSION_ALGORITHM
+ : NO_COMPRESSION_ALGORITHM)) {
+ goto retry_send_im;
+ }
+ if (cur != INITIAL_METADATA_UNSEEN) {
+ start_send_message_batch(exec_ctx, elem,
+ (grpc_transport_stream_op_batch *)cur,
+ has_compression_algorithm);
+ }
}
}
- // Handle send_message.
if (batch->send_message) {
- GPR_ASSERT(calld->send_message_batch == NULL);
- calld->send_message_batch = batch;
- // If we have not yet seen send_initial_metadata, then we have to
- // wait. We save the batch in calld and then drop the call
- // combiner, which we'll have to pick up again later when we get
- // send_initial_metadata.
- if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) {
- GRPC_CALL_COMBINER_STOP(
- exec_ctx, calld->call_combiner,
- "send_message batch pending send_initial_metadata");
- goto done;
+ gpr_atm cur;
+ retry_send:
+ cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
+ switch (cur) {
+ case INITIAL_METADATA_UNSEEN:
+ if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
+ (gpr_atm)batch)) {
+ goto retry_send;
+ }
+ break;
+ case HAS_COMPRESSION_ALGORITHM:
+ case NO_COMPRESSION_ALGORITHM:
+ start_send_message_batch(exec_ctx, elem, batch,
+ cur == HAS_COMPRESSION_ALGORITHM);
+ break;
+ default:
+ if (cur & CANCELLED_BIT) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, batch,
+ GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
+ } else {
+ /* >1 send_message concurrently */
+ GPR_UNREACHABLE_CODE(break);
+ }
}
- start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE);
} else {
- // Pass control down the stack.
+ /* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, batch);
}
-done:
+
GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
}
@@ -456,16 +458,16 @@ done:
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- call_data *calld = (call_data *)elem->call_data;
- calld->call_combiner = args->call_combiner;
- calld->cancel_error = GRPC_ERROR_NONE;
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+
+ /* initialize members */
grpc_slice_buffer_init(&calld->slices);
- GRPC_CLOSURE_INIT(&calld->start_send_message_batch_in_call_combiner,
- start_send_message_batch, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);
+
return GRPC_ERROR_NONE;
}
@@ -473,9 +475,14 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
grpc_closure *ignored) {
+ /* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
- GRPC_ERROR_UNREF(calld->cancel_error);
+ gpr_atm imstate =
+ gpr_atm_no_barrier_load(&calld->send_initial_metadata_state);
+ if (imstate & CANCELLED_BIT) {
+ GRPC_ERROR_UNREF((grpc_error *)(imstate & ~CANCELLED_BIT));
+ }
}
/* Constructor for channel_data */
@@ -543,5 +550,6 @@ const grpc_channel_filter grpc_message_compress_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
- "message_compress"};
+ "compress"};
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c
index a10e69ba59..b145f12aff 100644
--- a/src/core/ext/filters/http/server/http_server_filter.c
+++ b/src/core/ext/filters/http/server/http_server_filter.c
@@ -32,8 +32,6 @@
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
typedef struct call_data {
- grpc_call_combiner *call_combiner;
-
grpc_linked_mdelem status;
grpc_linked_mdelem content_type;
@@ -283,11 +281,7 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
*calld->pp_recv_message = calld->payload_bin_delivered
? NULL
: (grpc_byte_stream *)&calld->read_stream;
- // Re-enter call combiner for recv_message_ready, since the surface
- // code will release the call combiner for each callback it receives.
- GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
- calld->recv_message_ready, GRPC_ERROR_REF(err),
- "resuming recv_message_ready from on_complete");
+ GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
calld->recv_message_ready = NULL;
calld->payload_bin_delivered = true;
}
@@ -299,20 +293,15 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (calld->seen_path_with_query) {
- // Do nothing. This is probably a GET request, and payload will be
- // returned in hs_on_complete callback.
- // Note that we release the call combiner here, so that other
- // callbacks can run.
- GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
- "pausing recv_message_ready until on_complete");
+ /* do nothing. This is probably a GET request, and payload will be returned
+ in hs_on_complete callback. */
} else {
GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
}
}
-static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -334,7 +323,10 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx,
server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata));
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ return;
+ }
}
if (op->recv_initial_metadata) {
@@ -367,25 +359,21 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx,
grpc_error *error = server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_trailing_metadata.send_trailing_metadata);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ return;
+ }
}
-
- return GRPC_ERROR_NONE;
}
-static void hs_start_transport_stream_op_batch(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- call_data *calld = elem->call_data;
- GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0);
- grpc_error *error = hs_mutate_op(exec_ctx, elem, op);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error,
- calld->call_combiner);
- } else {
- grpc_call_next_op(exec_ctx, elem, op);
- }
- GPR_TIMER_END("hs_start_transport_stream_op_batch", 0);
+static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ GPR_TIMER_BEGIN("hs_start_transport_op", 0);
+ hs_mutate_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
+ GPR_TIMER_END("hs_start_transport_op", 0);
}
/* Constructor for call_data */
@@ -395,7 +383,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
- calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem,
@@ -427,7 +414,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_stream_op_batch,
+ hs_start_transport_op,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
@@ -436,5 +423,6 @@ const grpc_channel_filter grpc_http_server_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"http-server"};