Diffstat (limited to 'src/core/ext/lb_policy/grpclb')
-rw-r--r--  src/core/ext/lb_policy/grpclb/grpclb.c            | 311
-rw-r--r--  src/core/ext/lb_policy/grpclb/grpclb.h            |   2
-rw-r--r--  src/core/ext/lb_policy/grpclb/load_balancer_api.h |   2
3 files changed, 187 insertions, 128 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 626f285b90..6da4febf26 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -69,8 +69,8 @@
  * possible scenarios:
  *
  * 1. This is the first server list received. There was no previous instance of
- *    the Round Robin policy. \a rr_handover() will instantiate the RR policy
- *    and perform all the pending operations over it.
+ *    the Round Robin policy. \a rr_handover_locked() will instantiate the RR
+ *    policy and perform all the pending operations over it.
  * 2. There's already a RR policy instance active. We need to introduce the new
  *    one build from the new serverlist, but taking care not to disrupt the
  *    operations in progress over the old RR instance. This is done by
@@ -78,7 +78,7 @@
  *    references are held on the old RR policy, it'll be destroyed and \a
  *    glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
  *    state. At this point we can transition to a new RR instance safely, which
- *    is done once again via \a rr_handover().
+ *    is done once again via \a rr_handover_locked().
  *
  *
  * Once a RR policy instance is in place (and getting updated as described),
@@ -86,8 +86,8 @@
  * forwarding them to the RR instance. Any time there's no RR policy available
  * (ie, right after the creation of the gRPCLB policy, if an empty serverlist
  * is received, etc), pick/ping requests are added to a list of pending
- * picks/pings to be flushed and serviced as part of \a rr_handover() the moment
- * the RR policy instance becomes available.
+ * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the
+ * moment the RR policy instance becomes available.
  *
  * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
  * high level design and details. */
@@ -96,6 +96,12 @@
  * - Implement LB service forwarding (point 2c. in the doc's diagram).
  */
 
+/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+   using that endpoint. Because of various transitive includes in uv.h,
+   including windows.h on Windows, uv.h must be included before other system
+   headers. Therefore, sockaddr.h must always be included first */
+#include "src/core/lib/iomgr/sockaddr.h"
+
 #include <errno.h>
 
 #include <string.h>
@@ -107,13 +113,13 @@
 #include <grpc/support/string_util.h>
 #include <grpc/support/time.h>
 
-#include "src/core/ext/client_config/client_channel_factory.h"
-#include "src/core/ext/client_config/lb_policy_factory.h"
-#include "src/core/ext/client_config/lb_policy_registry.h"
-#include "src/core/ext/client_config/parse_address.h"
+#include "src/core/ext/client_channel/client_channel_factory.h"
+#include "src/core/ext/client_channel/lb_policy_factory.h"
+#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/ext/client_channel/parse_address.h"
 #include "src/core/ext/lb_policy/grpclb/grpclb.h"
 #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
-#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/surface/call.h"
@@ -134,6 +140,9 @@ static void initial_metadata_add_lb_token(
 }
 
 typedef struct wrapped_rr_closure_arg {
+  /* the closure instance using this struct as argument */
+  grpc_closure wrapper_closure;
+
   /* the original closure. Usually a on_complete/notify cb for pick() and ping()
    * calls against the internal RR instance, respectively. */
   grpc_closure *wrapped_closure;
@@ -155,9 +164,8 @@ typedef struct wrapped_rr_closure_arg {
   /* The RR instance related to the closure */
   grpc_lb_policy *rr_policy;
 
-  /* when not NULL, represents a pending_{pick,ping} node to be freed upon
-   * closure execution */
-  void *owning_pending_node; /* to be freed if not NULL */
+  /* heap memory to be freed upon closure execution. */
+  void *free_when_done;
 } wrapped_rr_closure_arg;
 
 /* The \a on_complete closure passed as part of the pick requires keeping a
@@ -183,10 +191,10 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
     }
   }
   GPR_ASSERT(wc_arg->wrapped_closure != NULL);
-  grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
-                      NULL);
-  gpr_free(wc_arg->owning_pending_node);
+  GPR_ASSERT(wc_arg->free_when_done != NULL);
+  gpr_free(wc_arg->free_when_done);
 }
 
 /* Linked list of pending pick requests. It stores all information needed to
@@ -207,10 +215,6 @@ typedef struct pending_pick {
    * upon error. */
   grpc_connected_subchannel **target;
 
-  /* a closure wrapping the original on_complete one to be invoked once the
-   * pick() has completed (regardless of success) */
-  grpc_closure wrapped_on_complete;
-
   /* args for wrapped_on_complete */
   wrapped_rr_closure_arg wrapped_on_complete_arg;
 } pending_pick;
@@ -230,8 +234,9 @@ static void add_pending_pick(pending_pick **root,
   pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
   pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
       pick_args->lb_token_mdelem_storage;
-  grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
-                    &pp->wrapped_on_complete_arg);
+  pp->wrapped_on_complete_arg.free_when_done = pp;
+  grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
+                    wrapped_rr_closure, &pp->wrapped_on_complete_arg);
   *root = pp;
 }
 
@@ -239,10 +244,6 @@ static void add_pending_pick(pending_pick **root,
 typedef struct pending_ping {
   struct pending_ping *next;
 
-  /* a closure wrapping the original on_complete one to be invoked once the
-   * ping() has completed (regardless of success) */
-  grpc_closure wrapped_notify;
-
   /* args for wrapped_notify */
   wrapped_rr_closure_arg wrapped_notify_arg;
 } pending_ping;
@@ -251,10 +252,11 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
   pending_ping *pping = gpr_malloc(sizeof(*pping));
   memset(pping, 0, sizeof(pending_ping));
   memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
-  pping->next = *root;
-  grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
-                    &pping->wrapped_notify_arg);
   pping->wrapped_notify_arg.wrapped_closure = notify;
+  pping->wrapped_notify_arg.free_when_done = pping;
+  pping->next = *root;
+  grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
+                    wrapped_rr_closure, &pping->wrapped_notify_arg);
   *root = pping;
 }
 
@@ -274,6 +276,7 @@ typedef struct glb_lb_policy {
   /** who the client is trying to communicate with */
   const char *server_name;
   grpc_client_channel_factory *cc_factory;
+  grpc_channel_args *args;
 
   /** deadline for the LB's call */
   gpr_timespec deadline;
@@ -307,13 +310,6 @@ typedef struct glb_lb_policy {
 
   /** for tracking of the RR connectivity */
   rr_connectivity_data *rr_connectivity;
-
-  /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
-   * available RR picks */
-  grpc_closure wrapped_on_complete;
-
-  /* arguments for the wrapped_on_complete closure */
-  wrapped_rr_closure_arg wc_arg;
 } glb_lb_policy;
 
 /* Keeps track and reacts to changes in connectivity of the RR instance */
@@ -347,6 +343,21 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
   return true;
 }
 
+/* vtable for LB tokens in grpc_lb_addresses. */
+static void *lb_token_copy(void *token) {
+  return token == NULL ? NULL : GRPC_MDELEM_REF(token);
+}
+static void lb_token_destroy(void *token) {
+  if (token != NULL) GRPC_MDELEM_UNREF(token);
+}
+static int lb_token_cmp(void *token1, void *token2) {
+  if (token1 > token2) return 1;
+  if (token1 < token2) return -1;
+  return 0;
+}
+static const grpc_lb_user_data_vtable lb_token_vtable = {
+    lb_token_copy, lb_token_destroy, lb_token_cmp};
+
 /* Returns addresses extracted from \a serverlist. */
 static grpc_lb_addresses *process_serverlist(
     const grpc_grpclb_serverlist *serverlist) {
@@ -358,7 +369,8 @@ static grpc_lb_addresses *process_serverlist(
   }
   if (num_valid == 0) return NULL;
 
-  grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid);
+  grpc_lb_addresses *lb_addresses =
+      grpc_lb_addresses_create(num_valid, &lb_token_vtable);
 
   /* second pass: actually populate the addresses and LB tokens (aka user data
    * to the outside world) to be read by the RR policy during its creation.
@@ -399,14 +411,14 @@ static grpc_lb_addresses *process_serverlist(
           GPR_ARRAY_SIZE(server->load_balance_token) - 1;
       grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
           (uint8_t *)server->load_balance_token, lb_token_size);
-      user_data = grpc_mdelem_from_metadata_strings(
-          GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
+      user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN,
+                                                    lb_token_mdstr);
     } else {
       gpr_log(GPR_ERROR,
               "Missing LB token for backend address '%s'. The empty token will "
              "be used instead",
-              grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr));
-      user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
+              grpc_sockaddr_to_uri(&addr));
+      user_data = GRPC_MDELEM_LB_TOKEN_EMPTY;
     }
 
     grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
@@ -419,45 +431,85 @@ static grpc_lb_addresses *process_serverlist(
   return lb_addresses;
 }
 
-/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */
-static void lb_token_destroy(void *token) {
-  if (token != NULL) GRPC_MDELEM_UNREF(token);
+/* perform a pick over \a rr_policy. Given that a pick can return immediately
+ * (ignoring its completion callback) we need to perform the cleanups this
+ * callback would be otherwise resposible for */
+static bool pick_from_internal_rr_locked(
+    grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
+    const grpc_lb_policy_pick_args *pick_args,
+    grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
+  GPR_ASSERT(rr_policy != NULL);
+  const bool pick_done =
+      grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target,
+                          (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
+  if (pick_done) {
+    /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
+    if (grpc_lb_glb_trace) {
+      gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
+              (intptr_t)wc_arg->rr_policy);
+    }
+    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick");
+
+    /* add the load reporting initial metadata */
+    initial_metadata_add_lb_token(pick_args->initial_metadata,
+                                  pick_args->lb_token_mdelem_storage,
+                                  GRPC_MDELEM_REF(wc_arg->lb_token));
+
+    gpr_free(wc_arg);
+  }
+  /* else, the pending pick will be registered and taken care of by the
+   * pending pick list inside the RR policy (glb_policy->rr_policy).
+   * Eventually, wrapped_on_complete will be called, which will -among other
+   * things- add the LB token to the call's initial metadata */
+
+  return pick_done;
 }
 
-static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
-                                 const grpc_grpclb_serverlist *serverlist,
-                                 glb_lb_policy *glb_policy) {
+static grpc_lb_policy *create_rr_locked(
+    grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
+    glb_lb_policy *glb_policy) {
   GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
+  if (glb_policy->addresses != NULL) {
+    /* dispose of the previous version */
+    grpc_lb_addresses_destroy(glb_policy->addresses);
+  }
+  glb_policy->addresses = process_serverlist(serverlist);
+
   grpc_lb_policy_args args;
   memset(&args, 0, sizeof(args));
-  args.server_name = glb_policy->server_name;
   args.client_channel_factory = glb_policy->cc_factory;
-  args.addresses = process_serverlist(serverlist);
-  grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
+  // Replace the LB addresses in the channel args that we pass down to
+  // the subchannel.
+  static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
+  const grpc_arg arg =
+      grpc_lb_addresses_create_channel_arg(glb_policy->addresses);
+  args.args = grpc_channel_args_copy_and_add_and_remove(
+      glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
+      1);
 
-  if (glb_policy->addresses != NULL) {
-    /* dispose of the previous version */
-    grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
-  }
-  glb_policy->addresses = args.addresses;
+  grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
+  grpc_channel_args_destroy(args.args);
 
   return rr;
 }
 
-static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
-                        grpc_error *error) {
+static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
                               glb_lb_policy *glb_policy, grpc_error *error) {
   GPR_ASSERT(glb_policy->serverlist != NULL &&
              glb_policy->serverlist->num_servers > 0);
   glb_policy->rr_policy =
-      create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
+      create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
 
   if (grpc_lb_glb_trace) {
     gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
            (intptr_t)glb_policy->rr_policy);
   }
   GPR_ASSERT(glb_policy->rr_policy != NULL);
+  grpc_pollset_set_add_pollset_set(exec_ctx,
+                                   glb_policy->rr_policy->interested_parties,
+                                   glb_policy->base.interested_parties);
   glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity(
       exec_ctx, glb_policy->rr_policy, &error);
   grpc_lb_policy_notify_on_state_change(
@@ -478,11 +530,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
       gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
               (intptr_t)glb_policy->rr_policy);
     }
-    grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args,
-                        pp->target,
-                        (void **)&pp->wrapped_on_complete_arg.lb_token,
-                        &pp->wrapped_on_complete);
-    pp->wrapped_on_complete_arg.owning_pending_node = pp;
+    pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
+                                 &pp->pick_args, pp->target,
+                                 &pp->wrapped_on_complete_arg);
   }
 
   pending_ping *pping;
@@ -495,8 +545,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
              (intptr_t)glb_policy->rr_policy);
     }
     grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
-                            &pping->wrapped_notify);
-    pping->wrapped_notify_arg.owning_pending_node = pping;
+                            &pping->wrapped_notify_arg.wrapper_closure);
   }
 }
 
@@ -509,13 +558,16 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
     if (glb_policy->serverlist != NULL) {
       /* a RR policy is shutting down but there's a serverlist available ->
        * perform a handover */
-      rr_handover(exec_ctx, glb_policy, error);
+      gpr_mu_lock(&glb_policy->mu);
+      rr_handover_locked(exec_ctx, glb_policy, error);
+      gpr_mu_unlock(&glb_policy->mu);
     } else {
       /* shutting down and no new serverlist available. Bail out. */
      gpr_free(rr_conn_data);
    }
  } else {
    if (error == GRPC_ERROR_NONE) {
+      gpr_mu_lock(&glb_policy->mu);
      /* RR not shutting down. Mimic the RR's policy state */
      grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
                                  rr_conn_data->state, GRPC_ERROR_REF(error),
@@ -524,6 +576,7 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
      grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
                                            &rr_conn_data->state,
                                            &rr_conn_data->on_change);
+      gpr_mu_unlock(&glb_policy->mu);
    } else { /* error */
      gpr_free(rr_conn_data);
    }
@@ -533,6 +586,12 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
                                   grpc_lb_policy_factory *factory,
                                   grpc_lb_policy_args *args) {
+  /* Get server name. */
+  const grpc_arg *arg =
+      grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
+  const char *server_name =
+      arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
+
   /* Count the number of gRPC-LB addresses. There must be at least one.
    * TODO(roth): For now, we ignore non-balancer addresses, but in the
    * future, we may change the behavior such that we fall back to using
@@ -540,23 +599,27 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
    * time, this should be changed to allow a list with no balancer addresses,
    * since the resolver might fail to return a balancer address even when
    * this is the right LB policy to use. */
+  arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+  GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
+  grpc_lb_addresses *addresses = arg->value.pointer.p;
   size_t num_grpclb_addrs = 0;
-  for (size_t i = 0; i < args->addresses->num_addresses; ++i) {
-    if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+  for (size_t i = 0; i < addresses->num_addresses; ++i) {
+    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
   }
   if (num_grpclb_addrs == 0) return NULL;
 
   glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
   memset(glb_policy, 0, sizeof(*glb_policy));
 
-  /* All input addresses in args->addresses come from a resolver that claims
+  /* All input addresses in addresses come from a resolver that claims
   * they are LB services. It's the resolver's responsibility to make sure
   * this
   * policy is only instantiated and used in that case.
   *
   * Create a client channel over them to communicate with a LB service */
-  glb_policy->server_name = gpr_strdup(args->server_name);
+  glb_policy->server_name = gpr_strdup(server_name);
   glb_policy->cc_factory = args->client_channel_factory;
+  glb_policy->args = grpc_channel_args_copy(args->args);
  GPR_ASSERT(glb_policy->cc_factory != NULL);
 
  /* construct a target from the addresses in args, given in the form
@@ -564,22 +627,19 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
   * TODO(dgq): support mixed ip version */
  char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
  size_t addr_index = 0;
-  for (size_t i = 0; i < args->addresses->num_addresses; i++) {
-    if (args->addresses->addresses[i].user_data != NULL) {
+  for (size_t i = 0; i < addresses->num_addresses; i++) {
+    if (addresses->addresses[i].user_data != NULL) {
      gpr_log(GPR_ERROR,
              "This LB policy doesn't support user data. It will be ignored");
    }
-    if (args->addresses->addresses[i].is_balancer) {
+    if (addresses->addresses[i].is_balancer) {
      if (addr_index == 0) {
-        addr_strs[addr_index++] = grpc_sockaddr_to_uri(
-            (const struct sockaddr *)&args->addresses->addresses[i]
-                .address.addr);
+        addr_strs[addr_index++] =
+            grpc_sockaddr_to_uri(&addresses->addresses[i].address);
      } else {
-        GPR_ASSERT(grpc_sockaddr_to_string(
-                       &addr_strs[addr_index++],
-                       (const struct sockaddr *)&args->addresses->addresses[i]
-                           .address.addr,
-                       true) > 0);
+        GPR_ASSERT(grpc_sockaddr_to_string(&addr_strs[addr_index++],
+                                           &addresses->addresses[i].address,
+                                           true) > 0);
      }
    }
  }
@@ -587,10 +647,29 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
  char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
                                         num_grpclb_addrs, ",", &uri_path_len);
 
-  /* will pick using pick_first */
+  /* Create a channel to talk to the LBs.
+   *
+   * We strip out the channel arg for the LB policy name, since we want
+   * to use the default (pick_first) in this case.
+   *
+   * We also strip out the channel arg for the resolved addresses, since
+   * that will be generated by the name resolver used in the LB channel.
+   * Note that the LB channel will use the sockaddr resolver, so this
+   * won't actually generate a query to DNS (or some other name service).
+   * However, the addresses returned by the sockaddr resolver will have
+   * is_balancer=false, whereas our own addresses have is_balancer=true.
+   * We need the LB channel to return addresses with is_balancer=false
+   * so that it does not wind up recursively using the grpclb LB policy,
+   * as per the special case logic in client_channel.c.
+   */
+  static const char *keys_to_remove[] = {GRPC_ARG_LB_POLICY_NAME,
+                                         GRPC_ARG_LB_ADDRESSES};
+  grpc_channel_args *new_args = grpc_channel_args_copy_and_remove(
+      args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove));
  glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
      exec_ctx, glb_policy->cc_factory, target_uri_str,
-      GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
+      GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
+  grpc_channel_args_destroy(new_args);
  gpr_free(target_uri_str);
 
  for (size_t i = 0; i < num_grpclb_addrs; i++) {
@@ -623,6 +702,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  GPR_ASSERT(glb_policy->pending_picks == NULL);
  GPR_ASSERT(glb_policy->pending_pings == NULL);
  gpr_free((void *)glb_policy->server_name);
+  grpc_channel_args_destroy(glb_policy->args);
  grpc_channel_destroy(glb_policy->lb_channel);
  glb_policy->lb_channel = NULL;
  grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
@@ -630,7 +710,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
    grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
  }
  gpr_mu_destroy(&glb_policy->mu);
-  grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
+  grpc_lb_addresses_destroy(glb_policy->addresses);
  gpr_free(glb_policy);
 }
 
@@ -648,15 +728,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  while (pp != NULL) {
    pending_pick *next = pp->next;
    *pp->target = NULL;
-    grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
-                        NULL);
+    grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+                        GRPC_ERROR_NONE, NULL);
    pp = next;
  }
 
  while (pping != NULL) {
    pending_ping *next = pping->next;
-    grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE,
-                        NULL);
+    grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+                        GRPC_ERROR_NONE, NULL);
    pping = next;
  }
 
@@ -686,11 +766,9 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
  while (pp != NULL) {
    pending_pick *next = pp->next;
    if (pp->target == target) {
-      grpc_polling_entity_del_from_pollset_set(
-          exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
      *target = NULL;
      grpc_exec_ctx_sched(
-          exec_ctx, &pp->wrapped_on_complete,
+          exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
          GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
    } else {
      pp->next = glb_policy->pending_picks;
@@ -719,10 +797,8 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
    pending_pick *next = pp->next;
    if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
        initial_metadata_flags_eq) {
-      grpc_polling_entity_del_from_pollset_set(
-          exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
      grpc_exec_ctx_sched(
-          exec_ctx, &pp->wrapped_on_complete,
+          exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
          GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
    } else {
      pp->next = glb_policy->pending_picks;
@@ -775,39 +851,20 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
              (intptr_t)glb_policy->rr_policy);
    }
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
-    memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
-    glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
-    glb_policy->wc_arg.target = target;
-    glb_policy->wc_arg.wrapped_closure = on_complete;
-    glb_policy->wc_arg.lb_token_mdelem_storage =
-        pick_args->lb_token_mdelem_storage;
-    glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
-    glb_policy->wc_arg.owning_pending_node = NULL;
-    grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
-                      &glb_policy->wc_arg);
-
-    pick_done =
-        grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
-                            (void **)&glb_policy->wc_arg.lb_token,
-                            &glb_policy->wrapped_on_complete);
-    if (pick_done) {
-      /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
-      if (grpc_lb_glb_trace) {
-        gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
-                (intptr_t)glb_policy->wc_arg.rr_policy);
-      }
-      GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
-      /* add the load reporting initial metadata */
-      initial_metadata_add_lb_token(
-          pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
-          GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
-    }
+    wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
+    memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
+
+    grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
+    wc_arg->rr_policy = glb_policy->rr_policy;
+    wc_arg->target = target;
+    wc_arg->wrapped_closure = on_complete;
+    wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+    wc_arg->initial_metadata = pick_args->initial_metadata;
+    wc_arg->free_when_done = wc_arg;
+    pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
+                                             pick_args, target, wc_arg);
  } else {
-    /* else, the pending pick will be registered and taken care of by the
-     * pending pick list inside the RR policy (glb_policy->rr_policy) */
-    grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
-                                           glb_policy->base.interested_parties);
    add_pending_pick(&glb_policy->pending_picks, pick_args, target,
                     on_complete);
@@ -931,7 +988,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
 
  /* Note the following LB call progresses every time there's activity in \a
   * glb_policy->base.interested_parties, which is comprised of the polling
-   * entities passed to glb_pick(). */
+   * entities from \a client_channel. */
  lb_client->lb_call = grpc_channel_create_pollset_set_call(
      glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
      glb_policy->base.interested_parties,
@@ -1076,6 +1133,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
    /* update serverlist */
    if (serverlist->num_servers > 0) {
+      gpr_mu_lock(&lb_client->glb_policy->mu);
      if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
                                        serverlist)) {
        if (grpc_lb_glb_trace) {
@@ -1093,7 +1151,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
      if (lb_client->glb_policy->rr_policy == NULL) {
        /* initial "handover", in this case from a null RR policy, meaning
         * it'll just create the first RR policy instance */
-        rr_handover(exec_ctx, lb_client->glb_policy, error);
+        rr_handover_locked(exec_ctx, lb_client->glb_policy, error);
      } else {
        /* unref the RR policy, eventually leading to its substitution with a
         * new one constructed from the received serverlist (see
@@ -1101,6 +1159,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
        GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
                             "serverlist_received");
      }
+      gpr_mu_unlock(&lb_client->glb_policy->mu);
    } else {
      if (grpc_lb_glb_trace) {
        gpr_log(GPR_INFO,
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.h b/src/core/ext/lb_policy/grpclb/grpclb.h
index 83552b4fa0..ff23f3a545 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.h
+++ b/src/core/ext/lb_policy/grpclb/grpclb.h
@@ -34,7 +34,7 @@
 #ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H
 #define GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H
 
-#include "src/core/ext/client_config/lb_policy_factory.h"
+#include "src/core/ext/client_channel/lb_policy_factory.h"
 
 /** Returns a load balancing factory for the glb policy, which tries to connect
  * to a load balancing server to decide the next successfully connected
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index c1e73d08ef..079a64a3f3 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -36,7 +36,7 @@
 
 #include <grpc/support/slice_buffer.h>
 
-#include "src/core/ext/client_config/lb_policy_factory.h"
+#include "src/core/ext/client_channel/lb_policy_factory.h"
 #include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
 
 #ifdef __cplusplus
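The recurring pattern in this diff — pick_from_internal_rr_locked() together with the new wrapper_closure/free_when_done fields — is that the same wrapped callback must behave correctly whether the inner round-robin pick completes synchronously or asynchronously, and the heap-allocated wrapper state must be freed exactly once in either case. The sketch below is a minimal, self-contained illustration of that ownership convention only; the identifiers (wrapped_arg, wrapped_done, do_pick, pick_wrapped) are hypothetical stand-ins, not gRPC APIs.

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-ins: only the cleanup/ownership pattern is illustrated. */
typedef void (*done_cb)(void *arg);

typedef struct wrapped_arg {
  done_cb wrapped_cb;   /* the caller's original completion callback */
  void *wrapped_cb_arg;
  void *free_when_done; /* heap memory released exactly once */
} wrapped_arg;

/* Analogous to wrapped_rr_closure(): invoke the original callback, then free
 * the wrapper state. Runs only when the pick completes asynchronously. */
static void wrapped_done(void *arg) {
  wrapped_arg *wa = arg;
  wa->wrapped_cb(wa->wrapped_cb_arg);
  free(wa->free_when_done);
}

/* Pretend inner pick: returns true when it completed synchronously (callback
 * is NOT invoked), false when it "schedules" cb(cb_arg) instead. */
static bool do_pick(bool synchronous, done_cb cb, void *cb_arg) {
  if (!synchronous) cb(cb_arg); /* simulate the deferred completion */
  return synchronous;
}

/* Analogous to pick_from_internal_rr_locked(): when the pick finishes
 * synchronously, perform the cleanup the wrapped callback would otherwise do. */
static bool pick_wrapped(bool synchronous, done_cb cb, void *cb_arg) {
  wrapped_arg *wa = malloc(sizeof(*wa));
  wa->wrapped_cb = cb;
  wa->wrapped_cb_arg = cb_arg;
  wa->free_when_done = wa; /* the wrapper owns itself */
  const bool done = do_pick(synchronous, wrapped_done, wa);
  if (done) {
    /* synchronous completion: wrapped_done will never run, so free here */
    free(wa->free_when_done);
  }
  return done;
}

static void on_complete(void *arg) { printf("pick complete: %s\n", (char *)arg); }

int main(void) {
  if (pick_wrapped(true, on_complete, "sync path")) {
    printf("pick returned synchronously\n");
  }
  pick_wrapped(false, on_complete, "async path");
  return 0;
}

This also mirrors why glb_pick() in the diff switched from reusing a single wc_arg embedded in glb_lb_policy to gpr_malloc'ing a fresh wrapped_rr_closure_arg per pick: each in-flight pick carries its own wrapper state, and the free_when_done field lets the shared wrapped_rr_closure free whichever allocation (pending_pick, pending_ping, or per-pick arg) owns it.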