Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--  src/core/channel/client_channel.c  641
1 file changed, 641 insertions(+), 0 deletions(-)
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
new file mode 100644
index 0000000000..90563683d5
--- /dev/null
+++ b/src/core/channel/client_channel.c
@@ -0,0 +1,641 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/client_channel.h"
+
+#include <stdio.h>
+
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/metadata_buffer.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+/* Link back filter: passes calls traveling up into the client channel, and
+   forwards calls traveling down to the rest of the child stack */
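+/* Each child stack starts with this filter; the 'back' pointers are set to
+   the corresponding client channel elements when the child is created, so
+   upward traffic can re-enter the parent stack. */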
+
+typedef struct { grpc_channel_element *back; } lb_channel_data;
+
+typedef struct { grpc_call_element *back; } lb_call_data;
+
+static void lb_call_op(grpc_call_element *elem, grpc_call_op *op) {
+ lb_call_data *calld = elem->call_data;
+
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ calld->back->filter->call_op(calld->back, op);
+ break;
+ case GRPC_CALL_DOWN:
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+/* Currently we assume all channel operations should just be pushed up. */
+static void lb_channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ lb_channel_data *chand = elem->channel_data;
+
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ chand->back->filter->channel_op(chand->back, op);
+ break;
+ case GRPC_CALL_DOWN:
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+}
+
+/* Constructor for call_data */
+static void lb_init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {}
+
+/* Destructor for call_data */
+static void lb_destroy_call_elem(grpc_call_element *elem) {}
+
+/* Constructor for channel_data */
+static void lb_init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last) {
+ GPR_ASSERT(is_first);
+ GPR_ASSERT(!is_last);
+}
+
+/* Destructor for channel_data */
+static void lb_destroy_channel_elem(grpc_channel_element *elem) {}
+
+static const grpc_channel_filter link_back_filter = {
+ lb_call_op, lb_channel_op,
+
+ sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
+
+ sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
+
+ "clientchannel.linkback",
+};
+
+/* Client channel implementation */
+
+typedef struct {
+ size_t inflight_requests;
+ grpc_channel_stack *channel_stack;
+} child_entry;
+
+typedef struct call_data call_data;
+
+typedef struct {
+ /* protects children, child_count, child_capacity, active_child,
+ transport_setup_initiated
+ does not protect channel stacks held by children
+ transport_setup is assumed to be set once during construction */
+ gpr_mu mu;
+
+ /* the sending child (points somewhere in children, or NULL) */
+ child_entry *active_child;
+ /* vector of child channels */
+ child_entry *children;
+ size_t child_count;
+ size_t child_capacity;
+
+ /* calls waiting for a channel to be ready */
+ call_data **waiting_children;
+ size_t waiting_child_count;
+ size_t waiting_child_capacity;
+
+ /* transport setup for this channel */
+ grpc_transport_setup *transport_setup;
+ int transport_setup_initiated;
+
+ grpc_channel_args *args;
+
+ /* metadata cache */
+ grpc_mdelem *cancel_status;
+} channel_data;
+
+typedef enum {
+ CALL_CREATED,
+ CALL_WAITING,
+ CALL_ACTIVE,
+ CALL_CANCELLED
+} call_state;
+
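+/* A call progresses CALL_CREATED -> CALL_WAITING -> CALL_ACTIVE, or to
+   CALL_CANCELLED if it is cancelled before a transport is ready. The union
+   below mirrors this: s.waiting is only valid in CALL_WAITING and s.active
+   only in CALL_ACTIVE; prepare_activate marks the transition between them. */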
+struct call_data {
+ /* owning element */
+ grpc_call_element *elem;
+
+ call_state state;
+ grpc_metadata_buffer pending_metadata;
+ gpr_timespec deadline;
+ union {
+ struct {
+ /* our child call stack */
+ grpc_call_stack *child_stack;
+ /* ... and the channel stack associated with it */
+ grpc_channel_stack *using_stack;
+ } active;
+ struct {
+ void (*on_complete)(void *user_data, grpc_op_error error);
+ void *on_complete_user_data;
+ gpr_uint32 start_flags;
+ } waiting;
+ } s;
+};
+
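+/* Activation happens in two phases so that as little work as possible is
+   done while holding chand->mu: prepare_activate (called under the lock)
+   claims a child and builds its call stack; complete_activate (called
+   outside the lock) flushes buffered metadata and pushes the start op down
+   that stack. */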
+static int prepare_activate(call_data *calld, child_entry *on_child) {
+ grpc_call_element *child_elem;
+ grpc_channel_stack *use_stack = on_child->channel_stack;
+
+ if (calld->state == CALL_CANCELLED) return 0;
+
+ on_child->inflight_requests++;
+
+ /* no more access to calld->s.waiting allowed */
+ GPR_ASSERT(calld->state == CALL_WAITING);
+ calld->state = CALL_ACTIVE;
+
+ /* create a child stack, and record that we're using a particular channel
+ stack */
+ calld->s.active.child_stack = gpr_malloc(use_stack->call_stack_size);
+ calld->s.active.using_stack = use_stack;
+ grpc_call_stack_init(use_stack, NULL, calld->s.active.child_stack);
+ /* initialize the top level link back element */
+ child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ GPR_ASSERT(child_elem->filter == &link_back_filter);
+ ((lb_call_data *)child_elem->call_data)->back = calld->elem;
+
+ return 1;
+}
+
+static void do_nothing(void *ignored, grpc_op_error error) {}
+
+static void complete_activate(call_data *calld, child_entry *on_child,
+ grpc_call_op *op) {
+ grpc_call_element *child_elem =
+ grpc_call_stack_element(calld->s.active.child_stack, 0);
+
+ GPR_ASSERT(calld->state == CALL_ACTIVE);
+
+ /* sending buffered metadata down the stack before the start call */
+ grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem);
+
+ if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) {
+ grpc_call_op dop;
+ dop.type = GRPC_SEND_DEADLINE;
+ dop.dir = GRPC_CALL_DOWN;
+ dop.flags = 0;
+ dop.data.deadline = calld->deadline;
+ dop.done_cb = do_nothing;
+ dop.user_data = NULL;
+ child_elem->filter->call_op(child_elem, &dop);
+ }
+
+  /* continue the start call down the stack; this needs to happen after the
+     metadata has been flushed */
+ child_elem->filter->call_op(child_elem, op);
+}
+
+static void start_rpc(call_data *calld, channel_data *chand, grpc_call_op *op) {
+ gpr_mu_lock(&chand->mu);
+ if (calld->state == CALL_CANCELLED) {
+ gpr_mu_unlock(&chand->mu);
+ op->done_cb(op->user_data, GRPC_OP_ERROR);
+ return;
+ }
+ GPR_ASSERT(calld->state == CALL_CREATED);
+ calld->state = CALL_WAITING;
+ if (chand->active_child) {
+ /* channel is connected - use the connected stack */
+ if (prepare_activate(calld, chand->active_child)) {
+ gpr_mu_unlock(&chand->mu);
+ /* activate the request (pass it down) outside the lock */
+ complete_activate(calld, chand->active_child, op);
+ } else {
+ gpr_mu_unlock(&chand->mu);
+ }
+ } else {
+ /* check to see if we should initiate a connection (if we're not already),
+ but don't do so until outside the lock to avoid re-entrancy problems if
+ the callback is immediate */
+ int initiate_transport_setup = 0;
+ if (!chand->transport_setup_initiated) {
+ chand->transport_setup_initiated = 1;
+ initiate_transport_setup = 1;
+ }
+ /* add this call to the waiting set to be resumed once we have a child
+ channel stack, growing the waiting set if needed */
+ if (chand->waiting_child_count == chand->waiting_child_capacity) {
+ chand->waiting_child_capacity =
+ GPR_MAX(chand->waiting_child_capacity * 2, 8);
+ chand->waiting_children =
+ gpr_realloc(chand->waiting_children,
+ chand->waiting_child_capacity * sizeof(call_data *));
+ }
+ calld->s.waiting.on_complete = op->done_cb;
+ calld->s.waiting.on_complete_user_data = op->user_data;
+ calld->s.waiting.start_flags = op->flags;
+ chand->waiting_children[chand->waiting_child_count++] = calld;
+ gpr_mu_unlock(&chand->mu);
+
+ /* finally initiate transport setup if needed */
+ if (initiate_transport_setup) {
+ grpc_transport_setup_initiate(chand->transport_setup);
+ }
+ }
+}
+
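+/* Remove one call from the waiting set by compacting the array in place:
+   an O(n) walk, which is presumably acceptable since cancelling a call that
+   is still waiting for a transport should be rare. */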
+static void remove_waiting_child(channel_data *chand, call_data *calld) {
+ size_t new_count;
+ size_t i;
+ for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
+ if (chand->waiting_children[i] == calld) continue;
+ chand->waiting_children[new_count++] = chand->waiting_children[i];
+ }
+ GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
+ new_count == chand->waiting_child_count);
+ chand->waiting_child_count = new_count;
+}
+
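+/* Cancel a call. For active calls the cancellation is simply forwarded to
+   the child stack. For calls with no child stack yet, we synthesize the
+   response a server would have sent: a grpc-status metadata element (the
+   cached cancel_status) followed by a finish op, both travelling up. */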
+static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_call_element *child_elem;
+ grpc_call_op finish_op;
+
+ gpr_mu_lock(&chand->mu);
+ switch (calld->state) {
+ case CALL_ACTIVE:
+ child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ gpr_mu_unlock(&chand->mu);
+ child_elem->filter->call_op(child_elem, op);
+ return; /* early out */
+ case CALL_WAITING:
+ remove_waiting_child(chand, calld);
+ calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
+ GRPC_OP_ERROR);
+ /* fallthrough intended */
+ case CALL_CREATED:
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ /* send up a synthesized status */
+ finish_op.type = GRPC_RECV_METADATA;
+ finish_op.dir = GRPC_CALL_UP;
+ finish_op.flags = 0;
+ finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
+ finish_op.done_cb = do_nothing;
+ finish_op.user_data = NULL;
+ grpc_call_next_op(elem, &finish_op);
+ /* send up a finish */
+ finish_op.type = GRPC_RECV_FINISH;
+ finish_op.dir = GRPC_CALL_UP;
+ finish_op.flags = 0;
+ finish_op.done_cb = do_nothing;
+ finish_op.user_data = NULL;
+ grpc_call_next_op(elem, &finish_op);
+ return; /* early out */
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&chand->mu);
+ return; /* early out */
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
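+/* Main call op handler: buffers metadata and deadlines until GRPC_SEND_START
+   picks (or waits for) a child channel; any op type not handled explicitly
+   is routed purely by its direction. */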
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_call_element *child_elem;
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ switch (op->type) {
+ case GRPC_SEND_METADATA:
+ grpc_metadata_buffer_queue(&calld->pending_metadata, op);
+ break;
+ case GRPC_SEND_DEADLINE:
+ calld->deadline = op->data.deadline;
+ op->done_cb(op->user_data, GRPC_OP_OK);
+ break;
+ case GRPC_SEND_START:
+ /* filter out the start event to find which child to send on */
+ start_rpc(calld, chand, op);
+ break;
+ case GRPC_CANCEL_OP:
+ cancel_rpc(elem, op);
+ break;
+ default:
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ grpc_call_next_op(elem, op);
+ break;
+ case GRPC_CALL_DOWN:
+ child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ GPR_ASSERT(calld->state == CALL_ACTIVE);
+ child_elem->filter->call_op(child_elem, op);
+ break;
+ }
+ break;
+ }
+}
+
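+/* Send a channel op to every child. The child set is snapshotted under the
+   lock and each child's inflight_requests is bumped for the duration of the
+   broadcast, so that a future collector of idle children (see the TODO in
+   destroy_call_elem) could not reclaim a stack we are still touching. */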
+static void broadcast_channel_op_down(grpc_channel_element *elem,
+ grpc_channel_op *op) {
+ channel_data *chand = elem->channel_data;
+ grpc_channel_element *child_elem;
+ grpc_channel_stack **children;
+ size_t child_count;
+ size_t i;
+
+ /* copy the current set of children, and mark them all as having an inflight
+ request */
+ gpr_mu_lock(&chand->mu);
+ child_count = chand->child_count;
+ children = gpr_malloc(sizeof(grpc_channel_stack *) * child_count);
+ for (i = 0; i < child_count; i++) {
+ children[i] = chand->children[i].channel_stack;
+ chand->children[i].inflight_requests++;
+ }
+ gpr_mu_unlock(&chand->mu);
+
+ /* send the message down */
+ for (i = 0; i < child_count; i++) {
+ child_elem = grpc_channel_stack_element(children[i], 0);
+ child_elem->filter->channel_op(child_elem, op);
+ }
+
+ /* unmark the inflight requests */
+ gpr_mu_lock(&chand->mu);
+ for (i = 0; i < child_count; i++) {
+ chand->children[i].inflight_requests--;
+ }
+ gpr_mu_unlock(&chand->mu);
+
+ gpr_free(children);
+}
+
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+
+ switch (op->type) {
+ default:
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ grpc_channel_next_op(elem, op);
+ break;
+ case GRPC_CALL_DOWN:
+ broadcast_channel_op_down(elem, op);
+ break;
+ }
+ break;
+ }
+}
+
+static void error_bad_on_complete(void *arg, grpc_op_error error) {
+ gpr_log(GPR_ERROR,
+ "Waiting finished but not started? Bad on_complete callback");
+ abort();
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {
+ call_data *calld = elem->call_data;
+
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GPR_ASSERT(server_transport_data == NULL);
+ calld->elem = elem;
+ calld->state = CALL_CREATED;
+ calld->deadline = gpr_inf_future;
+ calld->s.waiting.on_complete = error_bad_on_complete;
+ calld->s.waiting.on_complete_user_data = NULL;
+ grpc_metadata_buffer_init(&calld->pending_metadata);
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ size_t i;
+
+ /* if the metadata buffer is not flushed, destroy it here. */
+ grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK);
+ /* if the call got activated, we need to destroy the child stack also, and
+ remove it from the in-flight requests tracked by the child_entry we
+ picked */
+ if (calld->state == CALL_ACTIVE) {
+ grpc_call_stack_destroy(calld->s.active.child_stack);
+ gpr_free(calld->s.active.child_stack);
+
+ gpr_mu_lock(&chand->mu);
+ for (i = 0; i < chand->child_count; i++) {
+ if (chand->children[i].channel_stack == calld->s.active.using_stack) {
+ chand->children[i].inflight_requests--;
+ /* TODO(ctiller): garbage collect channels that are not active
+ and have no inflight requests */
+ }
+ }
+ gpr_mu_unlock(&chand->mu);
+ }
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last) {
+ channel_data *chand = elem->channel_data;
+ char temp[16];
+
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(is_last);
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+
+ gpr_mu_init(&chand->mu);
+ chand->active_child = NULL;
+ chand->children = NULL;
+ chand->child_count = 0;
+ chand->child_capacity = 0;
+ chand->waiting_children = NULL;
+ chand->waiting_child_count = 0;
+ chand->waiting_child_capacity = 0;
+ chand->transport_setup = NULL;
+ chand->transport_setup_initiated = 0;
+ chand->args = grpc_channel_args_copy(args);
+
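+  /* pre-build the metadata element sent up for cancelled calls, so that
+     cancel_rpc does not need access to the metadata context */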
+ sprintf(temp, "%d", GRPC_STATUS_CANCELLED);
+ chand->cancel_status =
+ grpc_mdelem_from_strings(metadata_context, "grpc-status", temp);
+}
+
+/* Destructor for channel_data */
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ size_t i;
+
+ grpc_transport_setup_cancel(chand->transport_setup);
+
+ for (i = 0; i < chand->child_count; i++) {
+ GPR_ASSERT(chand->children[i].inflight_requests == 0);
+ grpc_channel_stack_destroy(chand->children[i].channel_stack);
+ gpr_free(chand->children[i].channel_stack);
+ }
+
+ grpc_channel_args_destroy(chand->args);
+ grpc_mdelem_unref(chand->cancel_status);
+
+ gpr_mu_destroy(&chand->mu);
+ GPR_ASSERT(chand->waiting_child_count == 0);
+ gpr_free(chand->waiting_children);
+ gpr_free(chand->children);
+}
+
+const grpc_channel_filter grpc_client_channel_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "clientchannel",
+};
+
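+/* Called when transport setup produces a transport. Builds a child channel
+   stack of the form [link_back_filter, <supplied filters...>,
+   connected_channel_filter], binds the transport to it, makes it the active
+   child, and then adopts every call parked in waiting_children. */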
+grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
+ grpc_channel_stack *channel_stack, grpc_transport *transport,
+ grpc_channel_filter const **channel_filters, size_t num_channel_filters,
+ grpc_mdctx *mdctx) {
+  /* we just got a new transport: let's create a child channel stack for it */
+ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+ channel_data *chand = elem->channel_data;
+ grpc_channel_element *lb_elem;
+ grpc_channel_stack *child_stack;
+ size_t num_child_filters = 2 + num_channel_filters;
+ grpc_channel_filter const **child_filters;
+ grpc_transport_setup_result result;
+ child_entry *child_ent;
+ call_data **waiting_children;
+ size_t waiting_child_count;
+ size_t i;
+ grpc_call_op *call_ops;
+
+ /* build the child filter stack */
+ child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
+ /* we always need a link back filter to get back to the connected channel */
+ child_filters[0] = &link_back_filter;
+ for (i = 0; i < num_channel_filters; i++) {
+ child_filters[i + 1] = channel_filters[i];
+ }
+ /* and we always need a connected channel to talk to the transport */
+ child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;
+
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+
+ /* BEGIN LOCKING CHANNEL */
+ gpr_mu_lock(&chand->mu);
+ chand->transport_setup_initiated = 0;
+
+ if (chand->child_count == chand->child_capacity) {
+ /* realloc will invalidate chand->active_child, but it's reset in the next
+ stanza anyway */
+ chand->child_capacity =
+ GPR_MAX(2 * chand->child_capacity, chand->child_capacity + 2);
+ chand->children = gpr_realloc(chand->children,
+ sizeof(child_entry) * chand->child_capacity);
+ }
+
+ /* build up the child stack */
+ child_stack =
+ gpr_malloc(grpc_channel_stack_size(child_filters, num_child_filters));
+ grpc_channel_stack_init(child_filters, num_child_filters, chand->args, mdctx,
+ child_stack);
+ lb_elem = grpc_channel_stack_element(child_stack, 0);
+ GPR_ASSERT(lb_elem->filter == &link_back_filter);
+ ((lb_channel_data *)lb_elem->channel_data)->back = elem;
+ result = grpc_connected_channel_bind_transport(child_stack, transport);
+ child_ent = &chand->children[chand->child_count++];
+ child_ent->channel_stack = child_stack;
+ child_ent->inflight_requests = 0;
+ chand->active_child = child_ent;
+
+ /* capture the waiting children - we'll activate them outside the lock
+ to avoid re-entrancy problems */
+ waiting_children = chand->waiting_children;
+ waiting_child_count = chand->waiting_child_count;
+  /* inflight_requests is bumped by prepare_activate below while we still
+     hold the lock, which avoids having to take the lock once per rpc later */
+
+ chand->waiting_children = NULL;
+ chand->waiting_child_count = 0;
+ chand->waiting_child_capacity = 0;
+
+ call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+
+ for (i = 0; i < waiting_child_count; i++) {
+ call_ops[i].type = GRPC_SEND_START;
+ call_ops[i].dir = GRPC_CALL_DOWN;
+ call_ops[i].flags = waiting_children[i]->s.waiting.start_flags;
+ call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
+ call_ops[i].user_data =
+ waiting_children[i]->s.waiting.on_complete_user_data;
+ if (!prepare_activate(waiting_children[i], child_ent)) {
+ waiting_children[i] = NULL;
+ call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+ }
+ }
+
+ /* END LOCKING CHANNEL */
+ gpr_mu_unlock(&chand->mu);
+
+  /* activate any pending operations - this is safe to do as we guarantee one
+     and only one write operation per request at the surface api - if we lose
+     that guarantee we will need more intricate locking here */
+ for (i = 0; i < waiting_child_count; i++) {
+ if (waiting_children[i]) {
+ complete_activate(waiting_children[i], child_ent, &call_ops[i]);
+ }
+ }
+ gpr_free(waiting_children);
+ gpr_free(call_ops);
+ gpr_free(child_filters);
+
+ return result;
+}
+
+void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
+ grpc_transport_setup *setup) {
+ /* post construction initialization: set the transport setup pointer */
+ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(!chand->transport_setup);
+ chand->transport_setup = setup;
+}
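
The two functions above are this filter's public surface: one injects the
transport setup object after the stack is built, the other is invoked once
that setup yields a transport. A minimal usage sketch follows; the transport
setup implementation (my_setup_create) and the callback wiring are
assumptions, and only the two grpc_client_channel_* calls and their
signatures come from this file:

  grpc_channel_stack *stack;    /* a stack whose last filter is
                                   grpc_client_channel_filter */
  grpc_transport_setup *setup;  /* from a hypothetical my_setup_create() */

  /* post-construction: hand the client channel its transport setup */
  grpc_client_channel_set_transport_setup(stack, setup);

  /* later, from the setup's machinery, once a transport is connected: */
  grpc_client_channel_transport_setup_complete(
      stack, transport, extra_filters, num_extra_filters, mdctx);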