From fe7a6368fc154e291dd91b002e22bca4970ef00a Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 10 Sep 2015 11:06:46 -0700 Subject: Usage of ?lb_policy=xxx in sockaddr_resolver Plus test tweaks and final touches to round robin policy --- src/core/client_config/lb_policies/round_robin.c | 32 ++++++++++++++++++---- .../client_config/resolvers/sockaddr_resolver.c | 24 ++++++++++++++-- 2 files changed, 47 insertions(+), 9 deletions(-) (limited to 'src/core/client_config') diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 8ab496229c..ea15d1dce0 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -201,10 +201,23 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } +static void del_interested_parties_locked(round_robin_lb_policy *p, + const size_t subchannel_idx) { + pending_pick *pp; + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_del_interested_party(p->subchannels[subchannel_idx], + pp->pollset); + } +} + + void rr_destroy(grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; ready_list *elem; + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(p, i); + } for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "round_robin"); } @@ -231,10 +244,15 @@ void rr_destroy(grpc_lb_policy *pol) { } void rr_shutdown(grpc_lb_policy *pol) { + size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(p, i); + } + p->shutdown = 1; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -350,7 +368,8 @@ static void rr_connectivity_changed(void *arg, int iomgr_success) { "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_subchannel_del_interested_party(selected->subchannel, pp->pollset); + grpc_subchannel_del_interested_party(selected->subchannel, + pp->pollset); grpc_iomgr_add_delayed_callback(pp->on_complete, 1); gpr_free(pp); } @@ -367,22 +386,23 @@ static void rr_connectivity_changed(void *arg, int iomgr_success) { &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure"); - + del_interested_parties_locked(p, this_idx); /* renew state notification */ grpc_subchannel_notify_on_state_change( p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx]); - /* remove for ready list if still present */ + /* remove from ready list if still present */ if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]); p->subchannel_index_to_readylist_node[this_idx] = NULL; } + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connecting_transient_failure"); break; case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked(p, this_idx); if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]); p->subchannel_index_to_readylist_node[this_idx] = NULL; diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 111c237a0d..1610893428 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -272,7 +272,7 @@ done: static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( - grpc_uri *uri, const char *lb_policy_name, + grpc_uri *uri, const char *default_lb_policy_name, grpc_subchannel_factory *subchannel_factory, int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) { size_t i; @@ -289,6 +289,25 @@ static grpc_resolver *sockaddr_create( r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); + r->lb_policy_name = NULL; + if (0 != strcmp(uri->query, "")) { + gpr_slice query_slice; + gpr_slice_buffer query_parts; + + query_slice = gpr_slice_new(uri->query, strlen(uri->query), do_nothing); + gpr_slice_buffer_init(&query_parts); + gpr_slice_split(query_slice, "=", &query_parts); + GPR_ASSERT(query_parts.count == 2); + if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) { + r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII); + } + gpr_slice_buffer_destroy(&query_parts); + gpr_slice_unref(query_slice); + } + if (r->lb_policy_name == NULL) { + r->lb_policy_name = gpr_strdup(default_lb_policy_name); + } + path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing); gpr_slice_buffer_init(&path_parts); @@ -319,7 +338,6 @@ static grpc_resolver *sockaddr_create( gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); r->subchannel_factory = subchannel_factory; - r->lb_policy_name = gpr_strdup(lb_policy_name); grpc_subchannel_factory_ref(subchannel_factory); return &r->base; @@ -337,7 +355,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *name##_factory_create_resolver( \ grpc_resolver_factory *factory, grpc_uri *uri, \ grpc_subchannel_factory *subchannel_factory) { \ - return sockaddr_create(uri, "round_robin", \ + return sockaddr_create(uri, "pick_first", \ subchannel_factory, parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ -- cgit v1.2.3