aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Juanli Shen <aspirinsjl@gmail.com>2018-02-07 14:51:20 -0800
committerGravatar Juanli Shen <juanlishen@google.com>2018-02-12 11:35:40 -0800
commit776490af0eae820fb8919b2ea6f245aff197a1ab (patch)
treeaa4c439abd4774e2c34a06a573d4ce47f14c14e9 /src
parent86accb1768cc860a7579bb5fff9bd3c2ba7274af (diff)
Revert "Revert "grpclb re-resolution""
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.cc7
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc133
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc11
5 files changed, 77 insertions, 89 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index cc4fe7ec62..27fb2ad1f4 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -118,7 +118,8 @@ void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
void grpc_lb_policy_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
- policy->vtable->set_reresolve_closure_locked(policy, request_reresolution);
+ GPR_ASSERT(policy->request_reresolution == nullptr);
+ policy->request_reresolution = request_reresolution;
}
void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
@@ -133,8 +134,8 @@ void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
grpc_lb_trace->name(), policy, grpc_error_string(error));
}
} else {
- if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) {
- gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.",
+ if (grpc_lb_trace->enabled()) {
+ gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.",
grpc_lb_trace->name(), policy);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 30660cb83d..6edd314d5e 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -107,10 +107,6 @@ struct grpc_lb_policy_vtable {
void (*update_locked)(grpc_lb_policy* policy,
const grpc_lb_policy_args* args);
-
- /** \see grpc_lb_policy_set_reresolve_closure */
- void (*set_reresolve_closure_locked)(grpc_lb_policy* policy,
- grpc_closure* request_reresolution);
};
#ifndef NDEBUG
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 04bf798267..1c8809eabc 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -249,7 +249,7 @@ typedef struct glb_lb_policy {
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy* rr_policy;
- grpc_closure on_rr_connectivity_changed;
+ /** the connectivity state of the embedded RR policy */
grpc_connectivity_state rr_connectivity_state;
bool started_picking;
@@ -292,6 +292,12 @@ typedef struct glb_lb_policy {
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
+ /** called upon changes to the RR's connectivity. */
+ grpc_closure rr_on_connectivity_changed;
+
+ /** called upon reresolution request from the RR policy. */
+ grpc_closure rr_on_reresolution_requested;
+
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
@@ -590,9 +596,8 @@ static grpc_lb_addresses* extract_backend_addresses_locked(
return backend_addresses;
}
-static void update_lb_connectivity_status_locked(
- glb_lb_policy* glb_policy, grpc_connectivity_state rr_state,
- grpc_error* rr_state_error) {
+static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
+ grpc_error* rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
/* The new connectivity status is a function of the previous one and the new
@@ -624,7 +629,7 @@ static void update_lb_connectivity_status_locked(
*
* (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
- switch (rr_state) {
+ switch (glb_policy->rr_connectivity_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
@@ -638,11 +643,12 @@ static void update_lb_connectivity_status_locked(
gpr_log(
GPR_INFO,
"[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
- glb_policy, grpc_connectivity_state_name(rr_state),
+ glb_policy,
+ grpc_connectivity_state_name(glb_policy->rr_connectivity_state),
glb_policy->rr_policy);
}
- grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state,
- rr_state_error,
+ grpc_connectivity_state_set(&glb_policy->state_tracker,
+ glb_policy->rr_connectivity_state, rr_state_error,
"update_lb_connectivity_status_locked");
}
@@ -740,11 +746,36 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
gpr_free(args);
}
-static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error);
+static void rr_on_reresolution_requested_locked(void* arg, grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
+ if (glb_policy->shutting_down || error != GRPC_ERROR_NONE) {
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "rr_on_reresolution_requested_locked");
+ return;
+ }
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(
+ GPR_DEBUG,
+ "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
+ glb_policy, glb_policy->rr_policy);
+ }
+ // If we are talking to a balancer, we expect to get updated addresses form
+ // the balancer, so we can ignore the re-resolution request from the RR
+ // policy. Otherwise, handle the re-resolution request using glb's original
+ // re-resolution closure.
+ if (glb_policy->lb_calld == nullptr ||
+ !glb_policy->lb_calld->seen_initial_response) {
+ grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
+ GRPC_ERROR_NONE);
+ }
+ // Give back the wrapper closure to the RR policy.
+ grpc_lb_policy_set_reresolve_closure_locked(
+ glb_policy->rr_policy, &glb_policy->rr_on_reresolution_requested);
+}
+
static void create_rr_locked(glb_lb_policy* glb_policy,
grpc_lb_policy_args* args) {
GPR_ASSERT(glb_policy->rr_policy == nullptr);
-
grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args);
if (new_rr_policy == nullptr) {
gpr_log(GPR_ERROR,
@@ -757,29 +788,25 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
glb_policy->rr_policy);
return;
}
+ GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_reresolution_requested_locked");
grpc_lb_policy_set_reresolve_closure_locked(
- new_rr_policy, glb_policy->base.request_reresolution);
- glb_policy->base.request_reresolution = nullptr;
+ new_rr_policy, &glb_policy->rr_on_reresolution_requested);
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
glb_policy->rr_policy, &rr_state_error);
/* Connectivity state is a function of the RR policy updated/created */
- update_lb_connectivity_status_locked(
- glb_policy, glb_policy->rr_connectivity_state, rr_state_error);
+ update_lb_connectivity_status_locked(glb_policy, rr_state_error);
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
- GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed,
- on_rr_connectivity_changed_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_connectivity_changed_locked");
grpc_lb_policy_notify_on_state_change_locked(
glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
- &glb_policy->on_rr_connectivity_changed);
+ &glb_policy->rr_on_connectivity_changed);
grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
// Send pending picks to RR policy.
pending_pick* pp;
@@ -827,28 +854,18 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) {
lb_policy_args_destroy(args);
}
-static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg);
+static void rr_on_connectivity_changed_locked(void* arg, grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (glb_policy->shutting_down) {
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
- return;
- }
- if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- /* An RR policy that has transitioned into the SHUTDOWN connectivity state
- * should not be considered for picks or updates: the SHUTDOWN state is a
- * sink, policies can't transition back from it. .*/
- GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
- glb_policy->rr_policy = nullptr;
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "rr_on_connectivity_changed_locked");
return;
}
- /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
- update_lb_connectivity_status_locked(
- glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error));
- /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */
+ update_lb_connectivity_status_locked(glb_policy, GRPC_ERROR_REF(error));
+ // Resubscribe. Reuse the "rr_on_connectivity_changed_locked" ref.
grpc_lb_policy_notify_on_state_change_locked(
glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
- &glb_policy->on_rr_connectivity_changed);
+ &glb_policy->rr_on_connectivity_changed);
}
static void destroy_balancer_name(void* balancer_name) {
@@ -971,8 +988,6 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
- } else {
- grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@@ -984,6 +999,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
}
grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "glb_shutdown");
+ grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
// Clear pending picks.
pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = nullptr;
@@ -1623,6 +1639,8 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
lb_calld, lb_calld->lb_call, grpc_error_string(error));
gpr_free(status_details);
}
+ grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
+ GRPC_ERROR_NONE);
// If this lb_calld is still in use, this call ended because of a failure so
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
@@ -1651,16 +1669,15 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
glb_policy->fallback_timer_callback_pending = false;
/* If we receive a serverlist after the timer fires but before this callback
* actually runs, don't fall back. */
- if (glb_policy->serverlist == nullptr) {
- if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Falling back to use backends from resolver",
- glb_policy);
- }
- GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
- rr_handover_locked(glb_policy);
+ if (glb_policy->serverlist == nullptr && !glb_policy->shutting_down &&
+ error == GRPC_ERROR_NONE) {
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Falling back to use backends from resolver",
+ glb_policy);
}
+ GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
+ rr_handover_locked(glb_policy);
}
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
@@ -1781,19 +1798,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
}
}
-static void glb_set_reresolve_closure_locked(
- grpc_lb_policy* policy, grpc_closure* request_reresolution) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(policy);
- GPR_ASSERT(!glb_policy->shutting_down);
- GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
- if (glb_policy->rr_policy != nullptr) {
- grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy,
- request_reresolution);
- } else {
- glb_policy->base.request_reresolution = request_reresolution;
- }
-}
-
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@@ -1805,8 +1809,7 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
- glb_update_locked,
- glb_set_reresolve_closure_locked};
+ glb_update_locked};
static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
@@ -1887,6 +1890,12 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
return nullptr;
}
grpc_subchannel_index_ref();
+ GRPC_CLOSURE_INIT(&glb_policy->rr_on_connectivity_changed,
+ rr_on_connectivity_changed_locked, glb_policy,
+ grpc_combiner_scheduler(args->combiner));
+ GRPC_CLOSURE_INIT(&glb_policy->rr_on_reresolution_requested,
+ rr_on_reresolution_requested_locked, glb_policy,
+ grpc_combiner_scheduler(args->combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 1485f7caf5..296bdcb247 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -520,14 +520,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
}
}
-static void pf_set_reresolve_closure_locked(
- grpc_lb_policy* policy, grpc_closure* request_reresolution) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(policy);
- GPR_ASSERT(!p->shutdown);
- GPR_ASSERT(policy->request_reresolution == nullptr);
- policy->request_reresolution = request_reresolution;
-}
-
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@@ -538,8 +530,7 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_exit_idle_locked,
pf_check_connectivity_locked,
pf_notify_on_state_change_locked,
- pf_update_locked,
- pf_set_reresolve_closure_locked};
+ pf_update_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index cefd0d8d7d..b5b4c44ef1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -620,14 +620,6 @@ static void rr_update_locked(grpc_lb_policy* policy,
}
}
-static void rr_set_reresolve_closure_locked(
- grpc_lb_policy* policy, grpc_closure* request_reresolution) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy);
- GPR_ASSERT(!p->shutdown);
- GPR_ASSERT(policy->request_reresolution == nullptr);
- policy->request_reresolution = request_reresolution;
-}
-
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown_locked,
@@ -638,8 +630,7 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked,
- rr_update_locked,
- rr_set_reresolve_closure_locked};
+ rr_update_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}