diff options
author | David Garcia Quintas <dgq@google.com> | 2016-09-12 18:37:05 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-09-12 22:57:02 -0700 |
commit | 331b9c02f9372ef832e612d48b90991c181ba8a7 (patch) | |
tree | 02fa9ad3354a9f13a2f83e7da63b02e04e87c993 /src/core/ext/lb_policy | |
parent | 42adfb89ce3f842bf7a3d24e9f04b32d46644457 (diff) |
Moved LB token changes solely into grpclb.c
Diffstat (limited to 'src/core/ext/lb_policy')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 240 | ||||
-rw-r--r-- | src/core/ext/lb_policy/pick_first/pick_first.c | 18 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 89 |
3 files changed, 206 insertions, 141 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index c3294b7988..ad070e458a 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -116,14 +116,50 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/static_metadata.h" int grpc_lb_glb_trace = 0; +static void *user_data_copy(void *user_data) { + if (user_data == NULL) return NULL; + return GRPC_MDELEM_REF(user_data); +} + +static void user_data_destroy(void *user_data) { + if (user_data == NULL) return; + GRPC_MDELEM_UNREF(user_data); +} + +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static void initial_metadata_add_lb_token( + grpc_metadata_batch *initial_metadata, + grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) { + GPR_ASSERT(lb_token_mdelem_storage != NULL); + GPR_ASSERT(lb_token != NULL); + grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} + typedef struct wrapped_rr_closure_arg { /* the original closure. Usually a on_complete/notify cb for pick() and ping() * calls against the internal RR instance, respectively. */ grpc_closure *wrapped_closure; + /* the pick's initial metadata, kept in order to append the LB token for the + * pick */ + grpc_metadata_batch *initial_metadata; + + /* the picked target, used to determine which LB token to add to the pick's + * initial metadata */ + grpc_connected_subchannel **target; + + /* the LB token associated with the pick */ + grpc_mdelem *lb_token; + + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /* The RR instance related to the closure */ grpc_lb_policy *rr_policy; @@ -146,6 +182,11 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); } GPR_ASSERT(wc_arg->wrapped_closure != NULL); + + initial_metadata_add_lb_token(wc_arg->initial_metadata, + wc_arg->lb_token_mdelem_storage, + wc_arg->lb_token); + grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL); gpr_free(wc_arg->owning_pending_node); } @@ -194,12 +235,15 @@ static void add_pending_pick(pending_pick **root, memset(pp, 0, sizeof(pending_pick)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); pp->next = *root; - pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->pollent = pick_args->pollent; pp->target = target; pp->initial_metadata = pick_args->initial_metadata; pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; + pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; + pp->wrapped_on_complete_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure, &pp->wrapped_on_complete_arg); *root = pp; @@ -285,37 +329,90 @@ struct rr_connectivity_data { glb_lb_policy *glb_policy; }; -static bool process_serverlist(const grpc_grpclb_server *server, - struct sockaddr_storage *sa, size_t *sa_len) { - if (server->port >> 16 != 0) { - gpr_log(GPR_ERROR, "Invalid port '%d'.", server->port); - return false; +/* populate \a addresses according to \a serverlist. Returns the number of + * addresses successfully parsed and added to \a addresses */ +static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, + grpc_lb_address **lb_addresses) { + size_t num_valid = 0; + /* first pass: count how many are valid in order to allocate the necessary + * memory in a single block */ + for (size_t i = 0; i < serverlist->num_servers; ++i) { + const grpc_grpclb_server *server = serverlist->servers[i]; + const grpc_grpclb_ip_address *ip = &server->ip_address; + + if (server->port >> 16 != 0) { + gpr_log(GPR_ERROR, + "Invalid port '%d' at index %zu of serverlist. Ignoring.", + server->port, i); + continue; + } + + if (ip->size != 4 && ip->size != 16) { + gpr_log(GPR_ERROR, + "Expected IP to be 4 or 16 bytes, got %d at index %zu of " + "serverlist. Ignoring", + ip->size, i); + continue; + } + ++num_valid; } - const uint16_t netorder_port = htons((uint16_t)server->port); - /* the addresses are given in binary format (a in(6)_addr struct) in - * server->ip_address.bytes. */ - const grpc_grpclb_ip_address *ip = &server->ip_address; - *sa_len = 0; - if (ip->size == 4) { - struct sockaddr_in *addr4 = (struct sockaddr_in *)sa; - *sa_len = sizeof(struct sockaddr_in); - memset(addr4, 0, *sa_len); - addr4->sin_family = AF_INET; - memcpy(&addr4->sin_addr, ip->bytes, ip->size); - addr4->sin_port = netorder_port; - } else if (ip->size == 6) { - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa; - *sa_len = sizeof(struct sockaddr_in6); - memset(addr6, 0, *sa_len); - addr6->sin6_family = AF_INET; - memcpy(&addr6->sin6_addr, ip->bytes, ip->size); - addr6->sin6_port = netorder_port; - } else { - gpr_log(GPR_ERROR, "Expected IP to be 4 or 16 bytes. Got %d.", ip->size); - return false; + if (num_valid == 0) { + return 0; } - GPR_ASSERT(*sa_len > 0); - return true; + + /* allocate the memory block for the "resolved" addresses. */ + grpc_resolved_address *r_addrs_memblock = + gpr_malloc(sizeof(grpc_resolved_address) * num_valid); + memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid); + grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid); + memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid); + + /* second pass: actually populate the addresses and LB tokens (aka user data + * to the outside world) to be read by the RR policy during its creation */ + for (size_t i = 0; i < num_valid; ++i) { + const grpc_grpclb_server *server = serverlist->servers[i]; + grpc_lb_address *const lb_addr = &lb_addrs[i]; + + /* lb token processing */ + if (server->has_load_balance_token) { + const size_t lb_token_size = + GPR_ARRAY_SIZE(server->load_balance_token) - 1; + grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( + (uint8_t *)server->load_balance_token, lb_token_size); + lb_addr->user_data = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); + } + + /* address processing */ + const uint16_t netorder_port = htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const grpc_grpclb_ip_address *ip = &server->ip_address; + + lb_addr->resolved_address = &r_addrs_memblock[i]; + struct sockaddr_storage *sa = + (struct sockaddr_storage *)lb_addr->resolved_address->addr; + size_t *sa_len = &lb_addr->resolved_address->len; + *sa_len = 0; + if (ip->size == 4) { + struct sockaddr_in *addr4 = (struct sockaddr_in *)sa; + *sa_len = sizeof(struct sockaddr_in); + memset(addr4, 0, *sa_len); + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 16) { + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa; + *sa_len = sizeof(struct sockaddr_in6); + memset(addr6, 0, *sa_len); + addr6->sin6_family = AF_INET; + memcpy(&addr6->sin6_addr, ip->bytes, ip->size); + addr6->sin6_port = netorder_port; + } + GPR_ASSERT(*sa_len > 0); + } + *lb_addresses = lb_addrs; + return num_valid; } static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, @@ -326,36 +423,19 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; - args.tokens = gpr_malloc(sizeof(grpc_lb_policy_address_token) * - serverlist->num_servers); - args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); - args.addresses->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * serverlist->num_servers); - size_t addr_idx = 0; - for (size_t i = 0; i < serverlist->num_servers; ++i) { - const grpc_grpclb_server *server = serverlist->servers[i]; - grpc_resolved_address *raddr = &args.addresses->addrs[addr_idx]; - if (!process_serverlist(server, (struct sockaddr_storage *)raddr->addr, - &raddr->len)) { - gpr_log(GPR_INFO, - "Problem processing server at index %zu of received serverlist, " - "ignoring.", - i); - continue; - } - ++addr_idx; - args.tokens[i].token_size = GPR_ARRAY_SIZE(server->load_balance_token) - 1; - args.tokens[i].token = gpr_malloc(args.tokens[i].token_size); - memcpy(args.tokens[i].token, server->load_balance_token, - args.tokens[i].token_size); - } - args.addresses->naddrs = addr_idx; + args.num_addresses = process_serverlist(serverlist, &args.lb_addresses); + args.user_data_vtable.copy = user_data_copy; + args.user_data_vtable.destroy = user_data_destroy; grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - - gpr_free(args.addresses->addrs); - gpr_free(args.addresses); - gpr_free(args.tokens); + if (args.num_addresses > 0) { + /* free "resolved" addresses memblock */ + gpr_free(args.lb_addresses->resolved_address); + } + for (size_t i = 0; i < args.num_addresses; ++i) { + args.user_data_vtable.destroy(args.lb_addresses[i].user_data); + } + gpr_free(args.lb_addresses); return rr; } @@ -395,6 +475,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, pp->pollent, pp->initial_metadata, pp->initial_metadata_flags, pp->lb_token_mdelem_storage}; grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target, + (void **)&pp->wrapped_on_complete_arg.lb_token, &pp->wrapped_on_complete); pp->wrapped_on_complete_arg.owning_pending_node = pp; } @@ -457,25 +538,26 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * Create a client channel over them to communicate with a LB service */ glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); - if (args->addresses->naddrs == 0) { + if (args->num_addresses == 0) { return NULL; } - /* construct a target from the args->addresses, in the form + /* construct a target from the addresses in args, given in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs); - addr_strs[0] = - grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); - for (size_t i = 1; i < args->addresses->naddrs; i++) { - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_strs[i], - (const struct sockaddr *)&args->addresses->addrs[i], - true) == 0); + char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses); + addr_strs[0] = grpc_sockaddr_to_uri( + (const struct sockaddr *)&args->lb_addresses[0].resolved_address->addr); + for (size_t i = 1; i < args->num_addresses; i++) { + GPR_ASSERT( + grpc_sockaddr_to_string(&addr_strs[i], + (const struct sockaddr *)&args->lb_addresses[i] + .resolved_address->addr, + true) == 0); } size_t uri_path_len; char *target_uri_str = gpr_strjoin_sep( - (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len); + (const char **)addr_strs, args->num_addresses, ",", &uri_path_len); /* will pick using pick_first */ glb_policy->lb_channel = grpc_client_channel_factory_create_channel( @@ -483,7 +565,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); gpr_free(target_uri_str); - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < args->num_addresses; i++) { gpr_free(addr_strs[i]); } gpr_free(addr_strs); @@ -635,7 +717,7 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; @@ -662,22 +744,28 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg)); glb_policy->wc_arg.rr_policy = glb_policy->rr_policy; glb_policy->wc_arg.wrapped_closure = on_complete; + glb_policy->wc_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; + glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata; + glb_policy->wc_arg.owning_pending_node = NULL; grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, &glb_policy->wc_arg); r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, + (void **)&glb_policy->wc_arg.lb_token, &glb_policy->wrapped_on_complete); if (r != 0) { - /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR - * policy and notify the original callback */ - glb_policy->wc_arg.wrapped_closure = NULL; + /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)glb_policy->wc_arg.rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick"); - grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure, - GRPC_ERROR_NONE, NULL); + + /* add the load reporting initial metadata */ + initial_metadata_add_lb_token(pick_args->initial_metadata, + pick_args->lb_token_mdelem_storage, + glb_policy->wc_arg.lb_token); } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index e1277b353f..21d948033a 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -200,7 +200,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; @@ -438,23 +438,23 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - GPR_ASSERT(args->addresses != NULL); + GPR_ASSERT(args->lb_addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - if (args->addresses->naddrs == 0) return NULL; + if (args->num_addresses == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_addresses); + memset(p->subchannels, 0, sizeof(*p->subchannels) * args->num_addresses); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < args->num_addresses; i++) { memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + sc_args.addr = + (struct sockaddr *)(args->lb_addresses[i].resolved_address->addr); + sc_args.addr_len = (size_t)args->lb_addresses[i].resolved_address->len; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 8fda405fb8..2069dc192c 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -84,8 +84,10 @@ typedef struct pending_pick { /* the initial metadata for the pick. See grpc_lb_policy_pick() */ grpc_metadata_batch *initial_metadata; - /* storage for the lb token initial metadata mdelem */ - grpc_linked_mdelem *lb_token_mdelem_storage; + /* output argument where to store the pick()ed user_data. It'll be NULL if no + * such data is present or there's an error (the definite test for errors is + * \a target being NULL). */ + void **user_data; /* bitmask passed to pick() and used for selective cancelling. See * grpc_lb_policy_cancel_picks() */ @@ -103,7 +105,7 @@ typedef struct pending_pick { typedef struct ready_list { grpc_subchannel *subchannel; /* references namesake entry in subchannel_data */ - grpc_lb_policy_address_token *lb_token; + void *user_data; struct ready_list *next; struct ready_list *prev; } ready_list; @@ -121,8 +123,8 @@ typedef struct { ready_list *ready_list_node; /** last observed connectivity */ grpc_connectivity_state connectivity_state; - /** the subchannel's target LB token */ - grpc_lb_policy_address_token *lb_token; + /** the subchannel's target user data */ + void *user_data; } subchannel_data; struct round_robin_lb_policy { @@ -131,8 +133,10 @@ struct round_robin_lb_policy { /** total number of addresses received at creation time */ size_t num_addresses; - /** load balancing tokens, one per incoming address */ - grpc_lb_policy_address_token *lb_tokens; + /** user data, one per incoming address */ + void **user_data; + /** functions to operate over \a user_data elements */ + grpc_lb_policy_user_data_vtable user_data_vtable; /** all our subchannels */ size_t num_subchannels; @@ -204,7 +208,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, ready_list *new_elem = gpr_malloc(sizeof(ready_list)); memset(new_elem, 0, sizeof(ready_list)); new_elem->subchannel = sd->subchannel; - new_elem->lb_token = sd->lb_token; + new_elem->user_data = sd->user_data; if (p->ready_list.prev == NULL) { /* first element */ new_elem->next = &p->ready_list; @@ -246,7 +250,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, + gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node, (void *)node->subchannel); } @@ -259,9 +263,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { + for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); gpr_free(sd); @@ -282,12 +285,10 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { elem = tmp; } - if (p->lb_tokens != NULL) { - for (i = 0; i < p->num_addresses; i++) { - gpr_free(p->lb_tokens[i].token); - } - gpr_free(p->lb_tokens); + for (size_t i = 0; i < p->num_addresses; i++) { + p->user_data_vtable.destroy(p->user_data[i]); } + gpr_free(p->user_data); gpr_free(p); } @@ -397,26 +398,9 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } -/* add lb_token of selected subchannel (address) to the call's initial - * metadata */ -static void initial_metadata_add_lb_token( - grpc_metadata_batch *initial_metadata, - grpc_linked_mdelem *lb_token_mdelem_storage, - grpc_lb_policy_address_token *lb_token) { - if (lb_token != NULL && lb_token->token_size > 0) { - GPR_ASSERT(lb_token->token != NULL); - grpc_mdstr *lb_token_mdstr = - grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size); - grpc_metadata_batch_add_tail( - initial_metadata, lb_token_mdelem_storage, - grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL, - lb_token_mdstr)); - } -} - static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; @@ -426,9 +410,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* readily available, report right away */ gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); - initial_metadata_add_lb_token(pick_args->initial_metadata, - pick_args->lb_token_mdelem_storage, - selected->lb_token); + *user_data = p->user_data_vtable.copy(selected->user_data); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", @@ -451,7 +433,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->on_complete = on_complete; pp->initial_metadata = pick_args->initial_metadata; pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; + pp->user_data = user_data; p->pending_picks = pp; gpr_mu_unlock(&p->mu); return 0; @@ -493,11 +475,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - initial_metadata_add_lb_token(pp->initial_metadata, - pp->lb_token_mdelem_storage, - selected->lb_token); *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + *pp->user_data = p->user_data_vtable.copy(selected->user_data); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", @@ -631,30 +611,29 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - GPR_ASSERT(args->addresses != NULL); + GPR_ASSERT(args->lb_addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); + if (args->num_addresses == 0) return NULL; round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->num_addresses = args->addresses->naddrs; - if (args->tokens != NULL) { - /* we need to copy because args contents aren't owned */ - p->lb_tokens = - gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses); - memcpy(p->lb_tokens, args->tokens, - sizeof(grpc_lb_policy_address_token) * p->num_addresses); - } - + p->num_addresses = args->num_addresses; p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses); memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses); + p->user_data = gpr_malloc(sizeof(void *) * p->num_addresses); + memset(p->user_data, 0, sizeof(void *) * p->num_addresses); + p->user_data_vtable = args->user_data_vtable; grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < p->num_addresses; i++) { memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + sc_args.addr = + (struct sockaddr *)args->lb_addresses[i].resolved_address->addr; + sc_args.addr_len = args->lb_addresses[i].resolved_address->len; + + p->user_data[i] = p->user_data_vtable.copy(args->lb_addresses[i].user_data); grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); @@ -666,9 +645,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; - if (p->lb_tokens != NULL) { - sd->lb_token = &p->lb_tokens[i]; - } + sd->user_data = p->user_data[i]; ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); |