aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_config/README.md (renamed from src/core/lib/client_config/README.md)2
-rw-r--r--src/core/ext/client_config/channel_connectivity.c (renamed from src/core/lib/surface/channel_connectivity.c)2
-rw-r--r--src/core/ext/client_config/client_channel.c (renamed from src/core/lib/channel/client_channel.c)44
-rw-r--r--src/core/ext/client_config/client_channel.h (renamed from src/core/lib/channel/client_channel.h)8
-rw-r--r--src/core/ext/client_config/client_channel_factory.c57
-rw-r--r--src/core/ext/client_config/client_channel_factory.h85
-rw-r--r--src/core/ext/client_config/client_config.c (renamed from src/core/lib/client_config/client_config.c)2
-rw-r--r--src/core/ext/client_config/client_config.h (renamed from src/core/lib/client_config/client_config.h)8
-rw-r--r--src/core/ext/client_config/connector.c (renamed from src/core/lib/client_config/connector.c)2
-rw-r--r--src/core/ext/client_config/connector.h (renamed from src/core/lib/client_config/connector.h)6
-rw-r--r--src/core/ext/client_config/default_initial_connect_string.c (renamed from src/core/lib/client_config/default_initial_connect_string.c)0
-rw-r--r--src/core/ext/client_config/initial_connect_string.c (renamed from src/core/lib/client_config/initial_connect_string.c)2
-rw-r--r--src/core/ext/client_config/initial_connect_string.h (renamed from src/core/lib/client_config/initial_connect_string.h)6
-rw-r--r--src/core/ext/client_config/lb_policy.c (renamed from src/core/lib/client_config/lb_policy.c)13
-rw-r--r--src/core/ext/client_config/lb_policy.h (renamed from src/core/lib/client_config/lb_policy.h)21
-rw-r--r--src/core/ext/client_config/lb_policy_factory.c (renamed from src/core/lib/client_config/lb_policy_factory.c)2
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h (renamed from src/core/lib/client_config/lb_policy_factory.h)12
-rw-r--r--src/core/ext/client_config/lb_policy_registry.c (renamed from src/core/lib/client_config/lb_policy_registry.c)2
-rw-r--r--src/core/ext/client_config/lb_policy_registry.h (renamed from src/core/lib/client_config/lb_policy_registry.h)8
-rw-r--r--src/core/ext/client_config/resolver.c (renamed from src/core/lib/client_config/resolver.c)2
-rw-r--r--src/core/ext/client_config/resolver.h (renamed from src/core/lib/client_config/resolver.h)10
-rw-r--r--src/core/ext/client_config/resolver_factory.c (renamed from src/core/lib/client_config/resolver_factory.c)2
-rw-r--r--src/core/ext/client_config/resolver_factory.h (renamed from src/core/lib/client_config/resolver_factory.h)14
-rw-r--r--src/core/ext/client_config/resolver_registry.c (renamed from src/core/lib/client_config/resolver_registry.c)6
-rw-r--r--src/core/ext/client_config/resolver_registry.h (renamed from src/core/lib/client_config/resolver_registry.h)10
-rw-r--r--src/core/ext/client_config/subchannel.c (renamed from src/core/lib/client_config/subchannel.c)29
-rw-r--r--src/core/ext/client_config/subchannel.h (renamed from src/core/lib/client_config/subchannel.h)8
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c (renamed from src/core/lib/channel/subchannel_call_holder.c)7
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.h (renamed from src/core/lib/channel/subchannel_call_holder.h)9
-rw-r--r--src/core/ext/client_config/subchannel_factory.c (renamed from src/core/lib/client_config/subchannel_factory.c)2
-rw-r--r--src/core/ext/client_config/subchannel_factory.h (renamed from src/core/lib/client_config/subchannel_factory.h)8
-rw-r--r--src/core/ext/client_config/subchannel_index.c (renamed from src/core/lib/client_config/subchannel_index.c)2
-rw-r--r--src/core/ext/client_config/subchannel_index.h (renamed from src/core/lib/client_config/subchannel_index.h)10
-rw-r--r--src/core/ext/client_config/uri_parser.c (renamed from src/core/lib/client_config/uri_parser.c)2
-rw-r--r--src/core/ext/client_config/uri_parser.h (renamed from src/core/lib/client_config/uri_parser.h)6
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.c25
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.h2
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c56
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c48
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c14
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c52
-rw-r--r--src/core/ext/resolver/zookeeper/zookeeper_resolver.c15
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c93
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c105
-rw-r--r--src/core/lib/channel/http_client_filter.c10
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_posix.h2
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix.h4
-rw-r--r--src/core/lib/surface/call.c10
-rw-r--r--src/core/lib/surface/channel.c2
-rw-r--r--src/core/lib/surface/channel.h1
-rw-r--r--src/core/lib/surface/init.c10
-rw-r--r--src/core/lib/surface/server.c259
-rw-r--r--src/core/lib/transport/transport.h6
55 files changed, 734 insertions, 393 deletions
diff --git a/src/core/lib/client_config/README.md b/src/core/ext/client_config/README.md
index fff7a5af5b..7024fd540d 100644
--- a/src/core/lib/client_config/README.md
+++ b/src/core/ext/client_config/README.md
@@ -40,7 +40,7 @@ decisions (for example, by avoiding disconnected backends).
Configured sub-channels are fully setup to participate in the grpc data plane.
Their behavior is specified by a set of grpc channel filters defined at their
construction. To customize this behavior, resolvers build
-grpc_subchannel_factory objects, which use the decorator pattern to customize
+grpc_client_channel_factory objects, which use the decorator pattern to customize
construction arguments for concrete grpc_subchannel instances.
diff --git a/src/core/lib/surface/channel_connectivity.c b/src/core/ext/client_config/channel_connectivity.c
index 9a9ee422c2..3ebc333608 100644
--- a/src/core/lib/surface/channel_connectivity.c
+++ b/src/core/ext/client_config/channel_connectivity.c
@@ -36,7 +36,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/lib/channel/client_channel.h"
+#include "src/core/ext/client_config/client_channel.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/completion_queue.h"
diff --git a/src/core/lib/channel/client_channel.c b/src/core/ext/client_config/client_channel.c
index 3f7cf1cf97..93d54fdcfe 100644
--- a/src/core/lib/channel/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/channel/client_channel.h"
+#include "src/core/ext/client_config/client_channel.h"
#include <stdio.h>
#include <string.h>
@@ -41,9 +41,9 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/client_config/subchannel_call_holder.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/channel/subchannel_call_holder.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
@@ -114,6 +114,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
+static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
+ channel_data *chand,
+ grpc_connectivity_state state,
+ const char *reason) {
+ if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
+ state == GRPC_CHANNEL_FATAL_FAILURE) &&
+ chand->lb_policy != NULL) {
+ /* cancel fail-fast picks */
+ grpc_lb_policy_cancel_picks(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
+ /* check= */ 0);
+ }
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
+}
+
static void on_lb_policy_state_changed_locked(
grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
grpc_connectivity_state publish_state = w->state;
@@ -127,8 +143,8 @@ static void on_lb_policy_state_changed_locked(
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
- "lb_changed");
+ set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
+ "lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
@@ -200,8 +216,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (iomgr_success && chand->resolver) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
- "new_lb+resolver");
+ set_channel_connectivity_state_locked(exec_ctx, chand, state,
+ "new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
@@ -216,8 +232,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@@ -272,8 +288,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (op->disconnect && chand->resolver != NULL) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@@ -290,6 +306,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
typedef struct {
grpc_metadata_batch *initial_metadata;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
@@ -298,6 +315,7 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
@@ -308,6 +326,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
} else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
+ cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
}
@@ -316,6 +335,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
@@ -349,7 +369,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
- initial_metadata, connected_subchannel, on_ready);
+ initial_metadata, initial_metadata_flags,
+ connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
return r;
}
@@ -362,6 +383,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
+ cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
diff --git a/src/core/lib/channel/client_channel.h b/src/core/ext/client_config/client_channel.h
index ac418c8c51..1e47ad34ad 100644
--- a/src/core/lib/channel/client_channel.h
+++ b/src/core/ext/client_config/client_channel.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H
-#define GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
+#include "src/core/ext/client_config/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/resolver.h"
/* A client channel is a channel that begins disconnected, and can connect
to some endpoint on demand. If that endpoint disconnects, it will be
@@ -60,4 +60,4 @@ void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete);
-#endif /* GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H */
diff --git a/src/core/ext/client_config/client_channel_factory.c b/src/core/ext/client_config/client_channel_factory.c
new file mode 100644
index 0000000000..71c64c0da1
--- /dev/null
+++ b/src/core/ext/client_config/client_channel_factory.c
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_config/client_channel_factory.h"
+
+void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory) {
+ factory->vtable->ref(factory);
+}
+
+void grpc_client_channel_factory_unref(grpc_exec_ctx* exec_ctx,
+ grpc_client_channel_factory* factory) {
+ factory->vtable->unref(exec_ctx, factory);
+}
+
+grpc_subchannel* grpc_client_channel_factory_create_subchannel(
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
+ grpc_subchannel_args* args) {
+ return factory->vtable->create_subchannel(exec_ctx, factory, args);
+}
+
+grpc_channel* grpc_client_channel_factory_create_channel(
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
+ const char* target, grpc_client_channel_type type,
+ grpc_channel_args* args) {
+ return factory->vtable->create_client_channel(exec_ctx, factory, target, type,
+ args);
+}
diff --git a/src/core/ext/client_config/client_channel_factory.h b/src/core/ext/client_config/client_channel_factory.h
new file mode 100644
index 0000000000..1241b9b781
--- /dev/null
+++ b/src/core/ext/client_config/client_channel_factory.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+
+#include "src/core/ext/client_config/subchannel.h"
+#include "src/core/lib/channel/channel_stack.h"
+
+typedef struct grpc_client_channel_factory grpc_client_channel_factory;
+typedef struct grpc_client_channel_factory_vtable
+ grpc_client_channel_factory_vtable;
+
+typedef enum {
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, /** for the user-level regular calls */
+ GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, /** for communication with a load
+ balancing service */
+} grpc_client_channel_type;
+
+/** Constructor for new configured channels.
+ Creating decorators around this type is encouraged to adapt behavior. */
+struct grpc_client_channel_factory {
+ const grpc_client_channel_factory_vtable *vtable;
+};
+
+struct grpc_client_channel_factory_vtable {
+ void (*ref)(grpc_client_channel_factory *factory);
+ void (*unref)(grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory);
+ grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory,
+ grpc_subchannel_args *args);
+ grpc_channel *(*create_client_channel)(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory,
+ const char *target,
+ grpc_client_channel_type type,
+ grpc_channel_args *args);
+};
+
+void grpc_client_channel_factory_ref(grpc_client_channel_factory *factory);
+void grpc_client_channel_factory_unref(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory);
+
+/** Create a new grpc_subchannel */
+grpc_subchannel *grpc_client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
+ grpc_subchannel_args *args);
+
+/** Create a new grpc_channel */
+grpc_channel *grpc_client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
+ const char *target, grpc_client_channel_type type, grpc_channel_args *args);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H */
diff --git a/src/core/lib/client_config/client_config.c b/src/core/ext/client_config/client_config.c
index 2521023364..f9b8e68698 100644
--- a/src/core/lib/client_config/client_config.c
+++ b/src/core/ext/client_config/client_config.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/client_config.h"
+#include "src/core/ext/client_config/client_config.h"
#include <string.h>
diff --git a/src/core/lib/client_config/client_config.h b/src/core/ext/client_config/client_config.h
index 8dc2f6b299..a6290cbcf0 100644
--- a/src/core/lib/client_config/client_config.h
+++ b/src/core/ext/client_config/client_config.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CONFIG_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CONFIG_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CONFIG_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CONFIG_H
-#include "src/core/lib/client_config/lb_policy.h"
+#include "src/core/ext/client_config/lb_policy.h"
/** Total configuration for a client. Provided, and updated, by
grpc_resolver */
@@ -50,4 +50,4 @@ void grpc_client_config_set_lb_policy(grpc_client_config *client_config,
grpc_lb_policy *grpc_client_config_get_lb_policy(
grpc_client_config *client_config);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CONFIG_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CONFIG_H */
diff --git a/src/core/lib/client_config/connector.c b/src/core/ext/client_config/connector.c
index 4c7b823dac..5b629ed5fb 100644
--- a/src/core/lib/client_config/connector.c
+++ b/src/core/ext/client_config/connector.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/connector.h"
+#include "src/core/ext/client_config/connector.h"
grpc_connector* grpc_connector_ref(grpc_connector* connector) {
connector->vtable->ref(connector);
diff --git a/src/core/lib/client_config/connector.h b/src/core/ext/client_config/connector.h
index 39870a261c..dd85dfcb7d 100644
--- a/src/core/lib/client_config/connector.h
+++ b/src/core/ext/client_config/connector.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_CONNECTOR_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_CONNECTOR_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -89,4 +89,4 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_connector *connector);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_CONNECTOR_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H */
diff --git a/src/core/lib/client_config/default_initial_connect_string.c b/src/core/ext/client_config/default_initial_connect_string.c
index a70da4a84a..a70da4a84a 100644
--- a/src/core/lib/client_config/default_initial_connect_string.c
+++ b/src/core/ext/client_config/default_initial_connect_string.c
diff --git a/src/core/lib/client_config/initial_connect_string.c b/src/core/ext/client_config/initial_connect_string.c
index 4034ea2e87..41580d2106 100644
--- a/src/core/lib/client_config/initial_connect_string.c
+++ b/src/core/ext/client_config/initial_connect_string.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/initial_connect_string.h"
+#include "src/core/ext/client_config/initial_connect_string.h"
#include <stddef.h>
diff --git a/src/core/lib/client_config/initial_connect_string.h b/src/core/ext/client_config/initial_connect_string.h
index 51302768c6..06f0767832 100644
--- a/src/core/lib/client_config/initial_connect_string.h
+++ b/src/core/ext/client_config/initial_connect_string.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H
#include <grpc/support/slice.h>
#include "src/core/lib/iomgr/sockaddr.h"
@@ -47,4 +47,4 @@ void grpc_test_set_initial_connect_string_function(
void grpc_set_initial_connect_string(struct sockaddr **addr, size_t *addr_len,
gpr_slice *connect_string);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H */
diff --git a/src/core/lib/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 3d23669ec2..a7ad9842dc 100644
--- a/src/core/lib/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/lb_policy.h"
+#include "src/core/ext/client_config/lb_policy.h"
#define WEAK_REF_BITS 16
@@ -101,10 +101,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
- target, on_complete);
+ initial_metadata_flags, target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -112,6 +113,14 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
+ initial_metadata_flags_eq);
+}
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}
diff --git a/src/core/lib/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index a63e8e68df..0384e0b2eb 100644
--- a/src/core/lib/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
@@ -60,9 +60,13 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -122,6 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
@@ -131,6 +136,14 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+/** Cancel all pending picks which have:
+ (initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq */
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
@@ -141,4 +154,4 @@ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state grpc_lb_policy_check_connectivity(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H */
diff --git a/src/core/lib/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c
index 92e1f5f08b..70e46ef3cf 100644
--- a/src/core/lib/client_config/lb_policy_factory.c
+++ b/src/core/ext/client_config/lb_policy_factory.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/lb_policy_factory.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
diff --git a/src/core/lib/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index 6f21912821..1c89b28b59 100644
--- a/src/core/lib/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_FACTORY_H
-#include "src/core/lib/client_config/lb_policy.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
+#include "src/core/ext/client_config/client_channel_factory.h"
+#include "src/core/ext/client_config/lb_policy.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@@ -51,7 +51,7 @@ struct grpc_lb_policy_factory {
typedef struct grpc_lb_policy_args {
grpc_resolved_addresses *addresses;
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
@@ -75,4 +75,4 @@ grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy(
grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_FACTORY_H */
diff --git a/src/core/lib/client_config/lb_policy_registry.c b/src/core/ext/client_config/lb_policy_registry.c
index af396362a1..a23643ecc6 100644
--- a/src/core/lib/client_config/lb_policy_registry.c
+++ b/src/core/ext/client_config/lb_policy_registry.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include <string.h>
diff --git a/src/core/lib/client_config/lb_policy_registry.h b/src/core/ext/client_config/lb_policy_registry.h
index 4b8495d8a1..92f38d6de6 100644
--- a/src/core/lib/client_config/lb_policy_registry.h
+++ b/src/core/ext/client_config/lb_policy_registry.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
-#include "src/core/lib/client_config/lb_policy_factory.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
/** Initialize the registry and set \a default_factory as the factory to be
@@ -52,4 +52,4 @@ void grpc_register_lb_policy(grpc_lb_policy_factory *factory);
grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name,
grpc_lb_policy_args *args);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */
diff --git a/src/core/lib/client_config/resolver.c b/src/core/ext/client_config/resolver.c
index b9eef5575f..eb004455bd 100644
--- a/src/core/lib/client_config/resolver.c
+++ b/src/core/ext/client_config/resolver.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/resolver.h"
+#include "src/core/ext/client_config/resolver.h"
void grpc_resolver_init(grpc_resolver *resolver,
const grpc_resolver_vtable *vtable) {
diff --git a/src/core/lib/client_config/resolver.h b/src/core/ext/client_config/resolver.h
index cf0bb2bc7a..6ecb5d2774 100644
--- a/src/core/lib/client_config/resolver.h
+++ b/src/core/ext/client_config/resolver.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_H
-#include "src/core/lib/client_config/client_config.h"
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/client_config.h"
+#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/iomgr/iomgr.h"
typedef struct grpc_resolver grpc_resolver;
@@ -91,4 +91,4 @@ void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_closure *on_complete);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_H */
diff --git a/src/core/lib/client_config/resolver_factory.c b/src/core/ext/client_config/resolver_factory.c
index 001fa28536..67832dcf59 100644
--- a/src/core/lib/client_config/resolver_factory.c
+++ b/src/core/ext/client_config/resolver_factory.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/resolver_factory.h"
+#include "src/core/ext/client_config/resolver_factory.h"
void grpc_resolver_factory_ref(grpc_resolver_factory* factory) {
factory->vtable->ref(factory);
diff --git a/src/core/lib/client_config/resolver_factory.h b/src/core/ext/client_config/resolver_factory.h
index a5bca06475..4eb6979aad 100644
--- a/src/core/lib/client_config/resolver_factory.h
+++ b/src/core/ext/client_config/resolver_factory.h
@@ -31,12 +31,12 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_FACTORY_H
-#include "src/core/lib/client_config/resolver.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
-#include "src/core/lib/client_config/uri_parser.h"
+#include "src/core/ext/client_config/client_channel_factory.h"
+#include "src/core/ext/client_config/resolver.h"
+#include "src/core/ext/client_config/uri_parser.h"
typedef struct grpc_resolver_factory grpc_resolver_factory;
typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable;
@@ -49,7 +49,7 @@ struct grpc_resolver_factory {
typedef struct grpc_resolver_args {
grpc_uri *uri;
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable {
@@ -79,4 +79,4 @@ grpc_resolver *grpc_resolver_factory_create_resolver(
char *grpc_resolver_factory_get_default_authority(
grpc_resolver_factory *factory, grpc_uri *uri);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_FACTORY_H */
diff --git a/src/core/lib/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c
index 5f3db273b5..07f29bcb27 100644
--- a/src/core/lib/client_config/resolver_registry.c
+++ b/src/core/ext/client_config/resolver_registry.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include <string.h>
@@ -123,14 +123,14 @@ static grpc_resolver_factory *resolve_factory(const char *target,
}
grpc_resolver *grpc_resolver_create(
- const char *target, grpc_subchannel_factory *subchannel_factory) {
+ const char *target, grpc_client_channel_factory *client_channel_factory) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver;
grpc_resolver_args args;
memset(&args, 0, sizeof(args));
args.uri = uri;
- args.subchannel_factory = subchannel_factory;
+ args.client_channel_factory = client_channel_factory;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;
diff --git a/src/core/lib/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h
index 36c4f2fe03..5ef1383cd3 100644
--- a/src/core/lib/client_config/resolver_registry.h
+++ b/src/core/ext/client_config/resolver_registry.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_REGISTRY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_REGISTRY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_REGISTRY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_REGISTRY_H
-#include "src/core/lib/client_config/resolver_factory.h"
+#include "src/core/ext/client_config/resolver_factory.h"
void grpc_resolver_registry_init(const char *default_prefix);
void grpc_resolver_registry_shutdown(void);
@@ -56,7 +56,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
return it.
If a resolver factory was not found, return NULL. */
grpc_resolver *grpc_resolver_create(
- const char *target, grpc_subchannel_factory *subchannel_factory);
+ const char *target, grpc_client_channel_factory *client_channel_factory);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
@@ -66,4 +66,4 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name);
representing the default authority to pass from a client. */
char *grpc_get_default_authority(const char *target);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_REGISTRY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_REGISTRY_H */
diff --git a/src/core/lib/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 47c53a16ba..20cf245a5b 100644
--- a/src/core/lib/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -31,18 +31,18 @@
*
*/
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/subchannel.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/initial_connect_string.h"
+#include "src/core/ext/client_config/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/client_channel.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/client_config/initial_connect_string.h"
-#include "src/core/lib/client_config/subchannel_index.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/backoff.h"
@@ -54,7 +54,7 @@
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
-#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 2
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
@@ -352,6 +352,25 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
c->args->args[i].value.integer,
c->args->args[i].value.integer);
}
+ if (0 ==
+ strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ if (c->args->args[i].type == GRPC_ARG_INTEGER) {
+ if (c->args->args[i].value.integer >= 0) {
+ gpr_backoff_init(
+ &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
+ GRPC_SUBCHANNEL_RECONNECT_JITTER,
+ GPR_MIN(c->args->args[i].value.integer,
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000),
+ c->args->args[i].value.integer);
+ } else {
+ gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
+ " : must be non-negative");
+ }
+ } else {
+ gpr_log(GPR_ERROR,
+ GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer");
+ }
+ }
}
}
gpr_mu_init(&c->mu);
diff --git a/src/core/lib/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index 68aeff39a1..0765a544e8 100644
--- a/src/core/lib/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_H
+#include "src/core/ext/client_config/connector.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/connector.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A (sub-)channel that knows how to connect to exactly one target
@@ -171,4 +171,4 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
grpc_subchannel_args *args);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_H */
diff --git a/src/core/lib/channel/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 22f3679bf5..3db462b246 100644
--- a/src/core/lib/channel/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/channel/subchannel_call_holder.h"
+#include "src/core/ext/client_config/subchannel_call_holder.h"
#include <grpc/support/alloc.h>
@@ -127,7 +127,7 @@ retry:
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
- &holder->connected_subchannel, NULL);
+ 0, &holder->connected_subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
@@ -145,7 +145,8 @@ retry:
GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
if (holder->pick_subchannel(
exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
- &holder->connected_subchannel, &holder->next_step)) {
+ op->send_initial_metadata_flags, &holder->connected_subchannel,
+ &holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}
diff --git a/src/core/lib/channel/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h
index 5cf291a266..9299908788 100644
--- a/src/core/lib/channel/subchannel_call_holder.h
+++ b/src/core/ext/client_config/subchannel_call_holder.h
@@ -31,10 +31,10 @@
*
*/
-#ifndef GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
-#define GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_CALL_HOLDER_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_CALL_HOLDER_H
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/subchannel.h"
/** Pick a subchannel for grpc_subchannel_call_holder;
Return 1 if subchannel is available immediately (in which case on_ready
@@ -42,6 +42,7 @@
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
typedef enum {
@@ -94,4 +95,4 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
-#endif /* GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_CALL_HOLDER_H */
diff --git a/src/core/lib/client_config/subchannel_factory.c b/src/core/ext/client_config/subchannel_factory.c
index 541368ec96..d1e4d75a02 100644
--- a/src/core/lib/client_config/subchannel_factory.c
+++ b/src/core/ext/client_config/subchannel_factory.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/subchannel_factory.h"
+#include "src/core/ext/client_config/subchannel_factory.h"
void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) {
factory->vtable->ref(factory);
diff --git a/src/core/lib/client_config/subchannel_factory.h b/src/core/ext/client_config/subchannel_factory.h
index 96d68a2079..0fb806d081 100644
--- a/src/core/lib/client_config/subchannel_factory.h
+++ b/src/core/ext/client_config/subchannel_factory.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
+#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/subchannel.h"
typedef struct grpc_subchannel_factory grpc_subchannel_factory;
typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable;
@@ -63,4 +63,4 @@ grpc_subchannel *grpc_subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
grpc_subchannel_args *args);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */
diff --git a/src/core/lib/client_config/subchannel_index.c b/src/core/ext/client_config/subchannel_index.c
index 2c545002a2..ab8d9bd91d 100644
--- a/src/core/lib/client_config/subchannel_index.c
+++ b/src/core/ext/client_config/subchannel_index.c
@@ -31,7 +31,7 @@
//
//
-#include "src/core/lib/client_config/subchannel_index.h"
+#include "src/core/ext/client_config/subchannel_index.h"
#include <stdbool.h>
#include <string.h>
diff --git a/src/core/lib/client_config/subchannel_index.h b/src/core/ext/client_config/subchannel_index.h
index bc5f03beb4..6b8d063855 100644
--- a/src/core/lib/client_config/subchannel_index.h
+++ b/src/core/ext/client_config/subchannel_index.h
@@ -31,11 +31,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
-#include "src/core/lib/client_config/connector.h"
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/ext/client_config/connector.h"
+#include "src/core/ext/client_config/subchannel.h"
/** \file Provides an index of active subchannels so that they can be
shared amongst channels */
@@ -74,4 +74,4 @@ void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
void grpc_subchannel_index_shutdown(void);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */
diff --git a/src/core/lib/client_config/uri_parser.c b/src/core/ext/client_config/uri_parser.c
index 6bec70da2d..3ca1a58e69 100644
--- a/src/core/lib/client_config/uri_parser.c
+++ b/src/core/ext/client_config/uri_parser.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/lib/client_config/uri_parser.h"
+#include "src/core/ext/client_config/uri_parser.h"
#include <string.h>
diff --git a/src/core/lib/client_config/uri_parser.h b/src/core/ext/client_config/uri_parser.h
index 5d6785d293..875a7cb07c 100644
--- a/src/core/lib/client_config/uri_parser.h
+++ b/src/core/ext/client_config/uri_parser.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_URI_PARSER_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_URI_PARSER_H
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_URI_PARSER_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_URI_PARSER_H
#include <stddef.h>
@@ -60,4 +60,4 @@ const char *grpc_uri_get_query_arg(const grpc_uri *uri, const char *key);
/** destroy a uri */
void grpc_uri_destroy(grpc_uri *uri);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_URI_PARSER_H */
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_URI_PARSER_H */
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index d8af644870..459d6d9954 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -110,13 +110,15 @@ grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response) {
grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response));
memset(res, 0, sizeof(*res));
status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res);
- GPR_ASSERT(status == true);
+ if (!status) {
+ grpc_grpclb_response_destroy(res);
+ return NULL;
+ }
return res;
}
grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
gpr_slice encoded_response) {
- grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist));
bool status;
decode_serverlist_arg arg;
pb_istream_t stream =
@@ -131,15 +133,20 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
res->server_list.servers.arg = &arg;
arg.first_pass = 1;
status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res);
- GPR_ASSERT(status == true);
- GPR_ASSERT(arg.num_servers > 0);
+ if (!status) {
+ grpc_grpclb_response_destroy(res);
+ return NULL;
+ }
arg.first_pass = 0;
status =
pb_decode(&stream_at_start, grpc_lb_v0_LoadBalanceResponse_fields, res);
- GPR_ASSERT(status == true);
- GPR_ASSERT(arg.servers != NULL);
+ if (!status) {
+ grpc_grpclb_response_destroy(res);
+ return NULL;
+ }
+ grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist));
sl->num_servers = arg.num_servers;
sl->servers = arg.servers;
if (res->server_list.has_expiration_interval) {
@@ -150,8 +157,10 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
}
void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) {
- size_t i;
- for (i = 0; i < serverlist->num_servers; i++) {
+ if (serverlist == NULL) {
+ return;
+ }
+ for (size_t i = 0; i < serverlist->num_servers; i++) {
gpr_free(serverlist->servers[i]);
}
gpr_free(serverlist->servers);
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index d329a2ffe8..968f7d278a 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -36,8 +36,8 @@
#include <grpc/support/slice_buffer.h>
+#include "src/core/ext/client_config/lb_policy_factory.h"
#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h"
-#include "src/core/lib/client_config/lb_policy_factory.h"
#ifdef __cplusplus
extern "C" {
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index cb5c40501e..5926f9d70b 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -34,12 +34,13 @@
#include <string.h>
#include <grpc/support/alloc.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/transport/connectivity_state.h"
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -149,6 +150,31 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
@@ -171,6 +197,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -199,6 +226,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->next = p->pending_picks;
pp->pollset = pollset;
pp->target = target;
+ pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@@ -286,11 +314,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
+ if (p->checking_subchannel == 0) {
+ /* only trigger transient failure when we've tried all alternatives */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "connecting_transient_failure");
+ }
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@@ -378,14 +409,9 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy,
- pf_shutdown,
- pf_pick,
- pf_cancel_pick,
- pf_ping_one,
- pf_exit_idle,
- pf_check_connectivity,
- pf_notify_on_state_change};
+ pf_destroy, pf_shutdown, pf_pick,
+ pf_cancel_pick, pf_cancel_picks, pf_ping_one,
+ pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -395,7 +421,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
- GPR_ASSERT(args->subchannel_factory != NULL);
+ GPR_ASSERT(args->client_channel_factory != NULL);
if (args->addresses->naddrs == 0) return NULL;
@@ -412,8 +438,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
- grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
- exec_ctx, args->subchannel_factory, &sc_args);
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args);
if (subchannel != NULL) {
p->subchannels[subchannel_idx++] = subchannel;
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index d94c081494..3f6051b892 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -35,7 +35,7 @@
#include <grpc/support/alloc.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -49,6 +49,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -275,6 +276,32 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ *pp->target = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
@@ -303,6 +330,7 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -330,6 +358,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->pollset = pollset;
pp->target = target;
pp->on_complete = on_complete;
+ pp->initial_metadata_flags = initial_metadata_flags;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -485,14 +514,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown,
- rr_pick,
- rr_cancel_pick,
- rr_ping_one,
- rr_exit_idle,
- rr_check_connectivity,
- rr_notify_on_state_change};
+ rr_destroy, rr_shutdown, rr_pick,
+ rr_cancel_pick, rr_cancel_picks, rr_ping_one,
+ rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -502,7 +526,7 @@ static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
- GPR_ASSERT(args->subchannel_factory != NULL);
+ GPR_ASSERT(args->client_channel_factory != NULL);
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
@@ -518,8 +542,8 @@ static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
- grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
- exec_ctx, args->subchannel_factory, &sc_args);
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args);
if (subchannel != NULL) {
subchannel_data *sd = gpr_malloc(sizeof(*sd));
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 70d8a3fe2d..2749b0ca01 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -37,8 +37,8 @@
#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
-#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/backoff.h"
@@ -59,7 +59,7 @@ typedef struct {
/** default port to use */
char *default_port;
/** subchannel factory */
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -170,7 +170,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
config = grpc_client_config_create();
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = addresses;
- lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
@@ -228,7 +228,7 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_config) {
grpc_client_config_unref(exec_ctx, r->resolved_config);
}
- grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
@@ -255,10 +255,10 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
- r->subchannel_factory = args->subchannel_factory;
+ r->client_channel_factory = args->client_channel_factory;
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
- grpc_subchannel_factory_ref(r->subchannel_factory);
+ grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 69595ca3db..1f14b40e18 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,17 +31,17 @@
*
*/
-#include <grpc/support/port_platform.h>
-
+#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
+#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
-#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
@@ -52,7 +52,7 @@ typedef struct {
/** refcount */
gpr_refcount refs;
/** subchannel factory */
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -125,7 +125,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = r->addresses;
- lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy_args.client_channel_factory = r->client_channel_factory;
grpc_lb_policy *lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(cfg, lb_policy);
@@ -140,7 +140,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
- grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
grpc_resolved_addresses_destroy(r->addresses);
gpr_free(r->lb_policy_name);
gpr_free(r);
@@ -263,22 +263,24 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
- r->lb_policy_name = NULL;
- if (0 != strcmp(args->uri->query, "")) {
- gpr_slice query_slice;
- gpr_slice_buffer query_parts;
-
- query_slice =
- gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing);
- gpr_slice_buffer_init(&query_parts);
- gpr_slice_split(query_slice, "=", &query_parts);
- GPR_ASSERT(query_parts.count == 2);
- if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) {
- r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII);
- }
- gpr_slice_buffer_destroy(&query_parts);
- gpr_slice_unref(query_slice);
+ r->lb_policy_name =
+ gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy"));
+ const char *lb_enabled_qpart =
+ grpc_uri_get_query_arg(args->uri, "lb_enabled");
+ /* anything other than "0" is interpreted as true */
+ const bool lb_enabled =
+ (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0));
+
+ if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 &&
+ !lb_enabled) {
+ /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail
+ * out, as this is meant mostly for tests. */
+ gpr_log(GPR_ERROR,
+ "Requested 'grpclb' LB policy but resolved addresses don't "
+ "support load balancing.");
+ abort();
}
+
if (r->lb_policy_name == NULL) {
r->lb_policy_name = gpr_strdup(default_lb_policy_name);
}
@@ -318,8 +320,8 @@ static grpc_resolver *sockaddr_create(
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
- r->subchannel_factory = args->subchannel_factory;
- grpc_subchannel_factory_ref(r->subchannel_factory);
+ r->client_channel_factory = args->client_channel_factory;
+ grpc_client_channel_factory_ref(r->client_channel_factory);
return &r->base;
}
diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
index 5acb0940c6..898632c3cd 100644
--- a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
+++ b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
@@ -39,8 +39,8 @@
#include <grpc/grpc_zookeeper.h>
#include <zookeeper/zookeeper.h>
-#include "src/core/lib/client_config/lb_policy_registry.h"
-#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/support/string.h"
@@ -57,7 +57,7 @@ typedef struct {
/** name to resolve */
char *name;
/** subchannel factory */
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -187,9 +187,8 @@ static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
-
lb_policy_args.addresses = addresses;
- lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
@@ -424,7 +423,7 @@ static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_config != NULL) {
grpc_client_config_unref(exec_ctx, r->resolved_config);
}
- grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
gpr_free(r->name);
gpr_free(r->lb_policy_name);
gpr_free(r);
@@ -454,8 +453,8 @@ static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path);
- r->subchannel_factory = args->subchannel_factory;
- grpc_subchannel_factory_ref(r->subchannel_factory);
+ r->client_channel_factory = args->client_channel_factory;
+ grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 606fff5fb4..5484438f0a 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -41,12 +41,12 @@
#include <grpc/support/slice_buffer.h>
#include "src/core/ext/census/grpc_filter.h"
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/client_channel.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/http_client_filter.h"
-#include "src/core/lib/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@@ -136,31 +136,35 @@ static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
- grpc_subchannel_factory base;
+ grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel *master;
-} subchannel_factory;
+} client_channel_factory;
-static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_ref(
+ grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_unref(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
+ if (f->master != NULL) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
+ "client_channel_factory");
+ }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
-static grpc_subchannel *subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf,
+static grpc_subchannel *client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
- subchannel_factory *f = (subchannel_factory *)scf;
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
@@ -175,9 +179,33 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
return s;
}
-static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- subchannel_factory_ref, subchannel_factory_unref,
- subchannel_factory_create_subchannel};
+static grpc_channel *client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
+ const char *target, grpc_client_channel_type type,
+ grpc_channel_args *args) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
+ grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
+ grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
+ GRPC_CLIENT_CHANNEL, NULL);
+ grpc_channel_args_destroy(final_args);
+ grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ if (!resolver) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
+ "client_channel_factory_create_channel");
+ return NULL;
+ }
+
+ grpc_client_channel_set_resolver(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
+
+ return channel;
+}
+
+static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
+ {client_channel_factory_ref, client_channel_factory_unref,
+ client_channel_factory_create_subchannel,
+ client_channel_factory_create_channel};
/* Create a client channel:
Asynchronously: - resolve target
@@ -186,38 +214,27 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args,
void *reserved) {
- grpc_channel *channel = NULL;
- grpc_resolver *resolver;
- subchannel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
(target, args, reserved));
GPR_ASSERT(!reserved);
- channel =
- grpc_channel_create(&exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
-
- f = gpr_malloc(sizeof(*f));
- f->base.vtable = &subchannel_factory_vtable;
+ client_channel_factory *f = gpr_malloc(sizeof(*f));
+ memset(f, 0, sizeof(*f));
+ f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
f->merge_args = grpc_channel_args_copy(args);
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base);
- if (!resolver) {
- GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, f->master, "subchannel_factory");
- grpc_subchannel_factory_unref(&exec_ctx, &f->base);
- grpc_exec_ctx_finish(&exec_ctx);
- return NULL;
- }
- grpc_client_channel_set_resolver(
- &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
- GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
- grpc_subchannel_factory_unref(&exec_ctx, &f->base);
+ grpc_channel *channel = client_channel_factory_create_channel(
+ &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
+ if (channel != NULL) {
+ f->master = channel;
+ GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create");
+ }
+ grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
- return channel;
+ return channel; /* may be NULL */
}
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 3465d2b6c4..58af6f995a 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -40,10 +40,10 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/client_channel.h"
-#include "src/core/lib/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/security/auth_filters.h"
#include "src/core/lib/security/credentials.h"
@@ -192,34 +192,38 @@ static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
- grpc_subchannel_factory base;
+ grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
grpc_channel *master;
-} subchannel_factory;
+} client_channel_factory;
-static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_ref(
+ grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_unref(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
- "subchannel_factory");
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
+ "client_channel_factory");
+ if (f->master != NULL) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
+ "client_channel_factory");
+ }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
-static grpc_subchannel *subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf,
+static grpc_subchannel *client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
- subchannel_factory *f = (subchannel_factory *)scf;
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
@@ -236,9 +240,37 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
return s;
}
-static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- subchannel_factory_ref, subchannel_factory_unref,
- subchannel_factory_create_subchannel};
+static grpc_channel *client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
+ const char *target, grpc_client_channel_type type,
+ grpc_channel_args *args) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
+
+ grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
+ grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
+ GRPC_CLIENT_CHANNEL, NULL);
+ grpc_channel_args_destroy(final_args);
+
+ grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ if (resolver != NULL) {
+ grpc_client_channel_set_resolver(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create");
+ } else {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
+ "client_channel_factory_create_channel");
+ channel = NULL;
+ }
+
+ GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
+ "client_channel_factory_create_channel");
+ return channel;
+}
+
+static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
+ {client_channel_factory_ref, client_channel_factory_unref,
+ client_channel_factory_create_subchannel,
+ client_channel_factory_create_channel};
/* Create a secure client channel:
Asynchronously: - resolve target
@@ -248,13 +280,11 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
const char *target,
const grpc_channel_args *args,
void *reserved) {
- grpc_channel *channel;
grpc_arg connector_arg;
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *security_connector;
- grpc_resolver *resolver;
- subchannel_factory *f;
+ client_channel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
@@ -284,35 +314,30 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1);
- channel = grpc_channel_create(&exec_ctx, target, args_copy,
- GRPC_CLIENT_CHANNEL, NULL);
-
f = gpr_malloc(sizeof(*f));
- f->base.vtable = &subchannel_factory_vtable;
+ memset(f, 0, sizeof(*f));
+ f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
- GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, "subchannel_factory");
- f->security_connector = security_connector;
+
f->merge_args = grpc_channel_args_copy(args_copy);
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base);
- if (resolver) {
- grpc_client_channel_set_resolver(
- &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
- GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
- }
- grpc_subchannel_factory_unref(&exec_ctx, &f->base);
- GRPC_SECURITY_CONNECTOR_UNREF(&security_connector->base, "channel_create");
grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(new_args_from_connector);
}
- if (!resolver) {
- GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "subchannel_factory");
- channel = NULL;
+ GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
+ "grpc_secure_channel_create");
+ f->security_connector = security_connector;
+
+ grpc_channel *channel = client_channel_factory_create_channel(
+ &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
+ if (channel != NULL) {
+ f->master = channel;
+ GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_secure_channel_create");
}
+
+ grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
- return channel;
+ return channel; /* may be NULL */
}
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index deba23e21f..211f537c69 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -111,10 +111,12 @@ static void hc_mutate_op(grpc_call_element *elem,
elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
- grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
- op->send_idempotent_request
- ? GRPC_MDELEM_METHOD_PUT
- : GRPC_MDELEM_METHOD_POST);
+ grpc_metadata_batch_add_head(
+ op->send_initial_metadata, &calld->method,
+ op->send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+ ? GRPC_MDELEM_METHOD_PUT
+ : GRPC_MDELEM_METHOD_POST);
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
channeld->static_scheme);
grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index dba335490b..3c8127e1a8 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 235a7df08d..0eb95a2e09 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 9d27b2bcda..1fa9f5ef2d 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h
index 752cab85a5..22d6af5044 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.h
+++ b/src/core/lib/iomgr/unix_sockets_posix.h
@@ -38,8 +38,8 @@
#include <grpc/support/string_util.h>
-#include "src/core/lib/client_config/resolver_factory.h"
-#include "src/core/lib/client_config/uri_parser.h"
+#include "src/core/ext/client_config/resolver_factory.h"
+#include "src/core/ext/client_config/uri_parser.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 37cc724b53..ba94852f9a 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -81,11 +81,11 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
- /* Status was created by some internal channel stack operation */
- STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
+ /* Status was created by some internal channel stack operation */
+ STATUS_FROM_CORE,
/* Status came from the server sending status */
STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
@@ -1110,6 +1110,9 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
gpr_mu_lock(&call->mu);
if (bctl->send_initial_metadata) {
+ if (!success) {
+ set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
+ }
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
@@ -1232,8 +1235,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->metadata_batch[0][0].deadline = call->send_deadline;
stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op.send_idempotent_request =
- (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0;
+ stream_op.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 06f991b085..332f504507 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -40,7 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 640fd7e137..22dae930e4 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -35,7 +35,6 @@
#define GRPC_CORE_LIB_SURFACE_CHANNEL_H
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/surface/channel_stack_type.h"
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index aca4ce9d07..f221d8db35 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -39,17 +39,17 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/time.h>
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/resolver_registry.h"
+#include "src/core/ext/client_config/subchannel.h"
+#include "src/core/ext/client_config/subchannel_index.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/channel/client_channel.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/client_config/lb_policy_registry.h"
-#include "src/core/lib/client_config/resolver_registry.h"
-#include "src/core/lib/client_config/subchannel.h"
-#include "src/core/lib/client_config/subchannel_index.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 1898bee1c1..37cc2bd101 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -95,7 +95,6 @@ typedef struct requested_call {
grpc_byte_buffer **optional_payload;
} registered;
} data;
- grpc_closure publish;
} requested_call;
typedef struct channel_registered_method {
@@ -156,15 +155,21 @@ struct call_data {
bool recv_idempotent_request;
grpc_metadata_array initial_metadata;
+ request_matcher *request_matcher;
+ grpc_byte_buffer *payload;
+
grpc_closure got_initial_metadata;
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
grpc_closure *on_done_recv_initial_metadata;
+ grpc_closure publish;
+
call_data *pending_next;
};
struct request_matcher {
+ grpc_server *server;
call_data *pending_head;
call_data *pending_tail;
gpr_stack_lockfree *requests;
@@ -173,6 +178,7 @@ struct request_matcher {
struct registered_method {
char *method;
char *host;
+ grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
request_matcher request_matcher;
registered_method *next;
@@ -226,8 +232,7 @@ struct grpc_server {
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
-static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- call_data *calld, requested_call *rc);
+static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success);
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
requested_call *rc);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
@@ -303,8 +308,10 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher
*/
-static void request_matcher_init(request_matcher *rm, size_t entries) {
+static void request_matcher_init(request_matcher *rm, size_t entries,
+ grpc_server *server) {
memset(rm, 0, sizeof(*rm));
+ rm->server = server;
rm->requests = gpr_stack_lockfree_create(entries);
}
@@ -417,21 +424,90 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
&op);
}
-static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
- grpc_call_element *elem, request_matcher *rm) {
- call_data *calld = elem->call_data;
- int request_id;
+static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
+ gpr_slice slice = value->slice;
+ size_t len = GPR_SLICE_LENGTH(slice);
- if (gpr_atm_acq_load(&server->shutdown_flag)) {
+ if (len + 1 > *capacity) {
+ *capacity = GPR_MAX(len + 1, *capacity * 2);
+ *dest = gpr_realloc(*dest, *capacity);
+ }
+ memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
+}
+
+static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
+ grpc_cq_completion *c) {
+ requested_call *rc = req;
+ grpc_server *server = rc->server;
+
+ if (rc >= server->requested_calls &&
+ rc < server->requested_calls + server->max_requested_calls) {
+ GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
+ gpr_stack_lockfree_push(server->request_freelist,
+ (int)(rc - server->requested_calls));
+ } else {
+ gpr_free(req);
+ }
+
+ server_unref(exec_ctx, server);
+}
+
+static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
+ call_data *calld, requested_call *rc) {
+ grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
+ grpc_call *call = calld->call;
+ *rc->call = call;
+ calld->cq_new = rc->cq_for_notification;
+ GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
+ switch (rc->type) {
+ case BATCH_CALL:
+ GPR_ASSERT(calld->host != NULL);
+ GPR_ASSERT(calld->path != NULL);
+ cpstr(&rc->data.batch.details->host,
+ &rc->data.batch.details->host_capacity, calld->host);
+ cpstr(&rc->data.batch.details->method,
+ &rc->data.batch.details->method_capacity, calld->path);
+ rc->data.batch.details->deadline = calld->deadline;
+ rc->data.batch.details->flags =
+ 0 | (calld->recv_idempotent_request
+ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+ : 0);
+ break;
+ case REGISTERED_CALL:
+ *rc->data.registered.deadline = calld->deadline;
+ if (rc->data.registered.optional_payload) {
+ *rc->data.registered.optional_payload = calld->payload;
+ }
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
+
+ grpc_call_element *elem =
+ grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ channel_data *chand = elem->channel_data;
+ server_ref(chand->server);
+ grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, true, done_request_event, rc,
+ &rc->completion);
+}
+
+static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+ call_data *calld = arg;
+ request_matcher *rm = calld->request_matcher;
+ grpc_server *server = rm->server;
+
+ if (!success || gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_closure_init(
+ &calld->kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
return;
}
- request_id = gpr_stack_lockfree_pop(rm->requests);
+ int request_id = gpr_stack_lockfree_pop(rm->requests);
if (request_id == -1) {
gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
@@ -449,7 +525,41 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+ publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+ }
+}
+
+static void finish_start_new_rpc(
+ grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem,
+ request_matcher *rm,
+ grpc_server_register_method_payload_handling payload_handling) {
+ call_data *calld = elem->call_data;
+
+ if (gpr_atm_acq_load(&server->shutdown_flag)) {
+ gpr_mu_lock(&calld->mu_state);
+ calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
+ return;
+ }
+
+ calld->request_matcher = rm;
+
+ switch (payload_handling) {
+ case GRPC_SRM_PAYLOAD_NONE:
+ publish_new_rpc(exec_ctx, calld, true);
+ break;
+ case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_RECV_MESSAGE;
+ op.data.recv_message = &calld->payload;
+ grpc_closure_init(&calld->publish, publish_new_rpc, calld);
+ grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
+ &calld->publish);
+ break;
+ }
}
}
@@ -475,7 +585,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
!calld->recv_idempotent_request)
continue;
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ rm->server_registered_method->payload_handling);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -490,12 +601,14 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
!calld->recv_idempotent_request)
continue;
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ rm->server_registered_method->payload_handling);
return;
}
}
finish_start_new_rpc(exec_ctx, server, elem,
- &server->unregistered_request_matcher);
+ &server->unregistered_request_matcher,
+ GRPC_SRM_PAYLOAD_NONE);
}
static int num_listeners(grpc_server *server) {
@@ -824,7 +937,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
gpr_stack_lockfree_push(server->request_freelist, (int)i);
}
request_matcher_init(&server->unregistered_request_matcher,
- server->max_requested_calls);
+ server->max_requested_calls, server);
server->requested_calls = gpr_malloc(server->max_requested_calls *
sizeof(*server->requested_calls));
@@ -840,8 +953,10 @@ static int streq(const char *a, const char *b) {
return 0 == strcmp(a, b);
}
-void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host, uint32_t flags) {
+void *grpc_server_register_method(
+ grpc_server *server, const char *method, const char *host,
+ grpc_server_register_method_payload_handling payload_handling,
+ uint32_t flags) {
registered_method *m;
GRPC_API_TRACE(
"grpc_server_register_method(server=%p, method=%s, host=%s, "
@@ -866,10 +981,12 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
}
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
- request_matcher_init(&m->request_matcher, server->max_requested_calls);
+ request_matcher_init(&m->request_matcher, server->max_requested_calls,
+ server);
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
+ m->payload_handling = payload_handling;
m->flags = flags;
server->registered_methods = m;
return m;
@@ -1143,8 +1260,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(exec_ctx, server, calld,
- &server->requested_calls[request_id]);
+ publish_call(exec_ctx, server, calld,
+ &server->requested_calls[request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1209,6 +1326,12 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
+ if ((optional_payload == NULL) !=
+ (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
+ goto done;
+ }
grpc_cq_begin_op(cq_for_notification, tag);
rc->type = REGISTERED_CALL;
rc->server = server;
@@ -1226,86 +1349,6 @@ done:
return error;
}
-static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
- void *user_data, bool success);
-
-static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
- gpr_slice slice = value->slice;
- size_t len = GPR_SLICE_LENGTH(slice);
-
- if (len + 1 > *capacity) {
- *capacity = GPR_MAX(len + 1, *capacity * 2);
- *dest = gpr_realloc(*dest, *capacity);
- }
- memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
-}
-
-static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- call_data *calld, requested_call *rc) {
- grpc_op ops[1];
- grpc_op *op = ops;
-
- memset(ops, 0, sizeof(ops));
-
- /* called once initial metadata has been read by the call, but BEFORE
- the ioreq to fetch it out of the call has been executed.
- This means metadata related fields can be relied on in calld, but to
- fill in the metadata array passed by the client, we need to perform
- an ioreq op, that should complete immediately. */
-
- grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
- grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
- *rc->call = calld->call;
- calld->cq_new = rc->cq_for_notification;
- GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
- switch (rc->type) {
- case BATCH_CALL:
- GPR_ASSERT(calld->host != NULL);
- GPR_ASSERT(calld->path != NULL);
- cpstr(&rc->data.batch.details->host,
- &rc->data.batch.details->host_capacity, calld->host);
- cpstr(&rc->data.batch.details->method,
- &rc->data.batch.details->method_capacity, calld->path);
- rc->data.batch.details->deadline = calld->deadline;
- rc->data.batch.details->flags =
- 0 | (calld->recv_idempotent_request
- ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
- : 0);
- break;
- case REGISTERED_CALL:
- *rc->data.registered.deadline = calld->deadline;
- if (rc->data.registered.optional_payload) {
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message = rc->data.registered.optional_payload;
- op++;
- }
- break;
- default:
- GPR_UNREACHABLE_CODE(return );
- }
-
- GRPC_CALL_INTERNAL_REF(calld->call, "server");
- grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
- (size_t)(op - ops), &rc->publish);
-}
-
-static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
- grpc_cq_completion *c) {
- requested_call *rc = req;
- grpc_server *server = rc->server;
-
- if (rc >= server->requested_calls &&
- rc < server->requested_calls + server->max_requested_calls) {
- GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
- gpr_stack_lockfree_push(server->request_freelist,
- (int)(rc - server->requested_calls));
- } else {
- gpr_free(req);
- }
-
- server_unref(exec_ctx, server);
-}
-
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
requested_call *rc) {
*rc->call = NULL;
@@ -1316,20 +1359,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
done_request_event, rc, &rc->completion);
}
-static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
- bool success) {
- requested_call *rc = prc;
- grpc_call *call = *rc->call;
- grpc_call_element *elem =
- grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- server_ref(chand->server);
- grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event,
- rc, &rc->completion);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server");
-}
-
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
return server->channel_args;
}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 460e4dcedc..bb1120ee3a 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -101,9 +101,9 @@ typedef struct grpc_transport_stream_op {
/** Send initial metadata to the peer, from the provided metadata batch.
idempotent_request MUST be set if this is non-null */
grpc_metadata_batch *send_initial_metadata;
- /** Iff send_initial_metadata != NULL, flags if this is an idempotent request
- or not */
- bool send_idempotent_request;
+ /** Iff send_initial_metadata != NULL, flags associated with
+ send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
+ uint32_t send_initial_metadata_flags;
/** Send trailing metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_trailing_metadata;