diff options
Diffstat (limited to 'src/core/lib/client_config/resolvers')
3 files changed, 65 insertions, 75 deletions
diff --git a/src/core/lib/client_config/resolvers/dns_resolver.c b/src/core/lib/client_config/resolvers/dns_resolver.c index ab445730ad..62bccdd045 100644 --- a/src/core/lib/client_config/resolvers/dns_resolver.c +++ b/src/core/lib/client_config/resolvers/dns_resolver.c @@ -162,38 +162,23 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { dns_resolver *r = arg; grpc_client_config *config = NULL; - grpc_subchannel **subchannels; - grpc_subchannel_args args; grpc_lb_policy *lb_policy; - size_t i; gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = 0; if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); - size_t naddrs = 0; - for (i = 0; i < addresses->naddrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = (size_t)addresses->addrs[i].len; - grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( - exec_ctx, r->subchannel_factory, &args); - if (subchannel != NULL) { - subchannels[naddrs++] = subchannel; - } - } memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = naddrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); + lb_policy_args.addresses = addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); if (lb_policy != NULL) { grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); } grpc_resolved_addresses_destroy(addresses); - gpr_free(subchannels); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); diff --git a/src/core/lib/client_config/resolvers/sockaddr_resolver.c b/src/core/lib/client_config/resolvers/sockaddr_resolver.c index 66cddc3ed9..5c5133649a 100644 --- a/src/core/lib/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/lib/client_config/resolvers/sockaddr_resolver.c @@ -58,11 +58,7 @@ typedef struct { char *lb_policy_name; /** the addresses that we've 'resolved' */ - struct sockaddr_storage *addrs; - /** the corresponding length of the addresses */ - size_t *addrs_len; - /** how many elements in \a addrs */ - size_t num_addrs; + grpc_resolved_addresses *addresses; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -125,28 +121,14 @@ static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r) { - grpc_client_config *cfg; - grpc_lb_policy *lb_policy; - grpc_lb_policy_args lb_policy_args; - grpc_subchannel **subchannels; - grpc_subchannel_args args; - if (r->next_completion != NULL && !r->published) { - size_t i; - cfg = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs); - for (i = 0; i < r->num_addrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)&r->addrs[i]; - args.addr_len = r->addrs_len[i]; - subchannels[i] = grpc_subchannel_factory_create_subchannel( - exec_ctx, r->subchannel_factory, &args); - } + grpc_client_config *cfg = grpc_client_config_create(); + grpc_lb_policy_args lb_policy_args; memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = r->num_addrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - gpr_free(subchannels); + lb_policy_args.addresses = r->addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + grpc_lb_policy *lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(cfg, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; @@ -160,8 +142,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); - gpr_free(r->addrs); - gpr_free(r->addrs_len); + grpc_resolved_addresses_destroy(r->addresses); gpr_free(r->lb_policy_name); gpr_free(r); } @@ -269,7 +250,6 @@ static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( grpc_resolver_args *args, const char *default_lb_policy_name, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { - size_t i; int errors_found = 0; /* GPR_FALSE */ sockaddr_resolver *r; gpr_slice path_slice; @@ -287,17 +267,46 @@ static grpc_resolver *sockaddr_create( r->lb_policy_name = NULL; if (0 != strcmp(args->uri->query, "")) { gpr_slice query_slice; - gpr_slice_buffer query_parts; + gpr_slice_buffer query_parts; /* the &-separated elements of the query */ + gpr_slice_buffer query_param_parts; /* the =-separated subelements */ query_slice = gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing); gpr_slice_buffer_init(&query_parts); - gpr_slice_split(query_slice, "=", &query_parts); - GPR_ASSERT(query_parts.count == 2); - if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) { - r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII); + gpr_slice_buffer_init(&query_param_parts); + /* the query can contain "lb_policy=<policy>" and "lb_enabled=<1|0>" */ + + bool lb_enabled; + gpr_slice_split(query_slice, "&", &query_parts); + for (i = 0; i < query_parts.count; i++) { + gpr_slice_split(query_parts.slices[i], "=", &query_param_parts); + GPR_ASSERT(query_param_parts.count == 2); + if (0 == gpr_slice_str_cmp(query_param_parts.slices[0], "lb_policy")) { + r->lb_policy_name = + gpr_dump_slice(query_param_parts.slices[1], GPR_DUMP_ASCII); + } else if (0 == + gpr_slice_str_cmp(query_param_parts.slices[0], "lb_enabled")) { + if (0 != gpr_slice_str_cmp(query_param_parts.slices[1], "0")) { + /* anything other than 0 is taken to be true */ + lb_enabled = true; + } + } else { + gpr_log(GPR_ERROR, "invalid query element value: '%s'", + query_parts.slices[0]); + } + gpr_slice_buffer_reset_and_unref(&query_param_parts); + } + + if (strcmp("grpclb", r->lb_policy_name) == 0 && !lb_enabled) { + /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail + * out, as this is meant mostly for tests. */ + gpr_log(GPR_ERROR, + "Requested 'grpclb' LB policy but resolved addresses don't " + "support load balancing."); + abort(); } gpr_slice_buffer_destroy(&query_parts); + gpr_slice_buffer_destroy(&query_param_parts); gpr_slice_unref(query_slice); } if (r->lb_policy_name == NULL) { @@ -309,15 +318,18 @@ static grpc_resolver *sockaddr_create( gpr_slice_buffer_init(&path_parts); gpr_slice_split(path_slice, ",", &path_parts); - r->num_addrs = path_parts.count; - r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs); - r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs); + r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->addresses->naddrs = path_parts.count; + r->addresses->addrs = + gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs); - for (i = 0; i < r->num_addrs; i++) { + for (size_t i = 0; i < r->addresses->naddrs; i++) { grpc_uri ith_uri = *args->uri; char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); ith_uri.path = part_str; - if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) { + if (!parse(&ith_uri, + (struct sockaddr_storage *)(&r->addresses->addrs[i].addr), + &r->addresses->addrs[i].len)) { errors_found = 1; /* GPR_TRUE */ } gpr_free(part_str); @@ -328,8 +340,7 @@ static grpc_resolver *sockaddr_create( gpr_slice_unref(path_slice); if (errors_found) { gpr_free(r->lb_policy_name); - gpr_free(r->addrs); - gpr_free(r->addrs_len); + grpc_resolved_addresses_destroy(r->addresses); gpr_free(r); return NULL; } diff --git a/src/core/lib/client_config/resolvers/zookeeper_resolver.c b/src/core/lib/client_config/resolvers/zookeeper_resolver.c index 3bb0bbdf5c..404dfcd423 100644 --- a/src/core/lib/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/lib/client_config/resolvers/zookeeper_resolver.c @@ -184,28 +184,22 @@ static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { zookeeper_resolver *r = arg; grpc_client_config *config = NULL; - grpc_subchannel **subchannels; - grpc_subchannel_args args; grpc_lb_policy *lb_policy; - size_t i; + if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); - for (i = 0; i < addresses->naddrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel( - exec_ctx, r->subchannel_factory, &args); + + lb_policy_args.addresses = addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + + if (lb_policy != NULL) { + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); } - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); - gpr_free(subchannels); } gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving == 1); |