aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h4
-rw-r--r--src/core/ext/client_config/resolver_result.c24
-rw-r--r--src/core/ext/client_config/resolver_result.h24
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c58
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c23
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c23
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c15
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c31
8 files changed, 143 insertions, 59 deletions
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index da1de3579a..6919b1eb98 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -36,7 +36,7 @@
#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/lb_policy.h"
-#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/ext/client_config/resolver_result.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@@ -48,7 +48,7 @@ struct grpc_lb_policy_factory {
};
typedef struct grpc_lb_policy_args {
- grpc_resolved_addresses *addresses;
+ grpc_addresses *addresses;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c
index c6c4166e83..e14f761f05 100644
--- a/src/core/ext/client_config/resolver_result.c
+++ b/src/core/ext/client_config/resolver_result.c
@@ -37,6 +37,30 @@
#include <grpc/support/alloc.h>
+grpc_addresses *grpc_addresses_create(size_t num_addresses) {
+ grpc_addresses *addresses = gpr_malloc(sizeof(grpc_addresses));
+ addresses->num_addresses = num_addresses;
+ const size_t addresses_size = sizeof(grpc_address) * num_addresses;
+ addresses->addresses = gpr_malloc(addresses_size);
+ memset(addresses->addresses, 0, addresses_size);
+ return addresses;
+}
+
+void grpc_addresses_set_address(grpc_addresses *addresses, size_t index,
+ void *address, size_t address_len,
+ bool is_balancer) {
+ GPR_ASSERT(index < addresses->num_addresses);
+ grpc_address *target = &addresses->addresses[index];
+ memcpy(target->address.addr, address, address_len);
+ target->address.len = address_len;
+ target->is_balancer = is_balancer;
+}
+
+void grpc_addresses_destroy(grpc_addresses *addresses) {
+ gpr_free(addresses->addresses);
+ gpr_free(addresses);
+}
+
struct grpc_resolver_result {
gpr_refcount refs;
grpc_lb_policy *lb_policy;
diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h
index 402f7dbd7e..4199ef512a 100644
--- a/src/core/ext/client_config/resolver_result.h
+++ b/src/core/ext/client_config/resolver_result.h
@@ -34,7 +34,31 @@
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
+#include <stdbool.h>
+
#include "src/core/ext/client_config/lb_policy.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+
+/** Used to represent addresses returned by the resolver. */
+typedef struct grpc_address {
+ grpc_resolved_address address;
+ bool is_balancer;
+} grpc_address;
+
+typedef struct grpc_addresses {
+ size_t num_addresses;
+ grpc_address *addresses;
+} grpc_addresses;
+
+/** Returns a grpc_addresses struct with enough space for
+ \a num_addresses addresses. */
+grpc_addresses *grpc_addresses_create(size_t num_addresses);
+
+void grpc_addresses_set_address(grpc_addresses *addresses, size_t index,
+ void *address, size_t address_len,
+ bool is_balancer);
+
+void grpc_addresses_destroy(grpc_addresses *addresses);
/** Results reported from a grpc_resolver. */
typedef struct grpc_resolver_result grpc_resolver_result;
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index af913d8a9d..61721a4a9e 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -296,10 +296,7 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args args;
args.client_channel_factory = glb_policy->cc_factory;
- args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
- args.addresses->naddrs = serverlist->num_servers;
- args.addresses->addrs =
- gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
+ args.addresses = grpc_addresses_create(serverlist->num_servers);
size_t out_addrs_idx = 0;
for (size_t i = 0; i < serverlist->num_servers; ++i) {
grpc_uri uri;
@@ -307,8 +304,12 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
size_t sa_len;
uri.path = host_ports[i];
if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
- memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
- args.addresses->addrs[out_addrs_idx].len = sa_len;
+ /* These are, of course, actually balancer addresses. However, we
+ * want the round_robin LB policy to treat them as normal backend
+ * addresses, since we don't need to talk to balancers in order to
+ * find the balancers themselves, so we set is_balancer=false. */
+ grpc_addresses_set_address(args.addresses, out_addrs_idx, &sa, sa_len,
+ false /* is_balancer */);
++out_addrs_idx;
} else {
gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
@@ -323,8 +324,7 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
gpr_free(host_ports[i]);
}
gpr_free(host_ports);
- gpr_free(args.addresses->addrs);
- gpr_free(args.addresses);
+ grpc_addresses_destroy(args.addresses);
return rr;
}
@@ -414,6 +414,19 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
+ /* Count the number of gRPC-LB addresses. There must be at least one.
+ * TODO(roth): For now, we ignore non-balancer addresses, but in the
+ * future, we may change the behavior such that we fall back to using
+ * the non-balancer addresses if we cannot reach any balancers. At that
+ * time, this should be changed to allow a list with no balancer addresses,
+ * since the resolver might fail to return a balancer address even when
+ * this is the right LB policy to use. */
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; ++i) {
+ if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ if (num_grpclb_addrs == 0) return NULL;
+
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
memset(glb_policy, 0, sizeof(*glb_policy));
@@ -424,25 +437,26 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* Create a client channel over them to communicate with a LB service */
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
- if (args->addresses->naddrs == 0) {
- return NULL;
- }
/* construct a target from the args->addresses, in the form
* ipvX://ip1:port1,ip2:port2,...
* TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
- addr_strs[0] =
- grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
- for (size_t i = 1; i < args->addresses->naddrs; i++) {
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_strs[i],
- (const struct sockaddr *)&args->addresses->addrs[i],
- true) == 0);
+ char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
+ addr_strs[0] = grpc_sockaddr_to_uri(
+ (const struct sockaddr *)&args->addresses->addresses[0].address.addr);
+ size_t addr_index = 1;
+ for (size_t i = 1; i < args->addresses->num_addresses; i++) {
+ if (args->addresses->addresses[i].is_balancer) {
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_strs[addr_index++],
+ (const struct sockaddr *)&args->addresses->addresses[i]
+ .address.addr,
+ true) == 0);
+ }
}
size_t uri_path_len;
- char *target_uri_str = gpr_strjoin_sep(
- (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
+ char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
+ num_grpclb_addrs, ",", &uri_path_len);
/* will pick using pick_first */
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
@@ -450,7 +464,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
gpr_free(target_uri_str);
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < num_grpclb_addrs; i++) {
gpr_free(addr_strs[i]);
}
gpr_free(addr_strs);
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9decf70692..9f3a6939e6 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -443,20 +443,29 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
- if (args->addresses->naddrs == 0) return NULL;
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (!args->addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ /* Skip balancer addresses, since we only know how to handle backends. */
+ if (args->addresses->addresses[i].is_balancer) continue;
+
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
- sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+ sc_args.addr =
+ (struct sockaddr *)(&args->addresses->addresses[i].address.addr);
+ sc_args.addr_len = args->addresses->addresses[i].address.len;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 7bcf608ab9..fc1534887a 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -571,19 +571,30 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (!args->addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) return NULL;
+
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->subchannels = gpr_malloc(sizeof(*p->subchannels) * num_addrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ /* Skip balancer addresses, since we only know how to handle backends. */
+ if (args->addresses->addresses[i].is_balancer) continue;
+
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
- sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+ sc_args.addr =
+ (struct sockaddr *)(&args->addresses->addresses[i].address.addr);
+ sc_args.addr_len = args->addresses->addresses[i].address.len;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 79682e78b5..8fc10d98a8 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -170,20 +170,25 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = 0;
- grpc_resolved_addresses *addresses = r->addresses;
- if (addresses != NULL) {
+ if (r->addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
- result = grpc_resolver_result_create();
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.addresses = addresses;
+ lb_policy_args.addresses = grpc_addresses_create(r->addresses->naddrs);
+ for (size_t i = 0; i < r->addresses->naddrs; ++i) {
+ grpc_addresses_set_address(
+ lb_policy_args.addresses, i, &r->addresses->addrs[i].addr,
+ r->addresses->addrs[i].len, false /* is_balancer */);
+ }
+ grpc_resolved_addresses_destroy(r->addresses);
lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
+ grpc_addresses_destroy(lb_policy_args.addresses);
+ result = grpc_resolver_result_create();
if (lb_policy != NULL) {
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
}
- grpc_resolved_addresses_destroy(addresses);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 3807522d2b..94d2f892eb 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -58,12 +58,12 @@ typedef struct {
char *lb_policy_name;
/** the addresses that we've 'resolved' */
- grpc_resolved_addresses *addresses;
+ grpc_addresses *addresses;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** have we published? */
- int published;
+ bool published;
/** pending next completion, or NULL */
grpc_closure *next_completion;
/** target result address for next completion */
@@ -102,7 +102,7 @@ static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
gpr_mu_lock(&r->mu);
- r->published = 0;
+ r->published = false;
sockaddr_maybe_finish_next_locked(exec_ctx, r);
gpr_mu_unlock(&r->mu);
}
@@ -131,7 +131,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
- r->published = 1;
+ r->published = true;
*r->target_result = result;
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
@@ -142,7 +142,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
- grpc_resolved_addresses_destroy(r->addresses);
+ grpc_addresses_destroy(r->addresses);
gpr_free(r->lb_policy_name);
gpr_free(r);
}
@@ -175,7 +175,7 @@ static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_resolver_args *args, const char *default_lb_policy_name,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
- int errors_found = 0; /* GPR_FALSE */
+ bool errors_found = false;
sockaddr_resolver *r;
gpr_slice path_slice;
gpr_slice_buffer path_parts;
@@ -216,21 +216,18 @@ static grpc_resolver *sockaddr_create(
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
- r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
- r->addresses->naddrs = path_parts.count;
- r->addresses->addrs =
- gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs);
-
- for (size_t i = 0; i < r->addresses->naddrs; i++) {
+ r->addresses = grpc_addresses_create(path_parts.count);
+ for (size_t i = 0; i < r->addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
- if (!parse(&ith_uri,
- (struct sockaddr_storage *)(&r->addresses->addrs[i].addr),
- &r->addresses->addrs[i].len)) {
- errors_found = 1; /* GPR_TRUE */
+ if (!parse(&ith_uri, (struct sockaddr_storage *)(&r->addresses->addresses[i]
+ .address.addr),
+ &r->addresses->addresses[i].address.len)) {
+ errors_found = true;
}
gpr_free(part_str);
+ r->addresses->addresses[i].is_balancer = lb_enabled;
if (errors_found) break;
}
@@ -238,7 +235,7 @@ static grpc_resolver *sockaddr_create(
gpr_slice_unref(path_slice);
if (errors_found) {
gpr_free(r->lb_policy_name);
- grpc_resolved_addresses_destroy(r->addresses);
+ grpc_addresses_destroy(r->addresses);
gpr_free(r);
return NULL;
}