aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/subchannel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc814
1 files changed, 814 insertions, 0 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
new file mode 100644
index 0000000000..bff5001d69
--- /dev/null
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -0,0 +1,814 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/ext/filters/client_channel/subchannel.h"
+
+#include <inttypes.h>
+#include <limits.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/avl.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/backoff.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/channel_init.h"
+#include "src/core/lib/transport/connectivity_state.h"
+
+#define INTERNAL_REF_BITS 16
+#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
+
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20
+#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
+
+#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
+ ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \
+ &(subchannel)->connected_subchannel)))
+
+typedef struct {
+ grpc_closure closure;
+ grpc_subchannel *subchannel;
+ grpc_connectivity_state connectivity_state;
+} state_watcher;
+
+typedef struct external_state_watcher {
+ grpc_subchannel *subchannel;
+ grpc_pollset_set *pollset_set;
+ grpc_closure *notify;
+ grpc_closure closure;
+ struct external_state_watcher *next;
+ struct external_state_watcher *prev;
+} external_state_watcher;
+
+struct grpc_subchannel {
+ grpc_connector *connector;
+
+ /** refcount
+ - lower INTERNAL_REF_BITS bits are for internal references:
+ these do not keep the subchannel open.
+ - upper remaining bits are for public references: these do
+ keep the subchannel open */
+ gpr_atm ref_pair;
+
+ /** non-transport related channel filters */
+ const grpc_channel_filter **filters;
+ size_t num_filters;
+ /** channel arguments */
+ grpc_channel_args *args;
+
+ grpc_subchannel_key *key;
+
+ /** set during connection */
+ grpc_connect_out_args connecting_result;
+
+ /** callback for connection finishing */
+ grpc_closure connected;
+
+ /** callback for our alarm */
+ grpc_closure on_alarm;
+
+ /** pollset_set tracking who's interested in a connection
+ being setup */
+ grpc_pollset_set *pollset_set;
+
+ /** active connection, or null; of type grpc_connected_subchannel */
+ gpr_atm connected_subchannel;
+
+ /** mutex protecting remaining elements */
+ gpr_mu mu;
+
+ /** have we seen a disconnection? */
+ bool disconnected;
+ /** are we connecting */
+ bool connecting;
+ /** connectivity state tracking */
+ grpc_connectivity_state_tracker state_tracker;
+
+ external_state_watcher root_external_state_watcher;
+
+ /** next connect attempt time */
+ gpr_timespec next_attempt;
+ /** backoff state */
+ gpr_backoff backoff_state;
+ /** do we have an active alarm? */
+ bool have_alarm;
+ /** have we started the backoff loop */
+ bool backoff_begun;
+ /** our alarm */
+ grpc_timer alarm;
+};
+
+struct grpc_subchannel_call {
+ grpc_connected_subchannel *connection;
+ grpc_closure *schedule_closure_after_destroy;
+};
+
+#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
+#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con))
+#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
+ (((grpc_subchannel_call *)(callstack)) - 1)
+
+static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
+ grpc_error *error);
+
+#ifndef NDEBUG
+#define REF_REASON reason
+#define REF_MUTATE_EXTRA_ARGS \
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose
+#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
+#else
+#define REF_REASON ""
+#define REF_MUTATE_EXTRA_ARGS
+#define REF_MUTATE_PURPOSE(x)
+#endif
+
+/*
+ * connection implementation
+ */
+
+static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_connected_subchannel *c = (grpc_connected_subchannel *)arg;
+ grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
+ gpr_free(c);
+}
+
+grpc_connected_subchannel *grpc_connected_subchannel_ref(
+ grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
+ return c;
+}
+
+void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c),
+ REF_REASON);
+}
+
+/*
+ * grpc_subchannel implementation
+ */
+
+static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_subchannel *c = (grpc_subchannel *)arg;
+ gpr_free((void *)c->filters);
+ grpc_channel_args_destroy(exec_ctx, c->args);
+ grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
+ grpc_connector_unref(exec_ctx, c->connector);
+ grpc_pollset_set_destroy(exec_ctx, c->pollset_set);
+ grpc_subchannel_key_destroy(exec_ctx, c->key);
+ gpr_mu_destroy(&c->mu);
+ gpr_free(c);
+}
+
+static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
+ int barrier REF_MUTATE_EXTRA_ARGS) {
+ gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
+ : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
+#ifndef NDEBUG
+ if (GRPC_TRACER_ON(grpc_trace_stream_refcount)) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c,
+ purpose, old_val, old_val + delta, reason);
+ }
+#endif
+ return old_val;
+}
+
+grpc_subchannel *grpc_subchannel_ref(
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
+ 0 REF_MUTATE_PURPOSE("STRONG_REF"));
+ GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
+ return c;
+}
+
+grpc_subchannel *grpc_subchannel_weak_ref(
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
+ GPR_ASSERT(old_refs != 0);
+ return c;
+}
+
+grpc_subchannel *grpc_subchannel_ref_from_weak_ref(
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ if (!c) return NULL;
+ for (;;) {
+ gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair);
+ if (old_refs >= (1 << INTERNAL_REF_BITS)) {
+ gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
+ if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) {
+ return c;
+ }
+ } else {
+ return NULL;
+ }
+ }
+}
+
+static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+ grpc_connected_subchannel *con;
+ grpc_subchannel_index_unregister(exec_ctx, c->key, c);
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(!c->disconnected);
+ c->disconnected = true;
+ grpc_connector_shutdown(
+ exec_ctx, c->connector,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Subchannel disconnected"));
+ con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
+ if (con != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
+ gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
+ }
+ gpr_mu_unlock(&c->mu);
+}
+
+void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ // add a weak ref and subtract a strong ref (atomically)
+ old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS),
+ 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
+ if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
+ disconnect(exec_ctx, c);
+ }
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref");
+}
+
+void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
+ if (old_refs == 1) {
+ GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(subchannel_destroy, c,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
+ }
+}
+
+grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
+ grpc_connector *connector,
+ const grpc_subchannel_args *args) {
+ grpc_subchannel_key *key = grpc_subchannel_key_create(args);
+ grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key);
+ if (c) {
+ grpc_subchannel_key_destroy(exec_ctx, key);
+ return c;
+ }
+
+ GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx);
+ c = (grpc_subchannel *)gpr_zalloc(sizeof(*c));
+ c->key = key;
+ gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
+ c->connector = connector;
+ grpc_connector_ref(c->connector);
+ c->num_filters = args->filter_count;
+ if (c->num_filters > 0) {
+ c->filters = (const grpc_channel_filter **)gpr_malloc(
+ sizeof(grpc_channel_filter *) * c->num_filters);
+ memcpy((void *)c->filters, args->filters,
+ sizeof(grpc_channel_filter *) * c->num_filters);
+ } else {
+ c->filters = NULL;
+ }
+ c->pollset_set = grpc_pollset_set_create();
+ grpc_resolved_address *addr =
+ (grpc_resolved_address *)gpr_malloc(sizeof(*addr));
+ grpc_get_subchannel_address_arg(exec_ctx, args->args, addr);
+ grpc_resolved_address *new_address = NULL;
+ grpc_channel_args *new_args = NULL;
+ if (grpc_proxy_mappers_map_address(exec_ctx, addr, args->args, &new_address,
+ &new_args)) {
+ GPR_ASSERT(new_address != NULL);
+ gpr_free(addr);
+ addr = new_address;
+ }
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+ grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
+ gpr_free(addr);
+ c->args = grpc_channel_args_copy_and_add_and_remove(
+ new_args != NULL ? new_args : args->args, keys_to_remove,
+ GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
+ gpr_free(new_arg.value.string);
+ if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
+ c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
+ &c->root_external_state_watcher;
+ GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c,
+ grpc_schedule_on_exec_ctx);
+ grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
+ "subchannel");
+ int initial_backoff_ms =
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
+ int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000;
+ int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ bool fixed_reconnect_backoff = false;
+ if (c->args) {
+ for (size_t i = 0; i < c->args->num_args; i++) {
+ if (0 == strcmp(c->args->args[i].key,
+ "grpc.testing.fixed_reconnect_backoff_ms")) {
+ fixed_reconnect_backoff = true;
+ initial_backoff_ms = min_backoff_ms = max_backoff_ms =
+ grpc_channel_arg_get_integer(&c->args->args[i],
+ {initial_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ min_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i], {min_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ max_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i], {max_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ initial_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i], {initial_backoff_ms, 100, INT_MAX});
+ }
+ }
+ }
+ gpr_backoff_init(
+ &c->backoff_state, initial_backoff_ms,
+ fixed_reconnect_backoff ? 1.0
+ : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
+ fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
+ min_backoff_ms, max_backoff_ms);
+ gpr_mu_init(&c->mu);
+
+ return grpc_subchannel_index_register(exec_ctx, key, c);
+}
+
+static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
+ grpc_connect_in_args args;
+
+ args.interested_parties = c->pollset_set;
+ args.deadline = c->next_attempt;
+ args.channel_args = c->args;
+
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+ "state_change");
+ grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
+ &c->connected);
+}
+
+grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
+ grpc_error **error) {
+ grpc_connectivity_state state;
+ gpr_mu_lock(&c->mu);
+ state = grpc_connectivity_state_get(&c->state_tracker, error);
+ gpr_mu_unlock(&c->mu);
+ return state;
+}
+
+static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ external_state_watcher *w = (external_state_watcher *)arg;
+ grpc_closure *follow_up = w->notify;
+ if (w->pollset_set != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set,
+ w->pollset_set);
+ }
+ gpr_mu_lock(&w->subchannel->mu);
+ w->next->prev = w->prev;
+ w->prev->next = w->next;
+ gpr_mu_unlock(&w->subchannel->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
+ gpr_free(w);
+ GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error));
+}
+
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_subchannel *c = (grpc_subchannel *)arg;
+ gpr_mu_lock(&c->mu);
+ c->have_alarm = false;
+ if (c->disconnected) {
+ error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
+ &error, 1);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
+ c->next_attempt =
+ gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
+ continue_connect_locked(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ } else {
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
+ if (c->disconnected) {
+ /* Don't try to connect if we're already disconnected */
+ return;
+ }
+
+ if (c->connecting) {
+ /* Already connecting: don't restart */
+ return;
+ }
+
+ if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) {
+ /* Already connected: don't restart */
+ return;
+ }
+
+ if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
+ /* Nobody is interested in connecting: so don't just yet */
+ return;
+ }
+
+ c->connecting = true;
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!c->backoff_begun) {
+ c->backoff_begun = true;
+ c->next_attempt = gpr_backoff_begin(&c->backoff_state, now);
+ continue_connect_locked(exec_ctx, c);
+ } else {
+ GPR_ASSERT(!c->have_alarm);
+ c->have_alarm = true;
+ gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+ if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+ 0) {
+ gpr_log(GPR_INFO, "Retry immediately");
+ } else {
+ gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+ time_til_next.tv_sec, time_til_next.tv_nsec);
+ }
+ GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
+ grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, &c->on_alarm, now);
+ }
+}
+
+void grpc_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *notify) {
+ external_state_watcher *w;
+
+ if (state == NULL) {
+ gpr_mu_lock(&c->mu);
+ for (w = c->root_external_state_watcher.next;
+ w != &c->root_external_state_watcher; w = w->next) {
+ if (w->notify == notify) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &c->state_tracker, NULL, &w->closure);
+ }
+ }
+ gpr_mu_unlock(&c->mu);
+ } else {
+ w = (external_state_watcher *)gpr_malloc(sizeof(*w));
+ w->subchannel = c;
+ w->pollset_set = interested_parties;
+ w->notify = notify;
+ GRPC_CLOSURE_INIT(&w->closure, on_external_state_watcher_done, w,
+ grpc_schedule_on_exec_ctx);
+ if (interested_parties != NULL) {
+ grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set,
+ interested_parties);
+ }
+ GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
+ gpr_mu_lock(&c->mu);
+ w->next = &c->root_external_state_watcher;
+ w->prev = w->next->prev;
+ w->next->prev = w->prev->next = w;
+ grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker,
+ state, &w->closure);
+ maybe_start_connecting_locked(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ }
+}
+
+void grpc_connected_subchannel_process_transport_op(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_transport_op *op) {
+ grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
+ top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
+}
+
+static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_error *error) {
+ state_watcher *sw = (state_watcher *)p;
+ grpc_subchannel *c = sw->subchannel;
+ gpr_mu *mu = &c->mu;
+
+ gpr_mu_lock(mu);
+
+ /* if we failed just leave this closure */
+ if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* any errors on a subchannel ==> we're done, create a new one */
+ sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
+ }
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ sw->connectivity_state, GRPC_ERROR_REF(error),
+ "reflect_child");
+ if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
+ &sw->connectivity_state, &sw->closure);
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ sw = NULL;
+ }
+
+ gpr_mu_unlock(mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "state_watcher");
+ gpr_free(sw);
+}
+
+static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *con,
+ grpc_pollset_set *interested_parties,
+ grpc_connectivity_state *state,
+ grpc_closure *closure) {
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_channel_element *elem;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
+ op->bind_pollset_set = interested_parties;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+}
+
+void grpc_connected_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *closure) {
+ connected_subchannel_state_op(exec_ctx, con, interested_parties, state,
+ closure);
+}
+
+void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *con,
+ grpc_closure *closure) {
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_channel_element *elem;
+ op->send_ping = closure;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+}
+
+static bool publish_transport_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
+ grpc_connected_subchannel *con;
+ grpc_channel_stack *stk;
+ state_watcher *sw_subchannel;
+
+ /* construct channel stack */
+ grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
+ grpc_channel_stack_builder_set_channel_arguments(
+ exec_ctx, builder, c->connecting_result.channel_args);
+ grpc_channel_stack_builder_set_transport(builder,
+ c->connecting_result.transport);
+
+ if (!grpc_channel_init_create_stack(exec_ctx, builder,
+ GRPC_CLIENT_SUBCHANNEL)) {
+ grpc_channel_stack_builder_destroy(exec_ctx, builder);
+ return false;
+ }
+ grpc_error *error = grpc_channel_stack_builder_finish(
+ exec_ctx, builder, 0, 1, connection_destroy, NULL, (void **)&con);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_destroy(exec_ctx, c->connecting_result.transport);
+ gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
+ grpc_error_string(error));
+ GRPC_ERROR_UNREF(error);
+ return false;
+ }
+ stk = CHANNEL_STACK_FROM_CONNECTION(con);
+ memset(&c->connecting_result, 0, sizeof(c->connecting_result));
+
+ /* initialize state watcher */
+ sw_subchannel = (state_watcher *)gpr_malloc(sizeof(*sw_subchannel));
+ sw_subchannel->subchannel = c;
+ sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
+ GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed,
+ sw_subchannel, grpc_schedule_on_exec_ctx);
+
+ if (c->disconnected) {
+ gpr_free(sw_subchannel);
+ grpc_channel_stack_destroy(exec_ctx, stk);
+ gpr_free(con);
+ return false;
+ }
+
+ /* publish */
+ /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
+ I'd have expected the rel_cas below to be enough, but
+ seemingly it's not.
+ Re-evaluate if we really need this. */
+ gpr_atm_full_barrier();
+ GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+
+ /* setup subchannel watching connected subchannel for changes; subchannel
+ ref for connecting is donated to the state watcher */
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state,
+ &sw_subchannel->closure);
+
+ /* signal completion */
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "connected");
+ return true;
+}
+
+static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_subchannel *c = (grpc_subchannel *)arg;
+ grpc_channel_args *delete_channel_args = c->connecting_result.channel_args;
+
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
+ gpr_mu_lock(&c->mu);
+ c->connecting = false;
+ if (c->connecting_result.transport != NULL &&
+ publish_transport_locked(exec_ctx, c)) {
+ /* do nothing, transport was published */
+ } else if (c->disconnected) {
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ } else {
+ grpc_connectivity_state_set(
+ exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Connect Failed", &error, 1),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
+ "connect_failed");
+
+ const char *errmsg = grpc_error_string(error);
+ gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
+
+ maybe_start_connecting_locked(exec_ctx, c);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ }
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected");
+ grpc_channel_args_destroy(exec_ctx, delete_channel_args);
+}
+
+/*
+ * grpc_subchannel_call implementation
+ */
+
+static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
+ grpc_error *error) {
+ grpc_subchannel_call *c = (grpc_subchannel_call *)call;
+ GPR_ASSERT(c->schedule_closure_after_destroy != NULL);
+ GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
+ grpc_connected_subchannel *connection = c->connection;
+ grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL,
+ c->schedule_closure_after_destroy);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
+ GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
+}
+
+void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call,
+ grpc_closure *closure) {
+ GPR_ASSERT(call->schedule_closure_after_destroy == NULL);
+ GPR_ASSERT(closure != NULL);
+ call->schedule_closure_after_destroy = closure;
+}
+
+void grpc_subchannel_call_ref(
+ grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
+}
+
+void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
+}
+
+void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *call,
+ grpc_transport_stream_op_batch *batch) {
+ GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0);
+ grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
+ grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
+ top_elem->filter->start_transport_stream_op_batch(exec_ctx, top_elem, batch);
+ GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
+}
+
+grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
+ grpc_subchannel *c) {
+ return GET_CONNECTED_SUBCHANNEL(c, acq);
+}
+
+const grpc_subchannel_key *grpc_subchannel_get_key(
+ const grpc_subchannel *subchannel) {
+ return subchannel->key;
+}
+
+grpc_error *grpc_connected_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ const grpc_connected_subchannel_call_args *args,
+ grpc_subchannel_call **call) {
+ grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
+ *call = (grpc_subchannel_call *)gpr_arena_alloc(
+ args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
+ grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
+ (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
+ const grpc_call_element_args call_args = {
+ callstk, /* call_stack */
+ NULL, /* server_transport_data */
+ args->context, /* context */
+ args->path, /* path */
+ args->start_time, /* start_time */
+ args->deadline, /* deadline */
+ args->arena, /* arena */
+ args->call_combiner /* call_combiner */
+ };
+ grpc_error *error = grpc_call_stack_init(
+ exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args);
+ if (error != GRPC_ERROR_NONE) {
+ const char *error_string = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "error: %s", error_string);
+ return error;
+ }
+ grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent);
+ return GRPC_ERROR_NONE;
+}
+
+grpc_call_stack *grpc_subchannel_call_get_call_stack(
+ grpc_subchannel_call *subchannel_call) {
+ return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
+}
+
+static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str,
+ grpc_resolved_address *addr) {
+ grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */);
+ GPR_ASSERT(uri != NULL);
+ if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr));
+ grpc_uri_destroy(uri);
+}
+
+void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx,
+ const grpc_channel_args *args,
+ grpc_resolved_address *addr) {
+ const char *addr_uri_str = grpc_get_subchannel_address_uri_arg(args);
+ memset(addr, 0, sizeof(*addr));
+ if (*addr_uri_str != '\0') {
+ grpc_uri_to_sockaddr(exec_ctx, addr_uri_str, addr);
+ }
+}
+
+const char *grpc_get_subchannel_address_uri_arg(const grpc_channel_args *args) {
+ const grpc_arg *addr_arg =
+ grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
+ GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy.
+ GPR_ASSERT(addr_arg->type == GRPC_ARG_STRING);
+ return addr_arg->value.string;
+}
+
+grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr) {
+ return grpc_channel_arg_string_create(
+ (char *)GRPC_ARG_SUBCHANNEL_ADDRESS,
+ addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
+}