diff options
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 39 | ||||
-rw-r--r-- | src/core/ext/lb_policy/pick_first/pick_first.c | 15 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 15 | ||||
-rw-r--r-- | src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 11 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_posix.c | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_windows.c | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/unix_sockets_posix.c | 1 |
8 files changed, 65 insertions, 20 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index af913d8a9d..4032b76ac9 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -309,6 +309,11 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */ memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); args.addresses->addrs[out_addrs_idx].len = sa_len; + // These are, of course, actually balancer addresses. However, we + // want the round_robin LB policy to treat them as normal backend + // addresses, since we don't need to talk to balancers in order to + // find the balancers themselves. + args.addresses->addrs[out_addrs_idx].is_balancer = false; ++out_addrs_idx; } else { gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", @@ -414,6 +419,20 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + // Count the number of gRPC-LB addresses. There must be at least one. + // TODO(roth): For now, we ignore non-balancer addresses, so there must be + // at least one balancer address. In the future, we may change the + // behavior such that we fall back to using the non-balancer addresses + // if we cannot reach any balancers. At that time, this should be + // changed to allow a list with no balancer addresses, since the + // resolver might fail to return a balancer address even when this is + // the right LB policy to use. + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < args->addresses->naddrs; ++i) { + if (args->addresses->addrs[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return NULL; + glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); memset(glb_policy, 0, sizeof(*glb_policy)); @@ -424,25 +443,25 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * Create a client channel over them to communicate with a LB service */ glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); - if (args->addresses->naddrs == 0) { - return NULL; - } /* construct a target from the args->addresses, in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs); + char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); addr_strs[0] = grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); + size_t addr_index = 1; for (size_t i = 1; i < args->addresses->naddrs; i++) { - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_strs[i], - (const struct sockaddr *)&args->addresses->addrs[i], - true) == 0); + if (args->addresses->addrs[i].is_balancer) { + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_strs[addr_index++], + (const struct sockaddr *)&args->addresses->addrs[i], + true) == 0); + } } size_t uri_path_len; char *target_uri_str = gpr_strjoin_sep( - (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len); + (const char **)addr_strs, num_grpclb_addrs, ",", &uri_path_len); /* will pick using pick_first */ glb_policy->lb_channel = grpc_client_channel_factory_create_channel( @@ -450,7 +469,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); gpr_free(target_uri_str); - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < num_grpclb_addrs; i++) { gpr_free(addr_strs[i]); } gpr_free(addr_strs); diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 9decf70692..858a53ae55 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -443,17 +443,26 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - if (args->addresses->naddrs == 0) return NULL; + // Find the number of backend addresses. We ignore balancer + // addresses, since we don't know how to handle them. + size_t num_addrs = 0; + for (size_t i = 0; i < args->addresses->naddrs; i++) { + if (!args->addresses->addrs[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + gpr_malloc(sizeof(grpc_subchannel *) * num_addrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < args->addresses->naddrs; i++) { + // Skip balancer addresses, since we only know how to handle backends. + if (args->addresses->addrs[i].is_balancer) continue; + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); sc_args.addr_len = (size_t)args->addresses->addrs[i].len; diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 7bcf608ab9..986a57bff9 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -571,16 +571,27 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); + // Find the number of backend addresses. We ignore balancer + // addresses, since we don't know how to handle them. + size_t num_addrs = 0; + for (size_t i = 0; i < args->addresses->naddrs; i++) { + if (!args->addresses->addrs[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) return NULL; + round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + gpr_malloc(sizeof(*p->subchannels) * num_addrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < args->addresses->naddrs; i++) { + // Skip balancer addresses, since we only know how to handle backends. + if (args->addresses->addrs[i].is_balancer) continue; + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); sc_args.addr_len = (size_t)args->addresses->addrs[i].len; diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 3807522d2b..f0748ef583 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -63,7 +63,7 @@ typedef struct { /** mutex guarding the rest of the state */ gpr_mu mu; /** have we published? */ - int published; + bool published; /** pending next completion, or NULL */ grpc_closure *next_completion; /** target result address for next completion */ @@ -102,7 +102,7 @@ static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; gpr_mu_lock(&r->mu); - r->published = 0; + r->published = false; sockaddr_maybe_finish_next_locked(exec_ctx, r); gpr_mu_unlock(&r->mu); } @@ -131,7 +131,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); grpc_resolver_result_set_lb_policy(result, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); - r->published = 1; + r->published = true; *r->target_result = result; grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; @@ -175,7 +175,7 @@ static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( grpc_resolver_args *args, const char *default_lb_policy_name, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { - int errors_found = 0; /* GPR_FALSE */ + bool errors_found = false; sockaddr_resolver *r; gpr_slice path_slice; gpr_slice_buffer path_parts; @@ -228,9 +228,10 @@ static grpc_resolver *sockaddr_create( if (!parse(&ith_uri, (struct sockaddr_storage *)(&r->addresses->addrs[i].addr), &r->addresses->addrs[i].len)) { - errors_found = 1; /* GPR_TRUE */ + errors_found = true; } gpr_free(part_str); + r->addresses->addrs[i].is_balancer = lb_enabled; if (errors_found) break; } diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index ddbe375755..796e75da91 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H #define GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H +#include <stdbool.h> #include <stddef.h> #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" @@ -43,6 +44,7 @@ typedef struct { char addr[GRPC_MAX_SOCKADDR_SIZE]; size_t len; + bool is_balancer; } grpc_resolved_address; typedef struct { diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 4e9f978584..f69cc09593 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -132,6 +132,7 @@ static grpc_error *blocking_resolve_address_impl( for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); (*addresses)->addrs[i].len = resp->ai_addrlen; + (*addresses)->addrs[i].is_balancer = false; i++; } err = GRPC_ERROR_NONE; diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 2af8af82dc..e6bb209548 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -118,6 +118,7 @@ static grpc_error *blocking_resolve_address_impl( for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); (*addresses)->addrs[i].len = resp->ai_addrlen; + (*addresses)->addrs[i].is_balancer = false; i++; } diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c index 0e7670e5a5..2951286159 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.c +++ b/src/core/lib/iomgr/unix_sockets_posix.c @@ -58,6 +58,7 @@ grpc_error *grpc_resolve_unix_domain_address(const char *name, un->sun_family = AF_UNIX; strcpy(un->sun_path, name); (*addrs)->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; + (*addrs)->addrs->is_balancer = false; return GRPC_ERROR_NONE; } |