aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/census/grpc_plugin.c24
-rw-r--r--src/core/ext/client_config/README.md (renamed from src/core/lib/client_config/README.md)2
-rw-r--r--src/core/ext/client_config/channel_connectivity.c (renamed from src/core/lib/surface/channel_connectivity.c)2
-rw-r--r--src/core/ext/client_config/client_channel.c (renamed from src/core/lib/channel/client_channel.c)44
-rw-r--r--src/core/ext/client_config/client_channel.h (renamed from src/core/lib/channel/client_channel.h)8
-rw-r--r--src/core/ext/client_config/client_channel_factory.c57
-rw-r--r--src/core/ext/client_config/client_channel_factory.h85
-rw-r--r--src/core/ext/client_config/client_config.c (renamed from src/core/lib/client_config/client_config.c)2
-rw-r--r--src/core/ext/client_config/client_config.h (renamed from src/core/lib/client_config/client_config.h)8
-rw-r--r--src/core/ext/client_config/client_config_plugin.c95
-rw-r--r--src/core/ext/client_config/connector.c (renamed from src/core/lib/client_config/connector.c)2
-rw-r--r--src/core/ext/client_config/connector.h (renamed from src/core/lib/client_config/connector.h)6
-rw-r--r--src/core/ext/client_config/default_initial_connect_string.c (renamed from src/core/lib/client_config/default_initial_connect_string.c)0
-rw-r--r--src/core/ext/client_config/initial_connect_string.c (renamed from src/core/lib/client_config/initial_connect_string.c)2
-rw-r--r--src/core/ext/client_config/initial_connect_string.h (renamed from src/core/lib/client_config/initial_connect_string.h)6
-rw-r--r--src/core/ext/client_config/lb_policy.c (renamed from src/core/lib/client_config/lb_policy.c)13
-rw-r--r--src/core/ext/client_config/lb_policy.h (renamed from src/core/lib/client_config/lb_policy.h)21
-rw-r--r--src/core/ext/client_config/lb_policy_factory.c (renamed from src/core/lib/client_config/lb_policy_factory.c)2
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h (renamed from src/core/lib/client_config/lb_policy_factory.h)12
-rw-r--r--src/core/ext/client_config/lb_policy_registry.c (renamed from src/core/lib/client_config/lb_policy_registry.c)2
-rw-r--r--src/core/ext/client_config/lb_policy_registry.h (renamed from src/core/lib/client_config/lb_policy_registry.h)8
-rw-r--r--src/core/ext/client_config/resolver.c (renamed from src/core/lib/client_config/resolver.c)2
-rw-r--r--src/core/ext/client_config/resolver.h (renamed from src/core/lib/client_config/resolver.h)10
-rw-r--r--src/core/ext/client_config/resolver_factory.c (renamed from src/core/lib/client_config/resolver_factory.c)2
-rw-r--r--src/core/ext/client_config/resolver_factory.h (renamed from src/core/lib/client_config/resolver_factory.h)14
-rw-r--r--src/core/ext/client_config/resolver_registry.c (renamed from src/core/lib/client_config/resolver_registry.c)6
-rw-r--r--src/core/ext/client_config/resolver_registry.h (renamed from src/core/lib/client_config/resolver_registry.h)10
-rw-r--r--src/core/ext/client_config/subchannel.c (renamed from src/core/lib/client_config/subchannel.c)49
-rw-r--r--src/core/ext/client_config/subchannel.h (renamed from src/core/lib/client_config/subchannel.h)8
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c (renamed from src/core/lib/channel/subchannel_call_holder.c)7
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.h (renamed from src/core/lib/channel/subchannel_call_holder.h)9
-rw-r--r--src/core/ext/client_config/subchannel_factory.c (renamed from src/core/lib/client_config/subchannel_factory.c)2
-rw-r--r--src/core/ext/client_config/subchannel_factory.h (renamed from src/core/lib/client_config/subchannel_factory.h)8
-rw-r--r--src/core/ext/client_config/subchannel_index.c (renamed from src/core/lib/client_config/subchannel_index.c)2
-rw-r--r--src/core/ext/client_config/subchannel_index.h (renamed from src/core/lib/client_config/subchannel_index.h)10
-rw-r--r--src/core/ext/client_config/uri_parser.c (renamed from src/core/lib/client_config/uri_parser.c)2
-rw-r--r--src/core/ext/client_config/uri_parser.h (renamed from src/core/lib/client_config/uri_parser.h)6
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.c25
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.h2
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c56
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c48
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c14
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c87
-rw-r--r--src/core/ext/resolver/zookeeper/zookeeper_resolver.c15
-rw-r--r--src/core/ext/transport/chttp2/alpn/alpn.c (renamed from src/core/ext/transport/chttp2/transport/alpn.c)2
-rw-r--r--src/core/ext/transport/chttp2/alpn/alpn.h (renamed from src/core/ext/transport/chttp2/transport/alpn.h)6
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c94
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c105
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_encoder.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_encoder.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_plugin.c46
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c20
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.c12
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.h3
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.c7
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c6
-rw-r--r--src/core/lib/channel/channel_args.c12
-rw-r--r--src/core/lib/channel/channel_stack_builder.c25
-rw-r--r--src/core/lib/channel/channel_stack_builder.h10
-rw-r--r--src/core/lib/channel/http_client_filter.c10
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_posix.h2
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix.c16
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix.h7
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix_noop.c11
-rw-r--r--src/core/lib/security/security_connector.c2
-rw-r--r--src/core/lib/surface/call.c41
-rw-r--r--src/core/lib/surface/channel.c32
-rw-r--r--src/core/lib/surface/channel.h1
-rw-r--r--src/core/lib/surface/channel_init.c16
-rw-r--r--src/core/lib/surface/channel_init.h9
-rw-r--r--src/core/lib/surface/init.c69
-rw-r--r--src/core/lib/surface/server.c267
-rw-r--r--src/core/lib/transport/metadata.c47
-rw-r--r--src/core/lib/transport/metadata.h6
-rw-r--r--src/core/lib/transport/transport.h8
-rw-r--r--src/core/plugin_registry/grpc_plugin_registry.c8
-rw-r--r--src/core/plugin_registry/grpc_unsecure_plugin_registry.c8
79 files changed, 1144 insertions, 567 deletions
diff --git a/src/core/ext/census/grpc_plugin.c b/src/core/ext/census/grpc_plugin.c
index 0f15ecb2c2..e43ceafd0c 100644
--- a/src/core/ext/census/grpc_plugin.c
+++ b/src/core/ext/census/grpc_plugin.c
@@ -32,6 +32,7 @@
*/
#include <limits.h>
+#include <string.h>
#include <grpc/census.h>
@@ -39,13 +40,24 @@
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_init.h"
+static bool is_census_enabled(const grpc_channel_args *a) {
+ size_t i;
+ if (a == NULL) return 0;
+ for (i = 0; i < a->num_args; i++) {
+ if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
+ return a->args[i].value.integer != 0 && census_enabled();
+ }
+ }
+ return census_enabled();
+}
+
static bool maybe_add_census_filter(grpc_channel_stack_builder *builder,
- void *arg_must_be_null) {
+ void *arg) {
const grpc_channel_args *args =
grpc_channel_stack_builder_get_channel_arguments(builder);
- if (grpc_channel_args_is_census_enabled(args)) {
+ if (is_census_enabled(args)) {
return grpc_channel_stack_builder_prepend_filter(
- builder, &grpc_client_census_filter, NULL, NULL);
+ builder, (const grpc_channel_filter *)arg, NULL, NULL);
}
return true;
}
@@ -60,9 +72,11 @@ void census_grpc_plugin_init(void) {
}
}
grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX,
- maybe_add_census_filter, NULL);
+ maybe_add_census_filter,
+ (void *)&grpc_client_census_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
- maybe_add_census_filter, NULL);
+ maybe_add_census_filter,
+ (void *)&grpc_server_census_filter);
}
void census_grpc_plugin_shutdown(void) { census_shutdown(); }
diff --git a/src/core/lib/client_config/README.md b/src/core/ext/client_config/README.md
index fff7a5af5b..7024fd540d 100644
--- a/src/core/lib/client_config/README.md
+++ b/src/core/ext/client_config/README.md
@@ -40,7 +40,7 @@ decisions (for example, by avoiding disconnected backends).
Configured sub-channels are fully setup to participate in the grpc data plane.
Their behavior is specified by a set of grpc channel filters defined at their
construction. To customize this behavior, resolvers build
-grpc_subchannel_factory objects, which use the decorator pattern to customize
+grpc_client_channel_factory objects, which use the decorator pattern to customize
construction arguments for concrete grpc_subchannel instances.
diff --git a/src/core/lib/surface/channel_connectivity.c b/src/core/ext/client_config/channel_connectivity.c
index 9a9ee422c2..3ebc333608 100644
--- a/src/core/lib/surface/channel_connectivity.c
+++ b/src/core/ext/client_config/channel_connectivity.c
@@ -36,7 +36,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/lib/channel/client_channel.h"
+#include "src/core/ext/client_config/client_channel.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/completion_queue.h"
diff --git a/src/core/lib/channel/client_channel.c b/src/core/ext/client_config/client_channel.c
index 3f7cf1cf97..93d54fdcfe 100644
--- a/src/core/lib/channel/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/channel/client_channel.h"
+#include "src/core/ext/client_config/client_channel.h"
#include <stdio.h>
#include <string.h>
@@ -41,9 +41,9 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/client_config/subchannel_call_holder.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/channel/subchannel_call_holder.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
@@ -114,6 +114,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
+static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
+ channel_data *chand,
+ grpc_connectivity_state state,
+ const char *reason) {
+ if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
+ state == GRPC_CHANNEL_FATAL_FAILURE) &&
+ chand->lb_policy != NULL) {
+ /* cancel fail-fast picks */
+ grpc_lb_policy_cancel_picks(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
+ /* check= */ 0);
+ }
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
+}
+
static void on_lb_policy_state_changed_locked(
grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
grpc_connectivity_state publish_state = w->state;
@@ -127,8 +143,8 @@ static void on_lb_policy_state_changed_locked(
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
- "lb_changed");
+ set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
+ "lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
@@ -200,8 +216,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (iomgr_success && chand->resolver) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
- "new_lb+resolver");
+ set_channel_connectivity_state_locked(exec_ctx, chand, state,
+ "new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
@@ -216,8 +232,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@@ -272,8 +288,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (op->disconnect && chand->resolver != NULL) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@@ -290,6 +306,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
typedef struct {
grpc_metadata_batch *initial_metadata;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
@@ -298,6 +315,7 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
@@ -308,6 +326,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
} else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
+ cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
}
@@ -316,6 +335,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
@@ -349,7 +369,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
- initial_metadata, connected_subchannel, on_ready);
+ initial_metadata, initial_metadata_flags,
+ connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
return r;
}
@@ -362,6 +383,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
+ cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
diff --git a/src/core/lib/channel/client_channel.h b/src/core/ext/client_config/client_channel.h
index ac418c8c51..1e47ad34ad 100644
--- a/src/core/lib/channel/client_channel.h
+++ b/src/core/ext/client_config/client_channel.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H
-#define GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
+#include "src/core/ext/client_config/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/resolver.h"
/* A client channel is a channel that begins disconnected, and can connect
to some endpoint on demand. If that endpoint disconnects, it will be
@@ -60,4 +60,4 @@ void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete);
-#endif /* GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H */
diff --git a/src/core/ext/client_config/client_channel_factory.c b/src/core/ext/client_config/client_channel_factory.c
new file mode 100644
index 0000000000..71c64c0da1
--- /dev/null
+++ b/src/core/ext/client_config/client_channel_factory.c
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_config/client_channel_factory.h"
+
+void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory) {
+ factory->vtable->ref(factory);
+}
+
+void grpc_client_channel_factory_unref(grpc_exec_ctx* exec_ctx,
+ grpc_client_channel_factory* factory) {
+ factory->vtable->unref(exec_ctx, factory);
+}
+
+grpc_subchannel* grpc_client_channel_factory_create_subchannel(
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
+ grpc_subchannel_args* args) {
+ return factory->vtable->create_subchannel(exec_ctx, factory, args);
+}
+
+grpc_channel* grpc_client_channel_factory_create_channel(
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
+ const char* target, grpc_client_channel_type type,
+ grpc_channel_args* args) {
+ return factory->vtable->create_client_channel(exec_ctx, factory, target, type,
+ args);
+}
diff --git a/src/core/ext/client_config/client_channel_factory.h b/src/core/ext/client_config/client_channel_factory.h
new file mode 100644
index 0000000000..1241b9b781
--- /dev/null
+++ b/src/core/ext/client_config/client_channel_factory.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+
+#include "src/core/ext/client_config/subchannel.h"
+#include "src/core/lib/channel/channel_stack.h"
+
+typedef struct grpc_client_channel_factory grpc_client_channel_factory;
+typedef struct grpc_client_channel_factory_vtable
+ grpc_client_channel_factory_vtable;
+
+typedef enum {
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, /** for the user-level regular calls */
+ GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, /** for communication with a load
+ balancing service */
+} grpc_client_channel_type;
+
+/** Constructor for new configured channels.
+ Creating decorators around this type is encouraged to adapt behavior. */
+struct grpc_client_channel_factory {
+ const grpc_client_channel_factory_vtable *vtable;
+};
+
+struct grpc_client_channel_factory_vtable {
+ void (*ref)(grpc_client_channel_factory *factory);
+ void (*unref)(grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory);
+ grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory,
+ grpc_subchannel_args *args);
+ grpc_channel *(*create_client_channel)(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory,
+ const char *target,
+ grpc_client_channel_type type,
+ grpc_channel_args *args);
+};
+
+void grpc_client_channel_factory_ref(grpc_client_channel_factory *factory);
+void grpc_client_channel_factory_unref(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory);
+
+/** Create a new grpc_subchannel */
+grpc_subchannel *grpc_client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
+ grpc_subchannel_args *args);
+
+/** Create a new grpc_channel */
+grpc_channel *grpc_client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
+ const char *target, grpc_client_channel_type type, grpc_channel_args *args);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H */
diff --git a/src/core/lib/client_config/client_config.c b/src/core/ext/client_config/client_config.c
index 2521023364..f9b8e68698 100644
--- a/src/core/lib/client_config/client_config.c
+++ b/src/core/ext/client_config/client_config.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/client_config.h"
+#include "src/core/ext/client_config/client_config.h"
#include <string.h>
diff --git a/src/core/lib/client_config/client_config.h b/src/core/ext/client_config/client_config.h
index 8dc2f6b299..a6290cbcf0 100644
--- a/src/core/lib/client_config/client_config.h
+++ b/src/core/ext/client_config/client_config.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CONFIG_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CONFIG_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CONFIG_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CONFIG_H
-#include "src/core/lib/client_config/lb_policy.h"
+#include "src/core/ext/client_config/lb_policy.h"
/** Total configuration for a client. Provided, and updated, by
grpc_resolver */
@@ -50,4 +50,4 @@ void grpc_client_config_set_lb_policy(grpc_client_config *client_config,
grpc_lb_policy *grpc_client_config_get_lb_policy(
grpc_client_config *client_config);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CONFIG_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CONFIG_H */
diff --git a/src/core/ext/client_config/client_config_plugin.c b/src/core/ext/client_config/client_config_plugin.c
new file mode 100644
index 0000000000..5e31613420
--- /dev/null
+++ b/src/core/ext/client_config/client_config_plugin.c
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <limits.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/subchannel_index.h"
+#include "src/core/lib/surface/channel_init.h"
+
+#ifndef GRPC_DEFAULT_NAME_PREFIX
+#define GRPC_DEFAULT_NAME_PREFIX "dns:///"
+#endif
+
+static bool append_filter(grpc_channel_stack_builder *builder, void *arg) {
+ return grpc_channel_stack_builder_append_filter(
+ builder, (const grpc_channel_filter *)arg, NULL, NULL);
+}
+
+static bool set_default_host_if_unset(grpc_channel_stack_builder *builder,
+ void *unused) {
+ const grpc_channel_args *args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ for (size_t i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY) ||
+ 0 == strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
+ return true;
+ }
+ }
+ char *default_authority = grpc_get_default_authority(
+ grpc_channel_stack_builder_get_target(builder));
+ if (default_authority != NULL) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_STRING;
+ arg.key = GRPC_ARG_DEFAULT_AUTHORITY;
+ arg.value.string = default_authority;
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ grpc_channel_stack_builder_set_channel_arguments(builder, new_args);
+ gpr_free(default_authority);
+ grpc_channel_args_destroy(new_args);
+ }
+ return true;
+}
+
+void grpc_client_config_init(void) {
+ grpc_lb_policy_registry_init();
+ grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX);
+ grpc_subchannel_index_init();
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN,
+ set_default_host_if_unset, NULL);
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter,
+ (void *)&grpc_client_channel_filter);
+}
+
+void grpc_client_config_shutdown(void) {
+ grpc_subchannel_index_shutdown();
+ grpc_channel_init_shutdown();
+ grpc_resolver_registry_shutdown();
+ grpc_lb_policy_registry_shutdown();
+}
diff --git a/src/core/lib/client_config/connector.c b/src/core/ext/client_config/connector.c
index 4c7b823dac..5b629ed5fb 100644
--- a/src/core/lib/client_config/connector.c
+++ b/src/core/ext/client_config/connector.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/connector.h"
+#include "src/core/ext/client_config/connector.h"
grpc_connector* grpc_connector_ref(grpc_connector* connector) {
connector->vtable->ref(connector);
diff --git a/src/core/lib/client_config/connector.h b/src/core/ext/client_config/connector.h
index 39870a261c..dd85dfcb7d 100644
--- a/src/core/lib/client_config/connector.h
+++ b/src/core/ext/client_config/connector.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_CONNECTOR_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_CONNECTOR_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -89,4 +89,4 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_connector *connector);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_CONNECTOR_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H */
diff --git a/src/core/lib/client_config/default_initial_connect_string.c b/src/core/ext/client_config/default_initial_connect_string.c
index a70da4a84a..a70da4a84a 100644
--- a/src/core/lib/client_config/default_initial_connect_string.c
+++ b/src/core/ext/client_config/default_initial_connect_string.c
diff --git a/src/core/lib/client_config/initial_connect_string.c b/src/core/ext/client_config/initial_connect_string.c
index 4034ea2e87..41580d2106 100644
--- a/src/core/lib/client_config/initial_connect_string.c
+++ b/src/core/ext/client_config/initial_connect_string.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/initial_connect_string.h"
+#include "src/core/ext/client_config/initial_connect_string.h"
#include <stddef.h>
diff --git a/src/core/lib/client_config/initial_connect_string.h b/src/core/ext/client_config/initial_connect_string.h
index 51302768c6..06f0767832 100644
--- a/src/core/lib/client_config/initial_connect_string.h
+++ b/src/core/ext/client_config/initial_connect_string.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H
#include <grpc/support/slice.h>
#include "src/core/lib/iomgr/sockaddr.h"
@@ -47,4 +47,4 @@ void grpc_test_set_initial_connect_string_function(
void grpc_set_initial_connect_string(struct sockaddr **addr, size_t *addr_len,
gpr_slice *connect_string);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H */
diff --git a/src/core/lib/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 3d23669ec2..a7ad9842dc 100644
--- a/src/core/lib/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/lb_policy.h"
+#include "src/core/ext/client_config/lb_policy.h"
#define WEAK_REF_BITS 16
@@ -101,10 +101,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
- target, on_complete);
+ initial_metadata_flags, target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -112,6 +113,14 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
+ initial_metadata_flags_eq);
+}
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}
diff --git a/src/core/lib/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index a63e8e68df..0384e0b2eb 100644
--- a/src/core/lib/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
@@ -60,9 +60,13 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -122,6 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
@@ -131,6 +136,14 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+/** Cancel all pending picks which have:
+ (initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq */
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
@@ -141,4 +154,4 @@ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state grpc_lb_policy_check_connectivity(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H */
diff --git a/src/core/lib/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c
index 92e1f5f08b..70e46ef3cf 100644
--- a/src/core/lib/client_config/lb_policy_factory.c
+++ b/src/core/ext/client_config/lb_policy_factory.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/lb_policy_factory.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
diff --git a/src/core/lib/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index 6f21912821..1c89b28b59 100644
--- a/src/core/lib/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_FACTORY_H
-#include "src/core/lib/client_config/lb_policy.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
+#include "src/core/ext/client_config/client_channel_factory.h"
+#include "src/core/ext/client_config/lb_policy.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@@ -51,7 +51,7 @@ struct grpc_lb_policy_factory {
typedef struct grpc_lb_policy_args {
grpc_resolved_addresses *addresses;
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
@@ -75,4 +75,4 @@ grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy(
grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_FACTORY_H */
diff --git a/src/core/lib/client_config/lb_policy_registry.c b/src/core/ext/client_config/lb_policy_registry.c
index af396362a1..a23643ecc6 100644
--- a/src/core/lib/client_config/lb_policy_registry.c
+++ b/src/core/ext/client_config/lb_policy_registry.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include <string.h>
diff --git a/src/core/lib/client_config/lb_policy_registry.h b/src/core/ext/client_config/lb_policy_registry.h
index 4b8495d8a1..92f38d6de6 100644
--- a/src/core/lib/client_config/lb_policy_registry.h
+++ b/src/core/ext/client_config/lb_policy_registry.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
-#include "src/core/lib/client_config/lb_policy_factory.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
/** Initialize the registry and set \a default_factory as the factory to be
@@ -52,4 +52,4 @@ void grpc_register_lb_policy(grpc_lb_policy_factory *factory);
grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name,
grpc_lb_policy_args *args);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */
diff --git a/src/core/lib/client_config/resolver.c b/src/core/ext/client_config/resolver.c
index b9eef5575f..eb004455bd 100644
--- a/src/core/lib/client_config/resolver.c
+++ b/src/core/ext/client_config/resolver.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/resolver.h"
+#include "src/core/ext/client_config/resolver.h"
void grpc_resolver_init(grpc_resolver *resolver,
const grpc_resolver_vtable *vtable) {
diff --git a/src/core/lib/client_config/resolver.h b/src/core/ext/client_config/resolver.h
index cf0bb2bc7a..6ecb5d2774 100644
--- a/src/core/lib/client_config/resolver.h
+++ b/src/core/ext/client_config/resolver.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_H
-#include "src/core/lib/client_config/client_config.h"
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/client_config.h"
+#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/iomgr/iomgr.h"
typedef struct grpc_resolver grpc_resolver;
@@ -91,4 +91,4 @@ void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_closure *on_complete);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_H */
diff --git a/src/core/lib/client_config/resolver_factory.c b/src/core/ext/client_config/resolver_factory.c
index 001fa28536..67832dcf59 100644
--- a/src/core/lib/client_config/resolver_factory.c
+++ b/src/core/ext/client_config/resolver_factory.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/resolver_factory.h"
+#include "src/core/ext/client_config/resolver_factory.h"
void grpc_resolver_factory_ref(grpc_resolver_factory* factory) {
factory->vtable->ref(factory);
diff --git a/src/core/lib/client_config/resolver_factory.h b/src/core/ext/client_config/resolver_factory.h
index a5bca06475..4eb6979aad 100644
--- a/src/core/lib/client_config/resolver_factory.h
+++ b/src/core/ext/client_config/resolver_factory.h
@@ -31,12 +31,12 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_FACTORY_H
-#include "src/core/lib/client_config/resolver.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
-#include "src/core/lib/client_config/uri_parser.h"
+#include "src/core/ext/client_config/client_channel_factory.h"
+#include "src/core/ext/client_config/resolver.h"
+#include "src/core/ext/client_config/uri_parser.h"
typedef struct grpc_resolver_factory grpc_resolver_factory;
typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable;
@@ -49,7 +49,7 @@ struct grpc_resolver_factory {
typedef struct grpc_resolver_args {
grpc_uri *uri;
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable {
@@ -79,4 +79,4 @@ grpc_resolver *grpc_resolver_factory_create_resolver(
char *grpc_resolver_factory_get_default_authority(
grpc_resolver_factory *factory, grpc_uri *uri);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_FACTORY_H */
diff --git a/src/core/lib/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c
index 5f3db273b5..07f29bcb27 100644
--- a/src/core/lib/client_config/resolver_registry.c
+++ b/src/core/ext/client_config/resolver_registry.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include <string.h>
@@ -123,14 +123,14 @@ static grpc_resolver_factory *resolve_factory(const char *target,
}
grpc_resolver *grpc_resolver_create(
- const char *target, grpc_subchannel_factory *subchannel_factory) {
+ const char *target, grpc_client_channel_factory *client_channel_factory) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver;
grpc_resolver_args args;
memset(&args, 0, sizeof(args));
args.uri = uri;
- args.subchannel_factory = subchannel_factory;
+ args.client_channel_factory = client_channel_factory;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;
diff --git a/src/core/lib/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h
index 36c4f2fe03..5ef1383cd3 100644
--- a/src/core/lib/client_config/resolver_registry.h
+++ b/src/core/ext/client_config/resolver_registry.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_REGISTRY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_REGISTRY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_REGISTRY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_REGISTRY_H
-#include "src/core/lib/client_config/resolver_factory.h"
+#include "src/core/ext/client_config/resolver_factory.h"
void grpc_resolver_registry_init(const char *default_prefix);
void grpc_resolver_registry_shutdown(void);
@@ -56,7 +56,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
return it.
If a resolver factory was not found, return NULL. */
grpc_resolver *grpc_resolver_create(
- const char *target, grpc_subchannel_factory *subchannel_factory);
+ const char *target, grpc_client_channel_factory *client_channel_factory);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
@@ -66,4 +66,4 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name);
representing the default authority to pass from a client. */
char *grpc_get_default_authority(const char *target);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_REGISTRY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_REGISTRY_H */
diff --git a/src/core/lib/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 47c53a16ba..125a291f21 100644
--- a/src/core/lib/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -31,18 +31,18 @@
*
*/
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/subchannel.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/initial_connect_string.h"
+#include "src/core/ext/client_config/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/client_channel.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/client_config/initial_connect_string.h"
-#include "src/core/lib/client_config/subchannel_index.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/backoff.h"
@@ -54,7 +54,7 @@
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
-#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 2
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
@@ -352,6 +352,25 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
c->args->args[i].value.integer,
c->args->args[i].value.integer);
}
+ if (0 ==
+ strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ if (c->args->args[i].type == GRPC_ARG_INTEGER) {
+ if (c->args->args[i].value.integer >= 0) {
+ gpr_backoff_init(
+ &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
+ GRPC_SUBCHANNEL_RECONNECT_JITTER,
+ GPR_MIN(c->args->args[i].value.integer,
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000),
+ c->args->args[i].value.integer);
+ } else {
+ gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
+ " : must be non-negative");
+ }
+ } else {
+ gpr_log(GPR_ERROR,
+ GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer");
+ }
+ }
}
}
gpr_mu_init(&c->mu);
@@ -527,9 +546,20 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
state_watcher *sw_subchannel;
/* construct channel stack */
- con = grpc_channel_init_create_stack(
- exec_ctx, GRPC_CLIENT_SUBCHANNEL, 0, c->connecting_result.channel_args, 1,
- connection_destroy, NULL, c->connecting_result.transport);
+ grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
+ grpc_channel_stack_builder_set_channel_arguments(
+ builder, c->connecting_result.channel_args);
+ grpc_channel_stack_builder_set_transport(builder,
+ c->connecting_result.transport);
+
+ if (grpc_channel_init_create_stack(exec_ctx, builder,
+ GRPC_CLIENT_SUBCHANNEL)) {
+ con = grpc_channel_stack_builder_finish(exec_ctx, builder, 0, 1,
+ connection_destroy, NULL);
+ } else {
+ grpc_channel_stack_builder_destroy(builder);
+ abort(); /* TODO(ctiller): what to do here (previously we just crashed) */
+ }
stk = CHANNEL_STACK_FROM_CONNECTION(con);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
@@ -557,7 +587,8 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
- /* setup subchannel watching connected subchannel for changes; subchannel ref
+ /* setup subchannel watching connected subchannel for changes; subchannel
+ ref
for connecting is donated
to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
diff --git a/src/core/lib/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index 68aeff39a1..0765a544e8 100644
--- a/src/core/lib/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_H
+#include "src/core/ext/client_config/connector.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/connector.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A (sub-)channel that knows how to connect to exactly one target
@@ -171,4 +171,4 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
grpc_subchannel_args *args);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_H */
diff --git a/src/core/lib/channel/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 22f3679bf5..3db462b246 100644
--- a/src/core/lib/channel/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/channel/subchannel_call_holder.h"
+#include "src/core/ext/client_config/subchannel_call_holder.h"
#include <grpc/support/alloc.h>
@@ -127,7 +127,7 @@ retry:
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
- &holder->connected_subchannel, NULL);
+ 0, &holder->connected_subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
@@ -145,7 +145,8 @@ retry:
GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
if (holder->pick_subchannel(
exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
- &holder->connected_subchannel, &holder->next_step)) {
+ op->send_initial_metadata_flags, &holder->connected_subchannel,
+ &holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}
diff --git a/src/core/lib/channel/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h
index 5cf291a266..9299908788 100644
--- a/src/core/lib/channel/subchannel_call_holder.h
+++ b/src/core/ext/client_config/subchannel_call_holder.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
-#define GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_CALL_HOLDER_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_CALL_HOLDER_H
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/subchannel.h"
/** Pick a subchannel for grpc_subchannel_call_holder;
Return 1 if subchannel is available immediately (in which case on_ready
@@ -42,6 +42,7 @@
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
typedef enum {
@@ -94,4 +95,4 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
-#endif /* GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_CALL_HOLDER_H */
diff --git a/src/core/lib/client_config/subchannel_factory.c b/src/core/ext/client_config/subchannel_factory.c
index 541368ec96..d1e4d75a02 100644
--- a/src/core/lib/client_config/subchannel_factory.c
+++ b/src/core/ext/client_config/subchannel_factory.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/subchannel_factory.h"
+#include "src/core/ext/client_config/subchannel_factory.h"
void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) {
factory->vtable->ref(factory);
diff --git a/src/core/lib/client_config/subchannel_factory.h b/src/core/ext/client_config/subchannel_factory.h
index 96d68a2079..0fb806d081 100644
--- a/src/core/lib/client_config/subchannel_factory.h
+++ b/src/core/ext/client_config/subchannel_factory.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
+#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/subchannel.h"
typedef struct grpc_subchannel_factory grpc_subchannel_factory;
typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable;
@@ -63,4 +63,4 @@ grpc_subchannel *grpc_subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
grpc_subchannel_args *args);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */
diff --git a/src/core/lib/client_config/subchannel_index.c b/src/core/ext/client_config/subchannel_index.c
index 2c545002a2..ab8d9bd91d 100644
--- a/src/core/lib/client_config/subchannel_index.c
+++ b/src/core/ext/client_config/subchannel_index.c
@@ -31,7 +31,7 @@
//
//
-#include "src/core/lib/client_config/subchannel_index.h"
+#include "src/core/ext/client_config/subchannel_index.h"
#include <stdbool.h>
#include <string.h>
diff --git a/src/core/lib/client_config/subchannel_index.h b/src/core/ext/client_config/subchannel_index.h
index bc5f03beb4..6b8d063855 100644
--- a/src/core/lib/client_config/subchannel_index.h
+++ b/src/core/ext/client_config/subchannel_index.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
-#include "src/core/lib/client_config/connector.h"
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/connector.h"
+#include "src/core/ext/client_config/subchannel.h"
/** \file Provides an index of active subchannels so that they can be
shared amongst channels */
@@ -74,4 +74,4 @@ void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
void grpc_subchannel_index_shutdown(void);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */
diff --git a/src/core/lib/client_config/uri_parser.c b/src/core/ext/client_config/uri_parser.c
index 6bec70da2d..3ca1a58e69 100644
--- a/src/core/lib/client_config/uri_parser.c
+++ b/src/core/ext/client_config/uri_parser.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/uri_parser.h"
+#include "src/core/ext/client_config/uri_parser.h"
#include <string.h>
diff --git a/src/core/lib/client_config/uri_parser.h b/src/core/ext/client_config/uri_parser.h
index 5d6785d293..875a7cb07c 100644
--- a/src/core/lib/client_config/uri_parser.h
+++ b/src/core/ext/client_config/uri_parser.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_URI_PARSER_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_URI_PARSER_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_URI_PARSER_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_URI_PARSER_H
#include <stddef.h>
@@ -60,4 +60,4 @@ const char *grpc_uri_get_query_arg(const grpc_uri *uri, const char *key);
/** destroy a uri */
void grpc_uri_destroy(grpc_uri *uri);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_URI_PARSER_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_URI_PARSER_H */
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index d8af644870..459d6d9954 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -110,13 +110,15 @@ grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response) {
grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response));
memset(res, 0, sizeof(*res));
status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res);
- GPR_ASSERT(status == true);
+ if (!status) {
+ grpc_grpclb_response_destroy(res);
+ return NULL;
+ }
return res;
}
grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
gpr_slice encoded_response) {
- grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist));
bool status;
decode_serverlist_arg arg;
pb_istream_t stream =
@@ -131,15 +133,20 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
res->server_list.servers.arg = &arg;
arg.first_pass = 1;
status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res);
- GPR_ASSERT(status == true);
- GPR_ASSERT(arg.num_servers > 0);
+ if (!status) {
+ grpc_grpclb_response_destroy(res);
+ return NULL;
+ }
arg.first_pass = 0;
status =
pb_decode(&stream_at_start, grpc_lb_v0_LoadBalanceResponse_fields, res);
- GPR_ASSERT(status == true);
- GPR_ASSERT(arg.servers != NULL);
+ if (!status) {
+ grpc_grpclb_response_destroy(res);
+ return NULL;
+ }
+ grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist));
sl->num_servers = arg.num_servers;
sl->servers = arg.servers;
if (res->server_list.has_expiration_interval) {
@@ -150,8 +157,10 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
}
void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) {
- size_t i;
- for (i = 0; i < serverlist->num_servers; i++) {
+ if (serverlist == NULL) {
+ return;
+ }
+ for (size_t i = 0; i < serverlist->num_servers; i++) {
gpr_free(serverlist->servers[i]);
}
gpr_free(serverlist->servers);
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index d329a2ffe8..968f7d278a 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -36,8 +36,8 @@
#include <grpc/support/slice_buffer.h>
+#include "src/core/ext/client_config/lb_policy_factory.h"
#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h"
-#include "src/core/lib/client_config/lb_policy_factory.h"
#ifdef __cplusplus
extern "C" {
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index cb5c40501e..5926f9d70b 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -34,12 +34,13 @@
#include <string.h>
#include <grpc/support/alloc.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/transport/connectivity_state.h"
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -149,6 +150,31 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
@@ -171,6 +197,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -199,6 +226,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->next = p->pending_picks;
pp->pollset = pollset;
pp->target = target;
+ pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@@ -286,11 +314,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
+ if (p->checking_subchannel == 0) {
+ /* only trigger transient failure when we've tried all alternatives */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "connecting_transient_failure");
+ }
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@@ -378,14 +409,9 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy,
- pf_shutdown,
- pf_pick,
- pf_cancel_pick,
- pf_ping_one,
- pf_exit_idle,
- pf_check_connectivity,
- pf_notify_on_state_change};
+ pf_destroy, pf_shutdown, pf_pick,
+ pf_cancel_pick, pf_cancel_picks, pf_ping_one,
+ pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -395,7 +421,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
- GPR_ASSERT(args->subchannel_factory != NULL);
+ GPR_ASSERT(args->client_channel_factory != NULL);
if (args->addresses->naddrs == 0) return NULL;
@@ -412,8 +438,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
- grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
- exec_ctx, args->subchannel_factory, &sc_args);
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args);
if (subchannel != NULL) {
p->subchannels[subchannel_idx++] = subchannel;
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index d94c081494..3f6051b892 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -35,7 +35,7 @@
#include <grpc/support/alloc.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -49,6 +49,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -275,6 +276,32 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ *pp->target = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
@@ -303,6 +330,7 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -330,6 +358,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->pollset = pollset;
pp->target = target;
pp->on_complete = on_complete;
+ pp->initial_metadata_flags = initial_metadata_flags;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -485,14 +514,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown,
- rr_pick,
- rr_cancel_pick,
- rr_ping_one,
- rr_exit_idle,
- rr_check_connectivity,
- rr_notify_on_state_change};
+ rr_destroy, rr_shutdown, rr_pick,
+ rr_cancel_pick, rr_cancel_picks, rr_ping_one,
+ rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -502,7 +526,7 @@ static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
- GPR_ASSERT(args->subchannel_factory != NULL);
+ GPR_ASSERT(args->client_channel_factory != NULL);
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
@@ -518,8 +542,8 @@ static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
- grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
- exec_ctx, args->subchannel_factory, &sc_args);
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args);
if (subchannel != NULL) {
subchannel_data *sd = gpr_malloc(sizeof(*sd));
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 70d8a3fe2d..2749b0ca01 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -37,8 +37,8 @@
#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
-#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/backoff.h"
@@ -59,7 +59,7 @@ typedef struct {
/** default port to use */
char *default_port;
/** subchannel factory */
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -170,7 +170,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
config = grpc_client_config_create();
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = addresses;
- lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
@@ -228,7 +228,7 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_config) {
grpc_client_config_unref(exec_ctx, r->resolved_config);
}
- grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
@@ -255,10 +255,10 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
- r->subchannel_factory = args->subchannel_factory;
+ r->client_channel_factory = args->client_channel_factory;
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
- grpc_subchannel_factory_ref(r->subchannel_factory);
+ grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 69595ca3db..1d54a86c39 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,17 +31,21 @@
*
*/
-#include <grpc/support/port_platform.h>
-
+#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
+#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
-#include "src/core/lib/client_config/resolver_registry.h"
+#ifdef GPR_HAVE_UNIX_SOCKET
+#include <sys/un.h>
+#endif
+
+#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
@@ -52,7 +56,7 @@ typedef struct {
/** refcount */
gpr_refcount refs;
/** subchannel factory */
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -125,7 +129,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = r->addresses;
- lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy_args.client_channel_factory = r->client_channel_factory;
grpc_lb_policy *lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(cfg, lb_policy);
@@ -140,7 +144,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
- grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
grpc_resolved_addresses_destroy(r->addresses);
gpr_free(r->lb_policy_name);
gpr_free(r);
@@ -162,6 +166,24 @@ static char *ipv6_get_default_authority(grpc_resolver_factory *factory,
return ip_get_default_authority(uri);
}
+#ifdef GPR_HAVE_UNIX_SOCKET
+static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr,
+ size_t *len) {
+ struct sockaddr_un *un = (struct sockaddr_un *)addr;
+
+ un->sun_family = AF_UNIX;
+ strcpy(un->sun_path, uri->path);
+ *len = strlen(un->sun_path) + sizeof(un->sun_family) + 1;
+
+ return 1;
+}
+
+char *unix_get_default_authority(grpc_resolver_factory *factory,
+ grpc_uri *uri) {
+ return gpr_strdup("localhost");
+}
+#endif
+
static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr,
size_t *len) {
const char *host_port = uri->path;
@@ -263,22 +285,24 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
- r->lb_policy_name = NULL;
- if (0 != strcmp(args->uri->query, "")) {
- gpr_slice query_slice;
- gpr_slice_buffer query_parts;
-
- query_slice =
- gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing);
- gpr_slice_buffer_init(&query_parts);
- gpr_slice_split(query_slice, "=", &query_parts);
- GPR_ASSERT(query_parts.count == 2);
- if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) {
- r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII);
- }
- gpr_slice_buffer_destroy(&query_parts);
- gpr_slice_unref(query_slice);
+ r->lb_policy_name =
+ gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy"));
+ const char *lb_enabled_qpart =
+ grpc_uri_get_query_arg(args->uri, "lb_enabled");
+ /* anything other than "0" is interpreted as true */
+ const bool lb_enabled =
+ (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0));
+
+ if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 &&
+ !lb_enabled) {
+ /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail
+ * out, as this is meant mostly for tests. */
+ gpr_log(GPR_ERROR,
+ "Requested 'grpclb' LB policy but resolved addresses don't "
+ "support load balancing.");
+ abort();
}
+
if (r->lb_policy_name == NULL) {
r->lb_policy_name = gpr_strdup(default_lb_policy_name);
}
@@ -318,8 +342,8 @@ static grpc_resolver *sockaddr_create(
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
- r->subchannel_factory = args->subchannel_factory;
- grpc_subchannel_factory_ref(r->subchannel_factory);
+ r->client_channel_factory = args->client_channel_factory;
+ grpc_client_channel_factory_ref(r->client_channel_factory);
return &r->base;
}
@@ -332,23 +356,22 @@ static void sockaddr_factory_ref(grpc_resolver_factory *factory) {}
static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
-#define DECL_FACTORY(name, prefix) \
+#define DECL_FACTORY(name) \
static grpc_resolver *name##_factory_create_resolver( \
grpc_resolver_factory *factory, grpc_resolver_args *args) { \
- return sockaddr_create(args, "pick_first", prefix##parse_##name); \
+ return sockaddr_create(args, "pick_first", parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \
sockaddr_factory_ref, sockaddr_factory_unref, \
- name##_factory_create_resolver, prefix##name##_get_default_authority, \
- #name}; \
+ name##_factory_create_resolver, name##_get_default_authority, #name}; \
static grpc_resolver_factory name##_resolver_factory = { \
&name##_factory_vtable}
#ifdef GPR_HAVE_UNIX_SOCKET
-DECL_FACTORY(unix, grpc_);
+DECL_FACTORY(unix);
#endif
-DECL_FACTORY(ipv4, );
-DECL_FACTORY(ipv6, );
+DECL_FACTORY(ipv4);
+DECL_FACTORY(ipv6);
void grpc_resolver_sockaddr_init(void) {
grpc_register_resolver_type(&ipv4_resolver_factory);
diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
index 5acb0940c6..898632c3cd 100644
--- a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
+++ b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
@@ -39,8 +39,8 @@
#include <grpc/grpc_zookeeper.h>
#include <zookeeper/zookeeper.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
-#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/support/string.h"
@@ -57,7 +57,7 @@ typedef struct {
/** name to resolve */
char *name;
/** subchannel factory */
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -187,9 +187,8 @@ static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
-
lb_policy_args.addresses = addresses;
- lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
@@ -424,7 +423,7 @@ static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_config != NULL) {
grpc_client_config_unref(exec_ctx, r->resolved_config);
}
- grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
gpr_free(r->name);
gpr_free(r->lb_policy_name);
gpr_free(r);
@@ -454,8 +453,8 @@ static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path);
- r->subchannel_factory = args->subchannel_factory;
- grpc_subchannel_factory_ref(r->subchannel_factory);
+ r->client_channel_factory = args->client_channel_factory;
+ grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);
diff --git a/src/core/ext/transport/chttp2/transport/alpn.c b/src/core/ext/transport/chttp2/alpn/alpn.c
index 4271d08ded..48b0217265 100644
--- a/src/core/ext/transport/chttp2/transport/alpn.c
+++ b/src/core/ext/transport/chttp2/alpn/alpn.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/transport/chttp2/transport/alpn.h"
+#include "src/core/ext/transport/chttp2/alpn/alpn.h"
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
diff --git a/src/core/ext/transport/chttp2/transport/alpn.h b/src/core/ext/transport/chttp2/alpn/alpn.h
index 08a6f039f4..1316770f11 100644
--- a/src/core/ext/transport/chttp2/transport/alpn.h
+++ b/src/core/ext/transport/chttp2/alpn/alpn.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_ALPN_H
-#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_ALPN_H
+#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_ALPN_ALPN_H
+#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_ALPN_ALPN_H
#include <string.h>
@@ -46,4 +46,4 @@ size_t grpc_chttp2_num_alpn_versions(void);
* grpc_chttp2_num_alpn_versions()) */
const char *grpc_chttp2_get_alpn_version_index(size_t i);
-#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_ALPN_H */
+#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_ALPN_ALPN_H */
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 606fff5fb4..0ed115793b 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -40,13 +40,12 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
-#include "src/core/ext/census/grpc_filter.h"
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/client_channel.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/http_client_filter.h"
-#include "src/core/lib/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@@ -136,31 +135,35 @@ static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
- grpc_subchannel_factory base;
+ grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel *master;
-} subchannel_factory;
+} client_channel_factory;
-static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_ref(
+ grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_unref(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
+ if (f->master != NULL) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
+ "client_channel_factory");
+ }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
-static grpc_subchannel *subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf,
+static grpc_subchannel *client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
- subchannel_factory *f = (subchannel_factory *)scf;
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
@@ -175,9 +178,33 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
return s;
}
-static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- subchannel_factory_ref, subchannel_factory_unref,
- subchannel_factory_create_subchannel};
+static grpc_channel *client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
+ const char *target, grpc_client_channel_type type,
+ grpc_channel_args *args) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
+ grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
+ grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
+ GRPC_CLIENT_CHANNEL, NULL);
+ grpc_channel_args_destroy(final_args);
+ grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ if (!resolver) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
+ "client_channel_factory_create_channel");
+ return NULL;
+ }
+
+ grpc_client_channel_set_resolver(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
+
+ return channel;
+}
+
+static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
+ {client_channel_factory_ref, client_channel_factory_unref,
+ client_channel_factory_create_subchannel,
+ client_channel_factory_create_channel};
/* Create a client channel:
Asynchronously: - resolve target
@@ -186,38 +213,27 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args,
void *reserved) {
- grpc_channel *channel = NULL;
- grpc_resolver *resolver;
- subchannel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
(target, args, reserved));
GPR_ASSERT(!reserved);
- channel =
- grpc_channel_create(&exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
-
- f = gpr_malloc(sizeof(*f));
- f->base.vtable = &subchannel_factory_vtable;
+ client_channel_factory *f = gpr_malloc(sizeof(*f));
+ memset(f, 0, sizeof(*f));
+ f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
f->merge_args = grpc_channel_args_copy(args);
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base);
- if (!resolver) {
- GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, f->master, "subchannel_factory");
- grpc_subchannel_factory_unref(&exec_ctx, &f->base);
- grpc_exec_ctx_finish(&exec_ctx);
- return NULL;
- }
- grpc_client_channel_set_resolver(
- &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
- GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
- grpc_subchannel_factory_unref(&exec_ctx, &f->base);
+ grpc_channel *channel = client_channel_factory_create_channel(
+ &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
+ if (channel != NULL) {
+ f->master = channel;
+ GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create");
+ }
+ grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
- return channel;
+ return channel; /* may be NULL */
}
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 3465d2b6c4..58af6f995a 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -40,10 +40,10 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/client_channel.h"
-#include "src/core/lib/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/security/auth_filters.h"
#include "src/core/lib/security/credentials.h"
@@ -192,34 +192,38 @@ static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
- grpc_subchannel_factory base;
+ grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
grpc_channel *master;
-} subchannel_factory;
+} client_channel_factory;
-static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_ref(
+ grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_unref(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
- "subchannel_factory");
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
+ "client_channel_factory");
+ if (f->master != NULL) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
+ "client_channel_factory");
+ }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
-static grpc_subchannel *subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf,
+static grpc_subchannel *client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
- subchannel_factory *f = (subchannel_factory *)scf;
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
@@ -236,9 +240,37 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
return s;
}
-static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- subchannel_factory_ref, subchannel_factory_unref,
- subchannel_factory_create_subchannel};
+static grpc_channel *client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
+ const char *target, grpc_client_channel_type type,
+ grpc_channel_args *args) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
+
+ grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
+ grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
+ GRPC_CLIENT_CHANNEL, NULL);
+ grpc_channel_args_destroy(final_args);
+
+ grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ if (resolver != NULL) {
+ grpc_client_channel_set_resolver(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create");
+ } else {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
+ "client_channel_factory_create_channel");
+ channel = NULL;
+ }
+
+ GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
+ "client_channel_factory_create_channel");
+ return channel;
+}
+
+static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
+ {client_channel_factory_ref, client_channel_factory_unref,
+ client_channel_factory_create_subchannel,
+ client_channel_factory_create_channel};
/* Create a secure client channel:
Asynchronously: - resolve target
@@ -248,13 +280,11 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
const char *target,
const grpc_channel_args *args,
void *reserved) {
- grpc_channel *channel;
grpc_arg connector_arg;
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *security_connector;
- grpc_resolver *resolver;
- subchannel_factory *f;
+ client_channel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
@@ -284,35 +314,30 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1);
- channel = grpc_channel_create(&exec_ctx, target, args_copy,
- GRPC_CLIENT_CHANNEL, NULL);
-
f = gpr_malloc(sizeof(*f));
- f->base.vtable = &subchannel_factory_vtable;
+ memset(f, 0, sizeof(*f));
+ f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
- GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, "subchannel_factory");
- f->security_connector = security_connector;
+
f->merge_args = grpc_channel_args_copy(args_copy);
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base);
- if (resolver) {
- grpc_client_channel_set_resolver(
- &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
- GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
- }
- grpc_subchannel_factory_unref(&exec_ctx, &f->base);
- GRPC_SECURITY_CONNECTOR_UNREF(&security_connector->base, "channel_create");
grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(new_args_from_connector);
}
- if (!resolver) {
- GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "subchannel_factory");
- channel = NULL;
+ GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
+ "grpc_secure_channel_create");
+ f->security_connector = security_connector;
+
+ grpc_channel *channel = client_channel_factory_create_channel(
+ &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
+ if (channel != NULL) {
+ f->master = channel;
+ GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_secure_channel_create");
}
+
+ grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
- return channel;
+ return channel; /* may be NULL */
}
diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.c b/src/core/ext/transport/chttp2/transport/bin_encoder.c
index 71c634e39b..db68e750ac 100644
--- a/src/core/ext/transport/chttp2/transport/bin_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/bin_encoder.c
@@ -175,7 +175,7 @@ static void enc_add1(huff_out *out, uint8_t a) {
enc_flush_some(out);
}
-gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input) {
+gpr_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(gpr_slice input) {
size_t input_length = GPR_SLICE_LENGTH(input);
size_t input_triplets = input_length / 3;
size_t tail_case = input_length % 3;
diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.h b/src/core/ext/transport/chttp2/transport/bin_encoder.h
index 660f114ebc..61ebbafa9a 100644
--- a/src/core/ext/transport/chttp2/transport/bin_encoder.h
+++ b/src/core/ext/transport/chttp2/transport/bin_encoder.h
@@ -49,6 +49,6 @@ gpr_slice grpc_chttp2_huffman_compress(gpr_slice input);
gpr_slice y = grpc_chttp2_huffman_compress(x);
gpr_slice_unref(x);
return y; */
-gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input);
+gpr_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(gpr_slice input);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_ENCODER_H */
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
new file mode 100644
index 0000000000..bd87253ed3
--- /dev/null
+++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
+#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/transport/metadata.h"
+
+void grpc_chttp2_plugin_init(void) {
+ grpc_chttp2_base64_encode_and_huffman_compress =
+ grpc_chttp2_base64_encode_and_huffman_compress_impl;
+ grpc_register_tracer("http", &grpc_http_trace);
+ grpc_register_tracer("flowctl", &grpc_flowctl_trace);
+}
+
+void grpc_chttp2_plugin_shutdown(void) {}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 62f0c53e0d..b4cd185e62 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1147,16 +1147,18 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status) {
- if (stream_global->id != 0) {
- gpr_slice_buffer_add(
- &transport_global->qbuf,
- grpc_chttp2_rst_stream_create(
- stream_global->id,
- (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status),
- &stream_global->stats.outgoing));
+ if (!stream_global->read_closed || !stream_global->write_closed) {
+ if (stream_global->id != 0) {
+ gpr_slice_buffer_add(
+ &transport_global->qbuf,
+ grpc_chttp2_rst_stream_create(
+ stream_global->id,
+ (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status),
+ &stream_global->stats.outgoing));
+ }
+ grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
+ NULL);
}
- grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
- NULL);
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1);
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index 9c301d1608..3a6d80e0a3 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -159,7 +159,10 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
}
switch (p->state) {
- fh_0:
+ case GRPC_CHTTP2_DATA_ERROR:
+ p->state = GRPC_CHTTP2_DATA_ERROR;
+ return GRPC_CHTTP2_STREAM_ERROR;
+ fh_0:
case GRPC_CHTTP2_DATA_FH_0:
stream_parsing->stats.incoming.framing_bytes++;
p->frame_type = *cur;
@@ -172,6 +175,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
break;
default:
gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type);
+ p->state = GRPC_CHTTP2_DATA_ERROR;
return GRPC_CHTTP2_STREAM_ERROR;
}
if (++cur == end) {
@@ -218,13 +222,11 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
message_flags, &p->incoming_frames);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
+ stream_parsing);
if (cur == end) {
- grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
- stream_parsing);
return GRPC_CHTTP2_PARSE_OK;
}
- grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
- stream_parsing);
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
stream_parsing->stats.incoming.data_bytes += p->frame_size;
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h
index 2ff32963d6..af71f483a2 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.h
+++ b/src/core/ext/transport/chttp2/transport/frame_data.h
@@ -49,7 +49,8 @@ typedef enum {
GRPC_CHTTP2_DATA_FH_2,
GRPC_CHTTP2_DATA_FH_3,
GRPC_CHTTP2_DATA_FH_4,
- GRPC_CHTTP2_DATA_FRAME
+ GRPC_CHTTP2_DATA_FRAME,
+ GRPC_CHTTP2_DATA_ERROR
} grpc_chttp2_stream_state;
typedef struct grpc_chttp2_incoming_byte_stream
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
index 807cb5c8f4..555027c866 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
@@ -49,6 +49,7 @@
#include "src/core/ext/transport/chttp2/transport/hpack_table.h"
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
+#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h"
#define HASH_FRAGMENT_1(x) ((x)&255)
@@ -182,8 +183,7 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
uint32_t key_hash = elem->key->hash;
uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
uint32_t new_index = c->tail_remote_index + c->table_elems + 1;
- size_t elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) +
- GPR_SLICE_LENGTH(elem->value->slice);
+ size_t elem_size = grpc_mdelem_get_size_in_hpack_table(elem);
GPR_ASSERT(elem_size < 65536);
@@ -399,8 +399,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
}
/* should this elem be in the table? */
- decoder_space_usage = 32 + GPR_SLICE_LENGTH(elem->key->slice) +
- GPR_SLICE_LENGTH(elem->value->slice);
+ decoder_space_usage = grpc_mdelem_get_size_in_hpack_table(elem);
should_add_elem = decoder_space_usage < MAX_DECODER_SPACE_USAGE &&
c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >=
c->filter_elems_sum / ONE_ON_ADD_PROBABILITY;
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index c754c6005e..e827a43f7a 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -112,7 +112,7 @@ void grpc_chttp2_publish_reads(
GOAWAY last-grpc_chttp2_stream-id=0 in this case. */
if (!transport_parsing->is_client) {
transport_global->last_incoming_stream_id =
- transport_parsing->incoming_stream_id;
+ transport_parsing->last_incoming_stream_id;
}
/* update global settings */
@@ -371,7 +371,9 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
if (!init_frame_parser(exec_ctx, transport_parsing)) {
return 0;
}
- if (transport_parsing->incoming_stream_id) {
+ if (transport_parsing->incoming_stream_id != 0 &&
+ transport_parsing->incoming_stream_id >
+ transport_parsing->last_incoming_stream_id) {
transport_parsing->last_incoming_stream_id =
transport_parsing->incoming_stream_id;
}
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index b7393b988d..28d2d78d00 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -35,7 +35,6 @@
#include <grpc/grpc.h>
#include "src/core/lib/support/string.h"
-#include <grpc/census.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
@@ -165,17 +164,6 @@ void grpc_channel_args_destroy(grpc_channel_args *a) {
gpr_free(a);
}
-int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
- size_t i;
- if (a == NULL) return 0;
- for (i = 0; i < a->num_args; i++) {
- if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
- return a->args[i].value.integer != 0 && census_enabled();
- }
- }
- return census_enabled();
-}
-
grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
const grpc_channel_args *a) {
size_t i;
diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c
index 1ce0c4e07f..a8646c9565 100644
--- a/src/core/lib/channel/channel_stack_builder.c
+++ b/src/core/lib/channel/channel_stack_builder.c
@@ -36,6 +36,7 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
int grpc_trace_channel_stack_builder = 0;
@@ -52,8 +53,9 @@ struct grpc_channel_stack_builder {
filter_node begin;
filter_node end;
// various set/get-able parameters
- const grpc_channel_args *args;
+ grpc_channel_args *args;
grpc_transport *transport;
+ char *target;
const char *name;
};
@@ -76,6 +78,17 @@ grpc_channel_stack_builder *grpc_channel_stack_builder_create(void) {
return b;
}
+void grpc_channel_stack_builder_set_target(grpc_channel_stack_builder *b,
+ const char *target) {
+ gpr_free(b->target);
+ b->target = gpr_strdup(target);
+}
+
+const char *grpc_channel_stack_builder_get_target(
+ grpc_channel_stack_builder *b) {
+ return b->target;
+}
+
static grpc_channel_stack_builder_iterator *create_iterator_at_filter_node(
grpc_channel_stack_builder *builder, filter_node *node) {
grpc_channel_stack_builder_iterator *it = gpr_malloc(sizeof(*it));
@@ -126,8 +139,10 @@ void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder,
void grpc_channel_stack_builder_set_channel_arguments(
grpc_channel_stack_builder *builder, const grpc_channel_args *args) {
- GPR_ASSERT(builder->args == NULL);
- builder->args = args;
+ if (builder->args != NULL) {
+ grpc_channel_args_destroy(builder->args);
+ }
+ builder->args = grpc_channel_args_copy(args);
}
void grpc_channel_stack_builder_set_transport(
@@ -205,6 +220,10 @@ void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder) {
gpr_free(p);
p = next;
}
+ if (builder->args != NULL) {
+ grpc_channel_args_destroy(builder->args);
+ }
+ gpr_free(builder->target);
gpr_free(builder);
}
diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h
index 8532c4462a..0e6bfd9aa6 100644
--- a/src/core/lib/channel/channel_stack_builder.h
+++ b/src/core/lib/channel/channel_stack_builder.h
@@ -52,6 +52,13 @@ grpc_channel_stack_builder *grpc_channel_stack_builder_create(void);
void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder,
const char *name);
+/// Set the target uri
+void grpc_channel_stack_builder_set_target(grpc_channel_stack_builder *b,
+ const char *target);
+
+const char *grpc_channel_stack_builder_get_target(
+ grpc_channel_stack_builder *b);
+
/// Attach \a transport to the builder (does not take ownership)
void grpc_channel_stack_builder_set_transport(
grpc_channel_stack_builder *builder, grpc_transport *transport);
@@ -60,8 +67,7 @@ void grpc_channel_stack_builder_set_transport(
grpc_transport *grpc_channel_stack_builder_get_transport(
grpc_channel_stack_builder *builder);
-/// Set channel arguments: \a args must continue to exist until after
-/// grpc_channel_stack_builder_finish returns
+/// Set channel arguments: copies args
void grpc_channel_stack_builder_set_channel_arguments(
grpc_channel_stack_builder *builder, const grpc_channel_args *args);
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index deba23e21f..211f537c69 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -111,10 +111,12 @@ static void hc_mutate_op(grpc_call_element *elem,
elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
- grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
- op->send_idempotent_request
- ? GRPC_MDELEM_METHOD_PUT
- : GRPC_MDELEM_METHOD_POST);
+ grpc_metadata_batch_add_head(
+ op->send_initial_metadata, &calld->method,
+ op->send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+ ? GRPC_MDELEM_METHOD_PUT
+ : GRPC_MDELEM_METHOD_POST);
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
channeld->static_scheme);
grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index dba335490b..3c8127e1a8 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 235a7df08d..0eb95a2e09 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 9d27b2bcda..1fa9f5ef2d 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c
index 42e44989e0..5767c852df 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.c
+++ b/src/core/lib/iomgr/unix_sockets_posix.c
@@ -41,6 +41,7 @@
#include <sys/un.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
void grpc_create_socketpair_if_unix(int sv[2]) {
GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
@@ -75,21 +76,6 @@ void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) {
}
}
-int grpc_parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) {
- struct sockaddr_un *un = (struct sockaddr_un *)addr;
-
- un->sun_family = AF_UNIX;
- strcpy(un->sun_path, uri->path);
- *len = strlen(un->sun_path) + sizeof(un->sun_family) + 1;
-
- return 1;
-}
-
-char *grpc_unix_get_default_authority(grpc_resolver_factory *factory,
- grpc_uri *uri) {
- return gpr_strdup("localhost");
-}
-
char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr) {
if (addr->sa_family != AF_UNIX) {
return NULL;
diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h
index 752cab85a5..6758c498e5 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.h
+++ b/src/core/lib/iomgr/unix_sockets_posix.h
@@ -38,8 +38,6 @@
#include <grpc/support/string_util.h>
-#include "src/core/lib/client_config/resolver_factory.h"
-#include "src/core/lib/client_config/uri_parser.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -51,11 +49,6 @@ int grpc_is_unix_socket(const struct sockaddr *addr);
void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr);
-int grpc_parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len);
-
-char *grpc_unix_get_default_authority(grpc_resolver_factory *factory,
- grpc_uri *uri);
-
char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr);
#endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */
diff --git a/src/core/lib/iomgr/unix_sockets_posix_noop.c b/src/core/lib/iomgr/unix_sockets_posix_noop.c
index 43e006e15e..d30952789f 100644
--- a/src/core/lib/iomgr/unix_sockets_posix_noop.c
+++ b/src/core/lib/iomgr/unix_sockets_posix_noop.c
@@ -35,6 +35,8 @@
#ifndef GPR_HAVE_UNIX_SOCKET
+#include <grpc/support/log.h>
+
void grpc_create_socketpair_if_unix(int sv[2]) {
// TODO: Either implement this for the non-Unix socket case or make
// sure that it is never called in any such case. Until then, leave an
@@ -50,15 +52,6 @@ int grpc_is_unix_socket(const struct sockaddr *addr) { return false; }
void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) {}
-int grpc_parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) {
- return 0;
-}
-
-char *grpc_unix_get_default_authority(grpc_resolver_factory *factory,
- grpc_uri *uri) {
- return NULL;
-}
-
char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr) {
return NULL;
}
diff --git a/src/core/lib/security/security_connector.c b/src/core/lib/security/security_connector.c
index 4d8c5dd82d..59863ba064 100644
--- a/src/core/lib/security/security_connector.c
+++ b/src/core/lib/security/security_connector.c
@@ -42,7 +42,7 @@
#include <grpc/support/slice_buffer.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/transport/chttp2/transport/alpn.h"
+#include "src/core/ext/transport/chttp2/alpn/alpn.h"
#include "src/core/lib/security/credentials.h"
#include "src/core/lib/security/handshake.h"
#include "src/core/lib/security/secure_endpoint.h"
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 37cc724b53..6581bbd3d1 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -81,11 +81,11 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
- /* Status was created by some internal channel stack operation */
- STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
+ /* Status was created by some internal channel stack operation */
+ STATUS_FROM_CORE,
/* Status came from the server sending status */
STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
@@ -1074,24 +1074,29 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu);
- grpc_metadata_batch *md =
- &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- grpc_metadata_batch_filter(md, recv_initial_filter, call);
- call->has_initial_md_been_received = true;
-
- if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
- 0 &&
- !call->is_client) {
- GPR_TIMER_BEGIN("set_deadline_alarm", 0);
- set_deadline_alarm(exec_ctx, call, md->deadline);
- GPR_TIMER_END("set_deadline_alarm", 0);
+ if (!success) {
+ bctl->success = false;
+ } else {
+ grpc_metadata_batch *md =
+ &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+ grpc_metadata_batch_filter(md, recv_initial_filter, call);
+
+ if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+ 0 &&
+ !call->is_client) {
+ GPR_TIMER_BEGIN("set_deadline_alarm", 0);
+ set_deadline_alarm(exec_ctx, call, md->deadline);
+ GPR_TIMER_END("set_deadline_alarm", 0);
+ }
}
+ call->has_initial_md_been_received = true;
if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
grpc_closure *saved_rsr_closure = grpc_closure_create(
receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
- grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure,
- call->saved_receiving_stream_ready_ctx.success, NULL);
+ grpc_exec_ctx_enqueue(
+ exec_ctx, saved_rsr_closure,
+ call->saved_receiving_stream_ready_ctx.success && success, NULL);
call->saved_receiving_stream_ready_ctx.bctlp = NULL;
}
@@ -1110,6 +1115,9 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
gpr_mu_lock(&call->mu);
if (bctl->send_initial_metadata) {
+ if (!success) {
+ set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
+ }
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
@@ -1232,8 +1240,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->metadata_batch[0][0].deadline = call->send_deadline;
stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op.send_idempotent_request =
- (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0;
+ stream_op.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 06f991b085..b6b760b5d8 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -40,7 +40,6 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
@@ -84,14 +83,26 @@ struct grpc_channel {
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success);
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
- const grpc_channel_args *args,
+ const grpc_channel_args *input_args,
grpc_channel_stack_type channel_stack_type,
grpc_transport *optional_transport) {
bool is_client = grpc_channel_stack_type_is_client(channel_stack_type);
- grpc_channel *channel = grpc_channel_init_create_stack(
- exec_ctx, channel_stack_type, sizeof(grpc_channel), args, 1,
- destroy_channel, NULL, optional_transport);
+ grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
+ grpc_channel_stack_builder_set_channel_arguments(builder, input_args);
+ grpc_channel_stack_builder_set_target(builder, target);
+ grpc_channel_stack_builder_set_transport(builder, optional_transport);
+ grpc_channel *channel;
+ grpc_channel_args *args;
+ if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) {
+ grpc_channel_stack_builder_destroy(builder);
+ return NULL;
+ } else {
+ args = grpc_channel_args_copy(
+ grpc_channel_stack_builder_get_channel_arguments(builder));
+ channel = grpc_channel_stack_builder_finish(
+ exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL);
+ }
memset(channel, 0, sizeof(*channel));
channel->target = gpr_strdup(target);
@@ -142,16 +153,7 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
}
}
}
- }
-
- if (channel->is_client && channel->default_authority == NULL &&
- target != NULL) {
- char *default_authority = grpc_get_default_authority(target);
- if (default_authority) {
- channel->default_authority =
- grpc_mdelem_from_strings(":authority", default_authority);
- }
- gpr_free(default_authority);
+ grpc_channel_args_destroy(args);
}
return channel;
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 640fd7e137..22dae930e4 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -35,7 +35,6 @@
#define GRPC_CORE_LIB_SURFACE_CHANNEL_H
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/surface/channel_stack_type.h"
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c
index fc69f61f77..0627b34479 100644
--- a/src/core/lib/surface/channel_init.c
+++ b/src/core/lib/surface/channel_init.c
@@ -122,25 +122,19 @@ static const char *name_for_type(grpc_channel_stack_type type) {
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
-void *grpc_channel_init_create_stack(
- grpc_exec_ctx *exec_ctx, grpc_channel_stack_type type, size_t prefix_bytes,
- const grpc_channel_args *args, int initial_refs, grpc_iomgr_cb_func destroy,
- void *destroy_arg, grpc_transport *transport) {
+bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ grpc_channel_stack_type type) {
GPR_ASSERT(g_finalized);
- grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_name(builder, name_for_type(type));
- grpc_channel_stack_builder_set_channel_arguments(builder, args);
- grpc_channel_stack_builder_set_transport(builder, transport);
for (size_t i = 0; i < g_slots[type].num_slots; i++) {
const stage_slot *slot = &g_slots[type].slots[i];
if (!slot->fn(builder, slot->arg)) {
- grpc_channel_stack_builder_destroy(builder);
- return NULL;
+ return false;
}
}
- return grpc_channel_stack_builder_finish(exec_ctx, builder, prefix_bytes,
- initial_refs, destroy, destroy_arg);
+ return true;
}
diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h
index a4d8271ca6..3a18a61ddb 100644
--- a/src/core/lib/surface/channel_init.h
+++ b/src/core/lib/surface/channel_init.h
@@ -38,6 +38,8 @@
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
+#define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY 10000
+
/// This module provides a way for plugins (and the grpc core library itself)
/// to register mutators for channel stacks.
/// It also provides a universal entry path to run those mutators to build
@@ -78,9 +80,8 @@ void grpc_channel_init_shutdown(void);
/// \a optional_transport is either NULL or a constructed transport object
/// Returns a pointer to the base of the memory allocated (the actual channel
/// stack object will be prefix_bytes past that pointer)
-void *grpc_channel_init_create_stack(
- grpc_exec_ctx *exec_ctx, grpc_channel_stack_type type, size_t prefix_bytes,
- const grpc_channel_args *args, int initial_refs, grpc_iomgr_cb_func destroy,
- void *destroy_arg, grpc_transport *optional_transport);
+bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ grpc_channel_stack_type type);
#endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_INIT_H */
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index aca4ce9d07..ec75af6e06 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -39,17 +39,11 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/time.h>
-#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/channel/client_channel.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/client_config/lb_policy_registry.h"
-#include "src/core/lib/client_config/resolver_registry.h"
-#include "src/core/lib/client_config/subchannel.h"
-#include "src/core/lib/client_config/subchannel_index.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
@@ -65,10 +59,6 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
-#ifndef GRPC_DEFAULT_NAME_PREFIX
-#define GRPC_DEFAULT_NAME_PREFIX "dns:///"
-#endif
-
/* (generated) built in registry of plugins */
extern void grpc_register_built_in_plugins(void);
@@ -105,31 +95,35 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder,
}
static void register_builtin_channel_init() {
- grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
- prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
- maybe_add_http_filter,
- (void *)&grpc_http_client_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
+ (void *)&grpc_compress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ prepend_filter, (void *)&grpc_compress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
+ (void *)&grpc_compress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_http_filter, (void *)&grpc_http_client_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
- maybe_add_http_filter,
- (void *)&grpc_http_client_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_http_filter, (void *)&grpc_http_client_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
- maybe_add_http_filter,
- (void *)&grpc_http_server_filter);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_http_filter, (void *)&grpc_http_server_filter);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter,
- (void *)&grpc_client_channel_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
append_filter, (void *)&grpc_lame_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
(void *)&grpc_server_top_filter);
@@ -161,12 +155,8 @@ void grpc_init(void) {
gpr_time_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
- grpc_lb_policy_registry_init();
- grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX);
grpc_register_tracer("api", &grpc_api_trace);
grpc_register_tracer("channel", &grpc_trace_channel);
- grpc_register_tracer("http", &grpc_http_trace);
- grpc_register_tracer("flowctl", &grpc_flowctl_trace);
grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace);
grpc_register_tracer("channel_stack_builder",
&grpc_trace_channel_stack_builder);
@@ -176,7 +166,6 @@ void grpc_init(void) {
grpc_tracer_init("GRPC_TRACE");
gpr_timers_global_init();
grpc_cq_global_init();
- grpc_subchannel_index_init();
for (i = 0; i < g_number_of_plugins; i++) {
if (g_all_of_the_plugins[i].init != NULL) {
g_all_of_the_plugins[i].init();
@@ -201,17 +190,13 @@ void grpc_shutdown(void) {
grpc_executor_shutdown();
grpc_cq_global_shutdown();
grpc_iomgr_shutdown();
- grpc_subchannel_index_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();
- grpc_resolver_registry_shutdown();
- grpc_lb_policy_registry_shutdown();
- for (i = 0; i < g_number_of_plugins; i++) {
+ for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != NULL) {
g_all_of_the_plugins[i].destroy();
}
}
- grpc_channel_init_shutdown();
grpc_mdctx_global_shutdown();
}
gpr_mu_unlock(&g_init_mu);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 1898bee1c1..ad8ee8c7a9 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -95,7 +95,6 @@ typedef struct requested_call {
grpc_byte_buffer **optional_payload;
} registered;
} data;
- grpc_closure publish;
} requested_call;
typedef struct channel_registered_method {
@@ -156,15 +155,21 @@ struct call_data {
bool recv_idempotent_request;
grpc_metadata_array initial_metadata;
+ request_matcher *request_matcher;
+ grpc_byte_buffer *payload;
+
grpc_closure got_initial_metadata;
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
grpc_closure *on_done_recv_initial_metadata;
+ grpc_closure publish;
+
call_data *pending_next;
};
struct request_matcher {
+ grpc_server *server;
call_data *pending_head;
call_data *pending_tail;
gpr_stack_lockfree *requests;
@@ -173,6 +178,7 @@ struct request_matcher {
struct registered_method {
char *method;
char *host;
+ grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
request_matcher request_matcher;
registered_method *next;
@@ -226,8 +232,7 @@ struct grpc_server {
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
-static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- call_data *calld, requested_call *rc);
+static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success);
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
requested_call *rc);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
@@ -303,8 +308,10 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher
*/
-static void request_matcher_init(request_matcher *rm, size_t entries) {
+static void request_matcher_init(request_matcher *rm, size_t entries,
+ grpc_server *server) {
memset(rm, 0, sizeof(*rm));
+ rm->server = server;
rm->requests = gpr_stack_lockfree_create(entries);
}
@@ -417,21 +424,90 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
&op);
}
-static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
- grpc_call_element *elem, request_matcher *rm) {
- call_data *calld = elem->call_data;
- int request_id;
+static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
+ gpr_slice slice = value->slice;
+ size_t len = GPR_SLICE_LENGTH(slice);
- if (gpr_atm_acq_load(&server->shutdown_flag)) {
+ if (len + 1 > *capacity) {
+ *capacity = GPR_MAX(len + 1, *capacity * 2);
+ *dest = gpr_realloc(*dest, *capacity);
+ }
+ memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
+}
+
+static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
+ grpc_cq_completion *c) {
+ requested_call *rc = req;
+ grpc_server *server = rc->server;
+
+ if (rc >= server->requested_calls &&
+ rc < server->requested_calls + server->max_requested_calls) {
+ GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
+ gpr_stack_lockfree_push(server->request_freelist,
+ (int)(rc - server->requested_calls));
+ } else {
+ gpr_free(req);
+ }
+
+ server_unref(exec_ctx, server);
+}
+
+static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
+ call_data *calld, requested_call *rc) {
+ grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
+ grpc_call *call = calld->call;
+ *rc->call = call;
+ calld->cq_new = rc->cq_for_notification;
+ GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
+ switch (rc->type) {
+ case BATCH_CALL:
+ GPR_ASSERT(calld->host != NULL);
+ GPR_ASSERT(calld->path != NULL);
+ cpstr(&rc->data.batch.details->host,
+ &rc->data.batch.details->host_capacity, calld->host);
+ cpstr(&rc->data.batch.details->method,
+ &rc->data.batch.details->method_capacity, calld->path);
+ rc->data.batch.details->deadline = calld->deadline;
+ rc->data.batch.details->flags =
+ 0 | (calld->recv_idempotent_request
+ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+ : 0);
+ break;
+ case REGISTERED_CALL:
+ *rc->data.registered.deadline = calld->deadline;
+ if (rc->data.registered.optional_payload) {
+ *rc->data.registered.optional_payload = calld->payload;
+ }
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
+
+ grpc_call_element *elem =
+ grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ channel_data *chand = elem->channel_data;
+ server_ref(chand->server);
+ grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, true, done_request_event, rc,
+ &rc->completion);
+}
+
+static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+ call_data *calld = arg;
+ request_matcher *rm = calld->request_matcher;
+ grpc_server *server = rm->server;
+
+ if (!success || gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_closure_init(
+ &calld->kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
return;
}
- request_id = gpr_stack_lockfree_pop(rm->requests);
+ int request_id = gpr_stack_lockfree_pop(rm->requests);
if (request_id == -1) {
gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
@@ -449,7 +525,41 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+ publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+ }
+}
+
+static void finish_start_new_rpc(
+ grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem,
+ request_matcher *rm,
+ grpc_server_register_method_payload_handling payload_handling) {
+ call_data *calld = elem->call_data;
+
+ if (gpr_atm_acq_load(&server->shutdown_flag)) {
+ gpr_mu_lock(&calld->mu_state);
+ calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
+ return;
+ }
+
+ calld->request_matcher = rm;
+
+ switch (payload_handling) {
+ case GRPC_SRM_PAYLOAD_NONE:
+ publish_new_rpc(exec_ctx, calld, true);
+ break;
+ case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_RECV_MESSAGE;
+ op.data.recv_message = &calld->payload;
+ grpc_closure_init(&calld->publish, publish_new_rpc, calld);
+ grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
+ &calld->publish);
+ break;
+ }
}
}
@@ -475,7 +585,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
!calld->recv_idempotent_request)
continue;
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ rm->server_registered_method->payload_handling);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -490,12 +601,14 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
!calld->recv_idempotent_request)
continue;
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ rm->server_registered_method->payload_handling);
return;
}
}
finish_start_new_rpc(exec_ctx, server, elem,
- &server->unregistered_request_matcher);
+ &server->unregistered_request_matcher,
+ GRPC_SRM_PAYLOAD_NONE);
}
static int num_listeners(grpc_server *server) {
@@ -572,10 +685,14 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (md->key == GRPC_MDSTR_PATH) {
- calld->path = GRPC_MDSTR_REF(md->value);
+ if (calld->path == NULL) {
+ calld->path = GRPC_MDSTR_REF(md->value);
+ }
return NULL;
} else if (md->key == GRPC_MDSTR_AUTHORITY) {
- calld->host = GRPC_MDSTR_REF(md->value);
+ if (calld->host == NULL) {
+ calld->host = GRPC_MDSTR_REF(md->value);
+ }
return NULL;
}
return md;
@@ -824,7 +941,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
gpr_stack_lockfree_push(server->request_freelist, (int)i);
}
request_matcher_init(&server->unregistered_request_matcher,
- server->max_requested_calls);
+ server->max_requested_calls, server);
server->requested_calls = gpr_malloc(server->max_requested_calls *
sizeof(*server->requested_calls));
@@ -840,8 +957,10 @@ static int streq(const char *a, const char *b) {
return 0 == strcmp(a, b);
}
-void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host, uint32_t flags) {
+void *grpc_server_register_method(
+ grpc_server *server, const char *method, const char *host,
+ grpc_server_register_method_payload_handling payload_handling,
+ uint32_t flags) {
registered_method *m;
GRPC_API_TRACE(
"grpc_server_register_method(server=%p, method=%s, host=%s, "
@@ -866,10 +985,12 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
}
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
- request_matcher_init(&m->request_matcher, server->max_requested_calls);
+ request_matcher_init(&m->request_matcher, server->max_requested_calls,
+ server);
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
+ m->payload_handling = payload_handling;
m->flags = flags;
server->registered_methods = m;
return m;
@@ -1143,8 +1264,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(exec_ctx, server, calld,
- &server->requested_calls[request_id]);
+ publish_call(exec_ctx, server, calld,
+ &server->requested_calls[request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1209,6 +1330,12 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
+ if ((optional_payload == NULL) !=
+ (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
+ goto done;
+ }
grpc_cq_begin_op(cq_for_notification, tag);
rc->type = REGISTERED_CALL;
rc->server = server;
@@ -1226,86 +1353,6 @@ done:
return error;
}
-static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
- void *user_data, bool success);
-
-static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
- gpr_slice slice = value->slice;
- size_t len = GPR_SLICE_LENGTH(slice);
-
- if (len + 1 > *capacity) {
- *capacity = GPR_MAX(len + 1, *capacity * 2);
- *dest = gpr_realloc(*dest, *capacity);
- }
- memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
-}
-
-static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- call_data *calld, requested_call *rc) {
- grpc_op ops[1];
- grpc_op *op = ops;
-
- memset(ops, 0, sizeof(ops));
-
- /* called once initial metadata has been read by the call, but BEFORE
- the ioreq to fetch it out of the call has been executed.
- This means metadata related fields can be relied on in calld, but to
- fill in the metadata array passed by the client, we need to perform
- an ioreq op, that should complete immediately. */
-
- grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
- grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
- *rc->call = calld->call;
- calld->cq_new = rc->cq_for_notification;
- GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
- switch (rc->type) {
- case BATCH_CALL:
- GPR_ASSERT(calld->host != NULL);
- GPR_ASSERT(calld->path != NULL);
- cpstr(&rc->data.batch.details->host,
- &rc->data.batch.details->host_capacity, calld->host);
- cpstr(&rc->data.batch.details->method,
- &rc->data.batch.details->method_capacity, calld->path);
- rc->data.batch.details->deadline = calld->deadline;
- rc->data.batch.details->flags =
- 0 | (calld->recv_idempotent_request
- ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
- : 0);
- break;
- case REGISTERED_CALL:
- *rc->data.registered.deadline = calld->deadline;
- if (rc->data.registered.optional_payload) {
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message = rc->data.registered.optional_payload;
- op++;
- }
- break;
- default:
- GPR_UNREACHABLE_CODE(return );
- }
-
- GRPC_CALL_INTERNAL_REF(calld->call, "server");
- grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
- (size_t)(op - ops), &rc->publish);
-}
-
-static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
- grpc_cq_completion *c) {
- requested_call *rc = req;
- grpc_server *server = rc->server;
-
- if (rc >= server->requested_calls &&
- rc < server->requested_calls + server->max_requested_calls) {
- GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
- gpr_stack_lockfree_push(server->request_freelist,
- (int)(rc - server->requested_calls));
- } else {
- gpr_free(req);
- }
-
- server_unref(exec_ctx, server);
-}
-
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
requested_call *rc) {
*rc->call = NULL;
@@ -1316,20 +1363,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
done_request_event, rc, &rc->completion);
}
-static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
- bool success) {
- requested_call *rc = prc;
- grpc_call *call = *rc->call;
- grpc_call_element *elem =
- grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- server_ref(chand->server);
- grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event,
- rc, &rc->completion);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server");
-}
-
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
return server->channel_args;
}
diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c
index 2b1d32d55e..779efbb97d 100644
--- a/src/core/lib/transport/metadata.c
+++ b/src/core/lib/transport/metadata.c
@@ -38,19 +38,21 @@
#include <string.h>
#include <grpc/compression.h>
+#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
-#include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/murmur_hash.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
+gpr_slice (*grpc_chttp2_base64_encode_and_huffman_compress)(gpr_slice input);
+
/* There are two kinds of mdelem and mdstr instances.
* Static instances are declared in static_metadata.{h,c} and
* are initialized by grpc_mdctx_global_init().
@@ -79,6 +81,7 @@
typedef void (*destroy_user_data_func)(void *user_data);
+#define SIZE_IN_DECODER_TABLE_NOT_SET -1
/* Shadow structure for grpc_mdstr for non-static values */
typedef struct internal_string {
/* must be byte compatible with grpc_mdstr */
@@ -93,6 +96,8 @@ typedef struct internal_string {
gpr_slice base64_and_huffman;
+ gpr_atm size_in_decoder_table;
+
struct internal_string *bucket_next;
} internal_string;
@@ -242,6 +247,12 @@ void grpc_mdctx_global_shutdown(void) {
if (shard->count != 0) {
gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked",
shard->count);
+ for (size_t j = 0; j < shard->capacity; j++) {
+ for (internal_string *s = shard->strs[j]; s; s = s->bucket_next) {
+ gpr_log(GPR_DEBUG, "LEAKED: %s",
+ grpc_mdstr_as_c_string((grpc_mdstr *)s));
+ }
+ }
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
@@ -407,6 +418,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(const uint8_t *buf, size_t length) {
}
s->has_base64_and_huffman_encoded = 0;
s->hash = hash;
+ s->size_in_decoder_table = SIZE_IN_DECODER_TABLE_NOT_SET;
s->bucket_next = shard->strs[idx];
shard->strs[idx] = s;
@@ -576,6 +588,39 @@ grpc_mdelem *grpc_mdelem_from_string_and_buffer(const char *key,
grpc_mdstr_from_string(key), grpc_mdstr_from_buffer(value, value_length));
}
+static size_t get_base64_encoded_size(size_t raw_length) {
+ static const uint8_t tail_xtra[3] = {0, 2, 3};
+ return raw_length / 3 * 4 + tail_xtra[raw_length % 3];
+}
+
+size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem *elem) {
+ size_t overhead_and_key = 32 + GPR_SLICE_LENGTH(elem->key->slice);
+ size_t value_len = GPR_SLICE_LENGTH(elem->value->slice);
+ if (is_mdstr_static(elem->value)) {
+ if (grpc_is_binary_header(
+ (const char *)GPR_SLICE_START_PTR(elem->key->slice),
+ GPR_SLICE_LENGTH(elem->key->slice))) {
+ return overhead_and_key + get_base64_encoded_size(value_len);
+ } else {
+ return overhead_and_key + value_len;
+ }
+ } else {
+ internal_string *is = (internal_string *)elem->value;
+ gpr_atm current_size = gpr_atm_acq_load(&is->size_in_decoder_table);
+ if (current_size == SIZE_IN_DECODER_TABLE_NOT_SET) {
+ if (grpc_is_binary_header(
+ (const char *)GPR_SLICE_START_PTR(elem->key->slice),
+ GPR_SLICE_LENGTH(elem->key->slice))) {
+ current_size = (gpr_atm)get_base64_encoded_size(value_len);
+ } else {
+ current_size = (gpr_atm)value_len;
+ }
+ gpr_atm_rel_store(&is->size_in_decoder_table, current_size);
+ }
+ return overhead_and_key + (size_t)current_size;
+ }
+}
+
grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) {
internal_metadata *md = (internal_metadata *)gmd;
if (is_mdelem_static(gmd)) return gmd;
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index 6a02437fdf..e29e8df2c9 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -110,6 +110,8 @@ grpc_mdelem *grpc_mdelem_from_string_and_buffer(const char *key,
const uint8_t *value,
size_t value_length);
+size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem *elem);
+
/* Mutator and accessor for grpc_mdelem user data. The destructor function
is used as a type tag and is checked during user_data fetch. */
void *grpc_mdelem_get_user_data(grpc_mdelem *md,
@@ -153,4 +155,8 @@ int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s);
void grpc_mdctx_global_init(void);
void grpc_mdctx_global_shutdown(void);
+/* Implementation provided by chttp2_transport */
+extern gpr_slice (*grpc_chttp2_base64_encode_and_huffman_compress)(
+ gpr_slice input);
+
#endif /* GRPC_CORE_LIB_TRANSPORT_METADATA_H */
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 460e4dcedc..1eb446312b 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -50,7 +50,7 @@ typedef struct grpc_transport grpc_transport;
for a stream. */
typedef struct grpc_stream grpc_stream;
-/*#define GRPC_STREAM_REFCOUNT_DEBUG*/
+//#define GRPC_STREAM_REFCOUNT_DEBUG
typedef struct grpc_stream_refcount {
gpr_refcount refs;
@@ -101,9 +101,9 @@ typedef struct grpc_transport_stream_op {
/** Send initial metadata to the peer, from the provided metadata batch.
idempotent_request MUST be set if this is non-null */
grpc_metadata_batch *send_initial_metadata;
- /** Iff send_initial_metadata != NULL, flags if this is an idempotent request
- or not */
- bool send_idempotent_request;
+ /** Iff send_initial_metadata != NULL, flags associated with
+ send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
+ uint32_t send_initial_metadata_flags;
/** Send trailing metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_trailing_metadata;
diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c
index 79df85516e..822aa6d8b7 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_plugin_registry.c
@@ -33,6 +33,10 @@
#include <grpc/grpc.h>
+extern void grpc_chttp2_plugin_init(void);
+extern void grpc_chttp2_plugin_shutdown(void);
+extern void grpc_client_config_init(void);
+extern void grpc_client_config_shutdown(void);
extern void grpc_lb_policy_pick_first_init(void);
extern void grpc_lb_policy_pick_first_shutdown(void);
extern void grpc_lb_policy_round_robin_init(void);
@@ -45,6 +49,10 @@ extern void census_grpc_plugin_init(void);
extern void census_grpc_plugin_shutdown(void);
void grpc_register_built_in_plugins(void) {
+ grpc_register_plugin(grpc_chttp2_plugin_init,
+ grpc_chttp2_plugin_shutdown);
+ grpc_register_plugin(grpc_client_config_init,
+ grpc_client_config_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
index b3786c927d..a6108ae7a9 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
@@ -33,6 +33,10 @@
#include <grpc/grpc.h>
+extern void grpc_chttp2_plugin_init(void);
+extern void grpc_chttp2_plugin_shutdown(void);
+extern void grpc_client_config_init(void);
+extern void grpc_client_config_shutdown(void);
extern void grpc_resolver_dns_native_init(void);
extern void grpc_resolver_dns_native_shutdown(void);
extern void grpc_resolver_sockaddr_init(void);
@@ -45,6 +49,10 @@ extern void census_grpc_plugin_init(void);
extern void census_grpc_plugin_shutdown(void);
void grpc_register_built_in_plugins(void) {
+ grpc_register_plugin(grpc_chttp2_plugin_init,
+ grpc_chttp2_plugin_shutdown);
+ grpc_register_plugin(grpc_client_config_init,
+ grpc_client_config_shutdown);
grpc_register_plugin(grpc_resolver_dns_native_init,
grpc_resolver_dns_native_shutdown);
grpc_register_plugin(grpc_resolver_sockaddr_init,