aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/channel_args.c7
-rw-r--r--src/core/lib/channel/channel_args.h2
-rw-r--r--src/core/lib/channel/channel_stack.c11
-rw-r--r--src/core/lib/channel/channel_stack.h11
-rw-r--r--src/core/lib/channel/compress_filter.c36
-rw-r--r--src/core/lib/channel/connected_channel.c8
-rw-r--r--src/core/lib/channel/deadline_filter.c44
-rw-r--r--src/core/lib/channel/deadline_filter.h6
-rw-r--r--src/core/lib/channel/http_client_filter.c123
-rw-r--r--src/core/lib/channel/http_server_filter.c82
-rw-r--r--src/core/lib/channel/max_age_filter.c386
-rw-r--r--src/core/lib/channel/max_age_filter.h39
-rw-r--r--src/core/lib/channel/message_size_filter.c29
13 files changed, 207 insertions, 577 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 1a099ac437..a6d124c719 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -346,3 +346,10 @@ int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) {
}
return arg->value.integer;
}
+
+bool grpc_channel_args_want_minimal_stack(const grpc_channel_args *args) {
+ const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_MINIMAL_STACK);
+ if (arg == NULL) return false;
+ if (arg->type == GRPC_ARG_INTEGER && arg->value.integer == 0) return false;
+ return true;
+}
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 5c7d31f8bb..158cda5b21 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -113,6 +113,8 @@ grpc_channel_args *grpc_channel_args_set_socket_mutator(
const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
const char *name);
+bool grpc_channel_args_want_minimal_stack(const grpc_channel_args *args);
+
typedef struct grpc_integer_options {
int default_value; // Return this if value is outside of expected bounds.
int min_value;
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 6d53b0576e..94382980eb 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -246,9 +246,9 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
}
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
grpc_call_element *next_elem = elem + 1;
- next_elem->filter->start_transport_stream_op(exec_ctx, next_elem, op);
+ next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op);
}
char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx,
@@ -284,7 +284,8 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_error *error) {
- grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL);
- op->cancel_error = error;
- elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+ grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL);
+ op->cancel_stream = true;
+ op->payload->cancel_stream.cancel_error = error;
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 80e3603e8d..fdbcbdb018 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -112,9 +112,9 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*start_transport_stream_op)(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op);
+ void (*start_transport_stream_op_batch)(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@@ -281,7 +281,7 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set(
grpc_polling_entity *pollent);
/* Call the next operation in a call stack */
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op *op);
+ grpc_transport_stream_op_batch *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
@@ -300,7 +300,8 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_transport_stream_op *op);
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op);
void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem,
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 02dc479f3a..4625cba0d2 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -62,7 +62,7 @@ typedef struct call_data {
/** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
- grpc_transport_stream_op *send_op;
+ grpc_transport_stream_op_batch *send_op;
uint32_t send_length;
uint32_t send_flags;
grpc_slice incoming_slice;
@@ -210,7 +210,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
- calld->send_op->send_message = &calld->replacement_stream.base;
+ calld->send_op->payload->send_message.send_message =
+ &calld->replacement_stream.base;
calld->post_send = calld->send_op->on_complete;
calld->send_op->on_complete = &calld->send_done;
@@ -231,9 +232,9 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
- while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message,
- &calld->incoming_slice, ~(size_t)0,
- &calld->got_slice)) {
+ while (grpc_byte_stream_next(
+ exec_ctx, calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
@@ -242,33 +243,34 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
}
}
-static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void compress_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
- GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0);
+ GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
if (op->send_initial_metadata) {
grpc_error *error = process_send_initial_metadata(
- exec_ctx, elem, op->send_initial_metadata);
+ exec_ctx, elem,
+ op->payload->send_initial_metadata.send_initial_metadata);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
return;
}
}
- if (op->send_message != NULL &&
- !skip_compression(elem, op->send_message->flags)) {
+ if (op->send_message &&
+ !skip_compression(elem, op->payload->send_message.send_message->flags)) {
calld->send_op = op;
- calld->send_length = op->send_message->length;
- calld->send_flags = op->send_message->flags;
+ calld->send_length = op->payload->send_message.send_message->length;
+ calld->send_flags = op->payload->send_message.send_message->flags;
continue_send_message(exec_ctx, elem);
} else {
/* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, op);
}
- GPR_TIMER_END("compress_start_transport_stream_op", 0);
+ GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@@ -337,7 +339,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_compress_filter = {
- compress_start_transport_stream_op,
+ compress_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 75c68a5534..22caf24373 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -62,9 +62,9 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void con_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void con_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@@ -142,7 +142,7 @@ static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
const grpc_channel_info *channel_info) {}
const grpc_channel_filter grpc_connected_filter = {
- con_start_transport_stream_op,
+ con_start_transport_stream_op_batch,
con_start_transport_op,
sizeof(call_data),
init_call_elem,
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index ca701ed457..fda099b021 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -134,7 +134,7 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
// Inject our own on_complete callback into op.
static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
- grpc_transport_stream_op* op) {
+ grpc_transport_stream_op_batch* op) {
deadline_state->next_on_complete = op->on_complete;
grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state,
grpc_schedule_on_exec_ctx);
@@ -196,16 +196,16 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
start_timer_if_needed(exec_ctx, elem, new_deadline);
}
-void grpc_deadline_state_client_start_transport_stream_op(
+void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op* op) {
+ grpc_transport_stream_op_batch* op) {
grpc_deadline_state* deadline_state = elem->call_data;
- if (op->cancel_error != GRPC_ERROR_NONE) {
+ if (op->cancel_stream) {
cancel_timer_if_needed(exec_ctx, deadline_state);
} else {
// Make sure we know when the call is complete, so that we can cancel
// the timer.
- if (op->recv_trailing_metadata != NULL) {
+ if (op->recv_trailing_metadata) {
inject_on_complete_cb(deadline_state, op);
}
}
@@ -261,10 +261,11 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
}
// Method for starting a call op for client filter.
-static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- grpc_transport_stream_op* op) {
- grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
+static void client_start_transport_stream_op_batch(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* op) {
+ grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
+ op);
// Chain to next filter.
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -282,30 +283,33 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
}
// Method for starting a call op for server filter.
-static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- grpc_transport_stream_op* op) {
+static void server_start_transport_stream_op_batch(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* op) {
server_call_data* calld = elem->call_data;
- if (op->cancel_error != GRPC_ERROR_NONE) {
+ if (op->cancel_stream) {
cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
} else {
// If we're receiving initial metadata, we need to get the deadline
// from the recv_initial_metadata_ready callback. So we inject our
// own callback into that hook.
- if (op->recv_initial_metadata_ready != NULL) {
- calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
- calld->recv_initial_metadata = op->recv_initial_metadata;
+ if (op->recv_initial_metadata) {
+ calld->next_recv_initial_metadata_ready =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
grpc_closure_init(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
- op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->recv_initial_metadata_ready;
}
// Make sure we know when the call is complete, so that we can cancel
// the timer.
// Note that we trigger this on recv_trailing_metadata, even though
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
- if (op->recv_trailing_metadata != NULL) {
+ if (op->recv_trailing_metadata) {
inject_on_complete_cb(&calld->base.deadline_state, op);
}
}
@@ -314,7 +318,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
}
const grpc_channel_filter grpc_client_deadline_filter = {
- client_start_transport_stream_op,
+ client_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(base_call_data),
init_call_elem,
@@ -329,7 +333,7 @@ const grpc_channel_filter grpc_client_deadline_filter = {
};
const grpc_channel_filter grpc_server_deadline_filter = {
- server_start_transport_stream_op,
+ server_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(server_call_data),
init_call_elem,
diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h
index 72cd5cb929..d8db9a9f97 100644
--- a/src/core/lib/channel/deadline_filter.h
+++ b/src/core/lib/channel/deadline_filter.h
@@ -83,15 +83,15 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline);
-// To be called from the client-side filter's start_transport_stream_op()
+// To be called from the client-side filter's start_transport_stream_op_batch()
// method. Ensures that the deadline timer is cancelled when the call
// is completed.
//
// Note: It is the caller's responsibility to chain to the next filter if
// necessary after this function returns.
-void grpc_deadline_state_client_start_transport_stream_op(
+void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op* op);
+ grpc_transport_stream_op_batch* op);
// Deadline filters for direct client channels and server channels.
// Note: Deadlines for non-direct client channels are handled by the
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 17af2013d9..4e47c5c658 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -63,7 +63,7 @@ typedef struct call_data {
uint8_t *payload_bytes;
/* Vars to read data off of send_message */
- grpc_transport_stream_op send_op;
+ grpc_transport_stream_op_batch *send_op;
uint32_t send_length;
uint32_t send_flags;
grpc_slice incoming_slice;
@@ -219,9 +219,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
uint8_t *wrptr = calld->payload_bytes;
- while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
- &calld->incoming_slice, ~(size_t)0,
- &calld->got_slice)) {
+ while (grpc_byte_stream_next(
+ exec_ctx, calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
GRPC_SLICE_LENGTH(calld->incoming_slice));
wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
@@ -242,10 +242,11 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
/* Pass down the original send_message op that was blocked.*/
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
- calld->send_op.send_message = &calld->replacement_stream.base;
- calld->post_send = calld->send_op.on_complete;
- calld->send_op.on_complete = &calld->send_done;
- grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+ calld->send_op->payload->send_message.send_message =
+ &calld->replacement_stream.base;
+ calld->post_send = calld->send_op->on_complete;
+ calld->send_op->on_complete = &calld->send_done;
+ grpc_call_next_op(exec_ctx, elem, calld->send_op);
} else {
continue_send_message(exec_ctx, elem);
}
@@ -253,29 +254,30 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
grpc_error *error;
- if (op->send_initial_metadata != NULL) {
+ if (op->send_initial_metadata) {
/* Decide which HTTP VERB to use. We use GET if the request is marked
cacheable, and the operation contains both initial metadata and send
message, and the payload is below the size threshold, and all the data
for this request is immediately available. */
grpc_mdelem method = GRPC_MDELEM_METHOD_POST;
- if ((op->send_initial_metadata_flags &
+ if (op->send_message &&
+ (op->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
- op->send_message != NULL &&
- op->send_message->length < channeld->max_payload_size_for_get) {
+ op->payload->send_message.send_message->length <
+ channeld->max_payload_size_for_get) {
method = GRPC_MDELEM_METHOD_GET;
/* The following write to calld->send_message_blocked isn't racy with
reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because
being here means ops->send_message is not NULL, which is primarily
guarding the read there. */
calld->send_message_blocked = true;
- } else if (op->send_initial_metadata_flags &
+ } else if (op->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
method = GRPC_MDELEM_METHOD_PUT;
}
@@ -283,12 +285,13 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
/* Attempt to read the data from send_message and create a header field. */
if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) {
/* allocate memory to hold the entire payload */
- calld->payload_bytes = gpr_malloc(op->send_message->length);
+ calld->payload_bytes =
+ gpr_malloc(op->payload->send_message.send_message->length);
/* read slices of send_message and copy into payload_bytes */
- calld->send_op = *op;
- calld->send_length = op->send_message->length;
- calld->send_flags = op->send_message->flags;
+ calld->send_op = op;
+ calld->send_length = op->payload->send_message.send_message->length;
+ calld->send_flags = op->payload->send_message.send_message->flags;
continue_send_message(exec_ctx, elem);
if (calld->send_message_blocked == false) {
@@ -299,13 +302,15 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
const unsigned char k_query_separator = '?';
grpc_slice path_slice =
- GRPC_MDVALUE(op->send_initial_metadata->idx.named.path->md);
+ GRPC_MDVALUE(op->payload->send_initial_metadata
+ .send_initial_metadata->idx.named.path->md);
/* sum up individual component's lengths and allocate enough memory to
* hold combined path+query */
size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
estimated_len++; /* for the '?' */
estimated_len += grpc_base64_estimate_encoded_size(
- op->send_message->length, k_url_safe, k_multi_line);
+ op->payload->send_message.send_message->length, k_url_safe,
+ k_multi_line);
estimated_len += 1; /* for the trailing 0 */
grpc_slice path_with_query_slice = grpc_slice_malloc(estimated_len);
@@ -320,8 +325,8 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
write_ptr++; /* for the '?' */
grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes,
- op->send_message->length, k_url_safe,
- k_multi_line);
+ op->payload->send_message.send_message->length,
+ k_url_safe, k_multi_line);
/* remove trailing unused memory and add trailing 0 to terminate string
*/
@@ -335,14 +340,15 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
/* substitute previous path with the new path+query */
grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
- grpc_metadata_batch *b = op->send_initial_metadata;
+ grpc_metadata_batch *b =
+ op->payload->send_initial_metadata.send_initial_metadata;
error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
mdelem_path_and_query);
if (error != GRPC_ERROR_NONE) return error;
calld->on_complete = op->on_complete;
op->on_complete = &calld->hc_on_complete;
- op->send_message = NULL;
+ op->send_message = false;
grpc_slice_unref_internal(exec_ctx, path_with_query_slice);
} else {
/* Not all data is available. Fall back to POST. */
@@ -353,47 +359,60 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
}
}
- remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_METHOD);
- remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_SCHEME);
- remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_TE);
- remove_if_present(exec_ctx, op->send_initial_metadata,
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_METHOD);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_SCHEME);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_TE);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
GRPC_BATCH_CONTENT_TYPE);
- remove_if_present(exec_ctx, op->send_initial_metadata,
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
GRPC_BATCH_USER_AGENT);
/* Send : prefixed headers, which have to be before any application
layer headers. */
- error = grpc_metadata_batch_add_head(exec_ctx, op->send_initial_metadata,
- &calld->method, method);
+ error = grpc_metadata_batch_add_head(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->method, method);
if (error != GRPC_ERROR_NONE) return error;
- error =
- grpc_metadata_batch_add_head(exec_ctx, op->send_initial_metadata,
- &calld->scheme, channeld->static_scheme);
+ error = grpc_metadata_batch_add_head(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->scheme, channeld->static_scheme);
if (error != GRPC_ERROR_NONE) return error;
- error = grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata,
- &calld->te_trailers,
- GRPC_MDELEM_TE_TRAILERS);
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS);
if (error != GRPC_ERROR_NONE) return error;
error = grpc_metadata_batch_add_tail(
- exec_ctx, op->send_initial_metadata, &calld->content_type,
- GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
if (error != GRPC_ERROR_NONE) return error;
- error = grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata,
- &calld->user_agent,
- GRPC_MDELEM_REF(channeld->user_agent));
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent));
if (error != GRPC_ERROR_NONE) return error;
}
- if (op->recv_initial_metadata != NULL) {
+ if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
- calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
- op->recv_initial_metadata_ready = &calld->hc_on_recv_initial_metadata;
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->on_done_recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->hc_on_recv_initial_metadata;
}
- if (op->recv_trailing_metadata != NULL) {
+ if (op->recv_trailing_metadata) {
/* substitute our callback for the higher callback */
- calld->recv_trailing_metadata = op->recv_trailing_metadata;
+ calld->recv_trailing_metadata =
+ op->payload->recv_trailing_metadata.recv_trailing_metadata;
calld->on_done_recv_trailing_metadata = op->on_complete;
op->on_complete = &calld->hc_on_recv_trailing_metadata;
}
@@ -403,17 +422,17 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("hc_start_transport_op", 0);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_error *error = hc_mutate_op(exec_ctx, elem, op);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
} else {
call_data *calld = elem->call_data;
- if (op->send_message != NULL && calld->send_message_blocked) {
+ if (op->send_message && calld->send_message_blocked) {
/* Don't forward the op. send_message contains slices that aren't ready
- yet. The call will be forwarded by the op_complete of slice read call.
+ yet. The call will be forwarded by the op_complete of slice read call.
*/
} else {
grpc_call_next_op(exec_ctx, elem, op);
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index df0f95010b..c1e49ffacc 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -58,8 +58,7 @@ typedef struct call_data {
bool payload_bin_delivered;
grpc_metadata_batch *recv_initial_metadata;
- bool *recv_idempotent_request;
- bool *recv_cacheable_request;
+ uint32_t *recv_initial_metadata_flags;
/** Closure to call when finished with the hs_on_recv hook */
grpc_closure *on_done_recv;
/** Closure to call when we retrieve read message from the path URI
@@ -116,14 +115,21 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
if (b->idx.named.method != NULL) {
if (grpc_mdelem_eq(b->idx.named.method->md, GRPC_MDELEM_METHOD_POST)) {
- *calld->recv_idempotent_request = false;
- *calld->recv_cacheable_request = false;
+ *calld->recv_initial_metadata_flags &=
+ ~(GRPC_INITIAL_METADATA_CACHEABLE_REQUEST |
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
} else if (grpc_mdelem_eq(b->idx.named.method->md,
GRPC_MDELEM_METHOD_PUT)) {
- *calld->recv_idempotent_request = true;
+ *calld->recv_initial_metadata_flags &=
+ ~GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ *calld->recv_initial_metadata_flags |=
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
} else if (grpc_mdelem_eq(b->idx.named.method->md,
GRPC_MDELEM_METHOD_GET)) {
- *calld->recv_cacheable_request = true;
+ *calld->recv_initial_metadata_flags |=
+ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ *calld->recv_initial_metadata_flags &=
+ ~GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
} else {
add_error(error_name, &error,
grpc_attach_md_to_error(
@@ -206,7 +212,8 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
grpc_error_set_str(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":path")));
- } else if (*calld->recv_cacheable_request == true) {
+ } else if (*calld->recv_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
/* We have a cacheable request made with GET verb. The path contains the
* query parameter which is base64 encoded request payload. */
const char k_query_separator = '?';
@@ -311,45 +318,53 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
}
static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- if (op->send_initial_metadata != NULL) {
+ if (op->send_initial_metadata) {
grpc_error *error = GRPC_ERROR_NONE;
static const char *error_name = "Failed sending initial metadata";
- add_error(error_name, &error, grpc_metadata_batch_add_head(
- exec_ctx, op->send_initial_metadata,
- &calld->status, GRPC_MDELEM_STATUS_200));
- add_error(error_name, &error,
- grpc_metadata_batch_add_tail(
- exec_ctx, op->send_initial_metadata, &calld->content_type,
- GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC));
+ add_error(
+ error_name, &error,
+ grpc_metadata_batch_add_head(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->status, GRPC_MDELEM_STATUS_200));
+ add_error(
+ error_name, &error,
+ grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->content_type,
+ GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC));
add_error(error_name, &error,
- server_filter_outgoing_metadata(exec_ctx, elem,
- op->send_initial_metadata));
+ server_filter_outgoing_metadata(
+ exec_ctx, elem,
+ op->payload->send_initial_metadata.send_initial_metadata));
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
return;
}
}
if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
- GPR_ASSERT(op->recv_idempotent_request != NULL);
- GPR_ASSERT(op->recv_cacheable_request != NULL);
- calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->recv_idempotent_request = op->recv_idempotent_request;
- calld->recv_cacheable_request = op->recv_cacheable_request;
- calld->on_done_recv = op->recv_initial_metadata_ready;
- op->recv_initial_metadata_ready = &calld->hs_on_recv;
+ GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags != NULL);
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->recv_initial_metadata_flags =
+ op->payload->recv_initial_metadata.recv_flags;
+ calld->on_done_recv =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->hs_on_recv;
}
if (op->recv_message) {
- calld->recv_message_ready = op->recv_message_ready;
- calld->pp_recv_message = op->recv_message;
- if (op->recv_message_ready) {
- op->recv_message_ready = &calld->hs_recv_message_ready;
+ calld->recv_message_ready = op->payload->recv_message.recv_message_ready;
+ calld->pp_recv_message = op->payload->recv_message.recv_message;
+ if (op->payload->recv_message.recv_message_ready) {
+ op->payload->recv_message.recv_message_ready =
+ &calld->hs_recv_message_ready;
}
if (op->on_complete) {
calld->on_complete = op->on_complete;
@@ -359,9 +374,10 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (op->send_trailing_metadata) {
grpc_error *error = server_filter_outgoing_metadata(
- exec_ctx, elem, op->send_trailing_metadata);
+ exec_ctx, elem,
+ op->payload->send_trailing_metadata.send_trailing_metadata);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
return;
}
}
@@ -369,7 +385,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
GPR_TIMER_BEGIN("hs_start_transport_op", 0);
hs_mutate_op(exec_ctx, elem, op);
diff --git a/src/core/lib/channel/max_age_filter.c b/src/core/lib/channel/max_age_filter.c
deleted file mode 100644
index c25481486c..0000000000
--- a/src/core/lib/channel/max_age_filter.c
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- *
- * Copyright 2017, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/channel/message_size_filter.h"
-
-#include <limits.h>
-#include <string.h>
-
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/transport/http2_errors.h"
-#include "src/core/lib/transport/service_config.h"
-
-#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX
-#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX
-#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX
-
-typedef struct channel_data {
- /* We take a reference to the channel stack for the timer callback */
- grpc_channel_stack* channel_stack;
- /* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
- and max_age_grace_timer_pending */
- gpr_mu max_age_timer_mu;
- /* True if the max_age timer callback is currently pending */
- bool max_age_timer_pending;
- /* True if the max_age_grace timer callback is currently pending */
- bool max_age_grace_timer_pending;
- /* The timer for checking if the channel has reached its max age */
- grpc_timer max_age_timer;
- /* The timer for checking if the max-aged channel has uesed up the grace
- period */
- grpc_timer max_age_grace_timer;
- /* The timer for checking if the channel's idle duration reaches
- max_connection_idle */
- grpc_timer max_idle_timer;
- /* Allowed max time a channel may have no outstanding rpcs */
- gpr_timespec max_connection_idle;
- /* Allowed max time a channel may exist */
- gpr_timespec max_connection_age;
- /* Allowed grace period after the channel reaches its max age */
- gpr_timespec max_connection_age_grace;
- /* Closure to run when the channel's idle duration reaches max_connection_idle
- and should be closed gracefully */
- grpc_closure close_max_idle_channel;
- /* Closure to run when the channel reaches its max age and should be closed
- gracefully */
- grpc_closure close_max_age_channel;
- /* Closure to run the channel uses up its max age grace time and should be
- closed forcibly */
- grpc_closure force_close_max_age_channel;
- /* Closure to run when the init fo channel stack is done and the max_idle
- timer should be started */
- grpc_closure start_max_idle_timer_after_init;
- /* Closure to run when the init fo channel stack is done and the max_age timer
- should be started */
- grpc_closure start_max_age_timer_after_init;
- /* Closure to run when the goaway op is finished and the max_age_timer */
- grpc_closure start_max_age_grace_timer_after_goaway_op;
- /* Closure to run when the channel connectivity state changes */
- grpc_closure channel_connectivity_changed;
- /* Records the current connectivity state */
- grpc_connectivity_state connectivity_state;
- /* Number of active calls */
- gpr_atm call_count;
-} channel_data;
-
-/* Increase the nubmer of active calls. Before the increasement, if there are no
- calls, the max_idle_timer should be cancelled. */
-static void increase_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) {
- if (gpr_atm_full_fetch_add(&chand->call_count, 1) == 0) {
- grpc_timer_cancel(exec_ctx, &chand->max_idle_timer);
- }
-}
-
-/* Decrease the nubmer of active calls. After the decrement, if there are no
- calls, the max_idle_timer should be started. */
-static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) {
- if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) {
- GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_idle_timer");
- grpc_timer_init(
- exec_ctx, &chand->max_idle_timer,
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_idle),
- &chand->close_max_idle_channel, gpr_now(GPR_CLOCK_MONOTONIC));
- }
-}
-
-static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- channel_data* chand = arg;
- /* Decrease call_count. If there are no active calls at this time,
- max_idle_timer will start here. If the number of active calls is not 0,
- max_idle_timer will start after all the active calls end. */
- decrease_call_count(exec_ctx, chand);
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
- "max_age start_max_idle_timer_after_init");
-}
-
-static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- channel_data* chand = arg;
- gpr_mu_lock(&chand->max_age_timer_mu);
- chand->max_age_timer_pending = true;
- GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
- grpc_timer_init(
- exec_ctx, &chand->max_age_timer,
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_age),
- &chand->close_max_age_channel, gpr_now(GPR_CLOCK_MONOTONIC));
- gpr_mu_unlock(&chand->max_age_timer_mu);
- grpc_transport_op* op = grpc_make_transport_op(NULL);
- op->on_connectivity_state_change = &chand->channel_connectivity_changed,
- op->connectivity_state = &chand->connectivity_state;
- grpc_channel_next_op(exec_ctx,
- grpc_channel_stack_element(chand->channel_stack, 0), op);
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
- "max_age start_max_age_timer_after_init");
-}
-
-static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx,
- void* arg,
- grpc_error* error) {
- channel_data* chand = arg;
- gpr_mu_lock(&chand->max_age_timer_mu);
- chand->max_age_grace_timer_pending = true;
- GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer");
- grpc_timer_init(exec_ctx, &chand->max_age_grace_timer,
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- chand->max_connection_age_grace),
- &chand->force_close_max_age_channel,
- gpr_now(GPR_CLOCK_MONOTONIC));
- gpr_mu_unlock(&chand->max_age_timer_mu);
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
- "max_age start_max_age_grace_timer_after_goaway_op");
-}
-
-static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- channel_data* chand = arg;
- gpr_atm_no_barrier_fetch_add(&chand->call_count, 1);
- if (error == GRPC_ERROR_NONE) {
- grpc_transport_op* op = grpc_make_transport_op(NULL);
- op->goaway_error =
- grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_idle"),
- GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR);
- grpc_channel_element* elem =
- grpc_channel_stack_element(chand->channel_stack, 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
- } else if (error != GRPC_ERROR_CANCELLED) {
- GRPC_LOG_IF_ERROR("close_max_idle_channel", error);
- }
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
- "max_age max_idle_timer");
-}
-
-static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- channel_data* chand = arg;
- gpr_mu_lock(&chand->max_age_timer_mu);
- chand->max_age_timer_pending = false;
- gpr_mu_unlock(&chand->max_age_timer_mu);
- if (error == GRPC_ERROR_NONE) {
- GRPC_CHANNEL_STACK_REF(chand->channel_stack,
- "max_age start_max_age_grace_timer_after_goaway_op");
- grpc_transport_op* op = grpc_make_transport_op(
- &chand->start_max_age_grace_timer_after_goaway_op);
- op->goaway_error =
- grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"),
- GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR);
- grpc_channel_element* elem =
- grpc_channel_stack_element(chand->channel_stack, 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
- } else if (error != GRPC_ERROR_CANCELLED) {
- GRPC_LOG_IF_ERROR("close_max_age_channel", error);
- }
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
- "max_age max_age_timer");
-}
-
-static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- channel_data* chand = arg;
- gpr_mu_lock(&chand->max_age_timer_mu);
- chand->max_age_grace_timer_pending = false;
- gpr_mu_unlock(&chand->max_age_timer_mu);
- if (error == GRPC_ERROR_NONE) {
- grpc_transport_op* op = grpc_make_transport_op(NULL);
- op->disconnect_with_error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel reaches max age");
- grpc_channel_element* elem =
- grpc_channel_stack_element(chand->channel_stack, 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
- } else if (error != GRPC_ERROR_CANCELLED) {
- GRPC_LOG_IF_ERROR("force_close_max_age_channel", error);
- }
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
- "max_age max_age_grace_timer");
-}
-
-static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- channel_data* chand = arg;
- if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_transport_op* op = grpc_make_transport_op(NULL);
- op->on_connectivity_state_change = &chand->channel_connectivity_changed,
- op->connectivity_state = &chand->connectivity_state;
- grpc_channel_next_op(
- exec_ctx, grpc_channel_stack_element(chand->channel_stack, 0), op);
- } else {
- gpr_mu_lock(&chand->max_age_timer_mu);
- if (chand->max_age_timer_pending) {
- grpc_timer_cancel(exec_ctx, &chand->max_age_timer);
- chand->max_age_timer_pending = false;
- }
- if (chand->max_age_grace_timer_pending) {
- grpc_timer_cancel(exec_ctx, &chand->max_age_grace_timer);
- chand->max_age_grace_timer_pending = false;
- }
- gpr_mu_unlock(&chand->max_age_timer_mu);
- /* If there are no active calls, this increasement will cancel
- max_idle_timer, and prevent max_idle_timer from being started in the
- future. */
- increase_call_count(exec_ctx, chand);
- }
-}
-
-/* Constructor for call_data. */
-static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- const grpc_call_element_args* args) {
- channel_data* chand = elem->channel_data;
- increase_call_count(exec_ctx, chand);
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for call_data. */
-static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- const grpc_call_final_info* final_info,
- grpc_closure* ignored) {
- channel_data* chand = elem->channel_data;
- decrease_call_count(exec_ctx, chand);
-}
-
-/* Constructor for channel_data. */
-static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
- grpc_channel_element_args* args) {
- channel_data* chand = elem->channel_data;
- gpr_mu_init(&chand->max_age_timer_mu);
- chand->max_age_timer_pending = false;
- chand->max_age_grace_timer_pending = false;
- chand->channel_stack = args->channel_stack;
- chand->max_connection_age =
- DEFAULT_MAX_CONNECTION_AGE_MS == INT_MAX
- ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_MS, GPR_TIMESPAN);
- chand->max_connection_age_grace =
- DEFAULT_MAX_CONNECTION_AGE_GRACE_MS == INT_MAX
- ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_GRACE_MS,
- GPR_TIMESPAN);
- chand->max_connection_idle =
- DEFAULT_MAX_CONNECTION_IDLE_MS == INT_MAX
- ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_IDLE_MS, GPR_TIMESPAN);
- for (size_t i = 0; i < args->channel_args->num_args; ++i) {
- if (0 == strcmp(args->channel_args->args[i].key,
- GRPC_ARG_MAX_CONNECTION_AGE_MS)) {
- const int value = grpc_channel_arg_get_integer(
- &args->channel_args->args[i],
- (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX});
- chand->max_connection_age =
- value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_millis(value, GPR_TIMESPAN);
- } else if (0 == strcmp(args->channel_args->args[i].key,
- GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)) {
- const int value = grpc_channel_arg_get_integer(
- &args->channel_args->args[i],
- (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0,
- INT_MAX});
- chand->max_connection_age_grace =
- value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_millis(value, GPR_TIMESPAN);
- } else if (0 == strcmp(args->channel_args->args[i].key,
- GRPC_ARG_MAX_CONNECTION_IDLE_MS)) {
- const int value = grpc_channel_arg_get_integer(
- &args->channel_args->args[i],
- (grpc_integer_options){DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX});
- chand->max_connection_idle =
- value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
- : gpr_time_from_millis(value, GPR_TIMESPAN);
- }
- }
- grpc_closure_init(&chand->close_max_idle_channel, close_max_idle_channel,
- chand, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&chand->close_max_age_channel, close_max_age_channel, chand,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&chand->force_close_max_age_channel,
- force_close_max_age_channel, chand,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&chand->start_max_idle_timer_after_init,
- start_max_idle_timer_after_init, chand,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&chand->start_max_age_timer_after_init,
- start_max_age_timer_after_init, chand,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&chand->start_max_age_grace_timer_after_goaway_op,
- start_max_age_grace_timer_after_goaway_op, chand,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&chand->channel_connectivity_changed,
- channel_connectivity_changed, chand,
- grpc_schedule_on_exec_ctx);
-
- if (gpr_time_cmp(chand->max_connection_age, gpr_inf_future(GPR_TIMESPAN)) !=
- 0) {
- /* When the channel reaches its max age, we send down an op with
- goaway_error set. However, we can't send down any ops until after the
- channel stack is fully initialized. If we start the timer here, we have
- no guarantee that the timer won't pop before channel stack initialization
- is finished. To avoid that problem, we create a closure to start the
- timer, and we schedule that closure to be run after call stack
- initialization is done. */
- GRPC_CHANNEL_STACK_REF(chand->channel_stack,
- "max_age start_max_age_timer_after_init");
- grpc_closure_sched(exec_ctx, &chand->start_max_age_timer_after_init,
- GRPC_ERROR_NONE);
- }
-
- /* Initialize the number of calls as 1, so that the max_idle_timer will not
- start until start_max_idle_timer_after_init is invoked. */
- gpr_atm_rel_store(&chand->call_count, 1);
- if (gpr_time_cmp(chand->max_connection_idle, gpr_inf_future(GPR_TIMESPAN)) !=
- 0) {
- GRPC_CHANNEL_STACK_REF(chand->channel_stack,
- "max_age start_max_idle_timer_after_init");
- grpc_closure_sched(exec_ctx, &chand->start_max_idle_timer_after_init,
- GRPC_ERROR_NONE);
- }
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for channel_data. */
-static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem) {}
-
-const grpc_channel_filter grpc_max_age_filter = {
- grpc_call_next_op,
- grpc_channel_next_op,
- 0, /* sizeof_call_data */
- init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- grpc_channel_next_get_info,
- "max_age"};
diff --git a/src/core/lib/channel/max_age_filter.h b/src/core/lib/channel/max_age_filter.h
deleted file mode 100644
index 93e357a88e..0000000000
--- a/src/core/lib/channel/max_age_filter.h
+++ /dev/null
@@ -1,39 +0,0 @@
-//
-// Copyright 2017, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#ifndef GRPC_CORE_LIB_CHANNEL_MAX_AGE_FILTER_H
-#define GRPC_CORE_LIB_CHANNEL_MAX_AGE_FILTER_H
-
-#include "src/core/lib/channel/channel_stack.h"
-
-extern const grpc_channel_filter grpc_max_age_filter;
-
-#endif /* GRPC_CORE_LIB_CHANNEL_MAX_AGE_FILTER_H */
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 63136650a5..57726c8476 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -132,21 +132,23 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
gpr_free(message_string);
}
// Invoke the next callback.
- grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error);
+ grpc_closure_run(exec_ctx, calld->next_recv_message_ready, error);
}
// Start transport stream op.
-static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- grpc_transport_stream_op* op) {
+static void start_transport_stream_op_batch(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* op) {
call_data* calld = elem->call_data;
// Check max send message size.
- if (op->send_message != NULL && calld->max_send_size >= 0 &&
- op->send_message->length > (size_t)calld->max_send_size) {
+ if (op->send_message && calld->max_send_size >= 0 &&
+ op->payload->send_message.send_message->length >
+ (size_t)calld->max_send_size) {
char* message_string;
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
- op->send_message->length, calld->max_send_size);
- grpc_transport_stream_op_finish_with_failure(
+ op->payload->send_message.send_message->length,
+ calld->max_send_size);
+ grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS,
@@ -155,10 +157,11 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
return;
}
// Inject callback for receiving a message.
- if (op->recv_message_ready != NULL) {
- calld->next_recv_message_ready = op->recv_message_ready;
- calld->recv_message = op->recv_message;
- op->recv_message_ready = &calld->recv_message_ready;
+ if (op->recv_message) {
+ calld->next_recv_message_ready =
+ op->payload->recv_message.recv_message_ready;
+ calld->recv_message = op->payload->recv_message.recv_message;
+ op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
// Chain to the next filter.
grpc_call_next_op(exec_ctx, elem, op);
@@ -253,7 +256,7 @@ static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
}
const grpc_channel_filter grpc_message_size_filter = {
- start_transport_stream_op,
+ start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,