diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/child_channel.c | 269 | ||||
-rw-r--r-- | src/core/channel/child_channel.h | 63 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 259 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 1 |
4 files changed, 395 insertions, 197 deletions
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c new file mode 100644 index 0000000000..e1b890674f --- /dev/null +++ b/src/core/channel/child_channel.c @@ -0,0 +1,269 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/child_channel.h" +#include "src/core/iomgr/iomgr.h" +#include <grpc/support/alloc.h> + +/* Link back filter: passes up calls to the client channel, pushes down calls + down */ + +static void unref_channel(grpc_child_channel *channel); + +typedef struct { + gpr_mu mu; + gpr_cv cv; + grpc_channel_element *back; + gpr_refcount refs; + int calling_back; + int sent_goaway; +} lb_channel_data; + +typedef struct { + grpc_call_element *back; + gpr_refcount refs; + grpc_child_channel *channel; +} lb_call_data; + +static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem, + grpc_call_op *op) { + lb_call_data *calld = elem->call_data; + + switch (op->dir) { + case GRPC_CALL_UP: + calld->back->filter->call_op(calld->back, elem, op); + break; + case GRPC_CALL_DOWN: + grpc_call_next_op(elem, op); + break; + } +} + +static void delayed_unref(void *elem, grpc_iomgr_cb_status status) { + unref_channel(grpc_channel_stack_from_top_element(elem)); +} + +/* Currently we assume all channel operations should just be pushed up. */ +static void lb_channel_op(grpc_channel_element *elem, + grpc_channel_element *from_elem, + grpc_channel_op *op) { + lb_channel_data *chand = elem->channel_data; + grpc_channel_element *back; + + switch (op->dir) { + case GRPC_CALL_UP: + gpr_mu_lock(&chand->mu); + back = chand->back; + if (back) chand->calling_back++; + gpr_mu_unlock(&chand->mu); + if (back) { + back->filter->channel_op(chand->back, elem, op); + gpr_mu_lock(&chand->mu); + chand->calling_back--; + gpr_cv_broadcast(&chand->cv); + gpr_mu_unlock(&chand->mu); + } + break; + case GRPC_CALL_DOWN: + grpc_channel_next_op(elem, op); + break; + } + + switch (op->type) { + case GRPC_TRANSPORT_CLOSED: + grpc_iomgr_add_callback(delayed_unref, elem); + break; + case GRPC_CHANNEL_GOAWAY: + gpr_mu_lock(&chand->mu); + chand->sent_goaway = 1; + gpr_mu_unlock(&chand->mu); + break; + case GRPC_CHANNEL_DISCONNECT: + case GRPC_TRANSPORT_GOAWAY: + case GRPC_ACCEPT_CALL: + break; + } +} + +/* Constructor for call_data */ +static void lb_init_call_elem(grpc_call_element *elem, + const void *server_transport_data) {} + +/* Destructor for call_data */ +static void lb_destroy_call_elem(grpc_call_element *elem) {} + +/* Constructor for channel_data */ +static void lb_init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last) { + lb_channel_data *chand = elem->channel_data; + GPR_ASSERT(is_first); + GPR_ASSERT(!is_last); + gpr_mu_init(&chand->mu); + gpr_cv_init(&chand->cv); + /* one ref for getting grpc_child_channel_destroy called, one for getting + disconnected */ + gpr_ref_init(&chand->refs, 2); + chand->back = NULL; + chand->sent_goaway = 0; + chand->calling_back = 0; +} + +/* Destructor for channel_data */ +static void lb_destroy_channel_elem(grpc_channel_element *elem) { + lb_channel_data *chand = elem->channel_data; + gpr_mu_destroy(&chand->mu); + gpr_cv_destroy(&chand->cv); +} + +const grpc_channel_filter grpc_child_channel_top_filter = { + lb_call_op, lb_channel_op, + + sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem, + + sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem, + + "child-channel", +}; + +/* grpc_child_channel proper */ + +#define LINK_BACK_ELEM_FROM_CHANNEL(channel) \ + grpc_channel_stack_element((channel), 0) + +#define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0) + +static void unref_channel(grpc_child_channel *channel) { + lb_channel_data *lb = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data; + if (gpr_unref(&lb->refs)) { + grpc_channel_stack_destroy(channel); + gpr_free(channel); + } +} + +static void ref_channel(grpc_child_channel *channel) { + lb_channel_data *lb = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data; + gpr_ref(&lb->refs); +} + +static void unref_call(grpc_child_call *call) { + lb_call_data *lb = LINK_BACK_ELEM_FROM_CALL(call)->call_data; + if (gpr_unref(&lb->refs)) { + grpc_child_channel *channel = lb->channel; + grpc_call_stack_destroy(call); + gpr_free(call); + unref_channel(channel); + } +} + +#if 0 +static void ref_call(grpc_child_call *call) { + lb_call_data *lb = LINK_BACK_ELEM_FROM_CALL(call)->call_data; + gpr_ref(&lb->refs); +} +#endif + +grpc_child_channel *grpc_child_channel_create( + grpc_channel_element *parent, const grpc_channel_filter **filters, + size_t filter_count, const grpc_channel_args *args, + grpc_mdctx *metadata_context) { + grpc_channel_stack *stk = + gpr_malloc(grpc_channel_stack_size(filters, filter_count)); + lb_channel_data *lb; + + grpc_channel_stack_init(filters, filter_count, args, metadata_context, stk); + + lb = LINK_BACK_ELEM_FROM_CHANNEL(stk)->channel_data; + gpr_mu_lock(&lb->mu); + lb->back = parent; + gpr_mu_unlock(&lb->mu); + + return (grpc_child_channel *)stk; +} + +void grpc_child_channel_destroy(grpc_child_channel *channel) { + grpc_channel_op op; + int send_goaway = 0; + grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel); + lb_channel_data *chand = lbelem->channel_data; + + gpr_mu_lock(&chand->mu); + while (chand->calling_back) { + gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future); + } + send_goaway = !chand->sent_goaway; + chand->sent_goaway = 1; + chand->back = NULL; + gpr_mu_unlock(&chand->mu); + + if (send_goaway) { + op.type = GRPC_CHANNEL_GOAWAY; + op.dir = GRPC_CALL_DOWN; + op.data.goaway.status = GRPC_STATUS_OK; + op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect"); + grpc_channel_next_op(lbelem, &op); + } + + op.type = GRPC_CHANNEL_DISCONNECT; + op.dir = GRPC_CALL_DOWN; + grpc_channel_next_op(lbelem, &op); + + unref_channel(channel); +} + +void grpc_child_channel_handle_op(grpc_child_channel *channel, + grpc_channel_op *op) { + grpc_channel_next_op(LINK_BACK_ELEM_FROM_CHANNEL(channel), op); +} + +grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, + grpc_call_element *parent) { + grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size); + lb_call_data *lbcalld; + ref_channel(channel); + + grpc_call_stack_init(channel, NULL, stk); + lbcalld = LINK_BACK_ELEM_FROM_CALL(stk)->call_data; + gpr_ref_init(&lbcalld->refs, 1); + lbcalld->back = parent; + lbcalld->channel = channel; + + return (grpc_child_call *)stk; +} + +void grpc_child_call_destroy(grpc_child_call *call) { unref_call(call); } + +grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call) { + return LINK_BACK_ELEM_FROM_CALL(call); +} diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h new file mode 100644 index 0000000000..9fb2a17e29 --- /dev/null +++ b/src/core/channel/child_channel.h @@ -0,0 +1,63 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_CHILD_CHANNEL_H_ +#define __GRPC_INTERNAL_CHANNEL_CHILD_CHANNEL_H_ + +#include "src/core/channel/channel_stack.h" + +/* helper for filters that need to host child channel stacks... handles + lifetime and upwards propagation cleanly */ + +const grpc_channel_filter grpc_child_channel_top_filter; + +typedef grpc_channel_stack grpc_child_channel; +typedef grpc_call_stack grpc_child_call; + +/* filters[0] must be &grpc_child_channel_top_filter */ +grpc_child_channel *grpc_child_channel_create( + grpc_channel_element *parent, const grpc_channel_filter **filters, + size_t filter_count, const grpc_channel_args *args, + grpc_mdctx *metadata_context); +void grpc_child_channel_handle_op(grpc_child_channel *channel, + grpc_channel_op *op); +grpc_channel_element *grpc_child_channel_get_bottom_element( + grpc_child_channel *channel); +void grpc_child_channel_destroy(grpc_child_channel *channel); + +grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, + grpc_call_element *parent); +grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call); +void grpc_child_call_destroy(grpc_child_call *call); + +#endif /* __GRPC_INTERNAL_CHANNEL_CHILD_CHANNEL_H_ */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9ec19df8a9..4bd294dc96 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -36,6 +36,7 @@ #include <stdio.h> #include "src/core/channel/channel_args.h" +#include "src/core/channel/child_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/metadata_buffer.h" #include <grpc/support/alloc.h> @@ -44,79 +45,8 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> -/* Link back filter: passes up calls to the client channel, pushes down calls - down */ - -typedef struct { grpc_channel_element *back; } lb_channel_data; - -typedef struct { grpc_call_element *back; } lb_call_data; - -static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { - lb_call_data *calld = elem->call_data; - - switch (op->dir) { - case GRPC_CALL_UP: - calld->back->filter->call_op(calld->back, elem, op); - break; - case GRPC_CALL_DOWN: - grpc_call_next_op(elem, op); - break; - } -} - -/* Currently we assume all channel operations should just be pushed up. */ -static void lb_channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, - grpc_channel_op *op) { - lb_channel_data *chand = elem->channel_data; - - switch (op->dir) { - case GRPC_CALL_UP: - chand->back->filter->channel_op(chand->back, elem, op); - break; - case GRPC_CALL_DOWN: - grpc_channel_next_op(elem, op); - break; - } -} - -/* Constructor for call_data */ -static void lb_init_call_elem(grpc_call_element *elem, - const void *server_transport_data) {} - -/* Destructor for call_data */ -static void lb_destroy_call_elem(grpc_call_element *elem) {} - -/* Constructor for channel_data */ -static void lb_init_channel_elem(grpc_channel_element *elem, - const grpc_channel_args *args, - grpc_mdctx *metadata_context, int is_first, - int is_last) { - GPR_ASSERT(is_first); - GPR_ASSERT(!is_last); -} - -/* Destructor for channel_data */ -static void lb_destroy_channel_elem(grpc_channel_element *elem) {} - -static const grpc_channel_filter link_back_filter = { - lb_call_op, lb_channel_op, - - sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem, - - sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem, - - "clientchannel.linkback", -}; - /* Client channel implementation */ -typedef struct { - size_t inflight_requests; - grpc_channel_stack *channel_stack; -} child_entry; - typedef struct call_data call_data; typedef struct { @@ -126,12 +56,8 @@ typedef struct { transport_setup is assumed to be set once during construction */ gpr_mu mu; - /* the sending child (points somewhere in children, or NULL) */ - child_entry *active_child; - /* vector of child channels */ - child_entry *children; - size_t child_count; - size_t child_capacity; + /* the sending child (may be null) */ + grpc_child_channel *active_child; /* calls waiting for a channel to be ready */ call_data **waiting_children; @@ -165,9 +91,7 @@ struct call_data { union { struct { /* our child call stack */ - grpc_call_stack *child_stack; - /* ... and the channel stack associated with it */ - grpc_channel_stack *using_stack; + grpc_child_call *child_call; } active; struct { void (*on_complete)(void *user_data, grpc_op_error error); @@ -177,38 +101,27 @@ struct call_data { } s; }; -static int prepare_activate(call_data *calld, child_entry *on_child) { - grpc_call_element *child_elem; - grpc_channel_stack *use_stack = on_child->channel_stack; - +static int prepare_activate(grpc_call_element *elem, + grpc_child_channel *on_child) { + call_data *calld = elem->call_data; if (calld->state == CALL_CANCELLED) return 0; - on_child->inflight_requests++; - /* no more access to calld->s.waiting allowed */ GPR_ASSERT(calld->state == CALL_WAITING); calld->state = CALL_ACTIVE; - /* create a child stack, and record that we're using a particular channel - stack */ - calld->s.active.child_stack = gpr_malloc(use_stack->call_stack_size); - calld->s.active.using_stack = use_stack; - grpc_call_stack_init(use_stack, NULL, calld->s.active.child_stack); - /* initialize the top level link back element */ - child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0); - GPR_ASSERT(child_elem->filter == &link_back_filter); - ((lb_call_data *)child_elem->call_data)->back = calld->elem; + /* create a child call */ + calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem); return 1; } static void do_nothing(void *ignored, grpc_op_error error) {} -static void complete_activate(grpc_call_element *elem, child_entry *on_child, - grpc_call_op *op) { +static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { call_data *calld = elem->call_data; grpc_call_element *child_elem = - grpc_call_stack_element(calld->s.active.child_stack, 0); + grpc_child_call_get_top_element(calld->s.active.child_call); GPR_ASSERT(calld->state == CALL_ACTIVE); @@ -244,10 +157,10 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) { calld->state = CALL_WAITING; if (chand->active_child) { /* channel is connected - use the connected stack */ - if (prepare_activate(calld, chand->active_child)) { + if (prepare_activate(elem, chand->active_child)) { gpr_mu_unlock(&chand->mu); /* activate the request (pass it down) outside the lock */ - complete_activate(elem, chand->active_child, op); + complete_activate(elem, op); } else { gpr_mu_unlock(&chand->mu); } @@ -303,7 +216,7 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { gpr_mu_lock(&chand->mu); switch (calld->state) { case CALL_ACTIVE: - child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0); + child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); gpr_mu_unlock(&chand->mu); child_elem->filter->call_op(child_elem, elem, op); return; /* early out */ @@ -367,7 +280,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_next_op(elem, op); break; case GRPC_CALL_DOWN: - child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0); + child_elem = + grpc_child_call_get_top_element(calld->s.active.child_call); GPR_ASSERT(calld->state == CALL_ACTIVE); child_elem->filter->call_op(child_elem, elem, op); break; @@ -376,59 +290,42 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, } } -static void broadcast_channel_op_down(grpc_channel_element *elem, - grpc_channel_op *op) { - channel_data *chand = elem->channel_data; - grpc_channel_element *child_elem; - grpc_channel_stack **children; - size_t child_count; - size_t i; - - /* copy the current set of children, and mark them all as having an inflight - request */ - gpr_mu_lock(&chand->mu); - child_count = chand->child_count; - children = gpr_malloc(sizeof(grpc_channel_stack *) * child_count); - for (i = 0; i < child_count; i++) { - children[i] = chand->children[i].channel_stack; - chand->children[i].inflight_requests++; - } - gpr_mu_unlock(&chand->mu); - - /* send the message down */ - for (i = 0; i < child_count; i++) { - child_elem = grpc_channel_stack_element(children[i], 0); - if (op->type == GRPC_CHANNEL_GOAWAY) { - gpr_slice_ref(op->data.goaway.message); - } - child_elem->filter->channel_op(child_elem, elem, op); - } - if (op->type == GRPC_CHANNEL_GOAWAY) { - gpr_slice_unref(op->data.goaway.message); - } - - /* unmark the inflight requests */ - gpr_mu_lock(&chand->mu); - for (i = 0; i < child_count; i++) { - chand->children[i].inflight_requests--; - } - gpr_mu_unlock(&chand->mu); - - gpr_free(children); -} - static void channel_op(grpc_channel_element *elem, grpc_channel_element *from_elem, grpc_channel_op *op) { + channel_data *chand = elem->channel_data; + grpc_child_channel *child_channel; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); switch (op->type) { + case GRPC_CHANNEL_GOAWAY: + gpr_mu_lock(&chand->mu); + child_channel = chand->active_child; + chand->active_child = NULL; + gpr_mu_unlock(&chand->mu); + if (child_channel) { + grpc_child_channel_handle_op(child_channel, op); + grpc_child_channel_destroy(child_channel); + } else { + gpr_slice_unref(op->data.goaway.message); + } + break; + case GRPC_CHANNEL_DISCONNECT: + gpr_mu_lock(&chand->mu); + child_channel = chand->active_child; + chand->active_child = NULL; + gpr_mu_unlock(&chand->mu); + if (child_channel) { + grpc_child_channel_destroy(child_channel); + } + break; default: switch (op->dir) { case GRPC_CALL_UP: grpc_channel_next_op(elem, op); break; case GRPC_CALL_DOWN: - broadcast_channel_op_down(elem, op); + gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type); + abort(); break; } break; @@ -459,8 +356,6 @@ static void init_call_elem(grpc_call_element *elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - size_t i; /* if the metadata buffer is not flushed, destroy it here. */ grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK); @@ -468,18 +363,7 @@ static void destroy_call_elem(grpc_call_element *elem) { remove it from the in-flight requests tracked by the child_entry we picked */ if (calld->state == CALL_ACTIVE) { - grpc_call_stack_destroy(calld->s.active.child_stack); - gpr_free(calld->s.active.child_stack); - - gpr_mu_lock(&chand->mu); - for (i = 0; i < chand->child_count; i++) { - if (chand->children[i].channel_stack == calld->s.active.using_stack) { - chand->children[i].inflight_requests--; - /* TODO(ctiller): garbage collect channels that are not active - and have no inflight requests */ - } - } - gpr_mu_unlock(&chand->mu); + grpc_child_call_destroy(calld->s.active.child_call); } } @@ -497,9 +381,6 @@ static void init_channel_elem(grpc_channel_element *elem, gpr_mu_init(&chand->mu); chand->active_child = NULL; - chand->children = NULL; - chand->child_count = 0; - chand->child_capacity = 0; chand->waiting_children = NULL; chand->waiting_child_count = 0; chand->waiting_child_capacity = 0; @@ -515,14 +396,12 @@ static void init_channel_elem(grpc_channel_element *elem, /* Destructor for channel_data */ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - size_t i; grpc_transport_setup_cancel(chand->transport_setup); - for (i = 0; i < chand->child_count; i++) { - GPR_ASSERT(chand->children[i].inflight_requests == 0); - grpc_channel_stack_destroy(chand->children[i].channel_stack); - gpr_free(chand->children[i].channel_stack); + if (chand->active_child) { + grpc_child_channel_destroy(chand->active_child); + chand->active_child = NULL; } grpc_channel_args_destroy(chand->args); @@ -531,17 +410,16 @@ static void destroy_channel_elem(grpc_channel_element *elem) { gpr_mu_destroy(&chand->mu); GPR_ASSERT(chand->waiting_child_count == 0); gpr_free(chand->waiting_children); - gpr_free(chand->children); } const grpc_channel_filter grpc_client_channel_filter = { - call_op, channel_op, + call_op, channel_op, - sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, - "clientchannel", + "client-channel", }; grpc_transport_setup_result grpc_client_channel_transport_setup_complete( @@ -551,12 +429,10 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( /* we just got a new transport: lets create a child channel stack for it */ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; - grpc_channel_element *lb_elem; - grpc_channel_stack *child_stack; size_t num_child_filters = 2 + num_channel_filters; grpc_channel_filter const **child_filters; grpc_transport_setup_result result; - child_entry *child_ent; + grpc_child_channel *old_active = NULL; call_data **waiting_children; size_t waiting_child_count; size_t i; @@ -565,7 +441,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( /* build the child filter stack */ child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters); /* we always need a link back filter to get back to the connected channel */ - child_filters[0] = &link_back_filter; + child_filters[0] = &grpc_child_channel_top_filter; for (i = 0; i < num_channel_filters; i++) { child_filters[i + 1] = channel_filters[i]; } @@ -578,28 +454,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( gpr_mu_lock(&chand->mu); chand->transport_setup_initiated = 0; - if (chand->child_count == chand->child_capacity) { - /* realloc will invalidate chand->active_child, but it's reset in the next - stanza anyway */ - chand->child_capacity = - GPR_MAX(2 * chand->child_capacity, chand->child_capacity + 2); - chand->children = gpr_realloc(chand->children, - sizeof(child_entry) * chand->child_capacity); + if (chand->active_child) { + old_active = chand->active_child; } - - /* build up the child stack */ - child_stack = - gpr_malloc(grpc_channel_stack_size(child_filters, num_child_filters)); - grpc_channel_stack_init(child_filters, num_child_filters, chand->args, mdctx, - child_stack); - lb_elem = grpc_channel_stack_element(child_stack, 0); - GPR_ASSERT(lb_elem->filter == &link_back_filter); - ((lb_channel_data *)lb_elem->channel_data)->back = elem; - result = grpc_connected_channel_bind_transport(child_stack, transport); - child_ent = &chand->children[chand->child_count++]; - child_ent->channel_stack = child_stack; - child_ent->inflight_requests = 0; - chand->active_child = child_ent; + chand->active_child = grpc_child_channel_create( + elem, child_filters, num_child_filters, chand->args, mdctx); + result = + grpc_connected_channel_bind_transport(chand->active_child, transport); /* capture the waiting children - we'll activate them outside the lock to avoid re-entrancy problems */ @@ -620,7 +481,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete; call_ops[i].user_data = waiting_children[i]->s.waiting.on_complete_user_data; - if (!prepare_activate(waiting_children[i], child_ent)) { + if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR); } @@ -634,13 +495,17 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( that guarantee we need to do some curly locking here */ for (i = 0; i < waiting_child_count; i++) { if (waiting_children[i]) { - complete_activate(waiting_children[i]->elem, child_ent, &call_ops[i]); + complete_activate(waiting_children[i]->elem, &call_ops[i]); } } gpr_free(waiting_children); gpr_free(call_ops); gpr_free(child_filters); + if (old_active) { + grpc_child_channel_destroy(old_active); + } + return result; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c70c660fbe..0570439813 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -682,6 +682,7 @@ static void unlock(transport *t) { num_goaways = t->num_pending_goaways; t->pending_goaways = NULL; t->num_pending_goaways = 0; + t->cap_pending_goaways = 0; t->calling_back = 1; } } |