diff options
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 44 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 10 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 58 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 15 |
4 files changed, 77 insertions, 50 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index f3a6d21eb5..4fda07a63f 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -36,6 +36,7 @@ #include <string.h> #include <grpc/support/alloc.h> +#include "src/core/channel/connectivity_state.h" typedef struct pending_pick { struct pending_pick *next; @@ -69,6 +70,9 @@ typedef struct { grpc_connectivity_state checking_connectivity; /** list of picks that are waiting on connectivity */ pending_pick *pending_picks; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; void pf_ref(grpc_lb_policy *pol) { @@ -184,8 +188,46 @@ loop: } } +static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { + pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + size_t i; + size_t n; + grpc_subchannel **subchannels; + + gpr_mu_lock(&p->mu); + n = p->num_subchannels; + subchannels = gpr_malloc(n * sizeof(*subchannels)); + for (i = 0; i < n; i++) { + subchannels[i] = p->subchannels[i]; + grpc_subchannel_ref(subchannels[i]); + } + gpr_mu_unlock(&p->mu); + + for (i = 0; i < n; i++) { + grpc_subchannel_process_transport_op(subchannels[i], op); + grpc_subchannel_unref(subchannels[i]); + } + gpr_free(subchannels); +} + +static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + grpc_connectivity_state st; + gpr_mu_lock(&p->mu); + st = grpc_connectivity_state_check(&p->state_tracker); + gpr_mu_unlock(&p->mu); + return st; +} + +static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_iomgr_closure *notify) { + pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + gpr_mu_lock(&p->mu); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, notify); + gpr_mu_unlock(&p->mu); +} + static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_ref, pf_unref, pf_shutdown, pf_pick}; + pf_ref, pf_unref, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, size_t num_subchannels) { diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 42929e933b..42be9152cb 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -58,6 +58,16 @@ struct grpc_lb_policy_vtable { void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, grpc_iomgr_closure *on_complete); + + /** broadcast a transport op to all subchannels */ + void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op); + + /** check the current connectivity of the lb_policy */ + grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy); + + /** call notify when the connectivity state of a channel changes from *state. + Updates *state with the new state of the policy */ + void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_iomgr_closure *closure); }; void grpc_lb_policy_ref(grpc_lb_policy *policy); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index b489ce04a5..5cbb5d9971 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -39,6 +39,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" +#include "src/core/channel/connectivity_state.h" typedef struct { gpr_refcount refs; @@ -52,12 +53,6 @@ typedef struct waiting_for_connect { grpc_subchannel_call **target; } waiting_for_connect; -typedef struct connectivity_state_watcher { - struct connectivity_state_watcher *next; - grpc_iomgr_closure *notify; - grpc_connectivity_state *current; -} connectivity_state_watcher; - struct grpc_subchannel { gpr_refcount refs; grpc_connector *connector; @@ -92,8 +87,8 @@ struct grpc_subchannel { int connecting; /** things waiting for a connection */ waiting_for_connect *waiting; - /** things watching the connectivity state */ - connectivity_state_watcher *watchers; + /** connectivity state tracking */ + grpc_connectivity_state_tracker state_tracker; }; struct grpc_subchannel_call { @@ -123,6 +118,7 @@ void grpc_subchannel_unref(grpc_subchannel *c) { gpr_free(c->addr); grpc_mdctx_unref(c->mdctx); grpc_pollset_set_destroy(&c->pollset_set); + grpc_connectivity_state_destroy(&c->state_tracker); gpr_free(c); } } @@ -156,6 +152,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_mdctx_ref(c->mdctx); grpc_pollset_set_init(&c->pollset_set); grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); + grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE); gpr_mu_init(&c->mu); return c; } @@ -210,7 +207,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); - state = compute_connectivity_locked(c); + state = grpc_connectivity_state_check(&c->state_tracker); gpr_mu_unlock(&c->mu); return state; } @@ -218,35 +215,24 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, grpc_connectivity_state *state, grpc_iomgr_closure *notify) { - grpc_connectivity_state current; int do_connect = 0; - connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); - w->current = state; - w->notify = notify; gpr_mu_lock(&c->mu); - current = compute_connectivity_locked(c); - if (current == GRPC_CHANNEL_IDLE) { - current = GRPC_CHANNEL_CONNECTING; + if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) { + do_connect = 1; c->connecting = 1; - do_connect = 1; grpc_subchannel_ref(c); - connectivity_state_changed_locked(c); - } - if (current != *state) { - gpr_mu_unlock(&c->mu); - *state = current; - grpc_iomgr_add_callback(notify); - gpr_free(w); - } else { - w->next = c->watchers; - c->watchers = w; - gpr_mu_unlock(&c->mu); + grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); } + gpr_mu_unlock(&c->mu); if (do_connect) { start_connect(c); } } +void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) { + abort(); +} + static void publish_transport(grpc_subchannel *c) { size_t channel_stack_size; connection *con; @@ -311,21 +297,7 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { static void connectivity_state_changed_locked(grpc_subchannel *c) { grpc_connectivity_state current = compute_connectivity_locked(c); - connectivity_state_watcher *new = NULL; - connectivity_state_watcher *w; - while ((w = c->watchers)) { - c->watchers = w->next; - - if (current != *w->current) { - *w->current = current; - grpc_iomgr_add_callback(w->notify); - gpr_free(w); - } else { - w->next = new; - new = w; - } - } - c->watchers = new; + grpc_connectivity_state_set(&c->state_tracker, current); } /* diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 8836f9b09c..60b95d3d8f 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -46,6 +46,15 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; void grpc_subchannel_ref(grpc_subchannel *channel); void grpc_subchannel_unref(grpc_subchannel *channel); +/** construct a call (possibly asynchronously) */ +void grpc_subchannel_create_call(grpc_subchannel *subchannel, + grpc_transport_stream_op *initial_op, + grpc_subchannel_call **target, + grpc_iomgr_closure *notify); + +/** process a transport level op */ +void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op); + void grpc_subchannel_call_ref(grpc_subchannel_call *call); void grpc_subchannel_call_unref(grpc_subchannel_call *call); @@ -62,12 +71,6 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); -/** construct a call (possibly asynchronously) */ -void grpc_subchannel_create_call(grpc_subchannel *subchannel, - grpc_transport_stream_op *initial_op, - grpc_subchannel_call **target, - grpc_iomgr_closure *notify); - /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, grpc_transport_stream_op *op); |