aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/call_op_string.c155
-rw-r--r--src/core/channel/census_filter.c189
-rw-r--r--src/core/channel/census_filter.h44
-rw-r--r--src/core/channel/channel_args.c112
-rw-r--r--src/core/channel/channel_args.h54
-rw-r--r--src/core/channel/channel_stack.c223
-rw-r--r--src/core/channel/channel_stack.h288
-rw-r--r--src/core/channel/client_channel.c641
-rw-r--r--src/core/channel/client_channel.h62
-rw-r--r--src/core/channel/client_setup.c239
-rw-r--r--src/core/channel/client_setup.h68
-rw-r--r--src/core/channel/connected_channel.c501
-rw-r--r--src/core/channel/connected_channel.h49
-rw-r--r--src/core/channel/http_client_filter.c143
-rw-r--r--src/core/channel/http_client_filter.h42
-rw-r--r--src/core/channel/http_filter.c139
-rw-r--r--src/core/channel/http_filter.h43
-rw-r--r--src/core/channel/http_server_filter.c150
-rw-r--r--src/core/channel/http_server_filter.h42
-rw-r--r--src/core/channel/metadata_buffer.c198
-rw-r--r--src/core/channel/metadata_buffer.h70
-rw-r--r--src/core/channel/noop_filter.c138
-rw-r--r--src/core/channel/noop_filter.h44
23 files changed, 3634 insertions, 0 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c
new file mode 100644
index 0000000000..4a98cbfbbb
--- /dev/null
+++ b/src/core/channel/call_op_string.c
@@ -0,0 +1,155 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/channel_stack.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string.h>
+#include <grpc/support/useful.h>
+
+#define MAX_APPEND 1024
+
+typedef struct {
+ size_t cap;
+ size_t len;
+ char *buffer;
+} buf;
+
+static void bprintf(buf *b, const char *fmt, ...) {
+ va_list arg;
+ if (b->len + MAX_APPEND > b->cap) {
+ b->cap = GPR_MAX(b->len + MAX_APPEND, b->cap * 3 / 2);
+ b->buffer = gpr_realloc(b->buffer, b->cap);
+ }
+ va_start(arg, fmt);
+ b->len += vsprintf(b->buffer + b->len, fmt, arg);
+ va_end(arg);
+}
+
+static void bputs(buf *b, const char *s) {
+ size_t slen = strlen(s);
+ if (b->len + slen + 1 > b->cap) {
+ b->cap = GPR_MAX(b->len + slen + 1, b->cap * 3 / 2);
+ b->buffer = gpr_realloc(b->buffer, b->cap);
+ }
+ strcat(b->buffer, s);
+ b->len += slen;
+}
+
+static void put_metadata(buf *b, grpc_mdelem *md) {
+ char *txt;
+
+ txt = gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+ GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT);
+ bputs(b, " key=");
+ bputs(b, txt);
+ gpr_free(txt);
+
+ txt = gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
+ GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT);
+ bputs(b, " value=");
+ bputs(b, txt);
+ gpr_free(txt);
+}
+
+char *grpc_call_op_string(grpc_call_op *op) {
+ buf b = {0, 0, 0};
+
+ switch (op->dir) {
+ case GRPC_CALL_DOWN:
+ bprintf(&b, ">");
+ break;
+ case GRPC_CALL_UP:
+ bprintf(&b, "<");
+ break;
+ }
+ switch (op->type) {
+ case GRPC_SEND_METADATA:
+ bprintf(&b, "SEND_METADATA");
+ put_metadata(&b, op->data.metadata);
+ break;
+ case GRPC_SEND_DEADLINE:
+ bprintf(&b, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec,
+ op->data.deadline.tv_nsec);
+ break;
+ case GRPC_SEND_START:
+ bprintf(&b, "SEND_START");
+ break;
+ case GRPC_SEND_MESSAGE:
+ bprintf(&b, "SEND_MESSAGE");
+ break;
+ case GRPC_SEND_FINISH:
+ bprintf(&b, "SEND_FINISH");
+ break;
+ case GRPC_REQUEST_DATA:
+ bprintf(&b, "REQUEST_DATA");
+ break;
+ case GRPC_RECV_METADATA:
+ bprintf(&b, "RECV_METADATA");
+ put_metadata(&b, op->data.metadata);
+ break;
+ case GRPC_RECV_DEADLINE:
+ bprintf(&b, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec,
+ op->data.deadline.tv_nsec);
+ break;
+ case GRPC_RECV_END_OF_INITIAL_METADATA:
+ bprintf(&b, "RECV_END_OF_INITIAL_METADATA");
+ break;
+ case GRPC_RECV_MESSAGE:
+ bprintf(&b, "RECV_MESSAGE");
+ break;
+ case GRPC_RECV_HALF_CLOSE:
+ bprintf(&b, "RECV_HALF_CLOSE");
+ break;
+ case GRPC_RECV_FINISH:
+ bprintf(&b, "RECV_FINISH");
+ break;
+ case GRPC_CANCEL_OP:
+ bprintf(&b, "CANCEL_OP");
+ break;
+ }
+ bprintf(&b, " flags=0x%08x", op->flags);
+
+ return b.buffer;
+}
+
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+ grpc_call_element *elem, grpc_call_op *op) {
+ char *str = grpc_call_op_string(op);
+ gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
+ gpr_free(str);
+}
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
new file mode 100644
index 0000000000..4285b284ce
--- /dev/null
+++ b/src/core/channel/census_filter.c
@@ -0,0 +1,189 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/census_filter.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/channel/channel_stack.h"
+#include "src/core/channel/noop_filter.h"
+#include "src/core/statistics/census_interface.h"
+#include "src/core/statistics/census_rpc_stats.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice.h>
+#include <grpc/support/time.h>
+
+typedef struct call_data {
+ census_op_id op_id;
+ census_rpc_stats stats;
+ gpr_timespec start_ts;
+} call_data;
+
+typedef struct channel_data {
+ grpc_mdstr* path_str; /* pointer to meta data str with key == ":path" */
+} channel_data;
+
+static void init_rpc_stats(census_rpc_stats* stats) {
+ memset(stats, 0, sizeof(census_rpc_stats));
+ stats->cnt = 1;
+}
+
+static double gpr_timespec_to_micros(gpr_timespec t) {
+ return t.tv_sec * GPR_US_PER_SEC + t.tv_nsec * 1e-3;
+}
+
+static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
+ channel_data* chand) {
+ if (op->data.metadata->key == chand->path_str) {
+ census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
+ op->data.metadata->value->slice));
+ }
+}
+
+static void client_call_op(grpc_call_element* elem, grpc_call_op* op) {
+ call_data* calld = elem->call_data;
+ channel_data* chand = elem->channel_data;
+ GPR_ASSERT(calld != NULL);
+ GPR_ASSERT(chand != NULL);
+ GPR_ASSERT((calld->op_id.upper != 0) && (calld->op_id.lower != 0));
+ switch (op->type) {
+ case GRPC_SEND_METADATA:
+ extract_and_annotate_method_tag(op, calld, chand);
+ break;
+ case GRPC_RECV_FINISH:
+ /* Should we stop timing the rpc here? */
+ break;
+ default:
+ break;
+ }
+ /* Always pass control up or down the stack depending on op->dir */
+ grpc_call_next_op(elem, op);
+}
+
+static void server_call_op(grpc_call_element* elem, grpc_call_op* op) {
+ call_data* calld = elem->call_data;
+ channel_data* chand = elem->channel_data;
+ GPR_ASSERT(calld != NULL);
+ GPR_ASSERT(chand != NULL);
+ GPR_ASSERT((calld->op_id.upper != 0) && (calld->op_id.lower != 0));
+ switch (op->type) {
+ case GRPC_RECV_METADATA:
+ extract_and_annotate_method_tag(op, calld, chand);
+ break;
+ case GRPC_SEND_FINISH:
+ /* Should we stop timing the rpc here? */
+ break;
+ default:
+ break;
+ }
+ /* Always pass control up or down the stack depending on op->dir */
+ grpc_call_next_op(elem, op);
+}
+
+static void channel_op(grpc_channel_element* elem, grpc_channel_op* op) {
+ switch (op->type) {
+ case GRPC_TRANSPORT_CLOSED:
+ /* TODO(hongyu): Annotate trace information for all calls of the channel
+ */
+ break;
+ default:
+ break;
+ }
+ grpc_channel_next_op(elem, op);
+}
+
+static void client_init_call_elem(grpc_call_element* elem,
+ const void* server_transport_data) {
+ call_data* d = elem->call_data;
+ GPR_ASSERT(d != NULL);
+ init_rpc_stats(&d->stats);
+ d->start_ts = gpr_now();
+ d->op_id = census_tracing_start_op();
+}
+
+static void client_destroy_call_elem(grpc_call_element* elem) {
+ call_data* d = elem->call_data;
+ GPR_ASSERT(d != NULL);
+ census_record_rpc_client_stats(d->op_id, &d->stats);
+ census_tracing_end_op(d->op_id);
+}
+
+static void server_init_call_elem(grpc_call_element* elem,
+ const void* server_transport_data) {
+ call_data* d = elem->call_data;
+ GPR_ASSERT(d != NULL);
+ init_rpc_stats(&d->stats);
+ d->start_ts = gpr_now();
+ d->op_id = census_tracing_start_op();
+}
+
+static void server_destroy_call_elem(grpc_call_element* elem) {
+ call_data* d = elem->call_data;
+ GPR_ASSERT(d != NULL);
+ d->stats.elapsed_time_ms =
+ gpr_timespec_to_micros(gpr_time_sub(gpr_now(), d->start_ts));
+ census_record_rpc_server_stats(d->op_id, &d->stats);
+ census_tracing_end_op(d->op_id);
+}
+
+static void init_channel_elem(grpc_channel_element* elem,
+ const grpc_channel_args* args, grpc_mdctx* mdctx,
+ int is_first, int is_last) {
+ channel_data* chand = elem->channel_data;
+ GPR_ASSERT(chand != NULL);
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(!is_last);
+ chand->path_str = grpc_mdstr_from_string(mdctx, ":path");
+}
+
+static void destroy_channel_elem(grpc_channel_element* elem) {}
+
+const grpc_channel_filter grpc_client_census_filter = {
+ client_call_op, channel_op,
+
+ sizeof(call_data), client_init_call_elem, client_destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "census-client"};
+
+const grpc_channel_filter grpc_server_census_filter = {
+ server_call_op, channel_op,
+
+ sizeof(call_data), server_init_call_elem, server_destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "census-server"};
diff --git a/src/core/channel/census_filter.h b/src/core/channel/census_filter.h
new file mode 100644
index 0000000000..5b2c01ca9b
--- /dev/null
+++ b/src/core/channel/census_filter.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_CENSUS_FILTER_H__
+#define __GRPC_INTERNAL_CHANNEL_CENSUS_FILTER_H__
+
+#include "src/core/channel/channel_stack.h"
+
+/* Census filters: provides tracing and stats collection functionalities. It
+ needs to reside right below the surface filter in the channel stack. */
+extern const grpc_channel_filter grpc_client_census_filter;
+extern const grpc_channel_filter grpc_server_census_filter;
+
+#endif /* __GRPC_INTERNAL_CHANNEL_CENSUS_FILTER_H__ */
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
new file mode 100644
index 0000000000..36312e54de
--- /dev/null
+++ b/src/core/channel/channel_args.c
@@ -0,0 +1,112 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+#include "src/core/channel/channel_args.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string.h>
+
+#include <string.h>
+
+static grpc_arg copy_arg(const grpc_arg *src) {
+ grpc_arg dst;
+ dst.type = src->type;
+ dst.key = gpr_strdup(src->key);
+ switch (dst.type) {
+ case GRPC_ARG_STRING:
+ dst.value.string = gpr_strdup(src->value.string);
+ break;
+ case GRPC_ARG_INTEGER:
+ dst.value.integer = src->value.integer;
+ break;
+ case GRPC_ARG_POINTER:
+ dst.value.pointer = src->value.pointer;
+ dst.value.pointer.p = src->value.pointer.copy(src->value.pointer.p);
+ break;
+ }
+ return dst;
+}
+
+grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
+ const grpc_arg *to_add) {
+ grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args));
+ size_t i;
+ size_t src_num_args = (src == NULL) ? 0 : src->num_args;
+ if (!src && !to_add) {
+ dst->num_args = 0;
+ dst->args = NULL;
+ return dst;
+ }
+ dst->num_args = src_num_args + ((to_add == NULL) ? 0 : 1);
+ dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args);
+ for (i = 0; i < src_num_args; i++) {
+ dst->args[i] = copy_arg(&src->args[i]);
+ }
+ if (to_add != NULL) dst->args[src_num_args] = copy_arg(to_add);
+ return dst;
+}
+
+grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) {
+ return grpc_channel_args_copy_and_add(src, NULL);
+}
+
+void grpc_channel_args_destroy(grpc_channel_args *a) {
+ size_t i;
+ for (i = 0; i < a->num_args; i++) {
+ switch (a->args[i].type) {
+ case GRPC_ARG_STRING:
+ gpr_free(a->args[i].value.string);
+ break;
+ case GRPC_ARG_INTEGER:
+ break;
+ case GRPC_ARG_POINTER:
+ a->args[i].value.pointer.destroy(a->args[i].value.pointer.p);
+ break;
+ }
+ gpr_free(a->args[i].key);
+ }
+ gpr_free(a->args);
+ gpr_free(a);
+}
+
+int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
+ int i;
+ if (a == NULL) return 0;
+ for (i = 0; i < a->num_args; i++) {
+ if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
+ return a->args[i].value.integer != 0;
+ }
+ }
+ return 0;
+}
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
new file mode 100644
index 0000000000..cf38d5d01f
--- /dev/null
+++ b/src/core/channel/channel_args.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_CHANNEL_ARGS_H__
+#define __GRPC_INTERNAL_CHANNEL_CHANNEL_ARGS_H__
+
+#include <grpc/grpc.h>
+
+/* Copy some arguments */
+grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
+
+/* Copy some arguments and add the to_add parameter in the end.
+ If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */
+grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
+ const grpc_arg *to_add);
+
+/* Destroy arguments created by grpc_channel_args_copy */
+void grpc_channel_args_destroy(grpc_channel_args *a);
+
+/* Reads census_enabled settings from channel args. Returns 1 if census_enabled
+ is specified in channel args, otherwise returns 0. */
+int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
+
+#endif /* __GRPC_INTERNAL_CHANNEL_CHANNEL_ARGS_H__ */
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
new file mode 100644
index 0000000000..a403db35c2
--- /dev/null
+++ b/src/core/channel/channel_stack.c
@@ -0,0 +1,223 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/channel_stack.h"
+#include <grpc/support/log.h>
+
+#include <stdlib.h>
+
+/* Memory layouts.
+
+ Channel stack is laid out as: {
+ grpc_channel_stack stk;
+ padding to GPR_MAX_ALIGNMENT
+ grpc_channel_element[stk.count];
+ per-filter memory, aligned to GPR_MAX_ALIGNMENT
+ }
+
+ Call stack is laid out as: {
+ grpc_call_stack stk;
+ padding to GPR_MAX_ALIGNMENT
+ grpc_call_element[stk.count];
+ per-filter memory, aligned to GPR_MAX_ALIGNMENT
+ } */
+
+/* Given a size, round up to the next multiple of sizeof(void*) */
+#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
+ (((x)+GPR_MAX_ALIGNMENT - 1) & ~(GPR_MAX_ALIGNMENT - 1))
+
+size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
+ size_t filter_count) {
+ /* always need the header, and size for the channel elements */
+ size_t size =
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));
+ size_t i;
+
+ GPR_ASSERT((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 &&
+ "GPR_MAX_ALIGNMENT must be a power of two");
+
+ /* add the size for each filter */
+ for (i = 0; i < filter_count; i++) {
+ size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
+ }
+
+ return size;
+}
+
+#define CHANNEL_ELEMS_FROM_STACK(stk) \
+ ((grpc_channel_element *)((char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE( \
+ sizeof(grpc_channel_stack))))
+
+#define CALL_ELEMS_FROM_STACK(stk) \
+ ((grpc_call_element *)((char *)(stk) + \
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack))))
+
+grpc_channel_element *grpc_channel_stack_element(
+ grpc_channel_stack *channel_stack, size_t index) {
+ return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index;
+}
+
+grpc_channel_element *grpc_channel_stack_last_element(
+ grpc_channel_stack *channel_stack) {
+ return grpc_channel_stack_element(channel_stack, channel_stack->count - 1);
+}
+
+grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
+ size_t index) {
+ return CALL_ELEMS_FROM_STACK(call_stack) + index;
+}
+
+void grpc_channel_stack_init(const grpc_channel_filter **filters,
+ size_t filter_count, const grpc_channel_args *args,
+ grpc_mdctx *metadata_context,
+ grpc_channel_stack *stack) {
+ size_t call_size =
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
+ grpc_channel_element *elems;
+ char *user_data;
+ size_t i;
+
+ stack->count = filter_count;
+ elems = CHANNEL_ELEMS_FROM_STACK(stack);
+ user_data =
+ ((char *)elems) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));
+
+ /* init per-filter data */
+ for (i = 0; i < filter_count; i++) {
+ elems[i].filter = filters[i];
+ elems[i].channel_data = user_data;
+ elems[i].filter->init_channel_elem(&elems[i], args, metadata_context,
+ i == 0, i == (filter_count - 1));
+ user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
+ call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
+ }
+
+ GPR_ASSERT(user_data - (char *)stack ==
+ grpc_channel_stack_size(filters, filter_count));
+
+ stack->call_stack_size = call_size;
+}
+
+void grpc_channel_stack_destroy(grpc_channel_stack *stack) {
+ grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(stack);
+ size_t count = stack->count;
+ size_t i;
+
+ /* destroy per-filter data */
+ for (i = 0; i < count; i++) {
+ channel_elems[i].filter->destroy_channel_elem(&channel_elems[i]);
+ }
+}
+
+void grpc_call_stack_init(grpc_channel_stack *channel_stack,
+ const void *transport_server_data,
+ grpc_call_stack *call_stack) {
+ grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
+ size_t count = channel_stack->count;
+ grpc_call_element *call_elems;
+ char *user_data;
+ size_t i;
+
+ call_stack->count = count;
+ call_elems = CALL_ELEMS_FROM_STACK(call_stack);
+ user_data = ((char *)call_elems) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
+
+ /* init per-filter data */
+ for (i = 0; i < count; i++) {
+ call_elems[i].filter = channel_elems[i].filter;
+ call_elems[i].channel_data = channel_elems[i].channel_data;
+ call_elems[i].call_data = user_data;
+ call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data);
+ user_data +=
+ ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
+ }
+}
+
+void grpc_call_stack_destroy(grpc_call_stack *stack) {
+ grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
+ size_t count = stack->count;
+ size_t i;
+
+ /* destroy per-filter data */
+ for (i = 0; i < count; i++) {
+ elems[i].filter->destroy_call_elem(&elems[i]);
+ }
+}
+
+void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
+ grpc_call_element *next_elem = elem + op->dir;
+ next_elem->filter->call_op(next_elem, op);
+}
+
+void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ grpc_channel_element *next_elem = elem + op->dir;
+ next_elem->filter->channel_op(next_elem, op);
+}
+
+grpc_channel_stack *grpc_channel_stack_from_top_element(
+ grpc_channel_element *elem) {
+ return (grpc_channel_stack *)((char *)(elem) -
+ ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_channel_stack)));
+}
+
+grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
+ return (grpc_call_stack *)((char *)(elem) - ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_call_stack)));
+}
+
+static void do_nothing(void *user_data, grpc_op_error error) {}
+
+void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
+ grpc_mdelem *mdelem) {
+ grpc_call_op metadata_op;
+ metadata_op.type = GRPC_SEND_METADATA;
+ metadata_op.dir = GRPC_CALL_DOWN;
+ metadata_op.done_cb = do_nothing;
+ metadata_op.user_data = NULL;
+ metadata_op.data.metadata = grpc_mdelem_ref(mdelem);
+ grpc_call_next_op(cur_elem, &metadata_op);
+}
+
+void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
+ grpc_call_op cancel_op;
+ cancel_op.type = GRPC_CANCEL_OP;
+ cancel_op.dir = GRPC_CALL_DOWN;
+ cancel_op.done_cb = do_nothing;
+ cancel_op.user_data = NULL;
+ grpc_call_next_op(cur_elem, &cancel_op);
+}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
new file mode 100644
index 0000000000..0ae1005e67
--- /dev/null
+++ b/src/core/channel/channel_stack.h
@@ -0,0 +1,288 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_CHANNEL_STACK_H__
+#define __GRPC_INTERNAL_CHANNEL_CHANNEL_STACK_H__
+
+/* A channel filter defines how operations on a channel are implemented.
+ Channel filters are chained together to create full channels, and if those
+ chains are linear, then channel stacks provide a mechanism to minimize
+ allocations for that chain.
+ Call stacks are created by channel stacks and represent the per-call data
+ for that stack. */
+
+#include <stddef.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/log.h>
+#include "src/core/transport/transport.h"
+
+/* #define GRPC_CHANNEL_STACK_TRACE 1 */
+
+typedef struct grpc_channel_element grpc_channel_element;
+typedef struct grpc_call_element grpc_call_element;
+
+/* Call operations - things that can be sent and received.
+
+ Threading:
+ SEND, RECV, and CANCEL ops can be active on a call at the same time, but
+ only one SEND, one RECV, and one CANCEL can be active at a time.
+
+ If state is shared between send/receive/cancel operations, it is up to
+ filters to provide their own protection around that. */
+typedef enum {
+ /* send metadata to the channels peer */
+ GRPC_SEND_METADATA,
+ /* send a deadline */
+ GRPC_SEND_DEADLINE,
+ /* start a connection (corresponds to start_invoke/accept) */
+ GRPC_SEND_START,
+ /* send a message to the channels peer */
+ GRPC_SEND_MESSAGE,
+ /* send half-close to the channels peer */
+ GRPC_SEND_FINISH,
+ /* request that more data be allowed through flow control */
+ GRPC_REQUEST_DATA,
+ /* metadata was received from the channels peer */
+ GRPC_RECV_METADATA,
+ /* receive a deadline */
+ GRPC_RECV_DEADLINE,
+ /* the end of the first batch of metadata was received */
+ GRPC_RECV_END_OF_INITIAL_METADATA,
+ /* a message was received from the channels peer */
+ GRPC_RECV_MESSAGE,
+ /* half-close was received from the channels peer */
+ GRPC_RECV_HALF_CLOSE,
+ /* full close was received from the channels peer */
+ GRPC_RECV_FINISH,
+ /* the call has been abnormally terminated */
+ GRPC_CANCEL_OP
+} grpc_call_op_type;
+
+/* The direction of the call.
+ The values of the enums (1, -1) matter here - they are used to increment
+ or decrement a pointer to find the next element to call */
+typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
+
+/* A single filterable operation to be performed on a call */
+typedef struct {
+ /* The type of operation we're performing */
+ grpc_call_op_type type;
+ /* The directionality of this call - does the operation begin at the bottom
+ of the stack and flow up, or does the operation start at the top of the
+ stack and flow down through the filters. */
+ grpc_call_dir dir;
+
+ /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */
+ gpr_uint32 flags;
+
+ /* Argument data, matching up with grpc_call_op_type names */
+ union {
+ grpc_byte_buffer *message;
+ grpc_mdelem *metadata;
+ gpr_timespec deadline;
+ } data;
+
+ /* Must be called when processing of this call-op is complete.
+ Signature chosen to match transport flow control callbacks */
+ void (*done_cb)(void *user_data, grpc_op_error error);
+ /* User data to be passed into done_cb */
+ void *user_data;
+} grpc_call_op;
+
+/* returns a string representation of op, that can be destroyed with gpr_free */
+char *grpc_call_op_string(grpc_call_op *op);
+
+typedef enum {
+ GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ACCEPT_CALL,
+ GRPC_TRANSPORT_CLOSED
+} grpc_channel_op_type;
+
+/* A single filterable operation to be performed on a channel */
+typedef struct {
+ /* The type of operation we're performing */
+ grpc_channel_op_type type;
+ /* The directionality of this call - is it bubbling up the stack, or down? */
+ grpc_call_dir dir;
+
+ /* Argument data, matching up with grpc_channel_op_type names */
+ union {
+ struct {
+ grpc_transport *transport;
+ const void *transport_server_data;
+ } accept_call;
+ } data;
+} grpc_channel_op;
+
+/* Channel filters specify:
+ 1. the amount of memory needed in the channel & call (via the sizeof_XXX
+ members)
+ 2. functions to initialize and destroy channel & call data
+ (init_XXX, destroy_XXX)
+ 3. functions to implement call operations and channel operations (call_op,
+ channel_op)
+ 4. a name, which is useful when debugging
+
+ Members are laid out in approximate frequency of use order. */
+typedef struct {
+ /* Called to eg. send/receive data on a call.
+ See grpc_call_next_op on how to call the next element in the stack */
+ void (*call_op)(grpc_call_element *elem, grpc_call_op *op);
+ /* Called to handle channel level operations - e.g. new calls, or transport
+ closure.
+ See grpc_channel_next_op on how to call the next element in the stack */
+ void (*channel_op)(grpc_channel_element *elem, grpc_channel_op *op);
+
+ /* sizeof(per call data) */
+ size_t sizeof_call_data;
+ /* Initialize per call data.
+ elem is initialized at the start of the call, and elem->call_data is what
+ needs initializing.
+ The filter does not need to do any chaining.
+ server_transport_data is an opaque pointer. If it is NULL, this call is
+ on a client; if it is non-NULL, then it points to memory owned by the
+ transport and is on the server. Most filters want to ignore this
+ argument.*/
+ void (*init_call_elem)(grpc_call_element *elem,
+ const void *server_transport_data);
+ /* Destroy per call data.
+ The filter does not need to do any chaining */
+ void (*destroy_call_elem)(grpc_call_element *elem);
+
+ /* sizeof(per channel data) */
+ size_t sizeof_channel_data;
+ /* Initialize per-channel data.
+ elem is initialized at the start of the call, and elem->channel_data is
+ what needs initializing.
+ is_first, is_last designate this elements position in the stack, and are
+ useful for asserting correct configuration by upper layer code.
+ The filter does not need to do any chaining */
+ void (*init_channel_elem)(grpc_channel_element *elem,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last);
+ /* Destroy per channel data.
+ The filter does not need to do any chaining */
+ void (*destroy_channel_elem)(grpc_channel_element *elem);
+
+ /* The name of this filter */
+ const char *name;
+} grpc_channel_filter;
+
+/* A channel_element tracks its filter and the filter requested memory within
+ a channel allocation */
+struct grpc_channel_element {
+ const grpc_channel_filter *filter;
+ void *channel_data;
+};
+
+/* A call_element tracks its filter, the filter requested memory within
+ a channel allocation, and the filter requested memory within a call
+ allocation */
+struct grpc_call_element {
+ const grpc_channel_filter *filter;
+ void *channel_data;
+ void *call_data;
+};
+
+/* A channel stack tracks a set of related filters for one channel, and
+ guarantees they live within a single malloc() allocation */
+typedef struct {
+ size_t count;
+ /* Memory required for a call stack (computed at channel stack
+ initialization) */
+ size_t call_stack_size;
+} grpc_channel_stack;
+
+/* A call stack tracks a set of related filters for one call, and guarantees
+ they live within a single malloc() allocation */
+typedef struct { size_t count; } grpc_call_stack;
+
+/* Get a channel element given a channel stack and its index */
+grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack,
+ size_t i);
+/* Get the last channel element in a channel stack */
+grpc_channel_element *grpc_channel_stack_last_element(
+ grpc_channel_stack *stack);
+/* Get a call stack element given a call stack and an index */
+grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i);
+
+/* Determine memory required for a channel stack containing a set of filters */
+size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
+ size_t filter_count);
+/* Initialize a channel stack given some filters */
+void grpc_channel_stack_init(const grpc_channel_filter **filters,
+ size_t filter_count, const grpc_channel_args *args,
+ grpc_mdctx *metadata_context,
+ grpc_channel_stack *stack);
+/* Destroy a channel stack */
+void grpc_channel_stack_destroy(grpc_channel_stack *stack);
+
+/* Initialize a call stack given a channel stack. transport_server_data is
+ expected to be NULL on a client, or an opaque transport owned pointer on the
+ server. */
+void grpc_call_stack_init(grpc_channel_stack *channel_stack,
+ const void *transport_server_data,
+ grpc_call_stack *call_stack);
+/* Destroy a call stack */
+void grpc_call_stack_destroy(grpc_call_stack *stack);
+
+/* Call the next operation (depending on call directionality) in a call stack */
+void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op);
+/* Call the next operation (depending on call directionality) in a channel
+ stack */
+void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
+
+/* Given the top element of a channel stack, get the channel stack itself */
+grpc_channel_stack *grpc_channel_stack_from_top_element(
+ grpc_channel_element *elem);
+/* Given the top element of a call stack, get the call stack itself */
+grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
+
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+ grpc_call_element *elem, grpc_call_op *op);
+
+void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
+ grpc_mdelem *elem);
+void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
+
+#ifdef GRPC_CHANNEL_STACK_TRACE
+#define GRPC_CALL_LOG_OP(sev, elem, op) grpc_call_log_op(sev, elem, op)
+#else
+#define GRPC_CALL_LOG_OP(sev, elem, op) \
+ do { \
+ } while (0)
+#endif
+
+#endif /* __GRPC_INTERNAL_CHANNEL_CHANNEL_STACK_H__ */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
new file mode 100644
index 0000000000..90563683d5
--- /dev/null
+++ b/src/core/channel/client_channel.c
@@ -0,0 +1,641 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/client_channel.h"
+
+#include <stdio.h>
+
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/metadata_buffer.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+/* Link back filter: passes up calls to the client channel, pushes down calls
+ down */
+
+typedef struct { grpc_channel_element *back; } lb_channel_data;
+
+typedef struct { grpc_call_element *back; } lb_call_data;
+
+static void lb_call_op(grpc_call_element *elem, grpc_call_op *op) {
+ lb_call_data *calld = elem->call_data;
+
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ calld->back->filter->call_op(calld->back, op);
+ break;
+ case GRPC_CALL_DOWN:
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+/* Currently we assume all channel operations should just be pushed up. */
+static void lb_channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ lb_channel_data *chand = elem->channel_data;
+
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ chand->back->filter->channel_op(chand->back, op);
+ break;
+ case GRPC_CALL_DOWN:
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+}
+
+/* Constructor for call_data */
+static void lb_init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {}
+
+/* Destructor for call_data */
+static void lb_destroy_call_elem(grpc_call_element *elem) {}
+
+/* Constructor for channel_data */
+static void lb_init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last) {
+ GPR_ASSERT(is_first);
+ GPR_ASSERT(!is_last);
+}
+
+/* Destructor for channel_data */
+static void lb_destroy_channel_elem(grpc_channel_element *elem) {}
+
+static const grpc_channel_filter link_back_filter = {
+ lb_call_op, lb_channel_op,
+
+ sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
+
+ sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
+
+ "clientchannel.linkback",
+};
+
+/* Client channel implementation */
+
+typedef struct {
+ size_t inflight_requests;
+ grpc_channel_stack *channel_stack;
+} child_entry;
+
+typedef struct call_data call_data;
+
+typedef struct {
+ /* protects children, child_count, child_capacity, active_child,
+ transport_setup_initiated
+ does not protect channel stacks held by children
+ transport_setup is assumed to be set once during construction */
+ gpr_mu mu;
+
+ /* the sending child (points somewhere in children, or NULL) */
+ child_entry *active_child;
+ /* vector of child channels */
+ child_entry *children;
+ size_t child_count;
+ size_t child_capacity;
+
+ /* calls waiting for a channel to be ready */
+ call_data **waiting_children;
+ size_t waiting_child_count;
+ size_t waiting_child_capacity;
+
+ /* transport setup for this channel */
+ grpc_transport_setup *transport_setup;
+ int transport_setup_initiated;
+
+ grpc_channel_args *args;
+
+ /* metadata cache */
+ grpc_mdelem *cancel_status;
+} channel_data;
+
+typedef enum {
+ CALL_CREATED,
+ CALL_WAITING,
+ CALL_ACTIVE,
+ CALL_CANCELLED
+} call_state;
+
+struct call_data {
+ /* owning element */
+ grpc_call_element *elem;
+
+ call_state state;
+ grpc_metadata_buffer pending_metadata;
+ gpr_timespec deadline;
+ union {
+ struct {
+ /* our child call stack */
+ grpc_call_stack *child_stack;
+ /* ... and the channel stack associated with it */
+ grpc_channel_stack *using_stack;
+ } active;
+ struct {
+ void (*on_complete)(void *user_data, grpc_op_error error);
+ void *on_complete_user_data;
+ gpr_uint32 start_flags;
+ } waiting;
+ } s;
+};
+
+static int prepare_activate(call_data *calld, child_entry *on_child) {
+ grpc_call_element *child_elem;
+ grpc_channel_stack *use_stack = on_child->channel_stack;
+
+ if (calld->state == CALL_CANCELLED) return 0;
+
+ on_child->inflight_requests++;
+
+ /* no more access to calld->s.waiting allowed */
+ GPR_ASSERT(calld->state == CALL_WAITING);
+ calld->state = CALL_ACTIVE;
+
+ /* create a child stack, and record that we're using a particular channel
+ stack */
+ calld->s.active.child_stack = gpr_malloc(use_stack->call_stack_size);
+ calld->s.active.using_stack = use_stack;
+ grpc_call_stack_init(use_stack, NULL, calld->s.active.child_stack);
+ /* initialize the top level link back element */
+ child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ GPR_ASSERT(child_elem->filter == &link_back_filter);
+ ((lb_call_data *)child_elem->call_data)->back = calld->elem;
+
+ return 1;
+}
+
+static void do_nothing(void *ignored, grpc_op_error error) {}
+
+static void complete_activate(call_data *calld, child_entry *on_child,
+ grpc_call_op *op) {
+ grpc_call_element *child_elem =
+ grpc_call_stack_element(calld->s.active.child_stack, 0);
+
+ GPR_ASSERT(calld->state == CALL_ACTIVE);
+
+ /* sending buffered metadata down the stack before the start call */
+ grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem);
+
+ if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) {
+ grpc_call_op dop;
+ dop.type = GRPC_SEND_DEADLINE;
+ dop.dir = GRPC_CALL_DOWN;
+ dop.flags = 0;
+ dop.data.deadline = calld->deadline;
+ dop.done_cb = do_nothing;
+ dop.user_data = NULL;
+ child_elem->filter->call_op(child_elem, &dop);
+ }
+
+ /* continue the start call down the stack, this nees to happen after metadata
+ are flushed*/
+ child_elem->filter->call_op(child_elem, op);
+}
+
+static void start_rpc(call_data *calld, channel_data *chand, grpc_call_op *op) {
+ gpr_mu_lock(&chand->mu);
+ if (calld->state == CALL_CANCELLED) {
+ gpr_mu_unlock(&chand->mu);
+ op->done_cb(op->user_data, GRPC_OP_ERROR);
+ return;
+ }
+ GPR_ASSERT(calld->state == CALL_CREATED);
+ calld->state = CALL_WAITING;
+ if (chand->active_child) {
+ /* channel is connected - use the connected stack */
+ if (prepare_activate(calld, chand->active_child)) {
+ gpr_mu_unlock(&chand->mu);
+ /* activate the request (pass it down) outside the lock */
+ complete_activate(calld, chand->active_child, op);
+ } else {
+ gpr_mu_unlock(&chand->mu);
+ }
+ } else {
+ /* check to see if we should initiate a connection (if we're not already),
+ but don't do so until outside the lock to avoid re-entrancy problems if
+ the callback is immediate */
+ int initiate_transport_setup = 0;
+ if (!chand->transport_setup_initiated) {
+ chand->transport_setup_initiated = 1;
+ initiate_transport_setup = 1;
+ }
+ /* add this call to the waiting set to be resumed once we have a child
+ channel stack, growing the waiting set if needed */
+ if (chand->waiting_child_count == chand->waiting_child_capacity) {
+ chand->waiting_child_capacity =
+ GPR_MAX(chand->waiting_child_capacity * 2, 8);
+ chand->waiting_children =
+ gpr_realloc(chand->waiting_children,
+ chand->waiting_child_capacity * sizeof(call_data *));
+ }
+ calld->s.waiting.on_complete = op->done_cb;
+ calld->s.waiting.on_complete_user_data = op->user_data;
+ calld->s.waiting.start_flags = op->flags;
+ chand->waiting_children[chand->waiting_child_count++] = calld;
+ gpr_mu_unlock(&chand->mu);
+
+ /* finally initiate transport setup if needed */
+ if (initiate_transport_setup) {
+ grpc_transport_setup_initiate(chand->transport_setup);
+ }
+ }
+}
+
+static void remove_waiting_child(channel_data *chand, call_data *calld) {
+ size_t new_count;
+ size_t i;
+ for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
+ if (chand->waiting_children[i] == calld) continue;
+ chand->waiting_children[new_count++] = chand->waiting_children[i];
+ }
+ GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
+ new_count == chand->waiting_child_count);
+ chand->waiting_child_count = new_count;
+}
+
+static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_call_element *child_elem;
+ grpc_call_op finish_op;
+
+ gpr_mu_lock(&chand->mu);
+ switch (calld->state) {
+ case CALL_ACTIVE:
+ child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ gpr_mu_unlock(&chand->mu);
+ child_elem->filter->call_op(child_elem, op);
+ return; /* early out */
+ case CALL_WAITING:
+ remove_waiting_child(chand, calld);
+ calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
+ GRPC_OP_ERROR);
+ /* fallthrough intended */
+ case CALL_CREATED:
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ /* send up a synthesized status */
+ finish_op.type = GRPC_RECV_METADATA;
+ finish_op.dir = GRPC_CALL_UP;
+ finish_op.flags = 0;
+ finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
+ finish_op.done_cb = do_nothing;
+ finish_op.user_data = NULL;
+ grpc_call_next_op(elem, &finish_op);
+ /* send up a finish */
+ finish_op.type = GRPC_RECV_FINISH;
+ finish_op.dir = GRPC_CALL_UP;
+ finish_op.flags = 0;
+ finish_op.done_cb = do_nothing;
+ finish_op.user_data = NULL;
+ grpc_call_next_op(elem, &finish_op);
+ return; /* early out */
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&chand->mu);
+ return; /* early out */
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_call_element *child_elem;
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ switch (op->type) {
+ case GRPC_SEND_METADATA:
+ grpc_metadata_buffer_queue(&calld->pending_metadata, op);
+ break;
+ case GRPC_SEND_DEADLINE:
+ calld->deadline = op->data.deadline;
+ op->done_cb(op->user_data, GRPC_OP_OK);
+ break;
+ case GRPC_SEND_START:
+ /* filter out the start event to find which child to send on */
+ start_rpc(calld, chand, op);
+ break;
+ case GRPC_CANCEL_OP:
+ cancel_rpc(elem, op);
+ break;
+ default:
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ grpc_call_next_op(elem, op);
+ break;
+ case GRPC_CALL_DOWN:
+ child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ GPR_ASSERT(calld->state == CALL_ACTIVE);
+ child_elem->filter->call_op(child_elem, op);
+ break;
+ }
+ break;
+ }
+}
+
+static void broadcast_channel_op_down(grpc_channel_element *elem,
+ grpc_channel_op *op) {
+ channel_data *chand = elem->channel_data;
+ grpc_channel_element *child_elem;
+ grpc_channel_stack **children;
+ size_t child_count;
+ size_t i;
+
+ /* copy the current set of children, and mark them all as having an inflight
+ request */
+ gpr_mu_lock(&chand->mu);
+ child_count = chand->child_count;
+ children = gpr_malloc(sizeof(grpc_channel_stack *) * child_count);
+ for (i = 0; i < child_count; i++) {
+ children[i] = chand->children[i].channel_stack;
+ chand->children[i].inflight_requests++;
+ }
+ gpr_mu_unlock(&chand->mu);
+
+ /* send the message down */
+ for (i = 0; i < child_count; i++) {
+ child_elem = grpc_channel_stack_element(children[i], 0);
+ child_elem->filter->channel_op(child_elem, op);
+ }
+
+ /* unmark the inflight requests */
+ gpr_mu_lock(&chand->mu);
+ for (i = 0; i < child_count; i++) {
+ chand->children[i].inflight_requests--;
+ }
+ gpr_mu_unlock(&chand->mu);
+
+ gpr_free(children);
+}
+
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+
+ switch (op->type) {
+ default:
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ grpc_channel_next_op(elem, op);
+ break;
+ case GRPC_CALL_DOWN:
+ broadcast_channel_op_down(elem, op);
+ break;
+ }
+ break;
+ }
+}
+
+static void error_bad_on_complete(void *arg, grpc_op_error error) {
+ gpr_log(GPR_ERROR,
+ "Waiting finished but not started? Bad on_complete callback");
+ abort();
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {
+ call_data *calld = elem->call_data;
+
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GPR_ASSERT(server_transport_data == NULL);
+ calld->elem = elem;
+ calld->state = CALL_CREATED;
+ calld->deadline = gpr_inf_future;
+ calld->s.waiting.on_complete = error_bad_on_complete;
+ calld->s.waiting.on_complete_user_data = NULL;
+ grpc_metadata_buffer_init(&calld->pending_metadata);
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ size_t i;
+
+ /* if the metadata buffer is not flushed, destroy it here. */
+ grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK);
+ /* if the call got activated, we need to destroy the child stack also, and
+ remove it from the in-flight requests tracked by the child_entry we
+ picked */
+ if (calld->state == CALL_ACTIVE) {
+ grpc_call_stack_destroy(calld->s.active.child_stack);
+ gpr_free(calld->s.active.child_stack);
+
+ gpr_mu_lock(&chand->mu);
+ for (i = 0; i < chand->child_count; i++) {
+ if (chand->children[i].channel_stack == calld->s.active.using_stack) {
+ chand->children[i].inflight_requests--;
+ /* TODO(ctiller): garbage collect channels that are not active
+ and have no inflight requests */
+ }
+ }
+ gpr_mu_unlock(&chand->mu);
+ }
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last) {
+ channel_data *chand = elem->channel_data;
+ char temp[16];
+
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(is_last);
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+
+ gpr_mu_init(&chand->mu);
+ chand->active_child = NULL;
+ chand->children = NULL;
+ chand->child_count = 0;
+ chand->child_capacity = 0;
+ chand->waiting_children = NULL;
+ chand->waiting_child_count = 0;
+ chand->waiting_child_capacity = 0;
+ chand->transport_setup = NULL;
+ chand->transport_setup_initiated = 0;
+ chand->args = grpc_channel_args_copy(args);
+
+ sprintf(temp, "%d", GRPC_STATUS_CANCELLED);
+ chand->cancel_status =
+ grpc_mdelem_from_strings(metadata_context, "grpc-status", temp);
+}
+
+/* Destructor for channel_data */
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ size_t i;
+
+ grpc_transport_setup_cancel(chand->transport_setup);
+
+ for (i = 0; i < chand->child_count; i++) {
+ GPR_ASSERT(chand->children[i].inflight_requests == 0);
+ grpc_channel_stack_destroy(chand->children[i].channel_stack);
+ gpr_free(chand->children[i].channel_stack);
+ }
+
+ grpc_channel_args_destroy(chand->args);
+ grpc_mdelem_unref(chand->cancel_status);
+
+ gpr_mu_destroy(&chand->mu);
+ GPR_ASSERT(chand->waiting_child_count == 0);
+ gpr_free(chand->waiting_children);
+ gpr_free(chand->children);
+}
+
+const grpc_channel_filter grpc_client_channel_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "clientchannel",
+};
+
+grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
+ grpc_channel_stack *channel_stack, grpc_transport *transport,
+ grpc_channel_filter const **channel_filters, size_t num_channel_filters,
+ grpc_mdctx *mdctx) {
+ /* we just got a new transport: lets create a child channel stack for it */
+ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+ channel_data *chand = elem->channel_data;
+ grpc_channel_element *lb_elem;
+ grpc_channel_stack *child_stack;
+ size_t num_child_filters = 2 + num_channel_filters;
+ grpc_channel_filter const **child_filters;
+ grpc_transport_setup_result result;
+ child_entry *child_ent;
+ call_data **waiting_children;
+ size_t waiting_child_count;
+ size_t i;
+ grpc_call_op *call_ops;
+
+ /* build the child filter stack */
+ child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
+ /* we always need a link back filter to get back to the connected channel */
+ child_filters[0] = &link_back_filter;
+ for (i = 0; i < num_channel_filters; i++) {
+ child_filters[i + 1] = channel_filters[i];
+ }
+ /* and we always need a connected channel to talk to the transport */
+ child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;
+
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+
+ /* BEGIN LOCKING CHANNEL */
+ gpr_mu_lock(&chand->mu);
+ chand->transport_setup_initiated = 0;
+
+ if (chand->child_count == chand->child_capacity) {
+ /* realloc will invalidate chand->active_child, but it's reset in the next
+ stanza anyway */
+ chand->child_capacity =
+ GPR_MAX(2 * chand->child_capacity, chand->child_capacity + 2);
+ chand->children = gpr_realloc(chand->children,
+ sizeof(child_entry) * chand->child_capacity);
+ }
+
+ /* build up the child stack */
+ child_stack =
+ gpr_malloc(grpc_channel_stack_size(child_filters, num_child_filters));
+ grpc_channel_stack_init(child_filters, num_child_filters, chand->args, mdctx,
+ child_stack);
+ lb_elem = grpc_channel_stack_element(child_stack, 0);
+ GPR_ASSERT(lb_elem->filter == &link_back_filter);
+ ((lb_channel_data *)lb_elem->channel_data)->back = elem;
+ result = grpc_connected_channel_bind_transport(child_stack, transport);
+ child_ent = &chand->children[chand->child_count++];
+ child_ent->channel_stack = child_stack;
+ child_ent->inflight_requests = 0;
+ chand->active_child = child_ent;
+
+ /* capture the waiting children - we'll activate them outside the lock
+ to avoid re-entrancy problems */
+ waiting_children = chand->waiting_children;
+ waiting_child_count = chand->waiting_child_count;
+ /* bumping up inflight_requests here avoids taking a lock per rpc below */
+
+ chand->waiting_children = NULL;
+ chand->waiting_child_count = 0;
+ chand->waiting_child_capacity = 0;
+
+ call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+
+ for (i = 0; i < waiting_child_count; i++) {
+ call_ops[i].type = GRPC_SEND_START;
+ call_ops[i].dir = GRPC_CALL_DOWN;
+ call_ops[i].flags = waiting_children[i]->s.waiting.start_flags;
+ call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
+ call_ops[i].user_data =
+ waiting_children[i]->s.waiting.on_complete_user_data;
+ if (!prepare_activate(waiting_children[i], child_ent)) {
+ waiting_children[i] = NULL;
+ call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+ }
+ }
+
+ /* END LOCKING CHANNEL */
+ gpr_mu_unlock(&chand->mu);
+
+ /* activate any pending operations - this is safe to do as we guarantee one
+ and only one write operation per request at the surface api - if we lose
+ that guarantee we need to do some curly locking here */
+ for (i = 0; i < waiting_child_count; i++) {
+ if (waiting_children[i]) {
+ complete_activate(waiting_children[i], child_ent, &call_ops[i]);
+ }
+ }
+ gpr_free(waiting_children);
+ gpr_free(call_ops);
+ gpr_free(child_filters);
+
+ return result;
+}
+
+void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
+ grpc_transport_setup *setup) {
+ /* post construction initialization: set the transport setup pointer */
+ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(!chand->transport_setup);
+ chand->transport_setup = setup;
+}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
new file mode 100644
index 0000000000..576af64ec7
--- /dev/null
+++ b/src/core/channel/client_channel.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_CLIENT_CHANNEL_H__
+#define __GRPC_INTERNAL_CHANNEL_CLIENT_CHANNEL_H__
+
+#include "src/core/channel/channel_stack.h"
+
+/* A client channel is a channel that begins disconnected, and can connect
+ to some endpoint on demand. If that endpoint disconnects, it will be
+ connected to again later.
+
+ Calls on a disconnected client channel are queued until a connection is
+ established. */
+
+extern const grpc_channel_filter grpc_client_channel_filter;
+
+/* post-construction initializer to let the client channel know which
+ transport setup it should cancel upon destruction, or initiate when it needs
+ a connection */
+void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
+ grpc_transport_setup *setup);
+
+/* grpc_transport_setup_callback for binding new transports into a client
+ channel - user_data should be the channel stack containing the client
+ channel */
+grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
+ grpc_channel_stack *channel_stack, grpc_transport *transport,
+ grpc_channel_filter const **channel_filters, size_t num_channel_filters,
+ grpc_mdctx *mdctx);
+
+#endif /* __GRPC_INTERNAL_CHANNEL_CLIENT_CHANNEL_H__ */
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
new file mode 100644
index 0000000000..c667e39d9c
--- /dev/null
+++ b/src/core/channel/client_setup.c
@@ -0,0 +1,239 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/client_setup.h"
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/channel_stack.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+
+struct grpc_client_setup {
+ grpc_transport_setup base; /* must be first */
+ void (*initiate)(void *user_data, grpc_client_setup_request *request);
+ void (*done)(void *user_data);
+ void *user_data;
+ grpc_channel_args *args;
+ grpc_mdctx *mdctx;
+ grpc_em *em;
+ grpc_em_alarm backoff_alarm;
+ gpr_timespec current_backoff_interval;
+ int in_alarm;
+
+ gpr_mu mu;
+ grpc_client_setup_request *active_request;
+ int refs;
+};
+
+struct grpc_client_setup_request {
+ /* pointer back to the setup object */
+ grpc_client_setup *setup;
+ gpr_timespec deadline;
+};
+
+gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) {
+ return r->deadline;
+}
+
+static void destroy_setup(grpc_client_setup *s) {
+ gpr_mu_destroy(&s->mu);
+ s->done(s->user_data);
+ grpc_channel_args_destroy(s->args);
+ gpr_free(s);
+}
+
+/* initiate handshaking */
+static void setup_initiate(grpc_transport_setup *sp) {
+ grpc_client_setup *s = (grpc_client_setup *)sp;
+ grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request));
+ int in_alarm = 0;
+
+ r->setup = s;
+ /* TODO(klempner): Actually set a deadline */
+ r->deadline = gpr_inf_future;
+
+ gpr_mu_lock(&s->mu);
+ GPR_ASSERT(s->refs > 0);
+ /* there might be more than one request outstanding if the caller calls
+ initiate in some kind of rapid-fire way: we try to connect each time,
+ and keep track of the latest request (which is the only one that gets
+ to finish) */
+ if (!s->in_alarm) {
+ s->active_request = r;
+ s->refs++;
+ } else {
+ /* TODO(klempner): Maybe do something more clever here */
+ in_alarm = 1;
+ }
+ gpr_mu_unlock(&s->mu);
+
+ if (!in_alarm) {
+ s->initiate(s->user_data, r);
+ } else {
+ gpr_free(r);
+ }
+}
+
+/* cancel handshaking: cancel all requests, and shutdown (the caller promises
+ not to initiate again) */
+static void setup_cancel(grpc_transport_setup *sp) {
+ grpc_client_setup *s = (grpc_client_setup *)sp;
+ void *ignored;
+
+ gpr_mu_lock(&s->mu);
+
+ GPR_ASSERT(s->refs > 0);
+ /* effectively cancels the current request (if any) */
+ s->active_request = NULL;
+ if (s->in_alarm) {
+ grpc_em_alarm_cancel(&s->backoff_alarm, &ignored);
+ }
+ if (--s->refs == 0) {
+ gpr_mu_unlock(&s->mu);
+ destroy_setup(s);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ }
+}
+
+/* vtable for transport setup */
+static const grpc_transport_setup_vtable setup_vtable = {setup_initiate,
+ setup_cancel};
+
+void grpc_client_setup_create_and_attach(
+ grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args,
+ grpc_mdctx *mdctx,
+ void (*initiate)(void *user_data, grpc_client_setup_request *request),
+ void (*done)(void *user_data), void *user_data, grpc_em *em) {
+ grpc_client_setup *s = gpr_malloc(sizeof(grpc_client_setup));
+
+ s->base.vtable = &setup_vtable;
+ gpr_mu_init(&s->mu);
+ s->refs = 1;
+ s->mdctx = mdctx;
+ s->initiate = initiate;
+ s->done = done;
+ s->user_data = user_data;
+ s->em = em;
+ s->active_request = NULL;
+ s->args = grpc_channel_args_copy(args);
+ s->current_backoff_interval = gpr_time_from_micros(1000000);
+ s->in_alarm = 0;
+
+ grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base);
+}
+
+int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) {
+ int result;
+ if (gpr_time_cmp(gpr_now(), r->deadline) > 0) {
+ return 0;
+ }
+ gpr_mu_lock(&r->setup->mu);
+ result = r->setup->active_request == r;
+ gpr_mu_unlock(&r->setup->mu);
+ return result;
+}
+
+static void backoff_alarm_done(void *arg /* grpc_client_setup */,
+ grpc_em_cb_status status) {
+ grpc_client_setup *s = arg;
+ grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request));
+ r->setup = s;
+ /* TODO(klempner): Set this to something useful */
+ r->deadline = gpr_inf_future;
+ /* Handle status cancelled? */
+ gpr_mu_lock(&s->mu);
+ s->active_request = r;
+ s->in_alarm = 0;
+ if (status != GRPC_CALLBACK_SUCCESS) {
+ if (0 == --s->refs) {
+ gpr_mu_unlock(&s->mu);
+ destroy_setup(s);
+ gpr_free(r);
+ return;
+ } else {
+ gpr_mu_unlock(&s->mu);
+ return;
+ }
+ }
+ gpr_mu_unlock(&s->mu);
+ s->initiate(s->user_data, r);
+}
+
+void grpc_client_setup_request_finish(grpc_client_setup_request *r,
+ int was_successful) {
+ int retry = !was_successful;
+ grpc_client_setup *s = r->setup;
+
+ gpr_mu_lock(&s->mu);
+ if (s->active_request == r) {
+ s->active_request = NULL;
+ } else {
+ retry = 0;
+ }
+ if (!retry && 0 == --s->refs) {
+ gpr_mu_unlock(&s->mu);
+ destroy_setup(s);
+ gpr_free(r);
+ return;
+ }
+
+ gpr_free(r);
+
+ if (retry) {
+ /* TODO(klempner): Replace these values with further consideration. 2x is
+ probably too aggressive of a backoff. */
+ gpr_timespec max_backoff = gpr_time_from_micros(120000000);
+ GPR_ASSERT(!s->in_alarm);
+ s->in_alarm = 1;
+ grpc_em_alarm_init(&s->backoff_alarm, s->em, backoff_alarm_done, s);
+ grpc_em_alarm_add(&s->backoff_alarm,
+ gpr_time_add(s->current_backoff_interval, gpr_now()));
+ s->current_backoff_interval =
+ gpr_time_add(s->current_backoff_interval, s->current_backoff_interval);
+ if (gpr_time_cmp(s->current_backoff_interval, max_backoff) > 0) {
+ s->current_backoff_interval = max_backoff;
+ }
+ }
+
+ gpr_mu_unlock(&s->mu);
+}
+
+const grpc_channel_args *grpc_client_setup_get_channel_args(
+ grpc_client_setup_request *r) {
+ return r->setup->args;
+}
+
+grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r) {
+ return r->setup->mdctx;
+}
diff --git a/src/core/channel/client_setup.h b/src/core/channel/client_setup.h
new file mode 100644
index 0000000000..862c1325a3
--- /dev/null
+++ b/src/core/channel/client_setup.h
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_CLIENT_SETUP_H__
+#define __GRPC_INTERNAL_CHANNEL_CLIENT_SETUP_H__
+
+#include "src/core/channel/client_channel.h"
+#include "src/core/eventmanager/em.h"
+#include "src/core/transport/metadata.h"
+#include <grpc/support/time.h>
+
+/* Convenience API's to simplify transport setup */
+
+typedef struct grpc_client_setup grpc_client_setup;
+typedef struct grpc_client_setup_request grpc_client_setup_request;
+
+void grpc_client_setup_create_and_attach(
+ grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args,
+ grpc_mdctx *mdctx,
+ void (*initiate)(void *user_data, grpc_client_setup_request *request),
+ void (*done)(void *user_data), void *user_data, grpc_em *em);
+
+/* Check that r is the active request: needs to be performed at each callback.
+ If this races, we'll have two connection attempts running at once and the
+ old one will get cleaned up in due course, which is fine. */
+int grpc_client_setup_request_should_continue(grpc_client_setup_request *r);
+void grpc_client_setup_request_finish(grpc_client_setup_request *r,
+ int was_successful);
+const grpc_channel_args *grpc_client_setup_get_channel_args(
+ grpc_client_setup_request *r);
+
+/* Get the deadline for a request passed in to initiate. Implementations should
+ make a best effort to honor this deadline. */
+gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r);
+
+grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r);
+
+#endif /* __GRPC_INTERNAL_CHANNEL_CLIENT_SETUP_H__ */
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
new file mode 100644
index 0000000000..336472e740
--- /dev/null
+++ b/src/core/channel/connected_channel.c
@@ -0,0 +1,501 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/connected_channel.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/transport/transport.h"
+#include <grpc/byte_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string.h>
+
+#define MAX_BUFFER_LENGTH 8192
+/* the protobuf library will (by default) start warning at 100megs */
+#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+
+typedef struct {
+ grpc_transport *transport;
+ gpr_uint32 max_message_length;
+} channel_data;
+
+typedef struct {
+ grpc_call_element *elem;
+ grpc_stream_op_buffer outgoing_sopb;
+
+ gpr_uint32 max_message_length;
+ gpr_uint32 incoming_message_length;
+ gpr_uint8 reading_message;
+ gpr_uint8 got_metadata_boundary;
+ gpr_uint8 got_read_close;
+ gpr_slice_buffer incoming_message;
+ gpr_uint32 outgoing_buffer_length_estimate;
+} call_data;
+
+/* We perform a small hack to locate transport data alongside the connected
+ channel data in call allocations, to allow everything to be pulled in minimal
+ cache line requests */
+#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream *)((calld)+1))
+#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
+ (((call_data *)(transport_stream)) - 1)
+
+/* Copy the contents of a byte buffer into stream ops */
+static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
+ grpc_stream_op_buffer *sopb) {
+ size_t i;
+
+ switch (byte_buffer->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
+ gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+ gpr_slice_ref(slice);
+ grpc_sopb_add_slice(sopb, slice);
+ }
+ break;
+ }
+}
+
+/* Flush queued stream operations onto the transport */
+static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
+ call_data *calld, int is_last) {
+ size_t nops;
+
+ if (op->flags & GRPC_WRITE_BUFFER_HINT) {
+ if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
+ op->done_cb(op->user_data, GRPC_OP_OK);
+ return;
+ }
+ }
+
+ calld->outgoing_buffer_length_estimate = 0;
+ grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);
+
+ nops = calld->outgoing_sopb.nops;
+ calld->outgoing_sopb.nops = 0;
+ grpc_transport_send_batch(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ calld->outgoing_sopb.ops, nops, is_last);
+}
+
+/* Intercept a call operation and either push it directly up or translate it
+ into transport stream operations */
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ switch (op->type) {
+ case GRPC_SEND_METADATA:
+ grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
+ grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
+ op->user_data);
+ break;
+ case GRPC_SEND_DEADLINE:
+ grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline);
+ grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
+ op->user_data);
+ break;
+ case GRPC_SEND_START:
+ grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
+ end_bufferable_op(op, chand, calld, 0);
+ break;
+ case GRPC_SEND_MESSAGE:
+ grpc_sopb_add_begin_message(&calld->outgoing_sopb,
+ grpc_byte_buffer_length(op->data.message),
+ op->flags);
+ copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
+ calld->outgoing_buffer_length_estimate +=
+ (5 + grpc_byte_buffer_length(op->data.message));
+ end_bufferable_op(op, chand, calld, 0);
+ break;
+ case GRPC_SEND_FINISH:
+ end_bufferable_op(op, chand, calld, 1);
+ break;
+ case GRPC_REQUEST_DATA:
+ /* re-arm window updates if they were disarmed by finish_message */
+ grpc_transport_set_allow_window_updates(
+ chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
+ break;
+ case GRPC_CANCEL_OP:
+ grpc_transport_abort_stream(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ GRPC_STATUS_CANCELLED);
+ break;
+ default:
+ GPR_ASSERT(op->dir == GRPC_CALL_UP);
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+/* Currently we assume all channel operations should just be pushed up. */
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+
+ switch (op->type) {
+ case GRPC_CHANNEL_SHUTDOWN:
+ grpc_transport_close(chand->transport);
+ break;
+ default:
+ GPR_ASSERT(op->dir == GRPC_CALL_UP);
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ int r;
+
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ calld->elem = elem;
+ grpc_sopb_init(&calld->outgoing_sopb);
+
+ calld->reading_message = 0;
+ calld->got_metadata_boundary = 0;
+ calld->got_read_close = 0;
+ calld->outgoing_buffer_length_estimate = 0;
+ calld->max_message_length = chand->max_message_length;
+ gpr_slice_buffer_init(&calld->incoming_message);
+ r = grpc_transport_init_stream(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ server_transport_data);
+ GPR_ASSERT(r == 0);
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ grpc_sopb_destroy(&calld->outgoing_sopb);
+ gpr_slice_buffer_destroy(&calld->incoming_message);
+ grpc_transport_destroy_stream(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld));
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ channel_data *cd = (channel_data *)elem->channel_data;
+ size_t i;
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(is_last);
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ cd->transport = NULL;
+
+ cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
+ if (args) {
+ for (i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
+ if (args->args[i].type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else if (args->args[i].value.integer < 0) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else {
+ cd->max_message_length = args->args[i].value.integer;
+ }
+ }
+ }
+ }
+}
+
+/* Destructor for channel_data */
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ channel_data *cd = (channel_data *)elem->channel_data;
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ grpc_transport_destroy(cd->transport);
+}
+
+const grpc_channel_filter grpc_connected_channel_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "connected",
+};
+
+static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
+ grpc_stream *stream, size_t size_hint) {
+ return gpr_slice_malloc(size_hint);
+}
+
+/* Transport callback to accept a new stream... calls up to handle it */
+static void accept_stream(void *user_data, grpc_transport *transport,
+ const void *transport_server_data) {
+ grpc_channel_element *elem = user_data;
+ channel_data *chand = elem->channel_data;
+ grpc_channel_op op;
+
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ GPR_ASSERT(chand->transport == transport);
+
+ op.type = GRPC_ACCEPT_CALL;
+ op.dir = GRPC_CALL_UP;
+ op.data.accept_call.transport = transport;
+ op.data.accept_call.transport_server_data = transport_server_data;
+ channel_op(elem, &op);
+}
+
+static void recv_error(channel_data *chand, call_data *calld, int line,
+ const char *fmt, ...) {
+ char msg[512];
+ va_list a;
+
+ va_start(a, fmt);
+ vsprintf(msg, fmt, a);
+ va_end(a);
+
+ gpr_log(__FILE__, line, GPR_LOG_SEVERITY_ERROR, "%s", msg);
+
+ if (chand->transport) {
+ grpc_transport_abort_stream(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ GRPC_STATUS_INVALID_ARGUMENT);
+ }
+}
+
+static void do_nothing(void *calldata, grpc_op_error error) {}
+
+static void done_message(void *user_data, grpc_op_error error) {
+ grpc_byte_buffer_destroy(user_data);
+}
+
+static void finish_message(channel_data *chand, call_data *calld) {
+ grpc_call_element *elem = calld->elem;
+ grpc_call_op call_op;
+ call_op.dir = GRPC_CALL_UP;
+ call_op.flags = 0;
+ /* if we got all the bytes for this message, call up the stack */
+ call_op.type = GRPC_RECV_MESSAGE;
+ call_op.done_cb = done_message;
+ /* TODO(ctiller): this could be a lot faster if coded directly */
+ call_op.user_data = call_op.data.message = grpc_byte_buffer_create(
+ calld->incoming_message.slices, calld->incoming_message.count);
+ gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
+
+ /* disable window updates until we get a request more from above */
+ grpc_transport_set_allow_window_updates(
+ chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);
+
+ GPR_ASSERT(calld->incoming_message.count == 0);
+ calld->reading_message = 0;
+ grpc_call_next_op(elem, &call_op);
+}
+
+/* Handle incoming stream ops from the transport, translating them into
+ call_ops to pass up the call stack */
+static void recv_batch(void *user_data, grpc_transport *transport,
+ grpc_stream *stream, grpc_stream_op *ops,
+ size_t ops_count, grpc_stream_state final_state) {
+ call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
+ grpc_call_element *elem = calld->elem;
+ channel_data *chand = elem->channel_data;
+ grpc_stream_op *stream_op;
+ grpc_call_op call_op;
+ size_t i;
+ gpr_uint32 length;
+
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+
+ for (i = 0; i < ops_count; i++) {
+ stream_op = ops + i;
+ switch (stream_op->type) {
+ case GRPC_OP_FLOW_CTL_CB:
+ gpr_log(GPR_ERROR,
+ "should not receive flow control ops from transport");
+ abort();
+ break;
+ case GRPC_NO_OP:
+ break;
+ case GRPC_OP_METADATA:
+ call_op.type = GRPC_RECV_METADATA;
+ call_op.dir = GRPC_CALL_UP;
+ call_op.flags = 0;
+ call_op.data.metadata = stream_op->data.metadata;
+ call_op.done_cb = do_nothing;
+ call_op.user_data = NULL;
+ grpc_call_next_op(elem, &call_op);
+ break;
+ case GRPC_OP_DEADLINE:
+ call_op.type = GRPC_RECV_DEADLINE;
+ call_op.dir = GRPC_CALL_UP;
+ call_op.flags = 0;
+ call_op.data.deadline = stream_op->data.deadline;
+ call_op.done_cb = do_nothing;
+ call_op.user_data = NULL;
+ grpc_call_next_op(elem, &call_op);
+ break;
+ case GRPC_OP_METADATA_BOUNDARY:
+ if (!calld->got_metadata_boundary) {
+ calld->got_metadata_boundary = 1;
+ call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA;
+ call_op.dir = GRPC_CALL_UP;
+ call_op.flags = 0;
+ call_op.done_cb = do_nothing;
+ call_op.user_data = NULL;
+ grpc_call_next_op(elem, &call_op);
+ }
+ break;
+ case GRPC_OP_BEGIN_MESSAGE:
+ /* can't begin a message when we're still reading a message */
+ if (calld->reading_message) {
+ recv_error(chand, calld, __LINE__,
+ "Message terminated early; read %d bytes, expected %d",
+ calld->incoming_message.length,
+ calld->incoming_message_length);
+ return;
+ }
+ /* stash away parameters, and prepare for incoming slices */
+ length = stream_op->data.begin_message.length;
+ if (length > calld->max_message_length) {
+ recv_error(
+ chand, calld, __LINE__,
+ "Maximum message length of %d exceeded by a message of length %d",
+ calld->max_message_length, length);
+ } else if (length > 0) {
+ calld->reading_message = 1;
+ calld->incoming_message_length = length;
+ } else {
+ finish_message(chand, calld);
+ }
+ break;
+ case GRPC_OP_SLICE:
+ if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
+ gpr_slice_unref(stream_op->data.slice);
+ break;
+ }
+ /* we have to be reading a message to know what to do here */
+ if (!calld->reading_message) {
+ recv_error(chand, calld, __LINE__,
+ "Received payload data while not reading a message");
+ return;
+ }
+ /* append the slice to the incoming buffer */
+ gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
+ if (calld->incoming_message.length > calld->incoming_message_length) {
+ /* if we got too many bytes, complain */
+ recv_error(chand, calld, __LINE__,
+ "Receiving message overflow; read %d bytes, expected %d",
+ calld->incoming_message.length,
+ calld->incoming_message_length);
+ return;
+ } else if (calld->incoming_message.length ==
+ calld->incoming_message_length) {
+ finish_message(chand, calld);
+ }
+ }
+ }
+ /* if the stream closed, then call up the stack to let it know */
+ if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
+ final_state == GRPC_STREAM_CLOSED)) {
+ calld->got_read_close = 1;
+ if (calld->reading_message) {
+ recv_error(chand, calld, __LINE__,
+ "Last message truncated; read %d bytes, expected %d",
+ calld->incoming_message.length,
+ calld->incoming_message_length);
+ return;
+ }
+ call_op.type = GRPC_RECV_HALF_CLOSE;
+ call_op.dir = GRPC_CALL_UP;
+ call_op.flags = 0;
+ call_op.done_cb = do_nothing;
+ call_op.user_data = NULL;
+ grpc_call_next_op(elem, &call_op);
+ }
+ if (final_state == GRPC_STREAM_CLOSED) {
+ call_op.type = GRPC_RECV_FINISH;
+ call_op.dir = GRPC_CALL_UP;
+ call_op.flags = 0;
+ call_op.done_cb = do_nothing;
+ call_op.user_data = NULL;
+ grpc_call_next_op(elem, &call_op);
+ }
+}
+
+static void transport_closed(void *user_data, grpc_transport *transport) {
+ /* transport was closed ==> call up and handle it */
+ grpc_channel_element *elem = user_data;
+ channel_data *chand = elem->channel_data;
+ grpc_channel_op op;
+
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ GPR_ASSERT(chand->transport == transport);
+
+ op.type = GRPC_TRANSPORT_CLOSED;
+ op.dir = GRPC_CALL_UP;
+ channel_op(elem, &op);
+}
+
+const grpc_transport_callbacks connected_channel_transport_callbacks = {
+ alloc_recv_buffer, accept_stream, recv_batch, transport_closed,
+};
+
+grpc_transport_setup_result grpc_connected_channel_bind_transport(
+ grpc_channel_stack *channel_stack, grpc_transport *transport) {
+ /* Assumes that the connected channel filter is always the last filter
+ in a channel stack */
+ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+ channel_data *cd = (channel_data *)elem->channel_data;
+ grpc_transport_setup_result ret;
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ GPR_ASSERT(cd->transport == NULL);
+ cd->transport = transport;
+
+ /* HACK(ctiller): increase call stack size for the channel to make space
+ for channel data. We need a cleaner (but performant) way to do this,
+ and I'm not sure what that is yet.
+ This is only "safe" because call stacks place no additional data after
+ the last call element, and the last call element MUST be the connected
+ channel. */
+ channel_stack->call_stack_size += grpc_transport_stream_size(transport);
+
+ ret.user_data = elem;
+ ret.callbacks = &connected_channel_transport_callbacks;
+ return ret;
+}
diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h
new file mode 100644
index 0000000000..660ea7ad89
--- /dev/null
+++ b/src/core/channel/connected_channel.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_CONNECTED_CHANNEL_H__
+#define __GRPC_INTERNAL_CHANNEL_CONNECTED_CHANNEL_H__
+
+#include "src/core/channel/channel_stack.h"
+
+/* A channel filter representing a channel that is on a connected transport.
+ This filter performs actual sending and receiving of messages. */
+
+extern const grpc_channel_filter grpc_connected_channel_filter;
+
+/* Post construction fixup: set the transport in the connected channel.
+ Must be called before any call stack using this filter is used. */
+grpc_transport_setup_result grpc_connected_channel_bind_transport(
+ grpc_channel_stack *channel_stack, grpc_transport *transport);
+
+#endif /* __GRPC_INTERNAL_CHANNEL_CONNECTED_CHANNEL_H__ */
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
new file mode 100644
index 0000000000..b82c7352d3
--- /dev/null
+++ b/src/core/channel/http_client_filter.c
@@ -0,0 +1,143 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/http_client_filter.h"
+#include <grpc/support/log.h>
+
+typedef struct call_data {
+ int unused; /* C89 requires at least one struct element */
+} call_data;
+
+typedef struct channel_data { grpc_mdelem *te_trailers; } channel_data;
+
+/* used to silence 'variable not used' warnings */
+static void ignore_unused(void *ignored) {}
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ ignore_unused(calld);
+
+ switch (op->type) {
+ case GRPC_SEND_START:
+ /* just prior to starting, add a te: trailers header */
+ grpc_call_element_send_metadata(elem, channeld->te_trailers);
+ grpc_call_next_op(elem, op);
+ break;
+ default:
+ /* pass control up or down the stack depending on op->dir */
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+/* Called on special channel events, such as disconnection or new incoming
+ calls on the server */
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(channeld);
+
+ switch (op->type) {
+ default:
+ /* pass control up or down the stack depending on op->dir */
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(channeld);
+
+ /* initialize members */
+ calld->unused = 0;
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_call_element *elem) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(calld);
+ ignore_unused(channeld);
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ /* The first and the last filters tend to be implemented differently to
+ handle the case that there's no 'next' filter to call on the up or down
+ path */
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(!is_last);
+
+ /* initialize members */
+ channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ grpc_mdelem_unref(channeld->te_trailers);
+}
+
+const grpc_channel_filter grpc_http_client_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "http-client"};
diff --git a/src/core/channel/http_client_filter.h b/src/core/channel/http_client_filter.h
new file mode 100644
index 0000000000..f939cbd351
--- /dev/null
+++ b/src/core/channel/http_client_filter.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_HTTP_CLIENT_FILTER_H__
+#define __GRPC_INTERNAL_CHANNEL_HTTP_CLIENT_FILTER_H__
+
+#include "src/core/channel/channel_stack.h"
+
+/* Processes metadata on the client side for HTTP2 transports */
+extern const grpc_channel_filter grpc_http_client_filter;
+
+#endif /* __GRPC_INTERNAL_CHANNEL_HTTP_CLIENT_FILTER_H__ */
diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c
new file mode 100644
index 0000000000..b5c154144e
--- /dev/null
+++ b/src/core/channel/http_filter.c
@@ -0,0 +1,139 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/http_filter.h"
+#include <grpc/support/log.h>
+
+typedef struct call_data {
+ int unused; /* C89 requires at least one struct element */
+} call_data;
+
+typedef struct channel_data {
+ int unused; /* C89 requires at least one struct element */
+} channel_data;
+
+/* used to silence 'variable not used' warnings */
+static void ignore_unused(void *ignored) {}
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ ignore_unused(calld);
+ ignore_unused(channeld);
+
+ switch (op->type) {
+ default:
+ /* pass control up or down the stack depending on op->dir */
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+/* Called on special channel events, such as disconnection or new incoming
+ calls on the server */
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(channeld);
+
+ switch (op->type) {
+ default:
+ /* pass control up or down the stack depending on op->dir */
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ /* initialize members */
+ calld->unused = channeld->unused;
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_call_element *elem) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(calld);
+ ignore_unused(channeld);
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ /* The first and the last filters tend to be implemented differently to
+ handle the case that there's no 'next' filter to call on the up or down
+ path */
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(!is_last);
+
+ /* initialize members */
+ channeld->unused = 0;
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(channeld);
+}
+
+const grpc_channel_filter grpc_http_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "http"};
diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h
new file mode 100644
index 0000000000..89ad482d35
--- /dev/null
+++ b/src/core/channel/http_filter.h
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_HTTP_FILTER_H__
+#define __GRPC_INTERNAL_CHANNEL_HTTP_FILTER_H__
+
+#include "src/core/channel/channel_stack.h"
+
+/* Processes metadata that is common to both client and server for HTTP2
+ transports. */
+extern const grpc_channel_filter grpc_http_filter;
+
+#endif /* __GRPC_INTERNAL_CHANNEL_HTTP_FILTER_H__ */
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
new file mode 100644
index 0000000000..b176064813
--- /dev/null
+++ b/src/core/channel/http_server_filter.c
@@ -0,0 +1,150 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/http_server_filter.h"
+#include <grpc/support/log.h>
+
+typedef struct call_data {
+ int unused; /* C89 requires at least one struct element */
+} call_data;
+
+typedef struct channel_data { grpc_mdelem *te_trailers; } channel_data;
+
+/* used to silence 'variable not used' warnings */
+static void ignore_unused(void *ignored) {}
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ ignore_unused(calld);
+ ignore_unused(channeld);
+
+ switch (op->type) {
+ case GRPC_RECV_METADATA:
+ /* check if it's a te: trailers header */
+ if (op->data.metadata == channeld->te_trailers) {
+ /* swallow it */
+ grpc_mdelem_unref(op->data.metadata);
+ op->done_cb(op->user_data, GRPC_OP_OK);
+ } else {
+ /* pass the event up */
+ grpc_call_next_op(elem, op);
+ }
+ break;
+ default:
+ /* pass control up or down the stack depending on op->dir */
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+/* Called on special channel events, such as disconnection or new incoming
+ calls on the server */
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(channeld);
+
+ switch (op->type) {
+ default:
+ /* pass control up or down the stack depending on op->dir */
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(channeld);
+
+ /* initialize members */
+ calld->unused = 0;
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_call_element *elem) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(calld);
+ ignore_unused(channeld);
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ /* The first and the last filters tend to be implemented differently to
+ handle the case that there's no 'next' filter to call on the up or down
+ path */
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(!is_last);
+
+ /* initialize members */
+ channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ grpc_mdelem_unref(channeld->te_trailers);
+}
+
+const grpc_channel_filter grpc_http_server_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "http-server"};
diff --git a/src/core/channel/http_server_filter.h b/src/core/channel/http_server_filter.h
new file mode 100644
index 0000000000..5b475432aa
--- /dev/null
+++ b/src/core/channel/http_server_filter.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_HTTP_SERVER_FILTER_H__
+#define __GRPC_INTERNAL_CHANNEL_HTTP_SERVER_FILTER_H__
+
+#include "src/core/channel/channel_stack.h"
+
+/* Processes metadata on the client side for HTTP2 transports */
+extern const grpc_channel_filter grpc_http_server_filter;
+
+#endif /* __GRPC_INTERNAL_CHANNEL_HTTP_SERVER_FILTER_H__ */
diff --git a/src/core/channel/metadata_buffer.c b/src/core/channel/metadata_buffer.c
new file mode 100644
index 0000000000..75fd90b707
--- /dev/null
+++ b/src/core/channel/metadata_buffer.c
@@ -0,0 +1,198 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/metadata_buffer.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+#include <string.h>
+
+#define INITIAL_ELEM_CAP 8
+
+/* One queued call; we track offsets to string data in a shared buffer to
+ reduce allocations. See grpc_metadata_buffer_impl for the memory use
+ strategy */
+typedef struct {
+ grpc_mdelem *md;
+ void (*cb)(void *user_data, grpc_op_error error);
+ void *user_data;
+ gpr_uint32 flags;
+} qelem;
+
+/* Memory layout:
+
+ grpc_metadata_buffer_impl
+ followed by an array of qelem */
+struct grpc_metadata_buffer_impl {
+ /* number of elements in q */
+ size_t elems;
+ /* capacity of q */
+ size_t elem_cap;
+};
+
+#define ELEMS(buffer) ((qelem *)((buffer)+1))
+
+void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer) {
+ /* start buffer as NULL, indicating no elements */
+ *buffer = NULL;
+}
+
+void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer,
+ grpc_op_error error) {
+ size_t i;
+ qelem *qe;
+ if (*buffer) {
+ for (i = 0; i < (*buffer)->elems; i++) {
+ qe = &ELEMS(*buffer)[i];
+ grpc_mdelem_unref(qe->md);
+ qe->cb(qe->user_data, error);
+ }
+ gpr_free(*buffer);
+ }
+}
+
+void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer,
+ grpc_call_op *op) {
+ grpc_metadata_buffer_impl *impl = *buffer;
+ qelem *qe;
+ size_t bytes;
+
+ GPR_ASSERT(op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA);
+
+ if (!impl) {
+ /* this is the first element: allocate enough space to hold the
+ header object and the initial element capacity of qelems */
+ bytes =
+ sizeof(grpc_metadata_buffer_impl) + INITIAL_ELEM_CAP * sizeof(qelem);
+ impl = gpr_malloc(bytes);
+ /* initialize the header object */
+ impl->elems = 0;
+ impl->elem_cap = INITIAL_ELEM_CAP;
+ } else if (impl->elems == impl->elem_cap) {
+ /* more qelems than what we can deal with: grow by doubling size */
+ impl->elem_cap *= 2;
+ bytes = sizeof(grpc_metadata_buffer_impl) + impl->elem_cap * sizeof(qelem);
+ impl = gpr_realloc(impl, bytes);
+ }
+
+ /* append an element to the queue */
+ qe = &ELEMS(impl)[impl->elems];
+ impl->elems++;
+
+ qe->md = op->data.metadata;
+ qe->cb = op->done_cb;
+ qe->user_data = op->user_data;
+ qe->flags = op->flags;
+
+ /* header object may have changed location: store it back */
+ *buffer = impl;
+}
+
+void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer,
+ grpc_call_element *elem) {
+ grpc_metadata_buffer_impl *impl = *buffer;
+ grpc_call_op op;
+ qelem *qe;
+ size_t i;
+
+ if (!impl) {
+ /* nothing to send */
+ return;
+ }
+
+ /* construct call_op's, and push them down the stack */
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ for (i = 0; i < impl->elems; i++) {
+ qe = &ELEMS(impl)[i];
+ op.done_cb = qe->cb;
+ op.user_data = qe->user_data;
+ op.flags = qe->flags;
+ op.data.metadata = qe->md;
+ grpc_call_next_op(elem, &op);
+ }
+
+ /* free data structures and reset to NULL: we can only flush once */
+ gpr_free(impl);
+ *buffer = NULL;
+}
+
+size_t grpc_metadata_buffer_count(const grpc_metadata_buffer *buffer) {
+ return *buffer ? (*buffer)->elems : 0;
+}
+
+typedef struct { grpc_metadata_buffer_impl *impl; } elems_hdr;
+
+grpc_metadata *grpc_metadata_buffer_extract_elements(
+ grpc_metadata_buffer *buffer) {
+ grpc_metadata_buffer_impl *impl;
+ elems_hdr *hdr;
+ qelem *src;
+ grpc_metadata *out;
+ size_t i;
+
+ impl = *buffer;
+
+ if (!impl) {
+ return NULL;
+ }
+
+ hdr = gpr_malloc(sizeof(elems_hdr) + impl->elems * sizeof(grpc_metadata));
+ src = ELEMS(impl);
+ out = (grpc_metadata *)(hdr + 1);
+
+ hdr->impl = impl;
+ for (i = 0; i < impl->elems; i++) {
+ out[i].key = (char *)grpc_mdstr_as_c_string(src[i].md->key);
+ out[i].value = (char *)grpc_mdstr_as_c_string(src[i].md->value);
+ out[i].value_length = GPR_SLICE_LENGTH(src[i].md->value->slice);
+ }
+
+ /* clear out buffer (it's not possible to extract elements twice */
+ *buffer = NULL;
+
+ return out;
+}
+
+void grpc_metadata_buffer_cleanup_elements(void *elements,
+ grpc_op_error error) {
+ elems_hdr *hdr = ((elems_hdr *)elements) - 1;
+
+ if (!elements) {
+ return;
+ }
+
+ grpc_metadata_buffer_destroy(&hdr->impl, error);
+ gpr_free(hdr);
+}
diff --git a/src/core/channel/metadata_buffer.h b/src/core/channel/metadata_buffer.h
new file mode 100644
index 0000000000..818b290ce2
--- /dev/null
+++ b/src/core/channel/metadata_buffer.h
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_METADATA_BUFFER_H__
+#define __GRPC_INTERNAL_CHANNEL_METADATA_BUFFER_H__
+
+#include "src/core/channel/channel_stack.h"
+
+/* Utility code to buffer GRPC_SEND_METADATA calls and pass them down the stack
+ all at once at some otherwise-determined time. Useful for implementing
+ filters that want to queue metadata until a START event chooses some
+ underlying filter stack to send an rpc on. */
+
+/* Clients should declare a member of grpc_metadata_buffer. This may at some
+ point become a typedef for a struct, but for now a pointer suffices */
+typedef struct grpc_metadata_buffer_impl grpc_metadata_buffer_impl;
+typedef grpc_metadata_buffer_impl *grpc_metadata_buffer;
+
+/* Initializes the metadata buffer. Allocates no memory. */
+void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer);
+/* Destroy the metadata buffer. */
+void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer,
+ grpc_op_error error);
+/* Append a call to the end of a metadata buffer: may allocate memory */
+void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, grpc_call_op *op);
+/* Flush all queued operations from the metadata buffer to the element below
+ self */
+void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer,
+ grpc_call_element *self);
+/* Count the number of queued elements in the buffer. */
+size_t grpc_metadata_buffer_count(const grpc_metadata_buffer *buffer);
+/* Extract elements as a grpc_metadata*, for presentation to applications.
+ The returned buffer must be freed with
+ grpc_metadata_buffer_cleanup_elements.
+ Clears the metadata buffer (this is a one-shot operation) */
+grpc_metadata *grpc_metadata_buffer_extract_elements(
+ grpc_metadata_buffer *buffer);
+void grpc_metadata_buffer_cleanup_elements(void *elements, grpc_op_error error);
+
+#endif /* __GRPC_INTERNAL_CHANNEL_METADATA_BUFFER_H__ */
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
new file mode 100644
index 0000000000..705df4a707
--- /dev/null
+++ b/src/core/channel/noop_filter.c
@@ -0,0 +1,138 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/noop_filter.h"
+#include <grpc/support/log.h>
+
+typedef struct call_data {
+ int unused; /* C89 requires at least one struct element */
+} call_data;
+
+typedef struct channel_data {
+ int unused; /* C89 requires at least one struct element */
+} channel_data;
+
+/* used to silence 'variable not used' warnings */
+static void ignore_unused(void *ignored) {}
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(calld);
+ ignore_unused(channeld);
+
+ switch (op->type) {
+ default:
+ /* pass control up or down the stack depending on op->dir */
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+/* Called on special channel events, such as disconnection or new incoming
+ calls on the server */
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(channeld);
+
+ switch (op->type) {
+ default:
+ /* pass control up or down the stack depending on op->dir */
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ /* initialize members */
+ calld->unused = channeld->unused;
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_call_element *elem) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(calld);
+ ignore_unused(channeld);
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ /* The first and the last filters tend to be implemented differently to
+ handle the case that there's no 'next' filter to call on the up or down
+ path */
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(!is_last);
+
+ /* initialize members */
+ channeld->unused = 0;
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ /* grab pointers to our data from the channel element */
+ channel_data *channeld = elem->channel_data;
+
+ ignore_unused(channeld);
+}
+
+const grpc_channel_filter grpc_no_op_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "no-op"};
diff --git a/src/core/channel/noop_filter.h b/src/core/channel/noop_filter.h
new file mode 100644
index 0000000000..4057ff7ac9
--- /dev/null
+++ b/src/core/channel/noop_filter.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_NOOP_FILTER_H__
+#define __GRPC_INTERNAL_CHANNEL_NOOP_FILTER_H__
+
+#include "src/core/channel/channel_stack.h"
+
+/* No-op filter: simply takes everything it's given, and passes it on to the
+ next filter. Exists simply as a starting point that others can take and
+ customize for their own filters */
+extern const grpc_channel_filter grpc_no_op_filter;
+
+#endif /* __GRPC_INTERNAL_CHANNEL_NOOP_FILTER_H__ */