diff options
Diffstat (limited to 'src/core/ext/lb_policy/grpclb/grpclb.c')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 423 |
1 files changed, 292 insertions, 131 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index af913d8a9d..bdfe65a62a 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -76,9 +76,9 @@ * operations in progress over the old RR instance. This is done by * decreasing the reference count on the old policy. The moment no more * references are held on the old RR policy, it'll be destroyed and \a - * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state. - * At this point we can transition to a new RR instance safely, which is done - * once again via \a rr_handover(). + * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN + * state. At this point we can transition to a new RR instance safely, which + * is done once again via \a rr_handover(). * * * Once a RR policy instance is in place (and getting updated as described), @@ -96,6 +96,8 @@ * - Implement LB service forwarding (point 2c. in the doc's diagram). */ +#include <errno.h> + #include <string.h> #include <grpc/byte_buffer_reader.h> @@ -105,22 +107,50 @@ #include <grpc/support/string_util.h> #include "src/core/ext/client_config/client_channel_factory.h" +#include "src/core/ext/client_config/lb_policy_factory.h" #include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/parse_address.h" #include "src/core/ext/lb_policy/grpclb/grpclb.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/static_metadata.h" int grpc_lb_glb_trace = 0; +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static void initial_metadata_add_lb_token( + grpc_metadata_batch *initial_metadata, + grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) { + GPR_ASSERT(lb_token_mdelem_storage != NULL); + GPR_ASSERT(lb_token != NULL); + grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} + typedef struct wrapped_rr_closure_arg { /* the original closure. Usually a on_complete/notify cb for pick() and ping() * calls against the internal RR instance, respectively. */ grpc_closure *wrapped_closure; + /* the pick's initial metadata, kept in order to append the LB token for the + * pick */ + grpc_metadata_batch *initial_metadata; + + /* the picked target, used to determine which LB token to add to the pick's + * initial metadata */ + grpc_connected_subchannel **target; + + /* the LB token associated with the pick */ + grpc_mdelem *lb_token; + + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /* The RR instance related to the closure */ grpc_lb_policy *rr_policy; @@ -141,9 +171,20 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, (intptr_t)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); + + /* if target is NULL, no pick has been made by the RR policy (eg, all + * addresses failed to connect). There won't be any user_data/token + * available */ + if (wc_arg->target != NULL) { + initial_metadata_add_lb_token(wc_arg->initial_metadata, + wc_arg->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); + } } GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL); + + grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), + NULL); gpr_free(wc_arg->owning_pending_node); } @@ -164,6 +205,9 @@ typedef struct pending_pick { /* the initial metadata for the pick. See grpc_lb_policy_pick() */ grpc_metadata_batch *initial_metadata; + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /* bitmask passed to pick() and used for selective cancelling. See * grpc_lb_policy_cancel_picks() */ uint32_t initial_metadata_flags; @@ -180,20 +224,24 @@ typedef struct pending_pick { wrapped_rr_closure_arg wrapped_on_complete_arg; } pending_pick; -static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, +static void add_pending_pick(pending_pick **root, + const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, grpc_closure *on_complete) { pending_pick *pp = gpr_malloc(sizeof(*pp)); memset(pp, 0, sizeof(pending_pick)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); pp->next = *root; - pp->pollent = pollent; + pp->pollent = pick_args->pollent; pp->target = target; - pp->initial_metadata = initial_metadata; - pp->initial_metadata_flags = initial_metadata_flags; + pp->initial_metadata = pick_args->initial_metadata; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; + pp->wrapped_on_complete_arg.target = target; + pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; + pp->wrapped_on_complete_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure, &pp->wrapped_on_complete_arg); *root = pp; @@ -235,6 +283,7 @@ typedef struct glb_lb_policy { /** mutex protecting remaining members */ gpr_mu mu; + const char *server_name; grpc_client_channel_factory *cc_factory; /** for communicating with the LB server */ @@ -252,6 +301,9 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; + /** addresses from \a serverlist */ + grpc_lb_addresses *addresses; + /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -279,58 +331,133 @@ struct rr_connectivity_data { glb_lb_policy *glb_policy; }; -static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, - const grpc_grpclb_serverlist *serverlist, - glb_lb_policy *glb_policy) { - /* TODO(dgq): support mixed ip version */ - GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address, - serverlist->servers[i]->port); +static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, + bool log) { + const grpc_grpclb_ip_address *ip = &server->ip_address; + if (server->port >> 16 != 0) { + if (log) { + gpr_log(GPR_ERROR, + "Invalid port '%d' at index %zu of serverlist. Ignoring.", + server->port, idx); + } + return false; } - size_t uri_path_len; - char *concat_ipports = gpr_strjoin_sep( - (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len); + if (ip->size != 4 && ip->size != 16) { + if (log) { + gpr_log(GPR_ERROR, + "Expected IP to be 4 or 16 bytes, got %d at index %zu of " + "serverlist. Ignoring", + ip->size, idx); + } + return false; + } + return true; +} - grpc_lb_policy_args args; - args.client_channel_factory = glb_policy->cc_factory; - args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); - args.addresses->naddrs = serverlist->num_servers; - args.addresses->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs); - size_t out_addrs_idx = 0; +/* Returns addresses extracted from \a serverlist. */ +static grpc_lb_addresses *process_serverlist( + const grpc_grpclb_serverlist *serverlist) { + size_t num_valid = 0; + /* first pass: count how many are valid in order to allocate the necessary + * memory in a single block */ for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_uri uri; - struct sockaddr_storage sa; - size_t sa_len; - uri.path = host_ports[i]; - if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */ - memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); - args.addresses->addrs[out_addrs_idx].len = sa_len; - ++out_addrs_idx; + if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; + } + if (num_valid == 0) return NULL; + + grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid); + + /* second pass: actually populate the addresses and LB tokens (aka user data + * to the outside world) to be read by the RR policy during its creation. + * Given that the validity tests are very cheap, they are performed again + * instead of marking the valid ones during the first pass, as this would + * incurr in an allocation due to the arbitrary number of server */ + size_t addr_idx = 0; + for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { + GPR_ASSERT(addr_idx < num_valid); + const grpc_grpclb_server *server = serverlist->servers[sl_idx]; + if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; + + /* address processing */ + const uint16_t netorder_port = htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const grpc_grpclb_ip_address *ip = &server->ip_address; + grpc_resolved_address addr; + memset(&addr, 0, sizeof(addr)); + if (ip->size == 4) { + addr.len = sizeof(struct sockaddr_in); + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr; + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 16) { + addr.len = sizeof(struct sockaddr_in6); + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr; + addr6->sin6_family = AF_INET; + memcpy(&addr6->sin6_addr, ip->bytes, ip->size); + addr6->sin6_port = netorder_port; + } + + /* lb token processing */ + void *user_data; + if (server->has_load_balance_token) { + const size_t lb_token_size = + GPR_ARRAY_SIZE(server->load_balance_token) - 1; + grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( + (uint8_t *)server->load_balance_token, lb_token_size); + user_data = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); } else { - gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", - host_ports[i]); + gpr_log(GPR_ERROR, + "Missing LB token for backend address '%s'. The empty token will " + "be used instead", + grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr)); + user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; } + + grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, + false /* is_balancer */, + NULL /* balancer_name */, user_data); + ++addr_idx; } + GPR_ASSERT(addr_idx == num_valid); + + return lb_addresses; +} + +/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */ +static void lb_token_destroy(void *token) { + if (token != NULL) GRPC_MDELEM_UNREF(token); +} + +static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, + const grpc_grpclb_serverlist *serverlist, + glb_lb_policy *glb_policy) { + GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); + + grpc_lb_policy_args args; + memset(&args, 0, sizeof(args)); + args.server_name = glb_policy->server_name; + args.client_channel_factory = glb_policy->cc_factory; + args.addresses = process_serverlist(serverlist); grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - gpr_free(concat_ipports); - for (size_t i = 0; i < serverlist->num_servers; i++) { - gpr_free(host_ports[i]); + if (glb_policy->addresses != NULL) { + /* dispose of the previous version */ + grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy); } - gpr_free(host_ports); - gpr_free(args.addresses->addrs); - gpr_free(args.addresses); + glb_policy->addresses = args.addresses; + return rr; } static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_error *error) { - GRPC_ERROR_REF(error); + GPR_ASSERT(glb_policy->serverlist != NULL && + glb_policy->serverlist->num_servers > 0); glb_policy->rr_policy = create_rr(exec_ctx, glb_policy->serverlist, glb_policy); @@ -345,8 +472,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state, &glb_policy->rr_connectivity->on_change); grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - glb_policy->rr_connectivity->state, error, - "rr_handover"); + glb_policy->rr_connectivity->state, + GRPC_ERROR_REF(error), "rr_handover"); grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); /* flush pending ops */ @@ -359,9 +486,12 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent, - pp->initial_metadata, pp->initial_metadata_flags, - pp->target, &pp->wrapped_on_complete); + const grpc_lb_policy_pick_args pick_args = { + pp->pollent, pp->initial_metadata, pp->initial_metadata_flags, + pp->lb_token_mdelem_storage}; + grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target, + (void **)&pp->wrapped_on_complete_arg.lb_token, + &pp->wrapped_on_complete); pp->wrapped_on_complete_arg.owning_pending_node = pp; } @@ -378,13 +508,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, &pping->wrapped_notify); pping->wrapped_notify_arg.owning_pending_node = pping; } - GRPC_ERROR_UNREF(error); } -static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { rr_connectivity_data *rr_conn_data = arg; glb_lb_policy *glb_policy = rr_conn_data->glb_policy; + if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) { if (glb_policy->serverlist != NULL) { /* a RR policy is shutting down but there's a serverlist available -> @@ -398,8 +528,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (error == GRPC_ERROR_NONE) { /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_conn_data->state, error, - "rr_connectivity_changed"); + rr_conn_data->state, GRPC_ERROR_REF(error), + "glb_rr_connectivity_changed"); /* resubscribe */ grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_conn_data->state, @@ -408,41 +538,64 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(rr_conn_data); } } - GRPC_ERROR_UNREF(error); } static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + /* Count the number of gRPC-LB addresses. There must be at least one. + * TODO(roth): For now, we ignore non-balancer addresses, but in the + * future, we may change the behavior such that we fall back to using + * the non-balancer addresses if we cannot reach any balancers. At that + * time, this should be changed to allow a list with no balancer addresses, + * since the resolver might fail to return a balancer address even when + * this is the right LB policy to use. */ + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < args->addresses->num_addresses; ++i) { + if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return NULL; + glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); memset(glb_policy, 0, sizeof(*glb_policy)); /* All input addresses in args->addresses come from a resolver that claims - * they are LB services. It's the resolver's responsibility to make sure this + * they are LB services. It's the resolver's responsibility to make sure + * this * policy is only instantiated and used in that case. * * Create a client channel over them to communicate with a LB service */ + glb_policy->server_name = gpr_strdup(args->server_name); glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); - if (args->addresses->naddrs == 0) { - return NULL; - } - /* construct a target from the args->addresses, in the form + /* construct a target from the addresses in args, given in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs); - addr_strs[0] = - grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); - for (size_t i = 1; i < args->addresses->naddrs; i++) { - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_strs[i], - (const struct sockaddr *)&args->addresses->addrs[i], - true) == 0); + char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); + size_t addr_index = 0; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (args->addresses->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + if (args->addresses->addresses[i].is_balancer) { + if (addr_index == 0) { + addr_strs[addr_index++] = grpc_sockaddr_to_uri( + (const struct sockaddr *)&args->addresses->addresses[i] + .address.addr); + } else { + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_strs[addr_index++], + (const struct sockaddr *)&args->addresses->addresses[i] + .address.addr, + true) == 0); + } + } } size_t uri_path_len; - char *target_uri_str = gpr_strjoin_sep( - (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len); + char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs, + num_grpclb_addrs, ",", &uri_path_len); /* will pick using pick_first */ glb_policy->lb_channel = grpc_client_channel_factory_create_channel( @@ -450,7 +603,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); gpr_free(target_uri_str); - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < num_grpclb_addrs; i++) { gpr_free(addr_strs[i]); } gpr_free(addr_strs); @@ -463,7 +616,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, rr_connectivity_data *rr_connectivity = gpr_malloc(sizeof(rr_connectivity_data)); memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed, + grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, rr_connectivity); rr_connectivity->glb_policy = glb_policy; glb_policy->rr_connectivity = rr_connectivity; @@ -479,6 +632,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; GPR_ASSERT(glb_policy->pending_picks == NULL); GPR_ASSERT(glb_policy->pending_pings == NULL); + gpr_free((void *)glb_policy->server_name); grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); @@ -486,6 +640,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } gpr_mu_destroy(&glb_policy->mu); + grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy); gpr_free(glb_policy); } @@ -533,7 +688,8 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { + grpc_connected_subchannel **target, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); pending_pick *pp = glb_policy->pending_picks; @@ -544,8 +700,9 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set( exec_ctx, pp->pollent, glb_policy->base.interested_parties); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, - GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched( + exec_ctx, &pp->wrapped_on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = glb_policy->pending_picks; @@ -554,12 +711,14 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&glb_policy->mu); + GRPC_ERROR_UNREF(error); } static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client); static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); if (glb_policy->lb_client != NULL) { @@ -574,8 +733,9 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, initial_metadata_flags_eq) { grpc_polling_entity_del_from_pollset_set( exec_ctx, pp->pollent, glb_policy->base.interested_parties); - grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, - GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched( + exec_ctx, &pp->wrapped_on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = glb_policy->pending_picks; @@ -584,6 +744,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&glb_policy->mu); + GRPC_ERROR_UNREF(error); } static void query_for_backends(grpc_exec_ctx *exec_ctx, @@ -603,12 +764,21 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; + + if (pick_args->lb_token_mdelem_storage == NULL) { + *target = NULL; + grpc_exec_ctx_sched( + exec_ctx, on_complete, + GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting " + "won't work without it. Failing"), + NULL); + return 1; + } + gpr_mu_lock(&glb_policy->mu); int r; @@ -620,29 +790,36 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg)); glb_policy->wc_arg.rr_policy = glb_policy->rr_policy; + glb_policy->wc_arg.target = target; glb_policy->wc_arg.wrapped_closure = on_complete; + glb_policy->wc_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; + glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata; + glb_policy->wc_arg.owning_pending_node = NULL; grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, &glb_policy->wc_arg); - r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent, - initial_metadata, initial_metadata_flags, target, + + r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, + (void **)&glb_policy->wc_arg.lb_token, &glb_policy->wrapped_on_complete); if (r != 0) { - /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR - * policy and notify the original callback */ - glb_policy->wc_arg.wrapped_closure = NULL; + /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)glb_policy->wc_arg.rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick"); - grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure, - GRPC_ERROR_NONE, NULL); + + /* add the load reporting initial metadata */ + initial_metadata_add_lb_token( + pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, + GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token)); } } else { - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, + grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, glb_policy->base.interested_parties); - add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata, - initial_metadata_flags, target, on_complete); + add_pending_pick(&glb_policy->pending_picks, pick_args, target, + on_complete); if (!glb_policy->started_picking) { start_picking(exec_ctx, glb_policy); @@ -702,9 +879,6 @@ typedef struct lb_client_data { /* called once initial metadata's been sent */ grpc_closure md_sent; - /* called once initial metadata's been received */ - grpc_closure md_rcvd; - /* called once the LoadBalanceRequest has been sent to the LB server. See * src/proto/grpc/.../load_balancer.proto */ grpc_closure req_sent; @@ -741,7 +915,6 @@ typedef struct lb_client_data { } lb_client_data; static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, @@ -750,38 +923,37 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { + GPR_ASSERT(glb_policy->server_name != NULL); + GPR_ASSERT(glb_policy->server_name[0] != '\0'); + lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data)); memset(lb_client, 0, sizeof(lb_client_data)); gpr_mu_init(&lb_client->mu); grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client); - grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client); grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client); grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client); grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client); grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client); - /* TODO(dgq): get the deadline from the client config instead of fabricating - * one here. */ - lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(3, GPR_TIMESPAN)); + /* TODO(dgq): get the deadline from the parent channel. */ + lb_client->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling * entities passed to glb_pick(). */ lb_client->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, - glb_policy->base.interested_parties, "/BalanceLoad", - NULL, /* FIXME(dgq): which "host" value to use? */ + glb_policy->base.interested_parties, + "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name, lb_client->deadline, NULL); grpc_metadata_array_init(&lb_client->initial_metadata_recv); grpc_metadata_array_init(&lb_client->trailing_metadata_recv); - grpc_grpclb_request *request = grpc_grpclb_request_create( - "load.balanced.service.name"); /* FIXME(dgq): get the name of the load - balanced service from the resolver */ + grpc_grpclb_request *request = + grpc_grpclb_request_create(glb_policy->server_name); gpr_slice request_payload_slice = grpc_grpclb_request_encode(request); lb_client->request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); @@ -855,23 +1027,6 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_op ops[1]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; - op->flags = 0; - op->reserved = NULL; - op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->md_rcvd); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = lb_client->request_payload; @@ -886,11 +1041,18 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { lb_client_data *lb_client = arg; + GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; + grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &lb_client->response_payload; op->flags = 0; @@ -909,8 +1071,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_op *op = ops; if (lb_client->response_payload != NULL) { /* Received data from the LB server. Look inside - * lb_client->response_payload, for - * a serverlist. */ + * lb_client->response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload); gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); @@ -947,7 +1108,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } else { /* unref the RR policy, eventually leading to its substitution with a * new one constructed from the received serverlist (see - * rr_connectivity_changed) */ + * glb_rr_connectivity_changed) */ GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, "serverlist_received"); } @@ -1010,8 +1171,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, lb_client->status, lb_client->status_details, lb_client->status_details_capacity); } - /* TODO(dgq): deal with stream termination properly (fire up another one? fail - * the original call?) */ + /* TODO(dgq): deal with stream termination properly (fire up another one? + * fail the original call?) */ } /* Code wiring the policy with the rest of the core */ |