aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/client_channel')
-rw-r--r--src/core/ext/client_channel/README.md65
-rw-r--r--src/core/ext/client_channel/channel_connectivity.c222
-rw-r--r--src/core/ext/client_channel/client_channel.c1140
-rw-r--r--src/core/ext/client_channel/client_channel.h64
-rw-r--r--src/core/ext/client_channel/client_channel_factory.c57
-rw-r--r--src/core/ext/client_channel/client_channel_factory.h86
-rw-r--r--src/core/ext/client_channel/client_channel_plugin.c91
-rw-r--r--src/core/ext/client_channel/connector.c55
-rw-r--r--src/core/ext/client_channel/connector.h92
-rw-r--r--src/core/ext/client_channel/default_initial_connect_string.c38
-rw-r--r--src/core/ext/client_channel/http_connect_handshaker.c275
-rw-r--r--src/core/ext/client_channel/http_connect_handshaker.h47
-rw-r--r--src/core/ext/client_channel/initial_connect_string.c52
-rw-r--r--src/core/ext/client_channel/initial_connect_string.h50
-rw-r--r--src/core/ext/client_channel/lb_policy.c146
-rw-r--r--src/core/ext/client_channel/lb_policy.h202
-rw-r--r--src/core/ext/client_channel/lb_policy_factory.c161
-rw-r--r--src/core/ext/client_channel/lb_policy_factory.h129
-rw-r--r--src/core/ext/client_channel/lb_policy_registry.c83
-rw-r--r--src/core/ext/client_channel/lb_policy_registry.h55
-rw-r--r--src/core/ext/client_channel/parse_address.c144
-rw-r--r--src/core/ext/client_channel/parse_address.h54
-rw-r--r--src/core/ext/client_channel/resolver.c81
-rw-r--r--src/core/ext/client_channel/resolver.h90
-rw-r--r--src/core/ext/client_channel/resolver_factory.c55
-rw-r--r--src/core/ext/client_channel/resolver_factory.h80
-rw-r--r--src/core/ext/client_channel/resolver_registry.c154
-rw-r--r--src/core/ext/client_channel/resolver_registry.h74
-rw-r--r--src/core/ext/client_channel/subchannel.c729
-rw-r--r--src/core/ext/client_channel/subchannel.h178
-rw-r--r--src/core/ext/client_channel/subchannel_index.c278
-rw-r--r--src/core/ext/client_channel/subchannel_index.h77
-rw-r--r--src/core/ext/client_channel/uri_parser.c310
-rw-r--r--src/core/ext/client_channel/uri_parser.h63
34 files changed, 5477 insertions, 0 deletions
diff --git a/src/core/ext/client_channel/README.md b/src/core/ext/client_channel/README.md
new file mode 100644
index 0000000000..7c209db12e
--- /dev/null
+++ b/src/core/ext/client_channel/README.md
@@ -0,0 +1,65 @@
+Client Configuration Support for GRPC
+=====================================
+
+This library provides high level configuration machinery to construct client
+channels and load balance between them.
+
+Each grpc_channel is created with a grpc_resolver. It is the resolver's duty
+to resolve a name into a set of arguments for the channel. Such arguments
+might include:
+
+- a list of (ip, port) addresses to connect to
+- a load balancing policy to decide which server to send a request to
+- a set of filters to mutate outgoing requests (say, by adding metadata)
+
+The resolver provides this data as a stream of grpc_channel_args objects to
+the channel. We represent arguments as a stream so that they can be changed
+by the resolver during execution, by reacting to external events (such as
+new service configuration data being pushed to some store).
+
+
+Load Balancing
+--------------
+
+Load balancing configuration is provided by a grpc_lb_policy object.
+
+The primary job of the load balancing policies is to pick a target server
+given only the initial metadata for a request. It does this by providing
+a grpc_subchannel object to the owning channel.
+
+
+Sub-Channels
+------------
+
+A sub-channel provides a connection to a server for a client channel. It has a
+connectivity state like a regular channel, and so can be connected or
+disconnected. This connectivity state can be used to inform load balancing
+decisions (for example, by avoiding disconnected backends).
+
+Configured sub-channels are fully setup to participate in the grpc data plane.
+Their behavior is specified by a set of grpc channel filters defined at their
+construction. To customize this behavior, resolvers build
+grpc_client_channel_factory objects, which use the decorator pattern to customize
+construction arguments for concrete grpc_subchannel instances.
+
+
+Naming for GRPC
+===============
+
+Names in GRPC are represented by a URI (as defined in
+[RFC 3986](https://tools.ietf.org/html/rfc3986)).
+
+The following schemes are currently supported:
+
+dns:///host:port - dns schemes are currently supported so long as authority is
+ empty (authority based dns resolution is expected in a future
+ release)
+
+unix:path - the unix scheme is used to create and connect to unix domain
+ sockets - the authority must be empty, and the path
+ represents the absolute or relative path to the desired
+ socket
+
+ipv4:host:port - a pre-resolved ipv4 dotted decimal address/port combination
+
+ipv6:[host]:port - a pre-resolved ipv6 address/port combination
diff --git a/src/core/ext/client_channel/channel_connectivity.c b/src/core/ext/client_channel/channel_connectivity.c
new file mode 100644
index 0000000000..9797e66564
--- /dev/null
+++ b/src/core/ext/client_channel/channel_connectivity.c
@@ -0,0 +1,222 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/surface/channel.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/surface/api_trace.h"
+#include "src/core/lib/surface/completion_queue.h"
+
+grpc_connectivity_state grpc_channel_check_connectivity_state(
+ grpc_channel *channel, int try_to_connect) {
+ /* forward through to the underlying client channel */
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_connectivity_state state;
+ GRPC_API_TRACE(
+ "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
+ (channel, try_to_connect));
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ state = grpc_client_channel_check_connectivity_state(
+ &exec_ctx, client_channel_elem, try_to_connect);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return state;
+ }
+ gpr_log(GPR_ERROR,
+ "grpc_channel_check_connectivity_state called on something that is "
+ "not a client channel, but '%s'",
+ client_channel_elem->filter->name);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return GRPC_CHANNEL_SHUTDOWN;
+}
+
+typedef enum {
+ WAITING,
+ CALLING_BACK,
+ CALLING_BACK_AND_FINISHED,
+ CALLED_BACK
+} callback_phase;
+
+typedef struct {
+ gpr_mu mu;
+ callback_phase phase;
+ grpc_closure on_complete;
+ grpc_timer alarm;
+ grpc_connectivity_state state;
+ grpc_completion_queue *cq;
+ grpc_cq_completion completion_storage;
+ grpc_channel *channel;
+ void *tag;
+} state_watcher;
+
+static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
+ "watch_channel_connectivity");
+ } else {
+ abort();
+ }
+ gpr_mu_destroy(&w->mu);
+ gpr_free(w);
+}
+
+static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_cq_completion *ignored) {
+ int delete = 0;
+ state_watcher *w = pw;
+ gpr_mu_lock(&w->mu);
+ switch (w->phase) {
+ case WAITING:
+ case CALLED_BACK:
+ GPR_UNREACHABLE_CODE(return );
+ case CALLING_BACK:
+ w->phase = CALLED_BACK;
+ break;
+ case CALLING_BACK_AND_FINISHED:
+ delete = 1;
+ break;
+ }
+ gpr_mu_unlock(&w->mu);
+
+ if (delete) {
+ delete_state_watcher(exec_ctx, w);
+ }
+}
+
+static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
+ bool due_to_completion, grpc_error *error) {
+ int delete = 0;
+
+ if (due_to_completion) {
+ grpc_timer_cancel(exec_ctx, &w->alarm);
+ }
+
+ gpr_mu_lock(&w->mu);
+
+ if (due_to_completion) {
+ if (grpc_trace_operation_failures) {
+ GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
+ }
+ GRPC_ERROR_UNREF(error);
+ error = GRPC_ERROR_NONE;
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ error =
+ GRPC_ERROR_CREATE("Timed out waiting for connection state change");
+ } else if (error == GRPC_ERROR_CANCELLED) {
+ error = GRPC_ERROR_NONE;
+ }
+ }
+ switch (w->phase) {
+ case WAITING:
+ w->phase = CALLING_BACK;
+ grpc_cq_end_op(exec_ctx, w->cq, w->tag, GRPC_ERROR_REF(error),
+ finished_completion, w, &w->completion_storage);
+ break;
+ case CALLING_BACK:
+ w->phase = CALLING_BACK_AND_FINISHED;
+ break;
+ case CALLING_BACK_AND_FINISHED:
+ GPR_UNREACHABLE_CODE(return );
+ case CALLED_BACK:
+ delete = 1;
+ break;
+ }
+ gpr_mu_unlock(&w->mu);
+
+ if (delete) {
+ delete_state_watcher(exec_ctx, w);
+ }
+
+ GRPC_ERROR_UNREF(error);
+}
+
+static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_error *error) {
+ partly_done(exec_ctx, pw, true, GRPC_ERROR_REF(error));
+}
+
+static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_error *error) {
+ partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error));
+}
+
+void grpc_channel_watch_connectivity_state(
+ grpc_channel *channel, grpc_connectivity_state last_observed_state,
+ gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ state_watcher *w = gpr_malloc(sizeof(*w));
+
+ GRPC_API_TRACE(
+ "grpc_channel_watch_connectivity_state("
+ "channel=%p, last_observed_state=%d, "
+ "deadline=gpr_timespec { tv_sec: %" PRId64
+ ", tv_nsec: %d, clock_type: %d }, "
+ "cq=%p, tag=%p)",
+ 7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
+ (int)deadline.clock_type, cq, tag));
+
+ grpc_cq_begin_op(cq, tag);
+
+ gpr_mu_init(&w->mu);
+ grpc_closure_init(&w->on_complete, watch_complete, w);
+ w->phase = WAITING;
+ w->state = last_observed_state;
+ w->cq = cq;
+ w->tag = tag;
+ w->channel = channel;
+
+ grpc_timer_init(&exec_ctx, &w->alarm,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
+
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
+ grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
+ grpc_cq_pollset(cq), &w->state,
+ &w->on_complete);
+ } else {
+ abort();
+ }
+
+ grpc_exec_ctx_finish(&exec_ctx);
+}
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
new file mode 100644
index 0000000000..ff773ac334
--- /dev/null
+++ b/src/core/ext/client_channel/client_channel.c
@@ -0,0 +1,1140 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/client_channel.h"
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/deadline_filter.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/metadata.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/method_config.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+/* Client channel implementation */
+
+/*************************************************************************
+ * METHOD-CONFIG TABLE
+ */
+
+typedef enum {
+ WAIT_FOR_READY_UNSET,
+ WAIT_FOR_READY_FALSE,
+ WAIT_FOR_READY_TRUE
+} wait_for_ready_value;
+
+typedef struct method_parameters {
+ gpr_timespec timeout;
+ wait_for_ready_value wait_for_ready;
+} method_parameters;
+
+static void *method_parameters_copy(void *value) {
+ void *new_value = gpr_malloc(sizeof(method_parameters));
+ memcpy(new_value, value, sizeof(method_parameters));
+ return new_value;
+}
+
+static int method_parameters_cmp(void *value1, void *value2) {
+ const method_parameters *v1 = value1;
+ const method_parameters *v2 = value2;
+ const int retval = gpr_time_cmp(v1->timeout, v2->timeout);
+ if (retval != 0) return retval;
+ if (v1->wait_for_ready > v2->wait_for_ready) return 1;
+ if (v1->wait_for_ready < v2->wait_for_ready) return -1;
+ return 0;
+}
+
+static const grpc_mdstr_hash_table_vtable method_parameters_vtable = {
+ gpr_free, method_parameters_copy, method_parameters_cmp};
+
+static void *method_config_convert_value(
+ const grpc_method_config *method_config) {
+ method_parameters *value = gpr_malloc(sizeof(method_parameters));
+ const gpr_timespec *timeout = grpc_method_config_get_timeout(method_config);
+ value->timeout = timeout != NULL ? *timeout : gpr_time_0(GPR_TIMESPAN);
+ const bool *wait_for_ready =
+ grpc_method_config_get_wait_for_ready(method_config);
+ value->wait_for_ready =
+ wait_for_ready == NULL
+ ? WAIT_FOR_READY_UNSET
+ : (wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE);
+ return value;
+}
+
+/*************************************************************************
+ * CHANNEL-WIDE FUNCTIONS
+ */
+
+typedef struct client_channel_channel_data {
+ /** resolver for this channel */
+ grpc_resolver *resolver;
+ /** have we started resolving this channel */
+ bool started_resolving;
+ /** client channel factory */
+ grpc_client_channel_factory *client_channel_factory;
+
+ /** mutex protecting all variables below in this data structure */
+ gpr_mu mu;
+ /** currently active load balancer */
+ grpc_lb_policy *lb_policy;
+ /** maps method names to method_parameters structs */
+ grpc_mdstr_hash_table *method_params_table;
+ /** incoming resolver result - set by resolver.next() */
+ grpc_channel_args *resolver_result;
+ /** a list of closures that are all waiting for config to come in */
+ grpc_closure_list waiting_for_config_closures;
+ /** resolver callback */
+ grpc_closure on_resolver_result_changed;
+ /** connectivity state being tracked */
+ grpc_connectivity_state_tracker state_tracker;
+ /** when an lb_policy arrives, should we try to exit idle */
+ bool exit_idle_when_lb_policy_arrives;
+ /** owning stack */
+ grpc_channel_stack *owning_stack;
+ /** interested parties (owned) */
+ grpc_pollset_set *interested_parties;
+} channel_data;
+
+/** We create one watcher for each new lb_policy that is returned from a
+ resolver, to watch for state changes from the lb_policy. When a state
+ change is seen, we update the channel, and create a new watcher. */
+typedef struct {
+ channel_data *chand;
+ grpc_closure on_changed;
+ grpc_connectivity_state state;
+ grpc_lb_policy *lb_policy;
+} lb_policy_connectivity_watcher;
+
+static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state);
+
+static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
+ channel_data *chand,
+ grpc_connectivity_state state,
+ grpc_error *error,
+ const char *reason) {
+ if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
+ state == GRPC_CHANNEL_SHUTDOWN) &&
+ chand->lb_policy != NULL) {
+ /* cancel picks with wait_for_ready=false */
+ grpc_lb_policy_cancel_picks(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
+ /* check= */ 0, GRPC_ERROR_REF(error));
+ }
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
+ reason);
+}
+
+static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
+ lb_policy_connectivity_watcher *w,
+ grpc_error *error) {
+ grpc_connectivity_state publish_state = w->state;
+ /* check if the notification is for a stale policy */
+ if (w->lb_policy != w->chand->lb_policy) return;
+
+ if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
+ publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver);
+ GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
+ w->chand->lb_policy = NULL;
+ }
+ set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
+ GRPC_ERROR_REF(error), "lb_changed");
+ if (w->state != GRPC_CHANNEL_SHUTDOWN) {
+ watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
+ }
+}
+
+static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ lb_policy_connectivity_watcher *w = arg;
+
+ gpr_mu_lock(&w->chand->mu);
+ on_lb_policy_state_changed_locked(exec_ctx, w, error);
+ gpr_mu_unlock(&w->chand->mu);
+
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
+ gpr_free(w);
+}
+
+static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state) {
+ lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
+
+ w->chand = chand;
+ grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+ w->state = current_state;
+ w->lb_policy = lb_policy;
+ grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
+ &w->on_changed);
+}
+
+static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ channel_data *chand = arg;
+ grpc_lb_policy *lb_policy = NULL;
+ grpc_lb_policy *old_lb_policy;
+ grpc_mdstr_hash_table *method_params_table = NULL;
+ grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ bool exit_idle = false;
+ grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
+
+ if (chand->resolver_result != NULL) {
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.args = chand->resolver_result;
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
+
+ // Find LB policy name.
+ const char *lb_policy_name = NULL;
+ const grpc_arg *channel_arg =
+ grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_POLICY_NAME);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ lb_policy_name = channel_arg->value.string;
+ }
+ // Special case: If all of the addresses are balancer addresses,
+ // assume that we should use the grpclb policy, regardless of what the
+ // resolver actually specified.
+ channel_arg =
+ grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
+ grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
+ bool found_backend_address = false;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) {
+ found_backend_address = true;
+ break;
+ }
+ }
+ if (!found_backend_address) {
+ if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
+ gpr_log(GPR_INFO,
+ "resolver requested LB policy %s but provided only balancer "
+ "addresses, no backend addresses -- forcing use of grpclb LB "
+ "policy",
+ lb_policy_name);
+ }
+ lb_policy_name = "grpclb";
+ }
+ }
+ // Use pick_first if nothing was specified and we didn't select grpclb
+ // above.
+ if (lb_policy_name == NULL) lb_policy_name = "pick_first";
+
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_REF(lb_policy, "config_change");
+ GRPC_ERROR_UNREF(state_error);
+ state =
+ grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
+ }
+ channel_arg =
+ grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_SERVICE_CONFIG);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
+ method_params_table = grpc_method_config_table_convert(
+ (grpc_method_config_table *)channel_arg->value.pointer.p,
+ method_config_convert_value, &method_parameters_vtable);
+ }
+ grpc_channel_args_destroy(chand->resolver_result);
+ chand->resolver_result = NULL;
+ }
+
+ if (lb_policy != NULL) {
+ grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
+ chand->interested_parties);
+ }
+
+ gpr_mu_lock(&chand->mu);
+ old_lb_policy = chand->lb_policy;
+ chand->lb_policy = lb_policy;
+ if (chand->method_params_table != NULL) {
+ grpc_mdstr_hash_table_unref(chand->method_params_table);
+ }
+ chand->method_params_table = method_params_table;
+ if (lb_policy != NULL) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
+ NULL);
+ } else if (chand->resolver == NULL /* disconnected */) {
+ grpc_closure_list_fail_all(
+ &chand->waiting_for_config_closures,
+ GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1));
+ grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
+ NULL);
+ }
+ if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+ GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
+ exit_idle = true;
+ chand->exit_idle_when_lb_policy_arrives = false;
+ }
+
+ if (error == GRPC_ERROR_NONE && chand->resolver) {
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy(exec_ctx, chand, lb_policy, state);
+ }
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+ gpr_mu_unlock(&chand->mu);
+ } else {
+ if (chand->resolver != NULL) {
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ chand->resolver = NULL;
+ }
+ grpc_error *refs[] = {error, state_error};
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs,
+ GPR_ARRAY_SIZE(refs)),
+ "resolver_gone");
+ gpr_mu_unlock(&chand->mu);
+ }
+
+ if (exit_idle) {
+ grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
+ }
+
+ if (old_lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(
+ exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
+ GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
+ }
+
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
+ }
+
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
+ GRPC_ERROR_UNREF(state_error);
+}
+
+static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_transport_op *op) {
+ channel_data *chand = elem->channel_data;
+
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+
+ GPR_ASSERT(op->set_accept_stream == false);
+ if (op->bind_pollset != NULL) {
+ grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
+ op->bind_pollset);
+ }
+
+ gpr_mu_lock(&chand->mu);
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &chand->state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ op->on_connectivity_state_change = NULL;
+ op->connectivity_state = NULL;
+ }
+
+ if (op->send_ping != NULL) {
+ if (chand->lb_policy == NULL) {
+ grpc_exec_ctx_sched(exec_ctx, op->send_ping,
+ GRPC_ERROR_CREATE("Ping with no load balancing"),
+ NULL);
+ } else {
+ grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
+ op->bind_pollset = NULL;
+ }
+ op->send_ping = NULL;
+ }
+
+ if (op->disconnect_with_error != GRPC_ERROR_NONE) {
+ if (chand->resolver != NULL) {
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ chand->resolver = NULL;
+ if (!chand->started_resolving) {
+ grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
+ GRPC_ERROR_REF(op->disconnect_with_error));
+ grpc_exec_ctx_enqueue_list(exec_ctx,
+ &chand->waiting_for_config_closures, NULL);
+ }
+ if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ chand->lb_policy->interested_parties,
+ chand->interested_parties);
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ chand->lb_policy = NULL;
+ }
+ }
+ GRPC_ERROR_UNREF(op->disconnect_with_error);
+ }
+ gpr_mu_unlock(&chand->mu);
+}
+
+/* Constructor for channel_data */
+static void cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ channel_data *chand = elem->channel_data;
+
+ memset(chand, 0, sizeof(*chand));
+
+ GPR_ASSERT(args->is_last);
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+
+ gpr_mu_init(&chand->mu);
+ grpc_closure_init(&chand->on_resolver_result_changed,
+ on_resolver_result_changed, chand);
+ chand->owning_stack = args->channel_stack;
+
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
+ "client_channel");
+ chand->interested_parties = grpc_pollset_set_create();
+}
+
+/* Destructor for channel_data */
+static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+
+ if (chand->resolver != NULL) {
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ }
+ if (chand->client_channel_factory != NULL) {
+ grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
+ }
+ if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ chand->lb_policy->interested_parties,
+ chand->interested_parties);
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ }
+ if (chand->method_params_table != NULL) {
+ grpc_mdstr_hash_table_unref(chand->method_params_table);
+ }
+ grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
+ grpc_pollset_set_destroy(chand->interested_parties);
+ gpr_mu_destroy(&chand->mu);
+}
+
+/*************************************************************************
+ * PER-CALL FUNCTIONS
+ */
+
+#define GET_CALL(call_data) \
+ ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call)))
+
+#define CANCELLED_CALL ((grpc_subchannel_call *)1)
+
+typedef enum {
+ GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
+ GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
+} subchannel_creation_phase;
+
+/** Call data. Holds a pointer to grpc_subchannel_call and the
+ associated machinery to create such a pointer.
+ Handles queueing of stream ops until a call object is ready, waiting
+ for initial metadata before trying to create a call object,
+ and handling cancellation gracefully. */
+typedef struct client_channel_call_data {
+ // State for handling deadlines.
+ // The code in deadline_filter.c requires this to be the first field.
+ // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
+ // and this struct both independently store a pointer to the call
+ // stack and each has its own mutex. If/when we have time, find a way
+ // to avoid this without breaking the grpc_deadline_state abstraction.
+ grpc_deadline_state deadline_state;
+
+ grpc_mdstr *path; // Request path.
+ gpr_timespec call_start_time;
+ gpr_timespec deadline;
+ wait_for_ready_value wait_for_ready_from_service_config;
+ grpc_closure read_service_config;
+
+ grpc_error *cancel_error;
+
+ /** either 0 for no call, 1 for cancelled, or a pointer to a
+ grpc_subchannel_call */
+ gpr_atm subchannel_call;
+
+ gpr_mu mu;
+
+ subchannel_creation_phase creation_phase;
+ grpc_connected_subchannel *connected_subchannel;
+ grpc_polling_entity *pollent;
+
+ grpc_transport_stream_op **waiting_ops;
+ size_t waiting_ops_count;
+ size_t waiting_ops_capacity;
+
+ grpc_closure next_step;
+
+ grpc_call_stack *owning_call;
+
+ grpc_linked_mdelem lb_token_mdelem;
+} call_data;
+
+static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
+ GPR_TIMER_BEGIN("add_waiting_locked", 0);
+ if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
+ calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
+ calld->waiting_ops =
+ gpr_realloc(calld->waiting_ops,
+ calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
+ }
+ calld->waiting_ops[calld->waiting_ops_count++] = op;
+ GPR_TIMER_END("add_waiting_locked", 0);
+}
+
+static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
+ grpc_error *error) {
+ size_t i;
+ for (i = 0; i < calld->waiting_ops_count; i++) {
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
+ }
+ calld->waiting_ops_count = 0;
+ GRPC_ERROR_UNREF(error);
+}
+
+typedef struct {
+ grpc_transport_stream_op **ops;
+ size_t nops;
+ grpc_subchannel_call *call;
+} retry_ops_args;
+
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
+ retry_ops_args *a = args;
+ size_t i;
+ for (i = 0; i < a->nops; i++) {
+ grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
+ }
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
+ gpr_free(a->ops);
+ gpr_free(a);
+}
+
+static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
+ if (calld->waiting_ops_count == 0) {
+ return;
+ }
+
+ retry_ops_args *a = gpr_malloc(sizeof(*a));
+ a->ops = calld->waiting_ops;
+ a->nops = calld->waiting_ops_count;
+ a->call = GET_CALL(calld);
+ if (a->call == CANCELLED_CALL) {
+ gpr_free(a);
+ fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
+ return;
+ }
+ calld->waiting_ops = NULL;
+ calld->waiting_ops_count = 0;
+ calld->waiting_ops_capacity = 0;
+ GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a),
+ GRPC_ERROR_NONE, NULL);
+}
+
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = arg;
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&calld->mu);
+ GPR_ASSERT(calld->creation_phase ==
+ GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
+ chand->interested_parties);
+ calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ if (calld->connected_subchannel == NULL) {
+ gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
+ fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
+ "Failed to create subchannel", &error, 1));
+ } else if (GET_CALL(calld) == CANCELLED_CALL) {
+ /* already cancelled before subchannel became ready */
+ fail_locked(exec_ctx, calld,
+ GRPC_ERROR_CREATE_REFERENCING(
+ "Cancelled before creating subchannel", &error, 1));
+ } else {
+ /* Create call on subchannel. */
+ grpc_subchannel_call *subchannel_call = NULL;
+ grpc_error *new_error = grpc_connected_subchannel_create_call(
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
+ calld->deadline, &subchannel_call);
+ if (new_error != GRPC_ERROR_NONE) {
+ new_error = grpc_error_add_child(new_error, error);
+ subchannel_call = CANCELLED_CALL;
+ fail_locked(exec_ctx, calld, new_error);
+ }
+ gpr_atm_rel_store(&calld->subchannel_call,
+ (gpr_atm)(uintptr_t)subchannel_call);
+ retry_waiting_locked(exec_ctx, calld);
+ }
+ gpr_mu_unlock(&calld->mu);
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
+}
+
+static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ grpc_subchannel_call *subchannel_call = GET_CALL(calld);
+ if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
+ return NULL;
+ } else {
+ return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
+ }
+}
+
+typedef struct {
+ grpc_metadata_batch *initial_metadata;
+ uint32_t initial_metadata_flags;
+ grpc_connected_subchannel **connected_subchannel;
+ grpc_closure *on_ready;
+ grpc_call_element *elem;
+ grpc_closure closure;
+} continue_picking_args;
+
+/** Return true if subchannel is available immediately (in which case on_ready
+ should not be called), or false otherwise (in which case on_ready should be
+ called when the subchannel is available). */
+static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
+ grpc_connected_subchannel **connected_subchannel,
+ grpc_closure *on_ready, grpc_error *error);
+
+static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ continue_picking_args *cpa = arg;
+ if (cpa->connected_subchannel == NULL) {
+ /* cancelled, do nothing */
+ } else if (error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
+ } else {
+ call_data *calld = cpa->elem->call_data;
+ gpr_mu_lock(&calld->mu);
+ if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
+ cpa->initial_metadata_flags, cpa->connected_subchannel,
+ cpa->on_ready, GRPC_ERROR_NONE)) {
+ grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
+ }
+ gpr_mu_unlock(&calld->mu);
+ }
+ gpr_free(cpa);
+}
+
+static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
+ grpc_connected_subchannel **connected_subchannel,
+ grpc_closure *on_ready, grpc_error *error) {
+ GPR_TIMER_BEGIN("pick_subchannel", 0);
+
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ continue_picking_args *cpa;
+ grpc_closure *closure;
+
+ GPR_ASSERT(connected_subchannel);
+
+ gpr_mu_lock(&chand->mu);
+ if (initial_metadata == NULL) {
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
+ connected_subchannel, GRPC_ERROR_REF(error));
+ }
+ for (closure = chand->waiting_for_config_closures.head; closure != NULL;
+ closure = closure->next_data.next) {
+ cpa = closure->cb_arg;
+ if (cpa->connected_subchannel == connected_subchannel) {
+ cpa->connected_subchannel = NULL;
+ grpc_exec_ctx_sched(
+ exec_ctx, cpa->on_ready,
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
+ }
+ }
+ gpr_mu_unlock(&chand->mu);
+ GPR_TIMER_END("pick_subchannel", 0);
+ GRPC_ERROR_UNREF(error);
+ return true;
+ }
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy *lb_policy = chand->lb_policy;
+ GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
+ gpr_mu_unlock(&chand->mu);
+ // If the application explicitly set wait_for_ready, use that.
+ // Otherwise, if the service config specified a value for this
+ // method, use that.
+ const bool wait_for_ready_set_from_api =
+ initial_metadata_flags &
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
+ const bool wait_for_ready_set_from_service_config =
+ calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET;
+ if (!wait_for_ready_set_from_api &&
+ wait_for_ready_set_from_service_config) {
+ if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) {
+ initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+ } else {
+ initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+ }
+ }
+ // TODO(dgq): make this deadline configurable somehow.
+ const grpc_lb_policy_pick_args inputs = {
+ initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC)};
+ const bool result = grpc_lb_policy_pick(
+ exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
+ GPR_TIMER_END("pick_subchannel", 0);
+ return result;
+ }
+ if (chand->resolver != NULL && !chand->started_resolving) {
+ chand->started_resolving = true;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+ }
+ if (chand->resolver != NULL) {
+ cpa = gpr_malloc(sizeof(*cpa));
+ cpa->initial_metadata = initial_metadata;
+ cpa->initial_metadata_flags = initial_metadata_flags;
+ cpa->connected_subchannel = connected_subchannel;
+ cpa->on_ready = on_ready;
+ cpa->elem = elem;
+ grpc_closure_init(&cpa->closure, continue_picking, cpa);
+ grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
+ GRPC_ERROR_NONE);
+ } else {
+ grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"),
+ NULL);
+ }
+ gpr_mu_unlock(&chand->mu);
+
+ GPR_TIMER_END("pick_subchannel", 0);
+ return false;
+}
+
+// The logic here is fairly complicated, due to (a) the fact that we
+// need to handle the case where we receive the send op before the
+// initial metadata op, and (b) the need for efficiency, especially in
+// the streaming case.
+// TODO(ctiller): Explain this more thoroughly.
+static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
+ /* try to (atomically) get the call */
+ grpc_subchannel_call *call = GET_CALL(calld);
+ GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
+ if (call == CANCELLED_CALL) {
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
+ GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ return;
+ }
+ if (call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, call, op);
+ GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ return;
+ }
+ /* we failed; lock and figure out what to do */
+ gpr_mu_lock(&calld->mu);
+retry:
+ /* need to recheck that another thread hasn't set the call */
+ call = GET_CALL(calld);
+ if (call == CANCELLED_CALL) {
+ gpr_mu_unlock(&calld->mu);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
+ GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ return;
+ }
+ if (call != NULL) {
+ gpr_mu_unlock(&calld->mu);
+ grpc_subchannel_call_process_op(exec_ctx, call, op);
+ GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ return;
+ }
+ /* if this is a cancellation, then we can raise our cancelled flag */
+ if (op->cancel_error != GRPC_ERROR_NONE) {
+ if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
+ (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
+ goto retry;
+ } else {
+ // Stash a copy of cancel_error in our call data, so that we can use
+ // it for subsequent operations. This ensures that if the call is
+ // cancelled before any ops are passed down (e.g., if the deadline
+ // is in the past when the call starts), we can return the right
+ // error to the caller when the first op does get passed down.
+ calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
+ switch (calld->creation_phase) {
+ case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
+ fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
+ break;
+ case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
+ pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel,
+ NULL, GRPC_ERROR_REF(op->cancel_error));
+ break;
+ }
+ gpr_mu_unlock(&calld->mu);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
+ GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ return;
+ }
+ }
+ /* if we don't have a subchannel, try to get one */
+ if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
+ calld->connected_subchannel == NULL &&
+ op->send_initial_metadata != NULL) {
+ calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
+ grpc_closure_init(&calld->next_step, subchannel_ready, elem);
+ GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
+ /* If a subchannel is not available immediately, the polling entity from
+ call_data should be provided to channel_data's interested_parties, so
+ that IO of the lb_policy and resolver could be done under it. */
+ if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
+ op->send_initial_metadata_flags,
+ &calld->connected_subchannel, &calld->next_step,
+ GRPC_ERROR_NONE)) {
+ calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
+ } else {
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
+ chand->interested_parties);
+ }
+ }
+ /* if we've got a subchannel, then let's ask it to create a call */
+ if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
+ calld->connected_subchannel != NULL) {
+ grpc_subchannel_call *subchannel_call = NULL;
+ grpc_error *error = grpc_connected_subchannel_create_call(
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
+ calld->deadline, &subchannel_call);
+ if (error != GRPC_ERROR_NONE) {
+ subchannel_call = CANCELLED_CALL;
+ fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ }
+ gpr_atm_rel_store(&calld->subchannel_call,
+ (gpr_atm)(uintptr_t)subchannel_call);
+ retry_waiting_locked(exec_ctx, calld);
+ goto retry;
+ }
+ /* nothing to be done but wait */
+ add_waiting_locked(calld, op);
+ gpr_mu_unlock(&calld->mu);
+ GPR_TIMER_END("cc_start_transport_stream_op", 0);
+}
+
+// Gets data from the service config. Invoked when the resolver returns
+// its initial result.
+static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = arg;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ // If this is an error, there's no point in looking at the service config.
+ if (error == GRPC_ERROR_NONE) {
+ // Get the method config table from channel data.
+ gpr_mu_lock(&chand->mu);
+ grpc_mdstr_hash_table *method_params_table = NULL;
+ if (chand->method_params_table != NULL) {
+ method_params_table =
+ grpc_mdstr_hash_table_ref(chand->method_params_table);
+ }
+ gpr_mu_unlock(&chand->mu);
+ // If the method config table was present, use it.
+ if (method_params_table != NULL) {
+ const method_parameters *method_params =
+ grpc_method_config_table_get(method_params_table, calld->path);
+ if (method_params != NULL) {
+ const bool have_method_timeout =
+ gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0;
+ if (have_method_timeout ||
+ method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
+ gpr_mu_lock(&calld->mu);
+ if (have_method_timeout) {
+ const gpr_timespec per_method_deadline =
+ gpr_time_add(calld->call_start_time, method_params->timeout);
+ if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
+ calld->deadline = per_method_deadline;
+ // Reset deadline timer.
+ grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
+ }
+ }
+ if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
+ calld->wait_for_ready_from_service_config =
+ method_params->wait_for_ready;
+ }
+ gpr_mu_unlock(&calld->mu);
+ }
+ }
+ grpc_mdstr_hash_table_unref(method_params_table);
+ }
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config");
+}
+
+/* Constructor for call_data */
+static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ // Initialize data members.
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
+ calld->path = GRPC_MDSTR_REF(args->path);
+ calld->call_start_time = args->start_time;
+ calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
+ calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET;
+ calld->cancel_error = GRPC_ERROR_NONE;
+ gpr_atm_rel_store(&calld->subchannel_call, 0);
+ gpr_mu_init(&calld->mu);
+ calld->connected_subchannel = NULL;
+ calld->waiting_ops = NULL;
+ calld->waiting_ops_count = 0;
+ calld->waiting_ops_capacity = 0;
+ calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ calld->owning_call = args->call_stack;
+ calld->pollent = NULL;
+ // If the resolver has already returned results, then we can access
+ // the service config parameters immediately. Otherwise, we need to
+ // defer that work until the resolver returns an initial result.
+ // TODO(roth): This code is almost but not quite identical to the code
+ // in read_service_config() above. It would be nice to find a way to
+ // combine them, to avoid having to maintain it twice.
+ gpr_mu_lock(&chand->mu);
+ if (chand->lb_policy != NULL) {
+ // We already have a resolver result, so check for service config.
+ if (chand->method_params_table != NULL) {
+ grpc_mdstr_hash_table *method_params_table =
+ grpc_mdstr_hash_table_ref(chand->method_params_table);
+ gpr_mu_unlock(&chand->mu);
+ method_parameters *method_params =
+ grpc_method_config_table_get(method_params_table, args->path);
+ if (method_params != NULL) {
+ if (gpr_time_cmp(method_params->timeout,
+ gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) {
+ gpr_timespec per_method_deadline =
+ gpr_time_add(calld->call_start_time, method_params->timeout);
+ calld->deadline = gpr_time_min(calld->deadline, per_method_deadline);
+ }
+ if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
+ calld->wait_for_ready_from_service_config =
+ method_params->wait_for_ready;
+ }
+ }
+ grpc_mdstr_hash_table_unref(method_params_table);
+ } else {
+ gpr_mu_unlock(&chand->mu);
+ }
+ } else {
+ // We don't yet have a resolver result, so register a callback to
+ // get the service config data once the resolver returns.
+ // Take a reference to the call stack to be owned by the callback.
+ GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config");
+ grpc_closure_init(&calld->read_service_config, read_service_config, elem);
+ grpc_closure_list_append(&chand->waiting_for_config_closures,
+ &calld->read_service_config, GRPC_ERROR_NONE);
+ gpr_mu_unlock(&chand->mu);
+ }
+ // Start the deadline timer with the current deadline value. If we
+ // do not yet have service config data, then the timer may be reset
+ // later.
+ grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for call_data */
+static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ const grpc_call_final_info *final_info,
+ void *and_free_memory) {
+ call_data *calld = elem->call_data;
+ grpc_deadline_state_destroy(exec_ctx, elem);
+ GRPC_MDSTR_UNREF(calld->path);
+ GRPC_ERROR_UNREF(calld->cancel_error);
+ grpc_subchannel_call *call = GET_CALL(calld);
+ if (call != NULL && call != CANCELLED_CALL) {
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
+ }
+ GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
+ gpr_mu_destroy(&calld->mu);
+ GPR_ASSERT(calld->waiting_ops_count == 0);
+ if (calld->connected_subchannel != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
+ "picked");
+ }
+ gpr_free(calld->waiting_ops);
+ gpr_free(and_free_memory);
+}
+
+static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_polling_entity *pollent) {
+ call_data *calld = elem->call_data;
+ calld->pollent = pollent;
+}
+
+/*************************************************************************
+ * EXPORTED SYMBOLS
+ */
+
+const grpc_channel_filter grpc_client_channel_filter = {
+ cc_start_transport_stream_op,
+ cc_start_transport_op,
+ sizeof(call_data),
+ cc_init_call_elem,
+ cc_set_pollset_or_pollset_set,
+ cc_destroy_call_elem,
+ sizeof(channel_data),
+ cc_init_channel_elem,
+ cc_destroy_channel_elem,
+ cc_get_peer,
+ "client-channel",
+};
+
+void grpc_client_channel_finish_initialization(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver,
+ grpc_client_channel_factory *client_channel_factory) {
+ /* post construction initialization: set the transport setup pointer */
+ GPR_ASSERT(client_channel_factory != NULL);
+ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+ channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&chand->mu);
+ GPR_ASSERT(!chand->resolver);
+ chand->resolver = resolver;
+ GRPC_RESOLVER_REF(resolver, "channel");
+ if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
+ chand->exit_idle_when_lb_policy_arrives) {
+ chand->started_resolving = true;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+ }
+ chand->client_channel_factory = client_channel_factory;
+ grpc_client_channel_factory_ref(client_channel_factory);
+ gpr_mu_unlock(&chand->mu);
+}
+
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
+ channel_data *chand = elem->channel_data;
+ grpc_connectivity_state out;
+ gpr_mu_lock(&chand->mu);
+ out = grpc_connectivity_state_check(&chand->state_tracker, NULL);
+ if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
+ } else {
+ chand->exit_idle_when_lb_policy_arrives = true;
+ if (!chand->started_resolving && chand->resolver != NULL) {
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ chand->started_resolving = true;
+ grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+ }
+ }
+ }
+ gpr_mu_unlock(&chand->mu);
+ return out;
+}
+
+typedef struct {
+ channel_data *chand;
+ grpc_pollset *pollset;
+ grpc_closure *on_complete;
+ grpc_closure my_closure;
+} external_connectivity_watcher;
+
+static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ external_connectivity_watcher *w = arg;
+ grpc_closure *follow_up = w->on_complete;
+ grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
+ w->pollset);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_free(w);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, error);
+}
+
+void grpc_client_channel_watch_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
+ grpc_connectivity_state *state, grpc_closure *on_complete) {
+ channel_data *chand = elem->channel_data;
+ external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ w->chand = chand;
+ w->pollset = pollset;
+ w->on_complete = on_complete;
+ grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
+ GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_mu_lock(&chand->mu);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &chand->state_tracker, state, &w->my_closure);
+ gpr_mu_unlock(&chand->mu);
+}
diff --git a/src/core/ext/client_channel/client_channel.h b/src/core/ext/client_channel/client_channel.h
new file mode 100644
index 0000000000..ab5a84fdfb
--- /dev/null
+++ b/src/core/ext/client_channel/client_channel.h
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H
+
+#include "src/core/ext/client_channel/client_channel_factory.h"
+#include "src/core/ext/client_channel/resolver.h"
+#include "src/core/lib/channel/channel_stack.h"
+
+/* A client channel is a channel that begins disconnected, and can connect
+ to some endpoint on demand. If that endpoint disconnects, it will be
+ connected to again later.
+
+ Calls on a disconnected client channel are queued until a connection is
+ established. */
+
+extern const grpc_channel_filter grpc_client_channel_filter;
+
+/* Post-construction initializer to give the client channel its resolver
+ and factory. */
+void grpc_client_channel_finish_initialization(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver,
+ grpc_client_channel_factory *client_channel_factory);
+
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
+
+void grpc_client_channel_watch_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
+ grpc_connectivity_state *state, grpc_closure *on_complete);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/ext/client_channel/client_channel_factory.c b/src/core/ext/client_channel/client_channel_factory.c
new file mode 100644
index 0000000000..4900832d57
--- /dev/null
+++ b/src/core/ext/client_channel/client_channel_factory.c
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/client_channel_factory.h"
+
+void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory) {
+ factory->vtable->ref(factory);
+}
+
+void grpc_client_channel_factory_unref(grpc_exec_ctx* exec_ctx,
+ grpc_client_channel_factory* factory) {
+ factory->vtable->unref(exec_ctx, factory);
+}
+
+grpc_subchannel* grpc_client_channel_factory_create_subchannel(
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
+ const grpc_subchannel_args* args) {
+ return factory->vtable->create_subchannel(exec_ctx, factory, args);
+}
+
+grpc_channel* grpc_client_channel_factory_create_channel(
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
+ const char* target, grpc_client_channel_type type,
+ const grpc_channel_args* args) {
+ return factory->vtable->create_client_channel(exec_ctx, factory, target, type,
+ args);
+}
diff --git a/src/core/ext/client_channel/client_channel_factory.h b/src/core/ext/client_channel/client_channel_factory.h
new file mode 100644
index 0000000000..2b8fc577b3
--- /dev/null
+++ b/src/core/ext/client_channel/client_channel_factory.h
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+
+#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/lib/channel/channel_stack.h"
+
+typedef struct grpc_client_channel_factory grpc_client_channel_factory;
+typedef struct grpc_client_channel_factory_vtable
+ grpc_client_channel_factory_vtable;
+
+typedef enum {
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, /** for the user-level regular calls */
+ GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, /** for communication with a load
+ balancing service */
+} grpc_client_channel_type;
+
+/** Constructor for new configured channels.
+ Creating decorators around this type is encouraged to adapt behavior. */
+struct grpc_client_channel_factory {
+ const grpc_client_channel_factory_vtable *vtable;
+};
+
+struct grpc_client_channel_factory_vtable {
+ void (*ref)(grpc_client_channel_factory *factory);
+ void (*unref)(grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory);
+ grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory,
+ const grpc_subchannel_args *args);
+ grpc_channel *(*create_client_channel)(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory,
+ const char *target,
+ grpc_client_channel_type type,
+ const grpc_channel_args *args);
+};
+
+void grpc_client_channel_factory_ref(grpc_client_channel_factory *factory);
+void grpc_client_channel_factory_unref(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory);
+
+/** Create a new grpc_subchannel */
+grpc_subchannel *grpc_client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
+ const grpc_subchannel_args *args);
+
+/** Create a new grpc_channel */
+grpc_channel *grpc_client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
+ const char *target, grpc_client_channel_type type,
+ const grpc_channel_args *args);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H */
diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c
new file mode 100644
index 0000000000..a3e5079843
--- /dev/null
+++ b/src/core/ext/client_channel/client_channel_plugin.c
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <limits.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
+#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/client_channel/subchannel_index.h"
+#include "src/core/lib/surface/channel_init.h"
+
+static bool append_filter(grpc_channel_stack_builder *builder, void *arg) {
+ return grpc_channel_stack_builder_append_filter(
+ builder, (const grpc_channel_filter *)arg, NULL, NULL);
+}
+
+static bool set_default_host_if_unset(grpc_channel_stack_builder *builder,
+ void *unused) {
+ const grpc_channel_args *args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ for (size_t i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY) ||
+ 0 == strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
+ return true;
+ }
+ }
+ char *default_authority = grpc_get_default_authority(
+ grpc_channel_stack_builder_get_target(builder));
+ if (default_authority != NULL) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_STRING;
+ arg.key = GRPC_ARG_DEFAULT_AUTHORITY;
+ arg.value.string = default_authority;
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ grpc_channel_stack_builder_set_channel_arguments(builder, new_args);
+ gpr_free(default_authority);
+ grpc_channel_args_destroy(new_args);
+ }
+ return true;
+}
+
+void grpc_client_channel_init(void) {
+ grpc_lb_policy_registry_init();
+ grpc_resolver_registry_init();
+ grpc_subchannel_index_init();
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN,
+ set_default_host_if_unset, NULL);
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter,
+ (void *)&grpc_client_channel_filter);
+}
+
+void grpc_client_channel_shutdown(void) {
+ grpc_subchannel_index_shutdown();
+ grpc_channel_init_shutdown();
+ grpc_resolver_registry_shutdown();
+ grpc_lb_policy_registry_shutdown();
+}
diff --git a/src/core/ext/client_channel/connector.c b/src/core/ext/client_channel/connector.c
new file mode 100644
index 0000000000..0582e5b372
--- /dev/null
+++ b/src/core/ext/client_channel/connector.c
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/connector.h"
+
+grpc_connector* grpc_connector_ref(grpc_connector* connector) {
+ connector->vtable->ref(connector);
+ return connector;
+}
+
+void grpc_connector_unref(grpc_exec_ctx* exec_ctx, grpc_connector* connector) {
+ connector->vtable->unref(exec_ctx, connector);
+}
+
+void grpc_connector_connect(grpc_exec_ctx* exec_ctx, grpc_connector* connector,
+ const grpc_connect_in_args* in_args,
+ grpc_connect_out_args* out_args,
+ grpc_closure* notify) {
+ connector->vtable->connect(exec_ctx, connector, in_args, out_args, notify);
+}
+
+void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_connector* connector) {
+ connector->vtable->shutdown(exec_ctx, connector);
+}
diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h
new file mode 100644
index 0000000000..ed7d5450de
--- /dev/null
+++ b/src/core/ext/client_channel/connector.h
@@ -0,0 +1,92 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H
+
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/transport/transport.h"
+
+typedef struct grpc_connector grpc_connector;
+typedef struct grpc_connector_vtable grpc_connector_vtable;
+
+struct grpc_connector {
+ const grpc_connector_vtable *vtable;
+};
+
+typedef struct {
+ /** set of pollsets interested in this connection */
+ grpc_pollset_set *interested_parties;
+ /** address to connect to */
+ const grpc_resolved_address *addr;
+ size_t addr_len;
+ /** initial connect string to send */
+ gpr_slice initial_connect_string;
+ /** deadline for connection */
+ gpr_timespec deadline;
+ /** channel arguments (to be passed to transport) */
+ const grpc_channel_args *channel_args;
+} grpc_connect_in_args;
+
+typedef struct {
+ /** the connected transport */
+ grpc_transport *transport;
+
+ /** channel arguments (to be passed to the filters) */
+ grpc_channel_args *channel_args;
+} grpc_connect_out_args;
+
+struct grpc_connector_vtable {
+ void (*ref)(grpc_connector *connector);
+ void (*unref)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
+ /** Implementation of grpc_connector_shutdown */
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
+ /** Implementation of grpc_connector_connect */
+ void (*connect)(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args, grpc_closure *notify);
+};
+
+grpc_connector *grpc_connector_ref(grpc_connector *connector);
+void grpc_connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
+/** Connect using the connector: max one outstanding call at a time */
+void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args,
+ grpc_closure *notify);
+/** Cancel any pending connection */
+void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_connector *connector);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H */
diff --git a/src/core/ext/client_channel/default_initial_connect_string.c b/src/core/ext/client_channel/default_initial_connect_string.c
new file mode 100644
index 0000000000..0b251372fd
--- /dev/null
+++ b/src/core/ext/client_channel/default_initial_connect_string.c
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/slice.h>
+#include "src/core/lib/iomgr/resolve_address.h"
+
+void grpc_set_default_initial_connect_string(grpc_resolved_address **addr,
+ gpr_slice *initial_str) {}
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c
new file mode 100644
index 0000000000..ea2cbbdd97
--- /dev/null
+++ b/src/core/ext/client_channel/http_connect_handshaker.c
@@ -0,0 +1,275 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/http_connect_handshaker.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/lib/http/format_request.h"
+#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/support/env.h"
+
+typedef struct http_connect_handshaker {
+ // Base class. Must be first.
+ grpc_handshaker base;
+
+ char* proxy_server;
+ char* server_name;
+
+ // State saved while performing the handshake.
+ grpc_endpoint* endpoint;
+ grpc_channel_args* args;
+ grpc_handshaker_done_cb cb;
+ void* user_data;
+
+ // Objects for processing the HTTP CONNECT request and response.
+ gpr_slice_buffer write_buffer;
+ gpr_slice_buffer* read_buffer; // Ownership passes through this object.
+ grpc_closure request_done_closure;
+ grpc_closure response_read_closure;
+ grpc_http_parser http_parser;
+ grpc_http_response http_response;
+ grpc_timer timeout_timer;
+
+ gpr_refcount refcount;
+} http_connect_handshaker;
+
+// Unref and clean up handshaker.
+static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
+ if (gpr_unref(&handshaker->refcount)) {
+ gpr_free(handshaker->proxy_server);
+ gpr_free(handshaker->server_name);
+ gpr_slice_buffer_destroy(&handshaker->write_buffer);
+ grpc_http_parser_destroy(&handshaker->http_parser);
+ grpc_http_response_destroy(&handshaker->http_response);
+ gpr_free(handshaker);
+ }
+}
+
+// Callback invoked when deadline is exceeded.
+static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled.
+ grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint);
+ }
+ http_connect_handshaker_unref(handshaker);
+}
+
+// Callback invoked when finished writing HTTP CONNECT request.
+static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error != GRPC_ERROR_NONE) {
+ // If the write failed, invoke the callback immediately with the error.
+ handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
+ handshaker->read_buffer, handshaker->user_data,
+ GRPC_ERROR_REF(error));
+ } else {
+ // Otherwise, read the response.
+ grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ &handshaker->response_read_closure);
+ }
+}
+
+// Callback invoked for reading HTTP CONNECT response.
+static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ http_connect_handshaker* handshaker = arg;
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback.
+ goto done;
+ }
+ // Add buffer to parser.
+ for (size_t i = 0; i < handshaker->read_buffer->count; ++i) {
+ if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) {
+ size_t body_start_offset = 0;
+ error = grpc_http_parser_parse(&handshaker->http_parser,
+ handshaker->read_buffer->slices[i],
+ &body_start_offset);
+ if (error != GRPC_ERROR_NONE) goto done;
+ if (handshaker->http_parser.state == GRPC_HTTP_BODY) {
+ // We've gotten back a successul response, so stop the timeout timer.
+ grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer);
+ // Remove the data we've already read from the read buffer,
+ // leaving only the leftover bytes (if any).
+ gpr_slice_buffer tmp_buffer;
+ gpr_slice_buffer_init(&tmp_buffer);
+ if (body_start_offset <
+ GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) {
+ gpr_slice_buffer_add(
+ &tmp_buffer,
+ gpr_slice_split_tail(&handshaker->read_buffer->slices[i],
+ body_start_offset));
+ }
+ gpr_slice_buffer_addn(&tmp_buffer,
+ &handshaker->read_buffer->slices[i + 1],
+ handshaker->read_buffer->count - i - 1);
+ gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer);
+ gpr_slice_buffer_destroy(&tmp_buffer);
+ break;
+ }
+ }
+ }
+ // If we're not done reading the response, read more data.
+ // TODO(roth): In practice, I suspect that the response to a CONNECT
+ // request will never include a body, in which case this check is
+ // sufficient. However, the language of RFC-2817 doesn't explicitly
+ // forbid the response from including a body. If there is a body,
+ // it's possible that we might have parsed part but not all of the
+ // body, in which case this check will cause us to fail to parse the
+ // remainder of the body. If that ever becomes an issue, we may
+ // need to fix the HTTP parser to understand when the body is
+ // complete (e.g., handling chunked transfer encoding or looking
+ // at the Content-Length: header).
+ if (handshaker->http_parser.state != GRPC_HTTP_BODY) {
+ gpr_slice_buffer_reset_and_unref(handshaker->read_buffer);
+ grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ &handshaker->response_read_closure);
+ return;
+ }
+ // Make sure we got a 2xx response.
+ if (handshaker->http_response.status < 200 ||
+ handshaker->http_response.status >= 300) {
+ char* msg;
+ gpr_asprintf(&msg, "HTTP proxy returned response code %d",
+ handshaker->http_response.status);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ }
+done:
+ // Invoke handshake-done callback.
+ handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
+ handshaker->read_buffer, handshaker->user_data, error);
+}
+
+//
+// Public handshaker methods
+//
+
+static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker_in) {
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ http_connect_handshaker_unref(handshaker);
+}
+
+static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {}
+
+static void http_connect_handshaker_do_handshake(
+ grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in,
+ grpc_endpoint* endpoint, grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer, gpr_timespec deadline,
+ grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb,
+ void* user_data) {
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ // Save state in the handshaker object.
+ handshaker->endpoint = endpoint;
+ handshaker->args = args;
+ handshaker->cb = cb;
+ handshaker->user_data = user_data;
+ handshaker->read_buffer = read_buffer;
+ // Send HTTP CONNECT request.
+ gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
+ handshaker->server_name, handshaker->proxy_server);
+ grpc_httpcli_request request;
+ memset(&request, 0, sizeof(request));
+ request.host = handshaker->proxy_server;
+ request.http.method = "CONNECT";
+ request.http.path = handshaker->server_name;
+ request.handshaker = &grpc_httpcli_plaintext;
+ gpr_slice request_slice = grpc_httpcli_format_connect_request(&request);
+ gpr_slice_buffer_add(&handshaker->write_buffer, request_slice);
+ grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer,
+ &handshaker->request_done_closure);
+ // Set timeout timer. The timer gets a reference to the handshaker.
+ gpr_ref(&handshaker->refcount);
+ grpc_timer_init(exec_ctx, &handshaker->timeout_timer,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC));
+}
+
+static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = {
+ http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
+ http_connect_handshaker_do_handshake};
+
+grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
+ const char* server_name) {
+ GPR_ASSERT(proxy_server != NULL);
+ GPR_ASSERT(server_name != NULL);
+ http_connect_handshaker* handshaker =
+ gpr_malloc(sizeof(http_connect_handshaker));
+ memset(handshaker, 0, sizeof(*handshaker));
+ grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
+ handshaker->proxy_server = gpr_strdup(proxy_server);
+ handshaker->server_name = gpr_strdup(server_name);
+ gpr_slice_buffer_init(&handshaker->write_buffer);
+ grpc_closure_init(&handshaker->request_done_closure, on_write_done,
+ handshaker);
+ grpc_closure_init(&handshaker->response_read_closure, on_read_done,
+ handshaker);
+ grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE,
+ &handshaker->http_response);
+ gpr_ref_init(&handshaker->refcount, 1);
+ return &handshaker->base;
+}
+
+char* grpc_get_http_proxy_server() {
+ char* uri_str = gpr_getenv("http_proxy");
+ if (uri_str == NULL) return NULL;
+ grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */);
+ char* proxy_name = NULL;
+ if (uri == NULL || uri->authority == NULL) {
+ gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var");
+ goto done;
+ }
+ if (strcmp(uri->scheme, "http") != 0) {
+ gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme);
+ goto done;
+ }
+ if (strchr(uri->authority, '@') != NULL) {
+ gpr_log(GPR_ERROR, "userinfo not supported in proxy URI");
+ goto done;
+ }
+ proxy_name = gpr_strdup(uri->authority);
+done:
+ gpr_free(uri_str);
+ grpc_uri_destroy(uri);
+ return proxy_name;
+}
diff --git a/src/core/ext/client_channel/http_connect_handshaker.h b/src/core/ext/client_channel/http_connect_handshaker.h
new file mode 100644
index 0000000000..c689df2b2b
--- /dev/null
+++ b/src/core/ext/client_channel/http_connect_handshaker.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H
+
+#include "src/core/lib/channel/handshaker.h"
+
+/// Does NOT take ownership of \a proxy_server or \a server_name.
+grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
+ const char* server_name);
+
+/// Returns the name of the proxy to use, or NULL if no proxy is configured.
+/// Caller takes ownership of result.
+char* grpc_get_http_proxy_server();
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H */
diff --git a/src/core/ext/client_channel/initial_connect_string.c b/src/core/ext/client_channel/initial_connect_string.c
new file mode 100644
index 0000000000..fb1493d77d
--- /dev/null
+++ b/src/core/ext/client_channel/initial_connect_string.c
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/initial_connect_string.h"
+
+#include <stddef.h>
+
+extern void grpc_set_default_initial_connect_string(
+ grpc_resolved_address **addr, gpr_slice *initial_str);
+
+static grpc_set_initial_connect_string_func g_set_initial_connect_string_func =
+ grpc_set_default_initial_connect_string;
+
+void grpc_test_set_initial_connect_string_function(
+ grpc_set_initial_connect_string_func func) {
+ g_set_initial_connect_string_func = func;
+}
+
+void grpc_set_initial_connect_string(grpc_resolved_address **addr,
+ gpr_slice *initial_str) {
+ g_set_initial_connect_string_func(addr, initial_str);
+}
diff --git a/src/core/ext/client_channel/initial_connect_string.h b/src/core/ext/client_channel/initial_connect_string.h
new file mode 100644
index 0000000000..68adb0373c
--- /dev/null
+++ b/src/core/ext/client_channel/initial_connect_string.h
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H
+
+#include <grpc/support/slice.h>
+
+#include "src/core/lib/iomgr/resolve_address.h"
+
+typedef void (*grpc_set_initial_connect_string_func)(
+ grpc_resolved_address **addr, gpr_slice *initial_str);
+void grpc_test_set_initial_connect_string_function(
+ grpc_set_initial_connect_string_func func);
+
+/** Set a string to be sent once connected. Optionally reset addr. */
+void grpc_set_initial_connect_string(grpc_resolved_address **addr,
+ gpr_slice *connect_string);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H */
diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/client_channel/lb_policy.c
new file mode 100644
index 0000000000..45ee72e2f0
--- /dev/null
+++ b/src/core/ext/client_channel/lb_policy.c
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/lb_policy.h"
+
+#define WEAK_REF_BITS 16
+
+void grpc_lb_policy_init(grpc_lb_policy *policy,
+ const grpc_lb_policy_vtable *vtable) {
+ policy->vtable = vtable;
+ gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
+ policy->interested_parties = grpc_pollset_set_create();
+}
+
+#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
+#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason
+#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char *purpose
+#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason
+#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose
+#else
+#define REF_FUNC_EXTRA_ARGS
+#define REF_MUTATE_EXTRA_ARGS
+#define REF_FUNC_PASS_ARGS(new_reason)
+#define REF_MUTATE_PASS_ARGS(x)
+#endif
+
+static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta,
+ int barrier REF_MUTATE_EXTRA_ARGS) {
+ gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
+ : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
+#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "LB_POLICY: 0x%" PRIxPTR " %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR
+ " [%s]",
+ (intptr_t)c, purpose, old_val, old_val + delta, reason);
+#endif
+ return old_val;
+}
+
+void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
+ ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF"));
+}
+
+void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
+ gpr_atm old_val =
+ ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS),
+ 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF"));
+ gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
+ gpr_atm check = 1 << WEAK_REF_BITS;
+ if ((old_val & mask) == check) {
+ policy->vtable->shutdown(exec_ctx, policy);
+ }
+ grpc_lb_policy_weak_unref(exec_ctx,
+ policy REF_FUNC_PASS_ARGS("strong-unref"));
+}
+
+void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
+ ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF"));
+}
+
+void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
+ gpr_atm old_val =
+ ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
+ if (old_val == 1) {
+ grpc_pollset_set_destroy(policy->interested_parties);
+ policy->vtable->destroy(exec_ctx, policy);
+ }
+}
+
+int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete) {
+ return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data,
+ on_complete);
+}
+
+void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
+ policy->vtable->cancel_pick(exec_ctx, policy, target, error);
+}
+
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
+ policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
+ initial_metadata_flags_eq, error);
+}
+
+void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
+ policy->vtable->exit_idle(exec_ctx, policy);
+}
+
+void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_closure *closure) {
+ policy->vtable->ping_one(exec_ctx, policy, closure);
+}
+
+void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_closure *closure) {
+ policy->vtable->notify_on_state_change(exec_ctx, policy, state, closure);
+}
+
+grpc_connectivity_state grpc_lb_policy_check_connectivity(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_error **connectivity_error) {
+ return policy->vtable->check_connectivity(exec_ctx, policy,
+ connectivity_error);
+}
diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/client_channel/lb_policy.h
new file mode 100644
index 0000000000..120c641edc
--- /dev/null
+++ b/src/core/ext/client_channel/lb_policy.h
@@ -0,0 +1,202 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_H
+
+#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/transport/connectivity_state.h"
+
+/** A load balancing policy: specified by a vtable and a struct (which
+ is expected to be extended to contain some parameters) */
+typedef struct grpc_lb_policy grpc_lb_policy;
+typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable;
+
+typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
+ grpc_status_code status, const char *errmsg);
+
+struct grpc_lb_policy {
+ const grpc_lb_policy_vtable *vtable;
+ gpr_atm ref_pair;
+ /* owned pointer to interested parties in load balancing decisions */
+ grpc_pollset_set *interested_parties;
+};
+
+/** Extra arguments for an LB pick */
+typedef struct grpc_lb_policy_pick_args {
+ /** Initial metadata associated with the picking call. */
+ grpc_metadata_batch *initial_metadata;
+ /** Bitmask used for selective cancelling. See \a
+ * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
+ * grpc_types.h */
+ uint32_t initial_metadata_flags;
+ /** Storage for LB token in \a initial_metadata, or NULL if not used */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+ /** Deadline for the call to the LB server */
+ gpr_timespec deadline;
+} grpc_lb_policy_pick_args;
+
+struct grpc_lb_policy_vtable {
+ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+
+ /** \see grpc_lb_policy_pick */
+ int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete);
+
+ /** \see grpc_lb_policy_cancel_pick */
+ void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_connected_subchannel **target, grpc_error *error);
+
+ /** \see grpc_lb_policy_cancel_picks */
+ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq, grpc_error *error);
+
+ /** \see grpc_lb_policy_ping_one */
+ void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_closure *closure);
+
+ /** Try to enter a READY connectivity state */
+ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+
+ /** check the current connectivity of the lb_policy */
+ grpc_connectivity_state (*check_connectivity)(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_error **connectivity_error);
+
+ /** call notify when the connectivity state of a channel changes from *state.
+ Updates *state with the new state of the policy. Calling with a NULL \a
+ state cancels the subscription. */
+ void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_closure *closure);
+};
+
+/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/
+#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
+
+/* Strong references: the policy will shutdown when they reach zero */
+#define GRPC_LB_POLICY_REF(p, r) \
+ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \
+ grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
+
+/* Weak references: they don't prevent the shutdown of the LB policy. When no
+ * strong references are left but there are still weak ones, shutdown is called.
+ * Once the weak reference also reaches zero, the LB policy is destroyed. */
+#define GRPC_LB_POLICY_WEAK_REF(p, r) \
+ grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \
+ grpc_lb_policy_weak_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
+void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
+ const char *reason);
+void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const char *file, int line, const char *reason);
+void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line,
+ const char *reason);
+void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const char *file, int line, const char *reason);
+#else
+#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
+#define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p))
+#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p))
+#define GRPC_LB_POLICY_WEAK_UNREF(cl, p, r) grpc_lb_policy_weak_unref((cl), (p))
+void grpc_lb_policy_ref(grpc_lb_policy *policy);
+void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+void grpc_lb_policy_weak_ref(grpc_lb_policy *policy);
+void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+#endif
+
+/** called by concrete implementations to initialize the base struct */
+void grpc_lb_policy_init(grpc_lb_policy *policy,
+ const grpc_lb_policy_vtable *vtable);
+
+/** Finds an appropriate subchannel for a call, based on \a pick_args.
+
+ \a target will be set to the selected subchannel, or NULL on failure.
+ Upon success, \a user_data will be set to whatever opaque information
+ may need to be propagated from the LB policy, or NULL if not needed.
+
+ If the pick succeeds and a result is known immediately, a non-zero
+ value will be returned. Otherwise, \a on_complete will be invoked
+ once the pick is complete with its error argument set to indicate
+ success or failure.
+
+ Any IO should be done under the \a interested_parties \a grpc_pollset_set
+ in the \a grpc_lb_policy struct. */
+int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete);
+
+/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+ against one of the connected subchannels managed by \a policy. */
+void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_closure *closure);
+
+/** Cancel picks for \a target.
+ The \a on_complete callback of the pending picks will be invoked with \a
+ *target set to NULL. */
+void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_connected_subchannel **target,
+ grpc_error *error);
+
+/** Cancel all pending picks for which their \a initial_metadata_flags (as given
+ in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
+ when AND'd with \a initial_metadata_flags_mask */
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error);
+
+/** Try to enter a READY connectivity state */
+void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+
+/* Call notify when the connectivity state of a channel changes from \a *state.
+ * Updates \a *state with the new state of the policy */
+void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_closure *closure);
+
+grpc_connectivity_state grpc_lb_policy_check_connectivity(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_error **connectivity_error);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_H */
diff --git a/src/core/ext/client_channel/lb_policy_factory.c b/src/core/ext/client_channel/lb_policy_factory.c
new file mode 100644
index 0000000000..8a474c8818
--- /dev/null
+++ b/src/core/ext/client_channel/lb_policy_factory.c
@@ -0,0 +1,161 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/client_channel/lb_policy_factory.h"
+
+grpc_lb_addresses* grpc_lb_addresses_create(
+ size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable) {
+ grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses));
+ addresses->num_addresses = num_addresses;
+ addresses->user_data_vtable = user_data_vtable;
+ const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
+ addresses->addresses = gpr_malloc(addresses_size);
+ memset(addresses->addresses, 0, addresses_size);
+ return addresses;
+}
+
+grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) {
+ grpc_lb_addresses* new_addresses = grpc_lb_addresses_create(
+ addresses->num_addresses, addresses->user_data_vtable);
+ memcpy(new_addresses->addresses, addresses->addresses,
+ sizeof(grpc_lb_address) * addresses->num_addresses);
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (new_addresses->addresses[i].balancer_name != NULL) {
+ new_addresses->addresses[i].balancer_name =
+ gpr_strdup(new_addresses->addresses[i].balancer_name);
+ }
+ if (new_addresses->addresses[i].user_data != NULL) {
+ new_addresses->addresses[i].user_data = addresses->user_data_vtable->copy(
+ new_addresses->addresses[i].user_data);
+ }
+ }
+ return new_addresses;
+}
+
+void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
+ void* address, size_t address_len,
+ bool is_balancer, char* balancer_name,
+ void* user_data) {
+ GPR_ASSERT(index < addresses->num_addresses);
+ if (user_data != NULL) GPR_ASSERT(addresses->user_data_vtable != NULL);
+ grpc_lb_address* target = &addresses->addresses[index];
+ memcpy(target->address.addr, address, address_len);
+ target->address.len = address_len;
+ target->is_balancer = is_balancer;
+ target->balancer_name = balancer_name;
+ target->user_data = user_data;
+}
+
+int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1,
+ const grpc_lb_addresses* addresses2) {
+ if (addresses1->num_addresses > addresses2->num_addresses) return 1;
+ if (addresses1->num_addresses < addresses2->num_addresses) return -1;
+ if (addresses1->user_data_vtable > addresses2->user_data_vtable) return 1;
+ if (addresses1->user_data_vtable < addresses2->user_data_vtable) return -1;
+ for (size_t i = 0; i < addresses1->num_addresses; ++i) {
+ const grpc_lb_address* target1 = &addresses1->addresses[i];
+ const grpc_lb_address* target2 = &addresses2->addresses[i];
+ if (target1->address.len > target2->address.len) return 1;
+ if (target1->address.len < target2->address.len) return -1;
+ int retval = memcmp(target1->address.addr, target2->address.addr,
+ target1->address.len);
+ if (retval != 0) return retval;
+ if (target1->is_balancer > target2->is_balancer) return 1;
+ if (target1->is_balancer < target2->is_balancer) return -1;
+ const char* balancer_name1 =
+ target1->balancer_name != NULL ? target1->balancer_name : "";
+ const char* balancer_name2 =
+ target2->balancer_name != NULL ? target2->balancer_name : "";
+ retval = strcmp(balancer_name1, balancer_name2);
+ if (retval != 0) return retval;
+ if (addresses1->user_data_vtable != NULL) {
+ retval = addresses1->user_data_vtable->cmp(target1->user_data,
+ target2->user_data);
+ if (retval != 0) return retval;
+ }
+ }
+ return 0;
+}
+
+void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) {
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ gpr_free(addresses->addresses[i].balancer_name);
+ if (addresses->addresses[i].user_data != NULL) {
+ addresses->user_data_vtable->destroy(addresses->addresses[i].user_data);
+ }
+ }
+ gpr_free(addresses->addresses);
+ gpr_free(addresses);
+}
+
+static void* lb_addresses_copy(void* addresses) {
+ return grpc_lb_addresses_copy(addresses);
+}
+static void lb_addresses_destroy(void* addresses) {
+ grpc_lb_addresses_destroy(addresses);
+}
+static int lb_addresses_cmp(void* addresses1, void* addresses2) {
+ return grpc_lb_addresses_cmp(addresses1, addresses2);
+}
+static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = {
+ lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp};
+
+grpc_arg grpc_lb_addresses_create_channel_arg(
+ const grpc_lb_addresses* addresses) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_POINTER;
+ arg.key = GRPC_ARG_LB_ADDRESSES;
+ arg.value.pointer.p = (void*)addresses;
+ arg.value.pointer.vtable = &lb_addresses_arg_vtable;
+ return arg;
+}
+
+void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
+ factory->vtable->ref(factory);
+}
+
+void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory) {
+ factory->vtable->unref(factory);
+}
+
+grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy(
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy_factory* factory,
+ grpc_lb_policy_args* args) {
+ if (factory == NULL) return NULL;
+ return factory->vtable->create_lb_policy(exec_ctx, factory, args);
+}
diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h
new file mode 100644
index 0000000000..e2b8080a32
--- /dev/null
+++ b/src/core/ext/client_channel/lb_policy_factory.h
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
+
+#include "src/core/ext/client_channel/client_channel_factory.h"
+#include "src/core/ext/client_channel/lb_policy.h"
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+
+typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
+typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
+
+struct grpc_lb_policy_factory {
+ const grpc_lb_policy_factory_vtable *vtable;
+};
+
+/** A resolved address alongside any LB related information associated with it.
+ * \a user_data, if not NULL, contains opaque data meant to be consumed by the
+ * gRPC LB policy. Note that no all LB policies support \a user_data as input.
+ * Those who don't will simply ignore it and will correspondingly return NULL in
+ * their namesake pick() output argument. */
+typedef struct grpc_lb_address {
+ grpc_resolved_address address;
+ bool is_balancer;
+ char *balancer_name; /* For secure naming. */
+ void *user_data;
+} grpc_lb_address;
+
+typedef struct grpc_lb_user_data_vtable {
+ void *(*copy)(void *);
+ void (*destroy)(void *);
+ int (*cmp)(void *, void *);
+} grpc_lb_user_data_vtable;
+
+typedef struct grpc_lb_addresses {
+ size_t num_addresses;
+ grpc_lb_address *addresses;
+ const grpc_lb_user_data_vtable *user_data_vtable;
+} grpc_lb_addresses;
+
+/** Returns a grpc_addresses struct with enough space for
+ \a num_addresses addresses. The \a user_data_vtable argument may be
+ NULL if no user data will be added. */
+grpc_lb_addresses *grpc_lb_addresses_create(
+ size_t num_addresses, const grpc_lb_user_data_vtable *user_data_vtable);
+
+/** Creates a copy of \a addresses. */
+grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses);
+
+/** Sets the value of the address at index \a index of \a addresses.
+ * \a address is a socket address of length \a address_len.
+ * Takes ownership of \a balancer_name. */
+void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
+ void *address, size_t address_len,
+ bool is_balancer, char *balancer_name,
+ void *user_data);
+
+/** Compares \a addresses1 and \a addresses2. */
+int grpc_lb_addresses_cmp(const grpc_lb_addresses *addresses1,
+ const grpc_lb_addresses *addresses2);
+
+/** Destroys \a addresses. */
+void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses);
+
+/** Returns a channel arg containing \a addresses. */
+grpc_arg grpc_lb_addresses_create_channel_arg(
+ const grpc_lb_addresses *addresses);
+
+/** Arguments passed to LB policies. */
+typedef struct grpc_lb_policy_args {
+ grpc_client_channel_factory *client_channel_factory;
+ grpc_channel_args *args;
+} grpc_lb_policy_args;
+
+struct grpc_lb_policy_factory_vtable {
+ void (*ref)(grpc_lb_policy_factory *factory);
+ void (*unref)(grpc_lb_policy_factory *factory);
+
+ /** Implementation of grpc_lb_policy_factory_create_lb_policy */
+ grpc_lb_policy *(*create_lb_policy)(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args);
+
+ /** Name for the LB policy this factory implements */
+ const char *name;
+};
+
+void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory);
+void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory);
+
+/** Create a lb_policy instance. */
+grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_FACTORY_H */
diff --git a/src/core/ext/client_channel/lb_policy_registry.c b/src/core/ext/client_channel/lb_policy_registry.c
new file mode 100644
index 0000000000..f46a721f9d
--- /dev/null
+++ b/src/core/ext/client_channel/lb_policy_registry.c
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/lb_policy_registry.h"
+
+#include <string.h>
+
+#define MAX_POLICIES 10
+
+static grpc_lb_policy_factory *g_all_of_the_lb_policies[MAX_POLICIES];
+static int g_number_of_lb_policies = 0;
+
+void grpc_lb_policy_registry_init(void) { g_number_of_lb_policies = 0; }
+
+void grpc_lb_policy_registry_shutdown(void) {
+ int i;
+ for (i = 0; i < g_number_of_lb_policies; i++) {
+ grpc_lb_policy_factory_unref(g_all_of_the_lb_policies[i]);
+ }
+}
+
+void grpc_register_lb_policy(grpc_lb_policy_factory *factory) {
+ int i;
+ for (i = 0; i < g_number_of_lb_policies; i++) {
+ GPR_ASSERT(0 != strcmp(factory->vtable->name,
+ g_all_of_the_lb_policies[i]->vtable->name));
+ }
+ GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES);
+ grpc_lb_policy_factory_ref(factory);
+ g_all_of_the_lb_policies[g_number_of_lb_policies++] = factory;
+}
+
+static grpc_lb_policy_factory *lookup_factory(const char *name) {
+ int i;
+
+ if (name == NULL) return NULL;
+
+ for (i = 0; i < g_number_of_lb_policies; i++) {
+ if (0 == strcmp(name, g_all_of_the_lb_policies[i]->vtable->name)) {
+ return g_all_of_the_lb_policies[i];
+ }
+ }
+
+ return NULL;
+}
+
+grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name,
+ grpc_lb_policy_args *args) {
+ grpc_lb_policy_factory *factory = lookup_factory(name);
+ grpc_lb_policy *lb_policy =
+ grpc_lb_policy_factory_create_lb_policy(exec_ctx, factory, args);
+ return lb_policy;
+}
diff --git a/src/core/ext/client_channel/lb_policy_registry.h b/src/core/ext/client_channel/lb_policy_registry.h
new file mode 100644
index 0000000000..21c468e021
--- /dev/null
+++ b/src/core/ext/client_channel/lb_policy_registry.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H
+
+#include "src/core/ext/client_channel/lb_policy_factory.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+/** Initialize the registry and set \a default_factory as the factory to be
+ * returned when no name is provided in a lookup */
+void grpc_lb_policy_registry_init(void);
+void grpc_lb_policy_registry_shutdown(void);
+
+/** Register a LB policy factory. */
+void grpc_register_lb_policy(grpc_lb_policy_factory *factory);
+
+/** Create a \a grpc_lb_policy instance.
+ *
+ * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init
+ * will be returned. */
+grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name,
+ grpc_lb_policy_args *args);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H */
diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c
new file mode 100644
index 0000000000..b1d55ad0f5
--- /dev/null
+++ b/src/core/ext/client_channel/parse_address.c
@@ -0,0 +1,144 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/parse_address.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+
+#include <stdio.h>
+#include <string.h>
+#ifdef GRPC_HAVE_UNIX_SOCKET
+#include <sys/un.h>
+#endif
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#ifdef GRPC_HAVE_UNIX_SOCKET
+
+int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+ struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr;
+
+ un->sun_family = AF_UNIX;
+ strcpy(un->sun_path, uri->path);
+ resolved_addr->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1;
+
+ return 1;
+}
+
+#else /* GRPC_HAVE_UNIX_SOCKET */
+
+int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) { abort(); }
+
+#endif /* GRPC_HAVE_UNIX_SOCKET */
+
+int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+ const char *host_port = uri->path;
+ char *host;
+ char *port;
+ int port_num;
+ int result = 0;
+ struct sockaddr_in *in = (struct sockaddr_in *)resolved_addr->addr;
+
+ if (*host_port == '/') ++host_port;
+ if (!gpr_split_host_port(host_port, &host, &port)) {
+ return 0;
+ }
+
+ memset(resolved_addr, 0, sizeof(grpc_resolved_address));
+ resolved_addr->len = sizeof(struct sockaddr_in);
+ in->sin_family = AF_INET;
+ if (inet_pton(AF_INET, host, &in->sin_addr) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host);
+ goto done;
+ }
+
+ if (port != NULL) {
+ if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 ||
+ port_num > 65535) {
+ gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port);
+ goto done;
+ }
+ in->sin_port = htons((uint16_t)port_num);
+ } else {
+ gpr_log(GPR_ERROR, "no port given for ipv4 scheme");
+ goto done;
+ }
+
+ result = 1;
+done:
+ gpr_free(host);
+ gpr_free(port);
+ return result;
+}
+
+int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+ const char *host_port = uri->path;
+ char *host;
+ char *port;
+ int port_num;
+ int result = 0;
+ struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)resolved_addr->addr;
+
+ if (*host_port == '/') ++host_port;
+ if (!gpr_split_host_port(host_port, &host, &port)) {
+ return 0;
+ }
+
+ memset(in6, 0, sizeof(*in6));
+ resolved_addr->len = sizeof(*in6);
+ in6->sin6_family = AF_INET6;
+ if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
+ goto done;
+ }
+
+ if (port != NULL) {
+ if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 ||
+ port_num > 65535) {
+ gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port);
+ goto done;
+ }
+ in6->sin6_port = htons((uint16_t)port_num);
+ } else {
+ gpr_log(GPR_ERROR, "no port given for ipv6 scheme");
+ goto done;
+ }
+
+ result = 1;
+done:
+ gpr_free(host);
+ gpr_free(port);
+ return result;
+}
diff --git a/src/core/ext/client_channel/parse_address.h b/src/core/ext/client_channel/parse_address.h
new file mode 100644
index 0000000000..bf99c5298a
--- /dev/null
+++ b/src/core/ext/client_channel/parse_address.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H
+
+#include <stddef.h>
+
+#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+
+/** Populate \a addr and \a len from \a uri, whose path is expected to contain a
+ * unix socket path. Returns true upon success. */
+int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr);
+
+/** Populate /a addr and \a len from \a uri, whose path is expected to contain a
+ * host:port pair. Returns true upon success. */
+int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr);
+
+/** Populate /a addr and \a len from \a uri, whose path is expected to contain a
+ * host:port pair. Returns true upon success. */
+int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H */
diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c
new file mode 100644
index 0000000000..2ae4fe862e
--- /dev/null
+++ b/src/core/ext/client_channel/resolver.c
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/resolver.h"
+
+void grpc_resolver_init(grpc_resolver *resolver,
+ const grpc_resolver_vtable *vtable) {
+ resolver->vtable = vtable;
+ gpr_ref_init(&resolver->refs, 1);
+}
+
+#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
+void grpc_resolver_ref(grpc_resolver *resolver, grpc_closure_list *closure_list,
+ const char *file, int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s",
+ resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1,
+ reason);
+#else
+void grpc_resolver_ref(grpc_resolver *resolver) {
+#endif
+ gpr_ref(&resolver->refs);
+}
+
+#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
+void grpc_resolver_unref(grpc_resolver *resolver,
+ grpc_closure_list *closure_list, const char *file,
+ int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s",
+ resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1,
+ reason);
+#else
+void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
+#endif
+ if (gpr_unref(&resolver->refs)) {
+ resolver->vtable->destroy(exec_ctx, resolver);
+ }
+}
+
+void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
+ resolver->vtable->shutdown(exec_ctx, resolver);
+}
+
+void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ resolver->vtable->channel_saw_error(exec_ctx, resolver);
+}
+
+void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **result, grpc_closure *on_complete) {
+ resolver->vtable->next(exec_ctx, resolver, result, on_complete);
+}
diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h
new file mode 100644
index 0000000000..96ece92b9d
--- /dev/null
+++ b/src/core/ext/client_channel/resolver.h
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H
+
+#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/lib/iomgr/iomgr.h"
+
+typedef struct grpc_resolver grpc_resolver;
+typedef struct grpc_resolver_vtable grpc_resolver_vtable;
+
+/** \a grpc_resolver provides \a grpc_channel_args objects to its caller */
+struct grpc_resolver {
+ const grpc_resolver_vtable *vtable;
+ gpr_refcount refs;
+};
+
+struct grpc_resolver_vtable {
+ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
+ void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
+ void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **result, grpc_closure *on_complete);
+};
+
+#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
+#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_RESOLVER_UNREF(cl, p, r) \
+ grpc_resolver_unref((cl), (p), __FILE__, __LINE__, (r))
+void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line,
+ const char *reason);
+void grpc_resolver_unref(grpc_resolver *policy, grpc_closure_list *closure_list,
+ const char *file, int line, const char *reason);
+#else
+#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p))
+#define GRPC_RESOLVER_UNREF(cl, p, r) grpc_resolver_unref((cl), (p))
+void grpc_resolver_ref(grpc_resolver *policy);
+void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy);
+#endif
+
+void grpc_resolver_init(grpc_resolver *resolver,
+ const grpc_resolver_vtable *vtable);
+
+void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
+
+/** Notification that the channel has seen an error on some address.
+ Can be used as a hint that re-resolution is desirable soon. */
+void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver);
+
+/** Get the next result from the resolver. Expected to set \a *result with
+ new channel args and then schedule \a on_complete for execution.
+
+ If resolution is fatally broken, set \a *result to NULL and
+ schedule \a on_complete. */
+void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **result, grpc_closure *on_complete);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */
diff --git a/src/core/ext/client_channel/resolver_factory.c b/src/core/ext/client_channel/resolver_factory.c
new file mode 100644
index 0000000000..7c3d644257
--- /dev/null
+++ b/src/core/ext/client_channel/resolver_factory.c
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/resolver_factory.h"
+
+void grpc_resolver_factory_ref(grpc_resolver_factory* factory) {
+ factory->vtable->ref(factory);
+}
+
+void grpc_resolver_factory_unref(grpc_resolver_factory* factory) {
+ factory->vtable->unref(factory);
+}
+
+/** Create a resolver instance for a name */
+grpc_resolver* grpc_resolver_factory_create_resolver(
+ grpc_resolver_factory* factory, grpc_resolver_args* args) {
+ if (factory == NULL) return NULL;
+ return factory->vtable->create_resolver(factory, args);
+}
+
+char* grpc_resolver_factory_get_default_authority(
+ grpc_resolver_factory* factory, grpc_uri* uri) {
+ if (factory == NULL) return NULL;
+ return factory->vtable->get_default_authority(factory, uri);
+}
diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h
new file mode 100644
index 0000000000..4da42e84d2
--- /dev/null
+++ b/src/core/ext/client_channel/resolver_factory.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_FACTORY_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_FACTORY_H
+
+#include "src/core/ext/client_channel/client_channel_factory.h"
+#include "src/core/ext/client_channel/resolver.h"
+#include "src/core/ext/client_channel/uri_parser.h"
+
+typedef struct grpc_resolver_factory grpc_resolver_factory;
+typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable;
+
+struct grpc_resolver_factory {
+ const grpc_resolver_factory_vtable *vtable;
+};
+
+typedef struct grpc_resolver_args {
+ grpc_uri *uri;
+ const grpc_channel_args *args;
+} grpc_resolver_args;
+
+struct grpc_resolver_factory_vtable {
+ void (*ref)(grpc_resolver_factory *factory);
+ void (*unref)(grpc_resolver_factory *factory);
+
+ /** Implementation of grpc_resolver_factory_create_resolver */
+ grpc_resolver *(*create_resolver)(grpc_resolver_factory *factory,
+ grpc_resolver_args *args);
+
+ /** Implementation of grpc_resolver_factory_get_default_authority */
+ char *(*get_default_authority)(grpc_resolver_factory *factory, grpc_uri *uri);
+
+ /** URI scheme that this factory implements */
+ const char *scheme;
+};
+
+void grpc_resolver_factory_ref(grpc_resolver_factory *resolver);
+void grpc_resolver_factory_unref(grpc_resolver_factory *resolver);
+
+/** Create a resolver instance for a name */
+grpc_resolver *grpc_resolver_factory_create_resolver(
+ grpc_resolver_factory *factory, grpc_resolver_args *args);
+
+/** Return a (freshly allocated with gpr_malloc) string representing
+ the default authority to use for this scheme. */
+char *grpc_resolver_factory_get_default_authority(
+ grpc_resolver_factory *factory, grpc_uri *uri);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_FACTORY_H */
diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c
new file mode 100644
index 0000000000..d0f0fc3f33
--- /dev/null
+++ b/src/core/ext/client_channel/resolver_registry.c
@@ -0,0 +1,154 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/resolver_registry.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#define MAX_RESOLVERS 10
+#define DEFAULT_RESOLVER_PREFIX_MAX_LENGTH 32
+
+static grpc_resolver_factory *g_all_of_the_resolvers[MAX_RESOLVERS];
+static int g_number_of_resolvers = 0;
+
+static char g_default_resolver_prefix[DEFAULT_RESOLVER_PREFIX_MAX_LENGTH] =
+ "dns:///";
+
+void grpc_resolver_registry_init() {}
+
+void grpc_resolver_registry_shutdown(void) {
+ for (int i = 0; i < g_number_of_resolvers; i++) {
+ grpc_resolver_factory_unref(g_all_of_the_resolvers[i]);
+ }
+ // FIXME(ctiller): this should live in grpc_resolver_registry_init,
+ // however that would have the client_channel plugin call this AFTER we start
+ // registering resolvers from third party plugins, and so they'd never show
+ // up.
+ // We likely need some kind of dependency system for plugins.... what form
+ // that takes is TBD.
+ g_number_of_resolvers = 0;
+}
+
+void grpc_resolver_registry_set_default_prefix(
+ const char *default_resolver_prefix) {
+ const size_t len = strlen(default_resolver_prefix);
+ GPR_ASSERT(len < DEFAULT_RESOLVER_PREFIX_MAX_LENGTH &&
+ "default resolver prefix too long");
+ GPR_ASSERT(len > 0 && "default resolver prefix can't be empty");
+ // By the previous assert, default_resolver_prefix is safe to be copied with a
+ // plain strcpy.
+ strcpy(g_default_resolver_prefix, default_resolver_prefix);
+}
+
+void grpc_register_resolver_type(grpc_resolver_factory *factory) {
+ int i;
+ for (i = 0; i < g_number_of_resolvers; i++) {
+ GPR_ASSERT(0 != strcmp(factory->vtable->scheme,
+ g_all_of_the_resolvers[i]->vtable->scheme));
+ }
+ GPR_ASSERT(g_number_of_resolvers != MAX_RESOLVERS);
+ grpc_resolver_factory_ref(factory);
+ g_all_of_the_resolvers[g_number_of_resolvers++] = factory;
+}
+
+static grpc_resolver_factory *lookup_factory(const char *name) {
+ int i;
+
+ for (i = 0; i < g_number_of_resolvers; i++) {
+ if (0 == strcmp(name, g_all_of_the_resolvers[i]->vtable->scheme)) {
+ return g_all_of_the_resolvers[i];
+ }
+ }
+
+ return NULL;
+}
+
+grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name) {
+ grpc_resolver_factory *f = lookup_factory(name);
+ if (f) grpc_resolver_factory_ref(f);
+ return f;
+}
+
+static grpc_resolver_factory *lookup_factory_by_uri(grpc_uri *uri) {
+ if (!uri) return NULL;
+ return lookup_factory(uri->scheme);
+}
+
+static grpc_resolver_factory *resolve_factory(const char *target,
+ grpc_uri **uri) {
+ char *tmp;
+ grpc_resolver_factory *factory = NULL;
+
+ GPR_ASSERT(uri != NULL);
+ *uri = grpc_uri_parse(target, 1);
+ factory = lookup_factory_by_uri(*uri);
+ if (factory == NULL) {
+ grpc_uri_destroy(*uri);
+ gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target);
+ *uri = grpc_uri_parse(tmp, 1);
+ factory = lookup_factory_by_uri(*uri);
+ if (factory == NULL) {
+ grpc_uri_destroy(grpc_uri_parse(target, 0));
+ grpc_uri_destroy(grpc_uri_parse(tmp, 0));
+ gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, tmp);
+ }
+ gpr_free(tmp);
+ }
+ return factory;
+}
+
+grpc_resolver *grpc_resolver_create(const char *target,
+ const grpc_channel_args *args) {
+ grpc_uri *uri = NULL;
+ grpc_resolver_factory *factory = resolve_factory(target, &uri);
+ grpc_resolver *resolver;
+ grpc_resolver_args resolver_args;
+ memset(&resolver_args, 0, sizeof(resolver_args));
+ resolver_args.uri = uri;
+ resolver_args.args = args;
+ resolver = grpc_resolver_factory_create_resolver(factory, &resolver_args);
+ grpc_uri_destroy(uri);
+ return resolver;
+}
+
+char *grpc_get_default_authority(const char *target) {
+ grpc_uri *uri = NULL;
+ grpc_resolver_factory *factory = resolve_factory(target, &uri);
+ char *authority = grpc_resolver_factory_get_default_authority(factory, uri);
+ grpc_uri_destroy(uri);
+ return authority;
+}
diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h
new file mode 100644
index 0000000000..2a95a669f0
--- /dev/null
+++ b/src/core/ext/client_channel/resolver_registry.h
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H
+
+#include "src/core/ext/client_channel/resolver_factory.h"
+
+void grpc_resolver_registry_init();
+void grpc_resolver_registry_shutdown(void);
+
+/** Set the default URI prefix to \a default_prefix. */
+void grpc_resolver_registry_set_default_prefix(const char *default_prefix);
+
+/** Register a resolver type.
+ URI's of \a scheme will be resolved with the given resolver.
+ If \a priority is greater than zero, then the resolver will be eligible
+ to resolve names that are passed in with no scheme. Higher priority
+ resolvers will be tried before lower priority schemes. */
+void grpc_register_resolver_type(grpc_resolver_factory *factory);
+
+/** Create a resolver given \a target.
+ First tries to parse \a target as a URI. If this succeeds, tries
+ to locate a registered resolver factory based on the URI scheme.
+ If parsing or location fails, prefixes default_prefix from
+ grpc_resolver_registry_init to target, and tries again (if default_prefix
+ was not NULL).
+ If a resolver factory was found, use it to instantiate a resolver and
+ return it.
+ If a resolver factory was not found, return NULL.
+ \a args is a set of channel arguments to be included in the result
+ (typically the set of arguments passed in from the client API). */
+grpc_resolver *grpc_resolver_create(const char *target,
+ const grpc_channel_args *args);
+
+/** Find a resolver factory given a name and return an (owned-by-the-caller)
+ * reference to it */
+grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name);
+
+/** Given a target, return a (freshly allocated with gpr_malloc) string
+ representing the default authority to pass from a client. */
+char *grpc_get_default_authority(const char *target);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
new file mode 100644
index 0000000000..789966cb69
--- /dev/null
+++ b/src/core/ext/client_channel/subchannel.c
@@ -0,0 +1,729 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/subchannel.h"
+
+#include <limits.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/avl.h>
+
+#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/client_channel/initial_connect_string.h"
+#include "src/core/ext/client_channel/subchannel_index.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/backoff.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/channel_init.h"
+#include "src/core/lib/transport/connectivity_state.h"
+
+#define INTERNAL_REF_BITS 16
+#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
+
+#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
+
+#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
+ ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \
+ &(subchannel)->connected_subchannel)))
+
+typedef struct {
+ grpc_closure closure;
+ grpc_subchannel *subchannel;
+ grpc_connectivity_state connectivity_state;
+} state_watcher;
+
+typedef struct external_state_watcher {
+ grpc_subchannel *subchannel;
+ grpc_pollset_set *pollset_set;
+ grpc_closure *notify;
+ grpc_closure closure;
+ struct external_state_watcher *next;
+ struct external_state_watcher *prev;
+} external_state_watcher;
+
+struct grpc_subchannel {
+ grpc_connector *connector;
+
+ /** refcount
+ - lower INTERNAL_REF_BITS bits are for internal references:
+ these do not keep the subchannel open.
+ - upper remaining bits are for public references: these do
+ keep the subchannel open */
+ gpr_atm ref_pair;
+
+ /** non-transport related channel filters */
+ const grpc_channel_filter **filters;
+ size_t num_filters;
+ /** channel arguments */
+ grpc_channel_args *args;
+ /** address to connect to */
+ grpc_resolved_address *addr;
+
+ grpc_subchannel_key *key;
+
+ /** initial string to send to peer */
+ gpr_slice initial_connect_string;
+
+ /** set during connection */
+ grpc_connect_out_args connecting_result;
+
+ /** callback for connection finishing */
+ grpc_closure connected;
+
+ /** pollset_set tracking who's interested in a connection
+ being setup */
+ grpc_pollset_set *pollset_set;
+
+ /** active connection, or null; of type grpc_connected_subchannel */
+ gpr_atm connected_subchannel;
+
+ /** mutex protecting remaining elements */
+ gpr_mu mu;
+
+ /** have we seen a disconnection? */
+ int disconnected;
+ /** are we connecting */
+ int connecting;
+ /** connectivity state tracking */
+ grpc_connectivity_state_tracker state_tracker;
+
+ external_state_watcher root_external_state_watcher;
+
+ /** next connect attempt time */
+ gpr_timespec next_attempt;
+ /** backoff state */
+ gpr_backoff backoff_state;
+ /** do we have an active alarm? */
+ int have_alarm;
+ /** our alarm */
+ grpc_timer alarm;
+};
+
+struct grpc_subchannel_call {
+ grpc_connected_subchannel *connection;
+};
+
+#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
+#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con))
+#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
+ (((grpc_subchannel_call *)(callstack)) - 1)
+
+static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
+ grpc_error *error);
+
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#define REF_REASON reason
+#define REF_LOG(name, p) \
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
+ (name), (p), (p)->refs.count, (p)->refs.count + 1, reason)
+#define UNREF_LOG(name, p) \
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
+ (name), (p), (p)->refs.count, (p)->refs.count - 1, reason)
+#define REF_MUTATE_EXTRA_ARGS \
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose
+#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
+#else
+#define REF_REASON ""
+#define REF_LOG(name, p) \
+ do { \
+ } while (0)
+#define UNREF_LOG(name, p) \
+ do { \
+ } while (0)
+#define REF_MUTATE_EXTRA_ARGS
+#define REF_MUTATE_PURPOSE(x)
+#endif
+
+/*
+ * connection implementation
+ */
+
+static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_connected_subchannel *c = arg;
+ grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
+ gpr_free(c);
+}
+
+grpc_connected_subchannel *grpc_connected_subchannel_ref(
+ grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
+ return c;
+}
+
+void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c),
+ REF_REASON);
+}
+
+/*
+ * grpc_subchannel implementation
+ */
+
+static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_subchannel *c = arg;
+ gpr_free((void *)c->filters);
+ grpc_channel_args_destroy(c->args);
+ gpr_free(c->addr);
+ gpr_slice_unref(c->initial_connect_string);
+ grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
+ grpc_connector_unref(exec_ctx, c->connector);
+ grpc_pollset_set_destroy(c->pollset_set);
+ grpc_subchannel_key_destroy(exec_ctx, c->key);
+ gpr_free(c);
+}
+
+static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
+ int barrier REF_MUTATE_EXTRA_ARGS) {
+ gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
+ : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "SUBCHANNEL: %p %s 0x%08" PRIxPTR " -> 0x%08" PRIxPTR " [%s]", c,
+ purpose, old_val, old_val + delta, reason);
+#endif
+ return old_val;
+}
+
+grpc_subchannel *grpc_subchannel_ref(
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
+ 0 REF_MUTATE_PURPOSE("STRONG_REF"));
+ GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
+ return c;
+}
+
+grpc_subchannel *grpc_subchannel_weak_ref(
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
+ GPR_ASSERT(old_refs != 0);
+ return c;
+}
+
+grpc_subchannel *grpc_subchannel_ref_from_weak_ref(
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ if (!c) return NULL;
+ for (;;) {
+ gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair);
+ if (old_refs >= (1 << INTERNAL_REF_BITS)) {
+ gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
+ if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) {
+ return c;
+ }
+ } else {
+ return NULL;
+ }
+ }
+}
+
+static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+ grpc_connected_subchannel *con;
+ grpc_subchannel_index_unregister(exec_ctx, c->key, c);
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(!c->disconnected);
+ c->disconnected = 1;
+ grpc_connector_shutdown(exec_ctx, c->connector);
+ con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
+ if (con != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
+ gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
+ }
+ gpr_mu_unlock(&c->mu);
+}
+
+void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS),
+ 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
+ if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
+ disconnect(exec_ctx, c);
+ }
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref");
+}
+
+void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
+ if (old_refs == 1) {
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c),
+ GRPC_ERROR_NONE, NULL);
+ }
+}
+
+grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
+ grpc_connector *connector,
+ const grpc_subchannel_args *args) {
+ grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args);
+ grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key);
+ if (c) {
+ grpc_subchannel_key_destroy(exec_ctx, key);
+ return c;
+ }
+
+ c = gpr_malloc(sizeof(*c));
+ memset(c, 0, sizeof(*c));
+ c->key = key;
+ gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
+ c->connector = connector;
+ grpc_connector_ref(c->connector);
+ c->num_filters = args->filter_count;
+ if (c->num_filters > 0) {
+ c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
+ memcpy((void *)c->filters, args->filters,
+ sizeof(grpc_channel_filter *) * c->num_filters);
+ } else {
+ c->filters = NULL;
+ }
+ c->addr = gpr_malloc(sizeof(grpc_resolved_address));
+ if (args->addr->len)
+ memcpy(c->addr, args->addr, sizeof(grpc_resolved_address));
+ c->pollset_set = grpc_pollset_set_create();
+ grpc_set_initial_connect_string(&c->addr, &c->initial_connect_string);
+ c->args = grpc_channel_args_copy(args->args);
+ c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
+ &c->root_external_state_watcher;
+ grpc_closure_init(&c->connected, subchannel_connected, c);
+ grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
+ "subchannel");
+ int initial_backoff_ms =
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
+ int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ bool fixed_reconnect_backoff = false;
+ if (c->args) {
+ for (size_t i = 0; i < c->args->num_args; i++) {
+ if (0 == strcmp(c->args->args[i].key,
+ "grpc.testing.fixed_reconnect_backoff")) {
+ GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
+ fixed_reconnect_backoff = true;
+ initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ max_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){max_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ initial_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
+ }
+ }
+ }
+ gpr_backoff_init(
+ &c->backoff_state,
+ fixed_reconnect_backoff ? 1.0
+ : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
+ fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
+ initial_backoff_ms, max_backoff_ms);
+ gpr_mu_init(&c->mu);
+
+ return grpc_subchannel_index_register(exec_ctx, key, c);
+}
+
+static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+ grpc_connect_in_args args;
+
+ args.interested_parties = c->pollset_set;
+ args.addr = c->addr;
+ args.deadline = c->next_attempt;
+ args.channel_args = c->args;
+ args.initial_connect_string = c->initial_connect_string;
+
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+ "state_change");
+ grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
+ &c->connected);
+}
+
+static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+ c->next_attempt =
+ gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
+ continue_connect(exec_ctx, c);
+}
+
+grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
+ grpc_error **error) {
+ grpc_connectivity_state state;
+ gpr_mu_lock(&c->mu);
+ state = grpc_connectivity_state_check(&c->state_tracker, error);
+ gpr_mu_unlock(&c->mu);
+ return state;
+}
+
+static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ external_state_watcher *w = arg;
+ grpc_closure *follow_up = w->notify;
+ if (w->pollset_set != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set,
+ w->pollset_set);
+ }
+ gpr_mu_lock(&w->subchannel->mu);
+ w->next->prev = w->prev;
+ w->prev->next = w->next;
+ gpr_mu_unlock(&w->subchannel->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
+ gpr_free(w);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, error);
+}
+
+void grpc_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *notify) {
+ external_state_watcher *w;
+
+ if (state == NULL) {
+ gpr_mu_lock(&c->mu);
+ for (w = c->root_external_state_watcher.next;
+ w != &c->root_external_state_watcher; w = w->next) {
+ if (w->notify == notify) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &c->state_tracker, NULL, &w->closure);
+ }
+ }
+ gpr_mu_unlock(&c->mu);
+ } else {
+ w = gpr_malloc(sizeof(*w));
+ w->subchannel = c;
+ w->pollset_set = interested_parties;
+ w->notify = notify;
+ grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
+ if (interested_parties != NULL) {
+ grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set,
+ interested_parties);
+ }
+ GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
+ gpr_mu_lock(&c->mu);
+ w->next = &c->root_external_state_watcher;
+ w->prev = w->next->prev;
+ w->next->prev = w->prev->next = w;
+ if (grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &c->state_tracker, state, &w->closure)) {
+ c->connecting = 1;
+ /* released by connection */
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+ start_connect(exec_ctx, c);
+ }
+ gpr_mu_unlock(&c->mu);
+ }
+}
+
+void grpc_connected_subchannel_process_transport_op(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_transport_op *op) {
+ grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
+ top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
+}
+
+static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_error *error) {
+ state_watcher *sw = p;
+ grpc_subchannel *c = sw->subchannel;
+ gpr_mu *mu = &c->mu;
+
+ gpr_mu_lock(mu);
+
+ /* if we failed just leave this closure */
+ if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* any errors on a subchannel ==> we're done, create a new one */
+ sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
+ }
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ sw->connectivity_state, GRPC_ERROR_REF(error),
+ "reflect_child");
+ if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
+ &sw->connectivity_state, &sw->closure);
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ sw = NULL;
+ }
+
+ gpr_mu_unlock(mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "state_watcher");
+ gpr_free(sw);
+}
+
+static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *con,
+ grpc_pollset_set *interested_parties,
+ grpc_connectivity_state *state,
+ grpc_closure *closure) {
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_channel_element *elem;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
+ op->bind_pollset_set = interested_parties;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+}
+
+void grpc_connected_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *closure) {
+ connected_subchannel_state_op(exec_ctx, con, interested_parties, state,
+ closure);
+}
+
+void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *con,
+ grpc_closure *closure) {
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_channel_element *elem;
+ op->send_ping = closure;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+}
+
+static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
+ grpc_connected_subchannel *con;
+ grpc_channel_stack *stk;
+ state_watcher *sw_subchannel;
+
+ /* construct channel stack */
+ grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
+ grpc_channel_stack_builder_set_channel_arguments(
+ builder, c->connecting_result.channel_args);
+ grpc_channel_stack_builder_set_transport(builder,
+ c->connecting_result.transport);
+
+ if (grpc_channel_init_create_stack(exec_ctx, builder,
+ GRPC_CLIENT_SUBCHANNEL)) {
+ con = grpc_channel_stack_builder_finish(exec_ctx, builder, 0, 1,
+ connection_destroy, NULL);
+ } else {
+ grpc_channel_stack_builder_destroy(builder);
+ abort(); /* TODO(ctiller): what to do here (previously we just crashed) */
+ }
+ stk = CHANNEL_STACK_FROM_CONNECTION(con);
+ memset(&c->connecting_result, 0, sizeof(c->connecting_result));
+
+ /* initialize state watcher */
+ sw_subchannel = gpr_malloc(sizeof(*sw_subchannel));
+ sw_subchannel->subchannel = c;
+ sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
+ grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed,
+ sw_subchannel);
+
+ if (c->disconnected) {
+ gpr_free(sw_subchannel);
+ grpc_channel_stack_destroy(exec_ctx, stk);
+ gpr_free(con);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ return;
+ }
+
+ /* publish */
+ /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
+ I'd have expected the rel_cas below to be enough, but
+ seemingly it's not.
+ Re-evaluate if we really need this. */
+ gpr_atm_full_barrier();
+ GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+ c->connecting = 0;
+
+ /* setup subchannel watching connected subchannel for changes; subchannel
+ ref
+ for connecting is donated
+ to the state watcher */
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state,
+ &sw_subchannel->closure);
+
+ /* signal completion */
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "connected");
+}
+
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_subchannel *c = arg;
+ gpr_mu_lock(&c->mu);
+ c->have_alarm = 0;
+ if (c->disconnected) {
+ error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
+ c->next_attempt =
+ gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
+ continue_connect(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ } else {
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_subchannel *c = arg;
+ grpc_channel_args *delete_channel_args = c->connecting_result.channel_args;
+
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
+ gpr_mu_lock(&c->mu);
+ if (c->connecting_result.transport != NULL) {
+ publish_transport_locked(exec_ctx, c);
+ } else if (c->disconnected) {
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ } else {
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ GPR_ASSERT(!c->have_alarm);
+ c->have_alarm = 1;
+ grpc_connectivity_state_set(
+ exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
+ "connect_failed");
+ gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+ const char *errmsg = grpc_error_string(error);
+ gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
+ if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+ 0) {
+ gpr_log(GPR_INFO, "Retry immediately");
+ } else {
+ gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+ time_til_next.tv_sec, time_til_next.tv_nsec);
+ }
+ grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+ grpc_error_free_string(errmsg);
+ }
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ grpc_channel_args_destroy(delete_channel_args);
+}
+
+/*
+ * grpc_subchannel_call implementation
+ */
+
+static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
+ grpc_error *error) {
+ grpc_subchannel_call *c = call;
+ GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
+ grpc_connected_subchannel *connection = c->connection;
+ grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
+ GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
+}
+
+void grpc_subchannel_call_ref(
+ grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
+}
+
+void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
+}
+
+char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *call) {
+ grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
+ grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
+ return top_elem->filter->get_peer(exec_ctx, top_elem);
+}
+
+void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *call,
+ grpc_transport_stream_op *op) {
+ GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0);
+ grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
+ grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
+ top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
+ GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
+}
+
+grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
+ grpc_subchannel *c) {
+ return GET_CONNECTED_SUBCHANNEL(c, acq);
+}
+
+grpc_error *grpc_connected_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline,
+ grpc_subchannel_call **call) {
+ grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
+ *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
+ grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
+ (*call)->connection = con; // Ref is added below.
+ grpc_error *error =
+ grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
+ NULL, NULL, path, deadline, callstk);
+ if (error != GRPC_ERROR_NONE) {
+ const char *error_string = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "error: %s", error_string);
+ grpc_error_free_string(error_string);
+ gpr_free(*call);
+ return error;
+ }
+ GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
+ grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
+ return GRPC_ERROR_NONE;
+}
+
+grpc_call_stack *grpc_subchannel_call_get_call_stack(
+ grpc_subchannel_call *subchannel_call) {
+ return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
+}
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
new file mode 100644
index 0000000000..93bd72d20d
--- /dev/null
+++ b/src/core/ext/client_channel/subchannel.h
@@ -0,0 +1,178 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H
+
+#include "src/core/ext/client_channel/connector.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/metadata.h"
+
+/** A (sub-)channel that knows how to connect to exactly one target
+ address. Provides a target for load balancing. */
+typedef struct grpc_subchannel grpc_subchannel;
+typedef struct grpc_connected_subchannel grpc_connected_subchannel;
+typedef struct grpc_subchannel_call grpc_subchannel_call;
+typedef struct grpc_subchannel_args grpc_subchannel_args;
+
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#define GRPC_SUBCHANNEL_REF(p, r) \
+ grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
+ grpc_subchannel_ref_from_weak_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_UNREF(cl, p, r) \
+ grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_WEAK_REF(p, r) \
+ grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \
+ grpc_subchannel_weak_unref((cl), (p), __FILE__, __LINE__, (r))
+#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \
+ grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \
+ grpc_connected_subchannel_unref((cl), (p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
+ grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \
+ grpc_subchannel_call_unref((cl), (p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
+ , const char *file, int line, const char *reason
+#else
+#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
+#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
+ grpc_subchannel_ref_from_weak_ref((p))
+#define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p))
+#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
+#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \
+ grpc_subchannel_weak_unref((cl), (p))
+#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
+#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \
+ grpc_connected_subchannel_unref((cl), (p))
+#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
+#define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \
+ grpc_subchannel_call_unref((cl), (p))
+#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
+#endif
+
+grpc_subchannel *grpc_subchannel_ref(
+ grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+grpc_subchannel *grpc_subchannel_ref_from_weak_ref(
+ grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+grpc_subchannel *grpc_subchannel_weak_ref(
+ grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+grpc_connected_subchannel *grpc_connected_subchannel_ref(
+ grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_call_ref(
+ grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *call
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+
+/** construct a subchannel call */
+grpc_error *grpc_connected_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
+ grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline,
+ grpc_subchannel_call **subchannel_call);
+
+/** process a transport level op */
+void grpc_connected_subchannel_process_transport_op(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel,
+ grpc_transport_op *op);
+
+/** poll the current connectivity state of a channel */
+grpc_connectivity_state grpc_subchannel_check_connectivity(
+ grpc_subchannel *channel, grpc_error **error);
+
+/** call notify when the connectivity state of a channel changes from *state.
+ Updates *state with the new state of the channel */
+void grpc_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *channel,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *notify);
+void grpc_connected_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *notify);
+void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *channel,
+ grpc_closure *notify);
+
+/** retrieve the grpc_connected_subchannel - or NULL if called before
+ the subchannel becomes connected */
+grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
+ grpc_subchannel *subchannel);
+
+/** continue processing a transport op */
+void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *subchannel_call,
+ grpc_transport_stream_op *op);
+
+/** continue querying for peer */
+char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *subchannel_call);
+
+grpc_call_stack *grpc_subchannel_call_get_call_stack(
+ grpc_subchannel_call *subchannel_call);
+
+struct grpc_subchannel_args {
+ /* When updating this struct, also update subchannel_index.c */
+
+ /** Channel filters for this channel - wrapped factories will likely
+ want to mutate this */
+ const grpc_channel_filter **filters;
+ /** The number of filters in the above array */
+ size_t filter_count;
+ /** Channel arguments to be supplied to the newly created channel */
+ const grpc_channel_args *args;
+ /** Server name */
+ const char *server_name;
+ /** Address to connect to */
+ grpc_resolved_address *addr;
+};
+
+/** create a subchannel given a connector */
+grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
+ grpc_connector *connector,
+ const grpc_subchannel_args *args);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H */
diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/client_channel/subchannel_index.c
new file mode 100644
index 0000000000..227013a7d7
--- /dev/null
+++ b/src/core/ext/client_channel/subchannel_index.c
@@ -0,0 +1,278 @@
+//
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+//
+
+#include "src/core/ext/client_channel/subchannel_index.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/avl.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/tls.h>
+
+#include "src/core/lib/channel/channel_args.h"
+
+// a map of subchannel_key --> subchannel, used for detecting connections
+// to the same destination in order to share them
+static gpr_avl g_subchannel_index;
+
+static gpr_mu g_mu;
+
+struct grpc_subchannel_key {
+ grpc_connector *connector;
+ grpc_subchannel_args args;
+};
+
+GPR_TLS_DECL(subchannel_index_exec_ctx);
+
+static void enter_ctx(grpc_exec_ctx *exec_ctx) {
+ GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == 0);
+ gpr_tls_set(&subchannel_index_exec_ctx, (intptr_t)exec_ctx);
+}
+
+static void leave_ctx(grpc_exec_ctx *exec_ctx) {
+ GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == (intptr_t)exec_ctx);
+ gpr_tls_set(&subchannel_index_exec_ctx, 0);
+}
+
+static grpc_exec_ctx *current_ctx() {
+ grpc_exec_ctx *c = (grpc_exec_ctx *)gpr_tls_get(&subchannel_index_exec_ctx);
+ GPR_ASSERT(c != NULL);
+ return c;
+}
+
+static grpc_subchannel_key *create_key(
+ grpc_connector *connector, const grpc_subchannel_args *args,
+ grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) {
+ grpc_subchannel_key *k = gpr_malloc(sizeof(*k));
+ k->connector = grpc_connector_ref(connector);
+ k->args.filter_count = args->filter_count;
+ if (k->args.filter_count > 0) {
+ k->args.filters =
+ gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count);
+ memcpy((grpc_channel_filter *)k->args.filters, args->filters,
+ sizeof(*k->args.filters) * k->args.filter_count);
+ } else {
+ k->args.filters = NULL;
+ }
+ k->args.server_name = gpr_strdup(args->server_name);
+ k->args.addr = gpr_malloc(sizeof(grpc_resolved_address));
+ k->args.addr->len = args->addr->len;
+ if (k->args.addr->len > 0) {
+ memcpy(k->args.addr, args->addr, sizeof(grpc_resolved_address));
+ }
+ k->args.args = copy_channel_args(args->args);
+ return k;
+}
+
+grpc_subchannel_key *grpc_subchannel_key_create(
+ grpc_connector *connector, const grpc_subchannel_args *args) {
+ return create_key(connector, args, grpc_channel_args_normalize);
+}
+
+static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) {
+ return create_key(k->connector, &k->args, grpc_channel_args_copy);
+}
+
+static int subchannel_key_compare(grpc_subchannel_key *a,
+ grpc_subchannel_key *b) {
+ int c = GPR_ICMP(a->connector, b->connector);
+ if (c != 0) return c;
+ c = GPR_ICMP(a->args.addr->len, b->args.addr->len);
+ if (c != 0) return c;
+ c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
+ if (c != 0) return c;
+ c = strcmp(a->args.server_name, b->args.server_name);
+ if (c != 0) return c;
+ if (a->args.addr->len) {
+ c = memcmp(a->args.addr->addr, b->args.addr->addr, a->args.addr->len);
+ if (c != 0) return c;
+ }
+ if (a->args.filter_count > 0) {
+ c = memcmp(a->args.filters, b->args.filters,
+ a->args.filter_count * sizeof(*a->args.filters));
+ if (c != 0) return c;
+ }
+ return grpc_channel_args_compare(a->args.args, b->args.args);
+}
+
+void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_key *k) {
+ grpc_connector_unref(exec_ctx, k->connector);
+ gpr_free((grpc_channel_args *)k->args.filters);
+ grpc_channel_args_destroy((grpc_channel_args *)k->args.args);
+ gpr_free((void *)k->args.server_name);
+ gpr_free(k->args.addr);
+ gpr_free(k);
+}
+
+static void sck_avl_destroy(void *p) {
+ grpc_subchannel_key_destroy(current_ctx(), p);
+}
+
+static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); }
+
+static long sck_avl_compare(void *a, void *b) {
+ return subchannel_key_compare(a, b);
+}
+
+static void scv_avl_destroy(void *p) {
+ GRPC_SUBCHANNEL_WEAK_UNREF(current_ctx(), p, "subchannel_index");
+}
+
+static void *scv_avl_copy(void *p) {
+ GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index");
+ return p;
+}
+
+static const gpr_avl_vtable subchannel_avl_vtable = {
+ .destroy_key = sck_avl_destroy,
+ .copy_key = sck_avl_copy,
+ .compare_keys = sck_avl_compare,
+ .destroy_value = scv_avl_destroy,
+ .copy_value = scv_avl_copy};
+
+void grpc_subchannel_index_init(void) {
+ g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable);
+ gpr_mu_init(&g_mu);
+ gpr_tls_init(&subchannel_index_exec_ctx);
+}
+
+void grpc_subchannel_index_shutdown(void) {
+ gpr_mu_destroy(&g_mu);
+ gpr_avl_unref(g_subchannel_index);
+ gpr_tls_destroy(&subchannel_index_exec_ctx);
+}
+
+grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_key *key) {
+ enter_ctx(exec_ctx);
+
+ // Lock, and take a reference to the subchannel index.
+ // We don't need to do the search under a lock as avl's are immutable.
+ gpr_mu_lock(&g_mu);
+ gpr_avl index = gpr_avl_ref(g_subchannel_index);
+ gpr_mu_unlock(&g_mu);
+
+ grpc_subchannel *c =
+ GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key), "index_find");
+ gpr_avl_unref(index);
+
+ leave_ctx(exec_ctx);
+ return c;
+}
+
+grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_key *key,
+ grpc_subchannel *constructed) {
+ enter_ctx(exec_ctx);
+
+ grpc_subchannel *c = NULL;
+
+ while (c == NULL) {
+ // Compare and swap loop:
+ // - take a reference to the current index
+ gpr_mu_lock(&g_mu);
+ gpr_avl index = gpr_avl_ref(g_subchannel_index);
+ gpr_mu_unlock(&g_mu);
+
+ // - Check to see if a subchannel already exists
+ c = gpr_avl_get(index, key);
+ if (c != NULL) {
+ // yes -> we're done
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, constructed, "index_register");
+ } else {
+ // no -> update the avl and compare/swap
+ gpr_avl updated =
+ gpr_avl_add(gpr_avl_ref(index), subchannel_key_copy(key),
+ GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"));
+
+ // it may happen (but it's expected to be unlikely)
+ // that some other thread has changed the index:
+ // compare/swap here to check that, and retry as necessary
+ gpr_mu_lock(&g_mu);
+ if (index.root == g_subchannel_index.root) {
+ GPR_SWAP(gpr_avl, updated, g_subchannel_index);
+ c = constructed;
+ }
+ gpr_mu_unlock(&g_mu);
+
+ gpr_avl_unref(updated);
+ }
+ gpr_avl_unref(index);
+ }
+
+ leave_ctx(exec_ctx);
+
+ return c;
+}
+
+void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_key *key,
+ grpc_subchannel *constructed) {
+ enter_ctx(exec_ctx);
+
+ bool done = false;
+ while (!done) {
+ // Compare and swap loop:
+ // - take a reference to the current index
+ gpr_mu_lock(&g_mu);
+ gpr_avl index = gpr_avl_ref(g_subchannel_index);
+ gpr_mu_unlock(&g_mu);
+
+ // Check to see if this key still refers to the previously
+ // registered subchannel
+ grpc_subchannel *c = gpr_avl_get(index, key);
+ if (c != constructed) {
+ gpr_avl_unref(index);
+ break;
+ }
+
+ // compare and swap the update (some other thread may have
+ // mutated the index behind us)
+ gpr_avl updated = gpr_avl_remove(gpr_avl_ref(index), key);
+
+ gpr_mu_lock(&g_mu);
+ if (index.root == g_subchannel_index.root) {
+ GPR_SWAP(gpr_avl, updated, g_subchannel_index);
+ done = true;
+ }
+ gpr_mu_unlock(&g_mu);
+
+ gpr_avl_unref(updated);
+ gpr_avl_unref(index);
+ }
+
+ leave_ctx(exec_ctx);
+}
diff --git a/src/core/ext/client_channel/subchannel_index.h b/src/core/ext/client_channel/subchannel_index.h
new file mode 100644
index 0000000000..a67bd5e219
--- /dev/null
+++ b/src/core/ext/client_channel/subchannel_index.h
@@ -0,0 +1,77 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
+
+#include "src/core/ext/client_channel/connector.h"
+#include "src/core/ext/client_channel/subchannel.h"
+
+/** \file Provides an index of active subchannels so that they can be
+ shared amongst channels */
+
+typedef struct grpc_subchannel_key grpc_subchannel_key;
+
+/** Create a key that can be used to uniquely identify a subchannel */
+grpc_subchannel_key *grpc_subchannel_key_create(
+ grpc_connector *con, const grpc_subchannel_args *args);
+
+/** Destroy a subchannel key */
+void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_key *key);
+
+/** Given a subchannel key, find the subchannel registered for it.
+ Returns NULL if no such channel exists.
+ Thread-safe. */
+grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_key *key);
+
+/** Register a subchannel against a key.
+ Takes ownership of \a constructed.
+ Returns the registered subchannel. This may be different from
+ \a constructed in the case of a registration race. */
+grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_key *key,
+ grpc_subchannel *constructed);
+
+/** Remove \a constructed as the registered subchannel for \a key. */
+void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_key *key,
+ grpc_subchannel *constructed);
+
+/** Initialize the subchannel index (global) */
+void grpc_subchannel_index_init(void);
+/** Shutdown the subchannel index (global) */
+void grpc_subchannel_index_shutdown(void);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H */
diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c
new file mode 100644
index 0000000000..bcb6a1dee4
--- /dev/null
+++ b/src/core/ext/client_channel/uri_parser.c
@@ -0,0 +1,310 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/uri_parser.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/slice.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/support/string.h"
+
+/** a size_t default value... maps to all 1's */
+#define NOT_SET (~(size_t)0)
+
+static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section,
+ int suppress_errors) {
+ char *line_prefix;
+ size_t pfx_len;
+
+ if (!suppress_errors) {
+ gpr_asprintf(&line_prefix, "bad uri.%s: '", section);
+ pfx_len = strlen(line_prefix) + pos;
+ gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text);
+ gpr_free(line_prefix);
+
+ line_prefix = gpr_malloc(pfx_len + 1);
+ memset(line_prefix, ' ', pfx_len);
+ line_prefix[pfx_len] = 0;
+ gpr_log(GPR_ERROR, "%s^ here", line_prefix);
+ gpr_free(line_prefix);
+ }
+
+ return NULL;
+}
+
+/** Returns a copy of \a src[begin, end) */
+static char *copy_component(const char *src, size_t begin, size_t end) {
+ char *out = gpr_malloc(end - begin + 1);
+ memcpy(out, src + begin, end - begin);
+ out[end - begin] = 0;
+ return out;
+}
+
+/** Returns how many chars to advance if \a uri_text[i] begins a valid \a pchar
+ * production. If \a uri_text[i] introduces an invalid \a pchar (such as percent
+ * sign not followed by two hex digits), NOT_SET is returned. */
+static size_t parse_pchar(const char *uri_text, size_t i) {
+ /* pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
+ * unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
+ * pct-encoded = "%" HEXDIG HEXDIG
+ * sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
+ / "*" / "+" / "," / ";" / "=" */
+ char c = uri_text[i];
+ if (((c >= 'A') && (c <= 'Z')) || ((c >= 'a') && (c <= 'z')) ||
+ ((c >= '0') && (c <= '9')) ||
+ (c == '-' || c == '.' || c == '_' || c == '~') || /* unreserved */
+ (c == '!' || c == '$' || c == '&' || c == '\'' || c == '$' || c == '&' ||
+ c == '(' || c == ')' || c == '*' || c == '+' || c == ',' || c == ';' ||
+ c == '=') /* sub-delims */) {
+ return 1;
+ }
+ if (c == '%') { /* pct-encoded */
+ size_t j;
+ if (uri_text[i + 1] == 0 || uri_text[i + 2] == 0) {
+ return NOT_SET;
+ }
+ for (j = i + 1; j < 2; j++) {
+ c = uri_text[j];
+ if (!(((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'f')) ||
+ ((c >= 'A') && (c <= 'F')))) {
+ return NOT_SET;
+ }
+ }
+ return 2;
+ }
+ return 0;
+}
+
+/* *( pchar / "?" / "/" ) */
+static int parse_fragment_or_query(const char *uri_text, size_t *i) {
+ char c;
+ while ((c = uri_text[*i]) != 0) {
+ const size_t advance = parse_pchar(uri_text, *i); /* pchar */
+ switch (advance) {
+ case 0: /* uri_text[i] isn't in pchar */
+ /* maybe it's ? or / */
+ if (uri_text[*i] == '?' || uri_text[*i] == '/') {
+ (*i)++;
+ break;
+ } else {
+ return 1;
+ }
+ GPR_UNREACHABLE_CODE(return 0);
+ default:
+ (*i) += advance;
+ break;
+ case NOT_SET: /* uri_text[i] introduces an invalid URI */
+ return 0;
+ }
+ }
+ /* *i is the first uri_text position past the \a query production, maybe \0 */
+ return 1;
+}
+
+static void do_nothing(void *ignored) {}
+static void parse_query_parts(grpc_uri *uri) {
+ static const char *QUERY_PARTS_SEPARATOR = "&";
+ static const char *QUERY_PARTS_VALUE_SEPARATOR = "=";
+ GPR_ASSERT(uri->query != NULL);
+ if (uri->query[0] == '\0') {
+ uri->query_parts = NULL;
+ uri->query_parts_values = NULL;
+ uri->num_query_parts = 0;
+ return;
+ }
+ gpr_slice query_slice =
+ gpr_slice_new(uri->query, strlen(uri->query), do_nothing);
+ gpr_slice_buffer query_parts; /* the &-separated elements of the query */
+ gpr_slice_buffer query_param_parts; /* the =-separated subelements */
+
+ gpr_slice_buffer_init(&query_parts);
+ gpr_slice_buffer_init(&query_param_parts);
+
+ gpr_slice_split(query_slice, QUERY_PARTS_SEPARATOR, &query_parts);
+ uri->query_parts = gpr_malloc(query_parts.count * sizeof(char *));
+ uri->query_parts_values = gpr_malloc(query_parts.count * sizeof(char *));
+ uri->num_query_parts = query_parts.count;
+ for (size_t i = 0; i < query_parts.count; i++) {
+ gpr_slice_split(query_parts.slices[i], QUERY_PARTS_VALUE_SEPARATOR,
+ &query_param_parts);
+ GPR_ASSERT(query_param_parts.count > 0);
+ uri->query_parts[i] =
+ gpr_dump_slice(query_param_parts.slices[0], GPR_DUMP_ASCII);
+ if (query_param_parts.count > 1) {
+ /* TODO(dgq): only the first value after the separator is considered.
+ * Perhaps all chars after the first separator for the query part should
+ * be included, even if they include the separator. */
+ uri->query_parts_values[i] =
+ gpr_dump_slice(query_param_parts.slices[1], GPR_DUMP_ASCII);
+ } else {
+ uri->query_parts_values[i] = NULL;
+ }
+ gpr_slice_buffer_reset_and_unref(&query_param_parts);
+ }
+ gpr_slice_buffer_destroy(&query_parts);
+ gpr_slice_buffer_destroy(&query_param_parts);
+ gpr_slice_unref(query_slice);
+}
+
+grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
+ grpc_uri *uri;
+ size_t scheme_begin = 0;
+ size_t scheme_end = NOT_SET;
+ size_t authority_begin = NOT_SET;
+ size_t authority_end = NOT_SET;
+ size_t path_begin = NOT_SET;
+ size_t path_end = NOT_SET;
+ size_t query_begin = NOT_SET;
+ size_t query_end = NOT_SET;
+ size_t fragment_begin = NOT_SET;
+ size_t fragment_end = NOT_SET;
+ size_t i;
+
+ for (i = scheme_begin; uri_text[i] != 0; i++) {
+ if (uri_text[i] == ':') {
+ scheme_end = i;
+ break;
+ }
+ if (uri_text[i] >= 'a' && uri_text[i] <= 'z') continue;
+ if (uri_text[i] >= 'A' && uri_text[i] <= 'Z') continue;
+ if (i != scheme_begin) {
+ if (uri_text[i] >= '0' && uri_text[i] <= '9') continue;
+ if (uri_text[i] == '+') continue;
+ if (uri_text[i] == '-') continue;
+ if (uri_text[i] == '.') continue;
+ }
+ break;
+ }
+ if (scheme_end == NOT_SET) {
+ return bad_uri(uri_text, i, "scheme", suppress_errors);
+ }
+
+ if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') {
+ authority_begin = scheme_end + 3;
+ for (i = authority_begin; uri_text[i] != 0 && authority_end == NOT_SET;
+ i++) {
+ if (uri_text[i] == '/' || uri_text[i] == '?' || uri_text[i] == '#') {
+ authority_end = i;
+ }
+ }
+ if (authority_end == NOT_SET && uri_text[i] == 0) {
+ authority_end = i;
+ }
+ if (authority_end == NOT_SET) {
+ return bad_uri(uri_text, i, "authority", suppress_errors);
+ }
+ /* TODO(ctiller): parse the authority correctly */
+ path_begin = authority_end;
+ } else {
+ path_begin = scheme_end + 1;
+ }
+
+ for (i = path_begin; uri_text[i] != 0; i++) {
+ if (uri_text[i] == '?' || uri_text[i] == '#') {
+ path_end = i;
+ break;
+ }
+ }
+ if (path_end == NOT_SET && uri_text[i] == 0) {
+ path_end = i;
+ }
+ if (path_end == NOT_SET) {
+ return bad_uri(uri_text, i, "path", suppress_errors);
+ }
+
+ if (uri_text[i] == '?') {
+ query_begin = ++i;
+ if (!parse_fragment_or_query(uri_text, &i)) {
+ return bad_uri(uri_text, i, "query", suppress_errors);
+ } else if (uri_text[i] != 0 && uri_text[i] != '#') {
+ /* We must be at the end or at the beginning of a fragment */
+ return bad_uri(uri_text, i, "query", suppress_errors);
+ }
+ query_end = i;
+ }
+ if (uri_text[i] == '#') {
+ fragment_begin = ++i;
+ if (!parse_fragment_or_query(uri_text, &i)) {
+ return bad_uri(uri_text, i - fragment_end, "fragment", suppress_errors);
+ } else if (uri_text[i] != 0) {
+ /* We must be at the end */
+ return bad_uri(uri_text, i, "fragment", suppress_errors);
+ }
+ fragment_end = i;
+ }
+
+ uri = gpr_malloc(sizeof(*uri));
+ memset(uri, 0, sizeof(*uri));
+ uri->scheme = copy_component(uri_text, scheme_begin, scheme_end);
+ uri->authority = copy_component(uri_text, authority_begin, authority_end);
+ uri->path = copy_component(uri_text, path_begin, path_end);
+ uri->query = copy_component(uri_text, query_begin, query_end);
+ uri->fragment = copy_component(uri_text, fragment_begin, fragment_end);
+ parse_query_parts(uri);
+
+ return uri;
+}
+
+const char *grpc_uri_get_query_arg(const grpc_uri *uri, const char *key) {
+ GPR_ASSERT(key != NULL);
+ if (key[0] == '\0') return NULL;
+
+ for (size_t i = 0; i < uri->num_query_parts; ++i) {
+ if (0 == strcmp(key, uri->query_parts[i])) {
+ return uri->query_parts_values[i];
+ }
+ }
+ return NULL;
+}
+
+void grpc_uri_destroy(grpc_uri *uri) {
+ if (!uri) return;
+ gpr_free(uri->scheme);
+ gpr_free(uri->authority);
+ gpr_free(uri->path);
+ gpr_free(uri->query);
+ for (size_t i = 0; i < uri->num_query_parts; ++i) {
+ gpr_free(uri->query_parts[i]);
+ gpr_free(uri->query_parts_values[i]);
+ }
+ gpr_free(uri->query_parts);
+ gpr_free(uri->query_parts_values);
+ gpr_free(uri->fragment);
+ gpr_free(uri);
+}
diff --git a/src/core/ext/client_channel/uri_parser.h b/src/core/ext/client_channel/uri_parser.h
new file mode 100644
index 0000000000..5fe0e8f35e
--- /dev/null
+++ b/src/core/ext/client_channel/uri_parser.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H
+
+#include <stddef.h>
+
+typedef struct {
+ char *scheme;
+ char *authority;
+ char *path;
+ char *query;
+ /** Query substrings separated by '&' */
+ char **query_parts;
+ /** Number of elements in \a query_parts and \a query_parts_values */
+ size_t num_query_parts;
+ /** Split each query part by '='. NULL if not present. */
+ char **query_parts_values;
+ char *fragment;
+} grpc_uri;
+
+/** parse a uri, return NULL on failure */
+grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors);
+
+/** return the part of a query string after the '=' in "?key=xxx&...", or NULL
+ * if key is not present */
+const char *grpc_uri_get_query_arg(const grpc_uri *uri, const char *key);
+
+/** destroy a uri */
+void grpc_uri_destroy(grpc_uri *uri);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H */