path: root/src/core/lib/channel
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--  src/core/lib/channel/channel_args.c            67
-rw-r--r--  src/core/lib/channel/channel_args.h            17
-rw-r--r--  src/core/lib/channel/channel_stack.c           68
-rw-r--r--  src/core/lib/channel/channel_stack.h           50
-rw-r--r--  src/core/lib/channel/channel_stack_builder.h    8
-rw-r--r--  src/core/lib/channel/compress_filter.c         24
-rw-r--r--  src/core/lib/channel/connected_channel.c       14
-rw-r--r--  src/core/lib/channel/context.h                 11
-rw-r--r--  src/core/lib/channel/deadline_filter.c        334
-rw-r--r--  src/core/lib/channel/deadline_filter.h         98
-rw-r--r--  src/core/lib/channel/handshaker.c             203
-rw-r--r--  src/core/lib/channel/handshaker.h             148
-rw-r--r--  src/core/lib/channel/http_client_filter.c     192
-rw-r--r--  src/core/lib/channel/http_client_filter.h       3
-rw-r--r--  src/core/lib/channel/http_server_filter.c      95
-rw-r--r--  src/core/lib/channel/message_size_filter.c    251
-rw-r--r--  src/core/lib/channel/message_size_filter.h     39
17 files changed, 1541 insertions(+), 81 deletions(-)
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 3a56b1ff20..cfc072c0b5 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -66,22 +66,59 @@ static grpc_arg copy_arg(const grpc_arg *src) {
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add,
size_t num_to_add) {
+ return grpc_channel_args_copy_and_add_and_remove(src, NULL, 0, to_add,
+ num_to_add);
+}
+
+grpc_channel_args *grpc_channel_args_copy_and_remove(
+ const grpc_channel_args *src, const char **to_remove,
+ size_t num_to_remove) {
+ return grpc_channel_args_copy_and_add_and_remove(src, to_remove,
+ num_to_remove, NULL, 0);
+}
+
+static bool should_remove_arg(const grpc_arg *arg, const char **to_remove,
+ size_t num_to_remove) {
+ for (size_t i = 0; i < num_to_remove; ++i) {
+ if (strcmp(arg->key, to_remove[i]) == 0) return true;
+ }
+ return false;
+}
+
+grpc_channel_args *grpc_channel_args_copy_and_add_and_remove(
+ const grpc_channel_args *src, const char **to_remove, size_t num_to_remove,
+ const grpc_arg *to_add, size_t num_to_add) {
+ // Figure out how many args we'll be copying.
+ size_t num_args_to_copy = 0;
+ if (src != NULL) {
+ for (size_t i = 0; i < src->num_args; ++i) {
+ if (!should_remove_arg(&src->args[i], to_remove, num_to_remove)) {
+ ++num_args_to_copy;
+ }
+ }
+ }
+ // Create result.
grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args));
- size_t i;
- size_t src_num_args = (src == NULL) ? 0 : src->num_args;
- if (!src && !to_add) {
- dst->num_args = 0;
+ dst->num_args = num_args_to_copy + num_to_add;
+ if (dst->num_args == 0) {
dst->args = NULL;
return dst;
}
- dst->num_args = src_num_args + num_to_add;
dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args);
- for (i = 0; i < src_num_args; i++) {
- dst->args[i] = copy_arg(&src->args[i]);
+ // Copy args from src that are not being removed.
+ size_t dst_idx = 0;
+ if (src != NULL) {
+ for (size_t i = 0; i < src->num_args; ++i) {
+ if (!should_remove_arg(&src->args[i], to_remove, num_to_remove)) {
+ dst->args[dst_idx++] = copy_arg(&src->args[i]);
+ }
+ }
}
- for (i = 0; i < num_to_add; i++) {
- dst->args[i + src_num_args] = copy_arg(&to_add[i]);
+ // Add args from to_add.
+ for (size_t i = 0; i < num_to_add; ++i) {
+ dst->args[dst_idx++] = copy_arg(&to_add[i]);
}
+ GPR_ASSERT(dst_idx == dst->num_args);
return dst;
}
@@ -272,6 +309,18 @@ int grpc_channel_args_compare(const grpc_channel_args *a,
return 0;
}
+const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
+ const char *name) {
+ if (args != NULL) {
+ for (size_t i = 0; i < args->num_args; ++i) {
+ if (strcmp(args->args[i].key, name) == 0) {
+ return &args->args[i];
+ }
+ }
+ }
+ return NULL;
+}
+
int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) {
if (arg->type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key);
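The new copy-and-remove helpers let a caller replace an existing arg in a single pass, and grpc_channel_args_find then retrieves it without walking the array by hand. A minimal sketch of how the two might be combined; the key name and value below are hypothetical, not part of this change:

static grpc_channel_args *replace_int_arg(const grpc_channel_args *src) {
  /* Remove any existing copy of the (hypothetical) key, then append it anew. */
  static const char *to_remove[] = {"example.max_threads"};
  grpc_arg new_arg;
  new_arg.type = GRPC_ARG_INTEGER;
  new_arg.key = "example.max_threads";
  new_arg.value.integer = 8;
  return grpc_channel_args_copy_and_add_and_remove(src, to_remove, 1, &new_arg,
                                                   1);
}

static int read_int_arg(const grpc_channel_args *args) {
  /* grpc_channel_args_find returns NULL when the key is absent. */
  const grpc_arg *arg = grpc_channel_args_find(args, "example.max_threads");
  return (arg != NULL && arg->type == GRPC_ARG_INTEGER) ? arg->value.integer
                                                        : -1;
}

The result of replace_int_arg() is a fresh instance that the caller still owns and eventually destroys with grpc_channel_args_destroy().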
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index e85e979eeb..1e05303471 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -37,6 +37,8 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
+// Channel args are intentionally immutable, to avoid the need for locking.
+
/** Copy the arguments in \a src into a new instance */
grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
@@ -49,6 +51,17 @@ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add,
size_t num_to_add);
+/** Copies the arguments in \a src except for those whose keys are in
+ \a to_remove. */
+grpc_channel_args *grpc_channel_args_copy_and_remove(
+ const grpc_channel_args *src, const char **to_remove, size_t num_to_remove);
+
+/** Copies the arguments from \a src except for those whose keys are in
+ \a to_remove and appends the arguments in \a to_add. */
+grpc_channel_args *grpc_channel_args_copy_and_add_and_remove(
+ const grpc_channel_args *src, const char **to_remove, size_t num_to_remove,
+ const grpc_arg *to_add, size_t num_to_add);
+
/** Concatenate args from \a a and \a b into a new instance */
grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
const grpc_channel_args *b);
@@ -87,6 +100,10 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
+/** Returns the value of argument \a name from \a args, or NULL if not found. */
+const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
+ const char *name);
+
typedef struct grpc_integer_options {
int default_value; // Return this if value is outside of expected bounds.
int min_value;
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 87175d7943..2c5367901d 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -32,6 +32,7 @@
*/
#include "src/core/lib/channel/channel_stack.h"
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <stdlib.h>
@@ -157,12 +158,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack, int initial_refs,
- grpc_iomgr_cb_func destroy, void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack) {
+grpc_error *grpc_call_stack_init(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context, const void *transport_server_data,
+ grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -178,17 +178,30 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */
+ grpc_error *first_error = GRPC_ERROR_NONE;
+ args.start_time = gpr_now(GPR_CLOCK_MONOTONIC);
for (i = 0; i < count; i++) {
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
+ args.path = path;
+ args.deadline = deadline;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
- call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
+ grpc_error *error =
+ call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
+ if (error != GRPC_ERROR_NONE) {
+ if (first_error == GRPC_ERROR_NONE) {
+ first_error = error;
+ } else {
+ GRPC_ERROR_UNREF(error);
+ }
+ }
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
+ return first_error;
}
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
@@ -217,7 +230,7 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set(
grpc_polling_entity *pollent) {}
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
- const grpc_call_stats *call_stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory) {
grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
size_t count = stack->count;
@@ -225,7 +238,7 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
/* destroy per-filter data */
for (i = 0; i < count; i++) {
- elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], call_stats,
+ elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], final_info,
i == count - 1 ? and_free_memory : NULL);
}
}
@@ -259,21 +272,38 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
+static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
+ gpr_free(op);
+}
+
void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem) {
- grpc_transport_stream_op op;
- memset(&op, 0, sizeof(op));
- op.cancel_error = GRPC_ERROR_CANCELLED;
- grpc_call_next_op(exec_ctx, cur_elem, &op);
+ grpc_call_element *elem) {
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->cancel_error = GRPC_ERROR_CANCELLED;
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem,
+ grpc_call_element *elem,
grpc_status_code status,
gpr_slice *optional_message) {
- grpc_transport_stream_op op;
- memset(&op, 0, sizeof(op));
- grpc_transport_stream_op_add_cancellation_with_message(&op, status,
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ grpc_transport_stream_op_add_cancellation_with_message(op, status,
optional_message);
- grpc_call_next_op(exec_ctx, cur_elem, &op);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+}
+
+void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_status_code status,
+ gpr_slice *optional_message) {
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ grpc_transport_stream_op_add_close(op, status, optional_message);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index d72c015b67..27f3be7b29 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -51,6 +51,10 @@
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/transport.h"
+#ifdef __cplusplus
+extern "C" {
+#endif
+
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
@@ -70,14 +74,22 @@ typedef struct {
grpc_call_stack *call_stack;
const void *server_transport_data;
grpc_call_context_element *context;
+ grpc_mdstr *path;
+ gpr_timespec start_time;
+ gpr_timespec deadline;
} grpc_call_element_args;
typedef struct {
grpc_transport_stream_stats transport_stream_stats;
gpr_timespec latency; /* From call creation to enqueuing of received status */
- grpc_status_code final_status;
} grpc_call_stats;
+/** Information about the call upon completion. */
+typedef struct {
+ grpc_call_stats stats;
+ grpc_status_code final_status;
+} grpc_call_final_info;
+
/* Channel filters specify:
1. the amount of memory needed in the channel & call (via the sizeof_XXX
members)
@@ -110,8 +122,9 @@ typedef struct {
on a client; if it is non-NULL, then it points to memory owned by the
transport and is on the server. Most filters want to ignore this
argument. */
- void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args);
+ grpc_error *(*init_call_elem)(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args);
void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent);
@@ -119,16 +132,17 @@ typedef struct {
The filter does not need to do any chaining.
The bottom filter of a stack will be passed a non-NULL pointer to
\a and_free_memory that should be passed to gpr_free when destruction
- is complete. */
+ is complete. \a final_info contains data about the completed call, mainly
+ for reporting purposes. */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory);
/* sizeof(per channel data) */
size_t sizeof_channel_data;
/* Initialize per-channel data.
- elem is initialized at the start of the call, and elem->channel_data is
- what needs initializing.
+ elem is initialized at the creation of the channel, and elem->channel_data
+ is what needs initializing.
is_first, is_last designate this element's position in the stack, and are
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */
@@ -209,12 +223,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
-void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack, int initial_refs,
- grpc_iomgr_cb_func destroy, void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack);
+grpc_error *grpc_call_stack_init(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context, const void *transport_server_data,
+ grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
@@ -243,7 +256,7 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
- const grpc_call_stats *call_stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory);
/* Ignore set pollset{_set} - used by filters if they don't care about pollsets
@@ -278,9 +291,18 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_status_code status,
gpr_slice *optional_message);
+void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *cur_elem,
+ grpc_status_code status,
+ gpr_slice *optional_message);
+
extern int grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \
if (grpc_trace_channel) grpc_call_log_op(sev, elem, op)
+#ifdef __cplusplus
+}
+#endif
+
#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H */
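Under the new contract a filter reports construction failure by returning a grpc_error* (grpc_call_stack_init keeps the first such error and unrefs the rest) and receives the completed call's status and stats at destruction time. A hypothetical no-op filter sketch against the new signatures; the names are illustrative and <grpc/support/alloc.h> is assumed to be included:

static grpc_error *noop_init_call_elem(grpc_exec_ctx *exec_ctx,
                                       grpc_call_element *elem,
                                       grpc_call_element_args *args) {
  /* Fail construction instead of asserting; the error propagates back to
     the caller of grpc_call_stack_init. */
  if (args->path == NULL) {
    return GRPC_ERROR_CREATE("no :path available for call");
  }
  return GRPC_ERROR_NONE;
}

static void noop_destroy_call_elem(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element *elem,
                                   const grpc_call_final_info *final_info,
                                   void *and_free_memory) {
  /* final_info->final_status and final_info->stats are available here for
     reporting. Only the bottom filter receives a non-NULL and_free_memory,
     which it must hand to gpr_free. */
  if (and_free_memory != NULL) gpr_free(and_free_memory);
}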
diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h
index 0e6bfd9aa6..4a00f7bfdb 100644
--- a/src/core/lib/channel/channel_stack_builder.h
+++ b/src/core/lib/channel/channel_stack_builder.h
@@ -39,6 +39,10 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/// grpc_channel_stack_builder offers a programmatic interface to select
/// and order channel filters
typedef struct grpc_channel_stack_builder grpc_channel_stack_builder;
@@ -158,4 +162,8 @@ void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder);
extern int grpc_trace_channel_stack_builder;
+#ifdef __cplusplus
+}
+#endif
+
#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H */
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 32ebe53ee6..0981d59f63 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -60,7 +60,7 @@ typedef struct call_data {
/** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
- grpc_transport_stream_op send_op;
+ grpc_transport_stream_op *send_op;
uint32_t send_length;
uint32_t send_flags;
gpr_slice incoming_slice;
@@ -199,11 +199,11 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
- calld->send_op.send_message = &calld->replacement_stream.base;
- calld->post_send = calld->send_op.on_complete;
- calld->send_op.on_complete = &calld->send_done;
+ calld->send_op->send_message = &calld->replacement_stream.base;
+ calld->post_send = calld->send_op->on_complete;
+ calld->send_op->on_complete = &calld->send_done;
- grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+ grpc_call_next_op(exec_ctx, elem, calld->send_op);
}
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
@@ -220,7 +220,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
- while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
+ while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message,
&calld->incoming_slice, ~(size_t)0,
&calld->got_slice)) {
gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
@@ -243,7 +243,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
if (op->send_message != NULL && !skip_compression(elem) &&
0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
- calld->send_op = *op;
+ calld->send_op = op;
calld->send_length = op->send_message->length;
calld->send_flags = op->send_message->flags;
continue_send_message(exec_ctx, elem);
@@ -256,8 +256,9 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -266,11 +267,14 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
calld->has_compression_algorithm = 0;
grpc_closure_init(&calld->got_slice, got_slice, elem);
grpc_closure_init(&calld->send_done, send_done, elem);
+
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {
+ const grpc_call_final_info *final_info,
+ void *ignored) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
gpr_slice_buffer_destroy(&calld->slices);
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 0a7d27a1dc..918379c845 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -81,16 +81,16 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- int r;
-
- r = grpc_transport_init_stream(
+ int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data);
- GPR_ASSERT(r == 0);
+ return r == 0 ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE("transport stream initialization failed");
}
static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
@@ -104,7 +104,7 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h
index c50e84279d..071c5f695c 100644
--- a/src/core/lib/channel/context.h
+++ b/src/core/lib/channel/context.h
@@ -34,10 +34,19 @@
#ifndef GRPC_CORE_LIB_CHANNEL_CONTEXT_H
#define GRPC_CORE_LIB_CHANNEL_CONTEXT_H
-/* Call object context pointers */
+/// Call object context pointers.
+
+/// Call context is represented as an array of \a grpc_call_context_elements.
+/// This enum represents the indexes into the array, where each index
+/// contains a different type of value.
typedef enum {
+ /// Value is either a \a grpc_client_security_context or a
+ /// \a grpc_server_security_context.
GRPC_CONTEXT_SECURITY = 0,
+
+ /// Value is a \a census_context.
GRPC_CONTEXT_TRACING,
+
GRPC_CONTEXT_COUNT
} grpc_context_index;
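These enum values index a per-call array of context elements. A hedged sketch of how a filter might record a tracing context, assuming the grpc_call_context_element layout (a value pointer plus an optional destroy callback) declared elsewhere in this header:

static void set_tracing_context(grpc_call_context_element *context,
                                void *census_ctx, void (*destroy)(void *)) {
  /* context points at the per-call array of GRPC_CONTEXT_COUNT elements. */
  context[GRPC_CONTEXT_TRACING].value = census_ctx;
  context[GRPC_CONTEXT_TRACING].destroy = destroy;
}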
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
new file mode 100644
index 0000000000..d2ea5250f6
--- /dev/null
+++ b/src/core/lib/channel/deadline_filter.c
@@ -0,0 +1,334 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#include "src/core/lib/channel/deadline_filter.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/timer.h"
+
+//
+// grpc_deadline_state
+//
+
+// Timer callback.
+static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ grpc_deadline_state* deadline_state = elem->call_data;
+ gpr_mu_lock(&deadline_state->timer_mu);
+ deadline_state->timer_pending = false;
+ gpr_mu_unlock(&deadline_state->timer_mu);
+ if (error != GRPC_ERROR_CANCELLED) {
+ gpr_slice msg = gpr_slice_from_static_string("Deadline Exceeded");
+ grpc_call_element_send_cancel_with_message(
+ exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg);
+ gpr_slice_unref(msg);
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
+}
+
+// Starts the deadline timer.
+static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ gpr_timespec deadline) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ // Note: We do not start the timer if there is already a timer
+ // pending. This should be okay, because this is only called from two
+ // functions exported by this module: grpc_deadline_state_start(), which
+ // starts the initial timer, and grpc_deadline_state_reset(), which
+ // cancels any pre-existing timer before starting a new one. In
+ // particular, we want to ensure that if grpc_deadline_state_start()
+ // winds up trying to start the timer after grpc_deadline_state_reset()
+ // has already done so, we ignore the value from the former.
+ if (!deadline_state->timer_pending &&
+ gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+ // Take a reference to the call stack, to be owned by the timer.
+ GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
+ deadline_state->timer_pending = true;
+ grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
+ elem, gpr_now(GPR_CLOCK_MONOTONIC));
+ }
+}
+static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ gpr_timespec deadline) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ gpr_mu_lock(&deadline_state->timer_mu);
+ start_timer_if_needed_locked(exec_ctx, elem, deadline);
+ gpr_mu_unlock(&deadline_state->timer_mu);
+}
+
+// Cancels the deadline timer.
+static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
+ grpc_deadline_state* deadline_state) {
+ if (deadline_state->timer_pending) {
+ grpc_timer_cancel(exec_ctx, &deadline_state->timer);
+ deadline_state->timer_pending = false;
+ }
+}
+static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_deadline_state* deadline_state) {
+ gpr_mu_lock(&deadline_state->timer_mu);
+ cancel_timer_if_needed_locked(exec_ctx, deadline_state);
+ gpr_mu_unlock(&deadline_state->timer_mu);
+}
+
+// Callback run when the call is complete.
+static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_deadline_state* deadline_state = arg;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ // Invoke the next callback.
+ deadline_state->next_on_complete->cb(
+ exec_ctx, deadline_state->next_on_complete->cb_arg, error);
+}
+
+// Inject our own on_complete callback into op.
+static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
+ grpc_transport_stream_op* op) {
+ deadline_state->next_on_complete = op->on_complete;
+ grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
+ op->on_complete = &deadline_state->on_complete;
+}
+
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_stack* call_stack) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ memset(deadline_state, 0, sizeof(*deadline_state));
+ deadline_state->call_stack = call_stack;
+ gpr_mu_init(&deadline_state->timer_mu);
+}
+
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ gpr_mu_destroy(&deadline_state->timer_mu);
+}
+
+// Callback and associated state for starting the timer after call stack
+// initialization has been completed.
+struct start_timer_after_init_state {
+ grpc_call_element* elem;
+ gpr_timespec deadline;
+ grpc_closure closure;
+};
+static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ struct start_timer_after_init_state* state = arg;
+ start_timer_if_needed(exec_ctx, state->elem, state->deadline);
+ gpr_free(state);
+}
+
+void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec deadline) {
+ // Deadline will always be infinite on servers, so the timer will only be
+ // set on clients with a finite deadline.
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+ // When the deadline passes, we indicate the failure by sending down
+ // an op with cancel_error set. However, we can't send down any ops
+ // until after the call stack is fully initialized. If we start the
+ // timer here, we have no guarantee that the timer won't pop before
+ // call stack initialization is finished. To avoid that problem, we
+ // create a closure to start the timer, and we schedule that closure
+ // to be run after call stack initialization is done.
+ struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
+ state->elem = elem;
+ state->deadline = deadline;
+ grpc_closure_init(&state->closure, start_timer_after_init, state);
+ grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL);
+ }
+}
+
+void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec new_deadline) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ gpr_mu_lock(&deadline_state->timer_mu);
+ cancel_timer_if_needed_locked(exec_ctx, deadline_state);
+ start_timer_if_needed_locked(exec_ctx, elem, new_deadline);
+ gpr_mu_unlock(&deadline_state->timer_mu);
+}
+
+void grpc_deadline_state_client_start_transport_stream_op(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ if (op->cancel_error != GRPC_ERROR_NONE ||
+ op->close_error != GRPC_ERROR_NONE) {
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ } else {
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ if (op->recv_trailing_metadata != NULL) {
+ inject_on_complete_cb(deadline_state, op);
+ }
+ }
+}
+
+//
+// filter code
+//
+
+// Constructor for channel_data. Used for both client and server filters.
+static void init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ GPR_ASSERT(!args->is_last);
+}
+
+// Destructor for channel_data. Used for both client and server filters.
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {}
+
+// Call data used for both client and server filter.
+typedef struct base_call_data {
+ grpc_deadline_state deadline_state;
+} base_call_data;
+
+// Additional call data used only for the server filter.
+typedef struct server_call_data {
+ base_call_data base; // Must be first.
+ // The closure for receiving initial metadata.
+ grpc_closure recv_initial_metadata_ready;
+ // Received initial metadata batch.
+ grpc_metadata_batch* recv_initial_metadata;
+ // The original recv_initial_metadata_ready closure, which we chain to
+ // after our own closure is invoked.
+ grpc_closure* next_recv_initial_metadata_ready;
+} server_call_data;
+
+// Constructor for call_data. Used for both client and server filters.
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_call_element_args* args) {
+ // Note: size of call data is different between client and server.
+ memset(elem->call_data, 0, elem->filter->sizeof_call_data);
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
+ grpc_deadline_state_start(exec_ctx, elem, args->deadline);
+ return GRPC_ERROR_NONE;
+}
+
+// Destructor for call_data. Used for both client and server filters.
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ void* and_free_memory) {
+ grpc_deadline_state_destroy(exec_ctx, elem);
+}
+
+// Method for starting a call op for client filter.
+static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
+ // Chain to next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+// Callback for receiving initial metadata on the server.
+static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ server_call_data* calld = elem->call_data;
+ // Get deadline from metadata and start the timer if needed.
+ start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline);
+ // Invoke the next callback.
+ calld->next_recv_initial_metadata_ready->cb(
+ exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error);
+}
+
+// Method for starting a call op for server filter.
+static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ server_call_data* calld = elem->call_data;
+ if (op->cancel_error != GRPC_ERROR_NONE ||
+ op->close_error != GRPC_ERROR_NONE) {
+ cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
+ } else {
+ // If we're receiving initial metadata, we need to get the deadline
+ // from the recv_initial_metadata_ready callback. So we inject our
+ // own callback into that hook.
+ if (op->recv_initial_metadata_ready != NULL) {
+ calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ grpc_closure_init(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, elem);
+ op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
+ }
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ // Note that we trigger this on recv_trailing_metadata, even though
+ // the client never sends trailing metadata, because this is the
+ // hook that tells us when the call is complete on the server side.
+ if (op->recv_trailing_metadata != NULL) {
+ inject_on_complete_cb(&calld->base.deadline_state, op);
+ }
+ }
+ // Chain to next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+const grpc_channel_filter grpc_client_deadline_filter = {
+ client_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(base_call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ 0, // sizeof(channel_data)
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "deadline",
+};
+
+const grpc_channel_filter grpc_server_deadline_filter = {
+ server_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(server_call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ 0, // sizeof(channel_data)
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "deadline",
+};
diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h
new file mode 100644
index 0000000000..716a852565
--- /dev/null
+++ b/src/core/lib/channel/deadline_filter.h
@@ -0,0 +1,98 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
+#define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/timer.h"
+
+// State used for filters that enforce call deadlines.
+// Must be the first field in the filter's call_data.
+typedef struct grpc_deadline_state {
+ // We take a reference to the call stack for the timer callback.
+ grpc_call_stack* call_stack;
+ // Guards access to timer_pending and timer.
+ gpr_mu timer_mu;
+ // True if the timer callback is currently pending.
+ bool timer_pending;
+ // The deadline timer.
+ grpc_timer timer;
+ // Closure to invoke when the call is complete.
+ // We use this to cancel the timer.
+ grpc_closure on_complete;
+ // The original on_complete closure, which we chain to after our own
+ // closure is invoked.
+ grpc_closure* next_on_complete;
+} grpc_deadline_state;
+
+//
+// NOTE: All of these functions require that the first field in
+// elem->call_data is a grpc_deadline_state.
+//
+
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_stack* call_stack);
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem);
+
+// Starts the timer with the specified deadline.
+// Should be called from the filter's init_call_elem() method.
+void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec deadline);
+
+// Cancels the existing timer and starts a new one with new_deadline.
+//
+// Note: It is generally safe to call this with an earlier deadline
+// value than the current one, but not the reverse. No checks are done
+// to ensure that the timer callback is not invoked while it is in the
+// process of being reset, which means that attempting to increase the
+// deadline may result in the timer being called twice.
+void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec new_deadline);
+
+// To be called from the client-side filter's start_transport_stream_op()
+// method. Ensures that the deadline timer is cancelled when the call
+// is completed.
+//
+// Note: It is the caller's responsibility to chain to the next filter if
+// necessary after this function returns.
+void grpc_deadline_state_client_start_transport_stream_op(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op* op);
+
+// Deadline filters for direct client channels and server channels.
+// Note: Deadlines for non-direct client channels are handled by the
+// client_channel filter.
+extern const grpc_channel_filter grpc_client_deadline_filter;
+extern const grpc_channel_filter grpc_server_deadline_filter;
+
+#endif /* GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H */
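As the header requires, a filter reuses this machinery by making grpc_deadline_state the first field of its call_data and delegating to the helpers above. A minimal client-side sketch with hypothetical names:

typedef struct {
  grpc_deadline_state deadline_state; /* must be the first field */
  /* filter-specific fields would follow */
} my_call_data;

static grpc_error *my_init_call_elem(grpc_exec_ctx *exec_ctx,
                                     grpc_call_element *elem,
                                     grpc_call_element_args *args) {
  grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
  grpc_deadline_state_start(exec_ctx, elem, args->deadline);
  return GRPC_ERROR_NONE;
}

static void my_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
                                         grpc_call_element *elem,
                                         grpc_transport_stream_op *op) {
  /* Cancels the timer on cancel/close ops and hooks on_complete when
     trailing metadata is requested; chaining is still our responsibility. */
  grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
  grpc_call_next_op(exec_ctx, elem, op);
}

grpc_deadline_state_destroy() would then be called from the filter's destroy_call_elem().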
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
new file mode 100644
index 0000000000..0d759887bc
--- /dev/null
+++ b/src/core/lib/channel/handshaker.c
@@ -0,0 +1,203 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
+
+//
+// grpc_handshaker
+//
+
+void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable,
+ grpc_handshaker* handshaker) {
+ handshaker->vtable = vtable;
+}
+
+void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {
+ handshaker->vtable->destroy(exec_ctx, handshaker);
+}
+
+void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {
+ handshaker->vtable->shutdown(exec_ctx, handshaker);
+}
+
+void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker,
+ grpc_endpoint* endpoint,
+ grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer,
+ gpr_timespec deadline,
+ grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data) {
+ handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args,
+ read_buffer, deadline, acceptor, cb,
+ user_data);
+}
+
+//
+// grpc_handshake_manager
+//
+
+// State used while chaining handshakers.
+struct grpc_handshaker_state {
+ // The index of the handshaker to invoke next.
+ size_t index;
+ // The deadline for all handshakers.
+ gpr_timespec deadline;
+ // The acceptor to call the handshakers with.
+ grpc_tcp_server_acceptor* acceptor;
+ // The final callback and user_data to invoke after the last handshaker.
+ grpc_handshaker_done_cb final_cb;
+ void* final_user_data;
+};
+
+struct grpc_handshake_manager {
+ // An array of handshakers added via grpc_handshake_manager_add().
+ size_t count;
+ grpc_handshaker** handshakers;
+ // State used while chaining handshakers.
+ struct grpc_handshaker_state* state;
+};
+
+grpc_handshake_manager* grpc_handshake_manager_create() {
+ grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager));
+ memset(mgr, 0, sizeof(*mgr));
+ return mgr;
+}
+
+static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; }
+
+void grpc_handshake_manager_add(grpc_handshake_manager* mgr,
+ grpc_handshaker* handshaker) {
+ // To avoid allocating memory for each handshaker we add, we double
+ // the number of elements every time we need more.
+ size_t realloc_count = 0;
+ if (mgr->count == 0) {
+ realloc_count = 2;
+ } else if (mgr->count >= 2 && is_power_of_2(mgr->count)) {
+ realloc_count = mgr->count * 2;
+ }
+ if (realloc_count > 0) {
+ mgr->handshakers =
+ gpr_realloc(mgr->handshakers, realloc_count * sizeof(grpc_handshaker*));
+ }
+ mgr->handshakers[mgr->count++] = handshaker;
+}
+
+void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr) {
+ for (size_t i = 0; i < mgr->count; ++i) {
+ grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]);
+ }
+ gpr_free(mgr->handshakers);
+ gpr_free(mgr);
+}
+
+void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr) {
+ for (size_t i = 0; i < mgr->count; ++i) {
+ grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]);
+ }
+ if (mgr->state != NULL) {
+ gpr_free(mgr->state);
+ mgr->state = NULL;
+ }
+}
+
+// A function used as the handshaker-done callback when chaining
+// handshakers together.
+static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* endpoint,
+ grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer, void* user_data,
+ grpc_error* error) {
+ grpc_handshake_manager* mgr = user_data;
+ GPR_ASSERT(mgr->state != NULL);
+ GPR_ASSERT(mgr->state->index < mgr->count);
+ // If we got an error, skip all remaining handshakers and invoke the
+ // caller-supplied callback immediately.
+ if (error != GRPC_ERROR_NONE) {
+ mgr->state->final_cb(exec_ctx, endpoint, args, read_buffer,
+ mgr->state->final_user_data, error);
+ return;
+ }
+ grpc_handshaker_done_cb cb = call_next_handshaker;
+ // If this is the last handshaker, use the caller-supplied callback
+ // and user_data instead of chaining back to this function again.
+ if (mgr->state->index == mgr->count - 1) {
+ cb = mgr->state->final_cb;
+ user_data = mgr->state->final_user_data;
+ }
+ // Invoke handshaker.
+ grpc_handshaker_do_handshake(
+ exec_ctx, mgr->handshakers[mgr->state->index], endpoint, args,
+ read_buffer, mgr->state->deadline, mgr->state->acceptor, cb, user_data);
+ ++mgr->state->index;
+ // If this is the last handshaker, clean up state.
+ if (mgr->state->index == mgr->count) {
+ gpr_free(mgr->state);
+ mgr->state = NULL;
+ }
+}
+
+void grpc_handshake_manager_do_handshake(
+ grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
+ grpc_endpoint* endpoint, const grpc_channel_args* args,
+ gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data) {
+ grpc_channel_args* args_copy = grpc_channel_args_copy(args);
+ gpr_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer));
+ gpr_slice_buffer_init(read_buffer);
+ if (mgr->count == 0) {
+ // No handshakers registered, so we just immediately call the done
+ // callback with the passed-in endpoint.
+ cb(exec_ctx, endpoint, args_copy, read_buffer, user_data, GRPC_ERROR_NONE);
+ } else {
+ GPR_ASSERT(mgr->state == NULL);
+ mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state));
+ memset(mgr->state, 0, sizeof(*mgr->state));
+ mgr->state->deadline = deadline;
+ mgr->state->acceptor = acceptor;
+ mgr->state->final_cb = cb;
+ mgr->state->final_user_data = user_data;
+ call_next_handshaker(exec_ctx, endpoint, args_copy, read_buffer, mgr,
+ GRPC_ERROR_NONE);
+ }
+}
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
new file mode 100644
index 0000000000..d574b46242
--- /dev/null
+++ b/src/core/lib/channel/handshaker.h
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H
+#define GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+
+/// Handshakers are used to perform initial handshakes on a connection
+/// before the client sends the initial request. Some examples of what
+/// a handshaker can be used for includes support for HTTP CONNECT on
+/// the client side and various types of security initialization.
+///
+/// In general, handshakers should be used via a handshake manager.
+
+///
+/// grpc_handshaker
+///
+
+typedef struct grpc_handshaker grpc_handshaker;
+
+/// Callback type invoked when a handshaker is done.
+/// Takes ownership of \a args and \a read_buffer.
+typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* endpoint,
+ grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer,
+ void* user_data, grpc_error* error);
+
+struct grpc_handshaker_vtable {
+ /// Destroys the handshaker.
+ void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
+
+ /// Shuts down the handshaker (e.g., to clean up when the operation is
+ /// aborted in the middle).
+ void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
+
+ /// Performs handshaking. When finished, calls \a cb with \a user_data.
+ /// Takes ownership of \a args.
+ /// Takes ownership of \a read_buffer, which contains leftover bytes read
+ /// from the endpoint by the previous handshaker.
+ /// \a acceptor will be NULL for client-side handshakers.
+ void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
+ grpc_endpoint* endpoint, grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer, gpr_timespec deadline,
+ grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data);
+};
+
+/// Base struct. To subclass, make this the first member of the
+/// implementation struct.
+struct grpc_handshaker {
+ const struct grpc_handshaker_vtable* vtable;
+};
+
+/// Called by concrete implementations to initialize the base struct.
+void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable,
+ grpc_handshaker* handshaker);
+
+/// Convenient wrappers for invoking methods via the vtable.
+/// These probably do not need to be called from anywhere but
+/// grpc_handshake_manager.
+void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker);
+void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker);
+void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker,
+ grpc_endpoint* endpoint,
+ grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer,
+ gpr_timespec deadline,
+ grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data);
+
+///
+/// grpc_handshake_manager
+///
+
+typedef struct grpc_handshake_manager grpc_handshake_manager;
+
+/// Creates a new handshake manager. Caller takes ownership.
+grpc_handshake_manager* grpc_handshake_manager_create();
+
+/// Adds a handshaker to the handshake manager.
+/// Takes ownership of \a handshaker.
+void grpc_handshake_manager_add(grpc_handshake_manager* mgr,
+ grpc_handshaker* handshaker);
+
+/// Destroys the handshake manager.
+void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr);
+
+/// Shuts down the handshake manager (e.g., to clean up when the operation is
+/// aborted in the middle).
+/// The caller must still call grpc_handshake_manager_destroy() after
+/// calling this function.
+void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr);
+
+/// Invokes handshakers in the order they were added.
+/// Does NOT take ownership of \a args. Instead, makes a copy before
+/// invoking the first handshaker.
+/// \a acceptor will be NULL for client-side handshakers.
+/// Invokes \a cb with \a user_data after either a handshaker fails or
+/// all handshakers have completed successfully.
+void grpc_handshake_manager_do_handshake(
+ grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
+ grpc_endpoint* endpoint, const grpc_channel_args* args,
+ gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data);
+
+#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */
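A concrete handshaker embeds grpc_handshaker as its first member, supplies a vtable, and reports completion through the done callback. A hypothetical pass-through handshaker, sketched under the assumption that <grpc/support/alloc.h> is included:

typedef struct {
  grpc_handshaker base; /* must be first so the base pointer doubles as self */
} noop_handshaker;

static void noop_destroy(grpc_exec_ctx *exec_ctx, grpc_handshaker *h) {
  gpr_free(h);
}

static void noop_shutdown(grpc_exec_ctx *exec_ctx, grpc_handshaker *h) {}

static void noop_do_handshake(grpc_exec_ctx *exec_ctx, grpc_handshaker *h,
                              grpc_endpoint *endpoint, grpc_channel_args *args,
                              gpr_slice_buffer *read_buffer,
                              gpr_timespec deadline,
                              grpc_tcp_server_acceptor *acceptor,
                              grpc_handshaker_done_cb cb, void *user_data) {
  /* Nothing to negotiate: hand ownership of args and read_buffer straight
     to the next stage. */
  cb(exec_ctx, endpoint, args, read_buffer, user_data, GRPC_ERROR_NONE);
}

static const struct grpc_handshaker_vtable noop_vtable = {
    noop_destroy, noop_shutdown, noop_do_handshake};

grpc_handshaker *noop_handshaker_create(void) {
  noop_handshaker *h = gpr_malloc(sizeof(*h));
  grpc_handshaker_init(&noop_vtable, &h->base);
  return &h->base;
}

Such a handshaker would be registered with grpc_handshake_manager_add() before grpc_handshake_manager_do_handshake() runs the chain.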
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 8057e251f0..1dc05fb20d 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -43,6 +43,9 @@
#define EXPECTED_CONTENT_TYPE "application/grpc"
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
+/* default maximum size of payload eligible for GET request */
+static const size_t kMaxPayloadSizeForGet = 2048;
+
typedef struct call_data {
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
@@ -50,20 +53,39 @@ typedef struct call_data {
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
grpc_linked_mdelem user_agent;
+ grpc_linked_mdelem payload_bin;
grpc_metadata_batch *recv_initial_metadata;
+ uint8_t *payload_bytes;
+
+ /* Vars to read data off of send_message */
+ grpc_transport_stream_op send_op;
+ uint32_t send_length;
+ uint32_t send_flags;
+ gpr_slice incoming_slice;
+ grpc_slice_buffer_stream replacement_stream;
+ gpr_slice_buffer slices;
+ /* flag that indicates that not all slices of send_message are available yet */
+ bool send_message_blocked;
/** Closure to call when finished with the hc_on_recv hook */
grpc_closure *on_done_recv;
+ grpc_closure *on_complete;
+ grpc_closure *post_send;
+
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
grpc_closure hc_on_recv;
+ grpc_closure hc_on_complete;
+ grpc_closure got_slice;
+ grpc_closure send_done;
} call_data;
typedef struct channel_data {
grpc_mdelem *static_scheme;
grpc_mdelem *user_agent;
+ size_t max_payload_size_for_get;
} channel_data;
typedef struct {
@@ -81,8 +103,8 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
grpc_mdstr_as_c_string(md->value));
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
- grpc_call_element_send_cancel_with_message(a->exec_ctx, a->elem,
- GRPC_STATUS_CANCELLED, &message);
+ grpc_call_element_send_close_with_message(a->exec_ctx, a->elem,
+ GRPC_STATUS_CANCELLED, &message);
return NULL;
} else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
return NULL;
@@ -119,6 +141,24 @@ static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error);
}
+static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *error) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (calld->payload_bytes) {
+ gpr_free(calld->payload_bytes);
+ calld->payload_bytes = NULL;
+ }
+ calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error);
+}
+
+static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
+ gpr_slice_buffer_reset_and_unref(&calld->slices);
+ calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+}
+
static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
/* eat the things we'd like to set ourselves */
if (md->key == GRPC_MDSTR_METHOD) return NULL;
@@ -129,22 +169,104 @@ static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void hc_mutate_op(grpc_call_element *elem,
+static void continue_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ uint8_t *wrptr = calld->payload_bytes;
+ while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
+ &calld->incoming_slice, ~(size_t)0,
+ &calld->got_slice)) {
+ memcpy(wrptr, GPR_SLICE_START_PTR(calld->incoming_slice),
+ GPR_SLICE_LENGTH(calld->incoming_slice));
+ wrptr += GPR_SLICE_LENGTH(calld->incoming_slice);
+ gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ calld->send_message_blocked = false;
+ break;
+ }
+ }
+}
+
+static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
+ calld->send_message_blocked = false;
+ gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ /* All slices have arrived: pass down the saved op, with send_message
+ replaced by a stream over the collected slices. */
+ grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
+ calld->send_flags);
+ calld->send_op.send_message = &calld->replacement_stream.base;
+ calld->post_send = calld->send_op.on_complete;
+ calld->send_op.on_complete = &calld->send_done;
+ grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+ } else {
+ continue_send_message(exec_ctx, elem);
+ }
+}
+
+static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
+
if (op->send_initial_metadata != NULL) {
+ /* Decide which HTTP verb to use: GET is used only when the request is
+ marked cacheable, the op carries both initial metadata and a send
+ message, the payload is below the size threshold, and all of the
+ message data is immediately available; otherwise we fall back to PUT
+ (for idempotent requests) or POST. */
+ grpc_mdelem *method = GRPC_MDELEM_METHOD_POST;
+ calld->send_message_blocked = false;
+ if ((op->send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
+ op->send_message != NULL &&
+ op->send_message->length < channeld->max_payload_size_for_get) {
+ method = GRPC_MDELEM_METHOD_GET;
+ calld->send_message_blocked = true;
+ } else if (op->send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
+ method = GRPC_MDELEM_METHOD_PUT;
+ }
+
+ /* Attempt to read the data from send_message and create a header field. */
+ if (method == GRPC_MDELEM_METHOD_GET) {
+ /* allocate memory to hold the entire payload */
+ calld->payload_bytes = gpr_malloc(op->send_message->length);
+
+ /* read slices of send_message and copy into payload_bytes */
+ calld->send_op = *op;
+ calld->send_length = op->send_message->length;
+ calld->send_flags = op->send_message->flags;
+ continue_send_message(exec_ctx, elem);
+
+ if (calld->send_message_blocked == false) {
+ /* all the send_message data is available, so create an mdelem and
+ append it to the headers */
+ grpc_mdelem *payload_bin = grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_GRPC_PAYLOAD_BIN,
+ grpc_mdstr_from_buffer(calld->payload_bytes,
+ op->send_message->length));
+ grpc_metadata_batch_add_tail(op->send_initial_metadata,
+ &calld->payload_bin, payload_bin);
+ calld->on_complete = op->on_complete;
+ op->on_complete = &calld->hc_on_complete;
+ op->send_message = NULL;
+ } else {
+ /* Not all data is available. Fall back to POST. */
+ gpr_log(GPR_DEBUG,
+ "Request is marked Cacheable but not all data is available.\
+ Falling back to POST");
+ method = GRPC_MDELEM_METHOD_POST;
+ }
+ }
+
grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter,
elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
- grpc_metadata_batch_add_head(
- op->send_initial_metadata, &calld->method,
- op->send_initial_metadata_flags &
- GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
- ? GRPC_MDELEM_METHOD_PUT
- : GRPC_MDELEM_METHOD_POST);
+ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
+ method);
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
channeld->static_scheme);
grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
@@ -169,22 +291,41 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
GPR_TIMER_BEGIN("hc_start_transport_op", 0);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- hc_mutate_op(elem, op);
+ hc_mutate_op(exec_ctx, elem, op);
GPR_TIMER_END("hc_start_transport_op", 0);
- grpc_call_next_op(exec_ctx, elem, op);
+ call_data *calld = elem->call_data;
+ if (op->send_message != NULL && calld->send_message_blocked) {
+ /* Don't forward the op: send_message contains slices that aren't ready
+ yet. The op will be forwarded from got_slice() once all slices have
+ been read. */
+ } else {
+ grpc_call_next_op(exec_ctx, elem, op);
+ }
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
calld->on_done_recv = NULL;
+ calld->on_complete = NULL;
+ calld->payload_bytes = NULL;
+ gpr_slice_buffer_init(&calld->slices);
grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
+ grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem);
+ grpc_closure_init(&calld->got_slice, got_slice, elem);
+ grpc_closure_init(&calld->send_done, send_done, elem);
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {}
+ const grpc_call_final_info *final_info,
+ void *ignored) {
+ call_data *calld = elem->call_data;
+ gpr_slice_buffer_destroy(&calld->slices);
+}
static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
unsigned i;
@@ -207,6 +348,22 @@ static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
return GRPC_MDELEM_SCHEME_HTTP;
}
+static size_t max_payload_size_from_args(const grpc_channel_args *args) {
+ if (args != NULL) {
+ for (size_t i = 0; i < args->num_args; ++i) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET)) {
+ if (args->args[i].type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s: must be an integer",
+ GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET);
+ } else {
+ return (size_t)args->args[i].value.integer;
+ }
+ }
+ }
+ }
+ return kMaxPayloadSizeForGet;
+}
+
static grpc_mdstr *user_agent_from_args(const grpc_channel_args *args,
const char *transport_name) {
gpr_strvec v;
@@ -230,8 +387,9 @@ static grpc_mdstr *user_agent_from_args(const grpc_channel_args *args,
}
}
- gpr_asprintf(&tmp, "%sgrpc-c/%s (%s; %s)", is_first ? "" : " ",
- grpc_version_string(), GPR_PLATFORM_STRING, transport_name);
+ gpr_asprintf(&tmp, "%sgrpc-c/%s (%s; %s; %s)", is_first ? "" : " ",
+ grpc_version_string(), GPR_PLATFORM_STRING, transport_name,
+ grpc_g_stands_for());
is_first = 0;
gpr_strvec_add(&v, tmp);
@@ -264,6 +422,8 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!args->is_last);
GPR_ASSERT(args->optional_transport != NULL);
chand->static_scheme = scheme_from_args(args->channel_args);
+ chand->max_payload_size_for_get =
+ max_payload_size_from_args(args->channel_args);
chand->user_agent = grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_USER_AGENT,
user_agent_from_args(args->channel_args,
diff --git a/src/core/lib/channel/http_client_filter.h b/src/core/lib/channel/http_client_filter.h
index 47081175ea..9e6e106e9c 100644
--- a/src/core/lib/channel/http_client_filter.h
+++ b/src/core/lib/channel/http_client_filter.h
@@ -41,4 +41,7 @@ extern const grpc_channel_filter grpc_http_client_filter;
/* Channel arg to override the http2 :scheme header */
#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"
+/* Channel arg to set the maximum payload size eligible for a GET request */
+#define GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET "grpc.max_payload_size_for_get"
+
#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H */
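The new channel arg lets a client raise or lower the 2048-byte default from kMaxPayloadSizeForGet. A minimal sketch of overriding it at channel-creation time; the target address and the use of grpc_insecure_channel_create are illustrative only:

    grpc_arg arg;
    arg.type = GRPC_ARG_INTEGER;
    arg.key = GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET;
    arg.value.integer = 4096; /* allow GET for payloads up to 4 KiB */
    grpc_channel_args args = {1, &arg};
    grpc_channel *channel =
        grpc_insecure_channel_create("localhost:50051", &args, NULL);

As max_payload_size_from_args shows above, a non-integer value is rejected with an error log and the built-in default is used instead.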
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index d0beebd817..f2221fb0fb 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -42,6 +42,8 @@
#define EXPECTED_CONTENT_TYPE "application/grpc"
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
+extern int grpc_http_trace;
+
typedef struct call_data {
uint8_t seen_path;
uint8_t seen_method;
@@ -49,17 +51,32 @@ typedef struct call_data {
uint8_t seen_scheme;
uint8_t seen_te_trailers;
uint8_t seen_authority;
+ uint8_t seen_payload_bin;
grpc_linked_mdelem status;
grpc_linked_mdelem content_type;
+ /* flag to ensure payload_bin is delivered only once */
+ uint8_t payload_bin_delivered;
+
grpc_metadata_batch *recv_initial_metadata;
bool *recv_idempotent_request;
+ bool *recv_cacheable_request;
/** Closure to call when finished with the hs_on_recv hook */
grpc_closure *on_done_recv;
+ /** Closure to call when we retrieve the message from the payload-bin header
+ */
+ grpc_closure *recv_message_ready;
+ grpc_closure *on_complete;
+ grpc_byte_stream **pp_recv_message;
+ gpr_slice_buffer read_slice_buffer;
+ grpc_slice_buffer_stream read_stream;
+
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
grpc_closure hs_on_recv;
+ grpc_closure hs_on_complete;
+ grpc_closure hs_recv_message_ready;
} call_data;
typedef struct channel_data { uint8_t unused; } channel_data;
@@ -76,16 +93,20 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
/* Check if it is one of the headers we care about. */
if (md == GRPC_MDELEM_TE_TRAILERS || md == GRPC_MDELEM_METHOD_POST ||
- md == GRPC_MDELEM_METHOD_PUT || md == GRPC_MDELEM_SCHEME_HTTP ||
- md == GRPC_MDELEM_SCHEME_HTTPS ||
+ md == GRPC_MDELEM_METHOD_PUT || md == GRPC_MDELEM_METHOD_GET ||
+ md == GRPC_MDELEM_SCHEME_HTTP || md == GRPC_MDELEM_SCHEME_HTTPS ||
md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
/* swallow it */
if (md == GRPC_MDELEM_METHOD_POST) {
calld->seen_method = 1;
*calld->recv_idempotent_request = false;
+ *calld->recv_cacheable_request = false;
} else if (md == GRPC_MDELEM_METHOD_PUT) {
calld->seen_method = 1;
*calld->recv_idempotent_request = true;
+ } else if (md == GRPC_MDELEM_METHOD_GET) {
+ calld->seen_method = 1;
+ *calld->recv_cacheable_request = true;
} else if (md->key == GRPC_MDSTR_SCHEME) {
calld->seen_scheme = 1;
} else if (md == GRPC_MDELEM_TE_TRAILERS) {
@@ -137,6 +158,16 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value));
calld->seen_authority = 1;
return authority;
+ } else if (md->key == GRPC_MDSTR_GRPC_PAYLOAD_BIN) {
+ /* Retrieve the payload from the value of the 'grpc-internal-payload-bin'
+ header field */
+ calld->seen_payload_bin = 1;
+ gpr_slice_buffer_init(&calld->read_slice_buffer);
+ gpr_slice_buffer_add(&calld->read_slice_buffer,
+ gpr_slice_ref(md->value->slice));
+ grpc_slice_buffer_stream_init(&calld->read_stream,
+ &calld->read_slice_buffer, 0);
+ return NULL;
} else {
return md;
}
@@ -180,6 +211,11 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
err, GRPC_ERROR_CREATE("Missing te: trailers header"));
}
/* Error this call out */
+ if (grpc_http_trace) {
+ const char *error_str = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "Invalid http2 headers: %s", error_str);
+ grpc_error_free_string(error_str);
+ }
grpc_call_element_send_cancel(exec_ctx, elem);
}
} else {
@@ -189,6 +225,36 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
GRPC_ERROR_UNREF(err);
}
+static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *err) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ /* Call recv_message_ready if we got the payload via the header field */
+ if (calld->seen_payload_bin && calld->recv_message_ready != NULL) {
+ *calld->pp_recv_message = calld->payload_bin_delivered
+ ? NULL
+ : (grpc_byte_stream *)&calld->read_stream;
+ calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg,
+ err);
+ calld->recv_message_ready = NULL;
+ calld->payload_bin_delivered = true;
+ }
+ calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, err);
+}
+
+static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *err) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (calld->seen_payload_bin) {
+ /* do nothing: this is likely a GET request, and the payload will be
+ delivered from the hs_on_complete callback. */
+ } else {
+ calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg,
+ err);
+ }
+}
+
static void hs_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
@@ -206,11 +272,25 @@ static void hs_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
GPR_ASSERT(op->recv_idempotent_request != NULL);
+ GPR_ASSERT(op->recv_cacheable_request != NULL);
calld->recv_initial_metadata = op->recv_initial_metadata;
calld->recv_idempotent_request = op->recv_idempotent_request;
+ calld->recv_cacheable_request = op->recv_cacheable_request;
calld->on_done_recv = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->hs_on_recv;
}
+
+ if (op->recv_message) {
+ calld->recv_message_ready = op->recv_message_ready;
+ calld->pp_recv_message = op->recv_message;
+ if (op->recv_message_ready) {
+ op->recv_message_ready = &calld->hs_recv_message_ready;
+ }
+ if (op->on_complete) {
+ calld->on_complete = op->on_complete;
+ op->on_complete = &calld->hs_on_complete;
+ }
+ }
}
static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -224,18 +304,23 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
+ grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem);
+ grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem);
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {}
+ const grpc_call_final_info *final_info,
+ void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
new file mode 100644
index 0000000000..7dc5ae0df1
--- /dev/null
+++ b/src/core/lib/channel/message_size_filter.c
@@ -0,0 +1,251 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#include "src/core/lib/channel/message_size_filter.h"
+
+#include <limits.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/transport/method_config.h"
+
+#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited.
+// The protobuf library will (by default) start warning at 100 megs.
+#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024)
+
+typedef struct message_size_limits {
+ int max_send_size;
+ int max_recv_size;
+} message_size_limits;
+
+static void* message_size_limits_copy(void* value) {
+ void* new_value = gpr_malloc(sizeof(message_size_limits));
+ memcpy(new_value, value, sizeof(message_size_limits));
+ return new_value;
+}
+
+static int message_size_limits_cmp(void* value1, void* value2) {
+ const message_size_limits* v1 = value1;
+ const message_size_limits* v2 = value2;
+ if (v1->max_send_size > v2->max_send_size) return 1;
+ if (v1->max_send_size < v2->max_send_size) return -1;
+ if (v1->max_recv_size > v2->max_recv_size) return 1;
+ if (v1->max_recv_size < v2->max_recv_size) return -1;
+ return 0;
+}
+
+static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = {
+ gpr_free, message_size_limits_copy, message_size_limits_cmp};
+
+static void* method_config_convert_value(
+ const grpc_method_config* method_config) {
+ message_size_limits* value = gpr_malloc(sizeof(message_size_limits));
+ const int32_t* max_request_message_bytes =
+ grpc_method_config_get_max_request_message_bytes(method_config);
+ value->max_send_size =
+ max_request_message_bytes != NULL ? *max_request_message_bytes : -1;
+ const int32_t* max_response_message_bytes =
+ grpc_method_config_get_max_response_message_bytes(method_config);
+ value->max_recv_size =
+ max_response_message_bytes != NULL ? *max_response_message_bytes : -1;
+ return value;
+}
+
+typedef struct call_data {
+ int max_send_size;
+ int max_recv_size;
+ // Receive closures are chained: we inject this closure as the
+ // recv_message_ready up-call on transport_stream_op, and remember to
+ // call our next_recv_message_ready member after handling it.
+ grpc_closure recv_message_ready;
+ // Used by recv_message_ready.
+ grpc_byte_stream** recv_message;
+ // Original recv_message_ready callback, invoked after our own.
+ grpc_closure* next_recv_message_ready;
+} call_data;
+
+typedef struct channel_data {
+ int max_send_size;
+ int max_recv_size;
+ // Maps path names to message_size_limits structs.
+ grpc_mdstr_hash_table* method_limit_table;
+} channel_data;
+
+// Callback invoked when we receive a message. Here we check the max
+// receive message size.
+static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = user_data;
+ call_data* calld = elem->call_data;
+ if (*calld->recv_message != NULL && calld->max_recv_size >= 0 &&
+ (*calld->recv_message)->length > (size_t)calld->max_recv_size) {
+ char* message_string;
+ gpr_asprintf(&message_string,
+ "Received message larger than max (%u vs. %d)",
+ (*calld->recv_message)->length, calld->max_recv_size);
+ grpc_error* new_error = grpc_error_set_int(
+ GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_INVALID_ARGUMENT);
+ if (error == GRPC_ERROR_NONE) {
+ error = new_error;
+ } else {
+ error = grpc_error_add_child(error, new_error);
+ GRPC_ERROR_UNREF(new_error);
+ }
+ gpr_free(message_string);
+ }
+ // Invoke the next callback.
+ grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
+}
+
+// Start transport stream op.
+static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ call_data* calld = elem->call_data;
+ // Check max send message size.
+ if (op->send_message != NULL && calld->max_send_size >= 0 &&
+ op->send_message->length > (size_t)calld->max_send_size) {
+ char* message_string;
+ gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
+ op->send_message->length, calld->max_send_size);
+ gpr_slice message = gpr_slice_from_copied_string(message_string);
+ gpr_free(message_string);
+ grpc_call_element_send_close_with_message(
+ exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
+ }
+ // Inject callback for receiving a message.
+ if (op->recv_message_ready != NULL) {
+ calld->next_recv_message_ready = op->recv_message_ready;
+ calld->recv_message = op->recv_message;
+ op->recv_message_ready = &calld->recv_message_ready;
+ }
+ // Chain to the next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+// Constructor for call_data.
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_call_element_args* args) {
+ channel_data* chand = elem->channel_data;
+ call_data* calld = elem->call_data;
+ calld->next_recv_message_ready = NULL;
+ grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem);
+ // Get max sizes from channel data, then merge in per-method config values.
+ // Note: Per-method config is only available on the client, so we
+ // apply the max request size to the send limit and the max response
+ // size to the receive limit.
+ calld->max_send_size = chand->max_send_size;
+ calld->max_recv_size = chand->max_recv_size;
+ if (chand->method_limit_table != NULL) {
+ message_size_limits* limits =
+ grpc_method_config_table_get(chand->method_limit_table, args->path);
+ if (limits != NULL) {
+ if (limits->max_send_size >= 0 &&
+ (limits->max_send_size < calld->max_send_size ||
+ calld->max_send_size < 0)) {
+ calld->max_send_size = limits->max_send_size;
+ }
+ if (limits->max_recv_size >= 0 &&
+ (limits->max_recv_size < calld->max_recv_size ||
+ calld->max_recv_size < 0)) {
+ calld->max_recv_size = limits->max_recv_size;
+ }
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
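To make the merge rule in init_call_elem concrete: with the channel-level defaults (unlimited send, 4 MB receive), a per-method config carrying max_request_message_bytes = 1024 lowers that method's send limit to 1024 bytes while its receive limit stays at 4 MB. A per-method value looser than the channel-level limit is ignored, since a per-method limit only applies when it is tighter or when the channel-level limit is unlimited (-1).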
+// Destructor for call_data.
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ void* ignored) {}
+
+// Constructor for channel_data.
+static void init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ GPR_ASSERT(!args->is_last);
+ channel_data* chand = elem->channel_data;
+ memset(chand, 0, sizeof(*chand));
+ chand->max_send_size = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
+ chand->max_recv_size = DEFAULT_MAX_RECV_MESSAGE_LENGTH;
+ for (size_t i = 0; i < args->channel_args->num_args; ++i) {
+ if (strcmp(args->channel_args->args[i].key,
+ GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) {
+ const grpc_integer_options options = {DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0,
+ INT_MAX};
+ chand->max_send_size =
+ grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
+ }
+ if (strcmp(args->channel_args->args[i].key,
+ GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) {
+ const grpc_integer_options options = {DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0,
+ INT_MAX};
+ chand->max_recv_size =
+ grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
+ }
+ }
+ // Get method config table from channel args.
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
+ chand->method_limit_table = grpc_method_config_table_convert(
+ (grpc_method_config_table*)channel_arg->value.pointer.p,
+ method_config_convert_value, &message_size_limits_vtable);
+ }
+}
+
+// Destructor for channel_data.
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {
+ channel_data* chand = elem->channel_data;
+ grpc_mdstr_hash_table_unref(chand->method_limit_table);
+}
+
+const grpc_channel_filter grpc_message_size_filter = {
+ start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "message_size"};
diff --git a/src/core/lib/channel/message_size_filter.h b/src/core/lib/channel/message_size_filter.h
new file mode 100644
index 0000000000..a88ff7f81a
--- /dev/null
+++ b/src/core/lib/channel/message_size_filter.h
@@ -0,0 +1,39 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H
+#define GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+extern const grpc_channel_filter grpc_message_size_filter;
+
+#endif /* GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H */