aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-02-17 20:54:46 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-02-17 20:54:46 -0800
commit178edfae2be7014abce0998ca6c34babc65df499 (patch)
tree9ef0028ad1c54cf6729086a9067813da9c9d641f /src/core
parentc15cd723ebabbab4825480032b56a2ddd9a8b76b (diff)
Decouple filter selection from channel construction
Allow plugins to extend the set of filters used by gRPC core: - plugins at construction time can register against the 'channel_init' system to be allowed to mutate a new channel_stack_builder type - channel_stack_builder provides a central and rather dynamic place to construct the list of filters required by a channel stack - ultimately we construct the channel stack in the fashion we always have This is also a prerequisite step to allowing filters to be implemented from wrapped languages.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/grpc_plugin.c72
-rw-r--r--src/core/census/grpc_plugin.h (renamed from src/core/surface/server_create.c)20
-rw-r--r--src/core/channel/channel_stack_builder.c256
-rw-r--r--src/core/channel/channel_stack_builder.h138
-rw-r--r--src/core/channel/client_uchannel.c12
-rw-r--r--src/core/channel/connected_channel.c29
-rw-r--r--src/core/channel/connected_channel.h15
-rw-r--r--src/core/client_config/connector.h3
-rw-r--r--src/core/client_config/subchannel.c26
-rw-r--r--src/core/security/server_secure_chttp2.c5
-rw-r--r--src/core/surface/channel.c27
-rw-r--r--src/core/surface/channel.h9
-rw-r--r--src/core/surface/channel_create.c14
-rw-r--r--src/core/surface/channel_init.c146
-rw-r--r--src/core/surface/channel_init.h76
-rw-r--r--src/core/surface/channel_stack_type.c54
-rw-r--r--src/core/surface/channel_stack_type.h51
-rw-r--r--src/core/surface/init.c80
-rw-r--r--src/core/surface/init.h1
-rw-r--r--src/core/surface/init_secure.c45
-rw-r--r--src/core/surface/init_unsecure.c2
-rw-r--r--src/core/surface/lame_client.c14
-rw-r--r--src/core/surface/lame_client.h41
-rw-r--r--src/core/surface/secure_channel_create.c20
-rw-r--r--src/core/surface/server.c52
-rw-r--r--src/core/surface/server.h9
-rw-r--r--src/core/surface/server_chttp2.c5
-rw-r--r--src/core/transport/chttp2_transport.c5
-rw-r--r--src/core/transport/transport_impl.h3
29 files changed, 1023 insertions, 207 deletions
diff --git a/src/core/census/grpc_plugin.c b/src/core/census/grpc_plugin.c
new file mode 100644
index 0000000000..3be2a48eb8
--- /dev/null
+++ b/src/core/census/grpc_plugin.c
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/census/grpc_plugin.h"
+
+#include <limits.h>
+
+#include <grpc/census.h>
+
+#include "src/core/census/grpc_filter.h"
+#include "src/core/surface/channel_init.h"
+#include "src/core/channel/channel_stack_builder.h"
+
+static bool maybe_add_census_filter(grpc_channel_stack_builder *builder,
+ void *arg_must_be_null) {
+ const grpc_channel_args *args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ if (grpc_channel_args_is_census_enabled(args)) {
+ return grpc_channel_stack_builder_prepend_filter(
+ builder, &grpc_client_census_filter, NULL, NULL);
+ }
+ return true;
+}
+
+void census_grpc_plugin_init(void) {
+ /* Only initialize census if no one else has and some features are
+ * available. */
+ if (census_enabled() == CENSUS_FEATURE_NONE &&
+ census_supported() != CENSUS_FEATURE_NONE) {
+ if (census_initialize(census_supported())) { /* enable all features. */
+ gpr_log(GPR_ERROR, "Could not initialize census.");
+ }
+ }
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX,
+ maybe_add_census_filter, NULL);
+ grpc_channel_init_register_stage(GRPC_CLIENT_UCHANNEL, INT_MAX,
+ maybe_add_census_filter, NULL);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ maybe_add_census_filter, NULL);
+}
+
+void census_grpc_plugin_destroy(void) { census_shutdown(); }
diff --git a/src/core/surface/server_create.c b/src/core/census/grpc_plugin.h
index 5e37e80948..a51e8e1141 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/census/grpc_plugin.h
@@ -31,18 +31,10 @@
*
*/
-#include <grpc/grpc.h>
-#include "src/core/census/grpc_filter.h"
-#include "src/core/channel/channel_args.h"
-#include "src/core/channel/compress_filter.h"
-#include "src/core/surface/api_trace.h"
-#include "src/core/surface/completion_queue.h"
-#include "src/core/surface/server.h"
+#ifndef GRPC_INTERNAL_CORE_CENSUS_GRPC_PLUGIN_H
+#define GRPC_INTERNAL_CORE_CENSUS_GRPC_PLUGIN_H
-grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
- const grpc_channel_filter *filters[3];
- size_t num_filters = 0;
- filters[num_filters++] = &grpc_compress_filter;
- GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
- return grpc_server_create_from_filters(filters, num_filters, args);
-}
+void census_grpc_plugin_init(void);
+void census_grpc_plugin_destroy(void);
+
+#endif /* GRPC_INTERNAL_CORE_CENSUS_GRPC_PLUGIN_H */
diff --git a/src/core/channel/channel_stack_builder.c b/src/core/channel/channel_stack_builder.c
new file mode 100644
index 0000000000..d12fe59aa1
--- /dev/null
+++ b/src/core/channel/channel_stack_builder.c
@@ -0,0 +1,256 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/channel_stack_builder.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
+#define BEGIN_SENTINAL ((grpc_channel_filter *)1)
+#define END_SENTINAL ((grpc_channel_filter *)2)
+
+typedef struct filter_node {
+ struct filter_node *next;
+ struct filter_node *prev;
+ const grpc_channel_filter *filter;
+ grpc_post_filter_create_init_func init;
+ void *init_arg;
+} filter_node;
+
+struct grpc_channel_stack_builder {
+ // sentinal nodes for filters that have been added
+ filter_node begin;
+ filter_node end;
+ // various set/get-able parameters
+ const grpc_channel_args *args;
+ grpc_transport *transport;
+ const char *name;
+};
+
+struct grpc_channel_stack_builder_iterator {
+ grpc_channel_stack_builder *builder;
+ filter_node *node;
+};
+
+grpc_channel_stack_builder *grpc_channel_stack_builder_create(void) {
+ grpc_channel_stack_builder *b = gpr_malloc(sizeof(*b));
+ memset(b, 0, sizeof(*b));
+
+ b->begin.filter = BEGIN_SENTINAL;
+ b->end.filter = END_SENTINAL;
+ b->begin.next = &b->end;
+ b->begin.prev = &b->end;
+ b->end.next = &b->begin;
+ b->end.prev = &b->begin;
+
+ return b;
+}
+
+static grpc_channel_stack_builder_iterator *create_iterator_at_filter_node(
+ grpc_channel_stack_builder *builder, filter_node *node) {
+ grpc_channel_stack_builder_iterator *it = gpr_malloc(sizeof(*it));
+ it->builder = builder;
+ it->node = node;
+ return it;
+}
+
+void grpc_channel_stack_builder_iterator_destroy(
+ grpc_channel_stack_builder_iterator *it) {
+ gpr_free(it);
+}
+
+grpc_channel_stack_builder_iterator *
+grpc_channel_stack_builder_create_iterator_at_first(
+ grpc_channel_stack_builder *builder) {
+ return create_iterator_at_filter_node(builder, &builder->begin);
+}
+
+grpc_channel_stack_builder_iterator *
+grpc_channel_stack_builder_create_iterator_at_last(
+ grpc_channel_stack_builder *builder) {
+ return create_iterator_at_filter_node(builder, &builder->end);
+}
+
+bool grpc_channel_stack_builder_move_next(
+ grpc_channel_stack_builder_iterator *iterator) {
+ if (iterator->node == &iterator->builder->end) return false;
+ iterator->node = iterator->node->next;
+ return true;
+}
+
+bool grpc_channel_stack_builder_move_prev(
+ grpc_channel_stack_builder_iterator *iterator) {
+ if (iterator->node == &iterator->builder->begin) return false;
+ iterator->node = iterator->node->prev;
+ return true;
+}
+
+bool grpc_channel_stack_builder_move_prev(
+ grpc_channel_stack_builder_iterator *iterator);
+
+void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder,
+ const char *name) {
+ GPR_ASSERT(builder->name == NULL);
+ builder->name = name;
+}
+
+void grpc_channel_stack_builder_set_channel_arguments(
+ grpc_channel_stack_builder *builder, const grpc_channel_args *args) {
+ GPR_ASSERT(builder->args == NULL);
+ builder->args = args;
+}
+
+void grpc_channel_stack_builder_set_transport(
+ grpc_channel_stack_builder *builder, grpc_transport *transport) {
+ GPR_ASSERT(builder->transport == NULL);
+ builder->transport = transport;
+}
+
+grpc_transport *grpc_channel_stack_builder_get_transport(
+ grpc_channel_stack_builder *builder) {
+ return builder->transport;
+}
+
+const grpc_channel_args *grpc_channel_stack_builder_get_channel_arguments(
+ grpc_channel_stack_builder *builder) {
+ return builder->args;
+}
+
+bool grpc_channel_stack_builder_append_filter(
+ grpc_channel_stack_builder *builder, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func, void *user_data) {
+ grpc_channel_stack_builder_iterator *it =
+ grpc_channel_stack_builder_create_iterator_at_last(builder);
+ bool ok = grpc_channel_stack_builder_add_filter_before(
+ it, filter, post_init_func, user_data);
+ grpc_channel_stack_builder_iterator_destroy(it);
+ return ok;
+}
+
+bool grpc_channel_stack_builder_prepend_filter(
+ grpc_channel_stack_builder *builder, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func, void *user_data) {
+ grpc_channel_stack_builder_iterator *it =
+ grpc_channel_stack_builder_create_iterator_at_first(builder);
+ bool ok = grpc_channel_stack_builder_add_filter_after(
+ it, filter, post_init_func, user_data);
+ grpc_channel_stack_builder_iterator_destroy(it);
+ return ok;
+}
+
+static void add_after(filter_node *before, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) {
+ filter_node *new = gpr_malloc(sizeof(*new));
+ new->next = before->next;
+ new->prev = before;
+ new->next->prev = new->prev->next = new;
+ new->filter = filter;
+ new->init = post_init_func;
+ new->init_arg = user_data;
+}
+
+bool grpc_channel_stack_builder_add_filter_before(
+ grpc_channel_stack_builder_iterator *iterator,
+ const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func, void *user_data) {
+ if (iterator->node == &iterator->builder->begin) return false;
+ add_after(iterator->node->prev, filter, post_init_func, user_data);
+ return true;
+}
+
+bool grpc_channel_stack_builder_add_filter_after(
+ grpc_channel_stack_builder_iterator *iterator,
+ const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func, void *user_data) {
+ if (iterator->node == &iterator->builder->end) return false;
+ add_after(iterator->node, filter, post_init_func, user_data);
+ return true;
+}
+
+void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder) {
+ filter_node *p = builder->begin.next;
+ while (p != &builder->end) {
+ filter_node *next = p->next;
+ gpr_free(p);
+ p = next;
+ }
+ gpr_free(builder);
+}
+
+void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ size_t prefix_bytes, int initial_refs,
+ grpc_iomgr_cb_func destroy,
+ void *destroy_arg) {
+ // count the number of filters
+ size_t num_filters = 0;
+ for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
+ num_filters++;
+ }
+
+ // create an array of filters
+ const grpc_channel_filter *filters[num_filters];
+ size_t i = 0;
+ for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
+ filters[i++] = p->filter;
+ }
+
+ // calculate the size of the channel stack
+ size_t channel_stack_size = grpc_channel_stack_size(filters, num_filters);
+
+ // allocate memory, with prefix_bytes followed by channel_stack_size
+ char *result = gpr_malloc(prefix_bytes + channel_stack_size);
+ // fetch a pointer to the channel stack
+ grpc_channel_stack *channel_stack =
+ (grpc_channel_stack *)(result + prefix_bytes);
+ // and initialize it
+ grpc_channel_stack_init(exec_ctx, initial_refs, destroy,
+ destroy_arg == NULL ? result : destroy_arg, filters,
+ num_filters, builder->args, builder->name,
+ channel_stack);
+
+ // run post-initialization functions
+ i = 0;
+ for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
+ if (p->init != NULL)
+ p->init(channel_stack, grpc_channel_stack_element(channel_stack, i),
+ p->init_arg);
+ i++;
+ }
+
+ grpc_channel_stack_builder_destroy(builder);
+
+ return result;
+}
diff --git a/src/core/channel/channel_stack_builder.h b/src/core/channel/channel_stack_builder.h
new file mode 100644
index 0000000000..163c8dbeb0
--- /dev/null
+++ b/src/core/channel/channel_stack_builder.h
@@ -0,0 +1,138 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_BUILDER_H
+#define GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_BUILDER_H
+
+#include <stdbool.h>
+
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/channel_stack.h"
+
+// grpc_channel_stack_builder offers a programmatic interface to selected
+// and order channel filters
+typedef struct grpc_channel_stack_builder grpc_channel_stack_builder;
+typedef struct grpc_channel_stack_builder_iterator
+ grpc_channel_stack_builder_iterator;
+
+// Create a new channel stack builder
+grpc_channel_stack_builder *grpc_channel_stack_builder_create(void);
+
+// Assign a name to the channel stack: string must be statically allocated
+void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder,
+ const char *name);
+
+// Attach a transport to the builder (does not take ownership)
+void grpc_channel_stack_builder_set_transport(
+ grpc_channel_stack_builder *builder, grpc_transport *transport);
+
+// Fetch attached transport
+grpc_transport *grpc_channel_stack_builder_get_transport(
+ grpc_channel_stack_builder *builder);
+
+// Set channel arguments: they must continue to exist until after
+// grpc_channel_stack_builder_finish returns
+void grpc_channel_stack_builder_set_channel_arguments(
+ grpc_channel_stack_builder *builder, const grpc_channel_args *args);
+
+// Return a borrowed pointer to the channel arguments
+const grpc_channel_args *grpc_channel_stack_builder_get_channel_arguments(
+ grpc_channel_stack_builder *builder);
+
+// Begin iterating over already defined filters in the builder
+grpc_channel_stack_builder_iterator *
+grpc_channel_stack_builder_create_iterator_at_first(
+ grpc_channel_stack_builder *builder);
+grpc_channel_stack_builder_iterator *
+grpc_channel_stack_builder_create_iterator_at_last(
+ grpc_channel_stack_builder *builder);
+
+// Is an iterator at the first element?
+bool grpc_channel_stack_builder_iterator_is_first(
+ grpc_channel_stack_builder_iterator *iterator);
+
+// Is an iterator at the end?
+bool grpc_channel_stack_builder_iterator_is_end(
+ grpc_channel_stack_builder_iterator *iterator);
+
+// Move an iterator to the next item
+bool grpc_channel_stack_builder_move_next(
+ grpc_channel_stack_builder_iterator *iterator);
+
+bool grpc_channel_stack_builder_move_prev(
+ grpc_channel_stack_builder_iterator *iterator);
+
+typedef void (*grpc_post_filter_create_init_func)(
+ grpc_channel_stack *channel_stack, grpc_channel_element *elem, void *arg);
+
+// Add a filter to the stack, after the given iterator
+bool grpc_channel_stack_builder_add_filter_after(
+ grpc_channel_stack_builder_iterator *iterator,
+ const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) GRPC_MUST_USE_RESULT;
+
+// Add a filter to the stack, before the given iterator
+bool grpc_channel_stack_builder_add_filter_before(
+ grpc_channel_stack_builder_iterator *iterator,
+ const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) GRPC_MUST_USE_RESULT;
+
+// Add a filter to the beginning of the filter list
+bool grpc_channel_stack_builder_prepend_filter(
+ grpc_channel_stack_builder *builder, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) GRPC_MUST_USE_RESULT;
+
+// Add a filter to the end of the filter list
+bool grpc_channel_stack_builder_append_filter(
+ grpc_channel_stack_builder *builder, const grpc_channel_filter *filter,
+ grpc_post_filter_create_init_func post_init_func,
+ void *user_data) GRPC_MUST_USE_RESULT;
+
+// Terminate iteration
+void grpc_channel_stack_builder_iterator_destroy(
+ grpc_channel_stack_builder_iterator *iterator);
+
+// Destroy the builder, return the freshly minted channel stack
+void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ size_t prefix_bytes, int initial_refs,
+ grpc_iomgr_cb_func destroy,
+ void *destroy_arg);
+
+// Destroy the builder without creating a channel stack
+void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder);
+
+#endif
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
index b1e7155773..bc997b192c 100644
--- a/src/core/channel/client_uchannel.c
+++ b/src/core/channel/client_uchannel.c
@@ -212,20 +212,10 @@ void grpc_client_uchannel_watch_connectivity_state(
grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
grpc_channel_args *args) {
grpc_channel *channel = NULL;
-#define MAX_FILTERS 3
- const grpc_channel_filter *filters[MAX_FILTERS];
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- size_t n = 0;
-
- if (grpc_channel_args_is_census_enabled(args)) {
- filters[n++] = &grpc_client_census_filter;
- }
- filters[n++] = &grpc_compress_filter;
- filters[n++] = &grpc_client_uchannel_filter;
- GPR_ASSERT(n <= MAX_FILTERS);
channel =
- grpc_channel_create_from_filters(&exec_ctx, NULL, filters, n, args, 1);
+ grpc_channel_create(&exec_ctx, NULL, args, GRPC_CLIENT_UCHANNEL, NULL);
return channel;
}
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index e8eb9dcfc5..c3060cd926 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -67,7 +67,6 @@ static void con_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_transport_perform_stream_op(exec_ctx, chand->transport,
@@ -88,7 +87,6 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
channel_data *chand = elem->channel_data;
int r;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data);
@@ -108,7 +106,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
grpc_transport_destroy_stream(exec_ctx, chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}
@@ -119,7 +116,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element_args *args) {
channel_data *cd = (channel_data *)elem->channel_data;
GPR_ASSERT(args->is_last);
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
}
@@ -127,7 +123,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *cd = (channel_data *)elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
grpc_transport_destroy(exec_ctx, cd->transport);
}
@@ -136,21 +131,18 @@ static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
return grpc_transport_get_peer(exec_ctx, chand->transport);
}
-const grpc_channel_filter grpc_connected_channel_filter = {
+static const grpc_channel_filter connected_channel_filter = {
con_start_transport_stream_op, con_start_transport_op, sizeof(call_data),
init_call_elem, set_pollset, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, con_get_peer, "connected",
};
-void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
- grpc_transport *transport) {
- /* Assumes that the connected channel filter is always the last filter
- in a channel stack */
- grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+static void bind_transport(grpc_channel_stack *channel_stack,
+ grpc_channel_element *elem, void *t) {
channel_data *cd = (channel_data *)elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ GPR_ASSERT(elem->filter == &connected_channel_filter);
GPR_ASSERT(cd->transport == NULL);
- cd->transport = transport;
+ cd->transport = t;
/* HACK(ctiller): increase call stack size for the channel to make space
for channel data. We need a cleaner (but performant) way to do this,
@@ -158,7 +150,16 @@ void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
This is only "safe" because call stacks place no additional data after
the last call element, and the last call element MUST be the connected
channel. */
- channel_stack->call_stack_size += grpc_transport_stream_size(transport);
+ channel_stack->call_stack_size += grpc_transport_stream_size(t);
+}
+
+bool grpc_add_connected_filter(grpc_channel_stack_builder *builder,
+ void *arg_must_be_null) {
+ GPR_ASSERT(arg_must_be_null == NULL);
+ grpc_transport *t = grpc_channel_stack_builder_get_transport(builder);
+ GPR_ASSERT(t != NULL);
+ return grpc_channel_stack_builder_append_filter(
+ builder, &connected_channel_filter, bind_transport, t);
}
grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem) {
diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h
index 95c1834bfa..2943dd2935 100644
--- a/src/core/channel/connected_channel.h
+++ b/src/core/channel/connected_channel.h
@@ -34,18 +34,9 @@
#ifndef GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H
#define GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H
-#include "src/core/channel/channel_stack.h"
+#include "src/core/channel/channel_stack_builder.h"
-/* A channel filter representing a channel that is on a connected transport.
- This filter performs actual sending and receiving of messages. */
-
-extern const grpc_channel_filter grpc_connected_channel_filter;
-
-/* Post construction fixup: set the transport in the connected channel.
- Must be called before any call stack using this filter is used. */
-void grpc_connected_channel_bind_transport(grpc_channel_stack* channel_stack,
- grpc_transport* transport);
-
-grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem);
+bool grpc_add_connected_filter(grpc_channel_stack_builder *builder,
+ void *arg_must_be_null);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h
index b91eb512c3..1b36182f50 100644
--- a/src/core/client_config/connector.h
+++ b/src/core/client_config/connector.h
@@ -62,9 +62,6 @@ typedef struct {
typedef struct {
/** the connected transport */
grpc_transport *transport;
- /** any additional filters (owned by the caller of connect) */
- const grpc_channel_filter **filters;
- size_t num_filters;
/** channel arguments (to be passed to the filters) */
const grpc_channel_args *channel_args;
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 6599c75dba..86d7ef783f 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -39,6 +39,7 @@
#include <grpc/support/avl.h>
#include "src/core/channel/channel_args.h"
+#include "src/core/surface/channel_init.h"
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/client_config/initial_connect_string.h"
@@ -511,32 +512,15 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
}
static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- size_t channel_stack_size;
grpc_connected_subchannel *con;
grpc_channel_stack *stk;
- size_t num_filters;
- const grpc_channel_filter **filters;
state_watcher *sw_subchannel;
- /* build final filter list */
- num_filters = c->num_filters + c->connecting_result.num_filters + 1;
- filters = gpr_malloc(sizeof(*filters) * num_filters);
- if (c->num_filters > 0) {
- memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
- }
- memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
- sizeof(*filters) * c->connecting_result.num_filters);
- filters[num_filters - 1] = &grpc_connected_channel_filter;
-
/* construct channel stack */
- channel_stack_size = grpc_channel_stack_size(filters, num_filters);
- con = gpr_malloc(channel_stack_size);
+ con = grpc_channel_init_create_stack(
+ exec_ctx, GRPC_CLIENT_SUBCHANNEL, 0, c->connecting_result.channel_args, 1,
+ connection_destroy, NULL, c->connecting_result.transport);
stk = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters,
- num_filters, c->connecting_result.channel_args,
- "CONNECTED_SUBCHANNEL", stk);
- grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
- gpr_free((void *)c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
@@ -551,7 +535,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
if (c->disconnected) {
gpr_mu_unlock(&c->mu);
gpr_free(sw_subchannel);
- gpr_free((void *)filters);
grpc_channel_stack_destroy(exec_ctx, stk);
gpr_free(con);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
@@ -581,7 +564,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
"connected");
gpr_mu_unlock(&c->mu);
- gpr_free((void *)filters);
}
/* Generate a random number between 0 and 1. */
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 84a883390c..b4fbd769bd 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -83,8 +83,6 @@ static void state_unref(grpc_server_secure_state *state) {
static void setup_transport(grpc_exec_ctx *exec_ctx, void *statep,
grpc_transport *transport,
grpc_auth_context *auth_context) {
- static grpc_channel_filter const *extra_filters[] = {
- &grpc_server_auth_filter, &grpc_http_server_filter};
grpc_server_secure_state *state = statep;
grpc_channel_args *args_copy;
grpc_arg args_to_add[2];
@@ -93,8 +91,7 @@ static void setup_transport(grpc_exec_ctx *exec_ctx, void *statep,
args_copy = grpc_channel_args_copy_and_add(
grpc_server_get_channel_args(state->server), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
- grpc_server_setup_transport(exec_ctx, state->server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), args_copy);
+ grpc_server_setup_transport(exec_ctx, state->server, transport, args_copy);
grpc_channel_args_destroy(args_copy);
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 12d8ebceb9..964ab34431 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include "src/core/surface/channel_init.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
@@ -82,24 +83,25 @@ struct grpc_channel {
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success);
-grpc_channel *grpc_channel_create_from_filters(
- grpc_exec_ctx *exec_ctx, const char *target,
- const grpc_channel_filter **filters, size_t num_filters,
- const grpc_channel_args *args, int is_client) {
- size_t i;
- size_t size =
- sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
- grpc_channel *channel = gpr_malloc(size);
+grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
+ const grpc_channel_args *args,
+ grpc_channel_stack_type channel_stack_type,
+ grpc_transport *optional_transport) {
+ bool is_client = grpc_channel_stack_type_is_client(channel_stack_type);
+
+ grpc_channel *channel = grpc_channel_init_create_stack(
+ exec_ctx, channel_stack_type, sizeof(grpc_channel), args, 1,
+ destroy_channel, NULL, optional_transport);
+
memset(channel, 0, sizeof(*channel));
channel->target = gpr_strdup(target);
- GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
if (args) {
- for (i = 0; i < args->num_args; i++) {
+ for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
if (args->args[i].type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
@@ -152,11 +154,6 @@ grpc_channel *grpc_channel_create_from_filters(
gpr_free(default_authority);
}
- grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters,
- num_filters, args,
- is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL",
- CHANNEL_STACK_FROM_CHANNEL(channel));
-
return channel;
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 00240c637f..0a892bbe82 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -35,12 +35,13 @@
#define GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H
#include "src/core/channel/channel_stack.h"
+#include "src/core/surface/channel_stack_type.h"
#include "src/core/client_config/subchannel_factory.h"
-grpc_channel *grpc_channel_create_from_filters(
- grpc_exec_ctx *exec_ctx, const char *target,
- const grpc_channel_filter **filters, size_t count,
- const grpc_channel_args *args, int is_client);
+grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
+ const grpc_channel_args *args,
+ grpc_channel_stack_type channel_stack_type,
+ grpc_transport *optional_transport);
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index fd7e20e9cc..123447c8ed 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -105,9 +105,6 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
0);
GPR_ASSERT(c->result->transport);
c->result->channel_args = c->args.channel_args;
- c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
- c->result->filters[0] = &grpc_http_client_filter;
- c->result->num_filters = 1;
} else {
memset(c->result, 0, sizeof(*c->result));
}
@@ -190,25 +187,16 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args,
void *reserved) {
grpc_channel *channel = NULL;
-#define MAX_FILTERS 3
- const grpc_channel_filter *filters[MAX_FILTERS];
grpc_resolver *resolver;
subchannel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- size_t n = 0;
GRPC_API_TRACE(
"grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
(target, args, reserved));
GPR_ASSERT(!reserved);
- if (grpc_channel_args_is_census_enabled(args)) {
- filters[n++] = &grpc_client_census_filter;
- }
- filters[n++] = &grpc_compress_filter;
- filters[n++] = &grpc_client_channel_filter;
- GPR_ASSERT(n <= MAX_FILTERS);
channel =
- grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1);
+ grpc_channel_create(&exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
diff --git a/src/core/surface/channel_init.c b/src/core/surface/channel_init.c
new file mode 100644
index 0000000000..3b11402ebc
--- /dev/null
+++ b/src/core/surface/channel_init.c
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/channel_init.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/useful.h>
+
+typedef struct stage_slot {
+ grpc_channel_init_stage fn;
+ void *arg;
+ int priority;
+ size_t insertion_order;
+} stage_slot;
+
+typedef struct stage_slots {
+ stage_slot *slots;
+ size_t num_slots;
+ size_t cap_slots;
+} stage_slots;
+
+static stage_slots g_slots[GRPC_NUM_CHANNEL_STACK_TYPES];
+static bool g_finalized;
+
+void grpc_channel_init_init(void) {
+ for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
+ g_slots[i].slots = NULL;
+ g_slots[i].num_slots = 0;
+ g_slots[i].cap_slots = 0;
+ }
+ g_finalized = false;
+}
+
+void grpc_channel_init_register_stage(grpc_channel_stack_type type,
+ int priority,
+ grpc_channel_init_stage stage,
+ void *stage_arg) {
+ GPR_ASSERT(!g_finalized);
+ if (g_slots[type].cap_slots == g_slots[type].num_slots) {
+ g_slots[type].cap_slots = GPR_MAX(8, 3 * g_slots[type].cap_slots / 2);
+ g_slots[type].slots =
+ gpr_realloc(g_slots[type].slots,
+ g_slots[type].cap_slots * sizeof(*g_slots[type].slots));
+ }
+ stage_slot *s = &g_slots[type].slots[g_slots[type].num_slots++];
+ s->insertion_order = g_slots[type].num_slots;
+ s->priority = priority;
+ s->fn = stage;
+ s->arg = stage_arg;
+}
+
+static int compare_slots(const void *a, const void *b) {
+ const stage_slot *sa = a;
+ const stage_slot *sb = b;
+
+ int c = GPR_ICMP(sa->priority, sb->priority);
+ if (c != 0) return c;
+ return GPR_ICMP(sa->insertion_order, sb->insertion_order);
+}
+
+void grpc_channel_init_finalize(void) {
+ GPR_ASSERT(!g_finalized);
+ for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
+ qsort(g_slots[i].slots, g_slots[i].num_slots, sizeof(*g_slots[i].slots),
+ compare_slots);
+ }
+ g_finalized = true;
+}
+
+void grpc_channel_init_shutdown(void) {
+ for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
+ gpr_free(g_slots[i].slots);
+ g_slots[i].slots = (void *)0xdeadbeef;
+ }
+}
+
+static const char *name_for_type(grpc_channel_stack_type type) {
+ switch (type) {
+ case GRPC_CLIENT_CHANNEL:
+ return "CLIENT_CHANNEL";
+ case GRPC_CLIENT_SUBCHANNEL:
+ return "CLIENT_SUBCHANNEL";
+ case GRPC_SERVER_CHANNEL:
+ return "SERVER_CHANNEL";
+ case GRPC_CLIENT_UCHANNEL:
+ return "CLIENT_UCHANNEL";
+ case GRPC_CLIENT_LAME_CHANNEL:
+ return "CLIENT_LAME_CHANNEL";
+ case GRPC_NUM_CHANNEL_STACK_TYPES:
+ break;
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+void *grpc_channel_init_create_stack(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack_type type, size_t prefix_bytes,
+ const grpc_channel_args *args, int initial_refs, grpc_iomgr_cb_func destroy,
+ void *destroy_arg, grpc_transport *transport) {
+ GPR_ASSERT(g_finalized);
+
+ grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
+ grpc_channel_stack_builder_set_name(builder, name_for_type(type));
+ grpc_channel_stack_builder_set_channel_arguments(builder, args);
+ grpc_channel_stack_builder_set_transport(builder, transport);
+
+ for (size_t i = 0; i < g_slots[type].num_slots; i++) {
+ const stage_slot *slot = &g_slots[type].slots[i];
+ if (!slot->fn(builder, slot->arg)) {
+ grpc_channel_stack_builder_destroy(builder);
+ return NULL;
+ }
+ }
+
+ return grpc_channel_stack_builder_finish(exec_ctx, builder, prefix_bytes,
+ initial_refs, destroy, destroy_arg);
+}
diff --git a/src/core/surface/channel_init.h b/src/core/surface/channel_init.h
new file mode 100644
index 0000000000..ebda8bd9a0
--- /dev/null
+++ b/src/core/surface/channel_init.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_INIT_H
+#define GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_INIT_H
+
+#include "src/core/channel/channel_stack_builder.h"
+#include "src/core/surface/channel_stack_type.h"
+#include "src/core/transport/transport.h"
+
+// This module provides a way for plugins (and the grpc core library itself)
+// to register mutators for channel stacks.
+// It also provides a universal entry path to run those mutators to build
+// a channel stack for various subsystems.
+
+// One stage of mutation: call channel stack builder's to influence the finally
+// constructed channel stack
+typedef bool (*grpc_channel_init_stage)(grpc_channel_stack_builder *builder,
+ void *arg);
+
+// Global initialization of the system
+void grpc_channel_init_init(void);
+
+// Register one stage of mutators.
+// Stages are run in priority order (lowest to highest), and then in
+// registration order (in the case of a tie).
+// Stages are registered against one of the pre-determined channel stack
+// types.
+void grpc_channel_init_register_stage(grpc_channel_stack_type type,
+ int priority,
+ grpc_channel_init_stage stage,
+ void *stage_arg);
+
+// Finalize registration. No more calls to grpc_channel_init_register_stage are
+// allowed.
+void grpc_channel_init_finalize(void);
+// Shutdown the channel init system
+void grpc_channel_init_shutdown(void);
+
+// Construct a channel stack of some sort: see channel_stack.h for details
+void *grpc_channel_init_create_stack(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack_type type, size_t prefix_bytes,
+ const grpc_channel_args *args, int initial_refs, grpc_iomgr_cb_func destroy,
+ void *destroy_arg, grpc_transport *optional_transport);
+
+#endif
diff --git a/src/core/surface/channel_stack_type.c b/src/core/surface/channel_stack_type.c
new file mode 100644
index 0000000000..c2e6716135
--- /dev/null
+++ b/src/core/surface/channel_stack_type.c
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+#include "src/core/surface/channel_stack_type.h"
+#include <grpc/support/log.h>
+
+bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type) {
+ switch (type) {
+ case GRPC_CLIENT_CHANNEL:
+ return true;
+ case GRPC_CLIENT_UCHANNEL:
+ return true;
+ case GRPC_CLIENT_SUBCHANNEL:
+ return true;
+ case GRPC_CLIENT_LAME_CHANNEL:
+ return true;
+ case GRPC_SERVER_CHANNEL:
+ return false;
+ case GRPC_NUM_CHANNEL_STACK_TYPES:
+ break;
+ }
+ GPR_UNREACHABLE_CODE(return true;);
+}
diff --git a/src/core/surface/channel_stack_type.h b/src/core/surface/channel_stack_type.h
new file mode 100644
index 0000000000..2669501cb9
--- /dev/null
+++ b/src/core/surface/channel_stack_type.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_SURFACE_CHANNEL_STACK_TYPE_H
+#define GRPC_INTERNAL_CORE_SURFACE_CHANNEL_STACK_TYPE_H
+
+#include <stdbool.h>
+
+typedef enum {
+ GRPC_CLIENT_CHANNEL,
+ GRPC_CLIENT_UCHANNEL,
+ GRPC_CLIENT_SUBCHANNEL,
+ GRPC_CLIENT_LAME_CHANNEL,
+ GRPC_SERVER_CHANNEL,
+ // must be last
+ GRPC_NUM_CHANNEL_STACK_TYPES
+} grpc_channel_stack_type;
+
+bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type);
+
+#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_STACK_TYPE_H */
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index a4a53d3ec1..890f86a9b1 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -33,13 +33,21 @@
#include <grpc/support/port_platform.h>
+#include <limits.h>
#include <memory.h>
-#include <grpc/census.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/time.h>
+/* TODO(ctiller): find another way? - better not to include census here */
+#include "src/core/census/grpc_plugin.h"
#include "src/core/channel/channel_stack.h"
+#include "src/core/channel/compress_filter.h"
+#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/client_channel.h"
+#include "src/core/channel/client_uchannel.h"
+#include "src/core/channel/http_client_filter.h"
+#include "src/core/channel/http_server_filter.h"
#include "src/core/client_config/lb_policy_registry.h"
#include "src/core/client_config/lb_policies/pick_first.h"
#include "src/core/client_config/lb_policies/round_robin.h"
@@ -54,11 +62,15 @@
#include "src/core/profiling/timers.h"
#include "src/core/surface/api_trace.h"
#include "src/core/surface/call.h"
+#include "src/core/surface/channel_init.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/init.h"
+#include "src/core/surface/lame_client.h"
+#include "src/core/surface/server.h"
#include "src/core/surface/surface_trace.h"
#include "src/core/transport/chttp2_transport.h"
#include "src/core/transport/connectivity_state.h"
+#include "src/core/transport/transport_impl.h"
#ifndef GRPC_DEFAULT_NAME_PREFIX
#define GRPC_DEFAULT_NAME_PREFIX "dns:///"
@@ -72,9 +84,56 @@ static int g_initializations;
static void do_basic_init(void) {
gpr_mu_init(&g_init_mu);
+ /* TODO(ctiller): ideally remove this strict linkage */
+ grpc_register_plugin(census_grpc_plugin_init, census_grpc_plugin_destroy);
g_initializations = 0;
}
+static bool append_filter(grpc_channel_stack_builder *builder, void *arg) {
+ return grpc_channel_stack_builder_append_filter(builder, arg, NULL, NULL);
+}
+
+static bool prepend_filter(grpc_channel_stack_builder *builder, void *arg) {
+ return grpc_channel_stack_builder_prepend_filter(builder, arg, NULL, NULL);
+}
+
+static bool maybe_add_http_filter(grpc_channel_stack_builder *builder,
+ void *arg) {
+ grpc_transport *t = grpc_channel_stack_builder_get_transport(builder);
+ if (t && strstr(t->vtable->name, "http")) {
+ return grpc_channel_stack_builder_prepend_filter(builder, arg, NULL, NULL);
+ }
+ return true;
+}
+
+static void register_builtin_channel_init() {
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, prepend_filter,
+ (void *)&grpc_compress_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_UCHANNEL, INT_MAX,
+ prepend_filter,
+ (void *)&grpc_compress_filter);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
+ (void *)&grpc_compress_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
+ maybe_add_http_filter,
+ (void *)&grpc_http_client_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
+ grpc_add_connected_filter, NULL);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ maybe_add_http_filter,
+ (void *)&grpc_http_server_filter);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ grpc_add_connected_filter, NULL);
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter,
+ (void *)&grpc_client_channel_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_UCHANNEL, INT_MAX, append_filter,
+ (void *)&grpc_client_uchannel_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL, INT_MAX,
+ append_filter, (void *)&grpc_lame_filter);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
+ (void *)&grpc_server_top_filter);
+}
+
typedef struct grpc_plugin {
void (*init)();
void (*destroy)();
@@ -85,7 +144,7 @@ static int g_number_of_plugins = 0;
void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) {
GRPC_API_TRACE("grpc_register_plugin(init=%p, destroy=%p)", 2,
- ((void*)(intptr_t)init, (void*)(intptr_t)destroy));
+ ((void *)(intptr_t)init, (void *)(intptr_t)destroy));
GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS);
g_all_of_the_plugins[g_number_of_plugins].init = init;
g_all_of_the_plugins[g_number_of_plugins].destroy = destroy;
@@ -100,6 +159,7 @@ void grpc_init(void) {
if (++g_initializations == 1) {
gpr_time_init();
grpc_mdctx_global_init();
+ grpc_channel_init_init();
grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create());
grpc_register_lb_policy(grpc_pick_first_lb_factory_create());
grpc_register_lb_policy(grpc_round_robin_lb_factory_create());
@@ -119,14 +179,6 @@ void grpc_init(void) {
grpc_iomgr_init();
grpc_executor_init();
grpc_tracer_init("GRPC_TRACE");
- /* Only initialize census if no one else has and some features are
- * available. */
- if (census_enabled() == CENSUS_FEATURE_NONE &&
- census_supported() != CENSUS_FEATURE_NONE) {
- if (census_initialize(census_supported())) { /* enable all features. */
- gpr_log(GPR_ERROR, "Could not initialize census.");
- }
- }
gpr_timers_global_init();
grpc_cq_global_init();
grpc_subchannel_index_init();
@@ -135,6 +187,12 @@ void grpc_init(void) {
g_all_of_the_plugins[i].init();
}
}
+ /* register channel finalization AFTER all plugins, to ensure that it's run
+ * at the appropriate time */
+ grpc_register_security_filters();
+ register_builtin_channel_init();
+ /* no more changes to channel init pipelines */
+ grpc_channel_init_finalize();
}
gpr_mu_unlock(&g_init_mu);
GRPC_API_TRACE("grpc_init(void)", 0, ());
@@ -149,7 +207,6 @@ void grpc_shutdown(void) {
grpc_cq_global_shutdown();
grpc_iomgr_shutdown();
grpc_subchannel_index_shutdown();
- census_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();
grpc_resolver_registry_shutdown();
@@ -159,6 +216,7 @@ void grpc_shutdown(void) {
g_all_of_the_plugins[i].destroy();
}
}
+ grpc_channel_init_shutdown();
grpc_mdctx_global_shutdown();
}
gpr_mu_unlock(&g_init_mu);
diff --git a/src/core/surface/init.h b/src/core/surface/init.h
index 771c30f412..f921fc863d 100644
--- a/src/core/surface/init.h
+++ b/src/core/surface/init.h
@@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_SURFACE_INIT_H
#define GRPC_INTERNAL_CORE_SURFACE_INIT_H
+void grpc_register_security_filters(void);
void grpc_security_pre_init(void);
int grpc_is_initialized(void);
diff --git a/src/core/surface/init_secure.c b/src/core/surface/init_secure.c
index fa20e91583..3df3a87a96 100644
--- a/src/core/surface/init_secure.c
+++ b/src/core/surface/init_secure.c
@@ -32,11 +32,56 @@
*/
#include "src/core/surface/init.h"
+
+#include <limits.h>
+#include <string.h>
+
+#include "src/core/surface/channel_init.h"
#include "src/core/debug/trace.h"
+#include "src/core/security/auth_filters.h"
+#include "src/core/security/credentials.h"
#include "src/core/security/secure_endpoint.h"
+#include "src/core/security/security_connector.h"
#include "src/core/tsi/transport_security_interface.h"
void grpc_security_pre_init(void) {
grpc_register_tracer("secure_endpoint", &grpc_trace_secure_endpoint);
grpc_register_tracer("transport_security", &tsi_tracing_enabled);
}
+
+static bool maybe_prepend_client_auth_filter(
+ grpc_channel_stack_builder *builder, void *arg) {
+ const grpc_channel_args *args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ if (args) {
+ for (size_t i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(GRPC_SECURITY_CONNECTOR_ARG, args->args[i].key)) {
+ return grpc_channel_stack_builder_prepend_filter(
+ builder, &grpc_client_auth_filter, NULL, NULL);
+ }
+ }
+ }
+ return true;
+}
+
+static bool maybe_prepend_server_auth_filter(
+ grpc_channel_stack_builder *builder, void *arg) {
+ const grpc_channel_args *args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ if (args) {
+ for (size_t i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(GRPC_SERVER_CREDENTIALS_ARG, args->args[i].key)) {
+ return grpc_channel_stack_builder_prepend_filter(
+ builder, &grpc_server_auth_filter, NULL, NULL);
+ }
+ }
+ }
+ return true;
+}
+
+void grpc_register_security_filters(void) {
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
+ maybe_prepend_client_auth_filter, NULL);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ maybe_prepend_server_auth_filter, NULL);
+}
diff --git a/src/core/surface/init_unsecure.c b/src/core/surface/init_unsecure.c
index 630d564a7d..7b26e01e9c 100644
--- a/src/core/surface/init_unsecure.c
+++ b/src/core/surface/init_unsecure.c
@@ -34,3 +34,5 @@
#include "src/core/surface/init.h"
void grpc_security_pre_init(void) {}
+
+void grpc_register_security_filters(void) {}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 537069e984..58f89946d2 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -31,6 +31,8 @@
*
*/
+#include "src/core/surface/lame_client.h"
+
#include <grpc/grpc.h>
#include <string.h>
@@ -115,7 +117,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
-static const grpc_channel_filter lame_filter = {
+const grpc_channel_filter grpc_lame_filter = {
lame_start_transport_stream_op, lame_start_transport_op, sizeof(call_data),
init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
@@ -127,19 +129,17 @@ static const grpc_channel_filter lame_filter = {
grpc_channel *grpc_lame_client_channel_create(const char *target,
grpc_status_code error_code,
const char *error_message) {
- grpc_channel *channel;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_channel_element *elem;
channel_data *chand;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- static const grpc_channel_filter *filters[] = {&lame_filter};
- channel =
- grpc_channel_create_from_filters(&exec_ctx, target, filters, 1, NULL, 1);
+ grpc_channel *channel = grpc_channel_create(&exec_ctx, target, NULL,
+ GRPC_CLIENT_LAME_CHANNEL, NULL);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GRPC_API_TRACE(
"grpc_lame_client_channel_create(target=%s, error_code=%d, "
"error_message=%s)",
3, (target, (int)error_code, error_message));
- GPR_ASSERT(elem->filter == &lame_filter);
+ GPR_ASSERT(elem->filter == &grpc_lame_filter);
chand = (channel_data *)elem->channel_data;
chand->error_code = error_code;
chand->error_message = error_message;
diff --git a/src/core/surface/lame_client.h b/src/core/surface/lame_client.h
new file mode 100644
index 0000000000..9bf3125880
--- /dev/null
+++ b/src/core/surface/lame_client.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_SURFACE_LAME_CHANNEL_H
+#define GRPC_INTERNAL_CORE_SURFACE_LAME_CHANNEL_H
+
+#include "src/core/channel/channel_stack.h"
+
+extern const grpc_channel_filter grpc_lame_filter;
+
+#endif // GRPC_INTERNAL_CORE_SURFACE_LAME_CHANNEL_H
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 9c04426d87..dab55f853d 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -40,11 +40,8 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
-#include "src/core/census/grpc_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
-#include "src/core/channel/compress_filter.h"
-#include "src/core/channel/http_client_filter.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/tcp_client.h"
#include "src/core/security/auth_filters.h"
@@ -115,10 +112,6 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
args_copy = grpc_channel_args_copy_and_add(c->args.channel_args,
&auth_context_arg, 1);
c->result->channel_args = args_copy;
- c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
- c->result->filters[0] = &grpc_http_client_filter;
- c->result->filters[1] = &grpc_client_auth_filter;
- c->result->num_filters = 2;
}
notify = c->notify;
c->notify = NULL;
@@ -263,10 +256,7 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
grpc_channel_security_connector *security_connector;
grpc_resolver *resolver;
subchannel_factory *f;
-#define MAX_FILTERS 3
- const grpc_channel_filter *filters[MAX_FILTERS];
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- size_t n = 0;
GRPC_API_TRACE(
"grpc_secure_channel_create(creds=%p, target=%s, args=%p, "
@@ -295,15 +285,9 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
args_copy = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1);
- if (grpc_channel_args_is_census_enabled(args)) {
- filters[n++] = &grpc_client_census_filter;
- }
- filters[n++] = &grpc_compress_filter;
- filters[n++] = &grpc_client_channel_filter;
- GPR_ASSERT(n <= MAX_FILTERS);
- channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, n,
- args_copy, 1);
+ channel = grpc_channel_create(&exec_ctx, target, args_copy,
+ GRPC_CLIENT_CHANNEL, NULL);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index fb5e0d4b9e..c88f769eda 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -42,7 +42,6 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-#include "src/core/census/grpc_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
@@ -182,8 +181,6 @@ typedef struct {
} channel_broadcaster;
struct grpc_server {
- size_t channel_filter_count;
- grpc_channel_filter const **channel_filters;
grpc_channel_args *channel_args;
grpc_completion_queue **cqs;
@@ -355,7 +352,6 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
- gpr_free((void *)server->channel_filters);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
request_matcher_destroy(&rm->request_matcher);
@@ -750,7 +746,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
}
-static const grpc_channel_filter server_surface_filter = {
+const grpc_channel_filter grpc_server_top_filter = {
server_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
@@ -776,11 +772,10 @@ void grpc_server_register_completion_queue(grpc_server *server,
server->cqs[n] = cq;
}
-grpc_server *grpc_server_create_from_filters(
- const grpc_channel_filter **filters, size_t filter_count,
- const grpc_channel_args *args) {
+grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
size_t i;
- int census_enabled = grpc_channel_args_is_census_enabled(args);
+
+ GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server *server = gpr_malloc(sizeof(grpc_server));
@@ -808,23 +803,6 @@ grpc_server *grpc_server_create_from_filters(
server->requested_calls = gpr_malloc(server->max_requested_calls *
sizeof(*server->requested_calls));
- /* Server filter stack is:
-
- server_surface_filter - for making surface API calls
- grpc_server_census_filter (optional) - for stats collection and tracing
- {passed in filter stack}
- grpc_connected_channel_filter - for interfacing with transports */
- server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u);
- server->channel_filters =
- gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
- server->channel_filters[0] = &server_surface_filter;
- if (census_enabled) {
- server->channel_filters[1] = &grpc_server_census_filter;
- }
- for (i = 0; i < filter_count; i++) {
- server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i];
- }
-
server->channel_args = grpc_channel_args_copy(args);
return server;
@@ -885,12 +863,7 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
- grpc_channel_filter const **extra_filters,
- size_t num_extra_filters,
const grpc_channel_args *args) {
- size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
- grpc_channel_filter const **filters =
- gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
size_t i;
size_t num_registered_methods;
size_t alloc;
@@ -906,22 +879,14 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
uint32_t max_probes = 0;
grpc_transport_op op;
- for (i = 0; i < s->channel_filter_count; i++) {
- filters[i] = s->channel_filters[i];
- }
- for (; i < s->channel_filter_count + num_extra_filters; i++) {
- filters[i] = extra_filters[i - s->channel_filter_count];
- }
- filters[i] = &grpc_connected_channel_filter;
-
for (i = 0; i < s->cq_count; i++) {
memset(&op, 0, sizeof(op));
op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
grpc_transport_perform_op(exec_ctx, transport, &op);
}
- channel = grpc_channel_create_from_filters(exec_ctx, NULL, filters,
- num_filters, args, 0);
+ channel =
+ grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)->channel_data;
chand->server = s;
@@ -958,17 +923,12 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
chand->registered_method_max_probes = max_probes;
}
- grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
- transport);
-
gpr_mu_lock(&s->mu_global);
chand->next = &s->root_channel_data;
chand->prev = chand->next->prev;
chand->next->prev = chand->prev->next = chand;
gpr_mu_unlock(&s->mu_global);
- gpr_free((void *)filters);
-
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
memset(&op, 0, sizeof(op));
op.set_accept_stream = accept_stream;
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index a957fdb360..9a7f564970 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -34,14 +34,11 @@
#ifndef GRPC_INTERNAL_CORE_SURFACE_SERVER_H
#define GRPC_INTERNAL_CORE_SURFACE_SERVER_H
-#include "src/core/channel/channel_stack.h"
#include <grpc/grpc.h>
+#include "src/core/channel/channel_stack.h"
#include "src/core/transport/transport.h"
-/* Create a server */
-grpc_server *grpc_server_create_from_filters(
- const grpc_channel_filter **filters, size_t filter_count,
- const grpc_channel_args *args);
+extern const grpc_channel_filter grpc_server_top_filter;
/* Add a listener to the server: when the server starts, it will call start,
and when it shuts down, it will call destroy */
@@ -56,8 +53,6 @@ void grpc_server_add_listener(
server */
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_transport *transport,
- grpc_channel_filter const **extra_filters,
- size_t num_extra_filters,
const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index ce970dfe73..ff2840f655 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -45,10 +45,7 @@
static void setup_transport(grpc_exec_ctx *exec_ctx, void *server,
grpc_transport *transport) {
- static grpc_channel_filter const *extra_filters[] = {
- &grpc_http_server_filter};
- grpc_server_setup_transport(exec_ctx, server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters),
+ grpc_server_setup_transport(exec_ctx, server, transport,
grpc_server_get_channel_args(server));
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 617d98875c..1901307d94 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1713,8 +1713,9 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
}
static const grpc_transport_vtable vtable = {
- sizeof(grpc_chttp2_stream), init_stream, set_pollset, perform_stream_op,
- perform_transport_op, destroy_stream, destroy_transport, chttp2_get_peer};
+ sizeof(grpc_chttp2_stream), "chttp2", init_stream, set_pollset,
+ perform_stream_op, perform_transport_op, destroy_stream, destroy_transport,
+ chttp2_get_peer};
grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index 40bfb4b13a..16f7fd4d32 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -41,6 +41,9 @@ typedef struct grpc_transport_vtable {
layers and initialized by the transport */
size_t sizeof_stream; /* = sizeof(transport stream) */
+ /* name of this transport implementation */
+ const char *name;
+
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_stream_refcount *refcount,