aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c39
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c15
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c15
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c11
-rw-r--r--src/core/lib/iomgr/resolve_address.h2
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c1
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c1
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix.c1
8 files changed, 65 insertions, 20 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index af913d8a9d..4032b76ac9 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -309,6 +309,11 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
args.addresses->addrs[out_addrs_idx].len = sa_len;
+ // These are, of course, actually balancer addresses. However, we
+ // want the round_robin LB policy to treat them as normal backend
+ // addresses, since we don't need to talk to balancers in order to
+ // find the balancers themselves.
+ args.addresses->addrs[out_addrs_idx].is_balancer = false;
++out_addrs_idx;
} else {
gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
@@ -414,6 +419,20 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
+ // Count the number of gRPC-LB addresses. There must be at least one.
+ // TODO(roth): For now, we ignore non-balancer addresses, so there must be
+ // at least one balancer address. In the future, we may change the
+ // behavior such that we fall back to using the non-balancer addresses
+ // if we cannot reach any balancers. At that time, this should be
+ // changed to allow a list with no balancer addresses, since the
+ // resolver might fail to return a balancer address even when this is
+ // the right LB policy to use.
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < args->addresses->naddrs; ++i) {
+ if (args->addresses->addrs[i].is_balancer) ++num_grpclb_addrs;
+ }
+ if (num_grpclb_addrs == 0) return NULL;
+
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
memset(glb_policy, 0, sizeof(*glb_policy));
@@ -424,25 +443,25 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* Create a client channel over them to communicate with a LB service */
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
- if (args->addresses->naddrs == 0) {
- return NULL;
- }
/* construct a target from the args->addresses, in the form
* ipvX://ip1:port1,ip2:port2,...
* TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
+ char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
addr_strs[0] =
grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
+ size_t addr_index = 1;
for (size_t i = 1; i < args->addresses->naddrs; i++) {
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_strs[i],
- (const struct sockaddr *)&args->addresses->addrs[i],
- true) == 0);
+ if (args->addresses->addrs[i].is_balancer) {
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_strs[addr_index++],
+ (const struct sockaddr *)&args->addresses->addrs[i],
+ true) == 0);
+ }
}
size_t uri_path_len;
char *target_uri_str = gpr_strjoin_sep(
- (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
+ (const char **)addr_strs, num_grpclb_addrs, ",", &uri_path_len);
/* will pick using pick_first */
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
@@ -450,7 +469,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
gpr_free(target_uri_str);
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < num_grpclb_addrs; i++) {
gpr_free(addr_strs[i]);
}
gpr_free(addr_strs);
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9decf70692..858a53ae55 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -443,17 +443,26 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
- if (args->addresses->naddrs == 0) return NULL;
+ // Find the number of backend addresses. We ignore balancer
+ // addresses, since we don't know how to handle them.
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ if (!args->addresses->addrs[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
p->subchannels =
- gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ gpr_malloc(sizeof(grpc_subchannel *) * num_addrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ // Skip balancer addresses, since we only know how to handle backends.
+ if (args->addresses->addrs[i].is_balancer) continue;
+
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 7bcf608ab9..986a57bff9 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -571,16 +571,27 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
+ // Find the number of backend addresses. We ignore balancer
+ // addresses, since we don't know how to handle them.
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ if (!args->addresses->addrs[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) return NULL;
+
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
p->subchannels =
- gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ gpr_malloc(sizeof(*p->subchannels) * num_addrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ // Skip balancer addresses, since we only know how to handle backends.
+ if (args->addresses->addrs[i].is_balancer) continue;
+
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 3807522d2b..f0748ef583 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -63,7 +63,7 @@ typedef struct {
/** mutex guarding the rest of the state */
gpr_mu mu;
/** have we published? */
- int published;
+ bool published;
/** pending next completion, or NULL */
grpc_closure *next_completion;
/** target result address for next completion */
@@ -102,7 +102,7 @@ static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
gpr_mu_lock(&r->mu);
- r->published = 0;
+ r->published = false;
sockaddr_maybe_finish_next_locked(exec_ctx, r);
gpr_mu_unlock(&r->mu);
}
@@ -131,7 +131,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
- r->published = 1;
+ r->published = true;
*r->target_result = result;
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
@@ -175,7 +175,7 @@ static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_resolver_args *args, const char *default_lb_policy_name,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
- int errors_found = 0; /* GPR_FALSE */
+ bool errors_found = false;
sockaddr_resolver *r;
gpr_slice path_slice;
gpr_slice_buffer path_parts;
@@ -228,9 +228,10 @@ static grpc_resolver *sockaddr_create(
if (!parse(&ith_uri,
(struct sockaddr_storage *)(&r->addresses->addrs[i].addr),
&r->addresses->addrs[i].len)) {
- errors_found = 1; /* GPR_TRUE */
+ errors_found = true;
}
gpr_free(part_str);
+ r->addresses->addrs[i].is_balancer = lb_enabled;
if (errors_found) break;
}
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index ddbe375755..796e75da91 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -34,6 +34,7 @@
#ifndef GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H
#define GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H
+#include <stdbool.h>
#include <stddef.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
@@ -43,6 +44,7 @@
typedef struct {
char addr[GRPC_MAX_SOCKADDR_SIZE];
size_t len;
+ bool is_balancer;
} grpc_resolved_address;
typedef struct {
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index 4e9f978584..f69cc09593 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -132,6 +132,7 @@ static grpc_error *blocking_resolve_address_impl(
for (resp = result; resp != NULL; resp = resp->ai_next) {
memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
(*addresses)->addrs[i].len = resp->ai_addrlen;
+ (*addresses)->addrs[i].is_balancer = false;
i++;
}
err = GRPC_ERROR_NONE;
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 2af8af82dc..e6bb209548 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -118,6 +118,7 @@ static grpc_error *blocking_resolve_address_impl(
for (resp = result; resp != NULL; resp = resp->ai_next) {
memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
(*addresses)->addrs[i].len = resp->ai_addrlen;
+ (*addresses)->addrs[i].is_balancer = false;
i++;
}
diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c
index 0e7670e5a5..2951286159 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.c
+++ b/src/core/lib/iomgr/unix_sockets_posix.c
@@ -58,6 +58,7 @@ grpc_error *grpc_resolve_unix_domain_address(const char *name,
un->sun_family = AF_UNIX;
strcpy(un->sun_path, name);
(*addrs)->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1;
+ (*addrs)->addrs->is_balancer = false;
return GRPC_ERROR_NONE;
}