aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/lb_policy/pick_first/pick_first.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c')
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c61
1 files changed, 41 insertions, 20 deletions
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9decf70692..961a0c9b19 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -128,7 +128,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target) {
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -140,8 +141,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -150,11 +152,13 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -166,8 +170,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -176,6 +181,7 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
@@ -199,10 +205,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
@@ -225,13 +229,13 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@@ -443,20 +447,37 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
- if (args->addresses->naddrs == 0) return NULL;
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (!args->addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ /* Skip balancer addresses, since we only know how to handle backends. */
+ if (args->addresses->addresses[i].is_balancer) continue;
+
+ if (args->addresses->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
- sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+ /* server_name will be copied as part of the subchannel creation. This makes
+ * the copying of args->server_name (a borrowed pointer) OK. */
+ sc_args.server_name = args->server_name;
+ sc_args.addr =
+ (struct sockaddr *)(&args->addresses->addresses[i].address.addr);
+ sc_args.addr_len = args->addresses->addresses[i].address.len;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);