aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-26 16:57:20 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-26 16:57:20 -0700
commitff54c92adc8c46a0cdce74d06f98faea14f52100 (patch)
tree4b257e425840ca777fd919502829242ef7a6b29a /src/core/client_config
parent4b804104b67c1c10b358877ed5090df62c422fc1 (diff)
Get the success case through to call creation
Diffstat (limited to 'src/core/client_config')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c5
-rw-r--r--src/core/client_config/subchannel.c45
2 files changed, 46 insertions, 4 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 83a25a9a72..f3a6d21eb5 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -106,7 +106,7 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
pf_ref(pol);
- grpc_subchannel_notify_on_state_change(p->subchannels[0], &p->checking_connectivity, &p->connectivity_changed);
+ grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset);
pp = gpr_malloc(sizeof(*pp));
@@ -142,7 +142,8 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
loop:
switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY:
- p->selected = p->subchannels[p->checking_connectivity];
+ p->selected = p->subchannels[p->checking_subchannel];
+ GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) == GRPC_CHANNEL_READY);
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 2b4c7ea1d3..e863c5b97c 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include "src/core/channel/channel_args.h"
+#include "src/core/channel/connected_channel.h"
typedef struct {
gpr_refcount refs;
@@ -106,6 +107,7 @@ static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_
static void connectivity_state_changed_locked(grpc_subchannel *c);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
+static void subchannel_connected(void *subchannel, int iomgr_success);
/*
* grpc_subchannel implementation
@@ -119,6 +121,7 @@ void grpc_subchannel_unref(grpc_subchannel *c) {
grpc_channel_args_destroy(c->args);
gpr_free(c->addr);
grpc_mdctx_unref(c->mdctx);
+ grpc_pollset_set_destroy(&c->pollset_set);
gpr_free(c);
}
}
@@ -140,16 +143,19 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
gpr_ref_init(&c->refs, 1);
c->connector = connector;
grpc_connector_ref(c->connector);
- c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * args->filter_count);
+ c->filter_count = args->filter_count + 1;
+ c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->filter_count);
memcpy(c->filters, args->filters,
sizeof(grpc_channel_filter *) * args->filter_count);
- c->filter_count = args->filter_count;
+ c->filters[c->filter_count - 1] = &grpc_connected_channel_filter;
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
c->addr_len = args->addr_len;
c->args = grpc_channel_args_copy(args->args);
c->mdctx = args->mdctx;
grpc_mdctx_ref(c->mdctx);
+ grpc_pollset_set_init(&c->pollset_set);
+ grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
gpr_mu_init(&c->mu);
return c;
}
@@ -178,6 +184,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
if (!c->connecting) {
c->connecting = 1;
connectivity_state_changed_locked(c);
+ grpc_subchannel_ref(c);
gpr_mu_unlock(&c->mu);
grpc_connector_connect(c->connector, &c->pollset_set, c->addr,
@@ -211,6 +218,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
current = GRPC_CHANNEL_CONNECTING;
c->connecting = 1;
do_connect = 1;
+ grpc_subchannel_ref(c);
connectivity_state_changed_locked(c);
}
if (current != *state) {
@@ -230,6 +238,39 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
}
}
+static void publish_transport(grpc_subchannel *c) {
+ size_t channel_stack_size = grpc_channel_stack_size(c->filters, c->filter_count);
+ connection *con = gpr_malloc(sizeof(connection) + channel_stack_size);
+ grpc_channel_stack *stk = (grpc_channel_stack *)(con + 1);
+ waiting_for_connect *w4c;
+ gpr_ref_init(&con->refs, 1);
+ con->subchannel = c;
+ grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk);
+ grpc_connected_channel_bind_transport(stk, c->connecting_transport);
+ c->connecting_transport = NULL;
+
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(c->active == NULL);
+ c->active = con;
+ c->connecting = 0;
+ connectivity_state_changed_locked(c);
+ while ((w4c = c->waiting)) {
+ abort(); /* not implemented */
+ }
+ gpr_mu_unlock(&c->mu);
+}
+
+static void subchannel_connected(void *arg, int iomgr_success) {
+ grpc_subchannel *c = arg;
+ if (c->connecting_transport) {
+ publish_transport(c);
+ } else {
+ grpc_subchannel_unref(c);
+ /* TODO(ctiller): retry after sleeping */
+ abort();
+ }
+}
+
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
return gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
}