aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2014-12-12 08:43:28 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2014-12-12 16:17:37 -0800
commit82e275ffaf7210b4e0721aec70ba8c4807440da3 (patch)
tree84502e5c856fa8d044a1688ed9b868715ca7aed2
parente99270d63a230bf345f4323e1fd6f11c2fb842dc (diff)
Split client_channel into client_channel & child_channel.
Safely managing disconnection and goaways needs reference counting at the child channel level, which was near impossible to provide with the previous embedding of the child channel in the client channel. This is a (hopefully) no-op refactoring to provide that split. The next CL in this series will actually get disconnection and goaway somewhat right. Change on 2014/12/12 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=81985590
-rw-r--r--Makefile2
-rw-r--r--build.json2
-rw-r--r--src/core/channel/child_channel.c269
-rw-r--r--src/core/channel/child_channel.h63
-rw-r--r--src/core/channel/client_channel.c259
-rw-r--r--src/core/transport/chttp2_transport.c1
-rw-r--r--vsprojects/vs2013/grpc.vcxproj3
7 files changed, 402 insertions, 197 deletions
diff --git a/Makefile b/Makefile
index 589023dd01..03de0aaae2 100644
--- a/Makefile
+++ b/Makefile
@@ -636,6 +636,7 @@ LIBGRPC_SRC = \
src/core/channel/census_filter.c \
src/core/channel/channel_args.c \
src/core/channel/channel_stack.c \
+ src/core/channel/child_channel.c \
src/core/channel/client_channel.c \
src/core/channel/client_setup.c \
src/core/channel/connected_channel.c \
@@ -1559,6 +1560,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/channel/census_filter.c \
src/core/channel/channel_args.c \
src/core/channel/channel_stack.c \
+ src/core/channel/child_channel.c \
src/core/channel/client_channel.c \
src/core/channel/client_setup.c \
src/core/channel/connected_channel.c \
diff --git a/build.json b/build.json
index 0e3152f9b3..e6158262ff 100644
--- a/build.json
+++ b/build.json
@@ -99,6 +99,7 @@
"src/core/channel/census_filter.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
+ "src/core/channel/child_channel.c",
"src/core/channel/client_channel.c",
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
@@ -189,6 +190,7 @@
"src/core/channel/census_filter.h",
"src/core/channel/channel_args.h",
"src/core/channel/channel_stack.h",
+ "src/core/channel/child_channel.h",
"src/core/channel/client_channel.h",
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
new file mode 100644
index 0000000000..e1b890674f
--- /dev/null
+++ b/src/core/channel/child_channel.c
@@ -0,0 +1,269 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/child_channel.h"
+#include "src/core/iomgr/iomgr.h"
+#include <grpc/support/alloc.h>
+
+/* Link back filter: passes up calls to the client channel, pushes down calls
+ down */
+
+static void unref_channel(grpc_child_channel *channel);
+
+typedef struct {
+ gpr_mu mu;
+ gpr_cv cv;
+ grpc_channel_element *back;
+ gpr_refcount refs;
+ int calling_back;
+ int sent_goaway;
+} lb_channel_data;
+
+typedef struct {
+ grpc_call_element *back;
+ gpr_refcount refs;
+ grpc_child_channel *channel;
+} lb_call_data;
+
+static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
+ grpc_call_op *op) {
+ lb_call_data *calld = elem->call_data;
+
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ calld->back->filter->call_op(calld->back, elem, op);
+ break;
+ case GRPC_CALL_DOWN:
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+static void delayed_unref(void *elem, grpc_iomgr_cb_status status) {
+ unref_channel(grpc_channel_stack_from_top_element(elem));
+}
+
+/* Currently we assume all channel operations should just be pushed up. */
+static void lb_channel_op(grpc_channel_element *elem,
+ grpc_channel_element *from_elem,
+ grpc_channel_op *op) {
+ lb_channel_data *chand = elem->channel_data;
+ grpc_channel_element *back;
+
+ switch (op->dir) {
+ case GRPC_CALL_UP:
+ gpr_mu_lock(&chand->mu);
+ back = chand->back;
+ if (back) chand->calling_back++;
+ gpr_mu_unlock(&chand->mu);
+ if (back) {
+ back->filter->channel_op(chand->back, elem, op);
+ gpr_mu_lock(&chand->mu);
+ chand->calling_back--;
+ gpr_cv_broadcast(&chand->cv);
+ gpr_mu_unlock(&chand->mu);
+ }
+ break;
+ case GRPC_CALL_DOWN:
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+
+ switch (op->type) {
+ case GRPC_TRANSPORT_CLOSED:
+ grpc_iomgr_add_callback(delayed_unref, elem);
+ break;
+ case GRPC_CHANNEL_GOAWAY:
+ gpr_mu_lock(&chand->mu);
+ chand->sent_goaway = 1;
+ gpr_mu_unlock(&chand->mu);
+ break;
+ case GRPC_CHANNEL_DISCONNECT:
+ case GRPC_TRANSPORT_GOAWAY:
+ case GRPC_ACCEPT_CALL:
+ break;
+ }
+}
+
+/* Constructor for call_data */
+static void lb_init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {}
+
+/* Destructor for call_data */
+static void lb_destroy_call_elem(grpc_call_element *elem) {}
+
+/* Constructor for channel_data */
+static void lb_init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last) {
+ lb_channel_data *chand = elem->channel_data;
+ GPR_ASSERT(is_first);
+ GPR_ASSERT(!is_last);
+ gpr_mu_init(&chand->mu);
+ gpr_cv_init(&chand->cv);
+ /* one ref for getting grpc_child_channel_destroy called, one for getting
+ disconnected */
+ gpr_ref_init(&chand->refs, 2);
+ chand->back = NULL;
+ chand->sent_goaway = 0;
+ chand->calling_back = 0;
+}
+
+/* Destructor for channel_data */
+static void lb_destroy_channel_elem(grpc_channel_element *elem) {
+ lb_channel_data *chand = elem->channel_data;
+ gpr_mu_destroy(&chand->mu);
+ gpr_cv_destroy(&chand->cv);
+}
+
+const grpc_channel_filter grpc_child_channel_top_filter = {
+ lb_call_op, lb_channel_op,
+
+ sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
+
+ sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
+
+ "child-channel",
+};
+
+/* grpc_child_channel proper */
+
+#define LINK_BACK_ELEM_FROM_CHANNEL(channel) \
+ grpc_channel_stack_element((channel), 0)
+
+#define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0)
+
+static void unref_channel(grpc_child_channel *channel) {
+ lb_channel_data *lb = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
+ if (gpr_unref(&lb->refs)) {
+ grpc_channel_stack_destroy(channel);
+ gpr_free(channel);
+ }
+}
+
+static void ref_channel(grpc_child_channel *channel) {
+ lb_channel_data *lb = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
+ gpr_ref(&lb->refs);
+}
+
+static void unref_call(grpc_child_call *call) {
+ lb_call_data *lb = LINK_BACK_ELEM_FROM_CALL(call)->call_data;
+ if (gpr_unref(&lb->refs)) {
+ grpc_child_channel *channel = lb->channel;
+ grpc_call_stack_destroy(call);
+ gpr_free(call);
+ unref_channel(channel);
+ }
+}
+
+#if 0
+static void ref_call(grpc_child_call *call) {
+ lb_call_data *lb = LINK_BACK_ELEM_FROM_CALL(call)->call_data;
+ gpr_ref(&lb->refs);
+}
+#endif
+
+grpc_child_channel *grpc_child_channel_create(
+ grpc_channel_element *parent, const grpc_channel_filter **filters,
+ size_t filter_count, const grpc_channel_args *args,
+ grpc_mdctx *metadata_context) {
+ grpc_channel_stack *stk =
+ gpr_malloc(grpc_channel_stack_size(filters, filter_count));
+ lb_channel_data *lb;
+
+ grpc_channel_stack_init(filters, filter_count, args, metadata_context, stk);
+
+ lb = LINK_BACK_ELEM_FROM_CHANNEL(stk)->channel_data;
+ gpr_mu_lock(&lb->mu);
+ lb->back = parent;
+ gpr_mu_unlock(&lb->mu);
+
+ return (grpc_child_channel *)stk;
+}
+
+void grpc_child_channel_destroy(grpc_child_channel *channel) {
+ grpc_channel_op op;
+ int send_goaway = 0;
+ grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
+ lb_channel_data *chand = lbelem->channel_data;
+
+ gpr_mu_lock(&chand->mu);
+ while (chand->calling_back) {
+ gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future);
+ }
+ send_goaway = !chand->sent_goaway;
+ chand->sent_goaway = 1;
+ chand->back = NULL;
+ gpr_mu_unlock(&chand->mu);
+
+ if (send_goaway) {
+ op.type = GRPC_CHANNEL_GOAWAY;
+ op.dir = GRPC_CALL_DOWN;
+ op.data.goaway.status = GRPC_STATUS_OK;
+ op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect");
+ grpc_channel_next_op(lbelem, &op);
+ }
+
+ op.type = GRPC_CHANNEL_DISCONNECT;
+ op.dir = GRPC_CALL_DOWN;
+ grpc_channel_next_op(lbelem, &op);
+
+ unref_channel(channel);
+}
+
+void grpc_child_channel_handle_op(grpc_child_channel *channel,
+ grpc_channel_op *op) {
+ grpc_channel_next_op(LINK_BACK_ELEM_FROM_CHANNEL(channel), op);
+}
+
+grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
+ grpc_call_element *parent) {
+ grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
+ lb_call_data *lbcalld;
+ ref_channel(channel);
+
+ grpc_call_stack_init(channel, NULL, stk);
+ lbcalld = LINK_BACK_ELEM_FROM_CALL(stk)->call_data;
+ gpr_ref_init(&lbcalld->refs, 1);
+ lbcalld->back = parent;
+ lbcalld->channel = channel;
+
+ return (grpc_child_call *)stk;
+}
+
+void grpc_child_call_destroy(grpc_child_call *call) { unref_call(call); }
+
+grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call) {
+ return LINK_BACK_ELEM_FROM_CALL(call);
+}
diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h
new file mode 100644
index 0000000000..9fb2a17e29
--- /dev/null
+++ b/src/core/channel/child_channel.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_CHANNEL_CHILD_CHANNEL_H_
+#define __GRPC_INTERNAL_CHANNEL_CHILD_CHANNEL_H_
+
+#include "src/core/channel/channel_stack.h"
+
+/* helper for filters that need to host child channel stacks... handles
+ lifetime and upwards propagation cleanly */
+
+const grpc_channel_filter grpc_child_channel_top_filter;
+
+typedef grpc_channel_stack grpc_child_channel;
+typedef grpc_call_stack grpc_child_call;
+
+/* filters[0] must be &grpc_child_channel_top_filter */
+grpc_child_channel *grpc_child_channel_create(
+ grpc_channel_element *parent, const grpc_channel_filter **filters,
+ size_t filter_count, const grpc_channel_args *args,
+ grpc_mdctx *metadata_context);
+void grpc_child_channel_handle_op(grpc_child_channel *channel,
+ grpc_channel_op *op);
+grpc_channel_element *grpc_child_channel_get_bottom_element(
+ grpc_child_channel *channel);
+void grpc_child_channel_destroy(grpc_child_channel *channel);
+
+grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
+ grpc_call_element *parent);
+grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call);
+void grpc_child_call_destroy(grpc_child_call *call);
+
+#endif /* __GRPC_INTERNAL_CHANNEL_CHILD_CHANNEL_H_ */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9ec19df8a9..4bd294dc96 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -36,6 +36,7 @@
#include <stdio.h>
#include "src/core/channel/channel_args.h"
+#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/metadata_buffer.h"
#include <grpc/support/alloc.h>
@@ -44,79 +45,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
-/* Link back filter: passes up calls to the client channel, pushes down calls
- down */
-
-typedef struct { grpc_channel_element *back; } lb_channel_data;
-
-typedef struct { grpc_call_element *back; } lb_call_data;
-
-static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- lb_call_data *calld = elem->call_data;
-
- switch (op->dir) {
- case GRPC_CALL_UP:
- calld->back->filter->call_op(calld->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_call_next_op(elem, op);
- break;
- }
-}
-
-/* Currently we assume all channel operations should just be pushed up. */
-static void lb_channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem,
- grpc_channel_op *op) {
- lb_channel_data *chand = elem->channel_data;
-
- switch (op->dir) {
- case GRPC_CALL_UP:
- chand->back->filter->channel_op(chand->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
-/* Constructor for call_data */
-static void lb_init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
-
-/* Destructor for call_data */
-static void lb_destroy_call_elem(grpc_call_element *elem) {}
-
-/* Constructor for channel_data */
-static void lb_init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last) {
- GPR_ASSERT(is_first);
- GPR_ASSERT(!is_last);
-}
-
-/* Destructor for channel_data */
-static void lb_destroy_channel_elem(grpc_channel_element *elem) {}
-
-static const grpc_channel_filter link_back_filter = {
- lb_call_op, lb_channel_op,
-
- sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
-
- sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
-
- "clientchannel.linkback",
-};
-
/* Client channel implementation */
-typedef struct {
- size_t inflight_requests;
- grpc_channel_stack *channel_stack;
-} child_entry;
-
typedef struct call_data call_data;
typedef struct {
@@ -126,12 +56,8 @@ typedef struct {
transport_setup is assumed to be set once during construction */
gpr_mu mu;
- /* the sending child (points somewhere in children, or NULL) */
- child_entry *active_child;
- /* vector of child channels */
- child_entry *children;
- size_t child_count;
- size_t child_capacity;
+ /* the sending child (may be null) */
+ grpc_child_channel *active_child;
/* calls waiting for a channel to be ready */
call_data **waiting_children;
@@ -165,9 +91,7 @@ struct call_data {
union {
struct {
/* our child call stack */
- grpc_call_stack *child_stack;
- /* ... and the channel stack associated with it */
- grpc_channel_stack *using_stack;
+ grpc_child_call *child_call;
} active;
struct {
void (*on_complete)(void *user_data, grpc_op_error error);
@@ -177,38 +101,27 @@ struct call_data {
} s;
};
-static int prepare_activate(call_data *calld, child_entry *on_child) {
- grpc_call_element *child_elem;
- grpc_channel_stack *use_stack = on_child->channel_stack;
-
+static int prepare_activate(grpc_call_element *elem,
+ grpc_child_channel *on_child) {
+ call_data *calld = elem->call_data;
if (calld->state == CALL_CANCELLED) return 0;
- on_child->inflight_requests++;
-
/* no more access to calld->s.waiting allowed */
GPR_ASSERT(calld->state == CALL_WAITING);
calld->state = CALL_ACTIVE;
- /* create a child stack, and record that we're using a particular channel
- stack */
- calld->s.active.child_stack = gpr_malloc(use_stack->call_stack_size);
- calld->s.active.using_stack = use_stack;
- grpc_call_stack_init(use_stack, NULL, calld->s.active.child_stack);
- /* initialize the top level link back element */
- child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
- GPR_ASSERT(child_elem->filter == &link_back_filter);
- ((lb_call_data *)child_elem->call_data)->back = calld->elem;
+ /* create a child call */
+ calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
return 1;
}
static void do_nothing(void *ignored, grpc_op_error error) {}
-static void complete_activate(grpc_call_element *elem, child_entry *on_child,
- grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
- grpc_call_stack_element(calld->s.active.child_stack, 0);
+ grpc_child_call_get_top_element(calld->s.active.child_call);
GPR_ASSERT(calld->state == CALL_ACTIVE);
@@ -244,10 +157,10 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
calld->state = CALL_WAITING;
if (chand->active_child) {
/* channel is connected - use the connected stack */
- if (prepare_activate(calld, chand->active_child)) {
+ if (prepare_activate(elem, chand->active_child)) {
gpr_mu_unlock(&chand->mu);
/* activate the request (pass it down) outside the lock */
- complete_activate(elem, chand->active_child, op);
+ complete_activate(elem, op);
} else {
gpr_mu_unlock(&chand->mu);
}
@@ -303,7 +216,7 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
gpr_mu_lock(&chand->mu);
switch (calld->state) {
case CALL_ACTIVE:
- child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
child_elem->filter->call_op(child_elem, elem, op);
return; /* early out */
@@ -367,7 +280,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_next_op(elem, op);
break;
case GRPC_CALL_DOWN:
- child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ child_elem =
+ grpc_child_call_get_top_element(calld->s.active.child_call);
GPR_ASSERT(calld->state == CALL_ACTIVE);
child_elem->filter->call_op(child_elem, elem, op);
break;
@@ -376,59 +290,42 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
}
}
-static void broadcast_channel_op_down(grpc_channel_element *elem,
- grpc_channel_op *op) {
- channel_data *chand = elem->channel_data;
- grpc_channel_element *child_elem;
- grpc_channel_stack **children;
- size_t child_count;
- size_t i;
-
- /* copy the current set of children, and mark them all as having an inflight
- request */
- gpr_mu_lock(&chand->mu);
- child_count = chand->child_count;
- children = gpr_malloc(sizeof(grpc_channel_stack *) * child_count);
- for (i = 0; i < child_count; i++) {
- children[i] = chand->children[i].channel_stack;
- chand->children[i].inflight_requests++;
- }
- gpr_mu_unlock(&chand->mu);
-
- /* send the message down */
- for (i = 0; i < child_count; i++) {
- child_elem = grpc_channel_stack_element(children[i], 0);
- if (op->type == GRPC_CHANNEL_GOAWAY) {
- gpr_slice_ref(op->data.goaway.message);
- }
- child_elem->filter->channel_op(child_elem, elem, op);
- }
- if (op->type == GRPC_CHANNEL_GOAWAY) {
- gpr_slice_unref(op->data.goaway.message);
- }
-
- /* unmark the inflight requests */
- gpr_mu_lock(&chand->mu);
- for (i = 0; i < child_count; i++) {
- chand->children[i].inflight_requests--;
- }
- gpr_mu_unlock(&chand->mu);
-
- gpr_free(children);
-}
-
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
+ channel_data *chand = elem->channel_data;
+ grpc_child_channel *child_channel;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
switch (op->type) {
+ case GRPC_CHANNEL_GOAWAY:
+ gpr_mu_lock(&chand->mu);
+ child_channel = chand->active_child;
+ chand->active_child = NULL;
+ gpr_mu_unlock(&chand->mu);
+ if (child_channel) {
+ grpc_child_channel_handle_op(child_channel, op);
+ grpc_child_channel_destroy(child_channel);
+ } else {
+ gpr_slice_unref(op->data.goaway.message);
+ }
+ break;
+ case GRPC_CHANNEL_DISCONNECT:
+ gpr_mu_lock(&chand->mu);
+ child_channel = chand->active_child;
+ chand->active_child = NULL;
+ gpr_mu_unlock(&chand->mu);
+ if (child_channel) {
+ grpc_child_channel_destroy(child_channel);
+ }
+ break;
default:
switch (op->dir) {
case GRPC_CALL_UP:
grpc_channel_next_op(elem, op);
break;
case GRPC_CALL_DOWN:
- broadcast_channel_op_down(elem, op);
+ gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
+ abort();
break;
}
break;
@@ -459,8 +356,6 @@ static void init_call_elem(grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- size_t i;
/* if the metadata buffer is not flushed, destroy it here. */
grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK);
@@ -468,18 +363,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
remove it from the in-flight requests tracked by the child_entry we
picked */
if (calld->state == CALL_ACTIVE) {
- grpc_call_stack_destroy(calld->s.active.child_stack);
- gpr_free(calld->s.active.child_stack);
-
- gpr_mu_lock(&chand->mu);
- for (i = 0; i < chand->child_count; i++) {
- if (chand->children[i].channel_stack == calld->s.active.using_stack) {
- chand->children[i].inflight_requests--;
- /* TODO(ctiller): garbage collect channels that are not active
- and have no inflight requests */
- }
- }
- gpr_mu_unlock(&chand->mu);
+ grpc_child_call_destroy(calld->s.active.child_call);
}
}
@@ -497,9 +381,6 @@ static void init_channel_elem(grpc_channel_element *elem,
gpr_mu_init(&chand->mu);
chand->active_child = NULL;
- chand->children = NULL;
- chand->child_count = 0;
- chand->child_capacity = 0;
chand->waiting_children = NULL;
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
@@ -515,14 +396,12 @@ static void init_channel_elem(grpc_channel_element *elem,
/* Destructor for channel_data */
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- size_t i;
grpc_transport_setup_cancel(chand->transport_setup);
- for (i = 0; i < chand->child_count; i++) {
- GPR_ASSERT(chand->children[i].inflight_requests == 0);
- grpc_channel_stack_destroy(chand->children[i].channel_stack);
- gpr_free(chand->children[i].channel_stack);
+ if (chand->active_child) {
+ grpc_child_channel_destroy(chand->active_child);
+ chand->active_child = NULL;
}
grpc_channel_args_destroy(chand->args);
@@ -531,17 +410,16 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
gpr_mu_destroy(&chand->mu);
GPR_ASSERT(chand->waiting_child_count == 0);
gpr_free(chand->waiting_children);
- gpr_free(chand->children);
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op,
+ call_op, channel_op,
- sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "clientchannel",
+ "client-channel",
};
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
@@ -551,12 +429,10 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
/* we just got a new transport: lets create a child channel stack for it */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
- grpc_channel_element *lb_elem;
- grpc_channel_stack *child_stack;
size_t num_child_filters = 2 + num_channel_filters;
grpc_channel_filter const **child_filters;
grpc_transport_setup_result result;
- child_entry *child_ent;
+ grpc_child_channel *old_active = NULL;
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
@@ -565,7 +441,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
/* we always need a link back filter to get back to the connected channel */
- child_filters[0] = &link_back_filter;
+ child_filters[0] = &grpc_child_channel_top_filter;
for (i = 0; i < num_channel_filters; i++) {
child_filters[i + 1] = channel_filters[i];
}
@@ -578,28 +454,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
gpr_mu_lock(&chand->mu);
chand->transport_setup_initiated = 0;
- if (chand->child_count == chand->child_capacity) {
- /* realloc will invalidate chand->active_child, but it's reset in the next
- stanza anyway */
- chand->child_capacity =
- GPR_MAX(2 * chand->child_capacity, chand->child_capacity + 2);
- chand->children = gpr_realloc(chand->children,
- sizeof(child_entry) * chand->child_capacity);
+ if (chand->active_child) {
+ old_active = chand->active_child;
}
-
- /* build up the child stack */
- child_stack =
- gpr_malloc(grpc_channel_stack_size(child_filters, num_child_filters));
- grpc_channel_stack_init(child_filters, num_child_filters, chand->args, mdctx,
- child_stack);
- lb_elem = grpc_channel_stack_element(child_stack, 0);
- GPR_ASSERT(lb_elem->filter == &link_back_filter);
- ((lb_channel_data *)lb_elem->channel_data)->back = elem;
- result = grpc_connected_channel_bind_transport(child_stack, transport);
- child_ent = &chand->children[chand->child_count++];
- child_ent->channel_stack = child_stack;
- child_ent->inflight_requests = 0;
- chand->active_child = child_ent;
+ chand->active_child = grpc_child_channel_create(
+ elem, child_filters, num_child_filters, chand->args, mdctx);
+ result =
+ grpc_connected_channel_bind_transport(chand->active_child, transport);
/* capture the waiting children - we'll activate them outside the lock
to avoid re-entrancy problems */
@@ -620,7 +481,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
call_ops[i].user_data =
waiting_children[i]->s.waiting.on_complete_user_data;
- if (!prepare_activate(waiting_children[i], child_ent)) {
+ if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
}
@@ -634,13 +495,17 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
that guarantee we need to do some curly locking here */
for (i = 0; i < waiting_child_count; i++) {
if (waiting_children[i]) {
- complete_activate(waiting_children[i]->elem, child_ent, &call_ops[i]);
+ complete_activate(waiting_children[i]->elem, &call_ops[i]);
}
}
gpr_free(waiting_children);
gpr_free(call_ops);
gpr_free(child_filters);
+ if (old_active) {
+ grpc_child_channel_destroy(old_active);
+ }
+
return result;
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index c70c660fbe..0570439813 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -682,6 +682,7 @@ static void unlock(transport *t) {
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
t->num_pending_goaways = 0;
+ t->cap_pending_goaways = 0;
t->calling_back = 1;
}
}
diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj
index c3e421bc2d..4a0c2a1025 100644
--- a/vsprojects/vs2013/grpc.vcxproj
+++ b/vsprojects/vs2013/grpc.vcxproj
@@ -85,6 +85,7 @@
<ClInclude Include="..\..\src\core\channel\census_filter.h" />
<ClInclude Include="..\..\src\core\channel\channel_args.h" />
<ClInclude Include="..\..\src\core\channel\channel_stack.h" />
+ <ClInclude Include="..\..\src\core\channel\child_channel.h" />
<ClInclude Include="..\..\src\core\channel\client_channel.h" />
<ClInclude Include="..\..\src\core\channel\client_setup.h" />
<ClInclude Include="..\..\src\core\channel\connected_channel.h" />
@@ -169,6 +170,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\channel_stack.c">
</ClCompile>
+ <ClCompile Include="..\..\src\core\channel\child_channel.c">
+ </ClCompile>
<ClCompile Include="..\..\src\core\channel\client_channel.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\client_setup.c">