aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/client_channel.c166
-rw-r--r--src/core/channel/connectivity_state.c92
-rw-r--r--src/core/channel/connectivity_state.h66
-rw-r--r--src/core/client_config/lb_policies/pick_first.c44
-rw-r--r--src/core/client_config/lb_policy.h10
-rw-r--r--src/core/client_config/subchannel.c58
-rw-r--r--src/core/client_config/subchannel.h15
7 files changed, 235 insertions, 216 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index dc838de715..4d082aceb8 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -426,88 +426,6 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
}
-#if 0
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- channel_data *chand = elem->channel_data;
- grpc_child_channel *child_channel;
- grpc_channel_op rop;
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
-
- switch (op->type) {
- case GRPC_CHANNEL_GOAWAY:
- /* sending goaway: clear out the active child on the way through */
- gpr_mu_lock(&chand->mu);
- child_channel = chand->active_child;
- chand->active_child = NULL;
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_handle_op(child_channel, op);
- grpc_child_channel_destroy(child_channel, 1);
- } else {
- gpr_slice_unref(op->data.goaway.message);
- }
- break;
- case GRPC_CHANNEL_DISCONNECT:
- /* sending disconnect: clear out the active child on the way through */
- gpr_mu_lock(&chand->mu);
- child_channel = chand->active_child;
- chand->active_child = NULL;
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_destroy(child_channel, 1);
- }
- /* fake a transport closed to satisfy the refcounting in client */
- rop.type = GRPC_TRANSPORT_CLOSED;
- rop.dir = GRPC_CALL_UP;
- grpc_channel_next_op(elem, &rop);
- break;
- case GRPC_TRANSPORT_GOAWAY:
- /* receiving goaway: if it's from our active child, drop the active child;
- in all cases consume the event here */
- gpr_mu_lock(&chand->mu);
- child_channel = grpc_channel_stack_from_top_element(from_elem);
- if (child_channel == chand->active_child) {
- chand->active_child = NULL;
- } else {
- child_channel = NULL;
- }
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
- }
- gpr_slice_unref(op->data.goaway.message);
- break;
- case GRPC_TRANSPORT_CLOSED:
- /* receiving disconnect: if it's from our active child, drop the active
- child; in all cases consume the event here */
- gpr_mu_lock(&chand->mu);
- child_channel = grpc_channel_stack_from_top_element(from_elem);
- if (child_channel == chand->active_child) {
- chand->active_child = NULL;
- } else {
- child_channel = NULL;
- }
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
- }
- break;
- default:
- switch (op->dir) {
- case GRPC_CALL_UP:
- grpc_channel_next_op(elem, op);
- break;
- case GRPC_CALL_DOWN:
- gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
- abort();
- break;
- }
- break;
- }
-}
-#endif
-
static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {}
/* Constructor for call_data */
@@ -591,90 +509,6 @@ const grpc_channel_filter grpc_client_channel_filter = {
init_channel_elem, destroy_channel_elem, "client-channel",
};
-#if 0
-grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
- grpc_channel_stack *channel_stack, grpc_transport *transport,
- grpc_channel_filter const **channel_filters, size_t num_channel_filters,
- grpc_mdctx *mdctx) {
- /* we just got a new transport: lets create a child channel stack for it */
- grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
- channel_data *chand = elem->channel_data;
- size_t num_child_filters = 2 + num_channel_filters;
- grpc_channel_filter const **child_filters;
- grpc_transport_setup_result result;
- grpc_child_channel *old_active = NULL;
- call_data **waiting_children;
- size_t waiting_child_count;
- size_t i;
- grpc_transport_stream_op *call_ops;
-
- /* build the child filter stack */
- child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
- /* we always need a link back filter to get back to the connected channel */
- child_filters[0] = &grpc_child_channel_top_filter;
- for (i = 0; i < num_channel_filters; i++) {
- child_filters[i + 1] = channel_filters[i];
- }
- /* and we always need a connected channel to talk to the transport */
- child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;
-
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
-
- /* BEGIN LOCKING CHANNEL */
- gpr_mu_lock(&chand->mu);
- chand->transport_setup_initiated = 0;
-
- if (chand->active_child) {
- old_active = chand->active_child;
- }
- chand->active_child = grpc_child_channel_create(
- elem, child_filters, num_child_filters, chand->args, mdctx);
- result =
- grpc_connected_channel_bind_transport(chand->active_child, transport);
-
- /* capture the waiting children - we'll activate them outside the lock
- to avoid re-entrancy problems */
- waiting_children = chand->waiting_children;
- waiting_child_count = chand->waiting_child_count;
- /* bumping up inflight_requests here avoids taking a lock per rpc below */
-
- chand->waiting_children = NULL;
- chand->waiting_child_count = 0;
- chand->waiting_child_capacity = 0;
-
- call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
-
- for (i = 0; i < waiting_child_count; i++) {
- call_ops[i] = waiting_children[i]->waiting_op;
- if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
- waiting_children[i] = NULL;
- grpc_transport_stream_op_finish_with_failure(&call_ops[i]);
- }
- }
-
- /* END LOCKING CHANNEL */
- gpr_mu_unlock(&chand->mu);
-
- /* activate any pending operations - this is safe to do as we guarantee one
- and only one write operation per request at the surface api - if we lose
- that guarantee we need to do some curly locking here */
- for (i = 0; i < waiting_child_count; i++) {
- if (waiting_children[i]) {
- complete_activate(waiting_children[i]->elem, &call_ops[i]);
- }
- }
- gpr_free(waiting_children);
- gpr_free(call_ops);
- gpr_free(child_filters);
-
- if (old_active) {
- grpc_child_channel_destroy(old_active, 1);
- }
-
- return result;
-}
-#endif
-
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver) {
/* post construction initialization: set the transport setup pointer */
diff --git a/src/core/channel/connectivity_state.c b/src/core/channel/connectivity_state.c
new file mode 100644
index 0000000000..566a2c3344
--- /dev/null
+++ b/src/core/channel/connectivity_state.c
@@ -0,0 +1,92 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/connectivity_state.h"
+#include <grpc/support/alloc.h>
+
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state) {
+ tracker->current_state = init_state;
+ tracker->watchers = NULL;
+}
+
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
+ grpc_connectivity_state_watcher *w;
+ while ((w = tracker->watchers)) {
+ tracker->watchers = w->next;
+
+ if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) {
+ *w->current = GRPC_CHANNEL_FATAL_FAILURE;
+ grpc_iomgr_add_callback(w->notify);
+ } else {
+ grpc_iomgr_add_delayed_callback(w->notify, 0);
+ }
+ gpr_free(w);
+ }
+}
+
+grpc_connectivity_state grpc_connectivity_state_check(grpc_connectivity_state_tracker *tracker) {
+ return tracker->current_state;
+}
+
+int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify) {
+ if (tracker->current_state != *current) {
+ *current = tracker->current_state;
+ grpc_iomgr_add_callback(notify);
+ } else {
+ grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
+ w->current = current;
+ w->notify = notify;
+ w->next = tracker->watchers;
+ tracker->watchers = w;
+ }
+ return tracker->current_state == GRPC_CHANNEL_IDLE;
+}
+
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) {
+ grpc_connectivity_state_watcher *new = NULL;
+ grpc_connectivity_state_watcher *w;
+ tracker->current_state = state;
+ while ((w = tracker->watchers)) {
+ tracker->watchers = w->next;
+
+ if (state != *w->current) {
+ *w->current = state;
+ grpc_iomgr_add_callback(w->notify);
+ gpr_free(w);
+ } else {
+ w->next = new;
+ new = w;
+ }
+ }
+ tracker->watchers = new;
+}
diff --git a/src/core/channel/connectivity_state.h b/src/core/channel/connectivity_state.h
new file mode 100644
index 0000000000..ebc1acc559
--- /dev/null
+++ b/src/core/channel/connectivity_state.h
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H
+#define GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H
+
+#include <grpc/grpc.h>
+#include "src/core/iomgr/iomgr.h"
+
+typedef struct grpc_connectivity_state_watcher {
+ /** we keep watchers in a linked list */
+ struct grpc_connectivity_state_watcher *next;
+ /** closure to notify on change */
+ grpc_iomgr_closure *notify;
+ /** the current state as believed by the watcher */
+ grpc_connectivity_state *current;
+} grpc_connectivity_state_watcher;
+
+typedef struct {
+ /** current connectivity state */
+ grpc_connectivity_state current_state;
+ /** all our watchers */
+ grpc_connectivity_state_watcher *watchers;
+} grpc_connectivity_state_tracker;
+
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state);
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
+
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state);
+
+grpc_connectivity_state grpc_connectivity_state_check(grpc_connectivity_state_tracker *tracker);
+
+/** Return 1 if the channel should start connecting, 0 otherwise */
+int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify);
+
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H */
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index f3a6d21eb5..4fda07a63f 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -36,6 +36,7 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include "src/core/channel/connectivity_state.h"
typedef struct pending_pick {
struct pending_pick *next;
@@ -69,6 +70,9 @@ typedef struct {
grpc_connectivity_state checking_connectivity;
/** list of picks that are waiting on connectivity */
pending_pick *pending_picks;
+
+ /** our connectivity state tracker */
+ grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
void pf_ref(grpc_lb_policy *pol) {
@@ -184,8 +188,46 @@ loop:
}
}
+static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ size_t i;
+ size_t n;
+ grpc_subchannel **subchannels;
+
+ gpr_mu_lock(&p->mu);
+ n = p->num_subchannels;
+ subchannels = gpr_malloc(n * sizeof(*subchannels));
+ for (i = 0; i < n; i++) {
+ subchannels[i] = p->subchannels[i];
+ grpc_subchannel_ref(subchannels[i]);
+ }
+ gpr_mu_unlock(&p->mu);
+
+ for (i = 0; i < n; i++) {
+ grpc_subchannel_process_transport_op(subchannels[i], op);
+ grpc_subchannel_unref(subchannels[i]);
+ }
+ gpr_free(subchannels);
+}
+
+static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ grpc_connectivity_state st;
+ gpr_mu_lock(&p->mu);
+ st = grpc_connectivity_state_check(&p->state_tracker);
+ gpr_mu_unlock(&p->mu);
+ return st;
+}
+
+static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_iomgr_closure *notify) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ gpr_mu_lock(&p->mu);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, notify);
+ gpr_mu_unlock(&p->mu);
+}
+
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_ref, pf_unref, pf_shutdown, pf_pick};
+ pf_ref, pf_unref, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels) {
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 42929e933b..42be9152cb 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -58,6 +58,16 @@ struct grpc_lb_policy_vtable {
void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete);
+
+ /** broadcast a transport op to all subchannels */
+ void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op);
+
+ /** check the current connectivity of the lb_policy */
+ grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy);
+
+ /** call notify when the connectivity state of a channel changes from *state.
+ Updates *state with the new state of the policy */
+ void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_iomgr_closure *closure);
};
void grpc_lb_policy_ref(grpc_lb_policy *policy);
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index b489ce04a5..5cbb5d9971 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -39,6 +39,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/connectivity_state.h"
typedef struct {
gpr_refcount refs;
@@ -52,12 +53,6 @@ typedef struct waiting_for_connect {
grpc_subchannel_call **target;
} waiting_for_connect;
-typedef struct connectivity_state_watcher {
- struct connectivity_state_watcher *next;
- grpc_iomgr_closure *notify;
- grpc_connectivity_state *current;
-} connectivity_state_watcher;
-
struct grpc_subchannel {
gpr_refcount refs;
grpc_connector *connector;
@@ -92,8 +87,8 @@ struct grpc_subchannel {
int connecting;
/** things waiting for a connection */
waiting_for_connect *waiting;
- /** things watching the connectivity state */
- connectivity_state_watcher *watchers;
+ /** connectivity state tracking */
+ grpc_connectivity_state_tracker state_tracker;
};
struct grpc_subchannel_call {
@@ -123,6 +118,7 @@ void grpc_subchannel_unref(grpc_subchannel *c) {
gpr_free(c->addr);
grpc_mdctx_unref(c->mdctx);
grpc_pollset_set_destroy(&c->pollset_set);
+ grpc_connectivity_state_destroy(&c->state_tracker);
gpr_free(c);
}
}
@@ -156,6 +152,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_mdctx_ref(c->mdctx);
grpc_pollset_set_init(&c->pollset_set);
grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
+ grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE);
gpr_mu_init(&c->mu);
return c;
}
@@ -210,7 +207,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
- state = compute_connectivity_locked(c);
+ state = grpc_connectivity_state_check(&c->state_tracker);
gpr_mu_unlock(&c->mu);
return state;
}
@@ -218,35 +215,24 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify) {
- grpc_connectivity_state current;
int do_connect = 0;
- connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
- w->current = state;
- w->notify = notify;
gpr_mu_lock(&c->mu);
- current = compute_connectivity_locked(c);
- if (current == GRPC_CHANNEL_IDLE) {
- current = GRPC_CHANNEL_CONNECTING;
+ if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) {
+ do_connect = 1;
c->connecting = 1;
- do_connect = 1;
grpc_subchannel_ref(c);
- connectivity_state_changed_locked(c);
- }
- if (current != *state) {
- gpr_mu_unlock(&c->mu);
- *state = current;
- grpc_iomgr_add_callback(notify);
- gpr_free(w);
- } else {
- w->next = c->watchers;
- c->watchers = w;
- gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c));
}
+ gpr_mu_unlock(&c->mu);
if (do_connect) {
start_connect(c);
}
}
+void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) {
+ abort();
+}
+
static void publish_transport(grpc_subchannel *c) {
size_t channel_stack_size;
connection *con;
@@ -311,21 +297,7 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
static void connectivity_state_changed_locked(grpc_subchannel *c) {
grpc_connectivity_state current = compute_connectivity_locked(c);
- connectivity_state_watcher *new = NULL;
- connectivity_state_watcher *w;
- while ((w = c->watchers)) {
- c->watchers = w->next;
-
- if (current != *w->current) {
- *w->current = current;
- grpc_iomgr_add_callback(w->notify);
- gpr_free(w);
- } else {
- w->next = new;
- new = w;
- }
- }
- c->watchers = new;
+ grpc_connectivity_state_set(&c->state_tracker, current);
}
/*
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 8836f9b09c..60b95d3d8f 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -46,6 +46,15 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
void grpc_subchannel_ref(grpc_subchannel *channel);
void grpc_subchannel_unref(grpc_subchannel *channel);
+/** construct a call (possibly asynchronously) */
+void grpc_subchannel_create_call(grpc_subchannel *subchannel,
+ grpc_transport_stream_op *initial_op,
+ grpc_subchannel_call **target,
+ grpc_iomgr_closure *notify);
+
+/** process a transport level op */
+void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op);
+
void grpc_subchannel_call_ref(grpc_subchannel_call *call);
void grpc_subchannel_call_unref(grpc_subchannel_call *call);
@@ -62,12 +71,6 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset);
void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset);
-/** construct a call (possibly asynchronously) */
-void grpc_subchannel_create_call(grpc_subchannel *subchannel,
- grpc_transport_stream_op *initial_op,
- grpc_subchannel_call **target,
- grpc_iomgr_closure *notify);
-
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call,
grpc_transport_stream_op *op);