diff options
Diffstat (limited to 'src/core/ext/lb_policy/pick_first/pick_first.c')
-rw-r--r-- | src/core/ext/lb_policy/pick_first/pick_first.c | 61 |
1 files changed, 41 insertions, 20 deletions
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 9decf70692..961a0c9b19 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -128,7 +128,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { + grpc_connected_subchannel **target, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -140,8 +141,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -150,11 +152,13 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -166,8 +170,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, initial_metadata_flags_eq) { grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -176,6 +181,7 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { @@ -199,10 +205,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; @@ -225,13 +229,13 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, + grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, p->base.interested_parties); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollent = pollent; + pp->pollent = pick_args->pollent; pp->target = target; - pp->initial_metadata_flags = initial_metadata_flags; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); @@ -443,20 +447,37 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - if (args->addresses->naddrs == 0) return NULL; + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ + size_t num_addrs = 0; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (!args->addresses->addresses[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + /* Skip balancer addresses, since we only know how to handle backends. */ + if (args->addresses->addresses[i].is_balancer) continue; + + if (args->addresses->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + /* server_name will be copied as part of the subchannel creation. This makes + * the copying of args->server_name (a borrowed pointer) OK. */ + sc_args.server_name = args->server_name; + sc_args.addr = + (struct sockaddr *)(&args->addresses->addresses[i].address.addr); + sc_args.addr_len = args->addresses->addresses[i].address.len; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); |