diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-06-26 16:57:20 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-06-26 16:57:20 -0700 |
commit | ff54c92adc8c46a0cdce74d06f98faea14f52100 (patch) | |
tree | 4b257e425840ca777fd919502829242ef7a6b29a | |
parent | 4b804104b67c1c10b358877ed5090df62c422fc1 (diff) |
Get the success case through to call creation
-rw-r--r-- | src/core/channel/connected_channel.c | 1 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 5 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 45 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 1 |
4 files changed, 47 insertions, 5 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 80a3100af0..84caecb6b3 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -107,7 +107,6 @@ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *cd = (channel_data *)elem->channel_data; - GPR_ASSERT(!is_first); GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 83a25a9a72..f3a6d21eb5 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -106,7 +106,7 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; pf_ref(pol); - grpc_subchannel_notify_on_state_change(p->subchannels[0], &p->checking_connectivity, &p->connectivity_changed); + grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); pp = gpr_malloc(sizeof(*pp)); @@ -142,7 +142,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { loop: switch (p->checking_connectivity) { case GRPC_CHANNEL_READY: - p->selected = p->subchannels[p->checking_connectivity]; + p->selected = p->subchannels[p->checking_subchannel]; + GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) == GRPC_CHANNEL_READY); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 2b4c7ea1d3..e863c5b97c 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> #include "src/core/channel/channel_args.h" +#include "src/core/channel/connected_channel.h" typedef struct { gpr_refcount refs; @@ -106,6 +107,7 @@ static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_ static void connectivity_state_changed_locked(grpc_subchannel *c); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); +static void subchannel_connected(void *subchannel, int iomgr_success); /* * grpc_subchannel implementation @@ -119,6 +121,7 @@ void grpc_subchannel_unref(grpc_subchannel *c) { grpc_channel_args_destroy(c->args); gpr_free(c->addr); grpc_mdctx_unref(c->mdctx); + grpc_pollset_set_destroy(&c->pollset_set); gpr_free(c); } } @@ -140,16 +143,19 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, gpr_ref_init(&c->refs, 1); c->connector = connector; grpc_connector_ref(c->connector); - c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * args->filter_count); + c->filter_count = args->filter_count + 1; + c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->filter_count); memcpy(c->filters, args->filters, sizeof(grpc_channel_filter *) * args->filter_count); - c->filter_count = args->filter_count; + c->filters[c->filter_count - 1] = &grpc_connected_channel_filter; c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); c->addr_len = args->addr_len; c->args = grpc_channel_args_copy(args->args); c->mdctx = args->mdctx; grpc_mdctx_ref(c->mdctx); + grpc_pollset_set_init(&c->pollset_set); + grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); gpr_mu_init(&c->mu); return c; } @@ -178,6 +184,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, if (!c->connecting) { c->connecting = 1; connectivity_state_changed_locked(c); + grpc_subchannel_ref(c); gpr_mu_unlock(&c->mu); grpc_connector_connect(c->connector, &c->pollset_set, c->addr, @@ -211,6 +218,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, current = GRPC_CHANNEL_CONNECTING; c->connecting = 1; do_connect = 1; + grpc_subchannel_ref(c); connectivity_state_changed_locked(c); } if (current != *state) { @@ -230,6 +238,39 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, } } +static void publish_transport(grpc_subchannel *c) { + size_t channel_stack_size = grpc_channel_stack_size(c->filters, c->filter_count); + connection *con = gpr_malloc(sizeof(connection) + channel_stack_size); + grpc_channel_stack *stk = (grpc_channel_stack *)(con + 1); + waiting_for_connect *w4c; + gpr_ref_init(&con->refs, 1); + con->subchannel = c; + grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk); + grpc_connected_channel_bind_transport(stk, c->connecting_transport); + c->connecting_transport = NULL; + + gpr_mu_lock(&c->mu); + GPR_ASSERT(c->active == NULL); + c->active = con; + c->connecting = 0; + connectivity_state_changed_locked(c); + while ((w4c = c->waiting)) { + abort(); /* not implemented */ + } + gpr_mu_unlock(&c->mu); +} + +static void subchannel_connected(void *arg, int iomgr_success) { + grpc_subchannel *c = arg; + if (c->connecting_transport) { + publish_transport(c); + } else { + grpc_subchannel_unref(c); + /* TODO(ctiller): retry after sleeping */ + abort(); + } +} + static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { return gpr_time_add(gpr_now(), gpr_time_from_seconds(60)); } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index a98f550ede..3d54ff58d8 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -88,6 +88,7 @@ static void connector_connect( grpc_transport **transport, grpc_iomgr_closure *notify) { connector *c = (connector *)con; GPR_ASSERT(c->notify == NULL); + GPR_ASSERT(notify->cb); c->notify = notify; c->args = channel_args; c->mdctx = metadata_context; |