diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 40 | ||||
-rw-r--r-- | src/core/channel/client_channel.h | 7 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 36 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 4 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 5 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 182 |
6 files changed, 265 insertions, 9 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index f890f99237..b68954bfac 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -77,6 +77,8 @@ typedef struct { grpc_iomgr_closure on_config_changed; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; + /** when an lb_policy arrives, should we try to exit idle */ + int exit_idle_when_lb_policy_arrives; } channel_data; typedef enum { @@ -398,6 +400,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { grpc_lb_policy *old_lb_policy; grpc_resolver *old_resolver; grpc_iomgr_closure *wakeup_closures = NULL; + int exit_idle = 0; if (chand->incoming_configuration != NULL) { lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); @@ -415,8 +418,18 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { wakeup_closures = chand->waiting_for_config_closures; chand->waiting_for_config_closures = NULL; } + if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { + GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); + exit_idle = 1; + chand->exit_idle_when_lb_policy_arrives = 0; + } gpr_mu_unlock(&chand->mu_config); + if (exit_idle) { + grpc_lb_policy_exit_idle(lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle"); + } + if (old_lb_policy) { GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); } @@ -609,3 +622,30 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); } + +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_channel_element *elem, int try_to_connect) { + channel_data *chand = elem->channel_data; + grpc_connectivity_state out; + gpr_mu_lock(&chand->mu_config); + out = grpc_connectivity_state_check(&chand->state_tracker); + if (out == GRPC_CHANNEL_IDLE && try_to_connect) { + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = 1; + } + } + gpr_mu_unlock(&chand->mu_config); + return out; +} + +void grpc_client_channel_watch_connectivity_state( + grpc_channel_element *elem, grpc_connectivity_state *state, + grpc_iomgr_closure *on_complete) { + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->mu_config); + grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state, + on_complete); + gpr_mu_unlock(&chand->mu_config); +} diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index fd2be46145..16f1133359 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -52,4 +52,11 @@ extern const grpc_channel_filter grpc_client_channel_filter; void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver); +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_channel_element *elem, int try_to_connect); + +void grpc_client_channel_watch_connectivity_state( + grpc_channel_element *elem, grpc_connectivity_state *state, + grpc_iomgr_closure *on_complete); + #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 73da624aff..4f4c7eb64c 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -97,6 +97,25 @@ void pf_shutdown(grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } +static void start_picking(pick_first_lb_policy *p) { + p->started_picking = 1; + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); + grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], + &p->checking_connectivity, + &p->connectivity_changed); +} + +void pf_exit_idle(grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (!p->started_picking) { + start_picking(p); + } + gpr_mu_unlock(&p->mu); +} + void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, grpc_iomgr_closure *on_complete) { @@ -109,13 +128,7 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, on_complete->cb(on_complete->cb_arg, 1); } else { if (!p->started_picking) { - p->started_picking = 1; - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_REF(pol, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed); + start_picking(p); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); @@ -249,8 +262,13 @@ static void pf_notify_on_state_change(grpc_lb_policy *pol, } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, - pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, + pf_shutdown, + pf_pick, + pf_exit_idle, + pf_broadcast, + pf_check_connectivity, + pf_notify_on_state_change}; grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, size_t num_subchannels) { diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 6d1c788742..9d5baf84fb 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -77,3 +77,7 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) { policy->vtable->broadcast(policy, op); } + +void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) { + policy->vtable->exit_idle(policy); +} diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index a468f761cc..363faf3ee3 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -59,6 +59,9 @@ struct grpc_lb_policy_vtable { grpc_metadata_batch *initial_metadata, grpc_subchannel **target, grpc_iomgr_closure *on_complete); + /** try to enter a READY connectivity state */ + void (*exit_idle)(grpc_lb_policy *policy); + /** broadcast a transport op to all subchannels */ void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op); @@ -106,4 +109,6 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op); +void grpc_lb_policy_exit_idle(grpc_lb_policy *policy); + #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c new file mode 100644 index 0000000000..bdd9678d66 --- /dev/null +++ b/src/core/surface/channel_connectivity.c @@ -0,0 +1,182 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/channel.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/channel/client_channel.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/surface/completion_queue.h" + +grpc_connectivity_state grpc_channel_check_connectivity_state( + grpc_channel *channel, int try_to_connect) { + /* forward through to the underlying client channel */ + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + if (client_channel_elem->filter != &grpc_client_channel_filter) { + gpr_log(GPR_ERROR, + "grpc_channel_check_connectivity_state called on something that is " + "not a client channel, but '%s'", + client_channel_elem->filter->name); + return GRPC_CHANNEL_FATAL_FAILURE; + } + return grpc_client_channel_check_connectivity_state(client_channel_elem, + try_to_connect); +} + +typedef enum { + WAITING, + CALLING_BACK, + CALLING_BACK_AND_FINISHED, + CALLED_BACK +} callback_phase; + +typedef struct { + gpr_mu mu; + callback_phase phase; + int success; + grpc_iomgr_closure on_complete; + grpc_alarm alarm; + grpc_connectivity_state state; + grpc_connectivity_state *optional_new_state; + grpc_completion_queue *cq; + grpc_cq_completion completion_storage; + void *tag; +} state_watcher; + +static void delete_state_watcher(state_watcher *w) { + gpr_mu_destroy(&w->mu); + gpr_free(w); +} + +static void finished_completion(void *pw, grpc_cq_completion *ignored) { + int delete = 0; + state_watcher *w = pw; + gpr_mu_lock(&w->mu); + switch (w->phase) { + case WAITING: + case CALLED_BACK: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + break; + case CALLING_BACK: + w->phase = CALLED_BACK; + break; + case CALLING_BACK_AND_FINISHED: + delete = 1; + break; + } + gpr_mu_unlock(&w->mu); + + if (delete) { + delete_state_watcher(w); + } +} + +static void partly_done(state_watcher *w, int due_to_completion) { + int delete = 0; + + if (due_to_completion) { + gpr_mu_lock(&w->mu); + w->success = 1; + gpr_mu_unlock(&w->mu); + grpc_alarm_cancel(&w->alarm); + } + + gpr_mu_lock(&w->mu); + switch (w->phase) { + case WAITING: + w->phase = CALLING_BACK; + if (w->optional_new_state) { + *w->optional_new_state = w->state; + } + grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w, + &w->completion_storage); + break; + case CALLING_BACK: + w->phase = CALLING_BACK_AND_FINISHED; + break; + case CALLING_BACK_AND_FINISHED: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + break; + case CALLED_BACK: + delete = 1; + break; + } + gpr_mu_unlock(&w->mu); + + if (delete) { + delete_state_watcher(w); + } +} + +static void watch_complete(void *pw, int success) { partly_done(pw, 0); } + +static void timeout_complete(void *pw, int success) { partly_done(pw, 1); } + +void grpc_channel_watch_connectivity_state( + grpc_channel *channel, grpc_connectivity_state last_observed_state, + grpc_connectivity_state *optional_new_state, gpr_timespec deadline, + grpc_completion_queue *cq, void *tag) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + state_watcher *w = gpr_malloc(sizeof(*w)); + + grpc_cq_begin_op(cq); + + gpr_mu_init(&w->mu); + grpc_iomgr_closure_init(&w->on_complete, watch_complete, w); + w->phase = WAITING; + w->state = last_observed_state; + w->success = 0; + w->optional_new_state = optional_new_state; + w->cq = cq; + w->tag = tag; + + grpc_alarm_init(&w->alarm, deadline, timeout_complete, w, + gpr_now(GPR_CLOCK_REALTIME)); + + if (client_channel_elem->filter != &grpc_client_channel_filter) { + gpr_log(GPR_ERROR, + "grpc_channel_watch_connectivity_state called on something that is " + "not a client channel, but '%s'", + client_channel_elem->filter->name); + grpc_iomgr_add_delayed_callback(&w->on_complete, 1); + } else { + grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state, + &w->on_complete); + } +} |