aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2014-12-12 08:43:28 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2014-12-12 16:17:37 -0800
commit82e275ffaf7210b4e0721aec70ba8c4807440da3 (patch)
tree84502e5c856fa8d044a1688ed9b868715ca7aed2 /src/core/channel/client_channel.c
parente99270d63a230bf345f4323e1fd6f11c2fb842dc (diff)
Split client_channel into client_channel & child_channel.
Safely managing disconnection and goaways needs reference counting at the child channel level, which was near impossible to provide with the previous embedding of the child channel in the client channel. This is a (hopefully) no-op refactoring to provide that split. The next CL in this series will actually get disconnection and goaway somewhat right. Change on 2014/12/12 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=81985590
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c259
1 files changed, 62 insertions, 197 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9ec19df8a9..4bd294dc96 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -36,6 +36,7 @@
#include <stdio.h>
#include "src/core/channel/channel_args.h"
+#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/metadata_buffer.h"
#include <grpc/support/alloc.h>
@@ -44,79 +45,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
-/* Link back filter: passes up calls to the client channel, pushes down calls
- down */
-
-typedef struct { grpc_channel_element *back; } lb_channel_data;
-
-typedef struct { grpc_call_element *back; } lb_call_data;
-
-static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- lb_call_data *calld = elem->call_data;
-
- switch (op->dir) {
- case GRPC_CALL_UP:
- calld->back->filter->call_op(calld->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_call_next_op(elem, op);
- break;
- }
-}
-
-/* Currently we assume all channel operations should just be pushed up. */
-static void lb_channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem,
- grpc_channel_op *op) {
- lb_channel_data *chand = elem->channel_data;
-
- switch (op->dir) {
- case GRPC_CALL_UP:
- chand->back->filter->channel_op(chand->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
-/* Constructor for call_data */
-static void lb_init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
-
-/* Destructor for call_data */
-static void lb_destroy_call_elem(grpc_call_element *elem) {}
-
-/* Constructor for channel_data */
-static void lb_init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last) {
- GPR_ASSERT(is_first);
- GPR_ASSERT(!is_last);
-}
-
-/* Destructor for channel_data */
-static void lb_destroy_channel_elem(grpc_channel_element *elem) {}
-
-static const grpc_channel_filter link_back_filter = {
- lb_call_op, lb_channel_op,
-
- sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
-
- sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
-
- "clientchannel.linkback",
-};
-
/* Client channel implementation */
-typedef struct {
- size_t inflight_requests;
- grpc_channel_stack *channel_stack;
-} child_entry;
-
typedef struct call_data call_data;
typedef struct {
@@ -126,12 +56,8 @@ typedef struct {
transport_setup is assumed to be set once during construction */
gpr_mu mu;
- /* the sending child (points somewhere in children, or NULL) */
- child_entry *active_child;
- /* vector of child channels */
- child_entry *children;
- size_t child_count;
- size_t child_capacity;
+ /* the sending child (may be null) */
+ grpc_child_channel *active_child;
/* calls waiting for a channel to be ready */
call_data **waiting_children;
@@ -165,9 +91,7 @@ struct call_data {
union {
struct {
/* our child call stack */
- grpc_call_stack *child_stack;
- /* ... and the channel stack associated with it */
- grpc_channel_stack *using_stack;
+ grpc_child_call *child_call;
} active;
struct {
void (*on_complete)(void *user_data, grpc_op_error error);
@@ -177,38 +101,27 @@ struct call_data {
} s;
};
-static int prepare_activate(call_data *calld, child_entry *on_child) {
- grpc_call_element *child_elem;
- grpc_channel_stack *use_stack = on_child->channel_stack;
-
+static int prepare_activate(grpc_call_element *elem,
+ grpc_child_channel *on_child) {
+ call_data *calld = elem->call_data;
if (calld->state == CALL_CANCELLED) return 0;
- on_child->inflight_requests++;
-
/* no more access to calld->s.waiting allowed */
GPR_ASSERT(calld->state == CALL_WAITING);
calld->state = CALL_ACTIVE;
- /* create a child stack, and record that we're using a particular channel
- stack */
- calld->s.active.child_stack = gpr_malloc(use_stack->call_stack_size);
- calld->s.active.using_stack = use_stack;
- grpc_call_stack_init(use_stack, NULL, calld->s.active.child_stack);
- /* initialize the top level link back element */
- child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
- GPR_ASSERT(child_elem->filter == &link_back_filter);
- ((lb_call_data *)child_elem->call_data)->back = calld->elem;
+ /* create a child call */
+ calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
return 1;
}
static void do_nothing(void *ignored, grpc_op_error error) {}
-static void complete_activate(grpc_call_element *elem, child_entry *on_child,
- grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
- grpc_call_stack_element(calld->s.active.child_stack, 0);
+ grpc_child_call_get_top_element(calld->s.active.child_call);
GPR_ASSERT(calld->state == CALL_ACTIVE);
@@ -244,10 +157,10 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
calld->state = CALL_WAITING;
if (chand->active_child) {
/* channel is connected - use the connected stack */
- if (prepare_activate(calld, chand->active_child)) {
+ if (prepare_activate(elem, chand->active_child)) {
gpr_mu_unlock(&chand->mu);
/* activate the request (pass it down) outside the lock */
- complete_activate(elem, chand->active_child, op);
+ complete_activate(elem, op);
} else {
gpr_mu_unlock(&chand->mu);
}
@@ -303,7 +216,7 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
gpr_mu_lock(&chand->mu);
switch (calld->state) {
case CALL_ACTIVE:
- child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
child_elem->filter->call_op(child_elem, elem, op);
return; /* early out */
@@ -367,7 +280,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_next_op(elem, op);
break;
case GRPC_CALL_DOWN:
- child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
+ child_elem =
+ grpc_child_call_get_top_element(calld->s.active.child_call);
GPR_ASSERT(calld->state == CALL_ACTIVE);
child_elem->filter->call_op(child_elem, elem, op);
break;
@@ -376,59 +290,42 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
}
}
-static void broadcast_channel_op_down(grpc_channel_element *elem,
- grpc_channel_op *op) {
- channel_data *chand = elem->channel_data;
- grpc_channel_element *child_elem;
- grpc_channel_stack **children;
- size_t child_count;
- size_t i;
-
- /* copy the current set of children, and mark them all as having an inflight
- request */
- gpr_mu_lock(&chand->mu);
- child_count = chand->child_count;
- children = gpr_malloc(sizeof(grpc_channel_stack *) * child_count);
- for (i = 0; i < child_count; i++) {
- children[i] = chand->children[i].channel_stack;
- chand->children[i].inflight_requests++;
- }
- gpr_mu_unlock(&chand->mu);
-
- /* send the message down */
- for (i = 0; i < child_count; i++) {
- child_elem = grpc_channel_stack_element(children[i], 0);
- if (op->type == GRPC_CHANNEL_GOAWAY) {
- gpr_slice_ref(op->data.goaway.message);
- }
- child_elem->filter->channel_op(child_elem, elem, op);
- }
- if (op->type == GRPC_CHANNEL_GOAWAY) {
- gpr_slice_unref(op->data.goaway.message);
- }
-
- /* unmark the inflight requests */
- gpr_mu_lock(&chand->mu);
- for (i = 0; i < child_count; i++) {
- chand->children[i].inflight_requests--;
- }
- gpr_mu_unlock(&chand->mu);
-
- gpr_free(children);
-}
-
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
+ channel_data *chand = elem->channel_data;
+ grpc_child_channel *child_channel;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
switch (op->type) {
+ case GRPC_CHANNEL_GOAWAY:
+ gpr_mu_lock(&chand->mu);
+ child_channel = chand->active_child;
+ chand->active_child = NULL;
+ gpr_mu_unlock(&chand->mu);
+ if (child_channel) {
+ grpc_child_channel_handle_op(child_channel, op);
+ grpc_child_channel_destroy(child_channel);
+ } else {
+ gpr_slice_unref(op->data.goaway.message);
+ }
+ break;
+ case GRPC_CHANNEL_DISCONNECT:
+ gpr_mu_lock(&chand->mu);
+ child_channel = chand->active_child;
+ chand->active_child = NULL;
+ gpr_mu_unlock(&chand->mu);
+ if (child_channel) {
+ grpc_child_channel_destroy(child_channel);
+ }
+ break;
default:
switch (op->dir) {
case GRPC_CALL_UP:
grpc_channel_next_op(elem, op);
break;
case GRPC_CALL_DOWN:
- broadcast_channel_op_down(elem, op);
+ gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
+ abort();
break;
}
break;
@@ -459,8 +356,6 @@ static void init_call_elem(grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- size_t i;
/* if the metadata buffer is not flushed, destroy it here. */
grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK);
@@ -468,18 +363,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
remove it from the in-flight requests tracked by the child_entry we
picked */
if (calld->state == CALL_ACTIVE) {
- grpc_call_stack_destroy(calld->s.active.child_stack);
- gpr_free(calld->s.active.child_stack);
-
- gpr_mu_lock(&chand->mu);
- for (i = 0; i < chand->child_count; i++) {
- if (chand->children[i].channel_stack == calld->s.active.using_stack) {
- chand->children[i].inflight_requests--;
- /* TODO(ctiller): garbage collect channels that are not active
- and have no inflight requests */
- }
- }
- gpr_mu_unlock(&chand->mu);
+ grpc_child_call_destroy(calld->s.active.child_call);
}
}
@@ -497,9 +381,6 @@ static void init_channel_elem(grpc_channel_element *elem,
gpr_mu_init(&chand->mu);
chand->active_child = NULL;
- chand->children = NULL;
- chand->child_count = 0;
- chand->child_capacity = 0;
chand->waiting_children = NULL;
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
@@ -515,14 +396,12 @@ static void init_channel_elem(grpc_channel_element *elem,
/* Destructor for channel_data */
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- size_t i;
grpc_transport_setup_cancel(chand->transport_setup);
- for (i = 0; i < chand->child_count; i++) {
- GPR_ASSERT(chand->children[i].inflight_requests == 0);
- grpc_channel_stack_destroy(chand->children[i].channel_stack);
- gpr_free(chand->children[i].channel_stack);
+ if (chand->active_child) {
+ grpc_child_channel_destroy(chand->active_child);
+ chand->active_child = NULL;
}
grpc_channel_args_destroy(chand->args);
@@ -531,17 +410,16 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
gpr_mu_destroy(&chand->mu);
GPR_ASSERT(chand->waiting_child_count == 0);
gpr_free(chand->waiting_children);
- gpr_free(chand->children);
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op,
+ call_op, channel_op,
- sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "clientchannel",
+ "client-channel",
};
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
@@ -551,12 +429,10 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
/* we just got a new transport: lets create a child channel stack for it */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
- grpc_channel_element *lb_elem;
- grpc_channel_stack *child_stack;
size_t num_child_filters = 2 + num_channel_filters;
grpc_channel_filter const **child_filters;
grpc_transport_setup_result result;
- child_entry *child_ent;
+ grpc_child_channel *old_active = NULL;
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
@@ -565,7 +441,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
/* we always need a link back filter to get back to the connected channel */
- child_filters[0] = &link_back_filter;
+ child_filters[0] = &grpc_child_channel_top_filter;
for (i = 0; i < num_channel_filters; i++) {
child_filters[i + 1] = channel_filters[i];
}
@@ -578,28 +454,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
gpr_mu_lock(&chand->mu);
chand->transport_setup_initiated = 0;
- if (chand->child_count == chand->child_capacity) {
- /* realloc will invalidate chand->active_child, but it's reset in the next
- stanza anyway */
- chand->child_capacity =
- GPR_MAX(2 * chand->child_capacity, chand->child_capacity + 2);
- chand->children = gpr_realloc(chand->children,
- sizeof(child_entry) * chand->child_capacity);
+ if (chand->active_child) {
+ old_active = chand->active_child;
}
-
- /* build up the child stack */
- child_stack =
- gpr_malloc(grpc_channel_stack_size(child_filters, num_child_filters));
- grpc_channel_stack_init(child_filters, num_child_filters, chand->args, mdctx,
- child_stack);
- lb_elem = grpc_channel_stack_element(child_stack, 0);
- GPR_ASSERT(lb_elem->filter == &link_back_filter);
- ((lb_channel_data *)lb_elem->channel_data)->back = elem;
- result = grpc_connected_channel_bind_transport(child_stack, transport);
- child_ent = &chand->children[chand->child_count++];
- child_ent->channel_stack = child_stack;
- child_ent->inflight_requests = 0;
- chand->active_child = child_ent;
+ chand->active_child = grpc_child_channel_create(
+ elem, child_filters, num_child_filters, chand->args, mdctx);
+ result =
+ grpc_connected_channel_bind_transport(chand->active_child, transport);
/* capture the waiting children - we'll activate them outside the lock
to avoid re-entrancy problems */
@@ -620,7 +481,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
call_ops[i].user_data =
waiting_children[i]->s.waiting.on_complete_user_data;
- if (!prepare_activate(waiting_children[i], child_ent)) {
+ if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
}
@@ -634,13 +495,17 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
that guarantee we need to do some curly locking here */
for (i = 0; i < waiting_child_count; i++) {
if (waiting_children[i]) {
- complete_activate(waiting_children[i]->elem, child_ent, &call_ops[i]);
+ complete_activate(waiting_children[i]->elem, &call_ops[i]);
}
}
gpr_free(waiting_children);
gpr_free(call_ops);
gpr_free(child_filters);
+ if (old_active) {
+ grpc_child_channel_destroy(old_active);
+ }
+
return result;
}