aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-03-31 09:43:03 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2016-03-31 09:43:03 -0700
commit2877d7398e9f6bf815af3f42f21cb4c3d917be2e (patch)
tree7a08a3e544cf8583d4bb2405bb53e48ebdac5a65 /src/core/lib/channel
parent2c73b22885ea07ecd338e71a0baf5cdcfc45605a (diff)
parent2b9012362ca03181eefaec22238c6100e77bb736 (diff)
Merge branch 'master' of github.com:grpc/grpc into lb_resolvers
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/channel_args.c271
-rw-r--r--src/core/lib/channel/channel_args.h94
-rw-r--r--src/core/lib/channel/channel_stack.c262
-rw-r--r--src/core/lib/channel/channel_stack.h260
-rw-r--r--src/core/lib/channel/channel_stack_builder.c258
-rw-r--r--src/core/lib/channel/channel_stack_builder.h155
-rw-r--r--src/core/lib/channel/client_channel.c526
-rw-r--r--src/core/lib/channel/client_channel.h63
-rw-r--r--src/core/lib/channel/compress_filter.c304
-rw-r--r--src/core/lib/channel/compress_filter.h65
-rw-r--r--src/core/lib/channel/connected_channel.c176
-rw-r--r--src/core/lib/channel/connected_channel.h42
-rw-r--r--src/core/lib/channel/context.h49
-rw-r--r--src/core/lib/channel/http_client_filter.c257
-rw-r--r--src/core/lib/channel/http_client_filter.h44
-rw-r--r--src/core/lib/channel/http_server_filter.c248
-rw-r--r--src/core/lib/channel/http_server_filter.h42
-rw-r--r--src/core/lib/channel/subchannel_call_holder.c259
-rw-r--r--src/core/lib/channel/subchannel_call_holder.h97
19 files changed, 3472 insertions, 0 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
new file mode 100644
index 0000000000..1a02f1f4aa
--- /dev/null
+++ b/src/core/lib/channel/channel_args.c
@@ -0,0 +1,271 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/channel_args.h"
+#include <grpc/grpc.h>
+#include "src/core/lib/support/string.h"
+
+#include <grpc/census.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include <string.h>
+
+static grpc_arg copy_arg(const grpc_arg *src) {
+ grpc_arg dst;
+ dst.type = src->type;
+ dst.key = gpr_strdup(src->key);
+ switch (dst.type) {
+ case GRPC_ARG_STRING:
+ dst.value.string = gpr_strdup(src->value.string);
+ break;
+ case GRPC_ARG_INTEGER:
+ dst.value.integer = src->value.integer;
+ break;
+ case GRPC_ARG_POINTER:
+ dst.value.pointer = src->value.pointer;
+ dst.value.pointer.p =
+ src->value.pointer.vtable->copy(src->value.pointer.p);
+ break;
+ }
+ return dst;
+}
+
+grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
+ const grpc_arg *to_add,
+ size_t num_to_add) {
+ grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args));
+ size_t i;
+ size_t src_num_args = (src == NULL) ? 0 : src->num_args;
+ if (!src && !to_add) {
+ dst->num_args = 0;
+ dst->args = NULL;
+ return dst;
+ }
+ dst->num_args = src_num_args + num_to_add;
+ dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args);
+ for (i = 0; i < src_num_args; i++) {
+ dst->args[i] = copy_arg(&src->args[i]);
+ }
+ for (i = 0; i < num_to_add; i++) {
+ dst->args[i + src_num_args] = copy_arg(&to_add[i]);
+ }
+ return dst;
+}
+
+grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) {
+ return grpc_channel_args_copy_and_add(src, NULL, 0);
+}
+
+grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+ const grpc_channel_args *b) {
+ return grpc_channel_args_copy_and_add(a, b->args, b->num_args);
+}
+
+static int cmp_arg(const grpc_arg *a, const grpc_arg *b) {
+ int c = GPR_ICMP(a->type, b->type);
+ if (c != 0) return c;
+ c = strcmp(a->key, b->key);
+ if (c != 0) return c;
+ switch (a->type) {
+ case GRPC_ARG_STRING:
+ return strcmp(a->value.string, b->value.string);
+ case GRPC_ARG_INTEGER:
+ return GPR_ICMP(a->value.integer, b->value.integer);
+ case GRPC_ARG_POINTER:
+ c = GPR_ICMP(a->value.pointer.p, b->value.pointer.p);
+ if (c != 0) {
+ c = GPR_ICMP(a->value.pointer.vtable, b->value.pointer.vtable);
+ if (c == 0) {
+ c = a->value.pointer.vtable->cmp(a->value.pointer.p,
+ b->value.pointer.p);
+ }
+ }
+ return c;
+ }
+ GPR_UNREACHABLE_CODE(return 0);
+}
+
+/* stabilizing comparison function: since channel_args ordering matters for
+ * keys with the same name, we need to preserve that ordering */
+static int cmp_key_stable(const void *ap, const void *bp) {
+ const grpc_arg *const *a = ap;
+ const grpc_arg *const *b = bp;
+ int c = strcmp((*a)->key, (*b)->key);
+ if (c == 0) c = GPR_ICMP(*a, *b);
+ return c;
+}
+
+grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a) {
+ grpc_arg **args = gpr_malloc(sizeof(grpc_arg *) * a->num_args);
+ for (size_t i = 0; i < a->num_args; i++) {
+ args[i] = &a->args[i];
+ }
+ qsort(args, a->num_args, sizeof(grpc_arg *), cmp_key_stable);
+
+ grpc_channel_args *b = gpr_malloc(sizeof(grpc_channel_args));
+ b->num_args = a->num_args;
+ b->args = gpr_malloc(sizeof(grpc_arg) * b->num_args);
+ for (size_t i = 0; i < a->num_args; i++) {
+ b->args[i] = copy_arg(args[i]);
+ }
+
+ gpr_free(args);
+ return b;
+}
+
+void grpc_channel_args_destroy(grpc_channel_args *a) {
+ size_t i;
+ for (i = 0; i < a->num_args; i++) {
+ switch (a->args[i].type) {
+ case GRPC_ARG_STRING:
+ gpr_free(a->args[i].value.string);
+ break;
+ case GRPC_ARG_INTEGER:
+ break;
+ case GRPC_ARG_POINTER:
+ a->args[i].value.pointer.vtable->destroy(a->args[i].value.pointer.p);
+ break;
+ }
+ gpr_free(a->args[i].key);
+ }
+ gpr_free(a->args);
+ gpr_free(a);
+}
+
+int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
+ size_t i;
+ if (a == NULL) return 0;
+ for (i = 0; i < a->num_args; i++) {
+ if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
+ return a->args[i].value.integer != 0 && census_enabled();
+ }
+ }
+ return census_enabled();
+}
+
+grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
+ const grpc_channel_args *a) {
+ size_t i;
+ if (a == NULL) return 0;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) {
+ return (grpc_compression_algorithm)a->args[i].value.integer;
+ break;
+ }
+ }
+ return GRPC_COMPRESS_NONE;
+}
+
+grpc_channel_args *grpc_channel_args_set_compression_algorithm(
+ grpc_channel_args *a, grpc_compression_algorithm algorithm) {
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG;
+ tmp.value.integer = algorithm;
+ return grpc_channel_args_copy_and_add(a, &tmp, 1);
+}
+
+/** Returns 1 if the argument for compression algorithm's enabled states bitset
+ * was found in \a a, returning the arg's value in \a states. Otherwise, returns
+ * 0. */
+static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
+ int **states_arg) {
+ if (a != NULL) {
+ size_t i;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
+ *states_arg = &a->args[i].value.integer;
+ return 1; /* GPR_TRUE */
+ }
+ }
+ }
+ return 0; /* GPR_FALSE */
+}
+
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+ grpc_channel_args **a, grpc_compression_algorithm algorithm, int state) {
+ int *states_arg;
+ grpc_channel_args *result = *a;
+ const int states_arg_found =
+ find_compression_algorithm_states_bitset(*a, &states_arg);
+
+ if (states_arg_found) {
+ if (state != 0) {
+ GPR_BITSET((unsigned *)states_arg, algorithm);
+ } else {
+ GPR_BITCLEAR((unsigned *)states_arg, algorithm);
+ }
+ } else {
+ /* create a new arg */
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+ /* all enabled by default */
+ tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
+ if (state != 0) {
+ GPR_BITSET((unsigned *)&tmp.value.integer, algorithm);
+ } else {
+ GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm);
+ }
+ result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
+ grpc_channel_args_destroy(*a);
+ *a = result;
+ }
+ return result;
+}
+
+int grpc_channel_args_compression_algorithm_get_states(
+ const grpc_channel_args *a) {
+ int *states_arg;
+ if (find_compression_algorithm_states_bitset(a, &states_arg)) {
+ return *states_arg;
+ } else {
+ return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
+ }
+}
+
+int grpc_channel_args_compare(const grpc_channel_args *a,
+ const grpc_channel_args *b) {
+ int c = GPR_ICMP(a->num_args, b->num_args);
+ if (c != 0) return c;
+ for (size_t i = 0; i < a->num_args; i++) {
+ c = cmp_arg(&a->args[i], &b->args[i]);
+ if (c != 0) return c;
+ }
+ return 0;
+}
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
new file mode 100644
index 0000000000..1ea202c543
--- /dev/null
+++ b/src/core/lib/channel/channel_args.h
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H
+#define GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H
+
+#include <grpc/compression.h>
+#include <grpc/grpc.h>
+
+/** Copy the arguments in \a src into a new instance */
+grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
+
+/** Copy the arguments in \a src into a new instance, stably sorting keys */
+grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *src);
+
+/** Copy the arguments in \a src and append \a to_add. If \a to_add is NULL, it
+ * is equivalent to calling \a grpc_channel_args_copy. */
+grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
+ const grpc_arg *to_add,
+ size_t num_to_add);
+
+/** Concatenate args from \a a and \a b into a new instance */
+grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+ const grpc_channel_args *b);
+
+/** Destroy arguments created by \a grpc_channel_args_copy */
+void grpc_channel_args_destroy(grpc_channel_args *a);
+
+/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
+ * is specified in channel args, otherwise returns 0. */
+int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
+
+/** Returns the compression algorithm set in \a a. */
+grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
+ const grpc_channel_args *a);
+
+/** Returns a channel arg instance with compression enabled. If \a a is
+ * non-NULL, its args are copied. N.B. GRPC_COMPRESS_NONE disables compression
+ * for the channel. */
+grpc_channel_args *grpc_channel_args_set_compression_algorithm(
+ grpc_channel_args *a, grpc_compression_algorithm algorithm);
+
+/** Sets the support for the given compression algorithm. By default, all
+ * compression algorithms are enabled. It's an error to disable an algorithm set
+ * by grpc_channel_args_set_compression_algorithm.
+ *
+ * Returns an instance with the updated algorithm states. The \a a pointer is
+ * modified to point to the returned instance (which may be different from the
+ * input value of \a a). */
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+ grpc_channel_args **a, grpc_compression_algorithm algorithm, int enabled);
+
+/** Returns the bitset representing the support state (true for enabled, false
+ * for disabled) for compression algorithms.
+ *
+ * The i-th bit of the returned bitset corresponds to the i-th entry in the
+ * grpc_compression_algorithm enum. */
+int grpc_channel_args_compression_algorithm_get_states(
+ const grpc_channel_args *a);
+
+int grpc_channel_args_compare(const grpc_channel_args *a,
+ const grpc_channel_args *b);
+
+#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
new file mode 100644
index 0000000000..52283e35fa
--- /dev/null
+++ b/src/core/lib/channel/channel_stack.c
@@ -0,0 +1,262 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/channel_stack.h"
+#include <grpc/support/log.h>
+
+#include <stdlib.h>
+#include <string.h>
+
+int grpc_trace_channel = 0;
+
+/* Memory layouts.
+
+ Channel stack is laid out as: {
+ grpc_channel_stack stk;
+ padding to GPR_MAX_ALIGNMENT
+ grpc_channel_element[stk.count];
+ per-filter memory, aligned to GPR_MAX_ALIGNMENT
+ }
+
+ Call stack is laid out as: {
+ grpc_call_stack stk;
+ padding to GPR_MAX_ALIGNMENT
+ grpc_call_element[stk.count];
+ per-filter memory, aligned to GPR_MAX_ALIGNMENT
+ } */
+
+/* Given a size, round up to the next multiple of sizeof(void*) */
+#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
+ (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
+
+size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
+ size_t filter_count) {
+ /* always need the header, and size for the channel elements */
+ size_t size =
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));
+ size_t i;
+
+ GPR_ASSERT((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 &&
+ "GPR_MAX_ALIGNMENT must be a power of two");
+
+ /* add the size for each filter */
+ for (i = 0; i < filter_count; i++) {
+ size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
+ }
+
+ return size;
+}
+
+#define CHANNEL_ELEMS_FROM_STACK(stk) \
+ ((grpc_channel_element *)((char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE( \
+ sizeof(grpc_channel_stack))))
+
+#define CALL_ELEMS_FROM_STACK(stk) \
+ ((grpc_call_element *)((char *)(stk) + \
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack))))
+
+grpc_channel_element *grpc_channel_stack_element(
+ grpc_channel_stack *channel_stack, size_t index) {
+ return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index;
+}
+
+grpc_channel_element *grpc_channel_stack_last_element(
+ grpc_channel_stack *channel_stack) {
+ return grpc_channel_stack_element(channel_stack, channel_stack->count - 1);
+}
+
+grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
+ size_t index) {
+ return CALL_ELEMS_FROM_STACK(call_stack) + index;
+}
+
+void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
+ const grpc_channel_filter **filters,
+ size_t filter_count,
+ const grpc_channel_args *channel_args,
+ const char *name, grpc_channel_stack *stack) {
+ size_t call_size =
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
+ grpc_channel_element *elems;
+ grpc_channel_element_args args;
+ char *user_data;
+ size_t i;
+
+ stack->count = filter_count;
+ GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg,
+ name);
+ elems = CHANNEL_ELEMS_FROM_STACK(stack);
+ user_data =
+ ((char *)elems) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));
+
+ /* init per-filter data */
+ for (i = 0; i < filter_count; i++) {
+ args.channel_stack = stack;
+ args.channel_args = channel_args;
+ args.is_first = i == 0;
+ args.is_last = i == (filter_count - 1);
+ elems[i].filter = filters[i];
+ elems[i].channel_data = user_data;
+ elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args);
+ user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
+ call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
+ }
+
+ GPR_ASSERT(user_data > (char *)stack);
+ GPR_ASSERT((uintptr_t)(user_data - (char *)stack) ==
+ grpc_channel_stack_size(filters, filter_count));
+
+ stack->call_stack_size = call_size;
+}
+
+void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *stack) {
+ grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(stack);
+ size_t count = stack->count;
+ size_t i;
+
+ /* destroy per-filter data */
+ for (i = 0; i < count; i++) {
+ channel_elems[i].filter->destroy_channel_elem(exec_ctx, &channel_elems[i]);
+ }
+}
+
+void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context,
+ const void *transport_server_data,
+ grpc_call_stack *call_stack) {
+ grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
+ grpc_call_element_args args;
+ size_t count = channel_stack->count;
+ grpc_call_element *call_elems;
+ char *user_data;
+ size_t i;
+
+ call_stack->count = count;
+ GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy,
+ destroy_arg, "CALL_STACK");
+ call_elems = CALL_ELEMS_FROM_STACK(call_stack);
+ user_data = ((char *)call_elems) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
+
+ /* init per-filter data */
+ for (i = 0; i < count; i++) {
+ args.call_stack = call_stack;
+ args.server_transport_data = transport_server_data;
+ args.context = context;
+ call_elems[i].filter = channel_elems[i].filter;
+ call_elems[i].channel_data = channel_elems[i].channel_data;
+ call_elems[i].call_data = user_data;
+ call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
+ user_data +=
+ ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
+ }
+}
+
+void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_call_stack *call_stack,
+ grpc_pollset *pollset) {
+ size_t count = call_stack->count;
+ grpc_call_element *call_elems;
+ char *user_data;
+ size_t i;
+
+ call_elems = CALL_ELEMS_FROM_STACK(call_stack);
+ user_data = ((char *)call_elems) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
+
+ /* init per-filter data */
+ for (i = 0; i < count; i++) {
+ call_elems[i].filter->set_pollset(exec_ctx, &call_elems[i], pollset);
+ user_data +=
+ ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
+ }
+}
+
+void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_pollset *pollset) {}
+
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
+ grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
+ size_t count = stack->count;
+ size_t i;
+
+ /* destroy per-filter data */
+ for (i = 0; i < count; i++) {
+ elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]);
+ }
+}
+
+void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ grpc_call_element *next_elem = elem + 1;
+ next_elem->filter->start_transport_stream_op(exec_ctx, next_elem, op);
+}
+
+char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ grpc_call_element *next_elem = elem + 1;
+ return next_elem->filter->get_peer(exec_ctx, next_elem);
+}
+
+void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_transport_op *op) {
+ grpc_channel_element *next_elem = elem + 1;
+ next_elem->filter->start_transport_op(exec_ctx, next_elem, op);
+}
+
+grpc_channel_stack *grpc_channel_stack_from_top_element(
+ grpc_channel_element *elem) {
+ return (grpc_channel_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_channel_stack)));
+}
+
+grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
+ return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_call_stack)));
+}
+
+void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *cur_elem) {
+ grpc_transport_stream_op op;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ grpc_call_next_op(exec_ctx, cur_elem, &op);
+}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
new file mode 100644
index 0000000000..b29bee411d
--- /dev/null
+++ b/src/core/lib/channel/channel_stack.h
@@ -0,0 +1,260 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
+#define GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
+
+/* A channel filter defines how operations on a channel are implemented.
+ Channel filters are chained together to create full channels, and if those
+ chains are linear, then channel stacks provide a mechanism to minimize
+ allocations for that chain.
+ Call stacks are created by channel stacks and represent the per-call data
+ for that stack. */
+
+#include <stddef.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/log.h>
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/transport/transport.h"
+
+typedef struct grpc_channel_element grpc_channel_element;
+typedef struct grpc_call_element grpc_call_element;
+
+typedef struct grpc_channel_stack grpc_channel_stack;
+typedef struct grpc_call_stack grpc_call_stack;
+
+typedef struct {
+ grpc_channel_stack *channel_stack;
+ const grpc_channel_args *channel_args;
+ int is_first;
+ int is_last;
+} grpc_channel_element_args;
+
+typedef struct {
+ grpc_call_stack *call_stack;
+ const void *server_transport_data;
+ grpc_call_context_element *context;
+} grpc_call_element_args;
+
+/* Channel filters specify:
+ 1. the amount of memory needed in the channel & call (via the sizeof_XXX
+ members)
+ 2. functions to initialize and destroy channel & call data
+ (init_XXX, destroy_XXX)
+ 3. functions to implement call operations and channel operations (call_op,
+ channel_op)
+ 4. a name, which is useful when debugging
+
+ Members are laid out in approximate frequency of use order. */
+typedef struct {
+ /* Called to eg. send/receive data on a call.
+ See grpc_call_next_op on how to call the next element in the stack */
+ void (*start_transport_stream_op)(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op);
+ /* Called to handle channel level operations - e.g. new calls, or transport
+ closure.
+ See grpc_channel_next_op on how to call the next element in the stack */
+ void (*start_transport_op)(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem, grpc_transport_op *op);
+
+ /* sizeof(per call data) */
+ size_t sizeof_call_data;
+ /* Initialize per call data.
+ elem is initialized at the start of the call, and elem->call_data is what
+ needs initializing.
+ The filter does not need to do any chaining.
+ server_transport_data is an opaque pointer. If it is NULL, this call is
+ on a client; if it is non-NULL, then it points to memory owned by the
+ transport and is on the server. Most filters want to ignore this
+ argument. */
+ void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_call_element_args *args);
+ void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_pollset *pollset);
+ /* Destroy per call data.
+ The filter does not need to do any chaining */
+ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+
+ /* sizeof(per channel data) */
+ size_t sizeof_channel_data;
+ /* Initialize per-channel data.
+ elem is initialized at the start of the call, and elem->channel_data is
+ what needs initializing.
+ is_first, is_last designate this elements position in the stack, and are
+ useful for asserting correct configuration by upper layer code.
+ The filter does not need to do any chaining */
+ void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_channel_element_args *args);
+ /* Destroy per channel data.
+ The filter does not need to do any chaining */
+ void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem);
+
+ /* Implement grpc_call_get_peer() */
+ char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+
+ /* The name of this filter */
+ const char *name;
+} grpc_channel_filter;
+
+/* A channel_element tracks its filter and the filter requested memory within
+ a channel allocation */
+struct grpc_channel_element {
+ const grpc_channel_filter *filter;
+ void *channel_data;
+};
+
+/* A call_element tracks its filter, the filter requested memory within
+ a channel allocation, and the filter requested memory within a call
+ allocation */
+struct grpc_call_element {
+ const grpc_channel_filter *filter;
+ void *channel_data;
+ void *call_data;
+};
+
+/* A channel stack tracks a set of related filters for one channel, and
+ guarantees they live within a single malloc() allocation */
+struct grpc_channel_stack {
+ grpc_stream_refcount refcount;
+ size_t count;
+ /* Memory required for a call stack (computed at channel stack
+ initialization) */
+ size_t call_stack_size;
+};
+
+/* A call stack tracks a set of related filters for one call, and guarantees
+ they live within a single malloc() allocation */
+struct grpc_call_stack {
+ /* shared refcount for this channel stack.
+ MUST be the first element: the underlying code calls destroy
+ with the address of the refcount, but higher layers prefer to think
+ about the address of the call stack itself. */
+ grpc_stream_refcount refcount;
+ size_t count;
+};
+
+/* Get a channel element given a channel stack and its index */
+grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack,
+ size_t i);
+/* Get the last channel element in a channel stack */
+grpc_channel_element *grpc_channel_stack_last_element(
+ grpc_channel_stack *stack);
+/* Get a call stack element given a call stack and an index */
+grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i);
+
+/* Determine memory required for a channel stack containing a set of filters */
+size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
+ size_t filter_count);
+/* Initialize a channel stack given some filters */
+void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
+ const grpc_channel_filter **filters,
+ size_t filter_count, const grpc_channel_args *args,
+ const char *name, grpc_channel_stack *stack);
+/* Destroy a channel stack */
+void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *stack);
+
+/* Initialize a call stack given a channel stack. transport_server_data is
+ expected to be NULL on a client, or an opaque transport owned pointer on the
+ server. */
+void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context,
+ const void *transport_server_data,
+ grpc_call_stack *call_stack);
+/* Set a pollset for a call stack: must occur before the first op is started */
+void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_call_stack *call_stack,
+ grpc_pollset *pollset);
+
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#define GRPC_CALL_STACK_REF(call_stack, reason) \
+ grpc_stream_ref(&(call_stack)->refcount, reason)
+#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \
+ grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason)
+#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
+ grpc_stream_ref(&(channel_stack)->refcount, reason)
+#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \
+ grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason)
+#else
+#define GRPC_CALL_STACK_REF(call_stack, reason) \
+ grpc_stream_ref(&(call_stack)->refcount)
+#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \
+ grpc_stream_unref(exec_ctx, &(call_stack)->refcount)
+#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
+ grpc_stream_ref(&(channel_stack)->refcount)
+#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \
+ grpc_stream_unref(exec_ctx, &(channel_stack)->refcount)
+#endif
+
+/* Destroy a call stack */
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
+
+/* Ignore set pollset - used by filters to implement the set_pollset method
+ if they don't care about pollsets at all. Does nothing. */
+void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_pollset *pollset);
+/* Call the next operation in a call stack */
+void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op *op);
+/* Call the next operation (depending on call directionality) in a channel
+ stack */
+void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_transport_op *op);
+/* Pass through a request to get_peer to the next child element */
+char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+
+/* Given the top element of a channel stack, get the channel stack itself */
+grpc_channel_stack *grpc_channel_stack_from_top_element(
+ grpc_channel_element *elem);
+/* Given the top element of a call stack, get the call stack itself */
+grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
+
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+ grpc_call_element *elem, grpc_transport_stream_op *op);
+
+void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *cur_elem);
+
+extern int grpc_trace_channel;
+
+#define GRPC_CALL_LOG_OP(sev, elem, op) \
+ if (grpc_trace_channel) grpc_call_log_op(sev, elem, op)
+
+#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H */
diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c
new file mode 100644
index 0000000000..1ce0c4e07f
--- /dev/null
+++ b/src/core/lib/channel/channel_stack_builder.c
@@ -0,0 +1,258 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/channel_stack_builder.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
+int grpc_trace_channel_stack_builder = 0;
+
+typedef struct filter_node {
+ struct filter_node *next;
+ struct filter_node *prev;
+ const grpc_channel_filter *filter;
+ grpc_post_filter_create_init_func init;
+ void *init_arg;
+} filter_node;
+
+struct grpc_channel_stack_builder {
+ // sentinel nodes for filters that have been added
+ filter_node begin;
+ filter_node end;
+ // various set/get-able parameters
+ const grpc_channel_args *args;
+ grpc_transport *transport;
+ const char *name;
+};
+
+struct grpc_channel_stack_builder_iterator {
+ grpc_channel_stack_builder *builder;
+ filter_node *node;
+};
+
+grpc_channel_stack_builder *grpc_channel_stack_builder_create(void) {
+ grpc_channel_stack_builder *b = gpr_malloc(sizeof(*b));
+ memset(b, 0, sizeof(*b));
+
+ b->begin.filter = NULL;
+ b->end.filter = NULL;
+ b->begin.next = &b->end;
+ b->begin.prev = &b->end;
+ b->end.next = &b->begin;
+ b->end.prev = &b->begin;
+
+ return b;
+}
+
+static grpc_channel_stack_builder_iterator *create_iterator_at_filter_node(
+ grpc_channel_stack_builder *builder, filter_node *node) {
+ grpc_channel_stack_builder_iterator *it = gpr_malloc(sizeof(*it));
+ it->builder = builder;
+ it->node = node;
+ return it;
+}
+
+void grpc_channel_stack_builder_iterator_destroy(
+ grpc_channel_stack_builder_iterator *it) {
+ gpr_free(it);
+}
+
+grpc_channel_stack_builder_iterator *
+grpc_channel_stack_builder_create_iterator_at_first(
+ grpc_channel_stack_builder *builder) {
+ return create_iterator_at_filter_node(builder, &builder->begin);
+}
+
+grpc_channel_stack_builder_iterator *
+grpc_channel_stack_builder_create_iterator_at_last(
+ grpc_channel_stack_builder *builder) {
+ return create_iterator_at_filter_node(builder, &builder->end);
+}
+
+bool grpc_channel_stack_builder_move_next(
+ grpc_channel_stack_builder_iterator *iterator) {
+ if (iterator->node == &iterator->builder->end) return false;
+ iterator->node = iterator->node->next;
+ return true;
+}
+
+bool grpc_channel_stack_builder_move_prev(
+ grpc_channel_stack_builder_iterator *iterator) {
+ if (iterator->node == &iterator->builder->begin) return false;
+ iterator->node = iterator->node->prev;
+ return true;
+}
+
+bool grpc_channel_stack_builder_move_prev(
+ grpc_channel_stack_builder_iterator *iterator);
+
+void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder,
+ const char *name) {
+ GPR_ASSERT(builder->name == NULL);
+ builder->name = name;
+}
+
+void grpc_channel_stack_builder_set_channel_arguments(
+ grpc_channel_stack_builder *builder, const grpc_channel_args *args) {
+ GPR_ASSERT(builder->args == NULL);
+ builder->args = args;
+}
+
+void grpc_channel_stack_builder_set_transport(
+ grpc_channel_stack_builder *builder, grpc_transport *transport) {
+ GPR_ASSERT(builder->transport == NULL);
+ builder->transport = transport;
+}
+
+grpc_transport *grpc_channel_stack_builder_get_transport(
+ grpc_channel_stack_builder *builder) {
+ return builder->transport;
+}
+
+const grpc_channel_args *grpc_channel_stack_builder_get_channel_arguments(
+ grpc_channel_stack_builder *builder) {
+ return builder->args;
+}
+
+bool grpc_channel_stack_builder_append_filter(
+ grpc_channel_stack_builder *builder, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func, void *user_data) {
+ grpc_channel_stack_builder_iterator *it =
+ grpc_channel_stack_builder_create_iterator_at_last(builder);
+ bool ok = grpc_channel_stack_builder_add_filter_before(
+ it, filter, post_init_func, user_data);
+ grpc_channel_stack_builder_iterator_destroy(it);
+ return ok;
+}
+
+bool grpc_channel_stack_builder_prepend_filter(
+ grpc_channel_stack_builder *builder, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func, void *user_data) {
+ grpc_channel_stack_builder_iterator *it =
+ grpc_channel_stack_builder_create_iterator_at_first(builder);
+ bool ok = grpc_channel_stack_builder_add_filter_after(
+ it, filter, post_init_func, user_data);
+ grpc_channel_stack_builder_iterator_destroy(it);
+ return ok;
+}
+
+static void add_after(filter_node *before, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) {
+ filter_node *new = gpr_malloc(sizeof(*new));
+ new->next = before->next;
+ new->prev = before;
+ new->next->prev = new->prev->next = new;
+ new->filter = filter;
+ new->init = post_init_func;
+ new->init_arg = user_data;
+}
+
+bool grpc_channel_stack_builder_add_filter_before(
+ grpc_channel_stack_builder_iterator *iterator,
+ const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func, void *user_data) {
+ if (iterator->node == &iterator->builder->begin) return false;
+ add_after(iterator->node->prev, filter, post_init_func, user_data);
+ return true;
+}
+
+bool grpc_channel_stack_builder_add_filter_after(
+ grpc_channel_stack_builder_iterator *iterator,
+ const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func, void *user_data) {
+ if (iterator->node == &iterator->builder->end) return false;
+ add_after(iterator->node, filter, post_init_func, user_data);
+ return true;
+}
+
+void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder) {
+ filter_node *p = builder->begin.next;
+ while (p != &builder->end) {
+ filter_node *next = p->next;
+ gpr_free(p);
+ p = next;
+ }
+ gpr_free(builder);
+}
+
+void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ size_t prefix_bytes, int initial_refs,
+ grpc_iomgr_cb_func destroy,
+ void *destroy_arg) {
+ // count the number of filters
+ size_t num_filters = 0;
+ for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
+ num_filters++;
+ }
+
+ // create an array of filters
+ const grpc_channel_filter **filters =
+ gpr_malloc(sizeof(*filters) * num_filters);
+ size_t i = 0;
+ for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
+ filters[i++] = p->filter;
+ }
+
+ // calculate the size of the channel stack
+ size_t channel_stack_size = grpc_channel_stack_size(filters, num_filters);
+
+ // allocate memory, with prefix_bytes followed by channel_stack_size
+ char *result = gpr_malloc(prefix_bytes + channel_stack_size);
+ // fetch a pointer to the channel stack
+ grpc_channel_stack *channel_stack =
+ (grpc_channel_stack *)(result + prefix_bytes);
+ // and initialize it
+ grpc_channel_stack_init(exec_ctx, initial_refs, destroy,
+ destroy_arg == NULL ? result : destroy_arg, filters,
+ num_filters, builder->args, builder->name,
+ channel_stack);
+
+ // run post-initialization functions
+ i = 0;
+ for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
+ if (p->init != NULL) {
+ p->init(channel_stack, grpc_channel_stack_element(channel_stack, i),
+ p->init_arg);
+ }
+ i++;
+ }
+
+ grpc_channel_stack_builder_destroy(builder);
+ gpr_free((grpc_channel_filter **)filters);
+
+ return result;
+}
diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h
new file mode 100644
index 0000000000..8532c4462a
--- /dev/null
+++ b/src/core/lib/channel/channel_stack_builder.h
@@ -0,0 +1,155 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H
+#define GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H
+
+#include <stdbool.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack.h"
+
+/// grpc_channel_stack_builder offers a programmatic interface to selected
+/// and order channel filters
+typedef struct grpc_channel_stack_builder grpc_channel_stack_builder;
+typedef struct grpc_channel_stack_builder_iterator
+ grpc_channel_stack_builder_iterator;
+
+/// Create a new channel stack builder
+grpc_channel_stack_builder *grpc_channel_stack_builder_create(void);
+
+/// Assign a name to the channel stack: \a name must be statically allocated
+void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder,
+ const char *name);
+
+/// Attach \a transport to the builder (does not take ownership)
+void grpc_channel_stack_builder_set_transport(
+ grpc_channel_stack_builder *builder, grpc_transport *transport);
+
+/// Fetch attached transport
+grpc_transport *grpc_channel_stack_builder_get_transport(
+ grpc_channel_stack_builder *builder);
+
+/// Set channel arguments: \a args must continue to exist until after
+/// grpc_channel_stack_builder_finish returns
+void grpc_channel_stack_builder_set_channel_arguments(
+ grpc_channel_stack_builder *builder, const grpc_channel_args *args);
+
+/// Return a borrowed pointer to the channel arguments
+const grpc_channel_args *grpc_channel_stack_builder_get_channel_arguments(
+ grpc_channel_stack_builder *builder);
+
+/// Begin iterating over already defined filters in the builder at the beginning
+grpc_channel_stack_builder_iterator *
+grpc_channel_stack_builder_create_iterator_at_first(
+ grpc_channel_stack_builder *builder);
+
+/// Begin iterating over already defined filters in the builder at the end
+grpc_channel_stack_builder_iterator *
+grpc_channel_stack_builder_create_iterator_at_last(
+ grpc_channel_stack_builder *builder);
+
+/// Is an iterator at the first element?
+bool grpc_channel_stack_builder_iterator_is_first(
+ grpc_channel_stack_builder_iterator *iterator);
+
+/// Is an iterator at the end?
+bool grpc_channel_stack_builder_iterator_is_end(
+ grpc_channel_stack_builder_iterator *iterator);
+
+/// Move an iterator to the next item
+bool grpc_channel_stack_builder_move_next(
+ grpc_channel_stack_builder_iterator *iterator);
+
+/// Move an iterator to the previous item
+bool grpc_channel_stack_builder_move_prev(
+ grpc_channel_stack_builder_iterator *iterator);
+
+typedef void (*grpc_post_filter_create_init_func)(
+ grpc_channel_stack *channel_stack, grpc_channel_element *elem, void *arg);
+
+/// Add \a filter to the stack, after \a iterator.
+/// Call \a post_init_func(..., \a user_data) once the channel stack is
+/// created.
+bool grpc_channel_stack_builder_add_filter_after(
+ grpc_channel_stack_builder_iterator *iterator,
+ const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) GRPC_MUST_USE_RESULT;
+
+/// Add \a filter to the stack, before \a iterator.
+/// Call \a post_init_func(..., \a user_data) once the channel stack is
+/// created.
+bool grpc_channel_stack_builder_add_filter_before(
+ grpc_channel_stack_builder_iterator *iterator,
+ const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) GRPC_MUST_USE_RESULT;
+
+/// Add \a filter to the beginning of the filter list.
+/// Call \a post_init_func(..., \a user_data) once the channel stack is
+/// created.
+bool grpc_channel_stack_builder_prepend_filter(
+ grpc_channel_stack_builder *builder, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) GRPC_MUST_USE_RESULT;
+
+/// Add \a filter to the end of the filter list.
+/// Call \a post_init_func(..., \a user_data) once the channel stack is
+/// created.
+bool grpc_channel_stack_builder_append_filter(
+ grpc_channel_stack_builder *builder, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) GRPC_MUST_USE_RESULT;
+
+/// Terminate iteration and destroy \a iterator
+void grpc_channel_stack_builder_iterator_destroy(
+ grpc_channel_stack_builder_iterator *iterator);
+
+/// Destroy the builder, return the freshly minted channel stack
+/// Allocates \a prefix_bytes bytes before the channel stack
+/// Returns the base pointer of the allocated block
+/// \a initial_refs, \a destroy, \a destroy_arg are as per
+/// grpc_channel_stack_init
+void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ size_t prefix_bytes, int initial_refs,
+ grpc_iomgr_cb_func destroy,
+ void *destroy_arg);
+
+/// Destroy the builder without creating a channel stack
+void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder);
+
+extern int grpc_trace_channel_stack_builder;
+
+#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H */
diff --git a/src/core/lib/channel/client_channel.c b/src/core/lib/channel/client_channel.c
new file mode 100644
index 0000000000..9fdf803ecf
--- /dev/null
+++ b/src/core/lib/channel/client_channel.c
@@ -0,0 +1,526 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/client_channel.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/subchannel_call_holder.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+
+/* Client channel implementation */
+
+typedef grpc_subchannel_call_holder call_data;
+
+typedef struct client_channel_channel_data {
+ /** resolver for this channel */
+ grpc_resolver *resolver;
+ /** have we started resolving this channel */
+ int started_resolving;
+
+ /** mutex protecting client configuration, including all
+ variables below in this data structure */
+ gpr_mu mu_config;
+ /** currently active load balancer - guarded by mu_config */
+ grpc_lb_policy *lb_policy;
+ /** incoming configuration - set by resolver.next
+ guarded by mu_config */
+ grpc_client_config *incoming_configuration;
+ /** a list of closures that are all waiting for config to come in */
+ grpc_closure_list waiting_for_config_closures;
+ /** resolver callback */
+ grpc_closure on_config_changed;
+ /** connectivity state being tracked */
+ grpc_connectivity_state_tracker state_tracker;
+ /** when an lb_policy arrives, should we try to exit idle */
+ int exit_idle_when_lb_policy_arrives;
+ /** owning stack */
+ grpc_channel_stack *owning_stack;
+ /** interested parties (owned) */
+ grpc_pollset_set *interested_parties;
+} channel_data;
+
+/** We create one watcher for each new lb_policy that is returned from a
+ resolver,
+ to watch for state changes from the lb_policy. When a state change is seen,
+ we
+ update the channel, and create a new watcher */
+typedef struct {
+ channel_data *chand;
+ grpc_closure on_changed;
+ grpc_connectivity_state state;
+ grpc_lb_policy *lb_policy;
+} lb_policy_connectivity_watcher;
+
+typedef struct {
+ grpc_closure closure;
+ grpc_call_element *elem;
+} waiting_call;
+
+static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data);
+}
+
+static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
+}
+
+static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state);
+
+static void on_lb_policy_state_changed_locked(
+ grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
+ grpc_connectivity_state publish_state = w->state;
+ /* check if the notification is for a stale policy */
+ if (w->lb_policy != w->chand->lb_policy) return;
+
+ if (publish_state == GRPC_CHANNEL_FATAL_FAILURE &&
+ w->chand->resolver != NULL) {
+ publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver);
+ GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
+ w->chand->lb_policy = NULL;
+ }
+ grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
+ "lb_changed");
+ if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
+ watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
+ }
+}
+
+static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ bool iomgr_success) {
+ lb_policy_connectivity_watcher *w = arg;
+
+ gpr_mu_lock(&w->chand->mu_config);
+ on_lb_policy_state_changed_locked(exec_ctx, w);
+ gpr_mu_unlock(&w->chand->mu_config);
+
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
+ gpr_free(w);
+}
+
+static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state) {
+ lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
+
+ w->chand = chand;
+ grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+ w->state = current_state;
+ w->lb_policy = lb_policy;
+ grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
+ &w->on_changed);
+}
+
+static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ bool iomgr_success) {
+ channel_data *chand = arg;
+ grpc_lb_policy *lb_policy = NULL;
+ grpc_lb_policy *old_lb_policy;
+ grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ int exit_idle = 0;
+
+ if (chand->incoming_configuration != NULL) {
+ lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_REF(lb_policy, "channel");
+ GRPC_LB_POLICY_REF(lb_policy, "config_change");
+ state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
+ }
+
+ grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
+ }
+
+ chand->incoming_configuration = NULL;
+
+ if (lb_policy != NULL) {
+ grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
+ chand->interested_parties);
+ }
+
+ gpr_mu_lock(&chand->mu_config);
+ old_lb_policy = chand->lb_policy;
+ chand->lb_policy = lb_policy;
+ if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
+ NULL);
+ }
+ if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+ GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
+ exit_idle = 1;
+ chand->exit_idle_when_lb_policy_arrives = 0;
+ }
+
+ if (iomgr_success && chand->resolver) {
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
+ "new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy(exec_ctx, chand, lb_policy, state);
+ }
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_configuration,
+ &chand->on_config_changed);
+ gpr_mu_unlock(&chand->mu_config);
+ } else {
+ if (chand->resolver != NULL) {
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ chand->resolver = NULL;
+ }
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
+ gpr_mu_unlock(&chand->mu_config);
+ }
+
+ if (exit_idle) {
+ grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
+ }
+
+ if (old_lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(
+ exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
+ GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
+ }
+
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
+ }
+
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
+}
+
+static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_transport_op *op) {
+ channel_data *chand = elem->channel_data;
+
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
+
+ GPR_ASSERT(op->set_accept_stream == false);
+ if (op->bind_pollset != NULL) {
+ grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
+ op->bind_pollset);
+ }
+
+ gpr_mu_lock(&chand->mu_config);
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &chand->state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ op->on_connectivity_state_change = NULL;
+ op->connectivity_state = NULL;
+ }
+
+ if (op->send_ping != NULL) {
+ if (chand->lb_policy == NULL) {
+ grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL);
+ } else {
+ grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
+ op->bind_pollset = NULL;
+ }
+ op->send_ping = NULL;
+ }
+
+ if (op->disconnect && chand->resolver != NULL) {
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ chand->resolver = NULL;
+ if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ chand->lb_policy->interested_parties,
+ chand->interested_parties);
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ chand->lb_policy = NULL;
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+}
+
+typedef struct {
+ grpc_metadata_batch *initial_metadata;
+ grpc_connected_subchannel **connected_subchannel;
+ grpc_closure *on_ready;
+ grpc_call_element *elem;
+ grpc_closure closure;
+} continue_picking_args;
+
+static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_metadata_batch *initial_metadata,
+ grpc_connected_subchannel **connected_subchannel,
+ grpc_closure *on_ready);
+
+static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+ continue_picking_args *cpa = arg;
+ if (!success) {
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
+ } else if (cpa->connected_subchannel == NULL) {
+ /* cancelled, do nothing */
+ } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
+ cpa->connected_subchannel, cpa->on_ready)) {
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
+ }
+ gpr_free(cpa);
+}
+
+static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
+ grpc_metadata_batch *initial_metadata,
+ grpc_connected_subchannel **connected_subchannel,
+ grpc_closure *on_ready) {
+ grpc_call_element *elem = elemp;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ continue_picking_args *cpa;
+ grpc_closure *closure;
+
+ GPR_ASSERT(connected_subchannel);
+
+ gpr_mu_lock(&chand->mu_config);
+ if (initial_metadata == NULL) {
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
+ connected_subchannel);
+ }
+ for (closure = chand->waiting_for_config_closures.head; closure != NULL;
+ closure = grpc_closure_next(closure)) {
+ cpa = closure->cb_arg;
+ if (cpa->connected_subchannel == connected_subchannel) {
+ cpa->connected_subchannel = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+ return 1;
+ }
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy *lb_policy = chand->lb_policy;
+ int r;
+ GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
+ gpr_mu_unlock(&chand->mu_config);
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
+ initial_metadata, connected_subchannel, on_ready);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
+ return r;
+ }
+ if (chand->resolver != NULL && !chand->started_resolving) {
+ chand->started_resolving = 1;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
+ cpa = gpr_malloc(sizeof(*cpa));
+ cpa->initial_metadata = initial_metadata;
+ cpa->connected_subchannel = connected_subchannel;
+ cpa->on_ready = on_ready;
+ cpa->elem = elem;
+ grpc_closure_init(&cpa->closure, continue_picking, cpa);
+ grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1);
+ gpr_mu_unlock(&chand->mu_config);
+ return 0;
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem,
+ args->call_stack);
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ channel_data *chand = elem->channel_data;
+
+ memset(chand, 0, sizeof(*chand));
+
+ GPR_ASSERT(args->is_last);
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+
+ gpr_mu_init(&chand->mu_config);
+ grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
+ chand->owning_stack = args->channel_stack;
+
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
+ "client_channel");
+ chand->interested_parties = grpc_pollset_set_create();
+}
+
+/* Destructor for channel_data */
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+
+ if (chand->resolver != NULL) {
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ }
+ if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ chand->lb_policy->interested_parties,
+ chand->interested_parties);
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ }
+ grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
+ grpc_pollset_set_destroy(chand->interested_parties);
+ gpr_mu_destroy(&chand->mu_config);
+}
+
+static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_pollset *pollset) {
+ call_data *calld = elem->call_data;
+ calld->pollset = pollset;
+}
+
+const grpc_channel_filter grpc_client_channel_filter = {
+ cc_start_transport_stream_op,
+ cc_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ cc_set_pollset,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ cc_get_peer,
+ "client-channel",
+};
+
+void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver) {
+ /* post construction initialization: set the transport setup pointer */
+ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+ channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&chand->mu_config);
+ GPR_ASSERT(!chand->resolver);
+ chand->resolver = resolver;
+ GRPC_RESOLVER_REF(resolver, "channel");
+ if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
+ chand->exit_idle_when_lb_policy_arrives) {
+ chand->started_resolving = 1;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
+ gpr_mu_unlock(&chand->mu_config);
+}
+
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
+ channel_data *chand = elem->channel_data;
+ grpc_connectivity_state out;
+ gpr_mu_lock(&chand->mu_config);
+ out = grpc_connectivity_state_check(&chand->state_tracker);
+ if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
+ } else {
+ chand->exit_idle_when_lb_policy_arrives = 1;
+ if (!chand->started_resolving && chand->resolver != NULL) {
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ chand->started_resolving = 1;
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+ return out;
+}
+
+typedef struct {
+ channel_data *chand;
+ grpc_pollset *pollset;
+ grpc_closure *on_complete;
+ grpc_closure my_closure;
+} external_connectivity_watcher;
+
+static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ bool iomgr_success) {
+ external_connectivity_watcher *w = arg;
+ grpc_closure *follow_up = w->on_complete;
+ grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
+ w->pollset);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_free(w);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success);
+}
+
+void grpc_client_channel_watch_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
+ grpc_connectivity_state *state, grpc_closure *on_complete) {
+ channel_data *chand = elem->channel_data;
+ external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ w->chand = chand;
+ w->pollset = pollset;
+ w->on_complete = on_complete;
+ grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
+ GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_mu_lock(&chand->mu_config);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &chand->state_tracker, state, &w->my_closure);
+ gpr_mu_unlock(&chand->mu_config);
+}
diff --git a/src/core/lib/channel/client_channel.h b/src/core/lib/channel/client_channel.h
new file mode 100644
index 0000000000..8777796fb6
--- /dev/null
+++ b/src/core/lib/channel/client_channel.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H
+#define GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H
+
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/client_config/resolver.h"
+
+/* A client channel is a channel that begins disconnected, and can connect
+ to some endpoint on demand. If that endpoint disconnects, it will be
+ connected to again later.
+
+ Calls on a disconnected client channel are queued until a connection is
+ established. */
+
+extern const grpc_channel_filter grpc_client_channel_filter;
+
+/* post-construction initializer to let the client channel know which
+ transport setup it should cancel upon destruction, or initiate when it needs
+ a connection */
+void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver);
+
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
+
+void grpc_client_channel_watch_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
+ grpc_connectivity_state *state, grpc_closure *on_complete);
+
+#endif /* GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
new file mode 100644
index 0000000000..04bb7cc76f
--- /dev/null
+++ b/src/core/lib/channel/compress_filter.c
@@ -0,0 +1,304 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <assert.h>
+#include <string.h>
+
+#include <grpc/compression.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/compress_filter.h"
+#include "src/core/lib/compression/algorithm_metadata.h"
+#include "src/core/lib/compression/message_compress.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+typedef struct call_data {
+ gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
+ grpc_linked_mdelem compression_algorithm_storage;
+ grpc_linked_mdelem accept_encoding_storage;
+ uint32_t remaining_slice_bytes;
+ /** Compression algorithm we'll try to use. It may be given by incoming
+ * metadata, or by the channel's default compression settings. */
+ grpc_compression_algorithm compression_algorithm;
+ /** If true, contents of \a compression_algorithm are authoritative */
+ int has_compression_algorithm;
+
+ grpc_transport_stream_op send_op;
+ uint32_t send_length;
+ uint32_t send_flags;
+ gpr_slice incoming_slice;
+ grpc_slice_buffer_stream replacement_stream;
+ grpc_closure *post_send;
+ grpc_closure send_done;
+ grpc_closure got_slice;
+} call_data;
+
+typedef struct channel_data {
+ /** The default, channel-level, compression algorithm */
+ grpc_compression_algorithm default_compression_algorithm;
+ /** Compression options for the channel */
+ grpc_compression_options compression_options;
+ /** Supported compression algorithms */
+ uint32_t supported_compression_algorithms;
+} channel_data;
+
+/** For each \a md element from the incoming metadata, filter out the entry for
+ * "grpc-encoding", using its value to populate the call data's
+ * compression_algorithm field. */
+static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ if (md->key == GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST) {
+ const char *md_c_str = grpc_mdstr_as_c_string(md->value);
+ if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
+ &calld->compression_algorithm)) {
+ gpr_log(GPR_ERROR,
+ "Invalid compression algorithm: '%s' (unknown). Ignoring.",
+ md_c_str);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ if (grpc_compression_options_is_algorithm_enabled(
+ &channeld->compression_options, calld->compression_algorithm) ==
+ 0) {
+ gpr_log(GPR_ERROR,
+ "Invalid compression algorithm: '%s' (previously disabled). "
+ "Ignoring.",
+ md_c_str);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ calld->has_compression_algorithm = 1;
+ return NULL;
+ }
+
+ return md;
+}
+
+static int skip_compression(grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ if (calld->has_compression_algorithm) {
+ if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
+ return 1;
+ }
+ return 0; /* we have an actual call-specific algorithm */
+ }
+ /* no per-call compression override */
+ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
+}
+
+/** Filter initial metadata */
+static void process_send_initial_metadata(
+ grpc_call_element *elem, grpc_metadata_batch *initial_metadata) {
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ /* Parse incoming request for compression. If any, it'll be available
+ * at calld->compression_algorithm */
+ grpc_metadata_batch_filter(initial_metadata, compression_md_filter, elem);
+ if (!calld->has_compression_algorithm) {
+ /* If no algorithm was found in the metadata and we aren't
+ * exceptionally skipping compression, fall back to the channel
+ * default */
+ calld->compression_algorithm = channeld->default_compression_algorithm;
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ }
+ /* hint compression algorithm */
+ grpc_metadata_batch_add_tail(
+ initial_metadata, &calld->compression_algorithm_storage,
+ grpc_compression_encoding_mdelem(calld->compression_algorithm));
+
+ /* convey supported compression algorithms */
+ grpc_metadata_batch_add_tail(initial_metadata,
+ &calld->accept_encoding_storage,
+ GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
+ channeld->supported_compression_algorithms));
+}
+
+static void continue_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem);
+
+static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, bool success) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
+ gpr_slice_buffer_reset_and_unref(&calld->slices);
+ calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, success);
+}
+
+static void finish_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ int did_compress;
+ gpr_slice_buffer tmp;
+ gpr_slice_buffer_init(&tmp);
+ did_compress =
+ grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
+ if (did_compress) {
+ gpr_slice_buffer_swap(&calld->slices, &tmp);
+ calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ }
+ gpr_slice_buffer_destroy(&tmp);
+
+ grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
+ calld->send_flags);
+ calld->send_op.send_message = &calld->replacement_stream.base;
+ calld->post_send = calld->send_op.on_complete;
+ calld->send_op.on_complete = &calld->send_done;
+
+ grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+}
+
+static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, bool success) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
+ gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ finish_send_message(exec_ctx, elem);
+ } else {
+ continue_send_message(exec_ctx, elem);
+ }
+}
+
+static void continue_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
+ &calld->incoming_slice, ~(size_t)0,
+ &calld->got_slice)) {
+ gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ finish_send_message(exec_ctx, elem);
+ break;
+ }
+ }
+}
+
+static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ call_data *calld = elem->call_data;
+
+ GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0);
+
+ if (op->send_initial_metadata) {
+ process_send_initial_metadata(elem, op->send_initial_metadata);
+ }
+ if (op->send_message != NULL && !skip_compression(elem) &&
+ 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
+ calld->send_op = *op;
+ calld->send_length = op->send_message->length;
+ calld->send_flags = op->send_message->flags;
+ continue_send_message(exec_ctx, elem);
+ } else {
+ /* pass control down the stack */
+ grpc_call_next_op(exec_ctx, elem, op);
+ }
+
+ GPR_TIMER_END("compress_start_transport_stream_op", 0);
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+
+ /* initialize members */
+ gpr_slice_buffer_init(&calld->slices);
+ calld->has_compression_algorithm = 0;
+ grpc_closure_init(&calld->got_slice, got_slice, elem);
+ grpc_closure_init(&calld->send_done, send_done, elem);
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ gpr_slice_buffer_destroy(&calld->slices);
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ channel_data *channeld = elem->channel_data;
+ grpc_compression_algorithm algo_idx;
+
+ grpc_compression_options_init(&channeld->compression_options);
+ channeld->compression_options.enabled_algorithms_bitset =
+ (uint32_t)grpc_channel_args_compression_algorithm_get_states(
+ args->channel_args);
+
+ channeld->default_compression_algorithm =
+ grpc_channel_args_get_compression_algorithm(args->channel_args);
+ /* Make sure the default isn't disabled. */
+ GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
+ &channeld->compression_options, channeld->default_compression_algorithm));
+ channeld->compression_options.default_compression_algorithm =
+ channeld->default_compression_algorithm;
+
+ channeld->supported_compression_algorithms = 0;
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
+ /* skip disabled algorithms */
+ if (grpc_compression_options_is_algorithm_enabled(
+ &channeld->compression_options, algo_idx) == 0) {
+ continue;
+ }
+ channeld->supported_compression_algorithms |= 1u << algo_idx;
+ }
+
+ GPR_ASSERT(!args->is_last);
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {}
+
+const grpc_channel_filter grpc_compress_filter = {
+ compress_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "compress"};
diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h
new file mode 100644
index 0000000000..9010074335
--- /dev/null
+++ b/src/core/lib/channel/compress_filter.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H
+#define GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request"
+
+/** Compression filter for outgoing data.
+ *
+ * See <grpc/compression.h> for the available compression settings.
+ *
+ * Compression settings may come from:
+ * - Channel configuration, as established at channel creation time.
+ * - The metadata accompanying the outgoing data to be compressed. This is
+ * taken as a request only. We may choose not to honor it. The metadata key
+ * is given by \a GRPC_COMPRESS_REQUEST_ALGORITHM_KEY.
+ *
+ * Compression can be disabled for concrete messages (for instance in order to
+ * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in
+ * the BEGIN_MESSAGE flags.
+ *
+ * The attempted compression mechanism is added to the resulting initial
+ * metadata under the'grpc-encoding' key.
+ *
+ * If compression is actually performed, BEGIN_MESSAGE's flag is modified to
+ * incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the
+ * aforementioned 'grpc-encoding' metadata value, data will pass through
+ * uncompressed. */
+
+extern const grpc_channel_filter grpc_compress_filter;
+
+#endif /* GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H */
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
new file mode 100644
index 0000000000..5e3a8974ce
--- /dev/null
+++ b/src/core/lib/channel/connected_channel.c
@@ -0,0 +1,176 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/connected_channel.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/byte_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/transport/transport.h"
+
+#define MAX_BUFFER_LENGTH 8192
+
+typedef struct connected_channel_channel_data {
+ grpc_transport *transport;
+} channel_data;
+
+typedef struct connected_channel_call_data { void *unused; } call_data;
+
+/* We perform a small hack to locate transport data alongside the connected
+ channel data in call allocations, to allow everything to be pulled in minimal
+ cache line requests */
+#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream *)((calld) + 1))
+#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
+ (((call_data *)(transport_stream)) - 1)
+
+/* Intercept a call operation and either push it directly up or translate it
+ into transport stream operations */
+static void con_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ grpc_transport_perform_stream_op(exec_ctx, chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
+}
+
+static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_transport_op *op) {
+ channel_data *chand = elem->channel_data;
+ grpc_transport_perform_op(exec_ctx, chand->transport, op);
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ int r;
+
+ r = grpc_transport_init_stream(
+ exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ &args->call_stack->refcount, args->server_transport_data);
+ GPR_ASSERT(r == 0);
+}
+
+static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_pollset *pollset) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_transport_set_pollset(exec_ctx, chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollset);
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_transport_destroy_stream(exec_ctx, chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld));
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ channel_data *cd = (channel_data *)elem->channel_data;
+ GPR_ASSERT(args->is_last);
+ cd->transport = NULL;
+}
+
+/* Destructor for channel_data */
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
+ channel_data *cd = (channel_data *)elem->channel_data;
+ grpc_transport_destroy(exec_ctx, cd->transport);
+}
+
+static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ return grpc_transport_get_peer(exec_ctx, chand->transport);
+}
+
+static const grpc_channel_filter connected_channel_filter = {
+ con_start_transport_stream_op,
+ con_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ set_pollset,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ con_get_peer,
+ "connected",
+};
+
+static void bind_transport(grpc_channel_stack *channel_stack,
+ grpc_channel_element *elem, void *t) {
+ channel_data *cd = (channel_data *)elem->channel_data;
+ GPR_ASSERT(elem->filter == &connected_channel_filter);
+ GPR_ASSERT(cd->transport == NULL);
+ cd->transport = t;
+
+ /* HACK(ctiller): increase call stack size for the channel to make space
+ for channel data. We need a cleaner (but performant) way to do this,
+ and I'm not sure what that is yet.
+ This is only "safe" because call stacks place no additional data after
+ the last call element, and the last call element MUST be the connected
+ channel. */
+ channel_stack->call_stack_size += grpc_transport_stream_size(t);
+}
+
+bool grpc_add_connected_filter(grpc_channel_stack_builder *builder,
+ void *arg_must_be_null) {
+ GPR_ASSERT(arg_must_be_null == NULL);
+ grpc_transport *t = grpc_channel_stack_builder_get_transport(builder);
+ GPR_ASSERT(t != NULL);
+ return grpc_channel_stack_builder_append_filter(
+ builder, &connected_channel_filter, bind_transport, t);
+}
+
+grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ return TRANSPORT_STREAM_FROM_CALL_DATA(calld);
+}
diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h
new file mode 100644
index 0000000000..4f20b751cc
--- /dev/null
+++ b/src/core/lib/channel/connected_channel.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H
+#define GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H
+
+#include "src/core/lib/channel/channel_stack_builder.h"
+
+bool grpc_add_connected_filter(grpc_channel_stack_builder *builder,
+ void *arg_must_be_null);
+
+#endif /* GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H */
diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h
new file mode 100644
index 0000000000..bca102da9a
--- /dev/null
+++ b/src/core/lib/channel/context.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_CONTEXT_H
+#define GRPC_CORE_LIB_CHANNEL_CONTEXT_H
+
+/* Call object context pointers */
+typedef enum {
+ GRPC_CONTEXT_SECURITY = 0,
+ GRPC_CONTEXT_TRACING,
+ GRPC_CONTEXT_COUNT
+} grpc_context_index;
+
+typedef struct {
+ void *value;
+ void (*destroy)(void *);
+} grpc_call_context_element;
+
+#endif /* GRPC_CORE_LIB_CHANNEL_CONTEXT_H */
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
new file mode 100644
index 0000000000..b4e58b7f29
--- /dev/null
+++ b/src/core/lib/channel/http_client_filter.c
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/http_client_filter.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <string.h>
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+typedef struct call_data {
+ grpc_linked_mdelem method;
+ grpc_linked_mdelem scheme;
+ grpc_linked_mdelem authority;
+ grpc_linked_mdelem te_trailers;
+ grpc_linked_mdelem content_type;
+ grpc_linked_mdelem user_agent;
+
+ grpc_metadata_batch *recv_initial_metadata;
+
+ /** Closure to call when finished with the hc_on_recv hook */
+ grpc_closure *on_done_recv;
+ /** Receive closures are chained: we inject this closure as the on_done_recv
+ up-call on transport_op, and remember to call our on_done_recv member
+ after handling it. */
+ grpc_closure hc_on_recv;
+} call_data;
+
+typedef struct channel_data {
+ grpc_mdelem *static_scheme;
+ grpc_mdelem *user_agent;
+} channel_data;
+
+typedef struct {
+ grpc_call_element *elem;
+ grpc_exec_ctx *exec_ctx;
+} client_recv_filter_args;
+
+static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
+ client_recv_filter_args *a = user_data;
+ if (md == GRPC_MDELEM_STATUS_200) {
+ return NULL;
+ } else if (md->key == GRPC_MDSTR_STATUS) {
+ grpc_call_element_send_cancel(a->exec_ctx, a->elem);
+ return NULL;
+ } else if (md->key == GRPC_MDSTR_CONTENT_TYPE) {
+ return NULL;
+ }
+ return md;
+}
+
+static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ client_recv_filter_args a;
+ a.elem = elem;
+ a.exec_ctx = exec_ctx;
+ grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter,
+ &a);
+ calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
+}
+
+static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
+ /* eat the things we'd like to set ourselves */
+ if (md->key == GRPC_MDSTR_METHOD) return NULL;
+ if (md->key == GRPC_MDSTR_SCHEME) return NULL;
+ if (md->key == GRPC_MDSTR_TE) return NULL;
+ if (md->key == GRPC_MDSTR_CONTENT_TYPE) return NULL;
+ if (md->key == GRPC_MDSTR_USER_AGENT) return NULL;
+ return md;
+}
+
+static void hc_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ if (op->send_initial_metadata != NULL) {
+ grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter,
+ elem);
+ /* Send : prefixed headers, which have to be before any application
+ layer headers. */
+ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
+ op->send_idempotent_request
+ ? GRPC_MDELEM_METHOD_PUT
+ : GRPC_MDELEM_METHOD_POST);
+ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
+ channeld->static_scheme);
+ grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
+ GRPC_MDELEM_TE_TRAILERS);
+ grpc_metadata_batch_add_tail(
+ op->send_initial_metadata, &calld->content_type,
+ GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
+ grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->user_agent,
+ GRPC_MDELEM_REF(channeld->user_agent));
+ }
+
+ if (op->recv_initial_metadata != NULL) {
+ /* substitute our callback for the higher callback */
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->hc_on_recv;
+ }
+}
+
+static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ GPR_TIMER_BEGIN("hc_start_transport_op", 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hc_mutate_op(elem, op);
+ GPR_TIMER_END("hc_start_transport_op", 0);
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ call_data *calld = elem->call_data;
+ calld->on_done_recv = NULL;
+ grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {}
+
+static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
+ unsigned i;
+ size_t j;
+ grpc_mdelem *valid_schemes[] = {GRPC_MDELEM_SCHEME_HTTP,
+ GRPC_MDELEM_SCHEME_HTTPS};
+ if (args != NULL) {
+ for (i = 0; i < args->num_args; ++i) {
+ if (args->args[i].type == GRPC_ARG_STRING &&
+ strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) {
+ for (j = 0; j < GPR_ARRAY_SIZE(valid_schemes); j++) {
+ if (0 == strcmp(grpc_mdstr_as_c_string(valid_schemes[j]->value),
+ args->args[i].value.string)) {
+ return valid_schemes[j];
+ }
+ }
+ }
+ }
+ }
+ return GRPC_MDELEM_SCHEME_HTTP;
+}
+
+static grpc_mdstr *user_agent_from_args(const grpc_channel_args *args) {
+ gpr_strvec v;
+ size_t i;
+ int is_first = 1;
+ char *tmp;
+ grpc_mdstr *result;
+
+ gpr_strvec_init(&v);
+
+ for (i = 0; args && i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) {
+ if (args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
+ GRPC_ARG_PRIMARY_USER_AGENT_STRING);
+ } else {
+ if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
+ is_first = 0;
+ gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
+ }
+ }
+ }
+
+ gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ",
+ grpc_version_string(), GPR_PLATFORM_STRING);
+ is_first = 0;
+ gpr_strvec_add(&v, tmp);
+
+ for (i = 0; args && i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) {
+ if (args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
+ GRPC_ARG_SECONDARY_USER_AGENT_STRING);
+ } else {
+ if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
+ is_first = 0;
+ gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
+ }
+ }
+ }
+
+ tmp = gpr_strvec_flatten(&v, NULL);
+ gpr_strvec_destroy(&v);
+ result = grpc_mdstr_from_string(tmp);
+ gpr_free(tmp);
+
+ return result;
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(!args->is_last);
+ chand->static_scheme = scheme_from_args(args->channel_args);
+ chand->user_agent = grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_USER_AGENT, user_agent_from_args(args->channel_args));
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ GRPC_MDELEM_UNREF(chand->user_agent);
+}
+
+const grpc_channel_filter grpc_http_client_filter = {
+ hc_start_transport_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "http-client"};
diff --git a/src/core/lib/channel/http_client_filter.h b/src/core/lib/channel/http_client_filter.h
new file mode 100644
index 0000000000..418426e9cc
--- /dev/null
+++ b/src/core/lib/channel/http_client_filter.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H
+#define GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+/* Processes metadata on the client side for HTTP2 transports */
+extern const grpc_channel_filter grpc_http_client_filter;
+
+#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"
+
+#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H */
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
new file mode 100644
index 0000000000..db1a3d5010
--- /dev/null
+++ b/src/core/lib/channel/http_server_filter.c
@@ -0,0 +1,248 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/http_server_filter.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <string.h>
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+typedef struct call_data {
+ uint8_t seen_path;
+ uint8_t seen_method;
+ uint8_t sent_status;
+ uint8_t seen_scheme;
+ uint8_t seen_te_trailers;
+ uint8_t seen_authority;
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem content_type;
+
+ grpc_metadata_batch *recv_initial_metadata;
+ bool *recv_idempotent_request;
+ /** Closure to call when finished with the hs_on_recv hook */
+ grpc_closure *on_done_recv;
+ /** Receive closures are chained: we inject this closure as the on_done_recv
+ up-call on transport_op, and remember to call our on_done_recv member
+ after handling it. */
+ grpc_closure hs_on_recv;
+} call_data;
+
+typedef struct channel_data { uint8_t unused; } channel_data;
+
+typedef struct {
+ grpc_call_element *elem;
+ grpc_exec_ctx *exec_ctx;
+} server_filter_args;
+
+static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
+ server_filter_args *a = user_data;
+ grpc_call_element *elem = a->elem;
+ call_data *calld = elem->call_data;
+
+ /* Check if it is one of the headers we care about. */
+ if (md == GRPC_MDELEM_TE_TRAILERS || md == GRPC_MDELEM_METHOD_POST ||
+ md == GRPC_MDELEM_METHOD_PUT || md == GRPC_MDELEM_SCHEME_HTTP ||
+ md == GRPC_MDELEM_SCHEME_HTTPS ||
+ md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
+ /* swallow it */
+ if (md == GRPC_MDELEM_METHOD_POST) {
+ calld->seen_method = 1;
+ *calld->recv_idempotent_request = false;
+ } else if (md == GRPC_MDELEM_METHOD_PUT) {
+ calld->seen_method = 1;
+ *calld->recv_idempotent_request = true;
+ } else if (md->key == GRPC_MDSTR_SCHEME) {
+ calld->seen_scheme = 1;
+ } else if (md == GRPC_MDELEM_TE_TRAILERS) {
+ calld->seen_te_trailers = 1;
+ }
+ /* TODO(klempner): Track that we've seen all the headers we should
+ require */
+ return NULL;
+ } else if (md->key == GRPC_MDSTR_CONTENT_TYPE) {
+ if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) ==
+ 0) {
+ /* Although the C implementation doesn't (currently) generate them,
+ any custom +-suffix is explicitly valid. */
+ /* TODO(klempner): We should consider preallocating common values such
+ as +proto or +json, or at least stashing them if we see them. */
+ /* TODO(klempner): Should we be surfacing this to application code? */
+ } else {
+ /* TODO(klempner): We're currently allowing this, but we shouldn't
+ see it without a proxy so log for now. */
+ gpr_log(GPR_INFO, "Unexpected content-type %s",
+ grpc_mdstr_as_c_string(md->value));
+ }
+ return NULL;
+ } else if (md->key == GRPC_MDSTR_TE || md->key == GRPC_MDSTR_METHOD ||
+ md->key == GRPC_MDSTR_SCHEME) {
+ gpr_log(GPR_ERROR, "Invalid %s: header: '%s'",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
+ /* swallow it and error everything out. */
+ /* TODO(klempner): We ought to generate more descriptive error messages
+ on the wire here. */
+ grpc_call_element_send_cancel(a->exec_ctx, elem);
+ return NULL;
+ } else if (md->key == GRPC_MDSTR_PATH) {
+ if (calld->seen_path) {
+ gpr_log(GPR_ERROR, "Received :path twice");
+ return NULL;
+ }
+ calld->seen_path = 1;
+ return md;
+ } else if (md->key == GRPC_MDSTR_AUTHORITY) {
+ calld->seen_authority = 1;
+ return md;
+ } else if (md->key == GRPC_MDSTR_HOST) {
+ /* translate host to :authority since :authority may be
+ omitted */
+ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value));
+ calld->seen_authority = 1;
+ return authority;
+ } else {
+ return md;
+ }
+}
+
+static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (success) {
+ server_filter_args a;
+ a.elem = elem;
+ a.exec_ctx = exec_ctx;
+ grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, &a);
+ /* Have we seen the required http2 transport headers?
+ (:method, :scheme, content-type, with :path and :authority covered
+ at the channel level right now) */
+ if (calld->seen_method && calld->seen_scheme && calld->seen_te_trailers &&
+ calld->seen_path && calld->seen_authority) {
+ /* do nothing */
+ } else {
+ if (!calld->seen_path) {
+ gpr_log(GPR_ERROR, "Missing :path header");
+ }
+ if (!calld->seen_authority) {
+ gpr_log(GPR_ERROR, "Missing :authority header");
+ }
+ if (!calld->seen_method) {
+ gpr_log(GPR_ERROR, "Missing :method header");
+ }
+ if (!calld->seen_scheme) {
+ gpr_log(GPR_ERROR, "Missing :scheme header");
+ }
+ if (!calld->seen_te_trailers) {
+ gpr_log(GPR_ERROR, "Missing te trailers header");
+ }
+ /* Error this call out */
+ success = 0;
+ grpc_call_element_send_cancel(exec_ctx, elem);
+ }
+ }
+ calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
+}
+
+static void hs_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+
+ if (op->send_initial_metadata != NULL && !calld->sent_status) {
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->status,
+ GRPC_MDELEM_STATUS_200);
+ grpc_metadata_batch_add_tail(
+ op->send_initial_metadata, &calld->content_type,
+ GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
+ }
+
+ if (op->recv_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ GPR_ASSERT(op->recv_idempotent_request != NULL);
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ calld->recv_idempotent_request = op->recv_idempotent_request;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->hs_on_recv;
+ }
+}
+
+static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ GPR_TIMER_BEGIN("hs_start_transport_op", 0);
+ hs_mutate_op(elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
+ GPR_TIMER_END("hs_start_transport_op", 0);
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ /* initialize members */
+ memset(calld, 0, sizeof(*calld));
+ grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ GPR_ASSERT(!args->is_last);
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {}
+
+const grpc_channel_filter grpc_http_server_filter = {
+ hs_start_transport_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "http-server"};
diff --git a/src/core/lib/channel/http_server_filter.h b/src/core/lib/channel/http_server_filter.h
new file mode 100644
index 0000000000..c8cf920ded
--- /dev/null
+++ b/src/core/lib/channel/http_server_filter.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H
+#define GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+/* Processes metadata on the client side for HTTP2 transports */
+extern const grpc_channel_filter grpc_http_server_filter;
+
+#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H */
diff --git a/src/core/lib/channel/subchannel_call_holder.c b/src/core/lib/channel/subchannel_call_holder.c
new file mode 100644
index 0000000000..6c6d42dd73
--- /dev/null
+++ b/src/core/lib/channel/subchannel_call_holder.c
@@ -0,0 +1,259 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/subchannel_call_holder.h"
+
+#include <grpc/support/alloc.h>
+
+#include "src/core/lib/profiling/timers.h"
+
+#define GET_CALL(holder) \
+ ((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call)))
+
+#define CANCELLED_CALL ((grpc_subchannel_call *)1)
+
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
+ bool success);
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
+ bool success);
+
+static void add_waiting_locked(grpc_subchannel_call_holder *holder,
+ grpc_transport_stream_op *op);
+static void fail_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder);
+static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder);
+
+void grpc_subchannel_call_holder_init(
+ grpc_subchannel_call_holder *holder,
+ grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
+ void *pick_subchannel_arg, grpc_call_stack *owning_call) {
+ gpr_atm_rel_store(&holder->subchannel_call, 0);
+ holder->pick_subchannel = pick_subchannel;
+ holder->pick_subchannel_arg = pick_subchannel_arg;
+ gpr_mu_init(&holder->mu);
+ holder->connected_subchannel = NULL;
+ holder->waiting_ops = NULL;
+ holder->waiting_ops_count = 0;
+ holder->waiting_ops_capacity = 0;
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ holder->owning_call = owning_call;
+}
+
+void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder) {
+ grpc_subchannel_call *call = GET_CALL(holder);
+ if (call != NULL && call != CANCELLED_CALL) {
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder");
+ }
+ GPR_ASSERT(holder->creation_phase ==
+ GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
+ gpr_mu_destroy(&holder->mu);
+ GPR_ASSERT(holder->waiting_ops_count == 0);
+ gpr_free(holder->waiting_ops);
+}
+
+void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder,
+ grpc_transport_stream_op *op) {
+ /* try to (atomically) get the call */
+ grpc_subchannel_call *call = GET_CALL(holder);
+ GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
+ if (call == CANCELLED_CALL) {
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ if (call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, call, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ /* we failed; lock and figure out what to do */
+ gpr_mu_lock(&holder->mu);
+retry:
+ /* need to recheck that another thread hasn't set the call */
+ call = GET_CALL(holder);
+ if (call == CANCELLED_CALL) {
+ gpr_mu_unlock(&holder->mu);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ if (call != NULL) {
+ gpr_mu_unlock(&holder->mu);
+ grpc_subchannel_call_process_op(exec_ctx, call, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ /* if this is a cancellation, then we can raise our cancelled flag */
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
+ goto retry;
+ } else {
+ switch (holder->creation_phase) {
+ case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
+ fail_locked(exec_ctx, holder);
+ break;
+ case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
+ holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
+ &holder->connected_subchannel, NULL);
+ break;
+ }
+ gpr_mu_unlock(&holder->mu);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ }
+ /* if we don't have a subchannel, try to get one */
+ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
+ holder->connected_subchannel == NULL &&
+ op->send_initial_metadata != NULL) {
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
+ grpc_closure_init(&holder->next_step, subchannel_ready, holder);
+ GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
+ if (holder->pick_subchannel(
+ exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
+ &holder->connected_subchannel, &holder->next_step)) {
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
+ }
+ }
+ /* if we've got a subchannel, then let's ask it to create a call */
+ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
+ holder->connected_subchannel != NULL) {
+ gpr_atm_rel_store(
+ &holder->subchannel_call,
+ (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollset));
+ retry_waiting_locked(exec_ctx, holder);
+ goto retry;
+ }
+ /* nothing to be done but wait */
+ add_waiting_locked(holder, op);
+ gpr_mu_unlock(&holder->mu);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+}
+
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+ grpc_subchannel_call_holder *holder = arg;
+ gpr_mu_lock(&holder->mu);
+ GPR_ASSERT(holder->creation_phase ==
+ GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ if (holder->connected_subchannel == NULL) {
+ fail_locked(exec_ctx, holder);
+ } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
+ /* already cancelled before subchannel became ready */
+ fail_locked(exec_ctx, holder);
+ } else {
+ gpr_atm_rel_store(
+ &holder->subchannel_call,
+ (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollset));
+ retry_waiting_locked(exec_ctx, holder);
+ }
+ gpr_mu_unlock(&holder->mu);
+ GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
+}
+
+typedef struct {
+ grpc_transport_stream_op *ops;
+ size_t nops;
+ grpc_subchannel_call *call;
+} retry_ops_args;
+
+static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder) {
+ retry_ops_args *a = gpr_malloc(sizeof(*a));
+ a->ops = holder->waiting_ops;
+ a->nops = holder->waiting_ops_count;
+ a->call = GET_CALL(holder);
+ if (a->call == CANCELLED_CALL) {
+ gpr_free(a);
+ fail_locked(exec_ctx, holder);
+ return;
+ }
+ holder->waiting_ops = NULL;
+ holder->waiting_ops_count = 0;
+ holder->waiting_ops_capacity = 0;
+ GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true,
+ NULL);
+}
+
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) {
+ retry_ops_args *a = args;
+ size_t i;
+ for (i = 0; i < a->nops; i++) {
+ grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
+ }
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
+ gpr_free(a->ops);
+ gpr_free(a);
+}
+
+static void add_waiting_locked(grpc_subchannel_call_holder *holder,
+ grpc_transport_stream_op *op) {
+ GPR_TIMER_BEGIN("add_waiting_locked", 0);
+ if (holder->waiting_ops_count == holder->waiting_ops_capacity) {
+ holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity);
+ holder->waiting_ops =
+ gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity *
+ sizeof(*holder->waiting_ops));
+ }
+ holder->waiting_ops[holder->waiting_ops_count++] = *op;
+ GPR_TIMER_END("add_waiting_locked", 0);
+}
+
+static void fail_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder) {
+ size_t i;
+ for (i = 0; i < holder->waiting_ops_count; i++) {
+ grpc_transport_stream_op_finish_with_failure(exec_ctx,
+ &holder->waiting_ops[i]);
+ }
+ holder->waiting_ops_count = 0;
+}
+
+char *grpc_subchannel_call_holder_get_peer(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) {
+ grpc_subchannel_call *subchannel_call = GET_CALL(holder);
+
+ if (subchannel_call) {
+ return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
+ } else {
+ return NULL;
+ }
+}
diff --git a/src/core/lib/channel/subchannel_call_holder.h b/src/core/lib/channel/subchannel_call_holder.h
new file mode 100644
index 0000000000..882f366792
--- /dev/null
+++ b/src/core/lib/channel/subchannel_call_holder.h
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
+#define GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
+
+#include "src/core/lib/client_config/subchannel.h"
+
+/** Pick a subchannel for grpc_subchannel_call_holder;
+ Return 1 if subchannel is available immediately (in which case on_ready
+ should not be called), or 0 otherwise (in which case on_ready should be
+ called when the subchannel is available) */
+typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
+ grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
+ grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
+
+typedef enum {
+ GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
+ GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
+} grpc_subchannel_call_holder_creation_phase;
+
+/** Wrapper for holding a pointer to grpc_subchannel_call, and the
+ associated machinery to create such a pointer.
+ Handles queueing of stream ops until a call object is ready, waiting
+ for initial metadata before trying to create a call object,
+ and handling cancellation gracefully.
+
+ The channel filter uses this as their call_data. */
+typedef struct grpc_subchannel_call_holder {
+ /** either 0 for no call, 1 for cancelled, or a pointer to a
+ grpc_subchannel_call */
+ gpr_atm subchannel_call;
+ /** Helper function to choose the subchannel on which to create
+ the call object. Channel filter delegates to the load
+ balancing policy (once it's ready). */
+ grpc_subchannel_call_holder_pick_subchannel pick_subchannel;
+ void *pick_subchannel_arg;
+
+ gpr_mu mu;
+
+ grpc_subchannel_call_holder_creation_phase creation_phase;
+ grpc_connected_subchannel *connected_subchannel;
+ grpc_pollset *pollset;
+
+ grpc_transport_stream_op *waiting_ops;
+ size_t waiting_ops_count;
+ size_t waiting_ops_capacity;
+
+ grpc_closure next_step;
+
+ grpc_call_stack *owning_call;
+} grpc_subchannel_call_holder;
+
+void grpc_subchannel_call_holder_init(
+ grpc_subchannel_call_holder *holder,
+ grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
+ void *pick_subchannel_arg, grpc_call_stack *owning_call);
+void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder);
+
+void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder,
+ grpc_transport_stream_op *op);
+char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder);
+
+#endif /* GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H */