aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-04-05 10:47:31 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-04-05 10:47:31 -0700
commit52c85cecf365fb534827ac15780d803993693c46 (patch)
tree38e6a58cf315d88b5d10cfb35888f07d25ce1887 /src/core/ext
parentf40a1972d00645ec9212f26085d6ed28cf3ea12c (diff)
parente6aa55c014364878bf539e95649b0e3327fd225f (diff)
Merge github.com:grpc/grpc into optionalize_client_config
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/client_config/client_channel.c40
-rw-r--r--src/core/ext/client_config/lb_policy.c11
-rw-r--r--src/core/ext/client_config/lb_policy.h13
-rw-r--r--src/core/ext/client_config/subchannel.c2
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c48
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c40
6 files changed, 124 insertions, 30 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 0ff67a7d87..3b2f7b8785 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -114,6 +114,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
+static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
+ channel_data *chand,
+ grpc_connectivity_state state,
+ const char *reason) {
+ if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
+ state == GRPC_CHANNEL_FATAL_FAILURE) &&
+ chand->lb_policy != NULL) {
+ /* cancel fail-fast picks */
+ grpc_lb_policy_cancel_picks(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
+ /* check= */ 0);
+ }
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
+}
+
static void on_lb_policy_state_changed_locked(
grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
grpc_connectivity_state publish_state = w->state;
@@ -127,8 +143,8 @@ static void on_lb_policy_state_changed_locked(
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
- "lb_changed");
+ set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
+ "lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
@@ -200,8 +216,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (iomgr_success && chand->resolver) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
- "new_lb+resolver");
+ set_channel_connectivity_state_locked(exec_ctx, chand, state,
+ "new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
@@ -216,8 +232,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@@ -272,8 +288,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (op->disconnect && chand->resolver != NULL) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@@ -290,6 +306,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
typedef struct {
grpc_metadata_batch *initial_metadata;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
@@ -298,6 +315,7 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
@@ -308,6 +326,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
} else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
+ cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
}
@@ -316,6 +335,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
@@ -349,7 +369,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
- initial_metadata, connected_subchannel, on_ready);
+ initial_metadata, initial_metadata_flags,
+ connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
return r;
}
@@ -362,6 +383,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
+ cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 72c1cf250c..a7ad9842dc 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -101,10 +101,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
- target, on_complete);
+ initial_metadata_flags, target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -112,6 +113,14 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
+ initial_metadata_flags_eq);
+}
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 44b715c7a1..0384e0b2eb 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -60,9 +60,13 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -122,6 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
@@ -131,6 +136,14 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+/** Cancel all pending picks which have:
+ (initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq */
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 5285041dd5..e6e3885e40 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -54,7 +54,7 @@
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
-#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 2
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index cac62b81ea..5926f9d70b 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -40,6 +40,7 @@
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -149,6 +150,31 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
@@ -171,6 +197,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -199,6 +226,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->next = p->pending_picks;
pp->pollset = pollset;
pp->target = target;
+ pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@@ -286,11 +314,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
+ if (p->checking_subchannel == 0) {
+ /* only trigger transient failure when we've tried all alternatives */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "connecting_transient_failure");
+ }
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@@ -378,14 +409,9 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy,
- pf_shutdown,
- pf_pick,
- pf_cancel_pick,
- pf_ping_one,
- pf_exit_idle,
- pf_check_connectivity,
- pf_notify_on_state_change};
+ pf_destroy, pf_shutdown, pf_pick,
+ pf_cancel_pick, pf_cancel_picks, pf_ping_one,
+ pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 16f3942d3a..3f6051b892 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -49,6 +49,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -275,6 +276,32 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ *pp->target = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
@@ -303,6 +330,7 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -330,6 +358,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->pollset = pollset;
pp->target = target;
pp->on_complete = on_complete;
+ pp->initial_metadata_flags = initial_metadata_flags;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -485,14 +514,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown,
- rr_pick,
- rr_cancel_pick,
- rr_ping_one,
- rr_exit_idle,
- rr_check_connectivity,
- rr_notify_on_state_change};
+ rr_destroy, rr_shutdown, rr_pick,
+ rr_cancel_pick, rr_cancel_picks, rr_ping_one,
+ rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}