diff options
author | murgatroid99 <mlumish@google.com> | 2016-09-23 14:35:49 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2016-09-23 14:35:49 -0700 |
commit | 08b0fab4260ce533c71a3b62c49fbf6871a87c93 (patch) | |
tree | 0fc432dfb3062f2963d43f9d52e8b9e30425a46e /src/core/ext/lb_policy | |
parent | 7871f736ce62e74559602f928b25bea7389f57fb (diff) | |
parent | 942c264861dedd8020fc18d65933e8f4f57e3e46 (diff) |
Merge branch 'master' into uv_core_transport
Diffstat (limited to 'src/core/ext/lb_policy')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 155 | ||||
-rw-r--r-- | src/core/ext/lb_policy/pick_first/pick_first.c | 22 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 34 |
3 files changed, 105 insertions, 106 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 7f3c4613c6..8a2fe6d72e 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -109,6 +109,7 @@ #include <grpc/support/string_util.h> #include "src/core/ext/client_config/client_channel_factory.h" +#include "src/core/ext/client_config/lb_policy_factory.h" #include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/parse_address.h" #include "src/core/ext/lb_policy/grpclb/grpclb.h" @@ -121,18 +122,6 @@ int grpc_lb_glb_trace = 0; -static void lb_addrs_destroy(grpc_lb_address *lb_addresses, - size_t num_addresses) { - /* free "resolved" addresses memblock */ - gpr_free(lb_addresses->resolved_address); - for (size_t i = 0; i < num_addresses; ++i) { - if (lb_addresses[i].user_data != NULL) { - GRPC_MDELEM_UNREF(lb_addresses[i].user_data); - } - } - gpr_free(lb_addresses); -} - /* add lb_token of selected subchannel (address) to the call's initial * metadata */ static void initial_metadata_add_lb_token( @@ -295,6 +284,7 @@ typedef struct glb_lb_policy { /** mutex protecting remaining members */ gpr_mu mu; + const char *server_name; grpc_client_channel_factory *cc_factory; /** for communicating with the LB server */ @@ -312,11 +302,8 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; - /** total number of valid addresses received in \a serverlist */ - size_t num_ok_serverlist_addresses; - - /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */ - grpc_lb_address *lb_addresses; + /** addresses from \a serverlist */ + grpc_lb_addresses *addresses; /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -369,26 +356,18 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, return true; } -/* populate \a addresses according to \a serverlist. Returns the number of - * addresses successfully parsed and added to \a addresses */ -static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, - grpc_lb_address **lb_addresses) { +/* Returns addresses extracted from \a serverlist. */ +static grpc_lb_addresses *process_serverlist( + const grpc_grpclb_serverlist *serverlist) { size_t num_valid = 0; /* first pass: count how many are valid in order to allocate the necessary * memory in a single block */ for (size_t i = 0; i < serverlist->num_servers; ++i) { if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; } - if (num_valid == 0) { - return 0; - } + if (num_valid == 0) return NULL; - /* allocate the memory block for the "resolved" addresses. */ - grpc_resolved_address *r_addrs_memblock = - gpr_malloc(sizeof(grpc_resolved_address) * num_valid); - memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid); - grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid); - memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid); + grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid); /* second pass: actually populate the addresses and LB tokens (aka user data * to the outside world) to be read by the RR policy during its creation. @@ -400,56 +379,58 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, GPR_ASSERT(addr_idx < num_valid); const grpc_grpclb_server *server = serverlist->servers[sl_idx]; if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; - grpc_lb_address *const lb_addr = &lb_addrs[addr_idx]; /* address processing */ const uint16_t netorder_port = htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. */ const grpc_grpclb_ip_address *ip = &server->ip_address; - - lb_addr->resolved_address = &r_addrs_memblock[addr_idx]; - struct sockaddr_storage *sa = - (struct sockaddr_storage *)lb_addr->resolved_address->addr; - size_t *sa_len = &lb_addr->resolved_address->len; - *sa_len = 0; + grpc_resolved_address addr; + memset(&addr, 0, sizeof(addr)); if (ip->size == 4) { - struct sockaddr_in *addr4 = (struct sockaddr_in *)sa; - *sa_len = sizeof(struct sockaddr_in); - memset(addr4, 0, *sa_len); + addr.len = sizeof(struct sockaddr_in); + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr; addr4->sin_family = AF_INET; memcpy(&addr4->sin_addr, ip->bytes, ip->size); addr4->sin_port = netorder_port; } else if (ip->size == 16) { - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa; - *sa_len = sizeof(struct sockaddr_in6); - memset(addr6, 0, *sa_len); + addr.len = sizeof(struct sockaddr_in6); + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr; addr6->sin6_family = AF_INET; memcpy(&addr6->sin6_addr, ip->bytes, ip->size); addr6->sin6_port = netorder_port; } - GPR_ASSERT(*sa_len > 0); /* lb token processing */ + void *user_data; if (server->has_load_balance_token) { const size_t lb_token_size = GPR_ARRAY_SIZE(server->load_balance_token) - 1; grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( (uint8_t *)server->load_balance_token, lb_token_size); - lb_addr->user_data = grpc_mdelem_from_metadata_strings( + user_data = grpc_mdelem_from_metadata_strings( GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); } else { gpr_log(GPR_ERROR, "Missing LB token for backend address '%s'. The empty token will " "be used instead", - grpc_sockaddr_to_uri(lb_addr->resolved_address)); - lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; + grpc_sockaddr_to_uri(&addr)); + user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; } + + grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, + false /* is_balancer */, + NULL /* balancer_name */, user_data); ++addr_idx; } GPR_ASSERT(addr_idx == num_valid); - *lb_addresses = lb_addrs; - return num_valid; + + return lb_addresses; +} + +/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */ +static void lb_token_destroy(void *token) { + if (token != NULL) GRPC_MDELEM_UNREF(token); } static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, @@ -459,20 +440,17 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); + args.server_name = glb_policy->server_name; args.client_channel_factory = glb_policy->cc_factory; - const size_t num_ok_addresses = - process_serverlist(serverlist, &args.addresses); - args.num_addresses = num_ok_addresses; + args.addresses = process_serverlist(serverlist); grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - if (glb_policy->lb_addresses != NULL) { + if (glb_policy->addresses != NULL) { /* dispose of the previous version */ - lb_addrs_destroy(glb_policy->lb_addresses, - glb_policy->num_ok_serverlist_addresses); + grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy); } - glb_policy->num_ok_serverlist_addresses = num_ok_addresses; - glb_policy->lb_addresses = args.addresses; + glb_policy->addresses = args.addresses; return rr; } @@ -566,6 +544,19 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + /* Count the number of gRPC-LB addresses. There must be at least one. + * TODO(roth): For now, we ignore non-balancer addresses, but in the + * future, we may change the behavior such that we fall back to using + * the non-balancer addresses if we cannot reach any balancers. At that + * time, this should be changed to allow a list with no balancer addresses, + * since the resolver might fail to return a balancer address even when + * this is the right LB policy to use. */ + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < args->addresses->num_addresses; ++i) { + if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return NULL; + glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); memset(glb_policy, 0, sizeof(*glb_policy)); @@ -575,37 +566,35 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * policy is only instantiated and used in that case. * * Create a client channel over them to communicate with a LB service */ + glb_policy->server_name = gpr_strdup(args->server_name); glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); - if (args->num_addresses == 0) { - return NULL; - } - - if (args->addresses[0].user_data != NULL) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); - } /* construct a target from the addresses in args, given in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses); - addr_strs[0] = grpc_sockaddr_to_uri(args->addresses[0].resolved_address); - for (size_t i = 1; i < args->num_addresses; i++) { - if (args->addresses[i].user_data != NULL) { + char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); + size_t addr_index = 0; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (args->addresses->addresses[i].user_data != NULL) { gpr_log(GPR_ERROR, "This LB policy doesn't support user data. It will be ignored"); } - - GPR_ASSERT( - grpc_sockaddr_to_string( - &addr_strs[i], - args->addresses[i].resolved_address, - true) == 0); + if (args->addresses->addresses[i].is_balancer) { + if (addr_index == 0) { + addr_strs[addr_index++] = grpc_sockaddr_to_uri( + &args->addresses->addresses[i].address); + } else { + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_strs[addr_index++], + &args->addresses->addresses[i].address, + true) == 0); + } + } } size_t uri_path_len; - char *target_uri_str = gpr_strjoin_sep( - (const char **)addr_strs, args->num_addresses, ",", &uri_path_len); + char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs, + num_grpclb_addrs, ",", &uri_path_len); /* will pick using pick_first */ glb_policy->lb_channel = grpc_client_channel_factory_create_channel( @@ -613,7 +602,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); gpr_free(target_uri_str); - for (size_t i = 0; i < args->num_addresses; i++) { + for (size_t i = 0; i < num_grpclb_addrs; i++) { gpr_free(addr_strs[i]); } gpr_free(addr_strs); @@ -642,6 +631,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; GPR_ASSERT(glb_policy->pending_picks == NULL); GPR_ASSERT(glb_policy->pending_pings == NULL); + gpr_free((void *)glb_policy->server_name); grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); @@ -649,9 +639,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } gpr_mu_destroy(&glb_policy->mu); - - lb_addrs_destroy(glb_policy->lb_addresses, - glb_policy->num_ok_serverlist_addresses); + grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy); gpr_free(glb_policy); } @@ -947,9 +935,8 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { * entities passed to glb_pick(). */ lb_client->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, - glb_policy->base.interested_parties, "/BalanceLoad", - NULL, /* FIXME(dgq): which "host" value to use? */ - lb_client->deadline, NULL); + glb_policy->base.interested_parties, + "/grpc.lb.v1.LoadBalancer/BalanceLoad", NULL, lb_client->deadline, NULL); grpc_metadata_array_init(&lb_client->initial_metadata_recv); grpc_metadata_array_init(&lb_client->trailing_metadata_recv); diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 00b9660049..892b5feda9 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -441,23 +441,33 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - if (args->num_addresses == 0) return NULL; + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ + size_t num_addrs = 0; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (!args->addresses->addresses[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_addresses); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->num_addresses); + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->num_addresses; i++) { - if (args->addresses[i].user_data != NULL) { + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + /* Skip balancer addresses, since we only know how to handle backends. */ + if (args->addresses->addresses[i].is_balancer) continue; + + if (args->addresses->addresses[i].user_data != NULL) { gpr_log(GPR_ERROR, "This LB policy doesn't support user data. It will be ignored"); } memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = args->addresses[i].resolved_address; + sc_args.server_name = args->server_name; + sc_args.addr = &args->addresses->addresses[i].address; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index eee3dce249..dc23f12015 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -130,10 +130,6 @@ struct round_robin_lb_policy { /** total number of addresses received at creation time */ size_t num_addresses; - /** array holding the borrowed and opaque pointers to incoming user data, one - * per incoming address. These individual pointers will be returned as-is in - * successful picks. */ - void **user_data_pointers; /** all our subchannels */ size_t num_subchannels; @@ -282,7 +278,6 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { elem = tmp; } - gpr_free(p->user_data_pointers); gpr_free(p); } @@ -611,24 +606,31 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args *args) { GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - if (args->num_addresses == 0) return NULL; + + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ + size_t num_addrs = 0; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (!args->addresses->addresses[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) return NULL; round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->num_addresses = args->num_addresses; - p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses); - memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses); - p->user_data_pointers = gpr_malloc(sizeof(void *) * p->num_addresses); - memset(p->user_data_pointers, 0, sizeof(void *) * p->num_addresses); + p->num_addresses = num_addrs; + p->subchannels = gpr_malloc(sizeof(*p->subchannels) * num_addrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < p->num_addresses; i++) { - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = args->addresses[i].resolved_address; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + /* Skip balancer addresses, since we only know how to handle backends. */ + if (args->addresses->addresses[i].is_balancer) continue; - p->user_data_pointers[i] = args->addresses[i].user_data; + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + sc_args.server_name = args->server_name; + sc_args.addr = &args->addresses->addresses[i].address; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); @@ -640,7 +642,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; - sd->user_data = p->user_data_pointers[i]; + sd->user_data = args->addresses->addresses[i].user_data; ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); |