From 8aace513d090679bd15ceb68eb4824cec3dc044e Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 15 Aug 2016 14:55:12 -0700 Subject: Introduced grpc_lb_policy_pick_args and added some LB docs --- src/core/ext/lb_policy/pick_first/pick_first.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c') diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 9decf70692..e1277b353f 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -199,9 +199,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, + const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; @@ -225,13 +223,13 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, + grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, p->base.interested_parties); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollent = pollent; + pp->pollent = pick_args->pollent; pp->target = target; - pp->initial_metadata_flags = initial_metadata_flags; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); -- cgit v1.2.3 From f655c85140986a4b1298b1a0820fe76d8b8c6409 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 6 Sep 2016 10:40:38 -0700 Subject: Add is_resolver bit to grpc_resolved_address. --- src/core/ext/lb_policy/grpclb/grpclb.c | 39 ++++++++++++++++------ src/core/ext/lb_policy/pick_first/pick_first.c | 15 +++++++-- src/core/ext/lb_policy/round_robin/round_robin.c | 15 +++++++-- src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 11 +++--- src/core/lib/iomgr/resolve_address.h | 2 ++ src/core/lib/iomgr/resolve_address_posix.c | 1 + src/core/lib/iomgr/resolve_address_windows.c | 1 + src/core/lib/iomgr/unix_sockets_posix.c | 1 + 8 files changed, 65 insertions(+), 20 deletions(-) (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c') diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index af913d8a9d..4032b76ac9 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -309,6 +309,11 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */ memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); args.addresses->addrs[out_addrs_idx].len = sa_len; + // These are, of course, actually balancer addresses. However, we + // want the round_robin LB policy to treat them as normal backend + // addresses, since we don't need to talk to balancers in order to + // find the balancers themselves. + args.addresses->addrs[out_addrs_idx].is_balancer = false; ++out_addrs_idx; } else { gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", @@ -414,6 +419,20 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + // Count the number of gRPC-LB addresses. There must be at least one. + // TODO(roth): For now, we ignore non-balancer addresses, so there must be + // at least one balancer address. In the future, we may change the + // behavior such that we fall back to using the non-balancer addresses + // if we cannot reach any balancers. At that time, this should be + // changed to allow a list with no balancer addresses, since the + // resolver might fail to return a balancer address even when this is + // the right LB policy to use. + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < args->addresses->naddrs; ++i) { + if (args->addresses->addrs[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return NULL; + glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); memset(glb_policy, 0, sizeof(*glb_policy)); @@ -424,25 +443,25 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * Create a client channel over them to communicate with a LB service */ glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); - if (args->addresses->naddrs == 0) { - return NULL; - } /* construct a target from the args->addresses, in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs); + char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); addr_strs[0] = grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); + size_t addr_index = 1; for (size_t i = 1; i < args->addresses->naddrs; i++) { - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_strs[i], - (const struct sockaddr *)&args->addresses->addrs[i], - true) == 0); + if (args->addresses->addrs[i].is_balancer) { + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_strs[addr_index++], + (const struct sockaddr *)&args->addresses->addrs[i], + true) == 0); + } } size_t uri_path_len; char *target_uri_str = gpr_strjoin_sep( - (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len); + (const char **)addr_strs, num_grpclb_addrs, ",", &uri_path_len); /* will pick using pick_first */ glb_policy->lb_channel = grpc_client_channel_factory_create_channel( @@ -450,7 +469,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); gpr_free(target_uri_str); - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < num_grpclb_addrs; i++) { gpr_free(addr_strs[i]); } gpr_free(addr_strs); diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 9decf70692..858a53ae55 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -443,17 +443,26 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - if (args->addresses->naddrs == 0) return NULL; + // Find the number of backend addresses. We ignore balancer + // addresses, since we don't know how to handle them. + size_t num_addrs = 0; + for (size_t i = 0; i < args->addresses->naddrs; i++) { + if (!args->addresses->addrs[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + gpr_malloc(sizeof(grpc_subchannel *) * num_addrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < args->addresses->naddrs; i++) { + // Skip balancer addresses, since we only know how to handle backends. + if (args->addresses->addrs[i].is_balancer) continue; + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); sc_args.addr_len = (size_t)args->addresses->addrs[i].len; diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 7bcf608ab9..986a57bff9 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -571,16 +571,27 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); + // Find the number of backend addresses. We ignore balancer + // addresses, since we don't know how to handle them. + size_t num_addrs = 0; + for (size_t i = 0; i < args->addresses->naddrs; i++) { + if (!args->addresses->addrs[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) return NULL; + round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + gpr_malloc(sizeof(*p->subchannels) * num_addrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < args->addresses->naddrs; i++) { + // Skip balancer addresses, since we only know how to handle backends. + if (args->addresses->addrs[i].is_balancer) continue; + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); sc_args.addr_len = (size_t)args->addresses->addrs[i].len; diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 3807522d2b..f0748ef583 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -63,7 +63,7 @@ typedef struct { /** mutex guarding the rest of the state */ gpr_mu mu; /** have we published? */ - int published; + bool published; /** pending next completion, or NULL */ grpc_closure *next_completion; /** target result address for next completion */ @@ -102,7 +102,7 @@ static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; gpr_mu_lock(&r->mu); - r->published = 0; + r->published = false; sockaddr_maybe_finish_next_locked(exec_ctx, r); gpr_mu_unlock(&r->mu); } @@ -131,7 +131,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); grpc_resolver_result_set_lb_policy(result, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); - r->published = 1; + r->published = true; *r->target_result = result; grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; @@ -175,7 +175,7 @@ static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( grpc_resolver_args *args, const char *default_lb_policy_name, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { - int errors_found = 0; /* GPR_FALSE */ + bool errors_found = false; sockaddr_resolver *r; gpr_slice path_slice; gpr_slice_buffer path_parts; @@ -228,9 +228,10 @@ static grpc_resolver *sockaddr_create( if (!parse(&ith_uri, (struct sockaddr_storage *)(&r->addresses->addrs[i].addr), &r->addresses->addrs[i].len)) { - errors_found = 1; /* GPR_TRUE */ + errors_found = true; } gpr_free(part_str); + r->addresses->addrs[i].is_balancer = lb_enabled; if (errors_found) break; } diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index ddbe375755..796e75da91 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H #define GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H +#include #include #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" @@ -43,6 +44,7 @@ typedef struct { char addr[GRPC_MAX_SOCKADDR_SIZE]; size_t len; + bool is_balancer; } grpc_resolved_address; typedef struct { diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 4e9f978584..f69cc09593 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -132,6 +132,7 @@ static grpc_error *blocking_resolve_address_impl( for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); (*addresses)->addrs[i].len = resp->ai_addrlen; + (*addresses)->addrs[i].is_balancer = false; i++; } err = GRPC_ERROR_NONE; diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 2af8af82dc..e6bb209548 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -118,6 +118,7 @@ static grpc_error *blocking_resolve_address_impl( for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); (*addresses)->addrs[i].len = resp->ai_addrlen; + (*addresses)->addrs[i].is_balancer = false; i++; } diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c index 0e7670e5a5..2951286159 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.c +++ b/src/core/lib/iomgr/unix_sockets_posix.c @@ -58,6 +58,7 @@ grpc_error *grpc_resolve_unix_domain_address(const char *name, un->sun_family = AF_UNIX; strcpy(un->sun_path, name); (*addrs)->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; + (*addrs)->addrs->is_balancer = false; return GRPC_ERROR_NONE; } -- cgit v1.2.3 From 989cdcd6103c258efdb6548248b68f01ec44a873 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 6 Sep 2016 13:28:28 -0700 Subject: clang-format --- src/core/ext/lb_policy/grpclb/grpclb.c | 4 ++-- src/core/ext/lb_policy/pick_first/pick_first.c | 3 +-- src/core/ext/lb_policy/round_robin/round_robin.c | 3 +-- 3 files changed, 4 insertions(+), 6 deletions(-) (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c') diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 4032b76ac9..5d3021b57e 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -460,8 +460,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, } } size_t uri_path_len; - char *target_uri_str = gpr_strjoin_sep( - (const char **)addr_strs, num_grpclb_addrs, ",", &uri_path_len); + char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs, + num_grpclb_addrs, ",", &uri_path_len); /* will pick using pick_first */ glb_policy->lb_channel = grpc_client_channel_factory_create_channel( diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 858a53ae55..d374f0e4e5 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -454,8 +454,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * num_addrs); + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs); memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 986a57bff9..2dc0a52744 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -582,8 +582,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * num_addrs); + p->subchannels = gpr_malloc(sizeof(*p->subchannels) * num_addrs); memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; -- cgit v1.2.3 From e011b1e4cae5cedebea25f75fda69ba56b124572 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 7 Sep 2016 08:28:00 -0700 Subject: Move is_balancer into a new struct in the client_config directory. --- src/core/ext/client_config/lb_policy_factory.h | 4 +- src/core/ext/client_config/resolver_result.c | 24 ++++++++++ src/core/ext/client_config/resolver_result.h | 24 ++++++++++ src/core/ext/lb_policy/grpclb/grpclb.c | 51 ++++++++++------------ src/core/ext/lb_policy/pick_first/pick_first.c | 19 ++++---- src/core/ext/lb_policy/round_robin/round_robin.c | 19 ++++---- src/core/ext/resolver/dns/native/dns_resolver.c | 15 ++++--- src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 22 ++++------ src/core/lib/iomgr/resolve_address.h | 2 - src/core/lib/iomgr/resolve_address_posix.c | 1 - src/core/lib/iomgr/resolve_address_windows.c | 1 - src/core/lib/iomgr/unix_sockets_posix.c | 1 - 12 files changed, 112 insertions(+), 71 deletions(-) (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c') diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index da1de3579a..6919b1eb98 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -36,7 +36,7 @@ #include "src/core/ext/client_config/client_channel_factory.h" #include "src/core/ext/client_config/lb_policy.h" -#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/ext/client_config/resolver_result.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -48,7 +48,7 @@ struct grpc_lb_policy_factory { }; typedef struct grpc_lb_policy_args { - grpc_resolved_addresses *addresses; + grpc_addresses *addresses; grpc_client_channel_factory *client_channel_factory; } grpc_lb_policy_args; diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c index c6c4166e83..e14f761f05 100644 --- a/src/core/ext/client_config/resolver_result.c +++ b/src/core/ext/client_config/resolver_result.c @@ -37,6 +37,30 @@ #include +grpc_addresses *grpc_addresses_create(size_t num_addresses) { + grpc_addresses *addresses = gpr_malloc(sizeof(grpc_addresses)); + addresses->num_addresses = num_addresses; + const size_t addresses_size = sizeof(grpc_address) * num_addresses; + addresses->addresses = gpr_malloc(addresses_size); + memset(addresses->addresses, 0, addresses_size); + return addresses; +} + +void grpc_addresses_set_address(grpc_addresses *addresses, size_t index, + void *address, size_t address_len, + bool is_balancer) { + GPR_ASSERT(index < addresses->num_addresses); + grpc_address *target = &addresses->addresses[index]; + memcpy(target->address.addr, address, address_len); + target->address.len = address_len; + target->is_balancer = is_balancer; +} + +void grpc_addresses_destroy(grpc_addresses *addresses) { + gpr_free(addresses->addresses); + gpr_free(addresses); +} + struct grpc_resolver_result { gpr_refcount refs; grpc_lb_policy *lb_policy; diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h index 402f7dbd7e..4199ef512a 100644 --- a/src/core/ext/client_config/resolver_result.h +++ b/src/core/ext/client_config/resolver_result.h @@ -34,7 +34,31 @@ #ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H #define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H +#include + #include "src/core/ext/client_config/lb_policy.h" +#include "src/core/lib/iomgr/resolve_address.h" + +/** Used to represent addresses returned by the resolver. */ +typedef struct grpc_address { + grpc_resolved_address address; + bool is_balancer; +} grpc_address; + +typedef struct grpc_addresses { + size_t num_addresses; + grpc_address *addresses; +} grpc_addresses; + +/** Returns a grpc_addresses struct with enough space for + \a num_addresses addresses. */ +grpc_addresses *grpc_addresses_create(size_t num_addresses); + +void grpc_addresses_set_address(grpc_addresses *addresses, size_t index, + void *address, size_t address_len, + bool is_balancer); + +void grpc_addresses_destroy(grpc_addresses *addresses); /** Results reported from a grpc_resolver. */ typedef struct grpc_resolver_result grpc_resolver_result; diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 5d3021b57e..61721a4a9e 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -296,10 +296,7 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args args; args.client_channel_factory = glb_policy->cc_factory; - args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); - args.addresses->naddrs = serverlist->num_servers; - args.addresses->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs); + args.addresses = grpc_addresses_create(serverlist->num_servers); size_t out_addrs_idx = 0; for (size_t i = 0; i < serverlist->num_servers; ++i) { grpc_uri uri; @@ -307,13 +304,12 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, size_t sa_len; uri.path = host_ports[i]; if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */ - memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); - args.addresses->addrs[out_addrs_idx].len = sa_len; - // These are, of course, actually balancer addresses. However, we - // want the round_robin LB policy to treat them as normal backend - // addresses, since we don't need to talk to balancers in order to - // find the balancers themselves. - args.addresses->addrs[out_addrs_idx].is_balancer = false; + /* These are, of course, actually balancer addresses. However, we + * want the round_robin LB policy to treat them as normal backend + * addresses, since we don't need to talk to balancers in order to + * find the balancers themselves, so we set is_balancer=false. */ + grpc_addresses_set_address(args.addresses, out_addrs_idx, &sa, sa_len, + false /* is_balancer */); ++out_addrs_idx; } else { gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", @@ -328,8 +324,7 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, gpr_free(host_ports[i]); } gpr_free(host_ports); - gpr_free(args.addresses->addrs); - gpr_free(args.addresses); + grpc_addresses_destroy(args.addresses); return rr; } @@ -419,17 +414,16 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - // Count the number of gRPC-LB addresses. There must be at least one. - // TODO(roth): For now, we ignore non-balancer addresses, so there must be - // at least one balancer address. In the future, we may change the - // behavior such that we fall back to using the non-balancer addresses - // if we cannot reach any balancers. At that time, this should be - // changed to allow a list with no balancer addresses, since the - // resolver might fail to return a balancer address even when this is - // the right LB policy to use. + /* Count the number of gRPC-LB addresses. There must be at least one. + * TODO(roth): For now, we ignore non-balancer addresses, but in the + * future, we may change the behavior such that we fall back to using + * the non-balancer addresses if we cannot reach any balancers. At that + * time, this should be changed to allow a list with no balancer addresses, + * since the resolver might fail to return a balancer address even when + * this is the right LB policy to use. */ size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < args->addresses->naddrs; ++i) { - if (args->addresses->addrs[i].is_balancer) ++num_grpclb_addrs; + for (size_t i = 0; i < args->addresses->num_addresses; ++i) { + if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs; } if (num_grpclb_addrs == 0) return NULL; @@ -448,14 +442,15 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); - addr_strs[0] = - grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); + addr_strs[0] = grpc_sockaddr_to_uri( + (const struct sockaddr *)&args->addresses->addresses[0].address.addr); size_t addr_index = 1; - for (size_t i = 1; i < args->addresses->naddrs; i++) { - if (args->addresses->addrs[i].is_balancer) { + for (size_t i = 1; i < args->addresses->num_addresses; i++) { + if (args->addresses->addresses[i].is_balancer) { GPR_ASSERT(grpc_sockaddr_to_string( &addr_strs[addr_index++], - (const struct sockaddr *)&args->addresses->addrs[i], + (const struct sockaddr *)&args->addresses->addresses[i] + .address.addr, true) == 0); } } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index d374f0e4e5..9f3a6939e6 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -443,11 +443,11 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - // Find the number of backend addresses. We ignore balancer - // addresses, since we don't know how to handle them. + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ size_t num_addrs = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { - if (!args->addresses->addrs[i].is_balancer) ++num_addrs; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (!args->addresses->addresses[i].is_balancer) ++num_addrs; } if (num_addrs == 0) return NULL; @@ -458,13 +458,14 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { - // Skip balancer addresses, since we only know how to handle backends. - if (args->addresses->addrs[i].is_balancer) continue; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + /* Skip balancer addresses, since we only know how to handle backends. */ + if (args->addresses->addresses[i].is_balancer) continue; memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + sc_args.addr = + (struct sockaddr *)(&args->addresses->addresses[i].address.addr); + sc_args.addr_len = args->addresses->addresses[i].address.len; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 2dc0a52744..fc1534887a 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -571,11 +571,11 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - // Find the number of backend addresses. We ignore balancer - // addresses, since we don't know how to handle them. + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ size_t num_addrs = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { - if (!args->addresses->addrs[i].is_balancer) ++num_addrs; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (!args->addresses->addresses[i].is_balancer) ++num_addrs; } if (num_addrs == 0) return NULL; @@ -587,13 +587,14 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { - // Skip balancer addresses, since we only know how to handle backends. - if (args->addresses->addrs[i].is_balancer) continue; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + /* Skip balancer addresses, since we only know how to handle backends. */ + if (args->addresses->addresses[i].is_balancer) continue; memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + sc_args.addr = + (struct sockaddr *)(&args->addresses->addresses[i].address.addr); + sc_args.addr_len = args->addresses->addresses[i].address.len; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 79682e78b5..8fc10d98a8 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -170,20 +170,25 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = 0; - grpc_resolved_addresses *addresses = r->addresses; - if (addresses != NULL) { + if (r->addresses != NULL) { grpc_lb_policy_args lb_policy_args; - result = grpc_resolver_result_create(); memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.addresses = addresses; + lb_policy_args.addresses = grpc_addresses_create(r->addresses->naddrs); + for (size_t i = 0; i < r->addresses->naddrs; ++i) { + grpc_addresses_set_address( + lb_policy_args.addresses, i, &r->addresses->addrs[i].addr, + r->addresses->addrs[i].len, false /* is_balancer */); + } + grpc_resolved_addresses_destroy(r->addresses); lb_policy_args.client_channel_factory = r->client_channel_factory; lb_policy = grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + grpc_addresses_destroy(lb_policy_args.addresses); + result = grpc_resolver_result_create(); if (lb_policy != NULL) { grpc_resolver_result_set_lb_policy(result, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); } - grpc_resolved_addresses_destroy(addresses); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index f0748ef583..94d2f892eb 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -58,7 +58,7 @@ typedef struct { char *lb_policy_name; /** the addresses that we've 'resolved' */ - grpc_resolved_addresses *addresses; + grpc_addresses *addresses; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -142,7 +142,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory); - grpc_resolved_addresses_destroy(r->addresses); + grpc_addresses_destroy(r->addresses); gpr_free(r->lb_policy_name); gpr_free(r); } @@ -216,22 +216,18 @@ static grpc_resolver *sockaddr_create( gpr_slice_buffer_init(&path_parts); gpr_slice_split(path_slice, ",", &path_parts); - r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); - r->addresses->naddrs = path_parts.count; - r->addresses->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs); - - for (size_t i = 0; i < r->addresses->naddrs; i++) { + r->addresses = grpc_addresses_create(path_parts.count); + for (size_t i = 0; i < r->addresses->num_addresses; i++) { grpc_uri ith_uri = *args->uri; char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); ith_uri.path = part_str; - if (!parse(&ith_uri, - (struct sockaddr_storage *)(&r->addresses->addrs[i].addr), - &r->addresses->addrs[i].len)) { + if (!parse(&ith_uri, (struct sockaddr_storage *)(&r->addresses->addresses[i] + .address.addr), + &r->addresses->addresses[i].address.len)) { errors_found = true; } gpr_free(part_str); - r->addresses->addrs[i].is_balancer = lb_enabled; + r->addresses->addresses[i].is_balancer = lb_enabled; if (errors_found) break; } @@ -239,7 +235,7 @@ static grpc_resolver *sockaddr_create( gpr_slice_unref(path_slice); if (errors_found) { gpr_free(r->lb_policy_name); - grpc_resolved_addresses_destroy(r->addresses); + grpc_addresses_destroy(r->addresses); gpr_free(r); return NULL; } diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index 796e75da91..ddbe375755 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -34,7 +34,6 @@ #ifndef GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H #define GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H -#include #include #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" @@ -44,7 +43,6 @@ typedef struct { char addr[GRPC_MAX_SOCKADDR_SIZE]; size_t len; - bool is_balancer; } grpc_resolved_address; typedef struct { diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index f69cc09593..4e9f978584 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -132,7 +132,6 @@ static grpc_error *blocking_resolve_address_impl( for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); (*addresses)->addrs[i].len = resp->ai_addrlen; - (*addresses)->addrs[i].is_balancer = false; i++; } err = GRPC_ERROR_NONE; diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index e6bb209548..2af8af82dc 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -118,7 +118,6 @@ static grpc_error *blocking_resolve_address_impl( for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); (*addresses)->addrs[i].len = resp->ai_addrlen; - (*addresses)->addrs[i].is_balancer = false; i++; } diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c index 2951286159..0e7670e5a5 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.c +++ b/src/core/lib/iomgr/unix_sockets_posix.c @@ -58,7 +58,6 @@ grpc_error *grpc_resolve_unix_domain_address(const char *name, un->sun_family = AF_UNIX; strcpy(un->sun_path, name); (*addrs)->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; - (*addrs)->addrs->is_balancer = false; return GRPC_ERROR_NONE; } -- cgit v1.2.3 From 331b9c02f9372ef832e612d48b90991c181ba8a7 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 12 Sep 2016 18:37:05 -0700 Subject: Moved LB token changes solely into grpclb.c --- src/core/ext/client_config/client_channel.c | 2 +- src/core/ext/client_config/lb_policy.c | 5 +- src/core/ext/client_config/lb_policy.h | 20 +- src/core/ext/client_config/lb_policy_factory.h | 25 ++- src/core/ext/lb_policy/grpclb/grpclb.c | 240 ++++++++++++++------- src/core/ext/lb_policy/pick_first/pick_first.c | 18 +- src/core/ext/lb_policy/round_robin/round_robin.c | 89 +++----- src/core/ext/resolver/dns/native/dns_resolver.c | 9 +- src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 10 +- src/core/lib/transport/metadata.c | 18 +- src/core/lib/transport/metadata.h | 2 +- 11 files changed, 269 insertions(+), 169 deletions(-) (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c') diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 8f1c821ebb..58ef629be0 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -572,7 +572,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, initial_metadata_flags, &calld->lb_token_mdelem}; r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, - on_ready); + NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GPR_TIMER_END("pick_subchannel", 0); return r; diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c index 71170f5655..903563ef6b 100644 --- a/src/core/ext/client_config/lb_policy.c +++ b/src/core/ext/client_config/lb_policy.c @@ -101,9 +101,10 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { - return policy->vtable->pick(exec_ctx, policy, pick_args, target, on_complete); + return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data, + on_complete); } void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index 6f133a2948..734b3bff97 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -72,7 +72,8 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_pick */ int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, grpc_closure *on_complete); + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete); /** \see grpc_lb_policy_cancel_pick */ void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, @@ -138,14 +139,19 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); /** Find an appropriate target for this call, based on \a pick_args. - Upon completion \a on_complete will be called, with \a *target set to an - appropriate connected subchannel if the pick was successful or NULL - otherwise. - Picking can be asynchronous. Any IO should be done under \a - pick_args->pollent. */ + Picking can be synchronous or asynchronous. In the synchronous case, when a + pick is readily available, it'll be returned in \a target and a non-zero + value will be returned. + In the asynchronous case, zero is returned and \a on_complete will be called + once \a target and \a user_data have been set. Any IO should be done under + \a + pick_args->pollent. + The opaque \a user_data output argument corresponds to information that may + need be propagated from the LB policy. It may be NULL. + Errors are signaled by receiving a NULL \a *target. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete); /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index 2125eaba70..e1d67633b4 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -47,16 +47,25 @@ struct grpc_lb_policy_factory { const grpc_lb_policy_factory_vtable *vtable; }; -typedef struct grpc_lb_policy_address_token { - uint8_t *token; - size_t token_size; -} grpc_lb_policy_address_token; +/** A resolved address alongside any LB related information associated with it. + * \a user_data, if not \a NULL, is opaque and meant to be consumed by the gRPC + * LB policy. Anywhere else, refer to the functions in \a + * grpc_lb_policy_user_data_vtable to operate with it */ +typedef struct grpc_lb_address { + grpc_resolved_address *resolved_address; + void *user_data; +} grpc_lb_address; + +/** Functions acting upon the opaque \a grpc_lb_address.user_data */ +typedef struct grpc_lb_policy_user_data_vtable { + void *(*copy)(void *); + void (*destroy)(void *); +} grpc_lb_policy_user_data_vtable; typedef struct grpc_lb_policy_args { - grpc_resolved_addresses *addresses; - /* If not NULL, array of load balancing tokens associated with \a addresses, - * on a 1:1 correspondence. Some indices may be NULL for missing tokens. */ - grpc_lb_policy_address_token *tokens; + grpc_lb_address *lb_addresses; + size_t num_addresses; + grpc_lb_policy_user_data_vtable user_data_vtable; grpc_client_channel_factory *client_channel_factory; } grpc_lb_policy_args; diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index c3294b7988..ad070e458a 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -116,14 +116,50 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/static_metadata.h" int grpc_lb_glb_trace = 0; +static void *user_data_copy(void *user_data) { + if (user_data == NULL) return NULL; + return GRPC_MDELEM_REF(user_data); +} + +static void user_data_destroy(void *user_data) { + if (user_data == NULL) return; + GRPC_MDELEM_UNREF(user_data); +} + +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static void initial_metadata_add_lb_token( + grpc_metadata_batch *initial_metadata, + grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) { + GPR_ASSERT(lb_token_mdelem_storage != NULL); + GPR_ASSERT(lb_token != NULL); + grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} + typedef struct wrapped_rr_closure_arg { /* the original closure. Usually a on_complete/notify cb for pick() and ping() * calls against the internal RR instance, respectively. */ grpc_closure *wrapped_closure; + /* the pick's initial metadata, kept in order to append the LB token for the + * pick */ + grpc_metadata_batch *initial_metadata; + + /* the picked target, used to determine which LB token to add to the pick's + * initial metadata */ + grpc_connected_subchannel **target; + + /* the LB token associated with the pick */ + grpc_mdelem *lb_token; + + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /* The RR instance related to the closure */ grpc_lb_policy *rr_policy; @@ -146,6 +182,11 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); } GPR_ASSERT(wc_arg->wrapped_closure != NULL); + + initial_metadata_add_lb_token(wc_arg->initial_metadata, + wc_arg->lb_token_mdelem_storage, + wc_arg->lb_token); + grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL); gpr_free(wc_arg->owning_pending_node); } @@ -194,12 +235,15 @@ static void add_pending_pick(pending_pick **root, memset(pp, 0, sizeof(pending_pick)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); pp->next = *root; - pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->pollent = pick_args->pollent; pp->target = target; pp->initial_metadata = pick_args->initial_metadata; pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; + pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; + pp->wrapped_on_complete_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure, &pp->wrapped_on_complete_arg); *root = pp; @@ -285,37 +329,90 @@ struct rr_connectivity_data { glb_lb_policy *glb_policy; }; -static bool process_serverlist(const grpc_grpclb_server *server, - struct sockaddr_storage *sa, size_t *sa_len) { - if (server->port >> 16 != 0) { - gpr_log(GPR_ERROR, "Invalid port '%d'.", server->port); - return false; +/* populate \a addresses according to \a serverlist. Returns the number of + * addresses successfully parsed and added to \a addresses */ +static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, + grpc_lb_address **lb_addresses) { + size_t num_valid = 0; + /* first pass: count how many are valid in order to allocate the necessary + * memory in a single block */ + for (size_t i = 0; i < serverlist->num_servers; ++i) { + const grpc_grpclb_server *server = serverlist->servers[i]; + const grpc_grpclb_ip_address *ip = &server->ip_address; + + if (server->port >> 16 != 0) { + gpr_log(GPR_ERROR, + "Invalid port '%d' at index %zu of serverlist. Ignoring.", + server->port, i); + continue; + } + + if (ip->size != 4 && ip->size != 16) { + gpr_log(GPR_ERROR, + "Expected IP to be 4 or 16 bytes, got %d at index %zu of " + "serverlist. Ignoring", + ip->size, i); + continue; + } + ++num_valid; } - const uint16_t netorder_port = htons((uint16_t)server->port); - /* the addresses are given in binary format (a in(6)_addr struct) in - * server->ip_address.bytes. */ - const grpc_grpclb_ip_address *ip = &server->ip_address; - *sa_len = 0; - if (ip->size == 4) { - struct sockaddr_in *addr4 = (struct sockaddr_in *)sa; - *sa_len = sizeof(struct sockaddr_in); - memset(addr4, 0, *sa_len); - addr4->sin_family = AF_INET; - memcpy(&addr4->sin_addr, ip->bytes, ip->size); - addr4->sin_port = netorder_port; - } else if (ip->size == 6) { - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa; - *sa_len = sizeof(struct sockaddr_in6); - memset(addr6, 0, *sa_len); - addr6->sin6_family = AF_INET; - memcpy(&addr6->sin6_addr, ip->bytes, ip->size); - addr6->sin6_port = netorder_port; - } else { - gpr_log(GPR_ERROR, "Expected IP to be 4 or 16 bytes. Got %d.", ip->size); - return false; + if (num_valid == 0) { + return 0; } - GPR_ASSERT(*sa_len > 0); - return true; + + /* allocate the memory block for the "resolved" addresses. */ + grpc_resolved_address *r_addrs_memblock = + gpr_malloc(sizeof(grpc_resolved_address) * num_valid); + memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid); + grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid); + memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid); + + /* second pass: actually populate the addresses and LB tokens (aka user data + * to the outside world) to be read by the RR policy during its creation */ + for (size_t i = 0; i < num_valid; ++i) { + const grpc_grpclb_server *server = serverlist->servers[i]; + grpc_lb_address *const lb_addr = &lb_addrs[i]; + + /* lb token processing */ + if (server->has_load_balance_token) { + const size_t lb_token_size = + GPR_ARRAY_SIZE(server->load_balance_token) - 1; + grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( + (uint8_t *)server->load_balance_token, lb_token_size); + lb_addr->user_data = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); + } + + /* address processing */ + const uint16_t netorder_port = htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const grpc_grpclb_ip_address *ip = &server->ip_address; + + lb_addr->resolved_address = &r_addrs_memblock[i]; + struct sockaddr_storage *sa = + (struct sockaddr_storage *)lb_addr->resolved_address->addr; + size_t *sa_len = &lb_addr->resolved_address->len; + *sa_len = 0; + if (ip->size == 4) { + struct sockaddr_in *addr4 = (struct sockaddr_in *)sa; + *sa_len = sizeof(struct sockaddr_in); + memset(addr4, 0, *sa_len); + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 16) { + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa; + *sa_len = sizeof(struct sockaddr_in6); + memset(addr6, 0, *sa_len); + addr6->sin6_family = AF_INET; + memcpy(&addr6->sin6_addr, ip->bytes, ip->size); + addr6->sin6_port = netorder_port; + } + GPR_ASSERT(*sa_len > 0); + } + *lb_addresses = lb_addrs; + return num_valid; } static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, @@ -326,36 +423,19 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; - args.tokens = gpr_malloc(sizeof(grpc_lb_policy_address_token) * - serverlist->num_servers); - args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); - args.addresses->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * serverlist->num_servers); - size_t addr_idx = 0; - for (size_t i = 0; i < serverlist->num_servers; ++i) { - const grpc_grpclb_server *server = serverlist->servers[i]; - grpc_resolved_address *raddr = &args.addresses->addrs[addr_idx]; - if (!process_serverlist(server, (struct sockaddr_storage *)raddr->addr, - &raddr->len)) { - gpr_log(GPR_INFO, - "Problem processing server at index %zu of received serverlist, " - "ignoring.", - i); - continue; - } - ++addr_idx; - args.tokens[i].token_size = GPR_ARRAY_SIZE(server->load_balance_token) - 1; - args.tokens[i].token = gpr_malloc(args.tokens[i].token_size); - memcpy(args.tokens[i].token, server->load_balance_token, - args.tokens[i].token_size); - } - args.addresses->naddrs = addr_idx; + args.num_addresses = process_serverlist(serverlist, &args.lb_addresses); + args.user_data_vtable.copy = user_data_copy; + args.user_data_vtable.destroy = user_data_destroy; grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - - gpr_free(args.addresses->addrs); - gpr_free(args.addresses); - gpr_free(args.tokens); + if (args.num_addresses > 0) { + /* free "resolved" addresses memblock */ + gpr_free(args.lb_addresses->resolved_address); + } + for (size_t i = 0; i < args.num_addresses; ++i) { + args.user_data_vtable.destroy(args.lb_addresses[i].user_data); + } + gpr_free(args.lb_addresses); return rr; } @@ -395,6 +475,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, pp->pollent, pp->initial_metadata, pp->initial_metadata_flags, pp->lb_token_mdelem_storage}; grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target, + (void **)&pp->wrapped_on_complete_arg.lb_token, &pp->wrapped_on_complete); pp->wrapped_on_complete_arg.owning_pending_node = pp; } @@ -457,25 +538,26 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * Create a client channel over them to communicate with a LB service */ glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); - if (args->addresses->naddrs == 0) { + if (args->num_addresses == 0) { return NULL; } - /* construct a target from the args->addresses, in the form + /* construct a target from the addresses in args, given in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs); - addr_strs[0] = - grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); - for (size_t i = 1; i < args->addresses->naddrs; i++) { - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_strs[i], - (const struct sockaddr *)&args->addresses->addrs[i], - true) == 0); + char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses); + addr_strs[0] = grpc_sockaddr_to_uri( + (const struct sockaddr *)&args->lb_addresses[0].resolved_address->addr); + for (size_t i = 1; i < args->num_addresses; i++) { + GPR_ASSERT( + grpc_sockaddr_to_string(&addr_strs[i], + (const struct sockaddr *)&args->lb_addresses[i] + .resolved_address->addr, + true) == 0); } size_t uri_path_len; char *target_uri_str = gpr_strjoin_sep( - (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len); + (const char **)addr_strs, args->num_addresses, ",", &uri_path_len); /* will pick using pick_first */ glb_policy->lb_channel = grpc_client_channel_factory_create_channel( @@ -483,7 +565,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); gpr_free(target_uri_str); - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < args->num_addresses; i++) { gpr_free(addr_strs[i]); } gpr_free(addr_strs); @@ -635,7 +717,7 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; @@ -662,22 +744,28 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg)); glb_policy->wc_arg.rr_policy = glb_policy->rr_policy; glb_policy->wc_arg.wrapped_closure = on_complete; + glb_policy->wc_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; + glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata; + glb_policy->wc_arg.owning_pending_node = NULL; grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, &glb_policy->wc_arg); r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, + (void **)&glb_policy->wc_arg.lb_token, &glb_policy->wrapped_on_complete); if (r != 0) { - /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR - * policy and notify the original callback */ - glb_policy->wc_arg.wrapped_closure = NULL; + /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)glb_policy->wc_arg.rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick"); - grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure, - GRPC_ERROR_NONE, NULL); + + /* add the load reporting initial metadata */ + initial_metadata_add_lb_token(pick_args->initial_metadata, + pick_args->lb_token_mdelem_storage, + glb_policy->wc_arg.lb_token); } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index e1277b353f..21d948033a 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -200,7 +200,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; @@ -438,23 +438,23 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - GPR_ASSERT(args->addresses != NULL); + GPR_ASSERT(args->lb_addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - if (args->addresses->naddrs == 0) return NULL; + if (args->num_addresses == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_addresses); + memset(p->subchannels, 0, sizeof(*p->subchannels) * args->num_addresses); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < args->num_addresses; i++) { memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + sc_args.addr = + (struct sockaddr *)(args->lb_addresses[i].resolved_address->addr); + sc_args.addr_len = (size_t)args->lb_addresses[i].resolved_address->len; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 8fda405fb8..2069dc192c 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -84,8 +84,10 @@ typedef struct pending_pick { /* the initial metadata for the pick. See grpc_lb_policy_pick() */ grpc_metadata_batch *initial_metadata; - /* storage for the lb token initial metadata mdelem */ - grpc_linked_mdelem *lb_token_mdelem_storage; + /* output argument where to store the pick()ed user_data. It'll be NULL if no + * such data is present or there's an error (the definite test for errors is + * \a target being NULL). */ + void **user_data; /* bitmask passed to pick() and used for selective cancelling. See * grpc_lb_policy_cancel_picks() */ @@ -103,7 +105,7 @@ typedef struct pending_pick { typedef struct ready_list { grpc_subchannel *subchannel; /* references namesake entry in subchannel_data */ - grpc_lb_policy_address_token *lb_token; + void *user_data; struct ready_list *next; struct ready_list *prev; } ready_list; @@ -121,8 +123,8 @@ typedef struct { ready_list *ready_list_node; /** last observed connectivity */ grpc_connectivity_state connectivity_state; - /** the subchannel's target LB token */ - grpc_lb_policy_address_token *lb_token; + /** the subchannel's target user data */ + void *user_data; } subchannel_data; struct round_robin_lb_policy { @@ -131,8 +133,10 @@ struct round_robin_lb_policy { /** total number of addresses received at creation time */ size_t num_addresses; - /** load balancing tokens, one per incoming address */ - grpc_lb_policy_address_token *lb_tokens; + /** user data, one per incoming address */ + void **user_data; + /** functions to operate over \a user_data elements */ + grpc_lb_policy_user_data_vtable user_data_vtable; /** all our subchannels */ size_t num_subchannels; @@ -204,7 +208,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, ready_list *new_elem = gpr_malloc(sizeof(ready_list)); memset(new_elem, 0, sizeof(ready_list)); new_elem->subchannel = sd->subchannel; - new_elem->lb_token = sd->lb_token; + new_elem->user_data = sd->user_data; if (p->ready_list.prev == NULL) { /* first element */ new_elem->next = &p->ready_list; @@ -246,7 +250,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, + gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node, (void *)node->subchannel); } @@ -259,9 +263,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { + for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); gpr_free(sd); @@ -282,12 +285,10 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { elem = tmp; } - if (p->lb_tokens != NULL) { - for (i = 0; i < p->num_addresses; i++) { - gpr_free(p->lb_tokens[i].token); - } - gpr_free(p->lb_tokens); + for (size_t i = 0; i < p->num_addresses; i++) { + p->user_data_vtable.destroy(p->user_data[i]); } + gpr_free(p->user_data); gpr_free(p); } @@ -397,26 +398,9 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } -/* add lb_token of selected subchannel (address) to the call's initial - * metadata */ -static void initial_metadata_add_lb_token( - grpc_metadata_batch *initial_metadata, - grpc_linked_mdelem *lb_token_mdelem_storage, - grpc_lb_policy_address_token *lb_token) { - if (lb_token != NULL && lb_token->token_size > 0) { - GPR_ASSERT(lb_token->token != NULL); - grpc_mdstr *lb_token_mdstr = - grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size); - grpc_metadata_batch_add_tail( - initial_metadata, lb_token_mdelem_storage, - grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL, - lb_token_mdstr)); - } -} - static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; @@ -426,9 +410,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* readily available, report right away */ gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); - initial_metadata_add_lb_token(pick_args->initial_metadata, - pick_args->lb_token_mdelem_storage, - selected->lb_token); + *user_data = p->user_data_vtable.copy(selected->user_data); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", @@ -451,7 +433,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->on_complete = on_complete; pp->initial_metadata = pick_args->initial_metadata; pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; + pp->user_data = user_data; p->pending_picks = pp; gpr_mu_unlock(&p->mu); return 0; @@ -493,11 +475,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - initial_metadata_add_lb_token(pp->initial_metadata, - pp->lb_token_mdelem_storage, - selected->lb_token); *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + *pp->user_data = p->user_data_vtable.copy(selected->user_data); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", @@ -631,30 +611,29 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - GPR_ASSERT(args->addresses != NULL); + GPR_ASSERT(args->lb_addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); + if (args->num_addresses == 0) return NULL; round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->num_addresses = args->addresses->naddrs; - if (args->tokens != NULL) { - /* we need to copy because args contents aren't owned */ - p->lb_tokens = - gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses); - memcpy(p->lb_tokens, args->tokens, - sizeof(grpc_lb_policy_address_token) * p->num_addresses); - } - + p->num_addresses = args->num_addresses; p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses); memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses); + p->user_data = gpr_malloc(sizeof(void *) * p->num_addresses); + memset(p->user_data, 0, sizeof(void *) * p->num_addresses); + p->user_data_vtable = args->user_data_vtable; grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < p->num_addresses; i++) { memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + sc_args.addr = + (struct sockaddr *)args->lb_addresses[i].resolved_address->addr; + sc_args.addr_len = args->lb_addresses[i].resolved_address->len; + + p->user_data[i] = p->user_data_vtable.copy(args->lb_addresses[i].user_data); grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); @@ -666,9 +645,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; - if (p->lb_tokens != NULL) { - sd->lb_token = &p->lb_tokens[i]; - } + sd->user_data = p->user_data[i]; ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 79682e78b5..1841a75845 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -175,7 +175,14 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_policy_args lb_policy_args; result = grpc_resolver_result_create(); memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.addresses = addresses; + lb_policy_args.num_addresses = addresses->naddrs; + lb_policy_args.lb_addresses = + gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses); + memset(lb_policy_args.lb_addresses, 0, + sizeof(grpc_lb_address) * lb_policy_args.num_addresses); + for (size_t i = 0; i < addresses->naddrs; ++i) { + lb_policy_args.lb_addresses[i].resolved_address = &r->addresses->addrs[i]; + } lb_policy_args.client_channel_factory = r->client_channel_factory; lb_policy = grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 3807522d2b..54a7e1c84c 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -125,10 +125,18 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver_result *result = grpc_resolver_result_create(); grpc_lb_policy_args lb_policy_args; memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.addresses = r->addresses; + lb_policy_args.num_addresses = r->addresses->naddrs; + lb_policy_args.lb_addresses = + gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses); + memset(lb_policy_args.lb_addresses, 0, + sizeof(grpc_lb_address) * lb_policy_args.num_addresses); + for (size_t i = 0; i < lb_policy_args.num_addresses; ++i) { + lb_policy_args.lb_addresses[i].resolved_address = &r->addresses->addrs[i]; + } lb_policy_args.client_channel_factory = r->client_channel_factory; grpc_lb_policy *lb_policy = grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + gpr_free(lb_policy_args.lb_addresses); grpc_resolver_result_set_lb_policy(result, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c index 0677f29766..4b40c275ad 100644 --- a/src/core/lib/transport/metadata.c +++ b/src/core/lib/transport/metadata.c @@ -278,7 +278,7 @@ static void ref_md_locked(mdtab_shard *shard, internal_metadata *md DEBUG_ARGS) { #ifdef GRPC_METADATA_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "ELM REF:%p:%d->%d: '%s' = '%s'", md, + "ELM REF:%p:%zu->%zu: '%s' = '%s'", (void *)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) + 1, grpc_mdstr_as_c_string((grpc_mdstr *)md->key), @@ -566,7 +566,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdstr *mkey, shard->elems[idx] = md; gpr_mu_init(&md->mu_user_data); #ifdef GRPC_METADATA_REFCOUNT_DEBUG - gpr_log(GPR_DEBUG, "ELM NEW:%p:%d: '%s' = '%s'", md, + gpr_log(GPR_DEBUG, "ELM NEW:%p:%zu: '%s' = '%s'", (void *)md, gpr_atm_no_barrier_load(&md->refcnt), grpc_mdstr_as_c_string((grpc_mdstr *)md->key), grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); @@ -639,7 +639,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) { if (is_mdelem_static(gmd)) return gmd; #ifdef GRPC_METADATA_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "ELM REF:%p:%d->%d: '%s' = '%s'", md, + "ELM REF:%p:%zu->%zu: '%s' = '%s'", (void *)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) + 1, grpc_mdstr_as_c_string((grpc_mdstr *)md->key), @@ -649,7 +649,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) { this function - meaning that no adjustment to mdtab_free is necessary, simplifying the logic here to be just an atomic increment */ /* use C assert to have this removed in opt builds */ - assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); + GPR_ASSERT(gpr_atm_no_barrier_load(&md->refcnt) >= 1); gpr_atm_no_barrier_fetch_add(&md->refcnt, 1); return gmd; } @@ -660,14 +660,16 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { if (is_mdelem_static(gmd)) return; #ifdef GRPC_METADATA_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "ELM UNREF:%p:%d->%d: '%s' = '%s'", md, + "ELM UNREF:%p:%zu->%zu: '%s' = '%s'", (void *)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) - 1, grpc_mdstr_as_c_string((grpc_mdstr *)md->key), grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); #endif uint32_t hash = GRPC_MDSTR_KV_HASH(md->key->hash, md->value->hash); - if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { + const gpr_atm prev_refcount = gpr_atm_full_fetch_add(&md->refcnt, -1); + GPR_ASSERT(prev_refcount >= 1); + if (1 == prev_refcount) { /* once the refcount hits zero, some other thread can come along and free md at any time: it's unsafe from this point on to access it */ mdtab_shard *shard = @@ -676,10 +678,12 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { } } -const char *grpc_mdstr_as_c_string(grpc_mdstr *s) { +const char *grpc_mdstr_as_c_string(const grpc_mdstr *s) { return (const char *)GPR_SLICE_START_PTR(s->slice); } +size_t grpc_mdstr_length(const grpc_mdstr *s) { return GRPC_MDSTR_LENGTH(s); } + grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs DEBUG_ARGS) { internal_string *s = (internal_string *)gs; if (is_mdstr_static(gs)) return gs; diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index 2b0921c8d7..71eff0acf2 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -147,7 +147,7 @@ void grpc_mdelem_unref(grpc_mdelem *md); /* Recover a char* from a grpc_mdstr. The returned string is null terminated. Does not promise that the returned string has no embedded nulls however. */ -const char *grpc_mdstr_as_c_string(grpc_mdstr *s); +const char *grpc_mdstr_as_c_string(const grpc_mdstr *s); #define GRPC_MDSTR_LENGTH(s) (GPR_SLICE_LENGTH(s->slice)) -- cgit v1.2.3 From f47d6fbdccd6e3c64b0ff70b6a63236819886540 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 14 Sep 2016 12:59:17 -0700 Subject: More PR comments --- src/core/ext/client_config/lb_policy_factory.h | 15 +-- src/core/ext/lb_policy/grpclb/grpclb.c | 107 ++++++++++++--------- src/core/ext/lb_policy/pick_first/pick_first.c | 9 +- src/core/ext/lb_policy/round_robin/round_robin.c | 9 +- src/core/ext/resolver/dns/native/dns_resolver.c | 7 +- src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 8 +- test/cpp/grpclb/grpclb_test.cc | 63 ++++++------ 7 files changed, 118 insertions(+), 100 deletions(-) (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c') diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index d2d1a0f16e..7191ca7d89 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -48,22 +48,17 @@ struct grpc_lb_policy_factory { }; /** A resolved address alongside any LB related information associated with it. - * \a user_data, if not \a NULL, is opaque and meant to be consumed by the gRPC - * LB policy. Anywhere else, refer to the functions in \a - * grpc_lb_policy_user_data_vtable to operate with it */ + * \a user_data, if not NULL, contains opaque data meant to be consumed by the + * gRPC LB policy. Note that no all LB policies support \a user_data as input. + * Those who don't will simply ignore it and will correspondingly return NULL in + * their namesake pick() output argument. */ typedef struct grpc_lb_address { grpc_resolved_address *resolved_address; void *user_data; } grpc_lb_address; -/** Functions acting upon the opaque \a grpc_lb_address.user_data */ -typedef struct grpc_lb_policy_user_data_vtable { - void *(*copy)(void *); - void (*destroy)(void *); -} grpc_lb_policy_user_data_vtable; - typedef struct grpc_lb_policy_args { - grpc_lb_address *lb_addresses; + grpc_lb_address *addresses; size_t num_addresses; grpc_client_channel_factory *client_channel_factory; } grpc_lb_policy_args; diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 9a176ba0d1..1150451116 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -125,9 +125,16 @@ static void *user_data_copy(void *user_data) { return GRPC_MDELEM_REF(user_data); } -static void user_data_destroy(void *user_data) { - if (user_data == NULL) return; - GRPC_MDELEM_UNREF(user_data); +static void lb_addrs_destroy(grpc_lb_address *lb_addresses, + size_t num_addresses) { + /* free "resolved" addresses memblock */ + gpr_free(lb_addresses->resolved_address); + for (size_t i = 0; i < num_addresses; ++i) { + if (lb_addresses[i].user_data != NULL) { + GRPC_MDELEM_UNREF(lb_addresses[i].user_data); + } + } + gpr_free(lb_addresses); } /* add lb_token of selected subchannel (address) to the call's initial @@ -385,21 +392,12 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, * Given that the validity tests are very cheap, they are performed again * instead of marking the valid ones during the first pass, as this would * incurr in an allocation due to the arbitrary number of server */ - size_t num_processed = 0; - for (size_t i = 0; i < num_valid; ++i) { - const grpc_grpclb_server *server = serverlist->servers[i]; - if (!is_server_valid(serverlist->servers[i], i, false)) continue; - grpc_lb_address *const lb_addr = &lb_addrs[i]; - - /* lb token processing */ - if (server->has_load_balance_token) { - const size_t lb_token_size = - GPR_ARRAY_SIZE(server->load_balance_token) - 1; - grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( - (uint8_t *)server->load_balance_token, lb_token_size); - lb_addr->user_data = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); - } + size_t addr_idx = 0; + for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { + GPR_ASSERT(addr_idx < num_valid); + const grpc_grpclb_server *server = serverlist->servers[sl_idx]; + if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; + grpc_lb_address *const lb_addr = &lb_addrs[addr_idx]; /* address processing */ const uint16_t netorder_port = htons((uint16_t)server->port); @@ -407,7 +405,7 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, * server->ip_address.bytes. */ const grpc_grpclb_ip_address *ip = &server->ip_address; - lb_addr->resolved_address = &r_addrs_memblock[i]; + lb_addr->resolved_address = &r_addrs_memblock[addr_idx]; struct sockaddr_storage *sa = (struct sockaddr_storage *)lb_addr->resolved_address->addr; size_t *sa_len = &lb_addr->resolved_address->len; @@ -428,9 +426,25 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, addr6->sin6_port = netorder_port; } GPR_ASSERT(*sa_len > 0); - ++num_processed; + + /* lb token processing */ + if (server->has_load_balance_token) { + const size_t lb_token_size = + GPR_ARRAY_SIZE(server->load_balance_token) - 1; + grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( + (uint8_t *)server->load_balance_token, lb_token_size); + lb_addr->user_data = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); + } else { + gpr_log(GPR_ERROR, + "Missing LB token for backend address '%s'. The empty token will " + "be used instead", + grpc_sockaddr_to_uri((struct sockaddr *)sa)); + lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; + } + ++addr_idx; } - GPR_ASSERT(num_processed == num_valid); + GPR_ASSERT(addr_idx == num_valid); *lb_addresses = lb_addrs; return num_valid; } @@ -444,26 +458,19 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; const size_t num_ok_addresses = - process_serverlist(serverlist, &args.lb_addresses); + process_serverlist(serverlist, &args.addresses); args.num_addresses = num_ok_addresses; grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - glb_policy->num_ok_serverlist_addresses = num_ok_addresses; if (glb_policy->lb_addresses != NULL) { /* dispose of the previous version */ - for (size_t i = 0; i < num_ok_addresses; ++i) { - user_data_destroy(glb_policy->lb_addresses[i].user_data); - } - gpr_free(glb_policy->lb_addresses); + lb_addrs_destroy(glb_policy->lb_addresses, + glb_policy->num_ok_serverlist_addresses); } + glb_policy->num_ok_serverlist_addresses = num_ok_addresses; + glb_policy->lb_addresses = args.addresses; - glb_policy->lb_addresses = args.lb_addresses; - - if (args.num_addresses > 0) { - /* free "resolved" addresses memblock */ - gpr_free(args.lb_addresses->resolved_address); - } return rr; } @@ -560,7 +567,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, memset(glb_policy, 0, sizeof(*glb_policy)); /* All input addresses in args->addresses come from a resolver that claims - * they are LB services. It's the resolver's responsibility to make sure this + * they are LB services. It's the resolver's responsibility to make sure + * this * policy is only instantiated and used in that case. * * Create a client channel over them to communicate with a LB service */ @@ -570,18 +578,24 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, return NULL; } + /* this LB policy doesn't support \a user_data */ + GPR_ASSERT(args->addresses[0].user_data == NULL); + /* construct a target from the addresses in args, given in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses); addr_strs[0] = grpc_sockaddr_to_uri( - (const struct sockaddr *)&args->lb_addresses[0].resolved_address->addr); + (const struct sockaddr *)&args->addresses[0].resolved_address->addr); for (size_t i = 1; i < args->num_addresses; i++) { + /* this LB policy doesn't support \a user_data */ + GPR_ASSERT(args->addresses[i].user_data == NULL); + GPR_ASSERT( - grpc_sockaddr_to_string(&addr_strs[i], - (const struct sockaddr *)&args->lb_addresses[i] - .resolved_address->addr, - true) == 0); + grpc_sockaddr_to_string( + &addr_strs[i], + (const struct sockaddr *)&args->addresses[i].resolved_address->addr, + true) == 0); } size_t uri_path_len; char *target_uri_str = gpr_strjoin_sep( @@ -630,10 +644,8 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } gpr_mu_destroy(&glb_policy->mu); - for (size_t i = 0; i < glb_policy->num_ok_serverlist_addresses; ++i) { - user_data_destroy(glb_policy->lb_addresses[i].user_data); - } - gpr_free(glb_policy->lb_addresses); + lb_addrs_destroy(glb_policy->lb_addresses, + glb_policy->num_ok_serverlist_addresses); gpr_free(glb_policy); } @@ -882,7 +894,8 @@ typedef struct lb_client_data { grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */ grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */ - /* what's being sent to the LB server. Note that its value may vary if the LB + /* what's being sent to the LB server. Note that its value may vary if the + * LB * server indicates a redirect. */ grpc_byte_buffer *request_payload; @@ -1090,7 +1103,8 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { * it'll just create the first RR policy instance */ rr_handover(exec_ctx, lb_client->glb_policy, error); } else { - /* unref the RR policy, eventually leading to its substitution with a + /* unref the RR policy, eventually leading to its substitution with + * a * new one constructed from the received serverlist (see * glb_rr_connectivity_changed) */ GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, @@ -1155,7 +1169,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, lb_client->status, lb_client->status_details, lb_client->status_details_capacity); } - /* TODO(dgq): deal with stream termination properly (fire up another one? fail + /* TODO(dgq): deal with stream termination properly (fire up another one? + * fail * the original call?) */ } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 21d948033a..d907ddd5d1 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -438,7 +438,7 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - GPR_ASSERT(args->lb_addresses != NULL); + GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); if (args->num_addresses == 0) return NULL; @@ -451,10 +451,13 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < args->num_addresses; i++) { + /* this LB policy doesn't support \a user_data */ + GPR_ASSERT(args->addresses[i].user_data == NULL); + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); sc_args.addr = - (struct sockaddr *)(args->lb_addresses[i].resolved_address->addr); - sc_args.addr_len = (size_t)args->lb_addresses[i].resolved_address->len; + (struct sockaddr *)(args->addresses[i].resolved_address->addr); + sc_args.addr_len = (size_t)args->addresses[i].resolved_address->len; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 4d89cef9b6..1351ff0277 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -603,7 +603,7 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - GPR_ASSERT(args->lb_addresses != NULL); + GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); if (args->num_addresses == 0) return NULL; @@ -620,11 +620,10 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, size_t subchannel_idx = 0; for (size_t i = 0; i < p->num_addresses; i++) { memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = - (struct sockaddr *)args->lb_addresses[i].resolved_address->addr; - sc_args.addr_len = args->lb_addresses[i].resolved_address->len; + sc_args.addr = (struct sockaddr *)args->addresses[i].resolved_address->addr; + sc_args.addr_len = args->addresses[i].resolved_address->len; - p->user_data[i] = args->lb_addresses[i].user_data; + p->user_data[i] = args->addresses[i].user_data; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 1841a75845..32e9de69a6 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -176,16 +176,17 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, result = grpc_resolver_result_create(); memset(&lb_policy_args, 0, sizeof(lb_policy_args)); lb_policy_args.num_addresses = addresses->naddrs; - lb_policy_args.lb_addresses = + lb_policy_args.addresses = gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses); - memset(lb_policy_args.lb_addresses, 0, + memset(lb_policy_args.addresses, 0, sizeof(grpc_lb_address) * lb_policy_args.num_addresses); for (size_t i = 0; i < addresses->naddrs; ++i) { - lb_policy_args.lb_addresses[i].resolved_address = &r->addresses->addrs[i]; + lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i]; } lb_policy_args.client_channel_factory = r->client_channel_factory; lb_policy = grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + gpr_free(lb_policy_args.addresses); if (lb_policy != NULL) { grpc_resolver_result_set_lb_policy(result, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 54a7e1c84c..425285287c 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -126,17 +126,17 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args lb_policy_args; memset(&lb_policy_args, 0, sizeof(lb_policy_args)); lb_policy_args.num_addresses = r->addresses->naddrs; - lb_policy_args.lb_addresses = + lb_policy_args.addresses = gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses); - memset(lb_policy_args.lb_addresses, 0, + memset(lb_policy_args.addresses, 0, sizeof(grpc_lb_address) * lb_policy_args.num_addresses); for (size_t i = 0; i < lb_policy_args.num_addresses; ++i) { - lb_policy_args.lb_addresses[i].resolved_address = &r->addresses->addrs[i]; + lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i]; } lb_policy_args.client_channel_factory = r->client_channel_factory; grpc_lb_policy *lb_policy = grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); - gpr_free(lb_policy_args.lb_addresses); + gpr_free(lb_policy_args.addresses); grpc_resolver_result_set_lb_policy(result, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index c044256165..95abe38031 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -37,6 +37,8 @@ #include #include +#include + #include #include #include @@ -76,6 +78,7 @@ extern "C" { // - Test against a non-LB server. That server should return UNIMPLEMENTED and // the call should fail. // - Random LB server closing the stream unexpectedly. +// - Test using DNS-resolvable names (localhost?) namespace grpc { namespace { @@ -612,27 +615,30 @@ static void fork_lb_server(void *arg) { tf->lb_server_update_delay_ms); } -static void setup_test_fixture(test_fixture *tf, - int lb_server_update_delay_ms) { - tf->lb_server_update_delay_ms = lb_server_update_delay_ms; +static test_fixture setup_test_fixture(int lb_server_update_delay_ms) { + test_fixture tf; + memset(&tf, 0, sizeof(tf)); + tf.lb_server_update_delay_ms = lb_server_update_delay_ms; gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); for (int i = 0; i < NUM_BACKENDS; ++i) { - setup_server("127.0.0.1", &tf->lb_backends[i]); - gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server, - &tf->lb_backends[i], &options); + setup_server("127.0.0.1", &tf.lb_backends[i]); + gpr_thd_new(&tf.lb_backends[i].tid, fork_backend_server, &tf.lb_backends[i], + &options); } - setup_server("127.0.0.1", &tf->lb_server); - gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options); + setup_server("127.0.0.1", &tf.lb_server); + gpr_thd_new(&tf.lb_server.tid, fork_lb_server, &tf.lb_server, &options); char *server_uri; gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1", - tf->lb_server.servers_hostport); - setup_client(server_uri, &tf->client); + tf.lb_server.servers_hostport); + setup_client(server_uri, &tf.client); gpr_free(server_uri); + + return tf; } static void teardown_test_fixture(test_fixture *tf) { @@ -643,19 +649,13 @@ static void teardown_test_fixture(test_fixture *tf) { teardown_server(&tf->lb_server); } -// The LB server will send two updates: batch 1 and batch 2. Each batch -// contains -// two addresses, both of a valid and running backend server. Batch 1 is -// readily -// available and provided as soon as the client establishes the streaming -// call. -// Batch 2 is sent after a delay of \a lb_server_update_delay_ms -// milliseconds. +// The LB server will send two updates: batch 1 and batch 2. Each batch contains +// two addresses, both of a valid and running backend server. Batch 1 is readily +// available and provided as soon as the client establishes the streaming call. +// Batch 2 is sent after a delay of \a lb_server_update_delay_ms milliseconds. static test_fixture test_update(int lb_server_update_delay_ms) { gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms); - test_fixture tf; - memset(&tf, 0, sizeof(tf)); - setup_test_fixture(&tf, lb_server_update_delay_ms); + test_fixture tf = setup_test_fixture(lb_server_update_delay_ms); perform_request( &tf.client); // "consumes" 1st backend server of 1st serverlist perform_request( @@ -671,13 +671,7 @@ static test_fixture test_update(int lb_server_update_delay_ms) { return tf; } -} // namespace -} // namespace grpc - -int main(int argc, char **argv) { - grpc_test_init(argc, argv); - grpc_init(); - +TEST(GrpclbTest, Updates) { grpc::test_fixture tf_result; // Clients take a bit over one second to complete a call (the last part of the // call sleeps for 1 second while verifying the client's completion queue is @@ -712,7 +706,18 @@ int main(int argc, char **argv) { GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0); GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0); GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0); +} + +TEST(GrpclbTest, InvalidAddressInServerlist) {} +} // namespace +} // namespace grpc + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_test_init(argc, argv); + grpc_init(); + const auto result = RUN_ALL_TESTS(); grpc_shutdown(); - return 0; + return result; } -- cgit v1.2.3 From 5ebb7af9ef074e69fb0173e938fddaa1b6b88913 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 15 Sep 2016 10:02:16 -0700 Subject: Don't assert on use of unsupported user_data for LB policies --- src/core/ext/lb_policy/grpclb/grpclb.c | 12 ++++++++---- src/core/ext/lb_policy/pick_first/pick_first.c | 6 ++++-- 2 files changed, 12 insertions(+), 6 deletions(-) (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c') diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 3ce3910b30..cf32658333 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -573,8 +573,10 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, return NULL; } - /* this LB policy doesn't support \a user_data */ - GPR_ASSERT(args->addresses[0].user_data == NULL); + if (args->addresses[0].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } /* construct a target from the addresses in args, given in the form * ipvX://ip1:port1,ip2:port2,... @@ -583,8 +585,10 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, addr_strs[0] = grpc_sockaddr_to_uri( (const struct sockaddr *)&args->addresses[0].resolved_address->addr); for (size_t i = 1; i < args->num_addresses; i++) { - /* this LB policy doesn't support \a user_data */ - GPR_ASSERT(args->addresses[i].user_data == NULL); + if (args->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } GPR_ASSERT( grpc_sockaddr_to_string( diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index d907ddd5d1..9513078dce 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -451,8 +451,10 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < args->num_addresses; i++) { - /* this LB policy doesn't support \a user_data */ - GPR_ASSERT(args->addresses[i].user_data == NULL); + if (args->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } memset(&sc_args, 0, sizeof(grpc_subchannel_args)); sc_args.addr = -- cgit v1.2.3