aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/lb_policy
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2016-09-15 15:15:06 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2016-09-15 15:15:06 -0700
commit5bb81aa4a435ae95ac6b732fdf4ab1f84bd9e224 (patch)
treea03f27aa2da1af682bcf5b09ee0ec64a349a75a9 /src/core/ext/lb_policy
parentf7c45aea5367c1340b1c05b6011528f2f9e39d43 (diff)
parentb410f80bc4603fd749e2f9f81d5ab7d20990258d (diff)
Merge remote-tracking branch 'upstream/master' into call_holder_add_pollent
Diffstat (limited to 'src/core/ext/lb_policy')
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c377
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.c5
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.h1
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c8
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h30
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c26
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c88
7 files changed, 380 insertions, 155 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index a9e329ed6d..b44ca1b6b2 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -76,9 +76,9 @@
* operations in progress over the old RR instance. This is done by
* decreasing the reference count on the old policy. The moment no more
* references are held on the old RR policy, it'll be destroyed and \a
- * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
- * At this point we can transition to a new RR instance safely, which is done
- * once again via \a rr_handover().
+ * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
+ * state. At this point we can transition to a new RR instance safely, which
+ * is done once again via \a rr_handover().
*
*
* Once a RR policy instance is in place (and getting updated as described),
@@ -96,6 +96,8 @@
* - Implement LB service forwarding (point 2c. in the doc's diagram).
*/
+#include <errno.h>
+
#include <string.h>
#include <grpc/byte_buffer_reader.h>
@@ -109,18 +111,57 @@
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/static_metadata.h"
int grpc_lb_glb_trace = 0;
+static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
+ size_t num_addresses) {
+ /* free "resolved" addresses memblock */
+ gpr_free(lb_addresses->resolved_address);
+ for (size_t i = 0; i < num_addresses; ++i) {
+ if (lb_addresses[i].user_data != NULL) {
+ GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
+ }
+ }
+ gpr_free(lb_addresses);
+}
+
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static void initial_metadata_add_lb_token(
+ grpc_metadata_batch *initial_metadata,
+ grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
+ GPR_ASSERT(lb_token_mdelem_storage != NULL);
+ GPR_ASSERT(lb_token != NULL);
+ grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
+}
+
typedef struct wrapped_rr_closure_arg {
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure;
+ /* the pick's initial metadata, kept in order to append the LB token for the
+ * pick */
+ grpc_metadata_batch *initial_metadata;
+
+ /* the picked target, used to determine which LB token to add to the pick's
+ * initial metadata */
+ grpc_connected_subchannel **target;
+
+ /* the LB token associated with the pick */
+ grpc_mdelem *lb_token;
+
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
/* The RR instance related to the closure */
grpc_lb_policy *rr_policy;
@@ -143,6 +184,11 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
}
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+
+ initial_metadata_add_lb_token(wc_arg->initial_metadata,
+ wc_arg->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(wc_arg->lb_token));
+
grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
gpr_free(wc_arg->owning_pending_node);
}
@@ -161,6 +207,9 @@ typedef struct pending_pick {
/* the initial metadata for the pick. See grpc_lb_policy_pick() */
grpc_metadata_batch *initial_metadata;
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
@@ -178,8 +227,7 @@ typedef struct pending_pick {
} pending_pick;
static void add_pending_pick(pending_pick **root,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pending_pick *pp = gpr_malloc(sizeof(*pp));
@@ -187,9 +235,13 @@ static void add_pending_pick(pending_pick **root,
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
pp->target = target;
- pp->initial_metadata = initial_metadata;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata = pick_args->initial_metadata;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
+ pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
+ pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
+ pick_args->lb_token_mdelem_storage;
grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
&pp->wrapped_on_complete_arg);
*root = pp;
@@ -248,6 +300,12 @@ typedef struct glb_lb_policy {
* response has arrived. */
grpc_grpclb_serverlist *serverlist;
+ /** total number of valid addresses received in \a serverlist */
+ size_t num_ok_serverlist_addresses;
+
+ /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */
+ grpc_lb_address *lb_addresses;
+
/** list of picks that are waiting on RR's policy connectivity */
pending_pick *pending_picks;
@@ -275,58 +333,142 @@ struct rr_connectivity_data {
glb_lb_policy *glb_policy;
};
-static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
- const grpc_grpclb_serverlist *serverlist,
- glb_lb_policy *glb_policy) {
- /* TODO(dgq): support mixed ip version */
- GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
- char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
- for (size_t i = 0; i < serverlist->num_servers; ++i) {
- gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
- serverlist->servers[i]->port);
+static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
+ bool log) {
+ const grpc_grpclb_ip_address *ip = &server->ip_address;
+ if (server->port >> 16 != 0) {
+ if (log) {
+ gpr_log(GPR_ERROR,
+ "Invalid port '%d' at index %zu of serverlist. Ignoring.",
+ server->port, idx);
+ }
+ return false;
}
- size_t uri_path_len;
- char *concat_ipports = gpr_strjoin_sep(
- (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
+ if (ip->size != 4 && ip->size != 16) {
+ if (log) {
+ gpr_log(GPR_ERROR,
+ "Expected IP to be 4 or 16 bytes, got %d at index %zu of "
+ "serverlist. Ignoring",
+ ip->size, idx);
+ }
+ return false;
+ }
+ return true;
+}
- grpc_lb_policy_args args;
- args.client_channel_factory = glb_policy->cc_factory;
- args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
- args.addresses->naddrs = serverlist->num_servers;
- args.addresses->addrs =
- gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
- size_t out_addrs_idx = 0;
+/* populate \a addresses according to \a serverlist. Returns the number of
+ * addresses successfully parsed and added to \a addresses */
+static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
+ grpc_lb_address **lb_addresses) {
+ size_t num_valid = 0;
+ /* first pass: count how many are valid in order to allocate the necessary
+ * memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- grpc_uri uri;
- struct sockaddr_storage sa;
- size_t sa_len;
- uri.path = host_ports[i];
- if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
- memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
- args.addresses->addrs[out_addrs_idx].len = sa_len;
- ++out_addrs_idx;
+ if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
+ }
+ if (num_valid == 0) {
+ return 0;
+ }
+
+ /* allocate the memory block for the "resolved" addresses. */
+ grpc_resolved_address *r_addrs_memblock =
+ gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
+ memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
+ grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
+ memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
+
+ /* second pass: actually populate the addresses and LB tokens (aka user data
+ * to the outside world) to be read by the RR policy during its creation.
+ * Given that the validity tests are very cheap, they are performed again
+ * instead of marking the valid ones during the first pass, as this would
+ * incurr in an allocation due to the arbitrary number of server */
+ size_t addr_idx = 0;
+ for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
+ GPR_ASSERT(addr_idx < num_valid);
+ const grpc_grpclb_server *server = serverlist->servers[sl_idx];
+ if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+ grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
+
+ /* address processing */
+ const uint16_t netorder_port = htons((uint16_t)server->port);
+ /* the addresses are given in binary format (a in(6)_addr struct) in
+ * server->ip_address.bytes. */
+ const grpc_grpclb_ip_address *ip = &server->ip_address;
+
+ lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
+ struct sockaddr_storage *sa =
+ (struct sockaddr_storage *)lb_addr->resolved_address->addr;
+ size_t *sa_len = &lb_addr->resolved_address->len;
+ *sa_len = 0;
+ if (ip->size == 4) {
+ struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
+ *sa_len = sizeof(struct sockaddr_in);
+ memset(addr4, 0, *sa_len);
+ addr4->sin_family = AF_INET;
+ memcpy(&addr4->sin_addr, ip->bytes, ip->size);
+ addr4->sin_port = netorder_port;
+ } else if (ip->size == 16) {
+ struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
+ *sa_len = sizeof(struct sockaddr_in6);
+ memset(addr6, 0, *sa_len);
+ addr6->sin6_family = AF_INET;
+ memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
+ addr6->sin6_port = netorder_port;
+ }
+ GPR_ASSERT(*sa_len > 0);
+
+ /* lb token processing */
+ if (server->has_load_balance_token) {
+ const size_t lb_token_size =
+ GPR_ARRAY_SIZE(server->load_balance_token) - 1;
+ grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
+ (uint8_t *)server->load_balance_token, lb_token_size);
+ lb_addr->user_data = grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
} else {
- gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
- host_ports[i]);
+ gpr_log(GPR_ERROR,
+ "Missing LB token for backend address '%s'. The empty token will "
+ "be used instead",
+ grpc_sockaddr_to_uri((struct sockaddr *)sa));
+ lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
}
+ ++addr_idx;
}
+ GPR_ASSERT(addr_idx == num_valid);
+ *lb_addresses = lb_addrs;
+ return num_valid;
+}
+
+static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
+ const grpc_grpclb_serverlist *serverlist,
+ glb_lb_policy *glb_policy) {
+ GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
+
+ grpc_lb_policy_args args;
+ memset(&args, 0, sizeof(args));
+ args.client_channel_factory = glb_policy->cc_factory;
+ const size_t num_ok_addresses =
+ process_serverlist(serverlist, &args.addresses);
+ args.num_addresses = num_ok_addresses;
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- gpr_free(concat_ipports);
- for (size_t i = 0; i < serverlist->num_servers; i++) {
- gpr_free(host_ports[i]);
+ if (glb_policy->lb_addresses != NULL) {
+ /* dispose of the previous version */
+ lb_addrs_destroy(glb_policy->lb_addresses,
+ glb_policy->num_ok_serverlist_addresses);
}
- gpr_free(host_ports);
- gpr_free(args.addresses->addrs);
- gpr_free(args.addresses);
+ glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
+ glb_policy->lb_addresses = args.addresses;
+
return rr;
}
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_error *error) {
- GRPC_ERROR_REF(error);
+ GPR_ASSERT(glb_policy->serverlist != NULL &&
+ glb_policy->serverlist->num_servers > 0);
glb_policy->rr_policy =
create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
@@ -344,8 +486,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
&glb_policy->rr_connectivity->on_change);
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- glb_policy->rr_connectivity->state, error,
- "rr_handover");
+ glb_policy->rr_connectivity->state,
+ GRPC_ERROR_REF(error), "rr_handover");
grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
/* flush pending ops */
@@ -358,8 +500,11 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->initial_metadata,
- pp->initial_metadata_flags, pp->target,
+ const grpc_lb_policy_pick_args pick_args = {
+ pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
+ pp->lb_token_mdelem_storage};
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+ (void **)&pp->wrapped_on_complete_arg.lb_token,
&pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
}
@@ -377,13 +522,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
&pping->wrapped_notify);
pping->wrapped_notify_arg.owning_pending_node = pping;
}
- GRPC_ERROR_UNREF(error);
}
-static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
rr_connectivity_data *rr_conn_data = arg;
glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
+
if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
if (glb_policy->serverlist != NULL) {
/* a RR policy is shutting down but there's a serverlist available ->
@@ -397,8 +542,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (error == GRPC_ERROR_NONE) {
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- rr_conn_data->state, error,
- "rr_connectivity_changed");
+ rr_conn_data->state, GRPC_ERROR_REF(error),
+ "glb_rr_connectivity_changed");
/* resubscribe */
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
@@ -407,7 +552,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(rr_conn_data);
}
}
- GRPC_ERROR_UNREF(error);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@@ -417,31 +561,43 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
memset(glb_policy, 0, sizeof(*glb_policy));
/* All input addresses in args->addresses come from a resolver that claims
- * they are LB services. It's the resolver's responsibility to make sure this
+ * they are LB services. It's the resolver's responsibility to make sure
+ * this
* policy is only instantiated and used in that case.
*
* Create a client channel over them to communicate with a LB service */
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
- if (args->addresses->naddrs == 0) {
+ if (args->num_addresses == 0) {
return NULL;
}
- /* construct a target from the args->addresses, in the form
+ if (args->addresses[0].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+
+ /* construct a target from the addresses in args, given in the form
* ipvX://ip1:port1,ip2:port2,...
* TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
- addr_strs[0] =
- grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
- for (size_t i = 1; i < args->addresses->naddrs; i++) {
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_strs[i],
- (const struct sockaddr *)&args->addresses->addrs[i],
- true) == 0);
+ char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
+ addr_strs[0] = grpc_sockaddr_to_uri(
+ (const struct sockaddr *)&args->addresses[0].resolved_address->addr);
+ for (size_t i = 1; i < args->num_addresses; i++) {
+ if (args->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+
+ GPR_ASSERT(
+ grpc_sockaddr_to_string(
+ &addr_strs[i],
+ (const struct sockaddr *)&args->addresses[i].resolved_address->addr,
+ true) == 0);
}
size_t uri_path_len;
char *target_uri_str = gpr_strjoin_sep(
- (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
+ (const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
/* will pick using pick_first */
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
@@ -449,7 +605,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
gpr_free(target_uri_str);
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < args->num_addresses; i++) {
gpr_free(addr_strs[i]);
}
gpr_free(addr_strs);
@@ -462,7 +618,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
rr_connectivity_data *rr_connectivity =
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
- grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
+ grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
rr_connectivity);
rr_connectivity->glb_policy = glb_policy;
glb_policy->rr_connectivity = rr_connectivity;
@@ -485,6 +641,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
gpr_mu_destroy(&glb_policy->mu);
+
+ lb_addrs_destroy(glb_policy->lb_addresses,
+ glb_policy->num_ok_serverlist_addresses);
gpr_free(glb_policy);
}
@@ -543,7 +702,6 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -571,7 +729,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -598,11 +755,21 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+
+ if (pick_args->lb_token_mdelem_storage == NULL) {
+ *target = NULL;
+ grpc_exec_ctx_sched(
+ exec_ctx, on_complete,
+ GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
+ "won't work without it. Failing"),
+ NULL);
+ return 1;
+ }
+
gpr_mu_lock(&glb_policy->mu);
int r;
@@ -615,26 +782,34 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
glb_policy->wc_arg.wrapped_closure = on_complete;
+ glb_policy->wc_arg.lb_token_mdelem_storage =
+ pick_args->lb_token_mdelem_storage;
+ glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
+ glb_policy->wc_arg.owning_pending_node = NULL;
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
- r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, initial_metadata,
- initial_metadata_flags, target,
+
+ r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
+ (void **)&glb_policy->wc_arg.lb_token,
&glb_policy->wrapped_on_complete);
if (r != 0) {
- /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
- * policy and notify the original callback */
- glb_policy->wc_arg.wrapped_closure = NULL;
+ /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
(intptr_t)glb_policy->wc_arg.rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
- grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure,
- GRPC_ERROR_NONE, NULL);
+
+ /* add the load reporting initial metadata */
+ initial_metadata_add_lb_token(
+ pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
}
} else {
- add_pending_pick(&glb_policy->pending_picks, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
+ glb_policy->base.interested_parties);
+ add_pending_pick(&glb_policy->pending_picks, pick_args, target,
+ on_complete);
if (!glb_policy->started_picking) {
start_picking(exec_ctx, glb_policy);
@@ -694,9 +869,6 @@ typedef struct lb_client_data {
/* called once initial metadata's been sent */
grpc_closure md_sent;
- /* called once initial metadata's been received */
- grpc_closure md_rcvd;
-
/* called once the LoadBalanceRequest has been sent to the LB server. See
* src/proto/grpc/.../load_balancer.proto */
grpc_closure req_sent;
@@ -733,7 +905,6 @@ typedef struct lb_client_data {
} lb_client_data;
static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
@@ -748,7 +919,6 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
gpr_mu_init(&lb_client->mu);
grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
- grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
@@ -847,23 +1017,6 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
- op->flags = 0;
- op->reserved = NULL;
- op++;
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
- &lb_client->md_rcvd);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
-}
-
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- lb_client_data *lb_client = arg;
- GPR_ASSERT(lb_client->lb_call);
- grpc_op ops[1];
- memset(ops, 0, sizeof(ops));
- grpc_op *op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = lb_client->request_payload;
@@ -878,11 +1031,18 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
lb_client_data *lb_client = arg;
+ GPR_ASSERT(lb_client->lb_call);
- grpc_op ops[1];
+ grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &lb_client->response_payload;
op->flags = 0;
@@ -901,8 +1061,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_op *op = ops;
if (lb_client->response_payload != NULL) {
/* Received data from the LB server. Look inside
- * lb_client->response_payload, for
- * a serverlist. */
+ * lb_client->response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
@@ -939,7 +1098,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
} else {
/* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist (see
- * rr_connectivity_changed) */
+ * glb_rr_connectivity_changed) */
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
"serverlist_received");
}
@@ -1002,8 +1161,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
lb_client->status, lb_client->status_details,
lb_client->status_details_capacity);
}
- /* TODO(dgq): deal with stream termination properly (fire up another one? fail
- * the original call?) */
+ /* TODO(dgq): deal with stream termination properly (fire up another one?
+ * fail the original call?) */
}
/* Code wiring the policy with the rest of the core */
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index f4720a1345..a8881004a0 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -57,6 +57,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
if (dec_arg->first_pass) { /* count how many server do we have */
grpc_grpclb_server server;
if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->num_servers++;
@@ -69,6 +70,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
}
if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->servers[dec_arg->decoding_idx++] = server;
@@ -118,6 +120,7 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
grpc_grpclb_response res;
memset(&res, 0, sizeof(grpc_grpclb_response));
if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
grpc_grpclb_initial_response *initial_res =
@@ -145,6 +148,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
arg.first_pass = true;
status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
@@ -152,6 +156,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
status =
pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index 9726c87a37..c1e73d08ef 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -45,6 +45,7 @@ extern "C" {
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
+typedef grpc_lb_v1_Server_ip_address_t grpc_grpclb_ip_address;
typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
typedef grpc_lb_v1_Server grpc_grpclb_server;
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index 52e11c40bb..2676714175 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -31,10 +31,11 @@
*
*/
/* Automatically generated nanopb constant definitions */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -72,7 +73,7 @@ const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3] = {
};
const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = {
- PB_FIELD( 2, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
@@ -84,7 +85,7 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
};
const pb_field_t grpc_lb_v1_Server_fields[5] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
+ PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
@@ -116,3 +117,4 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
#endif
+/* @@protoc_insertion_point(eof) */
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index 46fe588f72..4f1031ec7b 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -31,11 +31,12 @@
*
*/
/* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
-#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
-#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
+#ifndef PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
+#define PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
#include "third_party/nanopb/pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -52,6 +53,7 @@ typedef struct _grpc_lb_v1_ClientStats {
int64_t client_rpc_errors;
bool has_dropped_requests;
int64_t dropped_requests;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
typedef struct _grpc_lb_v1_Duration {
@@ -59,22 +61,26 @@ typedef struct _grpc_lb_v1_Duration {
int64_t seconds;
bool has_nanos;
int32_t nanos;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Duration) */
} grpc_lb_v1_Duration;
typedef struct _grpc_lb_v1_InitialLoadBalanceRequest {
bool has_name;
char name[128];
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceRequest) */
} grpc_lb_v1_InitialLoadBalanceRequest;
+typedef PB_BYTES_ARRAY_T(16) grpc_lb_v1_Server_ip_address_t;
typedef struct _grpc_lb_v1_Server {
bool has_ip_address;
- char ip_address[46];
+ grpc_lb_v1_Server_ip_address_t ip_address;
bool has_port;
int32_t port;
bool has_load_balance_token;
- char load_balance_token[64];
+ char load_balance_token[65];
bool has_drop_request;
bool drop_request;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
@@ -82,6 +88,7 @@ typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
char load_balancer_delegate[64];
bool has_client_stats_report_interval;
grpc_lb_v1_Duration client_stats_report_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */
} grpc_lb_v1_InitialLoadBalanceResponse;
typedef struct _grpc_lb_v1_LoadBalanceRequest {
@@ -89,12 +96,14 @@ typedef struct _grpc_lb_v1_LoadBalanceRequest {
grpc_lb_v1_InitialLoadBalanceRequest initial_request;
bool has_client_stats;
grpc_lb_v1_ClientStats client_stats;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceRequest) */
} grpc_lb_v1_LoadBalanceRequest;
typedef struct _grpc_lb_v1_ServerList {
pb_callback_t servers;
bool has_expiration_interval;
grpc_lb_v1_Duration expiration_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
} grpc_lb_v1_ServerList;
typedef struct _grpc_lb_v1_LoadBalanceResponse {
@@ -102,6 +111,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
grpc_lb_v1_InitialLoadBalanceResponse initial_response;
bool has_server_list;
grpc_lb_v1_ServerList server_list;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceResponse) */
} grpc_lb_v1_LoadBalanceResponse;
/* Default values for struct fields */
@@ -114,7 +124,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
@@ -122,7 +132,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
#define grpc_lb_v1_ClientStats_total_requests_tag 1
@@ -135,7 +145,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
#define grpc_lb_v1_Server_drop_request_tag 4
-#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
+#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
@@ -161,7 +171,8 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5];
#define grpc_lb_v1_ClientStats_size 33
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
-#define grpc_lb_v1_Server_size 127
+/* grpc_lb_v1_ServerList_size depends on runtime parameters */
+#define grpc_lb_v1_Server_size 98
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
@@ -174,5 +185,6 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5];
#ifdef __cplusplus
} /* extern "C" */
#endif
+/* @@protoc_insertion_point(eof) */
#endif
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index a1a438c1df..201a2ea073 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -192,9 +192,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
@@ -220,7 +219,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->target = target;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@@ -430,20 +429,25 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
- if (args->addresses->naddrs == 0) return NULL;
+ if (args->num_addresses == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_addresses);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * args->num_addresses);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < args->num_addresses; i++) {
+ if (args->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
- sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+ sc_args.addr =
+ (struct sockaddr *)(args->addresses[i].resolved_address->addr);
+ sc_args.addr_len = (size_t)args->addresses[i].resolved_address->len;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 9b557f423e..f8a375f0a8 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -66,6 +66,7 @@
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/static_metadata.h"
typedef struct round_robin_lb_policy round_robin_lb_policy;
@@ -76,14 +77,29 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
+
+ /* output argument where to store the pick()ed user_data. It'll be NULL if no
+ * such data is present or there's an error (the definite test for errors is
+ * \a target being NULL). */
+ void **user_data;
+
+ /* bitmask passed to pick() and used for selective cancelling. See
+ * grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
+
+ /* output argument where to store the pick()ed connected subchannel, or NULL
+ * upon error. */
grpc_connected_subchannel **target;
+
+ /* to be invoked once the pick() has completed (regardless of success) */
grpc_closure *on_complete;
} pending_pick;
/** List of subchannels in a connectivity READY state */
typedef struct ready_list {
grpc_subchannel *subchannel;
+ /* references namesake entry in subchannel_data */
+ void *user_data;
struct ready_list *next;
struct ready_list *prev;
} ready_list;
@@ -101,12 +117,21 @@ typedef struct {
ready_list *ready_list_node;
/** last observed connectivity */
grpc_connectivity_state connectivity_state;
+ /** the subchannel's target user data */
+ void *user_data;
} subchannel_data;
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
+ /** total number of addresses received at creation time */
+ size_t num_addresses;
+ /** array holding the borrowed and opaque pointers to incoming user data, one
+ * per incoming address. These individual pointers will be returned as-is in
+ * successful picks. */
+ void **user_data_pointers;
+
/** all our subchannels */
size_t num_subchannels;
subchannel_data **subchannels;
@@ -165,16 +190,19 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) {
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)",
- p->ready_list_last_pick, p->ready_list_last_pick->subchannel);
+ (void *)p->ready_list_last_pick,
+ (void *)p->ready_list_last_pick->subchannel);
}
}
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- grpc_subchannel *sc) {
+ subchannel_data *sd) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
- new_elem->subchannel = sc;
+ memset(new_elem, 0, sizeof(ready_list));
+ new_elem->subchannel = sd->subchannel;
+ new_elem->user_data = sd->user_data;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -188,7 +216,8 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
+ gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
+ (void *)new_elem, (void *)sd->subchannel);
}
return new_elem;
}
@@ -215,8 +244,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
- node->subchannel);
+ gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
+ (void *)node->subchannel);
}
node->next = NULL;
@@ -228,9 +257,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- size_t i;
ready_list *elem;
- for (i = 0; i < p->num_subchannels; i++) {
+ for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
gpr_free(sd);
@@ -250,6 +278,8 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(elem);
elem = tmp;
}
+
+ gpr_free(p->user_data_pointers);
gpr_free(p);
}
@@ -332,7 +362,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
p->started_picking = 1;
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p,
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
p->num_subchannels);
}
@@ -356,26 +386,28 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
+ /* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ *user_data = selected->user_data;
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target,
- selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
+ (void *)*target, (void *)selected);
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
return 1;
} else {
+ /* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
@@ -383,7 +415,8 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->next = p->pending_picks;
pp->target = target;
pp->on_complete = on_complete;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->user_data = user_data;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -412,7 +445,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
+ sd->ready_list_node = add_connected_sc_locked(p, sd);
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
@@ -424,12 +457,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
+
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ *pp->user_data = selected->user_data;
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- selected->subchannel, selected);
+ (void *)selected->subchannel, (void *)selected);
}
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
@@ -559,20 +594,25 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
+ if (args->num_addresses == 0) return NULL;
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->num_addresses = args->num_addresses;
+ p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
+ p->user_data_pointers = gpr_malloc(sizeof(void *) * p->num_addresses);
+ memset(p->user_data_pointers, 0, sizeof(void *) * p->num_addresses);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < p->num_addresses; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
- sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+ sc_args.addr = (struct sockaddr *)args->addresses[i].resolved_address->addr;
+ sc_args.addr_len = args->addresses[i].resolved_address->len;
+
+ p->user_data_pointers[i] = args->addresses[i].user_data;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
@@ -584,12 +624,14 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
+ sd->user_data = p->user_data_pointers[i];
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
}
}
if (subchannel_idx == 0) {
+ /* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
gpr_free(p);
return NULL;