diff options
author | David Garcia Quintas <dgq@google.com> | 2016-09-12 18:37:05 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-09-12 22:57:02 -0700 |
commit | 331b9c02f9372ef832e612d48b90991c181ba8a7 (patch) | |
tree | 02fa9ad3354a9f13a2f83e7da63b02e04e87c993 /src/core/ext/lb_policy/round_robin | |
parent | 42adfb89ce3f842bf7a3d24e9f04b32d46644457 (diff) |
Moved LB token changes solely into grpclb.c
Diffstat (limited to 'src/core/ext/lb_policy/round_robin')
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 89 |
1 files changed, 33 insertions, 56 deletions
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 8fda405fb8..2069dc192c 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -84,8 +84,10 @@ typedef struct pending_pick { /* the initial metadata for the pick. See grpc_lb_policy_pick() */ grpc_metadata_batch *initial_metadata; - /* storage for the lb token initial metadata mdelem */ - grpc_linked_mdelem *lb_token_mdelem_storage; + /* output argument where to store the pick()ed user_data. It'll be NULL if no + * such data is present or there's an error (the definite test for errors is + * \a target being NULL). */ + void **user_data; /* bitmask passed to pick() and used for selective cancelling. See * grpc_lb_policy_cancel_picks() */ @@ -103,7 +105,7 @@ typedef struct pending_pick { typedef struct ready_list { grpc_subchannel *subchannel; /* references namesake entry in subchannel_data */ - grpc_lb_policy_address_token *lb_token; + void *user_data; struct ready_list *next; struct ready_list *prev; } ready_list; @@ -121,8 +123,8 @@ typedef struct { ready_list *ready_list_node; /** last observed connectivity */ grpc_connectivity_state connectivity_state; - /** the subchannel's target LB token */ - grpc_lb_policy_address_token *lb_token; + /** the subchannel's target user data */ + void *user_data; } subchannel_data; struct round_robin_lb_policy { @@ -131,8 +133,10 @@ struct round_robin_lb_policy { /** total number of addresses received at creation time */ size_t num_addresses; - /** load balancing tokens, one per incoming address */ - grpc_lb_policy_address_token *lb_tokens; + /** user data, one per incoming address */ + void **user_data; + /** functions to operate over \a user_data elements */ + grpc_lb_policy_user_data_vtable user_data_vtable; /** all our subchannels */ size_t num_subchannels; @@ -204,7 +208,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, ready_list *new_elem = gpr_malloc(sizeof(ready_list)); memset(new_elem, 0, sizeof(ready_list)); new_elem->subchannel = sd->subchannel; - new_elem->lb_token = sd->lb_token; + new_elem->user_data = sd->user_data; if (p->ready_list.prev == NULL) { /* first element */ new_elem->next = &p->ready_list; @@ -246,7 +250,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, + gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node, (void *)node->subchannel); } @@ -259,9 +263,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { + for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); gpr_free(sd); @@ -282,12 +285,10 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { elem = tmp; } - if (p->lb_tokens != NULL) { - for (i = 0; i < p->num_addresses; i++) { - gpr_free(p->lb_tokens[i].token); - } - gpr_free(p->lb_tokens); + for (size_t i = 0; i < p->num_addresses; i++) { + p->user_data_vtable.destroy(p->user_data[i]); } + gpr_free(p->user_data); gpr_free(p); } @@ -397,26 +398,9 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } -/* add lb_token of selected subchannel (address) to the call's initial - * metadata */ -static void initial_metadata_add_lb_token( - grpc_metadata_batch *initial_metadata, - grpc_linked_mdelem *lb_token_mdelem_storage, - grpc_lb_policy_address_token *lb_token) { - if (lb_token != NULL && lb_token->token_size > 0) { - GPR_ASSERT(lb_token->token != NULL); - grpc_mdstr *lb_token_mdstr = - grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size); - grpc_metadata_batch_add_tail( - initial_metadata, lb_token_mdelem_storage, - grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL, - lb_token_mdstr)); - } -} - static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; @@ -426,9 +410,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* readily available, report right away */ gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); - initial_metadata_add_lb_token(pick_args->initial_metadata, - pick_args->lb_token_mdelem_storage, - selected->lb_token); + *user_data = p->user_data_vtable.copy(selected->user_data); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", @@ -451,7 +433,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->on_complete = on_complete; pp->initial_metadata = pick_args->initial_metadata; pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; + pp->user_data = user_data; p->pending_picks = pp; gpr_mu_unlock(&p->mu); return 0; @@ -493,11 +475,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - initial_metadata_add_lb_token(pp->initial_metadata, - pp->lb_token_mdelem_storage, - selected->lb_token); *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + *pp->user_data = p->user_data_vtable.copy(selected->user_data); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", @@ -631,30 +611,29 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - GPR_ASSERT(args->addresses != NULL); + GPR_ASSERT(args->lb_addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); + if (args->num_addresses == 0) return NULL; round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->num_addresses = args->addresses->naddrs; - if (args->tokens != NULL) { - /* we need to copy because args contents aren't owned */ - p->lb_tokens = - gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses); - memcpy(p->lb_tokens, args->tokens, - sizeof(grpc_lb_policy_address_token) * p->num_addresses); - } - + p->num_addresses = args->num_addresses; p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses); memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses); + p->user_data = gpr_malloc(sizeof(void *) * p->num_addresses); + memset(p->user_data, 0, sizeof(void *) * p->num_addresses); + p->user_data_vtable = args->user_data_vtable; grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < p->num_addresses; i++) { memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + sc_args.addr = + (struct sockaddr *)args->lb_addresses[i].resolved_address->addr; + sc_args.addr_len = args->lb_addresses[i].resolved_address->len; + + p->user_data[i] = p->user_data_vtable.copy(args->lb_addresses[i].user_data); grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); @@ -666,9 +645,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; - if (p->lb_tokens != NULL) { - sd->lb_token = &p->lb_tokens[i]; - } + sd->user_data = p->user_data[i]; ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); |