aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-08-15 19:38:39 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2016-08-15 23:02:33 -0700
commit5b0e9462f0d40b171d50c03b29016c36588a3520 (patch)
tree9a64906b2c490688a6f95aa6c974fec2ce89d06f
parent8aace513d090679bd15ceb68eb4824cec3dc044e (diff)
Introduced LB token initial metadata propagation
-rw-r--r--src/core/ext/client_config/client_channel.c3
-rw-r--r--src/core/ext/client_config/lb_policy.h2
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h8
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.h2
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c33
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c105
-rw-r--r--test/cpp/grpclb/grpclb_test.cc13
7 files changed, 146 insertions, 20 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 8e653a8eee..c1c5e38cb0 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -402,7 +402,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
- initial_metadata_flags};
+ initial_metadata_flags,
+ &calld->lb_token_mdelem};
r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 6b9d30c6e5..6f133a2948 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -61,6 +61,8 @@ typedef struct grpc_lb_policy_pick_args {
grpc_metadata_batch *initial_metadata;
/** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
uint32_t initial_metadata_flags;
+ /** Storage for LB token in \a initial_metadata, or NULL if not used */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
} grpc_lb_policy_pick_args;
struct grpc_lb_policy_vtable {
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index 1c89b28b59..635efb6e6a 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -49,8 +49,16 @@ struct grpc_lb_policy_factory {
const grpc_lb_policy_factory_vtable *vtable;
};
+typedef struct grpc_lb_policy_address_token {
+ uint8_t *token;
+ size_t token_size;
+} grpc_lb_policy_address_token;
+
typedef struct grpc_lb_policy_args {
grpc_resolved_addresses *addresses;
+ /* It not NULL, array of load balancing tokens associated with \a addresses,
+ * on a 1:1 correspondence. Some indices may be NULL for missing tokens. */
+ grpc_lb_policy_address_token *tokens;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
diff --git a/src/core/ext/client_config/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h
index 8d2deb02f3..40a0681a3b 100644
--- a/src/core/ext/client_config/subchannel_call_holder.h
+++ b/src/core/ext/client_config/subchannel_call_holder.h
@@ -81,6 +81,8 @@ typedef struct grpc_subchannel_call_holder {
grpc_closure next_step;
grpc_call_stack *owning_call;
+
+ grpc_linked_mdelem lb_token_mdelem;
} grpc_subchannel_call_holder;
void grpc_subchannel_call_holder_init(
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 693145bd6c..3044f164c2 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -164,6 +164,9 @@ typedef struct pending_pick {
/* the initial metadata for the pick. See grpc_lb_policy_pick() */
grpc_metadata_batch *initial_metadata;
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
@@ -188,6 +191,7 @@ static void add_pending_pick(pending_pick **root,
memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
+ pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
pp->pollent = pick_args->pollent;
pp->target = target;
pp->initial_metadata = pick_args->initial_metadata;
@@ -294,7 +298,10 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
(const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
grpc_lb_policy_args args;
+ memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
+ args.tokens = gpr_malloc(sizeof(grpc_lb_policy_address_token) *
+ serverlist->num_servers);
args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
args.addresses->naddrs = serverlist->num_servers;
args.addresses->addrs =
@@ -309,6 +316,14 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
args.addresses->addrs[out_addrs_idx].len = sa_len;
++out_addrs_idx;
+ const size_t token_max_size =
+ GPR_ARRAY_SIZE(serverlist->servers[i]->load_balance_token);
+ serverlist->servers[i]->load_balance_token[token_max_size - 1] = '\0';
+ args.tokens[i].token_size =
+ strlen(serverlist->servers[i]->load_balance_token);
+ args.tokens[i].token = gpr_malloc(args.tokens[i].token_size);
+ memcpy(args.tokens[i].token, serverlist->servers[i]->load_balance_token,
+ args.tokens[i].token_size);
} else {
gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
host_ports[i]);
@@ -324,11 +339,14 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
gpr_free(host_ports);
gpr_free(args.addresses->addrs);
gpr_free(args.addresses);
+ gpr_free(args.tokens);
return rr;
}
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_error *error) {
+ GPR_ASSERT(glb_policy->serverlist != NULL &&
+ glb_policy->serverlist->num_servers > 0);
GRPC_ERROR_REF(error);
glb_policy->rr_policy =
create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
@@ -359,7 +377,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
(intptr_t)glb_policy->rr_policy);
}
const grpc_lb_policy_pick_args pick_args = {
- pp->pollent, pp->initial_metadata, pp->initial_metadata_flags};
+ pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
+ pp->lb_token_mdelem_storage};
grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
&pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
@@ -607,6 +626,18 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+
+ if (pick_args->lb_token_mdelem_storage == NULL) {
+ /* TODO(dgq): should this be an assert? If storage is NULL, something has
+ * gone very wrong at the client channel filter */
+ gpr_log(GPR_ERROR,
+ "No mdelem storage for the LB token. Load reporting won't work "
+ "without it. Failing");
+ *target = NULL;
+ grpc_exec_ctx_sched(exec_ctx, on_complete, GRPC_ERROR_NONE, NULL);
+ return 1;
+ }
+
gpr_mu_lock(&glb_policy->mu);
int r;
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 49e724a0f2..8fda405fb8 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -66,6 +66,7 @@
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/static_metadata.h"
typedef struct round_robin_lb_policy round_robin_lb_policy;
@@ -76,15 +77,33 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
+
+ /* polling entity for the pick()'s async notification */
grpc_polling_entity *pollent;
+
+ /* the initial metadata for the pick. See grpc_lb_policy_pick() */
+ grpc_metadata_batch *initial_metadata;
+
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
+ /* bitmask passed to pick() and used for selective cancelling. See
+ * grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
+
+ /* output argument where to store the pick()ed connected subchannel, or NULL
+ * upon error. */
grpc_connected_subchannel **target;
+
+ /* to be invoked once the pick() has completed (regardless of success) */
grpc_closure *on_complete;
} pending_pick;
/** List of subchannels in a connectivity READY state */
typedef struct ready_list {
grpc_subchannel *subchannel;
+ /* references namesake entry in subchannel_data */
+ grpc_lb_policy_address_token *lb_token;
struct ready_list *next;
struct ready_list *prev;
} ready_list;
@@ -102,12 +121,19 @@ typedef struct {
ready_list *ready_list_node;
/** last observed connectivity */
grpc_connectivity_state connectivity_state;
+ /** the subchannel's target LB token */
+ grpc_lb_policy_address_token *lb_token;
} subchannel_data;
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
+ /** total number of addresses received at creation time */
+ size_t num_addresses;
+ /** load balancing tokens, one per incoming address */
+ grpc_lb_policy_address_token *lb_tokens;
+
/** all our subchannels */
size_t num_subchannels;
subchannel_data **subchannels;
@@ -166,16 +192,19 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) {
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)",
- p->ready_list_last_pick, p->ready_list_last_pick->subchannel);
+ (void *)p->ready_list_last_pick,
+ (void *)p->ready_list_last_pick->subchannel);
}
}
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- grpc_subchannel *sc) {
+ subchannel_data *sd) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
- new_elem->subchannel = sc;
+ memset(new_elem, 0, sizeof(ready_list));
+ new_elem->subchannel = sd->subchannel;
+ new_elem->lb_token = sd->lb_token;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -189,7 +218,8 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
+ gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
+ (void *)new_elem, (void *)sd->subchannel);
}
return new_elem;
}
@@ -217,7 +247,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
- node->subchannel);
+ (void *)node->subchannel);
}
node->next = NULL;
@@ -251,6 +281,13 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(elem);
elem = tmp;
}
+
+ if (p->lb_tokens != NULL) {
+ for (i = 0; i < p->num_addresses; i++) {
+ gpr_free(p->lb_tokens[i].token);
+ }
+ gpr_free(p->lb_tokens);
+ }
gpr_free(p);
}
@@ -337,7 +374,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
p->started_picking = 1;
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p,
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
p->num_subchannels);
}
@@ -360,6 +397,23 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_mu_unlock(&p->mu);
}
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static void initial_metadata_add_lb_token(
+ grpc_metadata_batch *initial_metadata,
+ grpc_linked_mdelem *lb_token_mdelem_storage,
+ grpc_lb_policy_address_token *lb_token) {
+ if (lb_token != NULL && lb_token->token_size > 0) {
+ GPR_ASSERT(lb_token->token != NULL);
+ grpc_mdstr *lb_token_mdstr =
+ grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size);
+ grpc_metadata_batch_add_tail(
+ initial_metadata, lb_token_mdelem_storage,
+ grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL,
+ lb_token_mdstr));
+ }
+}
+
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
@@ -369,17 +423,22 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
ready_list *selected;
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
+ /* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ initial_metadata_add_lb_token(pick_args->initial_metadata,
+ pick_args->lb_token_mdelem_storage,
+ selected->lb_token);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target,
- selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
+ (void *)*target, (void *)selected);
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
return 1;
} else {
+ /* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
@@ -390,7 +449,9 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->pollent = pick_args->pollent;
pp->target = target;
pp->on_complete = on_complete;
+ pp->initial_metadata = pick_args->initial_metadata;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -419,7 +480,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
+ sd->ready_list_node = add_connected_sc_locked(p, sd);
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
@@ -431,12 +492,16 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
+
+ initial_metadata_add_lb_token(pp->initial_metadata,
+ pp->lb_token_mdelem_storage,
+ selected->lb_token);
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- selected->subchannel, selected);
+ (void *)selected->subchannel, (void *)selected);
}
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
@@ -572,13 +637,21 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->num_addresses = args->addresses->naddrs;
+ if (args->tokens != NULL) {
+ /* we need to copy because args contents aren't owned */
+ p->lb_tokens =
+ gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses);
+ memcpy(p->lb_tokens, args->tokens,
+ sizeof(grpc_lb_policy_address_token) * p->num_addresses);
+ }
+
+ p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < p->num_addresses; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
@@ -593,12 +666,16 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
+ if (p->lb_tokens != NULL) {
+ sd->lb_token = &p->lb_tokens[i];
+ }
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
}
}
if (subchannel_idx == 0) {
+ /* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
gpr_free(p);
return NULL;
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index b2fdce2963..f83935fa1a 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -61,6 +61,7 @@ extern "C" {
#include "src/proto/grpc/lb/v1/load_balancer.pb.h"
#define NUM_BACKENDS 4
+#define PAYLOAD "hello you"
// TODO(dgq): Other scenarios in need of testing:
// - Send an empty serverlist update and verify that the client request blocks
@@ -303,6 +304,12 @@ static void start_backend_server(server_fixture *sf) {
return;
}
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
+ char *expected_token;
+ GPR_ASSERT(gpr_asprintf(&expected_token, "token%d", sf->port) > 0);
+ GPR_ASSERT(contains_metadata(&request_metadata_recv,
+ "load-reporting-initial", expected_token));
+ gpr_free(expected_token);
+
gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport);
op = ops;
@@ -321,8 +328,7 @@ static void start_backend_server(server_fixture *sf) {
gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport);
bool exit = false;
- gpr_slice response_payload_slice =
- gpr_slice_from_copied_string("hello you");
+ gpr_slice response_payload_slice = gpr_slice_from_copied_string(PAYLOAD);
while (!exit) {
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
@@ -474,10 +480,9 @@ static void perform_request(client_fixture *cf) {
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
- peer = grpc_call_get_peer(c);
cq_expect_completion(cqv, tag(2), 1);
cq_verify(cqv);
- gpr_free(peer);
+ GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD));
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload_recv);