diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-29 10:55:46 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-29 10:55:46 -0700 |
commit | df91ba52d0ebbe98ce84508701283b82b2c0441b (patch) | |
tree | 9ae7394804add9fef7808d0d85ff7ef213f32d13 /src/core | |
parent | 4ab82d2c4dd6a21cf2a13662fb2efa9171efe104 (diff) |
Add ability to continue waiting calls
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/client_config/subchannel.c | 134 |
1 files changed, 124 insertions, 10 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index c770cb3b20..7bd6717a3d 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -49,11 +49,20 @@ typedef struct { grpc_subchannel *subchannel; } connection; +typedef struct { + grpc_iomgr_closure closure; + size_t version; + grpc_subchannel *subchannel; + grpc_connectivity_state connectivity_state; +} state_watcher; + typedef struct waiting_for_connect { struct waiting_for_connect *next; grpc_iomgr_closure *notify; - grpc_transport_stream_op *initial_op; + grpc_transport_stream_op initial_op; grpc_subchannel_call **target; + grpc_subchannel *subchannel; + grpc_iomgr_closure continuation; } waiting_for_connect; struct grpc_subchannel { @@ -85,6 +94,8 @@ struct grpc_subchannel { /** active connection */ connection *active; + /** version number for the active connection */ + size_t active_version; /** refcount */ int refs; /** are we connecting */ @@ -228,6 +239,16 @@ static void start_connect(grpc_subchannel *c) { &c->connected); } +static void continue_creating_call(void *arg, int iomgr_success) { + waiting_for_connect *w4c = arg; + grpc_subchannel_create_call(w4c->subchannel, + &w4c->initial_op, + w4c->target, + w4c->notify); + grpc_subchannel_unref(w4c->subchannel); + gpr_free(w4c); +} + void grpc_subchannel_create_call(grpc_subchannel *c, grpc_transport_stream_op *initial_op, grpc_subchannel_call **target, @@ -245,8 +266,11 @@ void grpc_subchannel_create_call(grpc_subchannel *c, waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; w4c->notify = notify; - w4c->initial_op = initial_op; + w4c->initial_op = *initial_op; w4c->target = target; + w4c->subchannel = c; + subchannel_ref_locked(c); + grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c); c->waiting = w4c; grpc_subchannel_add_interested_party(c, initial_op->bind_pollset); if (!c->connecting) { @@ -291,7 +315,70 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) { - abort(); + abort(); /* not implemented */ +} + +static void on_state_changed(void *p, int iomgr_success) { + state_watcher *sw = p; + grpc_subchannel *c = sw->subchannel; + gpr_mu *mu = &c->mu; + int destroy; + grpc_transport_op op; + grpc_channel_element *elem; + connection *destroy_connection = NULL; + int do_connect = 0; + + gpr_mu_lock(mu); + + /* if we failed or there is a version number mismatch, just leave + this closure */ + if (!iomgr_success || sw->subchannel->active_version != sw->version) { + goto done; + } + + switch (sw->connectivity_state) { + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_READY: + case GRPC_CHANNEL_IDLE: + /* all is still good: keep watching */ + memset(&op, 0, sizeof(op)); + op.connectivity_state = &sw->connectivity_state; + op.on_connectivity_state_change = &sw->closure; + elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); + elem->filter->start_transport_op(elem, &op); + /* early out */ + gpr_mu_unlock(mu); + return; + case GRPC_CHANNEL_FATAL_FAILURE: + /* things have gone wrong, deactivate and enter idle */ + if (sw->subchannel->active->refs == 0) { + destroy_connection = sw->subchannel->active; + } + sw->subchannel->active = NULL; + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + /* things are starting to go wrong, reconnect but don't deactivate */ + subchannel_ref_locked(c); + do_connect = 1; + c->connecting = 1; + break; + } + +done: + grpc_connectivity_state_set(&c->state_tracker, + compute_connectivity_locked(c)); + destroy = subchannel_unref_locked(c); + gpr_free(sw); + gpr_mu_unlock(mu); + if (do_connect) { + start_connect(c); + } + if (destroy) { + subchannel_destroy(c); + } + if (destroy_connection != NULL) { + connection_destroy(destroy_connection); + } } static void publish_transport(grpc_subchannel *c) { @@ -301,8 +388,12 @@ static void publish_transport(grpc_subchannel *c) { size_t num_filters; const grpc_channel_filter **filters; waiting_for_connect *w4c; - int destroy; + grpc_transport_op op; + state_watcher *sw; + connection *destroy_connection = NULL; + grpc_channel_element *elem; + /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; filters = gpr_malloc(sizeof(*filters) * num_filters); memcpy(filters, c->filters, sizeof(*filters) * c->num_filters); @@ -310,31 +401,54 @@ static void publish_transport(grpc_subchannel *c) { sizeof(*filters) * c->connecting_result.num_filters); filters[num_filters - 1] = &grpc_connected_channel_filter; + /* construct channel stack */ channel_stack_size = grpc_channel_stack_size(filters, num_filters); con = gpr_malloc(sizeof(connection) + channel_stack_size); stk = (grpc_channel_stack *)(con + 1); - con->refs = 0; con->subchannel = c; grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); + /* initialize state watcher */ + sw = gpr_malloc(sizeof(*sw)); + grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw); + sw->subchannel = c; + sw->connectivity_state = GRPC_CHANNEL_READY; + gpr_mu_lock(&c->mu); - GPR_ASSERT(c->active == NULL); + + /* publish */ + if (c->active != NULL && c->active->refs == 0) { + destroy_connection = c->active; + } c->active = con; + c->active_version++; + sw->version = c->active_version; c->connecting = 0; + + /* watch for changes; subchannel ref for connecting is donated + to the state watcher */ + memset(&op, 0, sizeof(op)); + op.connectivity_state = &sw->connectivity_state; + op.on_connectivity_state_change = &sw->closure; + elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); + elem->filter->start_transport_op(elem, &op); + + /* signal completion */ connectivity_state_changed_locked(c); while ((w4c = c->waiting)) { - abort(); /* not implemented */ + c->waiting = w4c->next; + grpc_iomgr_add_callback(&w4c->continuation); } - destroy = subchannel_unref_locked(c); + gpr_mu_unlock(&c->mu); gpr_free(filters); - if (destroy) { - subchannel_destroy(c); + if (destroy_connection != NULL) { + connection_destroy(destroy_connection); } } |