aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/lb_policy
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-09-14 12:59:17 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2016-09-14 12:59:17 -0700
commitf47d6fbdccd6e3c64b0ff70b6a63236819886540 (patch)
tree4d6c4cc911cb01b8de5d8e99b59e0c0c9ed1a1e7 /src/core/ext/lb_policy
parent35c2aba849e0f6852e3538fc488a6a2afec81c09 (diff)
More PR comments
Diffstat (limited to 'src/core/ext/lb_policy')
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c107
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c9
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c9
3 files changed, 71 insertions, 54 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 9a176ba0d1..1150451116 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -125,9 +125,16 @@ static void *user_data_copy(void *user_data) {
return GRPC_MDELEM_REF(user_data);
}
-static void user_data_destroy(void *user_data) {
- if (user_data == NULL) return;
- GRPC_MDELEM_UNREF(user_data);
+static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
+ size_t num_addresses) {
+ /* free "resolved" addresses memblock */
+ gpr_free(lb_addresses->resolved_address);
+ for (size_t i = 0; i < num_addresses; ++i) {
+ if (lb_addresses[i].user_data != NULL) {
+ GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
+ }
+ }
+ gpr_free(lb_addresses);
}
/* add lb_token of selected subchannel (address) to the call's initial
@@ -385,21 +392,12 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
* Given that the validity tests are very cheap, they are performed again
* instead of marking the valid ones during the first pass, as this would
* incurr in an allocation due to the arbitrary number of server */
- size_t num_processed = 0;
- for (size_t i = 0; i < num_valid; ++i) {
- const grpc_grpclb_server *server = serverlist->servers[i];
- if (!is_server_valid(serverlist->servers[i], i, false)) continue;
- grpc_lb_address *const lb_addr = &lb_addrs[i];
-
- /* lb token processing */
- if (server->has_load_balance_token) {
- const size_t lb_token_size =
- GPR_ARRAY_SIZE(server->load_balance_token) - 1;
- grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
- (uint8_t *)server->load_balance_token, lb_token_size);
- lb_addr->user_data = grpc_mdelem_from_metadata_strings(
- GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
- }
+ size_t addr_idx = 0;
+ for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
+ GPR_ASSERT(addr_idx < num_valid);
+ const grpc_grpclb_server *server = serverlist->servers[sl_idx];
+ if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+ grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
/* address processing */
const uint16_t netorder_port = htons((uint16_t)server->port);
@@ -407,7 +405,7 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
* server->ip_address.bytes. */
const grpc_grpclb_ip_address *ip = &server->ip_address;
- lb_addr->resolved_address = &r_addrs_memblock[i];
+ lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
struct sockaddr_storage *sa =
(struct sockaddr_storage *)lb_addr->resolved_address->addr;
size_t *sa_len = &lb_addr->resolved_address->len;
@@ -428,9 +426,25 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
addr6->sin6_port = netorder_port;
}
GPR_ASSERT(*sa_len > 0);
- ++num_processed;
+
+ /* lb token processing */
+ if (server->has_load_balance_token) {
+ const size_t lb_token_size =
+ GPR_ARRAY_SIZE(server->load_balance_token) - 1;
+ grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
+ (uint8_t *)server->load_balance_token, lb_token_size);
+ lb_addr->user_data = grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
+ } else {
+ gpr_log(GPR_ERROR,
+ "Missing LB token for backend address '%s'. The empty token will "
+ "be used instead",
+ grpc_sockaddr_to_uri((struct sockaddr *)sa));
+ lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
+ }
+ ++addr_idx;
}
- GPR_ASSERT(num_processed == num_valid);
+ GPR_ASSERT(addr_idx == num_valid);
*lb_addresses = lb_addrs;
return num_valid;
}
@@ -444,26 +458,19 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
const size_t num_ok_addresses =
- process_serverlist(serverlist, &args.lb_addresses);
+ process_serverlist(serverlist, &args.addresses);
args.num_addresses = num_ok_addresses;
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
if (glb_policy->lb_addresses != NULL) {
/* dispose of the previous version */
- for (size_t i = 0; i < num_ok_addresses; ++i) {
- user_data_destroy(glb_policy->lb_addresses[i].user_data);
- }
- gpr_free(glb_policy->lb_addresses);
+ lb_addrs_destroy(glb_policy->lb_addresses,
+ glb_policy->num_ok_serverlist_addresses);
}
+ glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
+ glb_policy->lb_addresses = args.addresses;
- glb_policy->lb_addresses = args.lb_addresses;
-
- if (args.num_addresses > 0) {
- /* free "resolved" addresses memblock */
- gpr_free(args.lb_addresses->resolved_address);
- }
return rr;
}
@@ -560,7 +567,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
memset(glb_policy, 0, sizeof(*glb_policy));
/* All input addresses in args->addresses come from a resolver that claims
- * they are LB services. It's the resolver's responsibility to make sure this
+ * they are LB services. It's the resolver's responsibility to make sure
+ * this
* policy is only instantiated and used in that case.
*
* Create a client channel over them to communicate with a LB service */
@@ -570,18 +578,24 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
return NULL;
}
+ /* this LB policy doesn't support \a user_data */
+ GPR_ASSERT(args->addresses[0].user_data == NULL);
+
/* construct a target from the addresses in args, given in the form
* ipvX://ip1:port1,ip2:port2,...
* TODO(dgq): support mixed ip version */
char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
addr_strs[0] = grpc_sockaddr_to_uri(
- (const struct sockaddr *)&args->lb_addresses[0].resolved_address->addr);
+ (const struct sockaddr *)&args->addresses[0].resolved_address->addr);
for (size_t i = 1; i < args->num_addresses; i++) {
+ /* this LB policy doesn't support \a user_data */
+ GPR_ASSERT(args->addresses[i].user_data == NULL);
+
GPR_ASSERT(
- grpc_sockaddr_to_string(&addr_strs[i],
- (const struct sockaddr *)&args->lb_addresses[i]
- .resolved_address->addr,
- true) == 0);
+ grpc_sockaddr_to_string(
+ &addr_strs[i],
+ (const struct sockaddr *)&args->addresses[i].resolved_address->addr,
+ true) == 0);
}
size_t uri_path_len;
char *target_uri_str = gpr_strjoin_sep(
@@ -630,10 +644,8 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
gpr_mu_destroy(&glb_policy->mu);
- for (size_t i = 0; i < glb_policy->num_ok_serverlist_addresses; ++i) {
- user_data_destroy(glb_policy->lb_addresses[i].user_data);
- }
- gpr_free(glb_policy->lb_addresses);
+ lb_addrs_destroy(glb_policy->lb_addresses,
+ glb_policy->num_ok_serverlist_addresses);
gpr_free(glb_policy);
}
@@ -882,7 +894,8 @@ typedef struct lb_client_data {
grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */
grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */
- /* what's being sent to the LB server. Note that its value may vary if the LB
+ /* what's being sent to the LB server. Note that its value may vary if the
+ * LB
* server indicates a redirect. */
grpc_byte_buffer *request_payload;
@@ -1090,7 +1103,8 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
* it'll just create the first RR policy instance */
rr_handover(exec_ctx, lb_client->glb_policy, error);
} else {
- /* unref the RR policy, eventually leading to its substitution with a
+ /* unref the RR policy, eventually leading to its substitution with
+ * a
* new one constructed from the received serverlist (see
* glb_rr_connectivity_changed) */
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
@@ -1155,7 +1169,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
lb_client->status, lb_client->status_details,
lb_client->status_details_capacity);
}
- /* TODO(dgq): deal with stream termination properly (fire up another one? fail
+ /* TODO(dgq): deal with stream termination properly (fire up another one?
+ * fail
* the original call?) */
}
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 21d948033a..d907ddd5d1 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -438,7 +438,7 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
- GPR_ASSERT(args->lb_addresses != NULL);
+ GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
if (args->num_addresses == 0) return NULL;
@@ -451,10 +451,13 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
for (size_t i = 0; i < args->num_addresses; i++) {
+ /* this LB policy doesn't support \a user_data */
+ GPR_ASSERT(args->addresses[i].user_data == NULL);
+
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr =
- (struct sockaddr *)(args->lb_addresses[i].resolved_address->addr);
- sc_args.addr_len = (size_t)args->lb_addresses[i].resolved_address->len;
+ (struct sockaddr *)(args->addresses[i].resolved_address->addr);
+ sc_args.addr_len = (size_t)args->addresses[i].resolved_address->len;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 4d89cef9b6..1351ff0277 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -603,7 +603,7 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
- GPR_ASSERT(args->lb_addresses != NULL);
+ GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
if (args->num_addresses == 0) return NULL;
@@ -620,11 +620,10 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
size_t subchannel_idx = 0;
for (size_t i = 0; i < p->num_addresses; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr =
- (struct sockaddr *)args->lb_addresses[i].resolved_address->addr;
- sc_args.addr_len = args->lb_addresses[i].resolved_address->len;
+ sc_args.addr = (struct sockaddr *)args->addresses[i].resolved_address->addr;
+ sc_args.addr_len = args->addresses[i].resolved_address->len;
- p->user_data[i] = args->lb_addresses[i].user_data;
+ p->user_data[i] = args->addresses[i].user_data;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);