aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar David G. Quintas <dgq@google.com>2017-06-29 11:45:47 -0700
committerGravatar GitHub <noreply@github.com>2017-06-29 11:45:47 -0700
commit9864662df17eabc6e944427cc221fd34dbf9e733 (patch)
tree90d6ff2e534832b2fcb01324d2a259ed8b04edb6 /src/core
parent85762752c4a12390f6815eb7ddd3833aa3bfa223 (diff)
parentaf084dc37c2d31f199cfa0516473fbe308ed9ba4 (diff)
Merge pull request #11604 from dgquintas/fix_rr_state_master
Fix RR policy connectivity state upon subchannels shutdown
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c45
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c38
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c18
3 files changed, 81 insertions, 20 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index 307e3bad67..d0acd7a901 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -95,6 +95,9 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(p->subchannels);
gpr_free(p->new_subchannels);
gpr_free(p);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p);
+ }
}
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
@@ -268,11 +271,20 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) {
if (p->num_subchannels > 0) {
GPR_ASSERT(p->selected == NULL);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p",
+ (void *)p, (void *)p->subchannels[p->checking_subchannel]);
+ }
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
p->updating_subchannels = true;
} else if (p->selected != NULL) {
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG,
+ "Pick First %p unsubscribing from selected subchannel %p",
+ (void *)p, (void *)p->selected);
+ }
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
p->updating_selected = true;
@@ -451,12 +463,25 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel *selected_subchannel;
pending_pick *pp;
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "Pick First %p connectivity changed. Updating selected: %d; Updating "
+ "subchannels: %d; Checking %lu index (%lu total); State: %d; ",
+ (void *)p, p->updating_selected, p->updating_subchannels,
+ (unsigned long)p->checking_subchannel,
+ (unsigned long)p->num_subchannels, p->checking_connectivity);
+ }
bool restart = false;
- if (p->updating_selected && error == GRPC_ERROR_CANCELLED) {
+ if (p->updating_selected && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for p->selected */
GPR_ASSERT(p->selected != NULL);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
"pf_update_connectivity");
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p",
+ (void *)p, (void *)p->selected);
+ }
p->updating_selected = false;
if (p->num_new_subchannels == 0) {
p->selected = NULL;
@@ -464,12 +489,16 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
restart = true;
}
- if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) {
+ if (p->updating_subchannels && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for the checking subchannel */
GPR_ASSERT(p->selected == NULL);
for (size_t i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
"pf_update_connectivity");
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p,
+ (void *)p->subchannels[i]);
+ }
}
gpr_free(p->subchannels);
p->subchannels = NULL;
@@ -481,14 +510,12 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (restart) {
p->selected = NULL;
p->selected_key = NULL;
-
GPR_ASSERT(p->new_subchannels != NULL);
GPR_ASSERT(p->num_new_subchannels > 0);
p->num_subchannels = p->num_new_subchannels;
p->subchannels = p->new_subchannels;
p->num_new_subchannels = 0;
p->new_subchannels = NULL;
-
if (p->started_picking) {
/* If we were picking, continue to do so over the new subchannels,
* starting from the 0th index. */
@@ -542,7 +569,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
"picked_first");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected);
+ gpr_log(GPR_INFO,
+ "Pick First %p selected subchannel %p (connected %p)",
+ (void *)p, (void *)selected_subchannel, (void *)p->selected);
}
p->selected_key = grpc_subchannel_get_key(selected_subchannel);
/* drop the pick list: we are connected now */
@@ -568,7 +597,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
if (p->checking_subchannel == 0) {
- /* only trigger transient failure when we've tried all alternatives */
+ /* only trigger transient failure when we've tried all alternatives
+ */
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
@@ -652,6 +682,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL);
pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p created.", (void *)p);
+ }
pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 3c8520cc1c..8e9d6b0f47 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -126,6 +126,8 @@ struct rr_subchannel_list {
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
+ /** how many subchannels are in state SHUTDOWN */
+ size_t num_shutdown;
/** how many subchannels are in state IDLE */
size_t num_idle;
@@ -425,6 +427,9 @@ static void update_state_counters_locked(subchannel_data *sd) {
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures;
+ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ GPR_ASSERT(subchannel_list->num_shutdown > 0);
+ --subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
@@ -433,6 +438,8 @@ static void update_state_counters_locked(subchannel_data *sd) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ ++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle;
}
@@ -455,7 +462,8 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
- * CHECK: p->subchannel_list->num_subchannels = 0.
+ * CHECK: p->subchannel_list->num_shutdown ==
+ * p->subchannel_list->num_subchannels.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
@@ -464,37 +472,39 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
* CHECK: p->num_idle == p->subchannel_list->num_subchannels.
*/
+ grpc_connectivity_state new_state = sd->curr_connectivity_state;
rr_subchannel_list *subchannel_list = sd->subchannel_list;
round_robin_lb_policy *p = subchannel_list->policy;
if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
- return GRPC_CHANNEL_READY;
+ new_state = GRPC_CHANNEL_READY;
} else if (sd->curr_connectivity_state ==
GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
- return GRPC_CHANNEL_CONNECTING;
- } else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */
+ new_state = GRPC_CHANNEL_CONNECTING;
+ } else if (p->subchannel_list->num_shutdown ==
+ p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
- return GRPC_CHANNEL_SHUTDOWN;
+ new_state = GRPC_CHANNEL_SHUTDOWN;
} else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
- return GRPC_CHANNEL_TRANSIENT_FAILURE;
+ new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
} else if (subchannel_list->num_idle ==
p->subchannel_list->num_subchannels) { /* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
- return GRPC_CHANNEL_IDLE;
+ new_state = GRPC_CHANNEL_IDLE;
}
- /* no change */
- return sd->curr_connectivity_state;
+ GRPC_ERROR_UNREF(error);
+ return new_state;
}
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -571,13 +581,15 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
GPR_ASSERT(!sd->subchannel_list->shutting_down);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ const unsigned long num_subchannels =
+ p->subchannel_list != NULL
+ ? (unsigned long)p->subchannel_list->num_subchannels
+ : 0;
gpr_log(GPR_DEBUG,
"[RR %p] phasing out subchannel list %p (size %lu) in favor "
"of %p (size %lu)",
- (void *)p, (void *)p->subchannel_list,
- (unsigned long)p->subchannel_list->num_subchannels,
- (void *)sd->subchannel_list,
- (unsigned long)sd->subchannel_list->num_subchannels);
+ (void *)p, (void *)p->subchannel_list, num_subchannels,
+ (void *)sd->subchannel_list, num_subchannels);
}
if (p->subchannel_list != NULL) {
// dispose of the current subchannel_list
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
index a311334d13..479ba393a2 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
@@ -56,6 +56,10 @@ typedef struct {
// grpc_resolver_next_locked()'s closure.
grpc_channel_args* next_results;
+ // Results to use for the pretended re-resolution in
+ // fake_resolver_channel_saw_error_locked().
+ grpc_channel_args* results_upon_error;
+
// pending next completion, or NULL
grpc_closure* next_completion;
// target result address for next completion
@@ -65,6 +69,7 @@ typedef struct {
static void fake_resolver_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) {
fake_resolver* r = (fake_resolver*)gr;
grpc_channel_args_destroy(exec_ctx, r->next_results);
+ grpc_channel_args_destroy(exec_ctx, r->results_upon_error);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r);
}
@@ -87,15 +92,22 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
*r->target_result =
grpc_channel_args_union(r->next_results, r->channel_args);
grpc_channel_args_destroy(exec_ctx, r->next_results);
+ r->next_results = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
- r->next_results = NULL;
}
}
static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
grpc_resolver* resolver) {
fake_resolver* r = (fake_resolver*)resolver;
+ gpr_log(
+ GPR_INFO,
+ "FOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO");
+ if (r->next_results == NULL && r->results_upon_error != NULL) {
+ // Pretend we re-resolved.
+ r->next_results = grpc_channel_args_copy(r->results_upon_error);
+ }
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
}
@@ -151,6 +163,10 @@ static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg,
grpc_channel_args_destroy(exec_ctx, r->next_results);
}
r->next_results = generator->next_response;
+ if (r->results_upon_error != NULL) {
+ grpc_channel_args_destroy(exec_ctx, r->results_upon_error);
+ }
+ r->results_upon_error = grpc_channel_args_copy(generator->next_response);
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
}