aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/lb_policy/round_robin
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-09-12 18:37:05 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2016-09-12 22:57:02 -0700
commit331b9c02f9372ef832e612d48b90991c181ba8a7 (patch)
tree02fa9ad3354a9f13a2f83e7da63b02e04e87c993 /src/core/ext/lb_policy/round_robin
parent42adfb89ce3f842bf7a3d24e9f04b32d46644457 (diff)
Moved LB token changes solely into grpclb.c
Diffstat (limited to 'src/core/ext/lb_policy/round_robin')
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c89
1 files changed, 33 insertions, 56 deletions
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 8fda405fb8..2069dc192c 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -84,8 +84,10 @@ typedef struct pending_pick {
/* the initial metadata for the pick. See grpc_lb_policy_pick() */
grpc_metadata_batch *initial_metadata;
- /* storage for the lb token initial metadata mdelem */
- grpc_linked_mdelem *lb_token_mdelem_storage;
+ /* output argument where to store the pick()ed user_data. It'll be NULL if no
+ * such data is present or there's an error (the definite test for errors is
+ * \a target being NULL). */
+ void **user_data;
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
@@ -103,7 +105,7 @@ typedef struct pending_pick {
typedef struct ready_list {
grpc_subchannel *subchannel;
/* references namesake entry in subchannel_data */
- grpc_lb_policy_address_token *lb_token;
+ void *user_data;
struct ready_list *next;
struct ready_list *prev;
} ready_list;
@@ -121,8 +123,8 @@ typedef struct {
ready_list *ready_list_node;
/** last observed connectivity */
grpc_connectivity_state connectivity_state;
- /** the subchannel's target LB token */
- grpc_lb_policy_address_token *lb_token;
+ /** the subchannel's target user data */
+ void *user_data;
} subchannel_data;
struct round_robin_lb_policy {
@@ -131,8 +133,10 @@ struct round_robin_lb_policy {
/** total number of addresses received at creation time */
size_t num_addresses;
- /** load balancing tokens, one per incoming address */
- grpc_lb_policy_address_token *lb_tokens;
+ /** user data, one per incoming address */
+ void **user_data;
+ /** functions to operate over \a user_data elements */
+ grpc_lb_policy_user_data_vtable user_data_vtable;
/** all our subchannels */
size_t num_subchannels;
@@ -204,7 +208,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
memset(new_elem, 0, sizeof(ready_list));
new_elem->subchannel = sd->subchannel;
- new_elem->lb_token = sd->lb_token;
+ new_elem->user_data = sd->user_data;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -246,7 +250,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
+ gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
(void *)node->subchannel);
}
@@ -259,9 +263,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- size_t i;
ready_list *elem;
- for (i = 0; i < p->num_subchannels; i++) {
+ for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
gpr_free(sd);
@@ -282,12 +285,10 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
elem = tmp;
}
- if (p->lb_tokens != NULL) {
- for (i = 0; i < p->num_addresses; i++) {
- gpr_free(p->lb_tokens[i].token);
- }
- gpr_free(p->lb_tokens);
+ for (size_t i = 0; i < p->num_addresses; i++) {
+ p->user_data_vtable.destroy(p->user_data[i]);
}
+ gpr_free(p->user_data);
gpr_free(p);
}
@@ -397,26 +398,9 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_mu_unlock(&p->mu);
}
-/* add lb_token of selected subchannel (address) to the call's initial
- * metadata */
-static void initial_metadata_add_lb_token(
- grpc_metadata_batch *initial_metadata,
- grpc_linked_mdelem *lb_token_mdelem_storage,
- grpc_lb_policy_address_token *lb_token) {
- if (lb_token != NULL && lb_token->token_size > 0) {
- GPR_ASSERT(lb_token->token != NULL);
- grpc_mdstr *lb_token_mdstr =
- grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size);
- grpc_metadata_batch_add_tail(
- initial_metadata, lb_token_mdelem_storage,
- grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL,
- lb_token_mdstr));
- }
-}
-
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
@@ -426,9 +410,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
- initial_metadata_add_lb_token(pick_args->initial_metadata,
- pick_args->lb_token_mdelem_storage,
- selected->lb_token);
+ *user_data = p->user_data_vtable.copy(selected->user_data);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
@@ -451,7 +433,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->on_complete = on_complete;
pp->initial_metadata = pick_args->initial_metadata;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+ pp->user_data = user_data;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -493,11 +475,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- initial_metadata_add_lb_token(pp->initial_metadata,
- pp->lb_token_mdelem_storage,
- selected->lb_token);
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ *pp->user_data = p->user_data_vtable.copy(selected->user_data);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
@@ -631,30 +611,29 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
- GPR_ASSERT(args->addresses != NULL);
+ GPR_ASSERT(args->lb_addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
+ if (args->num_addresses == 0) return NULL;
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->num_addresses = args->addresses->naddrs;
- if (args->tokens != NULL) {
- /* we need to copy because args contents aren't owned */
- p->lb_tokens =
- gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses);
- memcpy(p->lb_tokens, args->tokens,
- sizeof(grpc_lb_policy_address_token) * p->num_addresses);
- }
-
+ p->num_addresses = args->num_addresses;
p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
+ p->user_data = gpr_malloc(sizeof(void *) * p->num_addresses);
+ memset(p->user_data, 0, sizeof(void *) * p->num_addresses);
+ p->user_data_vtable = args->user_data_vtable;
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
for (size_t i = 0; i < p->num_addresses; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
- sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+ sc_args.addr =
+ (struct sockaddr *)args->lb_addresses[i].resolved_address->addr;
+ sc_args.addr_len = args->lb_addresses[i].resolved_address->len;
+
+ p->user_data[i] = p->user_data_vtable.copy(args->lb_addresses[i].user_data);
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
@@ -666,9 +645,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
- if (p->lb_tokens != NULL) {
- sd->lb_token = &p->lb_tokens[i];
- }
+ sd->user_data = p->user_data[i];
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);