aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-09-10 11:06:46 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-09-10 11:06:46 -0700
commitfe7a6368fc154e291dd91b002e22bca4970ef00a (patch)
tree280c28ab19c712028fcdec068df3a154bc54a22f /src/core
parenta7297eaa8fb99bb69b020e9d0520ca2d73ffa7c5 (diff)
Usage of ?lb_policy=xxx in sockaddr_resolver
Plus test tweaks and final touches to round robin policy
Diffstat (limited to 'src/core')
-rw-r--r--src/core/client_config/lb_policies/round_robin.c32
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c24
2 files changed, 47 insertions, 9 deletions
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index 8ab496229c..ea15d1dce0 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -201,11 +201,24 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
gpr_free(node);
}
+static void del_interested_parties_locked(round_robin_lb_policy *p,
+ const size_t subchannel_idx) {
+ pending_pick *pp;
+ for (pp = p->pending_picks; pp; pp = pp->next) {
+ grpc_subchannel_del_interested_party(p->subchannels[subchannel_idx],
+ pp->pollset);
+ }
+}
+
+
void rr_destroy(grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
size_t i;
ready_list *elem;
for (i = 0; i < p->num_subchannels; i++) {
+ del_interested_parties_locked(p, i);
+ }
+ for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "round_robin");
}
gpr_free(p->connectivity_changed_cbs);
@@ -231,10 +244,15 @@ void rr_destroy(grpc_lb_policy *pol) {
}
void rr_shutdown(grpc_lb_policy *pol) {
+ size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
+ for (i = 0; i < p->num_subchannels; i++) {
+ del_interested_parties_locked(p, i);
+ }
+
p->shutdown = 1;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
@@ -350,7 +368,8 @@ static void rr_connectivity_changed(void *arg, int iomgr_success) {
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_subchannel_del_interested_party(selected->subchannel, pp->pollset);
+ grpc_subchannel_del_interested_party(selected->subchannel,
+ pp->pollset);
grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
gpr_free(pp);
}
@@ -367,22 +386,23 @@ static void rr_connectivity_changed(void *arg, int iomgr_success) {
&p->connectivity_changed_cbs[this_idx]);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
- grpc_connectivity_state_set(&p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
-
+ del_interested_parties_locked(p, this_idx);
/* renew state notification */
grpc_subchannel_notify_on_state_change(
p->subchannels[this_idx], this_connectivity,
&p->connectivity_changed_cbs[this_idx]);
- /* remove for ready list if still present */
+ /* remove from ready list if still present */
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]);
p->subchannel_index_to_readylist_node[this_idx] = NULL;
}
+ grpc_connectivity_state_set(&p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "connecting_transient_failure");
break;
case GRPC_CHANNEL_FATAL_FAILURE:
+ del_interested_parties_locked(p, this_idx);
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]);
p->subchannel_index_to_readylist_node[this_idx] = NULL;
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 111c237a0d..1610893428 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -272,7 +272,7 @@ done:
static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
- grpc_uri *uri, const char *lb_policy_name,
+ grpc_uri *uri, const char *default_lb_policy_name,
grpc_subchannel_factory *subchannel_factory,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) {
size_t i;
@@ -289,6 +289,25 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
+ r->lb_policy_name = NULL;
+ if (0 != strcmp(uri->query, "")) {
+ gpr_slice query_slice;
+ gpr_slice_buffer query_parts;
+
+ query_slice = gpr_slice_new(uri->query, strlen(uri->query), do_nothing);
+ gpr_slice_buffer_init(&query_parts);
+ gpr_slice_split(query_slice, "=", &query_parts);
+ GPR_ASSERT(query_parts.count == 2);
+ if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) {
+ r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII);
+ }
+ gpr_slice_buffer_destroy(&query_parts);
+ gpr_slice_unref(query_slice);
+ }
+ if (r->lb_policy_name == NULL) {
+ r->lb_policy_name = gpr_strdup(default_lb_policy_name);
+ }
+
path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing);
gpr_slice_buffer_init(&path_parts);
@@ -319,7 +338,6 @@ static grpc_resolver *sockaddr_create(
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
r->subchannel_factory = subchannel_factory;
- r->lb_policy_name = gpr_strdup(lb_policy_name);
grpc_subchannel_factory_ref(subchannel_factory);
return &r->base;
@@ -337,7 +355,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *name##_factory_create_resolver( \
grpc_resolver_factory *factory, grpc_uri *uri, \
grpc_subchannel_factory *subchannel_factory) { \
- return sockaddr_create(uri, "round_robin", \
+ return sockaddr_create(uri, "pick_first", \
subchannel_factory, parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \