aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h1
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c93
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c19
3 files changed, 68 insertions, 45 deletions
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index e1d67633b4..d2d1a0f16e 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -65,7 +65,6 @@ typedef struct grpc_lb_policy_user_data_vtable {
typedef struct grpc_lb_policy_args {
grpc_lb_address *lb_addresses;
size_t num_addresses;
- grpc_lb_policy_user_data_vtable user_data_vtable;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 7fdc97dc3d..9a176ba0d1 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -185,7 +185,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
initial_metadata_add_lb_token(wc_arg->initial_metadata,
wc_arg->lb_token_mdelem_storage,
- wc_arg->lb_token);
+ user_data_copy(wc_arg->lb_token));
grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
gpr_free(wc_arg->owning_pending_node);
@@ -302,6 +302,12 @@ typedef struct glb_lb_policy {
* response has arrived. */
grpc_grpclb_serverlist *serverlist;
+ /** total number of valid addresses received in \a serverlist */
+ size_t num_ok_serverlist_addresses;
+
+ /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */
+ grpc_lb_address *lb_addresses;
+
/** list of picks that are waiting on RR's policy connectivity */
pending_pick *pending_picks;
@@ -329,32 +335,39 @@ struct rr_connectivity_data {
glb_lb_policy *glb_policy;
};
-/* populate \a addresses according to \a serverlist. Returns the number of
- * addresses successfully parsed and added to \a addresses */
-static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
- grpc_lb_address **lb_addresses) {
- size_t num_valid = 0;
- /* first pass: count how many are valid in order to allocate the necessary
- * memory in a single block */
- for (size_t i = 0; i < serverlist->num_servers; ++i) {
- const grpc_grpclb_server *server = serverlist->servers[i];
- const grpc_grpclb_ip_address *ip = &server->ip_address;
-
- if (server->port >> 16 != 0) {
+static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
+ bool log) {
+ const grpc_grpclb_ip_address *ip = &server->ip_address;
+ if (server->port >> 16 != 0) {
+ if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %zu of serverlist. Ignoring.",
- server->port, i);
- continue;
+ server->port, idx);
}
+ return false;
+ }
- if (ip->size != 4 && ip->size != 16) {
+ if (ip->size != 4 && ip->size != 16) {
+ if (log) {
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %zu of "
"serverlist. Ignoring",
- ip->size, i);
- continue;
+ ip->size, idx);
}
- ++num_valid;
+ return false;
+ }
+ return true;
+}
+
+/* populate \a addresses according to \a serverlist. Returns the number of
+ * addresses successfully parsed and added to \a addresses */
+static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
+ grpc_lb_address **lb_addresses) {
+ size_t num_valid = 0;
+ /* first pass: count how many are valid in order to allocate the necessary
+ * memory in a single block */
+ for (size_t i = 0; i < serverlist->num_servers; ++i) {
+ if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
}
if (num_valid == 0) {
return 0;
@@ -368,9 +381,14 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
/* second pass: actually populate the addresses and LB tokens (aka user data
- * to the outside world) to be read by the RR policy during its creation */
+ * to the outside world) to be read by the RR policy during its creation.
+ * Given that the validity tests are very cheap, they are performed again
+ * instead of marking the valid ones during the first pass, as this would
+ * incurr in an allocation due to the arbitrary number of server */
+ size_t num_processed = 0;
for (size_t i = 0; i < num_valid; ++i) {
const grpc_grpclb_server *server = serverlist->servers[i];
+ if (!is_server_valid(serverlist->servers[i], i, false)) continue;
grpc_lb_address *const lb_addr = &lb_addrs[i];
/* lb token processing */
@@ -410,7 +428,9 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
addr6->sin6_port = netorder_port;
}
GPR_ASSERT(*sa_len > 0);
+ ++num_processed;
}
+ GPR_ASSERT(num_processed == num_valid);
*lb_addresses = lb_addrs;
return num_valid;
}
@@ -423,19 +443,27 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
- args.num_addresses = process_serverlist(serverlist, &args.lb_addresses);
- args.user_data_vtable.copy = user_data_copy;
- args.user_data_vtable.destroy = user_data_destroy;
+ const size_t num_ok_addresses =
+ process_serverlist(serverlist, &args.lb_addresses);
+ args.num_addresses = num_ok_addresses;
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
+
+ glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
+ if (glb_policy->lb_addresses != NULL) {
+ /* dispose of the previous version */
+ for (size_t i = 0; i < num_ok_addresses; ++i) {
+ user_data_destroy(glb_policy->lb_addresses[i].user_data);
+ }
+ gpr_free(glb_policy->lb_addresses);
+ }
+
+ glb_policy->lb_addresses = args.lb_addresses;
+
if (args.num_addresses > 0) {
/* free "resolved" addresses memblock */
gpr_free(args.lb_addresses->resolved_address);
}
- for (size_t i = 0; i < args.num_addresses; ++i) {
- args.user_data_vtable.destroy(args.lb_addresses[i].user_data);
- }
- gpr_free(args.lb_addresses);
return rr;
}
@@ -601,6 +629,11 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
gpr_mu_destroy(&glb_policy->mu);
+
+ for (size_t i = 0; i < glb_policy->num_ok_serverlist_addresses; ++i) {
+ user_data_destroy(glb_policy->lb_addresses[i].user_data);
+ }
+ gpr_free(glb_policy->lb_addresses);
gpr_free(glb_policy);
}
@@ -762,9 +795,9 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
/* add the load reporting initial metadata */
- initial_metadata_add_lb_token(pick_args->initial_metadata,
- pick_args->lb_token_mdelem_storage,
- glb_policy->wc_arg.lb_token);
+ initial_metadata_add_lb_token(
+ pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
+ user_data_copy(glb_policy->wc_arg.lb_token));
}
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 2069dc192c..4d89cef9b6 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -81,9 +81,6 @@ typedef struct pending_pick {
/* polling entity for the pick()'s async notification */
grpc_polling_entity *pollent;
- /* the initial metadata for the pick. See grpc_lb_policy_pick() */
- grpc_metadata_batch *initial_metadata;
-
/* output argument where to store the pick()ed user_data. It'll be NULL if no
* such data is present or there's an error (the definite test for errors is
* \a target being NULL). */
@@ -133,10 +130,9 @@ struct round_robin_lb_policy {
/** total number of addresses received at creation time */
size_t num_addresses;
- /** user data, one per incoming address */
+ /** user data, one per incoming address. This pointer is borrowed and opaque.
+ * It'll be returned as-is in successful picks. */
void **user_data;
- /** functions to operate over \a user_data elements */
- grpc_lb_policy_user_data_vtable user_data_vtable;
/** all our subchannels */
size_t num_subchannels;
@@ -285,9 +281,6 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
elem = tmp;
}
- for (size_t i = 0; i < p->num_addresses; i++) {
- p->user_data_vtable.destroy(p->user_data[i]);
- }
gpr_free(p->user_data);
gpr_free(p);
}
@@ -410,7 +403,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
- *user_data = p->user_data_vtable.copy(selected->user_data);
+ *user_data = selected->user_data;
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
@@ -431,7 +424,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->pollent = pick_args->pollent;
pp->target = target;
pp->on_complete = on_complete;
- pp->initial_metadata = pick_args->initial_metadata;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->user_data = user_data;
p->pending_picks = pp;
@@ -477,7 +469,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
- *pp->user_data = p->user_data_vtable.copy(selected->user_data);
+ *pp->user_data = selected->user_data;
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
@@ -623,7 +615,6 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
p->user_data = gpr_malloc(sizeof(void *) * p->num_addresses);
memset(p->user_data, 0, sizeof(void *) * p->num_addresses);
- p->user_data_vtable = args->user_data_vtable;
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
@@ -633,7 +624,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
(struct sockaddr *)args->lb_addresses[i].resolved_address->addr;
sc_args.addr_len = args->lb_addresses[i].resolved_address->len;
- p->user_data[i] = p->user_data_vtable.copy(args->lb_addresses[i].user_data);
+ p->user_data[i] = args->lb_addresses[i].user_data;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);