diff options
author | David Garcia Quintas <dgq@google.com> | 2016-08-15 19:38:39 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-08-15 23:02:33 -0700 |
commit | 5b0e9462f0d40b171d50c03b29016c36588a3520 (patch) | |
tree | 9a64906b2c490688a6f95aa6c974fec2ce89d06f /src/core/ext/lb_policy | |
parent | 8aace513d090679bd15ceb68eb4824cec3dc044e (diff) |
Introduced LB token initial metadata propagation
Diffstat (limited to 'src/core/ext/lb_policy')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 33 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 105 |
2 files changed, 123 insertions, 15 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 693145bd6c..3044f164c2 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -164,6 +164,9 @@ typedef struct pending_pick { /* the initial metadata for the pick. See grpc_lb_policy_pick() */ grpc_metadata_batch *initial_metadata; + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /* bitmask passed to pick() and used for selective cancelling. See * grpc_lb_policy_cancel_picks() */ uint32_t initial_metadata_flags; @@ -188,6 +191,7 @@ static void add_pending_pick(pending_pick **root, memset(pp, 0, sizeof(pending_pick)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); pp->next = *root; + pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->pollent = pick_args->pollent; pp->target = target; pp->initial_metadata = pick_args->initial_metadata; @@ -294,7 +298,10 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len); grpc_lb_policy_args args; + memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; + args.tokens = gpr_malloc(sizeof(grpc_lb_policy_address_token) * + serverlist->num_servers); args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); args.addresses->naddrs = serverlist->num_servers; args.addresses->addrs = @@ -309,6 +316,14 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); args.addresses->addrs[out_addrs_idx].len = sa_len; ++out_addrs_idx; + const size_t token_max_size = + GPR_ARRAY_SIZE(serverlist->servers[i]->load_balance_token); + serverlist->servers[i]->load_balance_token[token_max_size - 1] = '\0'; + args.tokens[i].token_size = + strlen(serverlist->servers[i]->load_balance_token); + args.tokens[i].token = gpr_malloc(args.tokens[i].token_size); + memcpy(args.tokens[i].token, serverlist->servers[i]->load_balance_token, + args.tokens[i].token_size); } else { gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", host_ports[i]); @@ -324,11 +339,14 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, gpr_free(host_ports); gpr_free(args.addresses->addrs); gpr_free(args.addresses); + gpr_free(args.tokens); return rr; } static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_error *error) { + GPR_ASSERT(glb_policy->serverlist != NULL && + glb_policy->serverlist->num_servers > 0); GRPC_ERROR_REF(error); glb_policy->rr_policy = create_rr(exec_ctx, glb_policy->serverlist, glb_policy); @@ -359,7 +377,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, (intptr_t)glb_policy->rr_policy); } const grpc_lb_policy_pick_args pick_args = { - pp->pollent, pp->initial_metadata, pp->initial_metadata_flags}; + pp->pollent, pp->initial_metadata, pp->initial_metadata_flags, + pp->lb_token_mdelem_storage}; grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target, &pp->wrapped_on_complete); pp->wrapped_on_complete_arg.owning_pending_node = pp; @@ -607,6 +626,18 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_closure *on_complete) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; + + if (pick_args->lb_token_mdelem_storage == NULL) { + /* TODO(dgq): should this be an assert? If storage is NULL, something has + * gone very wrong at the client channel filter */ + gpr_log(GPR_ERROR, + "No mdelem storage for the LB token. Load reporting won't work " + "without it. Failing"); + *target = NULL; + grpc_exec_ctx_sched(exec_ctx, on_complete, GRPC_ERROR_NONE, NULL); + return 1; + } + gpr_mu_lock(&glb_policy->mu); int r; diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 49e724a0f2..8fda405fb8 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -66,6 +66,7 @@ #include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/static_metadata.h" typedef struct round_robin_lb_policy round_robin_lb_policy; @@ -76,15 +77,33 @@ int grpc_lb_round_robin_trace = 0; * Once a pick is available, \a target is updated and \a on_complete called. */ typedef struct pending_pick { struct pending_pick *next; + + /* polling entity for the pick()'s async notification */ grpc_polling_entity *pollent; + + /* the initial metadata for the pick. See grpc_lb_policy_pick() */ + grpc_metadata_batch *initial_metadata; + + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + + /* bitmask passed to pick() and used for selective cancelling. See + * grpc_lb_policy_cancel_picks() */ uint32_t initial_metadata_flags; + + /* output argument where to store the pick()ed connected subchannel, or NULL + * upon error. */ grpc_connected_subchannel **target; + + /* to be invoked once the pick() has completed (regardless of success) */ grpc_closure *on_complete; } pending_pick; /** List of subchannels in a connectivity READY state */ typedef struct ready_list { grpc_subchannel *subchannel; + /* references namesake entry in subchannel_data */ + grpc_lb_policy_address_token *lb_token; struct ready_list *next; struct ready_list *prev; } ready_list; @@ -102,12 +121,19 @@ typedef struct { ready_list *ready_list_node; /** last observed connectivity */ grpc_connectivity_state connectivity_state; + /** the subchannel's target LB token */ + grpc_lb_policy_address_token *lb_token; } subchannel_data; struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; + /** total number of addresses received at creation time */ + size_t num_addresses; + /** load balancing tokens, one per incoming address */ + grpc_lb_policy_address_token *lb_tokens; + /** all our subchannels */ size_t num_subchannels; subchannel_data **subchannels; @@ -166,16 +192,19 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - p->ready_list_last_pick, p->ready_list_last_pick->subchannel); + (void *)p->ready_list_last_pick, + (void *)p->ready_list_last_pick->subchannel); } } /** Prepends (relative to the root at p->ready_list) the connected subchannel \a * csc to the list of ready subchannels. */ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - grpc_subchannel *sc) { + subchannel_data *sd) { ready_list *new_elem = gpr_malloc(sizeof(ready_list)); - new_elem->subchannel = sc; + memset(new_elem, 0, sizeof(ready_list)); + new_elem->subchannel = sd->subchannel; + new_elem->lb_token = sd->lb_token; if (p->ready_list.prev == NULL) { /* first element */ new_elem->next = &p->ready_list; @@ -189,7 +218,8 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, p->ready_list.prev = new_elem; } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc); + gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)", + (void *)new_elem, (void *)sd->subchannel); } return new_elem; } @@ -217,7 +247,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, - node->subchannel); + (void *)node->subchannel); } node->next = NULL; @@ -251,6 +281,13 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(elem); elem = tmp; } + + if (p->lb_tokens != NULL) { + for (i = 0; i < p->num_addresses; i++) { + gpr_free(p->lb_tokens[i].token); + } + gpr_free(p->lb_tokens); + } gpr_free(p); } @@ -337,7 +374,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { p->started_picking = 1; if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p, + gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p, p->num_subchannels); } @@ -360,6 +397,23 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static void initial_metadata_add_lb_token( + grpc_metadata_batch *initial_metadata, + grpc_linked_mdelem *lb_token_mdelem_storage, + grpc_lb_policy_address_token *lb_token) { + if (lb_token != NULL && lb_token->token_size > 0) { + GPR_ASSERT(lb_token->token != NULL); + grpc_mdstr *lb_token_mdstr = + grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size); + grpc_metadata_batch_add_tail( + initial_metadata, lb_token_mdelem_storage, + grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL, + lb_token_mdstr)); + } +} + static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, @@ -369,17 +423,22 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, ready_list *selected; gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { + /* readily available, report right away */ gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + initial_metadata_add_lb_token(pick_args->initial_metadata, + pick_args->lb_token_mdelem_storage, + selected->lb_token); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target, - selected); + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", + (void *)*target, (void *)selected); } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked(p); return 1; } else { + /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { start_picking(exec_ctx, p); } @@ -390,7 +449,9 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->pollent = pick_args->pollent; pp->target = target; pp->on_complete = on_complete; + pp->initial_metadata = pick_args->initial_metadata; pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; p->pending_picks = pp; gpr_mu_unlock(&p->mu); return 0; @@ -419,7 +480,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_ready"); /* add the newly connected subchannel to the list of connected ones. * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel); + sd->ready_list_node = add_connected_sc_locked(p, sd); /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ @@ -431,12 +492,16 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } while ((pp = p->pending_picks)) { p->pending_picks = pp->next; + + initial_metadata_add_lb_token(pp->initial_metadata, + pp->lb_token_mdelem_storage, + selected->lb_token); *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - selected->subchannel, selected); + (void *)selected->subchannel, (void *)selected); } grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); @@ -572,13 +637,21 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + p->num_addresses = args->addresses->naddrs; + if (args->tokens != NULL) { + /* we need to copy because args contents aren't owned */ + p->lb_tokens = + gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses); + memcpy(p->lb_tokens, args->tokens, + sizeof(grpc_lb_policy_address_token) * p->num_addresses); + } + + p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses); + memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < p->num_addresses; i++) { memset(&sc_args, 0, sizeof(grpc_subchannel_args)); sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); sc_args.addr_len = (size_t)args->addresses->addrs[i].len; @@ -593,12 +666,16 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; + if (p->lb_tokens != NULL) { + sd->lb_token = &p->lb_tokens[i]; + } ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); } } if (subchannel_idx == 0) { + /* couldn't create any subchannel. Bail out */ gpr_free(p->subchannels); gpr_free(p); return NULL; |