diff options
Diffstat (limited to 'src/core/ext/lb_policy/grpclb/grpclb.c')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 393 |
1 files changed, 278 insertions, 115 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index a390daec01..a5ab42ba88 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -76,9 +76,9 @@ * operations in progress over the old RR instance. This is done by * decreasing the reference count on the old policy. The moment no more * references are held on the old RR policy, it'll be destroyed and \a - * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state. - * At this point we can transition to a new RR instance safely, which is done - * once again via \a rr_handover(). + * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN + * state. At this point we can transition to a new RR instance safely, which + * is done once again via \a rr_handover(). * * * Once a RR policy instance is in place (and getting updated as described), @@ -100,6 +100,7 @@ headers because if we are building with libuv, those headers will include uv.h, which needs to be included before other system headers */ #include "src/core/lib/iomgr/port.h" +#include <errno.h> #include <string.h> @@ -114,18 +115,57 @@ #include "src/core/ext/client_config/parse_address.h" #include "src/core/ext/lb_policy/grpclb/grpclb.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/static_metadata.h" int grpc_lb_glb_trace = 0; +static void lb_addrs_destroy(grpc_lb_address *lb_addresses, + size_t num_addresses) { + /* free "resolved" addresses memblock */ + gpr_free(lb_addresses->resolved_address); + for (size_t i = 0; i < num_addresses; ++i) { + if (lb_addresses[i].user_data != NULL) { + GRPC_MDELEM_UNREF(lb_addresses[i].user_data); + } + } + gpr_free(lb_addresses); +} + +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static void initial_metadata_add_lb_token( + grpc_metadata_batch *initial_metadata, + grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) { + GPR_ASSERT(lb_token_mdelem_storage != NULL); + GPR_ASSERT(lb_token != NULL); + grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} + typedef struct wrapped_rr_closure_arg { /* the original closure. Usually a on_complete/notify cb for pick() and ping() * calls against the internal RR instance, respectively. */ grpc_closure *wrapped_closure; + /* the pick's initial metadata, kept in order to append the LB token for the + * pick */ + grpc_metadata_batch *initial_metadata; + + /* the picked target, used to determine which LB token to add to the pick's + * initial metadata */ + grpc_connected_subchannel **target; + + /* the LB token associated with the pick */ + grpc_mdelem *lb_token; + + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /* The RR instance related to the closure */ grpc_lb_policy *rr_policy; @@ -146,9 +186,20 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, (intptr_t)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); + + /* if target is NULL, no pick has been made by the RR policy (eg, all + * addresses failed to connect). There won't be any user_data/token + * available */ + if (wc_arg->target != NULL) { + initial_metadata_add_lb_token(wc_arg->initial_metadata, + wc_arg->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); + } } GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL); + + grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), + NULL); gpr_free(wc_arg->owning_pending_node); } @@ -169,6 +220,9 @@ typedef struct pending_pick { /* the initial metadata for the pick. See grpc_lb_policy_pick() */ grpc_metadata_batch *initial_metadata; + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /* bitmask passed to pick() and used for selective cancelling. See * grpc_lb_policy_cancel_picks() */ uint32_t initial_metadata_flags; @@ -185,20 +239,24 @@ typedef struct pending_pick { wrapped_rr_closure_arg wrapped_on_complete_arg; } pending_pick; -static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, +static void add_pending_pick(pending_pick **root, + const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, grpc_closure *on_complete) { pending_pick *pp = gpr_malloc(sizeof(*pp)); memset(pp, 0, sizeof(pending_pick)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); pp->next = *root; - pp->pollent = pollent; + pp->pollent = pick_args->pollent; pp->target = target; - pp->initial_metadata = initial_metadata; - pp->initial_metadata_flags = initial_metadata_flags; + pp->initial_metadata = pick_args->initial_metadata; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; + pp->wrapped_on_complete_arg.target = target; + pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; + pp->wrapped_on_complete_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure, &pp->wrapped_on_complete_arg); *root = pp; @@ -257,6 +315,12 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; + /** total number of valid addresses received in \a serverlist */ + size_t num_ok_serverlist_addresses; + + /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */ + grpc_lb_address *lb_addresses; + /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -284,58 +348,142 @@ struct rr_connectivity_data { glb_lb_policy *glb_policy; }; -static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, - const grpc_grpclb_serverlist *serverlist, - glb_lb_policy *glb_policy) { - /* TODO(dgq): support mixed ip version */ - GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address, - serverlist->servers[i]->port); +static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, + bool log) { + const grpc_grpclb_ip_address *ip = &server->ip_address; + if (server->port >> 16 != 0) { + if (log) { + gpr_log(GPR_ERROR, + "Invalid port '%d' at index %zu of serverlist. Ignoring.", + server->port, idx); + } + return false; } - size_t uri_path_len; - char *concat_ipports = gpr_strjoin_sep( - (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len); + if (ip->size != 4 && ip->size != 16) { + if (log) { + gpr_log(GPR_ERROR, + "Expected IP to be 4 or 16 bytes, got %d at index %zu of " + "serverlist. Ignoring", + ip->size, idx); + } + return false; + } + return true; +} - grpc_lb_policy_args args; - args.client_channel_factory = glb_policy->cc_factory; - args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); - args.addresses->naddrs = serverlist->num_servers; - args.addresses->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs); - size_t out_addrs_idx = 0; +/* populate \a addresses according to \a serverlist. Returns the number of + * addresses successfully parsed and added to \a addresses */ +static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, + grpc_lb_address **lb_addresses) { + size_t num_valid = 0; + /* first pass: count how many are valid in order to allocate the necessary + * memory in a single block */ for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_uri uri; - struct sockaddr_storage sa; - size_t sa_len; - uri.path = host_ports[i]; - if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */ - memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); - args.addresses->addrs[out_addrs_idx].len = sa_len; - ++out_addrs_idx; + if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; + } + if (num_valid == 0) { + return 0; + } + + /* allocate the memory block for the "resolved" addresses. */ + grpc_resolved_address *r_addrs_memblock = + gpr_malloc(sizeof(grpc_resolved_address) * num_valid); + memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid); + grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid); + memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid); + + /* second pass: actually populate the addresses and LB tokens (aka user data + * to the outside world) to be read by the RR policy during its creation. + * Given that the validity tests are very cheap, they are performed again + * instead of marking the valid ones during the first pass, as this would + * incurr in an allocation due to the arbitrary number of server */ + size_t addr_idx = 0; + for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { + GPR_ASSERT(addr_idx < num_valid); + const grpc_grpclb_server *server = serverlist->servers[sl_idx]; + if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; + grpc_lb_address *const lb_addr = &lb_addrs[addr_idx]; + + /* address processing */ + const uint16_t netorder_port = htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const grpc_grpclb_ip_address *ip = &server->ip_address; + + lb_addr->resolved_address = &r_addrs_memblock[addr_idx]; + struct sockaddr_storage *sa = + (struct sockaddr_storage *)lb_addr->resolved_address->addr; + size_t *sa_len = &lb_addr->resolved_address->len; + *sa_len = 0; + if (ip->size == 4) { + struct sockaddr_in *addr4 = (struct sockaddr_in *)sa; + *sa_len = sizeof(struct sockaddr_in); + memset(addr4, 0, *sa_len); + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 16) { + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa; + *sa_len = sizeof(struct sockaddr_in6); + memset(addr6, 0, *sa_len); + addr6->sin6_family = AF_INET; + memcpy(&addr6->sin6_addr, ip->bytes, ip->size); + addr6->sin6_port = netorder_port; + } + GPR_ASSERT(*sa_len > 0); + + /* lb token processing */ + if (server->has_load_balance_token) { + const size_t lb_token_size = + GPR_ARRAY_SIZE(server->load_balance_token) - 1; + grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( + (uint8_t *)server->load_balance_token, lb_token_size); + lb_addr->user_data = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); } else { - gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", - host_ports[i]); + gpr_log(GPR_ERROR, + "Missing LB token for backend address '%s'. The empty token will " + "be used instead", + grpc_sockaddr_to_uri((struct sockaddr *)sa)); + lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; } + ++addr_idx; } + GPR_ASSERT(addr_idx == num_valid); + *lb_addresses = lb_addrs; + return num_valid; +} + +static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, + const grpc_grpclb_serverlist *serverlist, + glb_lb_policy *glb_policy) { + GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); + + grpc_lb_policy_args args; + memset(&args, 0, sizeof(args)); + args.client_channel_factory = glb_policy->cc_factory; + const size_t num_ok_addresses = + process_serverlist(serverlist, &args.addresses); + args.num_addresses = num_ok_addresses; grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - gpr_free(concat_ipports); - for (size_t i = 0; i < serverlist->num_servers; i++) { - gpr_free(host_ports[i]); + if (glb_policy->lb_addresses != NULL) { + /* dispose of the previous version */ + lb_addrs_destroy(glb_policy->lb_addresses, + glb_policy->num_ok_serverlist_addresses); } - gpr_free(host_ports); - gpr_free(args.addresses->addrs); - gpr_free(args.addresses); + glb_policy->num_ok_serverlist_addresses = num_ok_addresses; + glb_policy->lb_addresses = args.addresses; + return rr; } static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_error *error) { - GRPC_ERROR_REF(error); + GPR_ASSERT(glb_policy->serverlist != NULL && + glb_policy->serverlist->num_servers > 0); glb_policy->rr_policy = create_rr(exec_ctx, glb_policy->serverlist, glb_policy); @@ -350,8 +498,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state, &glb_policy->rr_connectivity->on_change); grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - glb_policy->rr_connectivity->state, error, - "rr_handover"); + glb_policy->rr_connectivity->state, + GRPC_ERROR_REF(error), "rr_handover"); grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); /* flush pending ops */ @@ -364,9 +512,12 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent, - pp->initial_metadata, pp->initial_metadata_flags, - pp->target, &pp->wrapped_on_complete); + const grpc_lb_policy_pick_args pick_args = { + pp->pollent, pp->initial_metadata, pp->initial_metadata_flags, + pp->lb_token_mdelem_storage}; + grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target, + (void **)&pp->wrapped_on_complete_arg.lb_token, + &pp->wrapped_on_complete); pp->wrapped_on_complete_arg.owning_pending_node = pp; } @@ -383,13 +534,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, &pping->wrapped_notify); pping->wrapped_notify_arg.owning_pending_node = pping; } - GRPC_ERROR_UNREF(error); } -static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { rr_connectivity_data *rr_conn_data = arg; glb_lb_policy *glb_policy = rr_conn_data->glb_policy; + if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) { if (glb_policy->serverlist != NULL) { /* a RR policy is shutting down but there's a serverlist available -> @@ -403,8 +554,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (error == GRPC_ERROR_NONE) { /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_conn_data->state, error, - "rr_connectivity_changed"); + rr_conn_data->state, GRPC_ERROR_REF(error), + "glb_rr_connectivity_changed"); /* resubscribe */ grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_conn_data->state, @@ -413,7 +564,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(rr_conn_data); } } - GRPC_ERROR_UNREF(error); } static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, @@ -423,31 +573,43 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, memset(glb_policy, 0, sizeof(*glb_policy)); /* All input addresses in args->addresses come from a resolver that claims - * they are LB services. It's the resolver's responsibility to make sure this + * they are LB services. It's the resolver's responsibility to make sure + * this * policy is only instantiated and used in that case. * * Create a client channel over them to communicate with a LB service */ glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); - if (args->addresses->naddrs == 0) { + if (args->num_addresses == 0) { return NULL; } - /* construct a target from the args->addresses, in the form + if (args->addresses[0].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + + /* construct a target from the addresses in args, given in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs); - addr_strs[0] = - grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); - for (size_t i = 1; i < args->addresses->naddrs; i++) { - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_strs[i], - (const struct sockaddr *)&args->addresses->addrs[i], - true) == 0); + char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses); + addr_strs[0] = grpc_sockaddr_to_uri( + (const struct sockaddr *)&args->addresses[0].resolved_address->addr); + for (size_t i = 1; i < args->num_addresses; i++) { + if (args->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + + GPR_ASSERT( + grpc_sockaddr_to_string( + &addr_strs[i], + (const struct sockaddr *)&args->addresses[i].resolved_address->addr, + true) == 0); } size_t uri_path_len; char *target_uri_str = gpr_strjoin_sep( - (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len); + (const char **)addr_strs, args->num_addresses, ",", &uri_path_len); /* will pick using pick_first */ glb_policy->lb_channel = grpc_client_channel_factory_create_channel( @@ -455,7 +617,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); gpr_free(target_uri_str); - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < args->num_addresses; i++) { gpr_free(addr_strs[i]); } gpr_free(addr_strs); @@ -468,7 +630,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, rr_connectivity_data *rr_connectivity = gpr_malloc(sizeof(rr_connectivity_data)); memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed, + grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, rr_connectivity); rr_connectivity->glb_policy = glb_policy; glb_policy->rr_connectivity = rr_connectivity; @@ -491,6 +653,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } gpr_mu_destroy(&glb_policy->mu); + + lb_addrs_destroy(glb_policy->lb_addresses, + glb_policy->num_ok_serverlist_addresses); gpr_free(glb_policy); } @@ -551,7 +716,6 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, *target = NULL; grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_CANCELLED, NULL); - gpr_free(pp); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -581,7 +745,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, exec_ctx, pp->pollent, glb_policy->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_CANCELLED, NULL); - gpr_free(pp); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -608,12 +771,21 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; + + if (pick_args->lb_token_mdelem_storage == NULL) { + *target = NULL; + grpc_exec_ctx_sched( + exec_ctx, on_complete, + GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting " + "won't work without it. Failing"), + NULL); + return 1; + } + gpr_mu_lock(&glb_policy->mu); int r; @@ -625,29 +797,36 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg)); glb_policy->wc_arg.rr_policy = glb_policy->rr_policy; + glb_policy->wc_arg.target = target; glb_policy->wc_arg.wrapped_closure = on_complete; + glb_policy->wc_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; + glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata; + glb_policy->wc_arg.owning_pending_node = NULL; grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, &glb_policy->wc_arg); - r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent, - initial_metadata, initial_metadata_flags, target, + + r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, + (void **)&glb_policy->wc_arg.lb_token, &glb_policy->wrapped_on_complete); if (r != 0) { - /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR - * policy and notify the original callback */ - glb_policy->wc_arg.wrapped_closure = NULL; + /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)glb_policy->wc_arg.rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick"); - grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure, - GRPC_ERROR_NONE, NULL); + + /* add the load reporting initial metadata */ + initial_metadata_add_lb_token( + pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, + GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token)); } } else { - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, + grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, glb_policy->base.interested_parties); - add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata, - initial_metadata_flags, target, on_complete); + add_pending_pick(&glb_policy->pending_picks, pick_args, target, + on_complete); if (!glb_policy->started_picking) { start_picking(exec_ctx, glb_policy); @@ -707,9 +886,6 @@ typedef struct lb_client_data { /* called once initial metadata's been sent */ grpc_closure md_sent; - /* called once initial metadata's been received */ - grpc_closure md_rcvd; - /* called once the LoadBalanceRequest has been sent to the LB server. See * src/proto/grpc/.../load_balancer.proto */ grpc_closure req_sent; @@ -746,7 +922,6 @@ typedef struct lb_client_data { } lb_client_data; static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, @@ -761,7 +936,6 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { gpr_mu_init(&lb_client->mu); grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client); - grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client); grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client); grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client); grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client); @@ -860,23 +1034,6 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_op ops[1]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; - op->flags = 0; - op->reserved = NULL; - op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->md_rcvd); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = lb_client->request_payload; @@ -891,11 +1048,18 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { lb_client_data *lb_client = arg; + GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; + grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &lb_client->response_payload; op->flags = 0; @@ -914,8 +1078,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_op *op = ops; if (lb_client->response_payload != NULL) { /* Received data from the LB server. Look inside - * lb_client->response_payload, for - * a serverlist. */ + * lb_client->response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload); gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); @@ -952,7 +1115,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } else { /* unref the RR policy, eventually leading to its substitution with a * new one constructed from the received serverlist (see - * rr_connectivity_changed) */ + * glb_rr_connectivity_changed) */ GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, "serverlist_received"); } @@ -1015,8 +1178,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, lb_client->status, lb_client->status_details, lb_client->status_details_capacity); } - /* TODO(dgq): deal with stream termination properly (fire up another one? fail - * the original call?) */ + /* TODO(dgq): deal with stream termination properly (fire up another one? + * fail the original call?) */ } /* Code wiring the policy with the rest of the core */ |