aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-29 10:55:46 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-29 10:55:46 -0700
commitdf91ba52d0ebbe98ce84508701283b82b2c0441b (patch)
tree9ae7394804add9fef7808d0d85ff7ef213f32d13 /src/core
parent4ab82d2c4dd6a21cf2a13662fb2efa9171efe104 (diff)
Add ability to continue waiting calls
Diffstat (limited to 'src/core')
-rw-r--r--src/core/client_config/subchannel.c134
1 files changed, 124 insertions, 10 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index c770cb3b20..7bd6717a3d 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -49,11 +49,20 @@ typedef struct {
grpc_subchannel *subchannel;
} connection;
+typedef struct {
+ grpc_iomgr_closure closure;
+ size_t version;
+ grpc_subchannel *subchannel;
+ grpc_connectivity_state connectivity_state;
+} state_watcher;
+
typedef struct waiting_for_connect {
struct waiting_for_connect *next;
grpc_iomgr_closure *notify;
- grpc_transport_stream_op *initial_op;
+ grpc_transport_stream_op initial_op;
grpc_subchannel_call **target;
+ grpc_subchannel *subchannel;
+ grpc_iomgr_closure continuation;
} waiting_for_connect;
struct grpc_subchannel {
@@ -85,6 +94,8 @@ struct grpc_subchannel {
/** active connection */
connection *active;
+ /** version number for the active connection */
+ size_t active_version;
/** refcount */
int refs;
/** are we connecting */
@@ -228,6 +239,16 @@ static void start_connect(grpc_subchannel *c) {
&c->connected);
}
+static void continue_creating_call(void *arg, int iomgr_success) {
+ waiting_for_connect *w4c = arg;
+ grpc_subchannel_create_call(w4c->subchannel,
+ &w4c->initial_op,
+ w4c->target,
+ w4c->notify);
+ grpc_subchannel_unref(w4c->subchannel);
+ gpr_free(w4c);
+}
+
void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_transport_stream_op *initial_op,
grpc_subchannel_call **target,
@@ -245,8 +266,11 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
w4c->notify = notify;
- w4c->initial_op = initial_op;
+ w4c->initial_op = *initial_op;
w4c->target = target;
+ w4c->subchannel = c;
+ subchannel_ref_locked(c);
+ grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
if (!c->connecting) {
@@ -291,7 +315,70 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_transport_op *op) {
- abort();
+ abort(); /* not implemented */
+}
+
+static void on_state_changed(void *p, int iomgr_success) {
+ state_watcher *sw = p;
+ grpc_subchannel *c = sw->subchannel;
+ gpr_mu *mu = &c->mu;
+ int destroy;
+ grpc_transport_op op;
+ grpc_channel_element *elem;
+ connection *destroy_connection = NULL;
+ int do_connect = 0;
+
+ gpr_mu_lock(mu);
+
+ /* if we failed or there is a version number mismatch, just leave
+ this closure */
+ if (!iomgr_success || sw->subchannel->active_version != sw->version) {
+ goto done;
+ }
+
+ switch (sw->connectivity_state) {
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_READY:
+ case GRPC_CHANNEL_IDLE:
+ /* all is still good: keep watching */
+ memset(&op, 0, sizeof(op));
+ op.connectivity_state = &sw->connectivity_state;
+ op.on_connectivity_state_change = &sw->closure;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
+ elem->filter->start_transport_op(elem, &op);
+ /* early out */
+ gpr_mu_unlock(mu);
+ return;
+ case GRPC_CHANNEL_FATAL_FAILURE:
+ /* things have gone wrong, deactivate and enter idle */
+ if (sw->subchannel->active->refs == 0) {
+ destroy_connection = sw->subchannel->active;
+ }
+ sw->subchannel->active = NULL;
+ break;
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ /* things are starting to go wrong, reconnect but don't deactivate */
+ subchannel_ref_locked(c);
+ do_connect = 1;
+ c->connecting = 1;
+ break;
+ }
+
+done:
+ grpc_connectivity_state_set(&c->state_tracker,
+ compute_connectivity_locked(c));
+ destroy = subchannel_unref_locked(c);
+ gpr_free(sw);
+ gpr_mu_unlock(mu);
+ if (do_connect) {
+ start_connect(c);
+ }
+ if (destroy) {
+ subchannel_destroy(c);
+ }
+ if (destroy_connection != NULL) {
+ connection_destroy(destroy_connection);
+ }
}
static void publish_transport(grpc_subchannel *c) {
@@ -301,8 +388,12 @@ static void publish_transport(grpc_subchannel *c) {
size_t num_filters;
const grpc_channel_filter **filters;
waiting_for_connect *w4c;
- int destroy;
+ grpc_transport_op op;
+ state_watcher *sw;
+ connection *destroy_connection = NULL;
+ grpc_channel_element *elem;
+ /* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters);
memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
@@ -310,31 +401,54 @@ static void publish_transport(grpc_subchannel *c) {
sizeof(*filters) * c->connecting_result.num_filters);
filters[num_filters - 1] = &grpc_connected_channel_filter;
+ /* construct channel stack */
channel_stack_size = grpc_channel_stack_size(filters, num_filters);
con = gpr_malloc(sizeof(connection) + channel_stack_size);
stk = (grpc_channel_stack *)(con + 1);
-
con->refs = 0;
con->subchannel = c;
grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
+ /* initialize state watcher */
+ sw = gpr_malloc(sizeof(*sw));
+ grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw);
+ sw->subchannel = c;
+ sw->connectivity_state = GRPC_CHANNEL_READY;
+
gpr_mu_lock(&c->mu);
- GPR_ASSERT(c->active == NULL);
+
+ /* publish */
+ if (c->active != NULL && c->active->refs == 0) {
+ destroy_connection = c->active;
+ }
c->active = con;
+ c->active_version++;
+ sw->version = c->active_version;
c->connecting = 0;
+
+ /* watch for changes; subchannel ref for connecting is donated
+ to the state watcher */
+ memset(&op, 0, sizeof(op));
+ op.connectivity_state = &sw->connectivity_state;
+ op.on_connectivity_state_change = &sw->closure;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
+ elem->filter->start_transport_op(elem, &op);
+
+ /* signal completion */
connectivity_state_changed_locked(c);
while ((w4c = c->waiting)) {
- abort(); /* not implemented */
+ c->waiting = w4c->next;
+ grpc_iomgr_add_callback(&w4c->continuation);
}
- destroy = subchannel_unref_locked(c);
+
gpr_mu_unlock(&c->mu);
gpr_free(filters);
- if (destroy) {
- subchannel_destroy(c);
+ if (destroy_connection != NULL) {
+ connection_destroy(destroy_connection);
}
}