aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/child_channel.c269
-rw-r--r--src/core/channel/child_channel.h63
-rw-r--r--src/core/channel/client_channel.c259
-rw-r--r--src/core/transport/chttp2_transport.c1
4 files changed, 395 insertions, 197 deletions
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
new file mode 100644
index 0000000000..e1b890674f
--- /dev/null
+++ b/src/core/channel/child_channel.c
@@ -0,0 +1,269 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/child_channel.h"
+#include "src/core/iomgr/iomgr.h"
+#include <grpc/support/alloc.h>
+
+/* Link back filter: passes up calls to the client channel, pushes down calls
+ down */
+
+static void unref_channel(grpc_child_channel *channel);
+
+typedef struct {
+ gpr_mu mu;
+ gpr_cv cv;
+ grpc_channel_element *back;
+ gpr_refcount refs;
+ int calling_back;
+ int sent_goaway;
+} lb_channel_data;
+
+typedef struct {
+ grpc_call_element *back;
+ gpr_refcount refs;
+ grpc_child_channel *channel;
+} lb_call_data;
+
+static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
+ grpc_call_op *op) {
+ lb_call_data *calld = elem->call_data;
+
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ calld->back->filter->call_op(calld->back, elem, op);
+ break;
+ case GRPC_CALL_DOWN:
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+static void delayed_unref(void *elem, grpc_iomgr_cb_status status) {
+ unref_channel(grpc_channel_stack_from_top_element(elem));
+}
+
+/* Currently we assume all channel operations should just be pushed up. */
+static void lb_channel_op(grpc_channel_element *elem,
+ grpc_channel_element *from_elem,
+ grpc_channel_op *op) {
+ lb_channel_data *chand = elem->channel_data;
+ grpc_channel_element *back;
+
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ gpr_mu_lock(&chand->mu);
+ back = chand->back;
+ if (back) chand->calling_back++;
+ gpr_mu_unlock(&chand->mu);
+ if (back) {
+ back->filter->channel_op(chand->back, elem, op);
+ gpr_mu_lock(&chand->mu);
+ chand->calling_back--;
+ gpr_cv_broadcast(&chand->cv);
+ gpr_mu_unlock(&chand->mu);
+ }
+ break;
+ case GRPC_CALL_DOWN:
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+
+ switch (op->type) {
+ case GRPC_TRANSPORT_CLOSED:
+ grpc_iomgr_add_callback(delayed_unref, elem);
+ break;
+ case GRPC_CHANNEL_GOAWAY:
+ gpr_mu_lock(&chand->mu);
+ chand->sent_goaway = 1;
+ gpr_mu_unlock(&chand->mu);
+ break;
+ case GRPC_CHANNEL_DISCONNECT:
+ case GRPC_TRANSPORT_GOAWAY:
+ case GRPC_ACCEPT_CALL:
+ break;
+ }
+}
+
+/* Constructor for call_data */
+static void lb_init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {}
+
+/* Destructor for call_data */
+static void lb_destroy_call_elem(grpc_call_element *elem) {}
+
+/* Constructor for channel_data */
+static void lb_init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last) {
+ lb_channel_data *chand = elem->channel_data;
+ GPR_ASSERT(is_first);
+ GPR_ASSERT(!is_last);
+ gpr_mu_init(&chand->mu);
+ gpr_cv_init(&chand->cv);
+ /* one ref for getting grpc_child_channel_destroy called, one for getting
+ disconnected */
+ gpr_ref_init(&chand->refs, 2);
+ chand->back = NULL;
+ chand->sent_goaway = 0;
+ chand->calling_back = 0;
+}
+
+/* Destructor for channel_data */
+static void lb_destroy_channel_elem(grpc_channel_element *elem) {
+ lb_channel_data *chand = elem->channel_data;
+ gpr_mu_destroy(&chand->mu);
+ gpr_cv_destroy(&chand->cv);
+}
+
+const grpc_channel_filter grpc_child_channel_top_filter = {
+ lb_call_op, lb_channel_op,
+
+ sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
+
+ sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
+
+ "child-channel",
+};
+
+/* grpc_child_channel proper */
+
+#define LINK_BACK_ELEM_FROM_CHANNEL(channel) \
+ grpc_channel_stack_element((channel), 0)
+
+#define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0)
+
+static void unref_channel(grpc_child_channel *channel) {
+ lb_channel_data *lb = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
+ if (gpr_unref(&lb->refs)) {
+ grpc_channel_stack_destroy(channel);
+ gpr_free(channel);
+ }
+}
+
+static void ref_channel(grpc_child_channel *channel) {
+ lb_channel_data *lb = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
+ gpr_ref(&lb->refs);
+}
+
+static void unref_call(grpc_child_call *call) {
+ lb_call_data *lb = LINK_BACK_ELEM_FROM_CALL(call)->call_data;
+ if (gpr_unref(&lb->refs)) {
+ grpc_child_channel *channel = lb->channel;
+ grpc_call_stack_destroy(call);
+ gpr_free(call);
+ unref_channel(channel);
+ }
+}
+
+#if 0
+static void ref_call(grpc_child_call *call) {
+ lb_call_data *lb = LINK_BACK_ELEM_FROM_CALL(call)->call_data;
+ gpr_ref(&lb->refs);
+}
+#endif
+
+grpc_child_channel *grpc_child_channel_create(
+ grpc_channel_element *parent, const grpc_channel_filter **filters,
+ size_t filter_count, const grpc_channel_args *args,
+ grpc_mdctx *metadata_context) {
+ grpc_channel_stack *stk =
+ gpr_malloc(grpc_channel_stack_size(filters, filter_count));
+ lb_channel_data *lb;
+
+ grpc_channel_stack_init(filters, filter_count, args, metadata_context, stk);
+
+ lb = LINK_BACK_ELEM_FROM_CHANNEL(stk)->channel_data;
+ gpr_mu_lock(&lb->mu);
+ lb->back = parent;
+ gpr_mu_unlock(&lb->mu);
+
+ return (grpc_child_channel *)stk;
+}
+
+void grpc_child_channel_destroy(grpc_child_channel *channel) {
+ grpc_channel_op op;
+ int send_goaway = 0;
+ grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
+ lb_channel_data *chand = lbelem->channel_data;
+
+ gpr_mu_lock(&chand->mu);
+ while (chand->calling_back) {
+ gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future);
+ }
+ send_goaway = !chand->sent_goaway;
+ chand->sent_goaway = 1;
+ chand->back = NULL;
+ gpr_mu_unlock(&chand->mu);
+
+ if (send_goaway) {
+ op.type = GRPC_CHANNEL_GOAWAY;
+ op.dir = GRPC_CALL_DOWN;
+ op.data.goaway.status = GRPC_STATUS_OK;
+ op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect");
+ grpc_channel_next_op(lbelem, &op);
+ }
+
+ op.type = GRPC_CHANNEL_DISCONNECT;
+ op.dir = GRPC_CALL_DOWN;
+ grpc_channel_next_op(lbelem, &op);
+
+ unref_channel(channel);
+}
+
+void grpc_child_channel_handle_op(grpc_child_channel *channel,
+ grpc_channel_op *op) {
+ grpc_channel_next_op(LINK_BACK_ELEM_FROM_CHANNEL(channel), op);
+}
+
+grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
+ grpc_call_element *parent) {
+ grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
+ lb_call_data *lbcalld;
+ ref_channel(channel);
+
+ grpc_call_stack_init(channel, NULL, stk);
+ lbcalld = LINK_BACK_ELEM_FROM_CALL(stk)->call_data;
+ gpr_ref_init(&lbcalld->refs, 1);
+ lbcalld->back = parent;
+ lbcalld->channel = channel;
+
+ return (grpc_child_call *)stk;
+}
+
+void grpc_child_call_destroy(grpc_child_call *call) { unref_call(call); }
+
+grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call) {
+ return LINK_BACK_ELEM_FROM_CALL(call);
+}
diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h
new file mode 100644
index 0000000000..9fb2a17e29
--- /dev/null
+++ b/src/core/channel/child_channel.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_CHILD_CHANNEL_H_
+#define __GRPC_INTERNAL_CHANNEL_CHILD_CHANNEL_H_
+
+#include "src/core/channel/channel_stack.h"
+
+/* helper for filters that need to host child channel stacks... handles
+ lifetime and upwards propagation cleanly */
+
+const grpc_channel_filter grpc_child_channel_top_filter;
+
+typedef grpc_channel_stack grpc_child_channel;
+typedef grpc_call_stack grpc_child_call;
+
+/* filters[0] must be &grpc_child_channel_top_filter */
+grpc_child_channel *grpc_child_channel_create(
+ grpc_channel_element *parent, const grpc_channel_filter **filters,
+ size_t filter_count, const grpc_channel_args *args,
+ grpc_mdctx *metadata_context);
+void grpc_child_channel_handle_op(grpc_child_channel *channel,
+ grpc_channel_op *op);
+grpc_channel_element *grpc_child_channel_get_bottom_element(
+ grpc_child_channel *channel);
+void grpc_child_channel_destroy(grpc_child_channel *channel);
+
+grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
+ grpc_call_element *parent);
+grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call);
+void grpc_child_call_destroy(grpc_child_call *call);
+
+#endif /* __GRPC_INTERNAL_CHANNEL_CHILD_CHANNEL_H_ */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9ec19df8a9..4bd294dc96 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -36,6 +36,7 @@
#include <stdio.h>
#include "src/core/channel/channel_args.h"
+#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/metadata_buffer.h"
#include <grpc/support/alloc.h>
@@ -44,79 +45,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
-/* Link back filter: passes up calls to the client channel, pushes down calls
- down */
-
-typedef struct { grpc_channel_element *back; } lb_channel_data;
-
-typedef struct { grpc_call_element *back; } lb_call_data;
-
-static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- lb_call_data *calld = elem->call_data;
-
- switch (op->dir) {
- case GRPC_CALL_UP:
- calld->back->filter->call_op(calld->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_call_next_op(elem, op);
- break;
- }
-}
-
-/* Currently we assume all channel operations should just be pushed up. */
-static void lb_channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem,
- grpc_channel_op *op) {
- lb_channel_data *chand = elem->channel_data;
-
- switch (op->dir) {
- case GRPC_CALL_UP:
- chand->back->filter->channel_op(chand->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
-/* Constructor for call_data */
-static void lb_init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
-
-/* Destructor for call_data */
-static void lb_destroy_call_elem(grpc_call_element *elem) {}
-
-/* Constructor for channel_data */
-static void lb_init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last) {
- GPR_ASSERT(is_first);
- GPR_ASSERT(!is_last);
-}
-
-/* Destructor for channel_data */
-static void lb_destroy_channel_elem(grpc_channel_element *elem) {}
-
-static const grpc_channel_filter link_back_filter = {
- lb_call_op, lb_channel_op,
-
- sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
-
- sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
-
- "clientchannel.linkback",
-};
-
/* Client channel implementation */
-typedef struct {
- size_t inflight_requests;
- grpc_channel_stack *channel_stack;
-} child_entry;
-
typedef struct call_data call_data;
typedef struct {
@@ -126,12 +56,8 @@ typedef struct {
transport_setup is assumed to be set once during construction */
gpr_mu mu;
- /* the sending child (points somewhere in children, or NULL) */
- child_entry *active_child;
- /* vector of child channels */
- child_entry *children;
- size_t child_count;
- size_t child_capacity;
+ /* the sending child (may be null) */
+ grpc_child_channel *active_child;
/* calls waiting for a channel to be ready */
call_data **waiting_children;
@@ -165,9 +91,7 @@ struct call_data {
union {
struct {
/* our child call stack */
- grpc_call_stack *child_stack;
- /* ... and the channel stack associated with it */
- grpc_channel_stack *using_stack;
+ grpc_child_call *child_call;
} active;
struct {
void (*on_complete)(void *user_data, grpc_op_error error);
@@ -177,38 +101,27 @@ struct call_data {
} s;
};
-static int prepare_activate(call_data *calld, child_entry *on_child) {
- grpc_call_element *child_elem;
- grpc_channel_stack *use_stack = on_child->channel_stack;
-
+static int prepare_activate(grpc_call_element *elem,
+ grpc_child_channel *on_child) {
+ call_data *calld = elem->call_data;
if (calld->state == CALL_CANCELLED) return 0;
- on_child->inflight_requests++;
-
/* no more access to calld->s.waiting allowed */
GPR_ASSERT(calld->state == CALL_WAITING);
calld->state = CALL_ACTIVE;
- /* create a child stack, and record that we're using a particular channel
- stack */
- calld->s.active.child_stack = gpr_malloc(use_stack->call_stack_size);
- calld->s.active.using_stack = use_stack;
- grpc_call_stack_init(use_stack, NULL, calld->s.active.child_stack);
- /* initialize the top level link back element */
- child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
- GPR_ASSERT(child_elem->filter == &link_back_filter);
- ((lb_call_data *)child_elem->call_data)->back = calld->elem;
+ /* create a child call */
+ calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
return 1;
}
static void do_nothing(void *ignored, grpc_op_error error) {}
-static void complete_activate(grpc_call_element *elem, child_entry *on_child,
- grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
- grpc_call_stack_element(calld->s.active.child_stack, 0);
+ grpc_child_call_get_top_element(calld->s.active.child_call);
GPR_ASSERT(calld->state == CALL_ACTIVE);
@@ -244,10 +157,10 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
calld->state = CALL_WAITING;
if (chand->active_child) {
/* channel is connected - use the connected stack */
- if (prepare_activate(calld, chand->active_child)) {
+ if (prepare_activate(elem, chand->active_child)) {
gpr_mu_unlock(&chand->mu);
/* activate the request (pass it down) outside the lock */
- complete_activate(elem, chand->active_child, op);
+ complete_activate(elem, op);
} else {
gpr_mu_unlock(&chand->mu);
}
@@ -303,7 +216,7 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
gpr_mu_lock(&chand->mu);
switch (calld->state) {
case CALL_ACTIVE:
- child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
child_elem->filter->call_op(child_elem, elem, op);
return; /* early out */
@@ -367,7 +280,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_next_op(elem, op);
break;
case GRPC_CALL_DOWN:
- child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ child_elem =
+ grpc_child_call_get_top_element(calld->s.active.child_call);
GPR_ASSERT(calld->state == CALL_ACTIVE);
child_elem->filter->call_op(child_elem, elem, op);
break;
@@ -376,59 +290,42 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
}
}
-static void broadcast_channel_op_down(grpc_channel_element *elem,
- grpc_channel_op *op) {
- channel_data *chand = elem->channel_data;
- grpc_channel_element *child_elem;
- grpc_channel_stack **children;
- size_t child_count;
- size_t i;
-
- /* copy the current set of children, and mark them all as having an inflight
- request */
- gpr_mu_lock(&chand->mu);
- child_count = chand->child_count;
- children = gpr_malloc(sizeof(grpc_channel_stack *) * child_count);
- for (i = 0; i < child_count; i++) {
- children[i] = chand->children[i].channel_stack;
- chand->children[i].inflight_requests++;
- }
- gpr_mu_unlock(&chand->mu);
-
- /* send the message down */
- for (i = 0; i < child_count; i++) {
- child_elem = grpc_channel_stack_element(children[i], 0);
- if (op->type == GRPC_CHANNEL_GOAWAY) {
- gpr_slice_ref(op->data.goaway.message);
- }
- child_elem->filter->channel_op(child_elem, elem, op);
- }
- if (op->type == GRPC_CHANNEL_GOAWAY) {
- gpr_slice_unref(op->data.goaway.message);
- }
-
- /* unmark the inflight requests */
- gpr_mu_lock(&chand->mu);
- for (i = 0; i < child_count; i++) {
- chand->children[i].inflight_requests--;
- }
- gpr_mu_unlock(&chand->mu);
-
- gpr_free(children);
-}
-
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
+ channel_data *chand = elem->channel_data;
+ grpc_child_channel *child_channel;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
switch (op->type) {
+ case GRPC_CHANNEL_GOAWAY:
+ gpr_mu_lock(&chand->mu);
+ child_channel = chand->active_child;
+ chand->active_child = NULL;
+ gpr_mu_unlock(&chand->mu);
+ if (child_channel) {
+ grpc_child_channel_handle_op(child_channel, op);
+ grpc_child_channel_destroy(child_channel);
+ } else {
+ gpr_slice_unref(op->data.goaway.message);
+ }
+ break;
+ case GRPC_CHANNEL_DISCONNECT:
+ gpr_mu_lock(&chand->mu);
+ child_channel = chand->active_child;
+ chand->active_child = NULL;
+ gpr_mu_unlock(&chand->mu);
+ if (child_channel) {
+ grpc_child_channel_destroy(child_channel);
+ }
+ break;
default:
switch (op->dir) {
case GRPC_CALL_UP:
grpc_channel_next_op(elem, op);
break;
case GRPC_CALL_DOWN:
- broadcast_channel_op_down(elem, op);
+ gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
+ abort();
break;
}
break;
@@ -459,8 +356,6 @@ static void init_call_elem(grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- size_t i;
/* if the metadata buffer is not flushed, destroy it here. */
grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK);
@@ -468,18 +363,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
remove it from the in-flight requests tracked by the child_entry we
picked */
if (calld->state == CALL_ACTIVE) {
- grpc_call_stack_destroy(calld->s.active.child_stack);
- gpr_free(calld->s.active.child_stack);
-
- gpr_mu_lock(&chand->mu);
- for (i = 0; i < chand->child_count; i++) {
- if (chand->children[i].channel_stack == calld->s.active.using_stack) {
- chand->children[i].inflight_requests--;
- /* TODO(ctiller): garbage collect channels that are not active
- and have no inflight requests */
- }
- }
- gpr_mu_unlock(&chand->mu);
+ grpc_child_call_destroy(calld->s.active.child_call);
}
}
@@ -497,9 +381,6 @@ static void init_channel_elem(grpc_channel_element *elem,
gpr_mu_init(&chand->mu);
chand->active_child = NULL;
- chand->children = NULL;
- chand->child_count = 0;
- chand->child_capacity = 0;
chand->waiting_children = NULL;
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
@@ -515,14 +396,12 @@ static void init_channel_elem(grpc_channel_element *elem,
/* Destructor for channel_data */
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- size_t i;
grpc_transport_setup_cancel(chand->transport_setup);
- for (i = 0; i < chand->child_count; i++) {
- GPR_ASSERT(chand->children[i].inflight_requests == 0);
- grpc_channel_stack_destroy(chand->children[i].channel_stack);
- gpr_free(chand->children[i].channel_stack);
+ if (chand->active_child) {
+ grpc_child_channel_destroy(chand->active_child);
+ chand->active_child = NULL;
}
grpc_channel_args_destroy(chand->args);
@@ -531,17 +410,16 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
gpr_mu_destroy(&chand->mu);
GPR_ASSERT(chand->waiting_child_count == 0);
gpr_free(chand->waiting_children);
- gpr_free(chand->children);
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op,
+ call_op, channel_op,
- sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "clientchannel",
+ "client-channel",
};
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
@@ -551,12 +429,10 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
/* we just got a new transport: lets create a child channel stack for it */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
- grpc_channel_element *lb_elem;
- grpc_channel_stack *child_stack;
size_t num_child_filters = 2 + num_channel_filters;
grpc_channel_filter const **child_filters;
grpc_transport_setup_result result;
- child_entry *child_ent;
+ grpc_child_channel *old_active = NULL;
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
@@ -565,7 +441,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
/* we always need a link back filter to get back to the connected channel */
- child_filters[0] = &link_back_filter;
+ child_filters[0] = &grpc_child_channel_top_filter;
for (i = 0; i < num_channel_filters; i++) {
child_filters[i + 1] = channel_filters[i];
}
@@ -578,28 +454,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
gpr_mu_lock(&chand->mu);
chand->transport_setup_initiated = 0;
- if (chand->child_count == chand->child_capacity) {
- /* realloc will invalidate chand->active_child, but it's reset in the next
- stanza anyway */
- chand->child_capacity =
- GPR_MAX(2 * chand->child_capacity, chand->child_capacity + 2);
- chand->children = gpr_realloc(chand->children,
- sizeof(child_entry) * chand->child_capacity);
+ if (chand->active_child) {
+ old_active = chand->active_child;
}
-
- /* build up the child stack */
- child_stack =
- gpr_malloc(grpc_channel_stack_size(child_filters, num_child_filters));
- grpc_channel_stack_init(child_filters, num_child_filters, chand->args, mdctx,
- child_stack);
- lb_elem = grpc_channel_stack_element(child_stack, 0);
- GPR_ASSERT(lb_elem->filter == &link_back_filter);
- ((lb_channel_data *)lb_elem->channel_data)->back = elem;
- result = grpc_connected_channel_bind_transport(child_stack, transport);
- child_ent = &chand->children[chand->child_count++];
- child_ent->channel_stack = child_stack;
- child_ent->inflight_requests = 0;
- chand->active_child = child_ent;
+ chand->active_child = grpc_child_channel_create(
+ elem, child_filters, num_child_filters, chand->args, mdctx);
+ result =
+ grpc_connected_channel_bind_transport(chand->active_child, transport);
/* capture the waiting children - we'll activate them outside the lock
to avoid re-entrancy problems */
@@ -620,7 +481,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
call_ops[i].user_data =
waiting_children[i]->s.waiting.on_complete_user_data;
- if (!prepare_activate(waiting_children[i], child_ent)) {
+ if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
}
@@ -634,13 +495,17 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
that guarantee we need to do some curly locking here */
for (i = 0; i < waiting_child_count; i++) {
if (waiting_children[i]) {
- complete_activate(waiting_children[i]->elem, child_ent, &call_ops[i]);
+ complete_activate(waiting_children[i]->elem, &call_ops[i]);
}
}
gpr_free(waiting_children);
gpr_free(call_ops);
gpr_free(child_filters);
+ if (old_active) {
+ grpc_child_channel_destroy(old_active);
+ }
+
return result;
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index c70c660fbe..0570439813 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -682,6 +682,7 @@ static void unlock(transport *t) {
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
t->num_pending_goaways = 0;
+ t->cap_pending_goaways = 0;
t->calling_back = 1;
}
}