diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 149 | ||||
-rw-r--r-- | src/core/channel/client_channel.h | 14 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 192 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 15 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 12 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 68 | ||||
-rw-r--r-- | src/core/iomgr/alarm.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/endpoint.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/endpoint.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 1 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.c | 12 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 32 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 7 | ||||
-rw-r--r-- | src/core/security/secure_endpoint.c | 8 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 190 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 8 | ||||
-rw-r--r-- | src/core/surface/init.c | 2 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 8 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 41 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.c | 44 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.h | 13 | ||||
-rw-r--r-- | src/core/transport/transport.h | 2 |
22 files changed, 687 insertions, 142 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index c1aa580b2d..5dba84c801 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -40,7 +40,6 @@ #include "src/core/channel/connected_channel.h" #include "src/core/surface/channel.h" #include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/pollset_set.h" #include "src/core/support/string.h" #include "src/core/transport/connectivity_state.h" #include <grpc/support/alloc.h> @@ -77,8 +76,22 @@ typedef struct { grpc_iomgr_closure on_config_changed; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; + /** when an lb_policy arrives, should we try to exit idle */ + int exit_idle_when_lb_policy_arrives; + /** pollset_set of interested parties in a new connection */ + grpc_pollset_set pollset_set; } channel_data; +/** We create one watcher for each new lb_policy that is returned from a resolver, + to watch for state changes from the lb_policy. When a state change is seen, we + update the channel, and create a new watcher */ +typedef struct { + channel_data *chand; + grpc_iomgr_closure on_changed; + grpc_connectivity_state state; + grpc_lb_policy *lb_policy; +} lb_policy_connectivity_watcher; + typedef enum { CALL_CREATED, CALL_WAITING_FOR_SEND, @@ -388,16 +401,53 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, perform_transport_stream_op(elem, op, 0); } +static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state); + +static void on_lb_policy_state_changed(void *arg, int iomgr_success) { + lb_policy_connectivity_watcher *w = arg; + + gpr_mu_lock(&w->chand->mu_config); + /* check if the notification is for a stale policy */ + if (w->lb_policy == w->chand->lb_policy) { + grpc_connectivity_state_set(&w->chand->state_tracker, w->state, + "lb_changed"); + if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { + watch_lb_policy(w->chand, w->lb_policy, w->state); + } + } + gpr_mu_unlock(&w->chand->mu_config); + + GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); + gpr_free(w); +} + +static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) { + lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); + + w->chand = chand; + grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w); + w->state = current_state; + w->lb_policy = lb_policy; + grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed); +} + static void cc_on_config_changed(void *arg, int iomgr_success) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_resolver *old_resolver; grpc_iomgr_closure *wakeup_closures = NULL; + grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; + int exit_idle = 0; if (chand->incoming_configuration != NULL) { lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); - GRPC_LB_POLICY_REF(lb_policy, "channel"); + if (lb_policy != NULL) { + GRPC_LB_POLICY_REF(lb_policy, "channel"); + GRPC_LB_POLICY_REF(lb_policy, "config_change"); + state = grpc_lb_policy_check_connectivity(lb_policy); + } grpc_client_config_unref(chand->incoming_configuration); } @@ -411,13 +461,12 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { wakeup_closures = chand->waiting_for_config_closures; chand->waiting_for_config_closures = NULL; } - gpr_mu_unlock(&chand->mu_config); - - if (old_lb_policy) { - GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); + if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { + GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); + exit_idle = 1; + chand->exit_idle_when_lb_policy_arrives = 0; } - gpr_mu_lock(&chand->mu_config); if (iomgr_success && chand->resolver) { grpc_resolver *resolver = chand->resolver; GRPC_RESOLVER_REF(resolver, "channel-next"); @@ -426,11 +475,16 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(resolver, "channel-next"); + grpc_connectivity_state_set(&chand->state_tracker, state, + "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy(chand, lb_policy, state); + } } else { old_resolver = chand->resolver; chand->resolver = NULL; grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE); + GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); gpr_mu_unlock(&chand->mu_config); if (old_resolver != NULL) { grpc_resolver_shutdown(old_resolver); @@ -438,12 +492,24 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } } + if (exit_idle) { + grpc_lb_policy_exit_idle(lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle"); + } + + if (old_lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); + } + while (wakeup_closures) { grpc_iomgr_closure *next = wakeup_closures->next; grpc_iomgr_add_callback(wakeup_closures); wakeup_closures = next; } + if (lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(lb_policy, "config_change"); + } GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); } @@ -467,20 +533,22 @@ static void cc_start_transport_op(grpc_channel_element *elem, op->connectivity_state = NULL; } + if (!is_empty(op, sizeof(*op))) { + lb_policy = chand->lb_policy; + if (lb_policy) { + GRPC_LB_POLICY_REF(lb_policy, "broadcast"); + } + } + if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE); + GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_lb_policy_shutdown(chand->lb_policy); - } - } - - if (!is_empty(op, sizeof(*op))) { - lb_policy = chand->lb_policy; - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "broadcast"); + GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); + chand->lb_policy = NULL; } } gpr_mu_unlock(&chand->mu_config); @@ -561,10 +629,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, gpr_mu_init(&chand->mu_config); chand->mdctx = metadata_context; chand->master = master; + grpc_pollset_set_init(&chand->pollset_set); grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE); + grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); } /* Destructor for channel_data */ @@ -578,6 +647,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { if (chand->lb_policy != NULL) { GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } + grpc_connectivity_state_destroy(&chand->state_tracker); + grpc_pollset_set_destroy(&chand->pollset_set); gpr_mu_destroy(&chand->mu_config); } @@ -605,3 +676,47 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); } + +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_channel_element *elem, int try_to_connect) { + channel_data *chand = elem->channel_data; + grpc_connectivity_state out; + gpr_mu_lock(&chand->mu_config); + out = grpc_connectivity_state_check(&chand->state_tracker); + if (out == GRPC_CHANNEL_IDLE && try_to_connect) { + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = 1; + } + } + gpr_mu_unlock(&chand->mu_config); + return out; +} + +void grpc_client_channel_watch_connectivity_state( + grpc_channel_element *elem, grpc_connectivity_state *state, + grpc_iomgr_closure *on_complete) { + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->mu_config); + grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state, + on_complete); + gpr_mu_unlock(&chand->mu_config); +} + +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + return &chand->pollset_set; +} + +void grpc_client_channel_add_interested_party(grpc_channel_element *elem, + grpc_pollset *pollset) { + channel_data *chand = elem->channel_data; + grpc_pollset_set_add_pollset(&chand->pollset_set, pollset); +} + +void grpc_client_channel_del_interested_party(grpc_channel_element *elem, + grpc_pollset *pollset) { + channel_data *chand = elem->channel_data; + grpc_pollset_set_del_pollset(&chand->pollset_set, pollset); +} diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index fd2be46145..cd81294eb3 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -52,4 +52,18 @@ extern const grpc_channel_filter grpc_client_channel_filter; void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver); +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_channel_element *elem, int try_to_connect); + +void grpc_client_channel_watch_connectivity_state( + grpc_channel_element *elem, grpc_connectivity_state *state, + grpc_iomgr_closure *on_complete); + +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem); + +void grpc_client_channel_add_interested_party(grpc_channel_element *channel, + grpc_pollset *pollset); +void grpc_client_channel_del_interested_party(grpc_channel_element *channel, + grpc_pollset *pollset); + #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 73da624aff..5ae2e0ea52 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -62,6 +62,8 @@ typedef struct { grpc_subchannel *selected; /** have we started picking? */ int started_picking; + /** are we shut down? */ + int shutdown; /** which subchannel are we watching? */ size_t checking_subchannel; /** what is the connectivity of that channel? */ @@ -73,12 +75,30 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; +static void del_interested_parties_locked(pick_first_lb_policy *p) { + pending_pick *pp; + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], + pp->pollset); + } +} + +static void add_interested_parties_locked(pick_first_lb_policy *p) { + pending_pick *pp; + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], + pp->pollset); + } +} + void pf_destroy(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; + del_interested_parties_locked(p); for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first"); } + grpc_connectivity_state_destroy(&p->state_tracker); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); gpr_free(p); @@ -88,12 +108,35 @@ void pf_shutdown(grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); + del_interested_parties_locked(p); + p->shutdown = 1; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; grpc_iomgr_add_delayed_callback(pp->on_complete, 0); gpr_free(pp); } + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, + "shutdown"); + gpr_mu_unlock(&p->mu); +} + +static void start_picking(pick_first_lb_policy *p) { + p->started_picking = 1; + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); + grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], + &p->checking_connectivity, + &p->connectivity_changed); +} + +void pf_exit_idle(grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (!p->started_picking) { + start_picking(p); + } gpr_mu_unlock(&p->mu); } @@ -109,13 +152,7 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, on_complete->cb(on_complete->cb_arg, 1); } else { if (!p->started_picking) { - p->started_picking = 1; - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_REF(pol, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed); + start_picking(p); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); @@ -129,77 +166,97 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, } } -static void del_interested_parties_locked(pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset); - } -} - -static void add_interested_parties_locked(pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset); - } -} - static void pf_connectivity_changed(void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; pending_pick *pp; int unref = 0; gpr_mu_lock(&p->mu); -loop: - switch (p->checking_connectivity) { - case GRPC_CHANNEL_READY: - p->selected = p->subchannels[p->checking_subchannel]; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = p->selected; - grpc_subchannel_del_interested_party(p->selected, pp->pollset); - grpc_iomgr_add_delayed_callback(pp->on_complete, 1); - gpr_free(pp); - } - unref = 1; - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked(p); - p->checking_subchannel = - (p->checking_subchannel + 1) % p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(p); - goto loop; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: + + if (p->shutdown) { + unref = 1; + } else if (p->selected != NULL) { + grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, + "selected_changed"); + if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed); - break; - case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(p); - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels - 1]); - p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); - if (p->num_subchannels == 0) { + p->selected, &p->checking_connectivity, &p->connectivity_changed); + } else { + unref = 1; + } + } else { + loop: + switch (p->checking_connectivity) { + case GRPC_CHANNEL_READY: + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, + "connecting_ready"); + p->selected = p->subchannels[p->checking_subchannel]; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = NULL; + *pp->target = p->selected; + grpc_subchannel_del_interested_party(p->selected, pp->pollset); grpc_iomgr_add_delayed_callback(pp->on_complete, 1); gpr_free(pp); } - unref = 1; - } else { - p->checking_subchannel %= p->num_subchannels; + grpc_subchannel_notify_on_state_change( + p->selected, &p->checking_connectivity, &p->connectivity_changed); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connecting_transient_failure"); + del_interested_parties_locked(p); + p->checking_subchannel = + (p->checking_subchannel + 1) % p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); add_interested_parties_locked(p); - goto loop; - } + if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { + grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed); + } else { + goto loop; + } + break; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, + "connecting_changed"); + grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked(p); + GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], + p->subchannels[p->num_subchannels - 1]); + p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); + if (p->num_subchannels == 0) { + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, + "no_more_channels"); + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_iomgr_add_delayed_callback(pp->on_complete, 1); + gpr_free(pp); + } + unref = 1; + } else { + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "subchannel_failed"); + p->checking_subchannel %= p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity( + p->subchannels[p->checking_subchannel]); + add_interested_parties_locked(p); + goto loop; + } + } } + gpr_mu_unlock(&p->mu); if (unref) { @@ -249,8 +306,13 @@ static void pf_notify_on_state_change(grpc_lb_policy *pol, } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, - pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, + pf_shutdown, + pf_pick, + pf_exit_idle, + pf_broadcast, + pf_check_connectivity, + pf_notify_on_state_change}; grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, size_t num_subchannels) { @@ -260,6 +322,8 @@ grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); p->num_subchannels = num_subchannels; + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "pick_first"); memcpy(p->subchannels, subchannels, sizeof(grpc_subchannel *) * num_subchannels); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 6d1c788742..90ec44432f 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -77,3 +77,18 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) { policy->vtable->broadcast(policy, op); } + +void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) { + policy->vtable->exit_idle(policy); +} + +void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure) { + policy->vtable->notify_on_state_change(policy, state, closure); +} + +grpc_connectivity_state grpc_lb_policy_check_connectivity( + grpc_lb_policy *policy) { + return policy->vtable->check_connectivity(policy); +} diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index a468f761cc..3f7ca8f28d 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -59,6 +59,9 @@ struct grpc_lb_policy_vtable { grpc_metadata_batch *initial_metadata, grpc_subchannel **target, grpc_iomgr_closure *on_complete); + /** try to enter a READY connectivity state */ + void (*exit_idle)(grpc_lb_policy *policy); + /** broadcast a transport op to all subchannels */ void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op); @@ -106,4 +109,13 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op); +void grpc_lb_policy_exit_idle(grpc_lb_policy *policy); + +void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure); + +grpc_connectivity_state grpc_lb_policy_check_connectivity( + grpc_lb_policy *policy); + #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 487f5afb35..98e311eac0 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -38,9 +38,11 @@ #include <grpc/support/alloc.h> #include "src/core/channel/channel_args.h" +#include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/iomgr/alarm.h" #include "src/core/transport/connectivity_state.h" +#include "src/core/surface/channel.h" typedef struct { /* all fields protected by subchannel->mu */ @@ -94,8 +96,10 @@ struct grpc_subchannel { grpc_iomgr_closure connected; /** pollset_set tracking who's interested in a connection - being setup */ - grpc_pollset_set pollset_set; + being setup - owned by the master channel (in particular the + client_channel + filter there-in) */ + grpc_pollset_set *pollset_set; /** mutex protecting remaining elements */ gpr_mu mu; @@ -132,7 +136,8 @@ struct grpc_subchannel_call { #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) static grpc_subchannel_call *create_call(connection *con); -static void connectivity_state_changed_locked(grpc_subchannel *c); +static void connectivity_state_changed_locked(grpc_subchannel *c, + const char *reason); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); @@ -244,7 +249,6 @@ static void subchannel_destroy(grpc_subchannel *c) { grpc_channel_args_destroy(c->args); gpr_free(c->addr); grpc_mdctx_unref(c->mdctx); - grpc_pollset_set_destroy(&c->pollset_set); grpc_connectivity_state_destroy(&c->state_tracker); grpc_connector_unref(c->connector); gpr_free(c); @@ -252,17 +256,19 @@ static void subchannel_destroy(grpc_subchannel *c) { void grpc_subchannel_add_interested_party(grpc_subchannel *c, grpc_pollset *pollset) { - grpc_pollset_set_add_pollset(&c->pollset_set, pollset); + grpc_pollset_set_add_pollset(c->pollset_set, pollset); } void grpc_subchannel_del_interested_party(grpc_subchannel *c, grpc_pollset *pollset) { - grpc_pollset_set_del_pollset(&c->pollset_set, pollset); + grpc_pollset_set_del_pollset(c->pollset_set, pollset); } grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args) { grpc_subchannel *c = gpr_malloc(sizeof(*c)); + grpc_channel_element *parent_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(args->master)); memset(c, 0, sizeof(*c)); c->refs = 1; c->connector = connector; @@ -277,10 +283,11 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->args = grpc_channel_args_copy(args->args); c->mdctx = args->mdctx; c->master = args->master; + c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); grpc_mdctx_ref(c->mdctx); - grpc_pollset_set_init(&c->pollset_set); grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); - grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE); + grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, + "subchannel"); gpr_mu_init(&c->mu); return c; } @@ -288,7 +295,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, static void continue_connect(grpc_subchannel *c) { grpc_connect_in_args args; - args.interested_parties = &c->pollset_set; + args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; args.deadline = compute_connect_deadline(c); @@ -309,6 +316,7 @@ static void start_connect(grpc_subchannel *c) { static void continue_creating_call(void *arg, int iomgr_success) { waiting_for_connect *w4c = arg; + grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset); grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect"); @@ -341,9 +349,10 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, grpc_subchannel_add_interested_party(c, pollset); if (!c->connecting) { c->connecting = 1; - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "create_call"); /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); + GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); gpr_mu_unlock(&c->mu); start_connect(c); @@ -372,7 +381,8 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, c->connecting = 1; /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); - connectivity_state_changed_locked(c); + GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); + connectivity_state_changed_locked(c, "state_change"); } gpr_mu_unlock(&c->mu); if (do_connect) { @@ -388,7 +398,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, gpr_mu_lock(&c->mu); if (op->disconnect) { c->disconnected = 1; - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "disconnect"); if (c->have_alarm) { cancel_alarm = 1; } @@ -456,13 +466,15 @@ static void on_state_changed(void *p, int iomgr_success) { destroy_connection = sw->subchannel->active; } sw->subchannel->active = NULL; - grpc_connectivity_state_set(&c->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE); + grpc_connectivity_state_set( + &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE + : GRPC_CHANNEL_TRANSIENT_FAILURE, + "connection_failed"); break; } done: - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "transport_state_changed"); destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); gpr_free(sw); gpr_mu_unlock(mu); @@ -486,6 +498,8 @@ static void publish_transport(grpc_subchannel *c) { connection *destroy_connection = NULL; grpc_channel_element *elem; + gpr_log(GPR_DEBUG, "publish_transport: %p", c->master); + /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; filters = gpr_malloc(sizeof(*filters) * num_filters); @@ -519,6 +533,8 @@ static void publish_transport(grpc_subchannel *c) { gpr_free(sw); gpr_free(filters); grpc_channel_stack_destroy(stk); + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); + GRPC_SUBCHANNEL_UNREF(c, "connecting"); return; } @@ -536,14 +552,16 @@ static void publish_transport(grpc_subchannel *c) { memset(&op, 0, sizeof(op)); op.connectivity_state = &sw->connectivity_state; op.on_connectivity_state_change = &sw->closure; + op.bind_pollset_set = c->pollset_set; SUBCHANNEL_REF_LOCKED(c, "state_watcher"); + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting")); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); elem->filter->start_transport_op(elem, &op); /* signal completion */ - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "connected"); while ((w4c = c->waiting)) { c->waiting = w4c->next; grpc_iomgr_add_callback(&w4c->continuation); @@ -565,11 +583,12 @@ static void on_alarm(void *arg, int iomgr_success) { if (c->disconnected) { iomgr_success = 0; } - connectivity_state_changed_locked(c); + connectivity_state_changed_locked(c, "alarm"); gpr_mu_unlock(&c->mu); if (iomgr_success) { continue_connect(c); } else { + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(c, "connecting"); } } @@ -579,13 +598,17 @@ static void subchannel_connected(void *arg, int iomgr_success) { if (c->connecting_result.transport != NULL) { publish_transport(c); } else { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_mu_lock(&c->mu); - connectivity_state_changed_locked(c); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; + connectivity_state_changed_locked(c, "connect_failed"); c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta); - c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta); - grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now(GPR_CLOCK_MONOTONIC)); + if (gpr_time_cmp(c->backoff_delta, + gpr_time_from_seconds(60, GPR_TIMESPAN)) < 0) { + c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta); + } + grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } } @@ -610,9 +633,10 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { return GRPC_CHANNEL_IDLE; } -static void connectivity_state_changed_locked(grpc_subchannel *c) { +static void connectivity_state_changed_locked(grpc_subchannel *c, + const char *reason) { grpc_connectivity_state current = compute_connectivity_locked(c); - grpc_connectivity_state_set(&c->state_tracker, current); + grpc_connectivity_state_set(&c->state_tracker, current, reason); } /* diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 931f746f75..68d33b9cf6 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -361,7 +361,9 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); - return run_some_expired_alarms(drop_mu, now, next, 1); + return run_some_expired_alarms( + drop_mu, now, next, + gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); } gpr_timespec grpc_alarm_list_next_timeout(void) { diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 96487958a7..24c6270ab2 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -50,6 +50,10 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { ep->vtable->add_to_pollset(ep, pollset); } +void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) { + ep->vtable->add_to_pollset_set(ep, pollset_set); +} + void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); } void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); } diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index 881e851800..572a070890 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H #include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/pollset_set.h" #include <grpc/support/slice.h> #include <grpc/support/time.h> @@ -70,6 +71,7 @@ struct grpc_endpoint_vtable { size_t nslices, grpc_endpoint_write_cb cb, void *user_data); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); + void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); }; @@ -98,6 +100,7 @@ void grpc_endpoint_destroy(grpc_endpoint *ep); /* Add an endpoint to a pollset, so that when the pollset is polled, events from this endpoint are considered */ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset); +void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set); struct grpc_endpoint { const grpc_endpoint_vtable *vtable; diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index a18c176b30..97b4ce5765 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -146,7 +146,6 @@ void grpc_iomgr_shutdown(void) { continue; } if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { - gpr_log(GPR_DEBUG, "got late alarm"); continue; } if (g_root_object.next != &g_root_object) { diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 5ff7df1dcd..2076ac70ef 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -60,7 +60,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset) { - size_t i; + size_t i, j; gpr_mu_lock(&pollset_set->mu); if (pollset_set->pollset_count == pollset_set->pollset_capacity) { pollset_set->pollset_capacity = @@ -70,9 +70,15 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, sizeof(*pollset_set->pollsets)); } pollset_set->pollsets[pollset_set->pollset_count++] = pollset; - for (i = 0; i < pollset_set->fd_count; i++) { - grpc_pollset_add_fd(pollset, pollset_set->fds[i]); + for (i = 0, j = 0; i < pollset_set->fd_count; i++) { + if (grpc_fd_is_orphaned(pollset_set->fds[i])) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + } else { + grpc_pollset_add_fd(pollset, pollset_set->fds[i]); + pollset_set->fds[j++] = pollset_set->fds[i]; + } } + pollset_set->fd_count = j; gpr_mu_unlock(&pollset_set->mu); } diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 41d8b169e0..23efc3d526 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -88,11 +88,11 @@ error: return 0; } -static void on_alarm(void *acp, int success) { +static void tc_on_alarm(void *acp, int success) { int done; async_connect *ac = acp; gpr_mu_lock(&ac->mu); - if (ac->fd != NULL && success) { + if (ac->fd != NULL) { grpc_fd_shutdown(ac->fd); } done = (--ac->refs == 0); @@ -108,11 +108,17 @@ static void on_writable(void *acp, int success) { int so_error = 0; socklen_t so_error_size; int err; - int fd = ac->fd->fd; int done; grpc_endpoint *ep = NULL; void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; + grpc_fd *fd; + + gpr_mu_lock(&ac->mu); + GPR_ASSERT(ac->fd); + fd = ac->fd; + ac->fd = NULL; + gpr_mu_unlock(&ac->mu); grpc_alarm_cancel(&ac->alarm); @@ -120,7 +126,7 @@ static void on_writable(void *acp, int success) { if (success) { do { so_error_size = sizeof(so_error); - err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); + err = getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); @@ -143,7 +149,7 @@ static void on_writable(void *acp, int success) { don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); gpr_mu_unlock(&ac->mu); - grpc_fd_notify_on_write(ac->fd, &ac->write_closure); + grpc_fd_notify_on_write(fd, &ac->write_closure); return; } else { switch (so_error) { @@ -157,8 +163,9 @@ static void on_writable(void *acp, int success) { goto finish; } } else { - grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); - ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + grpc_pollset_set_del_fd(ac->interested_parties, fd); + ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + fd = NULL; goto finish; } } else { @@ -169,11 +176,10 @@ static void on_writable(void *acp, int success) { abort(); finish: - if (ep == NULL) { - grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); - grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan"); - } else { - ac->fd = NULL; + if (fd != NULL) { + grpc_pollset_set_del_fd(ac->interested_parties, fd); + grpc_fd_orphan(fd, NULL, "tcp_client_orphan"); + fd = NULL; } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); @@ -254,7 +260,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), gpr_mu_lock(&ac->mu); grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); + tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_fd_notify_on_write(ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index b6d6efc9fb..6996255131 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -567,9 +567,14 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { grpc_pollset_add_fd(pollset, tcp->em_fd); } +static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); +} + static const grpc_endpoint_vtable vtable = { grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, - grpc_tcp_shutdown, grpc_tcp_destroy}; + grpc_tcp_add_to_pollset_set, grpc_tcp_shutdown, grpc_tcp_destroy}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 3548198046..e94b6cf408 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -331,9 +331,15 @@ static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset); } +static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep, + grpc_pollset_set *pollset_set) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set); +} + static const grpc_endpoint_vtable vtable = { endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset, - endpoint_shutdown, endpoint_unref}; + endpoint_add_to_pollset_set, endpoint_shutdown, endpoint_unref}; grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *transport, diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c new file mode 100644 index 0000000000..dd12ff5611 --- /dev/null +++ b/src/core/surface/channel_connectivity.c @@ -0,0 +1,190 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/channel.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/channel/client_channel.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/surface/completion_queue.h" + +grpc_connectivity_state grpc_channel_check_connectivity_state( + grpc_channel *channel, int try_to_connect) { + /* forward through to the underlying client channel */ + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + if (client_channel_elem->filter != &grpc_client_channel_filter) { + gpr_log(GPR_ERROR, + "grpc_channel_check_connectivity_state called on something that is " + "not a client channel, but '%s'", + client_channel_elem->filter->name); + return GRPC_CHANNEL_FATAL_FAILURE; + } + return grpc_client_channel_check_connectivity_state(client_channel_elem, + try_to_connect); +} + +typedef enum { + WAITING, + CALLING_BACK, + CALLING_BACK_AND_FINISHED, + CALLED_BACK +} callback_phase; + +typedef struct { + gpr_mu mu; + callback_phase phase; + int success; + grpc_iomgr_closure on_complete; + grpc_alarm alarm; + grpc_connectivity_state state; + grpc_connectivity_state *optional_new_state; + grpc_completion_queue *cq; + grpc_cq_completion completion_storage; + grpc_channel *channel; + void *tag; +} state_watcher; + +static void delete_state_watcher(state_watcher *w) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(w->channel)); + grpc_client_channel_del_interested_party(client_channel_elem, grpc_cq_pollset(w->cq)); + GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity"); + gpr_mu_destroy(&w->mu); + gpr_free(w); +} + +static void finished_completion(void *pw, grpc_cq_completion *ignored) { + int delete = 0; + state_watcher *w = pw; + gpr_mu_lock(&w->mu); + switch (w->phase) { + case WAITING: + case CALLED_BACK: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + break; + case CALLING_BACK: + w->phase = CALLED_BACK; + break; + case CALLING_BACK_AND_FINISHED: + delete = 1; + break; + } + gpr_mu_unlock(&w->mu); + + if (delete) { + delete_state_watcher(w); + } +} + +static void partly_done(state_watcher *w, int due_to_completion) { + int delete = 0; + + if (due_to_completion) { + gpr_mu_lock(&w->mu); + w->success = 1; + gpr_mu_unlock(&w->mu); + grpc_alarm_cancel(&w->alarm); + } + + gpr_mu_lock(&w->mu); + switch (w->phase) { + case WAITING: + w->phase = CALLING_BACK; + if (w->optional_new_state) { + *w->optional_new_state = w->state; + } + grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w, + &w->completion_storage); + break; + case CALLING_BACK: + w->phase = CALLING_BACK_AND_FINISHED; + break; + case CALLING_BACK_AND_FINISHED: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + break; + case CALLED_BACK: + delete = 1; + break; + } + gpr_mu_unlock(&w->mu); + + if (delete) { + delete_state_watcher(w); + } +} + +static void watch_complete(void *pw, int success) { partly_done(pw, 1); } + +static void timeout_complete(void *pw, int success) { partly_done(pw, 0); } + +void grpc_channel_watch_connectivity_state( + grpc_channel *channel, grpc_connectivity_state last_observed_state, + grpc_connectivity_state *optional_new_state, gpr_timespec deadline, + grpc_completion_queue *cq, void *tag) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + state_watcher *w = gpr_malloc(sizeof(*w)); + + grpc_cq_begin_op(cq); + + gpr_mu_init(&w->mu); + grpc_iomgr_closure_init(&w->on_complete, watch_complete, w); + w->phase = WAITING; + w->state = last_observed_state; + w->success = 0; + w->optional_new_state = optional_new_state; + w->cq = cq; + w->tag = tag; + w->channel = channel; + + grpc_alarm_init(&w->alarm, deadline, timeout_complete, w, + gpr_now(GPR_CLOCK_REALTIME)); + + if (client_channel_elem->filter != &grpc_client_channel_filter) { + gpr_log(GPR_ERROR, + "grpc_channel_watch_connectivity_state called on something that is " + "not a client channel, but '%s'", + client_channel_elem->filter->name); + grpc_iomgr_add_delayed_callback(&w->on_complete, 1); + } else { + GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); + grpc_client_channel_add_interested_party(client_channel_elem, grpc_cq_pollset(cq)); + grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state, + &w->on_complete); + } +} diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 91c7b35550..f055efbec2 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -109,6 +109,7 @@ typedef struct { gpr_refcount refs; grpc_mdctx *mdctx; grpc_channel_args *merge_args; + grpc_channel *master; } subchannel_factory; static void subchannel_factory_ref(grpc_subchannel_factory *scf) { @@ -119,6 +120,7 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) { static void subchannel_factory_unref(grpc_subchannel_factory *scf) { subchannel_factory *f = (subchannel_factory *)scf; if (gpr_unref(&f->refs)) { + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -137,6 +139,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; + args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(&c->base); grpc_channel_args_destroy(final_args); @@ -168,18 +171,21 @@ grpc_channel *grpc_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); + channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); + f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; gpr_ref_init(&f->refs, 1); grpc_mdctx_ref(mdctx); f->mdctx = mdctx; f->merge_args = grpc_channel_args_copy(args); + f->master = channel; + GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory"); resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 04e27d30ac..6e2a15b712 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -46,6 +46,7 @@ #include "src/core/surface/init.h" #include "src/core/surface/surface_trace.h" #include "src/core/transport/chttp2_transport.h" +#include "src/core/transport/connectivity_state.h" #ifdef GPR_POSIX_SOCKET #include "src/core/client_config/resolvers/unix_resolver_posix.h" @@ -76,6 +77,7 @@ void grpc_init(void) { grpc_register_tracer("http", &grpc_http_trace); grpc_register_tracer("flowctl", &grpc_flowctl_trace); grpc_register_tracer("batch", &grpc_trace_batch); + grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace); grpc_security_pre_init(); grpc_iomgr_init(); grpc_tracer_init("GRPC_TRACE"); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index d87ec97b53..5854505ab9 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -134,6 +134,7 @@ typedef struct { grpc_mdctx *mdctx; grpc_channel_args *merge_args; grpc_channel_security_connector *security_connector; + grpc_channel *master; } subchannel_factory; static void subchannel_factory_ref(grpc_subchannel_factory *scf) { @@ -146,6 +147,7 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) { if (gpr_unref(&f->refs)) { GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "subchannel_factory"); + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -165,6 +167,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; + args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(&c->base); grpc_channel_args_destroy(final_args); @@ -218,6 +221,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); + channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); + f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; gpr_ref_init(&f->refs, 1); @@ -226,12 +231,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory"); f->security_connector = connector; f->merge_args = grpc_channel_args_copy(args_copy); + f->master = channel; + GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory"); resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c923d5e42f..bbce033684 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -110,6 +110,8 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset); +static void add_to_pollset_set_locked(grpc_chttp2_transport *t, + grpc_pollset_set *pollset_set); /** Start new streams that have been created if we can */ static void maybe_start_some_streams( @@ -117,7 +119,7 @@ static void maybe_start_some_streams( static void connectivity_state_set( grpc_chttp2_transport_global *transport_global, - grpc_connectivity_state state); + grpc_connectivity_state state, const char *reason); /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING @@ -233,7 +235,7 @@ static void init_transport(grpc_chttp2_transport *t, is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; grpc_connectivity_state_init(&t->channel_callback.state_tracker, - GRPC_CHANNEL_READY); + GRPC_CHANNEL_READY, "transport"); gpr_slice_buffer_init(&t->global.qbuf); @@ -327,7 +329,8 @@ static void destroy_transport(grpc_transport *gt) { static void close_transport_locked(grpc_chttp2_transport *t) { if (!t->closed) { t->closed = 1; - connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE); + connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE, + "close_transport"); if (t->ep) { grpc_endpoint_shutdown(t->ep); } @@ -530,7 +533,8 @@ void grpc_chttp2_add_incoming_goaway( gpr_free(msg); gpr_slice_unref(goaway_text); transport_global->seen_goaway = 1; - connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE); + connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE, + "got_goaway"); } static void maybe_start_some_streams( @@ -555,7 +559,8 @@ static void maybe_start_some_streams( transport_global->next_stream_id += 2; if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { - connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE); + connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE, + "no_more_stream_ids"); } stream_global->outgoing_window = @@ -686,6 +691,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + int close_transport = 0; lock(t); @@ -705,9 +711,7 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { t->global.last_incoming_stream_id, grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), gpr_slice_ref(*op->goaway_message), &t->global.qbuf); - if (!grpc_chttp2_has_streams(t)) { - close_transport_locked(t); - } + close_transport = !grpc_chttp2_has_streams(t); } if (op->set_accept_stream != NULL) { @@ -720,6 +724,10 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { add_to_pollset_locked(t, op->bind_pollset); } + if (op->bind_pollset_set) { + add_to_pollset_set_locked(t, op->bind_pollset_set); + } + if (op->send_ping) { send_ping_locked(t, op->send_ping); } @@ -729,6 +737,12 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } unlock(t); + + if (close_transport) { + lock(t); + close_transport_locked(t); + unlock(t); + } } /* @@ -1001,12 +1015,12 @@ static void schedule_closure_for_connectivity(void *a, static void connectivity_state_set( grpc_chttp2_transport_global *transport_global, - grpc_connectivity_state state) { + grpc_connectivity_state state, const char *reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); grpc_connectivity_state_set_with_scheduler( &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, schedule_closure_for_connectivity, transport_global); + state, schedule_closure_for_connectivity, transport_global, reason); } void grpc_chttp2_schedule_closure( @@ -1034,6 +1048,13 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t, } } +static void add_to_pollset_set_locked(grpc_chttp2_transport *t, + grpc_pollset_set *pollset_set) { + if (t->ep) { + grpc_endpoint_add_to_pollset_set(t->ep, pollset_set); + } +} + /* * TRACING */ diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 1091ceae44..61d26f06f0 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -34,11 +34,33 @@ #include "src/core/transport/connectivity_state.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +int grpc_connectivity_state_trace = 0; + +const char *grpc_connectivity_state_name(grpc_connectivity_state state) { + switch (state) { + case GRPC_CHANNEL_IDLE: + return "IDLE"; + case GRPC_CHANNEL_CONNECTING: + return "CONNECTING"; + case GRPC_CHANNEL_READY: + return "READY"; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + return "TRANSIENT_FAILURE"; + case GRPC_CHANNEL_FATAL_FAILURE: + return "FATAL_FAILURE"; + } + abort(); + return "UNKNOWN"; +} void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, - grpc_connectivity_state init_state) { + grpc_connectivity_state init_state, + const char *name) { tracker->current_state = init_state; tracker->watchers = NULL; + tracker->name = gpr_strdup(name); } void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { @@ -54,6 +76,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { } gpr_free(w); } + gpr_free(tracker->name); } grpc_connectivity_state grpc_connectivity_state_check( @@ -64,6 +87,11 @@ grpc_connectivity_state grpc_connectivity_state_check( int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify) { + if (grpc_connectivity_state_trace) { + gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name, + grpc_connectivity_state_name(*current), + grpc_connectivity_state_name(tracker->current_state)); + } if (tracker->current_state != *current) { *current = tracker->current_state; grpc_iomgr_add_callback(notify); @@ -79,12 +107,19 @@ int grpc_connectivity_state_notify_on_state_change( void grpc_connectivity_state_set_with_scheduler( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg) { + void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg, + const char *reason) { grpc_connectivity_state_watcher *new = NULL; grpc_connectivity_state_watcher *w; + if (grpc_connectivity_state_trace) { + gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name, + grpc_connectivity_state_name(tracker->current_state), + grpc_connectivity_state_name(state), reason); + } if (tracker->current_state == state) { return; } + GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE); tracker->current_state = state; while ((w = tracker->watchers)) { tracker->watchers = w->next; @@ -106,7 +141,8 @@ static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) { } void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, - grpc_connectivity_state state) { + grpc_connectivity_state state, + const char *reason) { grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, - NULL); + NULL, reason); } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index bbdcbcb069..a3b0b80c98 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -51,17 +51,24 @@ typedef struct { grpc_connectivity_state current_state; /** all our watchers */ grpc_connectivity_state_watcher *watchers; + /** a name to help debugging */ + char *name; } grpc_connectivity_state_tracker; +extern int grpc_connectivity_state_trace; + void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, - grpc_connectivity_state init_state); + grpc_connectivity_state init_state, + const char *name); void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, - grpc_connectivity_state state); + grpc_connectivity_state state, + const char *reason); void grpc_connectivity_state_set_with_scheduler( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg); + void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg, + const char *reason); grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 64503604ee..aac42303a9 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -109,6 +109,8 @@ typedef struct grpc_transport_op { void *set_accept_stream_user_data; /** add this transport to a pollset */ grpc_pollset *bind_pollset; + /** add this transport to a pollset_set */ + grpc_pollset_set *bind_pollset_set; /** send a ping, call this back if not NULL */ grpc_iomgr_closure *send_ping; } grpc_transport_op; |