diff options
author | David Garcia Quintas <dgq@google.com> | 2016-08-15 19:38:39 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-08-15 23:02:33 -0700 |
commit | 5b0e9462f0d40b171d50c03b29016c36588a3520 (patch) | |
tree | 9a64906b2c490688a6f95aa6c974fec2ce89d06f | |
parent | 8aace513d090679bd15ceb68eb4824cec3dc044e (diff) |
Introduced LB token initial metadata propagation
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 3 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy.h | 2 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy_factory.h | 8 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel_call_holder.h | 2 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 33 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 105 | ||||
-rw-r--r-- | test/cpp/grpclb/grpclb_test.cc | 13 |
7 files changed, 146 insertions, 20 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 8e653a8eee..c1c5e38cb0 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -402,7 +402,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel"); gpr_mu_unlock(&chand->mu_config); const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata, - initial_metadata_flags}; + initial_metadata_flags, + &calld->lb_token_mdelem}; r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel"); diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index 6b9d30c6e5..6f133a2948 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -61,6 +61,8 @@ typedef struct grpc_lb_policy_pick_args { grpc_metadata_batch *initial_metadata; /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */ uint32_t initial_metadata_flags; + /** Storage for LB token in \a initial_metadata, or NULL if not used */ + grpc_linked_mdelem *lb_token_mdelem_storage; } grpc_lb_policy_pick_args; struct grpc_lb_policy_vtable { diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index 1c89b28b59..635efb6e6a 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -49,8 +49,16 @@ struct grpc_lb_policy_factory { const grpc_lb_policy_factory_vtable *vtable; }; +typedef struct grpc_lb_policy_address_token { + uint8_t *token; + size_t token_size; +} grpc_lb_policy_address_token; + typedef struct grpc_lb_policy_args { grpc_resolved_addresses *addresses; + /* It not NULL, array of load balancing tokens associated with \a addresses, + * on a 1:1 correspondence. Some indices may be NULL for missing tokens. */ + grpc_lb_policy_address_token *tokens; grpc_client_channel_factory *client_channel_factory; } grpc_lb_policy_args; diff --git a/src/core/ext/client_config/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h index 8d2deb02f3..40a0681a3b 100644 --- a/src/core/ext/client_config/subchannel_call_holder.h +++ b/src/core/ext/client_config/subchannel_call_holder.h @@ -81,6 +81,8 @@ typedef struct grpc_subchannel_call_holder { grpc_closure next_step; grpc_call_stack *owning_call; + + grpc_linked_mdelem lb_token_mdelem; } grpc_subchannel_call_holder; void grpc_subchannel_call_holder_init( diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 693145bd6c..3044f164c2 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -164,6 +164,9 @@ typedef struct pending_pick { /* the initial metadata for the pick. See grpc_lb_policy_pick() */ grpc_metadata_batch *initial_metadata; + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /* bitmask passed to pick() and used for selective cancelling. See * grpc_lb_policy_cancel_picks() */ uint32_t initial_metadata_flags; @@ -188,6 +191,7 @@ static void add_pending_pick(pending_pick **root, memset(pp, 0, sizeof(pending_pick)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); pp->next = *root; + pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->pollent = pick_args->pollent; pp->target = target; pp->initial_metadata = pick_args->initial_metadata; @@ -294,7 +298,10 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len); grpc_lb_policy_args args; + memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; + args.tokens = gpr_malloc(sizeof(grpc_lb_policy_address_token) * + serverlist->num_servers); args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); args.addresses->naddrs = serverlist->num_servers; args.addresses->addrs = @@ -309,6 +316,14 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); args.addresses->addrs[out_addrs_idx].len = sa_len; ++out_addrs_idx; + const size_t token_max_size = + GPR_ARRAY_SIZE(serverlist->servers[i]->load_balance_token); + serverlist->servers[i]->load_balance_token[token_max_size - 1] = '\0'; + args.tokens[i].token_size = + strlen(serverlist->servers[i]->load_balance_token); + args.tokens[i].token = gpr_malloc(args.tokens[i].token_size); + memcpy(args.tokens[i].token, serverlist->servers[i]->load_balance_token, + args.tokens[i].token_size); } else { gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", host_ports[i]); @@ -324,11 +339,14 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, gpr_free(host_ports); gpr_free(args.addresses->addrs); gpr_free(args.addresses); + gpr_free(args.tokens); return rr; } static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_error *error) { + GPR_ASSERT(glb_policy->serverlist != NULL && + glb_policy->serverlist->num_servers > 0); GRPC_ERROR_REF(error); glb_policy->rr_policy = create_rr(exec_ctx, glb_policy->serverlist, glb_policy); @@ -359,7 +377,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, (intptr_t)glb_policy->rr_policy); } const grpc_lb_policy_pick_args pick_args = { - pp->pollent, pp->initial_metadata, pp->initial_metadata_flags}; + pp->pollent, pp->initial_metadata, pp->initial_metadata_flags, + pp->lb_token_mdelem_storage}; grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target, &pp->wrapped_on_complete); pp->wrapped_on_complete_arg.owning_pending_node = pp; @@ -607,6 +626,18 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_closure *on_complete) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; + + if (pick_args->lb_token_mdelem_storage == NULL) { + /* TODO(dgq): should this be an assert? If storage is NULL, something has + * gone very wrong at the client channel filter */ + gpr_log(GPR_ERROR, + "No mdelem storage for the LB token. Load reporting won't work " + "without it. Failing"); + *target = NULL; + grpc_exec_ctx_sched(exec_ctx, on_complete, GRPC_ERROR_NONE, NULL); + return 1; + } + gpr_mu_lock(&glb_policy->mu); int r; diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 49e724a0f2..8fda405fb8 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -66,6 +66,7 @@ #include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/static_metadata.h" typedef struct round_robin_lb_policy round_robin_lb_policy; @@ -76,15 +77,33 @@ int grpc_lb_round_robin_trace = 0; * Once a pick is available, \a target is updated and \a on_complete called. */ typedef struct pending_pick { struct pending_pick *next; + + /* polling entity for the pick()'s async notification */ grpc_polling_entity *pollent; + + /* the initial metadata for the pick. See grpc_lb_policy_pick() */ + grpc_metadata_batch *initial_metadata; + + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + + /* bitmask passed to pick() and used for selective cancelling. See + * grpc_lb_policy_cancel_picks() */ uint32_t initial_metadata_flags; + + /* output argument where to store the pick()ed connected subchannel, or NULL + * upon error. */ grpc_connected_subchannel **target; + + /* to be invoked once the pick() has completed (regardless of success) */ grpc_closure *on_complete; } pending_pick; /** List of subchannels in a connectivity READY state */ typedef struct ready_list { grpc_subchannel *subchannel; + /* references namesake entry in subchannel_data */ + grpc_lb_policy_address_token *lb_token; struct ready_list *next; struct ready_list *prev; } ready_list; @@ -102,12 +121,19 @@ typedef struct { ready_list *ready_list_node; /** last observed connectivity */ grpc_connectivity_state connectivity_state; + /** the subchannel's target LB token */ + grpc_lb_policy_address_token *lb_token; } subchannel_data; struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; + /** total number of addresses received at creation time */ + size_t num_addresses; + /** load balancing tokens, one per incoming address */ + grpc_lb_policy_address_token *lb_tokens; + /** all our subchannels */ size_t num_subchannels; subchannel_data **subchannels; @@ -166,16 +192,19 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - p->ready_list_last_pick, p->ready_list_last_pick->subchannel); + (void *)p->ready_list_last_pick, + (void *)p->ready_list_last_pick->subchannel); } } /** Prepends (relative to the root at p->ready_list) the connected subchannel \a * csc to the list of ready subchannels. */ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - grpc_subchannel *sc) { + subchannel_data *sd) { ready_list *new_elem = gpr_malloc(sizeof(ready_list)); - new_elem->subchannel = sc; + memset(new_elem, 0, sizeof(ready_list)); + new_elem->subchannel = sd->subchannel; + new_elem->lb_token = sd->lb_token; if (p->ready_list.prev == NULL) { /* first element */ new_elem->next = &p->ready_list; @@ -189,7 +218,8 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, p->ready_list.prev = new_elem; } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc); + gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)", + (void *)new_elem, (void *)sd->subchannel); } return new_elem; } @@ -217,7 +247,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, - node->subchannel); + (void *)node->subchannel); } node->next = NULL; @@ -251,6 +281,13 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(elem); elem = tmp; } + + if (p->lb_tokens != NULL) { + for (i = 0; i < p->num_addresses; i++) { + gpr_free(p->lb_tokens[i].token); + } + gpr_free(p->lb_tokens); + } gpr_free(p); } @@ -337,7 +374,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { p->started_picking = 1; if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p, + gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p, p->num_subchannels); } @@ -360,6 +397,23 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static void initial_metadata_add_lb_token( + grpc_metadata_batch *initial_metadata, + grpc_linked_mdelem *lb_token_mdelem_storage, + grpc_lb_policy_address_token *lb_token) { + if (lb_token != NULL && lb_token->token_size > 0) { + GPR_ASSERT(lb_token->token != NULL); + grpc_mdstr *lb_token_mdstr = + grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size); + grpc_metadata_batch_add_tail( + initial_metadata, lb_token_mdelem_storage, + grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL, + lb_token_mdstr)); + } +} + static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, @@ -369,17 +423,22 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, ready_list *selected; gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { + /* readily available, report right away */ gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + initial_metadata_add_lb_token(pick_args->initial_metadata, + pick_args->lb_token_mdelem_storage, + selected->lb_token); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target, - selected); + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", + (void *)*target, (void *)selected); } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked(p); return 1; } else { + /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { start_picking(exec_ctx, p); } @@ -390,7 +449,9 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->pollent = pick_args->pollent; pp->target = target; pp->on_complete = on_complete; + pp->initial_metadata = pick_args->initial_metadata; pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; p->pending_picks = pp; gpr_mu_unlock(&p->mu); return 0; @@ -419,7 +480,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_ready"); /* add the newly connected subchannel to the list of connected ones. * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel); + sd->ready_list_node = add_connected_sc_locked(p, sd); /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ @@ -431,12 +492,16 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } while ((pp = p->pending_picks)) { p->pending_picks = pp->next; + + initial_metadata_add_lb_token(pp->initial_metadata, + pp->lb_token_mdelem_storage, + selected->lb_token); *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - selected->subchannel, selected); + (void *)selected->subchannel, (void *)selected); } grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); @@ -572,13 +637,21 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + p->num_addresses = args->addresses->naddrs; + if (args->tokens != NULL) { + /* we need to copy because args contents aren't owned */ + p->lb_tokens = + gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses); + memcpy(p->lb_tokens, args->tokens, + sizeof(grpc_lb_policy_address_token) * p->num_addresses); + } + + p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses); + memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < p->num_addresses; i++) { memset(&sc_args, 0, sizeof(grpc_subchannel_args)); sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); sc_args.addr_len = (size_t)args->addresses->addrs[i].len; @@ -593,12 +666,16 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; + if (p->lb_tokens != NULL) { + sd->lb_token = &p->lb_tokens[i]; + } ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); } } if (subchannel_idx == 0) { + /* couldn't create any subchannel. Bail out */ gpr_free(p->subchannels); gpr_free(p); return NULL; diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index b2fdce2963..f83935fa1a 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -61,6 +61,7 @@ extern "C" { #include "src/proto/grpc/lb/v1/load_balancer.pb.h" #define NUM_BACKENDS 4 +#define PAYLOAD "hello you" // TODO(dgq): Other scenarios in need of testing: // - Send an empty serverlist update and verify that the client request blocks @@ -303,6 +304,12 @@ static void start_backend_server(server_fixture *sf) { return; } GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + char *expected_token; + GPR_ASSERT(gpr_asprintf(&expected_token, "token%d", sf->port) > 0); + GPR_ASSERT(contains_metadata(&request_metadata_recv, + "load-reporting-initial", expected_token)); + gpr_free(expected_token); + gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport); op = ops; @@ -321,8 +328,7 @@ static void start_backend_server(server_fixture *sf) { gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport); bool exit = false; - gpr_slice response_payload_slice = - gpr_slice_from_copied_string("hello you"); + gpr_slice response_payload_slice = gpr_slice_from_copied_string(PAYLOAD); while (!exit) { op = ops; op->op = GRPC_OP_RECV_MESSAGE; @@ -474,10 +480,9 @@ static void perform_request(client_fixture *cf) { error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); GPR_ASSERT(GRPC_CALL_OK == error); - peer = grpc_call_get_peer(c); cq_expect_completion(cqv, tag(2), 1); cq_verify(cqv); - gpr_free(peer); + GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD)); grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(response_payload_recv); |