diff options
Diffstat (limited to 'src/core')
28 files changed, 1640 insertions, 618 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index 04666edbec..aa464f3ab7 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -126,9 +126,10 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, } else { grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(w->channel)); - grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem, - grpc_cq_pollset(w->cq), NULL, - &w->on_complete, NULL); + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL, + &w->on_complete, NULL); } gpr_mu_lock(&w->mu); @@ -245,7 +246,8 @@ void grpc_channel_watch_connectivity_state( if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); grpc_client_channel_watch_connectivity_state( - &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state, + &exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state, &w->on_complete, &w->watcher_timer_init); } else { abort(); diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 8cebbe9eca..ab71467d73 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -389,7 +389,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, channel_data *chand = arg; char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; - grpc_lb_policy *old_lb_policy; + grpc_lb_policy *old_lb_policy = NULL; grpc_slice_hash_table *method_params_table = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; bool exit_idle = false; @@ -399,6 +399,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, service_config_parsing_state parsing_state; memset(&parsing_state, 0, sizeof(parsing_state)); + bool lb_policy_updated = false; if (chand->resolver_result != NULL) { // Find LB policy name. const grpc_arg *channel_arg = @@ -438,14 +439,27 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, lb_policy_args.args = chand->resolver_result; lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.combiner = chand->combiner; - lb_policy = - grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); - if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "config_change"); - GRPC_ERROR_UNREF(state_error); - state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy, - &state_error); + + const bool lb_policy_type_changed = + (chand->info_lb_policy_name == NULL) || + (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0); + if (chand->lb_policy != NULL && !lb_policy_type_changed) { + // update + lb_policy_updated = true; + grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); + } else { + lb_policy = + grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); + if (lb_policy != NULL) { + GRPC_LB_POLICY_REF(lb_policy, "config_change"); + GRPC_ERROR_UNREF(state_error); + state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy, + &state_error); + old_lb_policy = chand->lb_policy; + chand->lb_policy = lb_policy; + } } + // Find service config. channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); @@ -492,8 +506,6 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, gpr_free(chand->info_lb_policy_name); chand->info_lb_policy_name = lb_policy_name; } - old_lb_policy = chand->lb_policy; - chand->lb_policy = lb_policy; if (service_config_json != NULL) { gpr_free(chand->info_service_config_json); chand->info_service_config_json = service_config_json; @@ -516,17 +528,21 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, "Channel disconnected", &error, 1)); grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } - if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { + if (!lb_policy_updated && lb_policy != NULL && + chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); exit_idle = true; chand->exit_idle_when_lb_policy_arrives = false; } if (error == GRPC_ERROR_NONE && chand->resolver) { - set_channel_connectivity_state_locked( - exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); - if (lb_policy != NULL) { - watch_lb_policy_locked(exec_ctx, chand, lb_policy, state); + if (!lb_policy_updated) { + set_channel_connectivity_state_locked(exec_ctx, chand, state, + GRPC_ERROR_REF(state_error), + "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy_locked(exec_ctx, chand, lb_policy, state); + } } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next_locked(exec_ctx, chand->resolver, @@ -546,7 +562,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, "resolver_gone"); } - if (exit_idle) { + if (!lb_policy_updated && lb_policy != NULL && exit_idle) { grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); } @@ -555,9 +571,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_pollset_set_del_pollset_set( exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); + old_lb_policy = NULL; } - if (lb_policy != NULL) { + if (!lb_policy_updated && lb_policy != NULL) { GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); } @@ -1447,7 +1464,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( typedef struct external_connectivity_watcher { channel_data *chand; - grpc_pollset *pollset; + grpc_polling_entity pollent; grpc_closure *on_complete; grpc_closure *watcher_timer_init; grpc_connectivity_state *state; @@ -1522,8 +1539,8 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, - w->pollset); + grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, + w->chand->interested_parties); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); external_connectivity_watcher_list_remove(w->chand, w); @@ -1550,8 +1567,8 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_notify_on_state_change( exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure); } - grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, - w->pollset); + grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, + w->chand->interested_parties); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); @@ -1559,18 +1576,18 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, } void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *closure, - grpc_closure *watcher_timer_init) { + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_polling_entity pollent, grpc_connectivity_state *state, + grpc_closure *closure, grpc_closure *watcher_timer_init) { channel_data *chand = elem->channel_data; external_connectivity_watcher *w = gpr_zalloc(sizeof(*w)); w->chand = chand; - w->pollset = pollset; + w->pollent = pollent; w->on_complete = closure; w->state = state; w->watcher_timer_init = watcher_timer_init; - - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); + grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent, + chand->interested_parties); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); grpc_closure_sched( diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 356a7ab0c1..4f8987f0da 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -57,9 +57,9 @@ int grpc_client_channel_num_external_connectivity_watchers( grpc_channel_element *elem); void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete, - grpc_closure *watcher_timer_init); + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_polling_entity pollent, grpc_connectivity_state *state, + grpc_closure *on_complete, grpc_closure *watcher_timer_init); /* Debug helper: pull the subchannel call from a call stack element */ grpc_subchannel_call *grpc_client_channel_get_subchannel_call( diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index 112ba40658..7ac2cb1775 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -166,3 +166,9 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( return policy->vtable->check_connectivity_locked(exec_ctx, policy, connectivity_error); } + +void grpc_lb_policy_update_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + const grpc_lb_policy_args *lb_policy_args) { + policy->vtable->update_locked(exec_ctx, policy, lb_policy_args); +} diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index fb4aa084a6..07e7953009 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -42,6 +42,7 @@ is expected to be extended to contain some parameters) */ typedef struct grpc_lb_policy grpc_lb_policy; typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable; +typedef struct grpc_lb_policy_args grpc_lb_policy_args; struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; @@ -105,9 +106,12 @@ struct grpc_lb_policy_vtable { grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_closure *closure); + + void (*update_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args); }; -/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ +//#define GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG /* Strong references: the policy will shutdown when they reach zero */ @@ -207,4 +211,9 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_error **connectivity_error); +/** Update \a policy with \a lb_policy_args. */ +void grpc_lb_policy_update_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + const grpc_lb_policy_args *lb_policy_args); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index d2a2856a18..6eb0a9ef21 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -115,6 +115,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" @@ -315,6 +316,9 @@ typedef struct glb_lb_policy { /** for communicating with the LB server */ grpc_channel *lb_channel; + /** response generator to inject address updates into \a lb_channel */ + grpc_fake_resolver_response_generator *response_generator; + /** the RR policy to use of the backend servers returned by the LB server */ grpc_lb_policy *rr_policy; @@ -323,6 +327,9 @@ typedef struct glb_lb_policy { /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; + /** connectivity state of the LB channel */ + grpc_connectivity_state lb_channel_connectivity; + /** stores the deserialized response from the LB. May be NULL until one such * response has arrived. */ grpc_grpclb_serverlist *serverlist; @@ -340,10 +347,27 @@ typedef struct glb_lb_policy { bool shutting_down; + /** are we currently updating lb_call? */ + bool updating_lb_call; + + /** are we currently updating lb_channel? */ + bool updating_lb_channel; + + /** are we already watching the LB channel's connectivity? */ + bool watching_lb_channel; + + /** is \a lb_call_retry_timer active? */ + bool retry_timer_active; + + /** called upon changes to the LB channel's connectivity. */ + grpc_closure lb_channel_on_connectivity_changed; + + /** args from the latest update received while already updating, or NULL */ + grpc_lb_policy_args *pending_update_args; + /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ - /* Finished sending initial request. */ grpc_closure lb_on_sent_initial_request; @@ -533,10 +557,9 @@ static grpc_lb_addresses *process_serverlist_locked( return lb_addresses; } -/* returns true if the new RR policy should replace the current one, if any */ -static bool update_lb_connectivity_status_locked( +static void update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, - grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { + grpc_connectivity_state rr_state, grpc_error *rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); @@ -570,28 +593,26 @@ static bool update_lb_connectivity_status_locked( * (*) This function mustn't be called during shutting down. */ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); - switch (new_rr_state) { + switch (rr_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: - GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE); - return false; /* don't replace the RR policy */ + GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); + break; case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_READY: - GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE); + GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); } if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Setting grpclb's state to %s from new RR policy %p state.", - grpc_connectivity_state_name(new_rr_state), - (void *)glb_policy->rr_policy); + gpr_log( + GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.", + grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy); } - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - new_rr_state, GRPC_ERROR_REF(new_rr_state_error), + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state, + GRPC_ERROR_REF(rr_state_error), "update_lb_connectivity_status_locked"); - return true; } /* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return @@ -670,45 +691,38 @@ static bool pick_from_internal_rr_locked( return pick_done; } -static grpc_lb_policy *create_rr_locked( - grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist, - glb_lb_policy *glb_policy) { - GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - - grpc_lb_policy_args args; - memset(&args, 0, sizeof(args)); - args.client_channel_factory = glb_policy->cc_factory; - args.combiner = glb_policy->base.combiner; +static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args)); + args->client_channel_factory = glb_policy->cc_factory; + args->combiner = glb_policy->base.combiner; grpc_lb_addresses *addresses = - process_serverlist_locked(exec_ctx, serverlist); - + process_serverlist_locked(exec_ctx, glb_policy->serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); - args.args = grpc_channel_args_copy_and_add_and_remove( + args->args = grpc_channel_args_copy_and_add_and_remove( glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); - - grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - GPR_ASSERT(rr != NULL); grpc_lb_addresses_destroy(exec_ctx, addresses); - grpc_channel_args_destroy(exec_ctx, args.args); - return rr; + return args; +} + +static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_args *args) { + grpc_channel_args_destroy(exec_ctx, args->args); + gpr_free(args); } static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -/* glb_policy->rr_policy may be NULL (initial handover) */ -static void rr_handover_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { - GPR_ASSERT(glb_policy->serverlist != NULL && - glb_policy->serverlist->num_servers > 0); - - if (glb_policy->shutting_down) return; +static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + grpc_lb_policy_args *args) { + GPR_ASSERT(glb_policy->rr_policy == NULL); grpc_lb_policy *new_rr_policy = - create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); + grpc_lb_policy_create(exec_ctx, "round_robin", args); if (new_rr_policy == NULL) { gpr_log(GPR_ERROR, "Failure creating a RoundRobin policy for serverlist update with " @@ -719,41 +733,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, (void *)glb_policy->rr_policy); return; } - - grpc_error *new_rr_state_error = NULL; - const grpc_connectivity_state new_rr_state = - grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy, - &new_rr_state_error); - /* Connectivity state is a function of the new RR policy just created */ - const bool replace_old_rr = update_lb_connectivity_status_locked( - exec_ctx, glb_policy, new_rr_state, new_rr_state_error); - - if (!replace_old_rr) { - /* dispose of the new RR policy that won't be used after all */ - GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace"); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Keeping old RR policy (%p) despite new serverlist: new RR " - "policy was in %s connectivity state.", - (void *)glb_policy->rr_policy, - grpc_connectivity_state_name(new_rr_state)); - } - return; - } - - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)", - (void *)new_rr_policy, (void *)glb_policy->rr_policy); - } - - if (glb_policy->rr_policy != NULL) { - /* if we are phasing out an existing RR instance, unref it. */ - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover"); - } - - /* Finally update the RR policy to the newly created one */ glb_policy->rr_policy = new_rr_policy; + grpc_error *rr_state_error = NULL; + const grpc_connectivity_state rr_state = + grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy, + &rr_state_error); + /* Connectivity state is a function of the RR policy updated/created */ + update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state, + rr_state_error); + /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ @@ -769,10 +758,10 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_rr_connectivity_changed_locked, rr_connectivity, grpc_combiner_scheduler(glb_policy->base.combiner, false)); rr_connectivity->glb_policy = glb_policy; - rr_connectivity->state = new_rr_state; + rr_connectivity->state = rr_state; /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched"); grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); @@ -809,6 +798,31 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, } } +/* glb_policy->rr_policy may be NULL (initial handover) */ +static void rr_handover_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + GPR_ASSERT(glb_policy->serverlist != NULL && + glb_policy->serverlist->num_servers > 0); + + if (glb_policy->shutting_down) return; + + grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); + if (glb_policy->rr_policy != NULL) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)", + (void *)glb_policy->rr_policy); + } + grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args); + } else { + create_rr_locked(exec_ctx, glb_policy, args); + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)", + (void *)glb_policy->rr_policy); + } + } + lb_policy_args_destroy(exec_ctx, args); +} + static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { rr_connectivity_data *rr_connectivity = arg; @@ -854,18 +868,24 @@ static grpc_slice_hash_table_entry targets_info_entry_create( return entry; } -/* Returns the target URI for the LB service whose addresses are in \a - * addresses. Using this URI, a bidirectional streaming channel will be created - * for the reception of load balancing updates. +static int balancer_name_cmp_fn(void *a, void *b) { + const char *a_str = a; + const char *b_str = b; + return strcmp(a_str, b_str); +} + +/* Returns the channel args for the LB channel, used to create a bidirectional + * stream for the reception of load balancing updates. * - * The output argument \a targets_info will be updated to contain a mapping of - * "LB server address" to "balancer name", as reported by the naming system. - * This mapping will be propagated via the channel arguments of the - * aforementioned LB streaming channel, to be used by the security connector for - * secure naming checks. The user is responsible for freeing \a targets_info. */ -static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx, - const grpc_lb_addresses *addresses, - grpc_slice_hash_table **targets_info) { + * Inputs: + * - \a addresses: corresponding to the balancers. + * - \a response_generator: in order to propagate updates from the resolver + * above the grpclb policy. + * - \a args: other args inherited from the grpclb policy. */ +static grpc_channel_args *build_lb_channel_args( + grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args) { size_t num_grpclb_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; @@ -874,53 +894,54 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx, * It's the resolver's responsibility to make sure this policy is only * instantiated and used in that case. Otherwise, something has gone wrong. */ GPR_ASSERT(num_grpclb_addrs > 0); - + grpc_lb_addresses *lb_addresses = + grpc_lb_addresses_create(num_grpclb_addrs, NULL); grpc_slice_hash_table_entry *targets_info_entries = - gpr_malloc(sizeof(*targets_info_entries) * num_grpclb_addrs); + gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs); - /* construct a target ipvX://ip1:port1,ip2:port2,... from the addresses in \a - * addresses */ - /* TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); - size_t addr_index = 0; - - for (size_t i = 0; i < addresses->num_addresses; i++) { + size_t lb_addresses_idx = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (!addresses->addresses[i].is_balancer) continue; if (addresses->addresses[i].user_data != NULL) { gpr_log(GPR_ERROR, "This LB policy doesn't support user data. It will be ignored"); } - if (addresses->addresses[i].is_balancer) { - char *addr_str; - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_str, &addresses->addresses[i].address, true) > 0); - targets_info_entries[addr_index] = targets_info_entry_create( - addr_str, addresses->addresses[i].balancer_name); - addr_strs[addr_index++] = addr_str; - } + char *addr_str; + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_str, &addresses->addresses[i].address, true) > 0); + targets_info_entries[lb_addresses_idx] = targets_info_entry_create( + addr_str, addresses->addresses[i].balancer_name); + gpr_free(addr_str); + + grpc_lb_addresses_set_address( + lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, + addresses->addresses[i].address.len, false /* is balancer */, + addresses->addresses[i].balancer_name, NULL /* user data */); } - GPR_ASSERT(addr_index == num_grpclb_addrs); - - size_t uri_path_len; - char *uri_path = gpr_strjoin_sep((const char **)addr_strs, num_grpclb_addrs, - ",", &uri_path_len); - for (size_t i = 0; i < num_grpclb_addrs; i++) gpr_free(addr_strs[i]); - gpr_free(addr_strs); - - char *target_uri_str = NULL; - /* TODO(dgq): Don't assume all addresses will share the scheme of the first - * one */ - gpr_asprintf(&target_uri_str, "%s:%s", - grpc_sockaddr_get_uri_scheme(&addresses->addresses[0].address), - uri_path); - gpr_free(uri_path); - - *targets_info = grpc_slice_hash_table_create( - num_grpclb_addrs, targets_info_entries, destroy_balancer_name); + GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); + grpc_slice_hash_table *targets_info = + grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries, + destroy_balancer_name, balancer_name_cmp_fn); gpr_free(targets_info_entries); - return target_uri_str; + grpc_channel_args *lb_channel_args = + grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info, + response_generator, args); + + grpc_arg lb_channel_addresses_arg = + grpc_lb_addresses_create_channel_arg(lb_addresses); + + grpc_channel_args *result = grpc_channel_args_copy_and_add( + lb_channel_args, &lb_channel_addresses_arg, 1); + grpc_slice_hash_table_unref(exec_ctx, targets_info); + grpc_channel_args_destroy(exec_ctx, lb_channel_args); + grpc_lb_addresses_destroy(exec_ctx, lb_addresses); + return result; } +static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error); static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { @@ -976,24 +997,31 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, glb_policy->args = grpc_channel_args_copy_and_add_and_remove( args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); - grpc_slice_hash_table *targets_info = NULL; /* Create a client channel over them to communicate with a LB service */ - char *lb_service_target_addresses = - get_lb_uri_target_addresses(exec_ctx, addresses, &targets_info); - grpc_channel_args *lb_channel_args = - get_lb_channel_args(exec_ctx, targets_info, args->args); + glb_policy->response_generator = + grpc_fake_resolver_response_generator_create(); + grpc_channel_args *lb_channel_args = build_lb_channel_args( + exec_ctx, addresses, glb_policy->response_generator, args->args); + char *uri_str; + gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( - exec_ctx, lb_service_target_addresses, args->client_channel_factory, - lb_channel_args); - grpc_slice_hash_table_unref(exec_ctx, targets_info); + exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); + + /* Propagate initial resolution */ + grpc_fake_resolver_response_generator_set_response( + exec_ctx, glb_policy->response_generator, lb_channel_args); grpc_channel_args_destroy(exec_ctx, lb_channel_args); - gpr_free(lb_service_target_addresses); + gpr_free(uri_str); if (glb_policy->lb_channel == NULL) { gpr_free((void *)glb_policy->server_name); grpc_channel_args_destroy(exec_ctx, glb_policy->args); gpr_free(glb_policy); return NULL; } + + grpc_closure_init(&glb_policy->lb_channel_on_connectivity_changed, + glb_lb_channel_on_connectivity_changed_cb, glb_policy, + grpc_combiner_scheduler(args->combiner, false)); grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); @@ -1009,12 +1037,15 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->client_stats != NULL) { grpc_grpclb_client_stats_unref(glb_policy->client_stats); } - grpc_channel_destroy(glb_policy->lb_channel); - glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); if (glb_policy->serverlist != NULL) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } + grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); + if (glb_policy->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); + gpr_free(glb_policy->pending_update_args); + } gpr_free(glb_policy); } @@ -1022,16 +1053,6 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_policy->shutting_down = true; - pending_pick *pp = glb_policy->pending_picks; - glb_policy->pending_picks = NULL; - pending_ping *pping = glb_policy->pending_pings; - glb_policy->pending_pings = NULL; - if (glb_policy->rr_policy) { - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); - } - grpc_connectivity_state_set( - exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); /* We need a copy of the lb_call pointer because we can't cancell the call * while holding glb_policy->mu: lb_on_server_status_received, invoked due to * the cancel, needs to acquire that same lock */ @@ -1045,6 +1066,30 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_call_cancel(lb_call, NULL); /* lb_on_server_status_received will pick up the cancel and clean up */ } + if (glb_policy->retry_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + glb_policy->retry_timer_active = false; + } + + pending_pick *pp = glb_policy->pending_picks; + glb_policy->pending_picks = NULL; + pending_ping *pping = glb_policy->pending_pings; + glb_policy->pending_pings = NULL; + if (glb_policy->rr_policy) { + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); + } + // We destroy the LB channel here because + // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy + // instance. Destroying the lb channel in glb_destroy would likely result in + // a callback invocation without a valid glb_policy arg. + if (glb_policy->lb_channel != NULL) { + grpc_channel_destroy(glb_policy->lb_channel); + glb_policy->lb_channel = NULL; + } + grpc_connectivity_state_set( + exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); + while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; @@ -1318,6 +1363,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); + GPR_ASSERT(glb_policy->lb_call == NULL); GPR_ASSERT(!glb_policy->shutting_down); /* Note the following LB call progresses every time there's activity in \a @@ -1403,8 +1449,10 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, lb_call_init_locked(exec_ctx, glb_policy); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", - (void *)glb_policy, (void *)glb_policy->lb_call); + gpr_log(GPR_INFO, + "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)", + (void *)glb_policy, (void *)glb_policy->lb_channel, + (void *)glb_policy->lb_call); } GPR_ASSERT(glb_policy->lb_call != NULL); @@ -1608,8 +1656,8 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - - if (!glb_policy->shutting_down) { + glb_policy->retry_timer_active = false; + if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", (void *)glb_policy); @@ -1617,31 +1665,32 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(glb_policy->lb_call == NULL); query_for_backends_locked(exec_ctx, glb_policy); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "grpclb_on_retry_timer"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer"); } static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - GPR_ASSERT(glb_policy->lb_call != NULL); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { char *status_details = grpc_slice_to_c_string(glb_policy->lb_call_status_details); - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "Status from LB server received. Status = %d, Details = '%s', " - "(call: %p)", + "(call: %p), error %p", glb_policy->lb_call_status, status_details, - (void *)glb_policy->lb_call); + (void *)glb_policy->lb_call, (void *)error); gpr_free(status_details); } - /* We need to perform cleanups no matter what. */ lb_call_destroy_locked(exec_ctx, glb_policy); - - if (!glb_policy->shutting_down) { + if (glb_policy->started_picking && glb_policy->updating_lb_call) { + if (glb_policy->retry_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + } + if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy); + glb_policy->updating_lb_call = false; + } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = @@ -1651,16 +1700,18 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, (void *)glb_policy); gpr_timespec timeout = gpr_time_sub(next_try, now); if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { - gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.", + gpr_log(GPR_DEBUG, + "... retry_timer_active in %" PRId64 ".%09d seconds.", timeout.tv_sec, timeout.tv_nsec); } else { - gpr_log(GPR_DEBUG, "... retrying immediately."); + gpr_log(GPR_DEBUG, "... retry_timer_active immediately."); } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); grpc_closure_init( &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); + glb_policy->retry_timer_active = true; grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry, now); } @@ -1668,6 +1719,138 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, "lb_on_server_status_received"); } +static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + glb_lb_policy *glb_policy = (glb_lb_policy *)policy; + + if (glb_policy->updating_lb_channel) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "Update already in progress for grpclb %p. Deferring update.", + (void *)glb_policy); + } + if (glb_policy->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, + glb_policy->pending_update_args->args); + gpr_free(glb_policy->pending_update_args); + } + glb_policy->pending_update_args = + gpr_zalloc(sizeof(*glb_policy->pending_update_args)); + glb_policy->pending_update_args->client_channel_factory = + args->client_channel_factory; + glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args); + glb_policy->pending_update_args->combiner = args->combiner; + return; + } + + glb_policy->updating_lb_channel = true; + // Propagate update to lb_channel (pick first). + const grpc_arg *arg = + grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + if (glb_policy->lb_channel == NULL) { + // If we don't have a current channel to the LB, go into TRANSIENT + // FAILURE. + grpc_connectivity_state_set( + exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "glb_update_missing"); + } else { + // otherwise, keep using the current LB channel (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for grpclb %p update, " + "ignoring.", + (void *)glb_policy); + } + } + const grpc_lb_addresses *addresses = arg->value.pointer.p; + GPR_ASSERT(glb_policy->lb_channel != NULL); + grpc_channel_args *lb_channel_args = build_lb_channel_args( + exec_ctx, addresses, glb_policy->response_generator, args->args); + /* Propagate updates to the LB channel through the fake resolver */ + grpc_fake_resolver_response_generator_set_response( + exec_ctx, glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(exec_ctx, lb_channel_args); + + if (!glb_policy->watching_lb_channel) { + // Watch the LB channel connectivity for connection. + glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT; + grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(glb_policy->lb_channel)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + glb_policy->watching_lb_channel = true; + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity"); + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset_set( + glb_policy->base.interested_parties), + &glb_policy->lb_channel_connectivity, + &glb_policy->lb_channel_on_connectivity_changed, NULL); + } +} + +// Invoked as part of the update process. It continues watching the LB channel +// until it shuts down or becomes READY. It's invoked even if the LB channel +// stayed READY throughout the update (for example if the update is identical). +static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + if (glb_policy->shutting_down) goto done; + // Re-initialize the lb_call. This should also take care of updating the + // embedded RR policy. Note that the current RR policy, if any, will stay in + // effect until an update from the new lb_call is received. + switch (glb_policy->lb_channel_connectivity) { + case GRPC_CHANNEL_INIT: + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + /* resub. */ + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(glb_policy->lb_channel)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset_set( + glb_policy->base.interested_parties), + &glb_policy->lb_channel_connectivity, + &glb_policy->lb_channel_on_connectivity_changed, NULL); + break; + } + case GRPC_CHANNEL_IDLE: + // lb channel inactive (probably shutdown prior to update). Restart lb + // call to kick the lb channel into gear. + GPR_ASSERT(glb_policy->lb_call == NULL); + /* fallthrough */ + case GRPC_CHANNEL_READY: + if (glb_policy->lb_call != NULL) { + glb_policy->updating_lb_channel = false; + glb_policy->updating_lb_call = true; + grpc_call_cancel(glb_policy->lb_call, NULL); + // lb_on_server_status_received will pick up the cancel and reinit + // lb_call. + if (glb_policy->pending_update_args != NULL) { + const grpc_lb_policy_args *args = glb_policy->pending_update_args; + glb_policy->pending_update_args = NULL; + glb_update_locked(exec_ctx, &glb_policy->base, args); + } + } else if (glb_policy->started_picking && !glb_policy->shutting_down) { + if (glb_policy->retry_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + glb_policy->retry_timer_active = false; + } + start_picking_locked(exec_ctx, glb_policy); + } + /* fallthrough */ + case GRPC_CHANNEL_SHUTDOWN: + done: + glb_policy->watching_lb_channel = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "watch_lb_channel_connectivity_cb_shutdown"); + break; + } +} + /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_destroy, @@ -1678,7 +1861,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_ping_one_locked, glb_exit_idle_locked, glb_check_connectivity_locked, - glb_notify_on_state_change_locked}; + glb_notify_on_state_change_locked, + glb_update_locked}; static void glb_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c index d6201f2387..a190c2075d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c @@ -50,28 +50,37 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel( return lb_channel; } -grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx, - grpc_slice_hash_table *targets_info, - const grpc_channel_args *args) { - /* We strip out the channel arg for the LB policy name, since we want - * to use the default (pick_first) in this case. +grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args( + grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args) { + const grpc_arg to_add[] = { + grpc_fake_resolver_response_generator_arg(response_generator)}; + /* We remove: * - * We also strip out the channel arg for the resolved addresses, since - * that will be generated by the name resolver used in the LB channel. - * Note that the LB channel will use the sockaddr resolver, so this - * won't actually generate a query to DNS (or some other name service). - * However, the addresses returned by the sockaddr resolver will have - * is_balancer=false, whereas our own addresses have is_balancer=true. - * We need the LB channel to return addresses with is_balancer=false - * so that it does not wind up recursively using the grpclb LB policy, - * as per the special case logic in client_channel.c. + * - The channel arg for the LB policy name, since we want to use the default + * (pick_first) in this case. * - * Lastly, we also strip out the channel arg for the server URI, - * since that will be different for the LB channel than for the parent - * channel (the client channel factory will re-add this arg with - * the right value). */ + * - The channel arg for the resolved addresses, since that will be generated + * by the name resolver used in the LB channel. Note that the LB channel + * will use the fake resolver, so this won't actually generate a query + * to DNS (or some other name service). However, the addresses returned by + * the fake resolver will have is_balancer=false, whereas our own + * addresses have is_balancer=true. We need the LB channel to return + * addresses with is_balancer=false so that it does not wind up recursively + * using the grpclb LB policy, as per the special case logic in + * client_channel.c. + * + * - The channel arg for the server URI, since that will be different for the + * LB channel than for the parent channel (the client channel factory will + * re-add this arg with the right value). + * + * - The fake resolver generator, because we are replacing it with the one + * from the grpclb policy, used to propagate updates to the LB channel. */ static const char *keys_to_remove[] = { - GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI}; - return grpc_channel_args_copy_and_remove(args, keys_to_remove, - GPR_ARRAY_SIZE(keys_to_remove)); + GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI, + GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR}; + return grpc_channel_args_copy_and_add_and_remove( + args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add, + GPR_ARRAY_SIZE(to_add)); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h index 9730c971d9..dfb682ad16 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h @@ -35,6 +35,7 @@ #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H #include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/slice/slice_hash_table.h" /** Create the channel used for communicating with an LB service. @@ -49,9 +50,10 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel( grpc_client_channel_factory *client_channel_factory, grpc_channel_args *args); -grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx, - grpc_slice_hash_table *targets_info, - const grpc_channel_args *args); +grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args( + grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H \ */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c index a145cba63c..21effd75e3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c @@ -76,32 +76,39 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel( return lb_channel; } -grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx, - grpc_slice_hash_table *targets_info, - const grpc_channel_args *args) { - const grpc_arg targets_info_arg = - grpc_lb_targets_info_create_channel_arg(targets_info); - /* We strip out the channel arg for the LB policy name, since we want - * to use the default (pick_first) in this case. +grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args( + grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args) { + const grpc_arg to_add[] = { + grpc_lb_targets_info_create_channel_arg(targets_info), + grpc_fake_resolver_response_generator_arg(response_generator)}; + /* We remove: * - * We also strip out the channel arg for the resolved addresses, since - * that will be generated by the name resolver used in the LB channel. - * Note that the LB channel will use the sockaddr resolver, so this - * won't actually generate a query to DNS (or some other name service). - * However, the addresses returned by the sockaddr resolver will have - * is_balancer=false, whereas our own addresses have is_balancer=true. - * We need the LB channel to return addresses with is_balancer=false - * so that it does not wind up recursively using the grpclb LB policy, - * as per the special case logic in client_channel.c. + * - The channel arg for the LB policy name, since we want to use the default + * (pick_first) in this case. * - * Lastly, we also strip out the channel arg for the server URI, - * since that will be different for the LB channel than for the parent - * channel (the client channel factory will re-add this arg with - * the right value). */ + * - The channel arg for the resolved addresses, since that will be generated + * by the name resolver used in the LB channel. Note that the LB channel + * will use the fake resolver, so this won't actually generate a query + * to DNS (or some other name service). However, the addresses returned by + * the fake resolver will have is_balancer=false, whereas our own + * addresses have is_balancer=true. We need the LB channel to return + * addresses with is_balancer=false so that it does not wind up recursively + * using the grpclb LB policy, as per the special case logic in + * client_channel.c. + * + * - The channel arg for the server URI, since that will be different for the + * LB channel than for the parent channel (the client channel factory will + * re-add this arg with the right value). + * + * - The fake resolver generator, because we are replacing it with the one + * from the grpclb policy, used to propagate updates to the LB channel. */ static const char *keys_to_remove[] = { - GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI}; + GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI, + GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR}; /* Add the targets info table to be used for secure naming */ return grpc_channel_args_copy_and_add_and_remove( - args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &targets_info_arg, - 1); + args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add, + GPR_ARRAY_SIZE(to_add)); } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index b1c5dfc61c..b6f86255df 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -37,11 +37,14 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" +grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false); + typedef struct pending_pick { struct pending_pick *next; uint32_t initial_metadata_flags; @@ -54,7 +57,9 @@ typedef struct { grpc_lb_policy base; /** all our subchannels */ grpc_subchannel **subchannels; + grpc_subchannel **new_subchannels; size_t num_subchannels; + size_t num_new_subchannels; grpc_closure connectivity_changed; @@ -63,10 +68,19 @@ typedef struct { /** the selected channel */ grpc_connected_subchannel *selected; + /** the subchannel key for \a selected, or NULL if \a selected not set */ + const grpc_subchannel_key *selected_key; + /** have we started picking? */ - int started_picking; + bool started_picking; /** are we shut down? */ - int shutdown; + bool shutdown; + /** are we updating the selected subchannel? */ + bool updating_selected; + /** are we updating the subchannel candidates? */ + bool updating_subchannels; + /** args from the latest update received while already updating, or NULL */ + grpc_lb_policy_args *pending_update_args; /** which subchannel are we watching? */ size_t checking_subchannel; /** what is the connectivity of that channel? */ @@ -80,23 +94,28 @@ typedef struct { static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - size_t i; GPR_ASSERT(p->pending_picks == NULL); - for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); + for (size_t i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy"); } if (p->selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, + "picked_first_destroy"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + if (p->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); + gpr_free(p->pending_update_args); + } gpr_free(p->subchannels); + gpr_free(p->new_subchannels); gpr_free(p); } static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - p->shutdown = 1; + p->shutdown = true; pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set( @@ -106,7 +125,7 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (p->selected != NULL) { grpc_connected_subchannel_notify_on_state_change( exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); - } else if (p->num_subchannels > 0) { + } else if (p->num_subchannels > 0 && p->started_picking) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); @@ -169,21 +188,25 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { - p->started_picking = 1; - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); +static void start_picking_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + p->started_picking = true; + if (p->subchannels != NULL) { + GPR_ASSERT(p->num_subchannels > 0); + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); + } } static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } } @@ -203,7 +226,7 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* No subchannel selected yet, so try again */ if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; @@ -216,30 +239,290 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { - size_t i; size_t num_subchannels = p->num_subchannels; - grpc_subchannel **subchannels; + grpc_subchannel **subchannels = p->subchannels; - subchannels = p->subchannels; p->num_subchannels = 0; p->subchannels = NULL; GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); - for (i = 0; i < num_subchannels; i++) { + for (size_t i = 0; i < num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); } - gpr_free(subchannels); } +static grpc_connectivity_state pf_check_connectivity_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + return grpc_connectivity_state_get(&p->state_tracker, error); +} + +static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, + current, notify); +} + +static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + if (p->selected) { + grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + } else { + grpc_closure_sched(exec_ctx, closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); + } +} + +/* unsubscribe all subchannels */ +static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + if (p->num_subchannels > 0) { + GPR_ASSERT(p->selected == NULL); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, + &p->connectivity_changed); + p->updating_subchannels = true; + } else if (p->selected != NULL) { + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); + p->updating_selected = true; + } +} + +/* true upon success */ +static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + pick_first_lb_policy *p = (pick_first_lb_policy *)policy; + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ + const grpc_arg *arg = + grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + if (p->subchannels == NULL) { + // If we don't have a current subchannel list, go into TRANSIENT FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "pf_update_missing"); + } else { + // otherwise, keep using the current subchannel list (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for Pick First %p update, " + "ignoring.", + (void *)p); + } + return; + } + const grpc_lb_addresses *addresses = arg->value.pointer.p; + size_t num_addrs = 0; + for (size_t i = 0; i < addresses->num_addresses; i++) { + if (!addresses->addresses[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) { + // Empty update. Unsubscribe from all current subchannels and put the + // channel in TRANSIENT_FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "pf_update_empty"); + stop_connectivity_watchers(exec_ctx, p); + return; + } + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", + (void *)p, (unsigned long)num_addrs); + } + grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs); + /* We remove the following keys in order for subchannel keys belonging to + * subchannels point to the same address to match. */ + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + size_t sc_args_count = 0; + + /* Create list of subchannel args for new addresses in \a args. */ + for (size_t i = 0; i < addresses->num_addresses; i++) { + if (addresses->addresses[i].is_balancer) continue; + if (addresses->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + grpc_arg addr_arg = + grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( + args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, + 1); + gpr_free(addr_arg.value.string); + sc_args[sc_args_count++].args = new_args; + } + + /* Check if p->selected is amongst them. If so, we are done. */ + if (p->selected != NULL) { + GPR_ASSERT(p->selected_key != NULL); + for (size_t i = 0; i < sc_args_count; i++) { + grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]); + const bool found_selected = + grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0; + grpc_subchannel_key_destroy(exec_ctx, ith_sc_key); + if (found_selected) { + // The currently selected subchannel is in the update: we are done. + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "Pick First %p found already selected subchannel %p amongst " + "updates. Update done.", + (void *)p, (void *)p->selected); + } + for (size_t j = 0; j < sc_args_count; j++) { + grpc_channel_args_destroy(exec_ctx, + (grpc_channel_args *)sc_args[j].args); + } + gpr_free(sc_args); + return; + } + } + } + // We only check for already running updates here because if the previous + // steps were successful, the update can be considered done without any + // interference (ie, no callbacks were scheduled). + if (p->updating_selected || p->updating_subchannels) { + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "Update already in progress for pick first %p. Deferring update.", + (void *)p); + } + if (p->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); + gpr_free(p->pending_update_args); + } + p->pending_update_args = gpr_zalloc(sizeof(*p->pending_update_args)); + p->pending_update_args->client_channel_factory = + args->client_channel_factory; + p->pending_update_args->args = grpc_channel_args_copy(args->args); + p->pending_update_args->combiner = args->combiner; + return; + } + /* Create the subchannels for the new subchannel args/addresses. */ + grpc_subchannel **new_subchannels = + gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); + size_t num_new_subchannels = 0; + for (size_t i = 0; i < sc_args_count; i++) { + grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( + exec_ctx, args->client_channel_factory, &sc_args[i]); + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + char *address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_INFO, + "Pick First %p created subchannel %p for address uri %s", + (void *)p, (void *)subchannel, address_uri); + gpr_free(address_uri); + } + grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args); + if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel; + } + gpr_free(sc_args); + if (num_new_subchannels == 0) { + gpr_free(new_subchannels); + // Empty update. Unsubscribe from all current subchannels and put the + // channel in TRANSIENT_FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"), + "pf_update_no_valid_addresses"); + stop_connectivity_watchers(exec_ctx, p); + return; + } + + /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */ + stop_connectivity_watchers(exec_ctx, p); + + /* Save new subchannels. The switch over will happen in + * pf_connectivity_changed_locked */ + if (p->updating_selected || p->updating_subchannels) { + p->num_new_subchannels = num_new_subchannels; + p->new_subchannels = new_subchannels; + } else { /* nothing is updating. Get things moving from here */ + p->num_subchannels = num_new_subchannels; + p->subchannels = new_subchannels; + p->new_subchannels = NULL; + p->num_new_subchannels = 0; + if (p->started_picking) { + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); + } + } +} + static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { pick_first_lb_policy *p = arg; grpc_subchannel *selected_subchannel; pending_pick *pp; + bool restart = false; + if (p->updating_selected && error == GRPC_ERROR_CANCELLED) { + /* Captured the unsubscription for p->selected */ + GPR_ASSERT(p->selected != NULL); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, + "pf_update_connectivity"); + p->updating_selected = false; + if (p->num_new_subchannels == 0) { + p->selected = NULL; + return; + } + restart = true; + } + if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) { + /* Captured the unsubscription for the checking subchannel */ + GPR_ASSERT(p->selected == NULL); + for (size_t i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], + "pf_update_connectivity"); + } + gpr_free(p->subchannels); + p->subchannels = NULL; + p->num_subchannels = 0; + p->updating_subchannels = false; + if (p->num_new_subchannels == 0) return; + restart = true; + } + if (restart) { + p->selected = NULL; + p->selected_key = NULL; + + GPR_ASSERT(p->new_subchannels != NULL); + GPR_ASSERT(p->num_new_subchannels > 0); + p->num_subchannels = p->num_new_subchannels; + p->subchannels = p->new_subchannels; + p->num_new_subchannels = 0; + p->new_subchannels = NULL; + + if (p->started_picking) { + /* If we were picking, continue to do so over the new subchannels, + * starting from the 0th index. */ + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + /* reuses the weak ref from start_picking_locked */ + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); + } + if (p->pending_update_args != NULL) { + const grpc_lb_policy_args *args = p->pending_update_args; + p->pending_update_args = NULL; + pf_update_locked(exec_ctx, &p->base, args); + } + return; + } GRPC_ERROR_REF(error); - if (p->shutdown) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_ERROR_UNREF(error); @@ -272,6 +555,11 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected_subchannel), "picked_first"); + + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected); + } + p->selected_key = grpc_subchannel_get_key(selected_subchannel); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); destroy_subchannels_locked(exec_ctx, p); @@ -279,6 +567,11 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "Servicing pending pick with selected subchannel %p", + (void *)p->selected); + } grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } @@ -353,32 +646,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(error); } -static grpc_connectivity_state pf_check_connectivity_locked( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - return grpc_connectivity_state_get(&p->state_tracker, error); -} - -static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, - current, notify); -} - -static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - if (p->selected) { - grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); - } else { - grpc_closure_sched(exec_ctx, closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); - } -} - static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown_locked, @@ -388,7 +655,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_ping_one_locked, pf_exit_idle_locked, pf_check_connectivity_locked, - pf_notify_on_state_change_locked}; + pf_notify_on_state_change_locked, + pf_update_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} @@ -398,59 +666,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { GPR_ASSERT(args->client_channel_factory != NULL); - - /* Find the number of backend addresses. We ignore balancer - * addresses, since we don't know how to handle them. */ - const grpc_arg *arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; - } - grpc_lb_addresses *addresses = arg->value.pointer.p; - size_t num_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - if (!addresses->addresses[i].is_balancer) ++num_addrs; - } - if (num_addrs == 0) return NULL; - pick_first_lb_policy *p = gpr_zalloc(sizeof(*p)); - - p->subchannels = gpr_zalloc(sizeof(grpc_subchannel *) * num_addrs); - grpc_subchannel_args sc_args; - size_t subchannel_idx = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - /* Skip balancer addresses, since we only know how to handle backends. */ - if (addresses->addresses[i].is_balancer) continue; - - if (addresses->addresses[i].user_data != NULL) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); - } - - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, - 1); - gpr_free(addr_arg.value.string); - sc_args.args = new_args; - grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args); - grpc_channel_args_destroy(exec_ctx, new_args); - - if (subchannel != NULL) { - p->subchannels[subchannel_idx++] = subchannel; - } - } - if (subchannel_idx == 0) { - gpr_free(p->subchannels); - gpr_free(p); - return NULL; - } - p->num_subchannels = subchannel_idx; - + pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p, grpc_combiner_scheduler(args->combiner, false)); @@ -472,6 +689,7 @@ static grpc_lb_policy_factory *pick_first_lb_factory_create() { void grpc_lb_policy_pick_first_init() { grpc_register_lb_policy(pick_first_lb_factory_create()); + grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace); } void grpc_lb_policy_pick_first_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 7ee6ffb787..c9758eef88 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -33,31 +33,11 @@ /** Round Robin Policy. * - * This policy keeps: - * - A circular list of ready (connected) subchannels, the *readylist*. An empty - * readylist consists solely of its root (dummy) node. - * - A pointer to the last element picked from the readylist, the *lastpick*. - * Initially set to point to the readylist's root. - * - * Behavior: - * - When a subchannel connects, it's *prepended* to the readylist's root node. - * Ie, if readylist = A <-> B <-> ROOT <-> C - * ^ ^ - * |____________________| - * and subchannel D becomes connected, the addition of D to the readylist - * results in readylist = A <-> B <-> D <-> ROOT <-> C - * ^ ^ - * |__________________________| - * - When a subchannel disconnects, it's removed from the readylist. If the - * subchannel being removed was the most recently picked, the *lastpick* - * pointer moves to the removed node's previous element. Note that if the - * readylist only had one element, this is still legal, as the lastpick would - * point to the dummy root node, for an empty readylist. - * - Upon picking, *lastpick* is updated to point to the returned (connected) - * subchannel. Note that it's possible that the selected subchannel becomes - * disconnected in the interim between the selection and the actual usage of - * the subchannel by the caller. - */ + * Before every pick, the \a get_next_ready_subchannel_index_locked function + * returns the p->subchannel_list->subchannels index for next subchannel, + * respecting the relative + * order of the addresses provided upon creation or updates. Note however that + * updates will start picking from the beginning of the updated list. */ #include <string.h> @@ -72,8 +52,6 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" -typedef struct round_robin_lb_policy round_robin_lb_policy; - grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false); /** List of entities waiting for a pick. @@ -99,9 +77,37 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; +typedef struct rr_subchannel_list rr_subchannel_list; +typedef struct round_robin_lb_policy { + /** base policy: must be first */ + grpc_lb_policy base; + + rr_subchannel_list *subchannel_list; + + /** have we started picking? */ + bool started_picking; + /** are we shutting down? */ + bool shutdown; + /** List of picks that are waiting on connectivity */ + pending_pick *pending_picks; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; + + /** Index into subchannels for last pick. */ + size_t last_ready_subchannel_index; + + /** Latest version of the subchannel list. + * Subchannel connectivity callbacks will only promote updated subchannel + * lists if they equal \a latest_pending_subchannel_list. In other words, + * racing callbacks that reference outdated subchannel lists won't perform any + * update. */ + rr_subchannel_list *latest_pending_subchannel_list; +} round_robin_lb_policy; + typedef struct { - /** backpointer to owning policy */ - round_robin_lb_policy *policy; + /** backpointer to owning subchannel list */ + rr_subchannel_list *subchannel_list; /** subchannel itself */ grpc_subchannel *subchannel; /** notification that connectivity has changed on subchannel */ @@ -123,12 +129,9 @@ typedef struct { const grpc_lb_user_data_vtable *user_data_vtable; } subchannel_data; -struct round_robin_lb_policy { - /** base policy: must be first */ - grpc_lb_policy base; - - /** total number of addresses received at creation time */ - size_t num_addresses; +struct rr_subchannel_list { + /** backpointer to owning policy */ + round_robin_lb_policy *policy; /** all our subchannels */ size_t num_subchannels; @@ -141,67 +144,143 @@ struct round_robin_lb_policy { /** how many subchannels are in state IDLE */ size_t num_idle; - /** have we started picking? */ - bool started_picking; - /** are we shutting down? */ - bool shutdown; - /** List of picks that are waiting on connectivity */ - pending_pick *pending_picks; - - /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; + /** There will be one ref for each entry in subchannels for which there is a + * pending connectivity state watcher callback. */ + gpr_refcount refcount; - // Index into subchannels for last pick. - size_t last_ready_subchannel_index; + /** Is this list shutting down? This may be true due to the shutdown of the + * policy itself or because a newer update has arrived while this one hadn't + * finished processing. */ + bool shutting_down; }; -/** Returns the index into p->subchannels of the next subchannel in - * READY state, or p->num_subchannels if no subchannel is READY. +static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list) { + GPR_ASSERT(subchannel_list->shutting_down); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p", + (void *)subchannel_list->policy, (void *)subchannel_list); + } + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, + "rr_subchannel_list_destroy"); + } + sd->subchannel = NULL; + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } + } + gpr_free(subchannel_list->subchannels); + gpr_free(subchannel_list); +} + +static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list, + const char *reason) { + gpr_ref_non_zero(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu", + (void *)subchannel_list->policy, (void *)subchannel_list, + (unsigned long)(count - 1), (unsigned long)count); + } +} + +static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list, + const char *reason) { + const bool done = gpr_unref(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu", + (void *)subchannel_list->policy, (void *)subchannel_list, + (unsigned long)(count + 1), (unsigned long)count); + } + if (done) { + rr_subchannel_list_destroy(exec_ctx, subchannel_list); + } +} + +/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The + * watcher's callback will ultimately unref \a subchannel_list. */ +static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list, + const char *reason) { + GPR_ASSERT(!subchannel_list->shutting_down); + subchannel_list->shutting_down = true; + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, + NULL, + &sd->connectivity_changed_closure); + } + } + rr_subchannel_list_unref(exec_ctx, subchannel_list, reason); +} + +/** Returns the index into p->subchannel_list->subchannels of the next + * subchannel in READY state, or p->subchannel_list->num_subchannels if no + * subchannel is READY. * * Note that this function does *not* update p->last_ready_subchannel_index. * The caller must do that if it returns a pick. */ static size_t get_next_ready_subchannel_index_locked( const round_robin_lb_policy *p) { + GPR_ASSERT(p->subchannel_list != NULL); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR: %p] getting next ready subchannel, " + "[RR %p] getting next ready subchannel (out of %lu), " "last_ready_subchannel_index=%lu", - p, (unsigned long)p->last_ready_subchannel_index); + (void *)p, (unsigned long)p->subchannel_list->num_subchannels, + (unsigned long)p->last_ready_subchannel_index); } - for (size_t i = 0; i < p->num_subchannels; ++i) { - const size_t index = - (i + p->last_ready_subchannel_index + 1) % p->num_subchannels; + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + const size_t index = (i + p->last_ready_subchannel_index + 1) % + p->subchannel_list->num_subchannels; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p, - (unsigned long)index, - p->subchannels[index].curr_connectivity_state); + gpr_log(GPR_DEBUG, + "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " + "state=%d", + (void *)p, + (void *)p->subchannel_list->subchannels[index].subchannel, + (void *)p->subchannel_list, (unsigned long)index, + p->subchannel_list->subchannels[index].curr_connectivity_state); } - if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { + if (p->subchannel_list->subchannels[index].curr_connectivity_state == + GRPC_CHANNEL_READY) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu", - p, (unsigned long)index); + gpr_log(GPR_DEBUG, + "[RR %p] found next ready subchannel (%p) at index %lu of " + "subchannel_list %p", + (void *)p, + (void *)p->subchannel_list->subchannels[index].subchannel, + (unsigned long)index, (void *)p->subchannel_list); } return index; } } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p); + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void *)p); } - return p->num_subchannels; + return p->subchannel_list->num_subchannels; } // Sets p->last_ready_subchannel_index to last_ready_index. static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, size_t last_ready_index) { - GPR_ASSERT(last_ready_index < p->num_subchannels); + GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels); p->last_ready_subchannel_index = last_ready_index; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", - (void *)p, (unsigned long)last_ready_index, - (void *)p->subchannels[last_ready_index].subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->subchannels[last_ready_index].subchannel)); + gpr_log( + GPR_DEBUG, + "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", + (void *)p, (unsigned long)last_ready_index, + (void *)p->subchannel_list->subchannels[last_ready_index].subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->subchannel_list->subchannels[last_ready_index].subchannel)); } } @@ -210,18 +289,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); } - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); - } - } - } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); - gpr_free(p->subchannels); gpr_free(p); } @@ -243,14 +311,9 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, - NULL, - &sd->connectivity_changed_closure); - } - } + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_shutdown"); + p->subchannel_list = NULL; } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -304,15 +367,14 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { p->started_picking = true; - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); - } + for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &p->subchannel_list->subchannels[i]; + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); + rr_subchannel_list_ref(sd->subchannel_list, "start_picking"); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } } @@ -332,63 +394,70 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); } - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->num_subchannels) { - /* readily available, report right away */ - subchannel_data *sd = &p->subchannels[next_ready_index]; - *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked"); - if (user_data != NULL) { - *user_data = sd->user_data; - } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)", - (void *)*target, (unsigned long)next_ready_index); - } - /* only advance the last picked pointer if the selection was used */ - update_last_ready_subchannel_index_locked(p, next_ready_index); - return 1; - } else { - /* no pick currently available. Save for later in list of pending picks */ - if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + if (p->subchannel_list != NULL) { + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->subchannel_list->num_subchannels) { + /* readily available, report right away */ + subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index]; + *target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "rr_picked"); + if (user_data != NULL) { + *user_data = sd->user_data; + } + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_DEBUG, + "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, " + "INDEX %lu)", + (void *)p, (void *)sd->subchannel, (void *)*target, + (void *)sd->subchannel_list, (unsigned long)next_ready_index); + } + /* only advance the last picked pointer if the selection was used */ + update_last_ready_subchannel_index_locked(p, next_ready_index); + return 1; } - pending_pick *pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->on_complete = on_complete; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->user_data = user_data; - p->pending_picks = pp; - return 0; } + /* no pick currently available. Save for later in list of pending picks */ + if (!p->started_picking) { + start_picking_locked(exec_ctx, p); + } + pending_pick *pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->target = target; + pp->on_complete = on_complete; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->user_data = user_data; + p->pending_picks = pp; + return 0; } static void update_state_counters_locked(subchannel_data *sd) { - round_robin_lb_policy *p = sd->policy; + rr_subchannel_list *subchannel_list = sd->subchannel_list; if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { - GPR_ASSERT(p->num_ready > 0); - --p->num_ready; + GPR_ASSERT(subchannel_list->num_ready > 0); + --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GPR_ASSERT(p->num_transient_failures > 0); - --p->num_transient_failures; + GPR_ASSERT(subchannel_list->num_transient_failures > 0); + --subchannel_list->num_transient_failures; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { - GPR_ASSERT(p->num_idle > 0); - --p->num_idle; + GPR_ASSERT(subchannel_list->num_idle > 0); + --subchannel_list->num_idle; } if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { - ++p->num_ready; + ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ++p->num_transient_failures; + ++subchannel_list->num_transient_failures; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { - ++p->num_idle; + ++subchannel_list->num_idle; } } -/* sd is the subchannel_data associted with the updated subchannel. - * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE - * or SHUTDOWN */ +/** Sets the policy's connectivity status based on that of the passed-in \a sd + * (the subchannel_data associted with the updated subchannel) and the + * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be + * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the + * connectivity status set. */ static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we @@ -401,17 +470,18 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( * CHECK: sd->curr_connectivity_state == CONNECTING. * * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. - * CHECK: p->num_subchannels = 0. + * CHECK: p->subchannel_list->num_subchannels = 0. * * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is * TRANSIENT_FAILURE. - * CHECK: p->num_transient_failures == p->num_subchannels. + * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels. * * 5) RULE: ALL subchannels are IDLE => policy is IDLE. - * CHECK: p->num_idle == p->num_subchannels. + * CHECK: p->num_idle == p->subchannel_list->num_subchannels. */ - round_robin_lb_policy *p = sd->policy; - if (p->num_ready > 0) { /* 1) READY */ + rr_subchannel_list *subchannel_list = sd->subchannel_list; + round_robin_lb_policy *p = subchannel_list->policy; + if (subchannel_list->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); return GRPC_CHANNEL_READY; @@ -421,18 +491,19 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); return GRPC_CHANNEL_CONNECTING; - } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */ + } else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); return GRPC_CHANNEL_SHUTDOWN; - } else if (p->num_transient_failures == - p->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + } else if (subchannel_list->num_transient_failures == + p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_transient_failure"); return GRPC_CHANNEL_TRANSIENT_FAILURE; - } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */ + } else if (subchannel_list->num_idle == + p->subchannel_list->num_subchannels) { /* 5) IDLE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "rr_idle"); return GRPC_CHANNEL_IDLE; @@ -444,7 +515,28 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; - round_robin_lb_policy *p = sd->policy; + round_robin_lb_policy *p = sd->subchannel_list->policy; + // If the policy is shutting down, unref and return. + if (p->shutdown) { + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); + return; + } + if (sd->subchannel_list->shutting_down) { + // the subchannel list associated with sd has been discarded. This callback + // corresponds to the unsubscription. + GPR_ASSERT(error == GRPC_ERROR_CANCELLED); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); + return; + } + // Dispose of outdated subchannel lists. + if (sd->subchannel_list != p->subchannel_list && + sd->subchannel_list != p->latest_pending_subchannel_list) { + // sd belongs to an outdated subchannel_list: get rid of it. + rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated"); + return; + } // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. @@ -453,21 +545,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "[RR %p] connectivity changed for subchannel %p: " "prev_state=%d new_state=%d", - p, sd->subchannel, sd->prev_connectivity_state, + (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state, sd->curr_connectivity_state); } - // If we're shutting down, unref and return. - if (p->shutdown) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - return; - } // Update state counters and determine new overall state. update_state_counters_locked(sd); sd->prev_connectivity_state = sd->curr_connectivity_state; - grpc_connectivity_state new_connectivity_state = + const grpc_connectivity_state new_policy_connectivity_state = update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); - // If the new state is SHUTDOWN, unref the subchannel, and if the new - // overall state is SHUTDOWN, clean up. + // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new + // policy's state is SHUTDOWN, clean up. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); sd->subchannel = NULL; @@ -475,7 +562,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(exec_ctx, sd->user_data); } - if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { /* the policy is shutting down. Flush all the pending picks... */ pending_pick *pp; while ((pp = p->pending_picks)) { @@ -486,15 +573,42 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } } /* unref the "rr_connectivity" weak ref from start_picking */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - } else { + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, + "rr_connectivity_sd_shutdown"); + } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + if (sd->subchannel_list != p->subchannel_list) { + // promote sd->subchannel_list to p->subchannel_list. + // sd->subchannel_list must be equal to + // p->latest_pending_subchannel_list because we have already filtered + // for sds belonging to outdated subchannel lists. + GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(!sd->subchannel_list->shutting_down); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] phasing out subchannel list %p (size %lu) in favor " + "of %p (size %lu)", + (void *)p, (void *)p->subchannel_list, + (unsigned long)p->subchannel_list->num_subchannels, + (void *)sd->subchannel_list, + (unsigned long)sd->subchannel_list->num_subchannels); + } + if (p->subchannel_list != NULL) { + // dispose of the current subchannel_list + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_update_connectivity"); + } + p->subchannel_list = sd->subchannel_list; + p->latest_pending_subchannel_list = NULL; + } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - GPR_ASSERT(next_ready_index < p->num_subchannels); - subchannel_data *selected = &p->subchannels[next_ready_index]; + GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); + subchannel_data *selected = + &p->subchannel_list->subchannels[next_ready_index]; if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ @@ -519,7 +633,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(pp); } } - /* renew notification: reuses the "rr_connectivity" weak ref */ + /* renew notification: reuses the "rr_connectivity" weak ref on the policy + * as well as the sd->subchannel_list ref. */ grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -546,8 +661,9 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->num_subchannels) { - subchannel_data *selected = &p->subchannels[next_ready_index]; + if (next_ready_index < p->subchannel_list->num_subchannels) { + subchannel_data *selected = + &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); @@ -559,52 +675,68 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } -static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown_locked, - rr_pick_locked, - rr_cancel_pick_locked, - rr_cancel_picks_locked, - rr_ping_one_locked, - rr_exit_idle_locked, - rr_check_connectivity_locked, - rr_notify_on_state_change_locked}; - -static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} - -static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} - -static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - GPR_ASSERT(args->client_channel_factory != NULL); - - /* Find the number of backend addresses. We ignore balancer - * addresses, since we don't know how to handle them. */ +static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + round_robin_lb_policy *p = (round_robin_lb_policy *)policy; + /* Find the number of backend addresses. We ignore balancer addresses, since + * we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; + if (p->subchannel_list == NULL) { + // If we don't have a current subchannel list, go into TRANSIENT FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "rr_update_missing"); + } else { + // otherwise, keep using the current subchannel list (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for Round Robin %p update, " + "ignoring.", + (void *)p); + } + return; } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { if (!addresses->addresses[i].is_balancer) ++num_addrs; } - if (num_addrs == 0) return NULL; - - round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); - - p->num_addresses = num_addrs; - p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); - - grpc_subchannel_args sc_args; + if (num_addrs == 0) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "rr_update_empty"); + if (p->subchannel_list != NULL) { + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_update"); + p->subchannel_list = NULL; + } + return; + } size_t subchannel_index = 0; + rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); + subchannel_list->policy = p; + subchannel_list->subchannels = + gpr_zalloc(sizeof(subchannel_data) * num_addrs); + subchannel_list->num_subchannels = num_addrs; + gpr_ref_init(&subchannel_list->refcount, 1); + p->latest_pending_subchannel_list = subchannel_list; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", + (void *)subchannel_list, (unsigned long)num_addrs); + } + grpc_subchannel_args sc_args; + /* We need to remove the LB addresses in order to be able to compare the + * subchannel keys of subchannels from a different batch of addresses. */ + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + /* Create subchannels for addresses in the update. */ for (size_t i = 0; i < addresses->num_addresses; i++) { /* Skip balancer addresses, since we only know how to handle backends. */ if (addresses->addresses[i].is_balancer) continue; - - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + GPR_ASSERT(i < num_addrs); memset(&sc_args, 0, sizeof(grpc_subchannel_args)); grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); @@ -618,52 +750,84 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s", - (unsigned long)subchannel_index, (void *)subchannel, address_uri); + gpr_log(GPR_DEBUG, + "index %lu: Created subchannel %p for address uri %s into " + "subchannel_list %p", + (unsigned long)subchannel_index, (void *)subchannel, address_uri, + (void *)subchannel_list); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); - if (subchannel != NULL) { - subchannel_data *sd = &p->subchannels[subchannel_index]; - sd->policy = p; - sd->subchannel = subchannel; - /* use some sentinel value outside of the range of grpc_connectivity_state - * to signal an undefined previous state. We won't be referring to this - * value again and it'll be overwritten after the first call to - * rr_connectivity_changed */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - sd->user_data_vtable = addresses->user_data_vtable; - if (sd->user_data_vtable != NULL) { - sd->user_data = - sd->user_data_vtable->copy(addresses->addresses[i].user_data); - } - grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed_locked, sd, - grpc_combiner_scheduler(args->combiner, false)); - ++subchannel_index; + subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++]; + sd->subchannel_list = subchannel_list; + sd->subchannel = subchannel; + grpc_closure_init(&sd->connectivity_changed_closure, + rr_connectivity_changed_locked, sd, + grpc_combiner_scheduler(args->combiner, false)); + /* use some sentinel value outside of the range of + * grpc_connectivity_state to signal an undefined previous state. We + * won't be referring to this value again and it'll be overwritten after + * the first call to rr_connectivity_changed_locked */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + sd->user_data_vtable = addresses->user_data_vtable; + if (sd->user_data_vtable != NULL) { + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); + } + if (p->started_picking) { + rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update"); + /* 2. Watch every new subchannel. A subchannel list becomes active the + * moment one of its subchannels is READY. At that moment, we swap + * p->subchannel_list for sd->subchannel_list, provided the subchannel + * list is still valid (ie, isn't shutting down) */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } } - if (subchannel_index == 0) { - /* couldn't create any subchannel. Bail out */ - gpr_free(p->subchannels); - gpr_free(p); - return NULL; + if (!p->started_picking) { + // The policy isn't picking yet. Save the update for later, disposing of + // previous version if any. + if (p->subchannel_list != NULL) { + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "rr_update_before_started_picking"); + } + p->subchannel_list = subchannel_list; } - p->num_subchannels = subchannel_index; +} + +static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { + rr_destroy, + rr_shutdown_locked, + rr_pick_locked, + rr_cancel_pick_locked, + rr_cancel_picks_locked, + rr_ping_one_locked, + rr_exit_idle_locked, + rr_check_connectivity_locked, + rr_notify_on_state_change_locked, + rr_update_locked}; + +static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} - // Initialize the last pick index to the last subchannel, so that the - // first pick will start at the beginning of the list. - p->last_ready_subchannel_index = subchannel_index - 1; +static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} +static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + GPR_ASSERT(args->client_channel_factory != NULL); + round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); + rr_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", - (void *)p, (unsigned long)p->num_subchannels); + gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p, + (unsigned long)p->subchannel_list->num_subchannels); } return &p->base; } diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index 9d6c0fc139..5c6464bc87 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -118,11 +118,11 @@ grpc_lb_addresses *grpc_lb_addresses_find_channel_arg( const grpc_channel_args *channel_args); /** Arguments passed to LB policies. */ -typedef struct grpc_lb_policy_args { +struct grpc_lb_policy_args { grpc_client_channel_factory *client_channel_factory; grpc_channel_args *args; grpc_combiner *combiner; -} grpc_lb_policy_args; +}; struct grpc_lb_policy_factory_vtable { void (*ref)(grpc_lb_policy_factory *factory); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c new file mode 100644 index 0000000000..cb9b1f07c2 --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -0,0 +1,255 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +// This is similar to the sockaddr resolver, except that it supports a +// bunch of query args that are useful for dependency injection in tests. + +#include <limits.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/string_util.h> + +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/unix_sockets_posix.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/string.h" + +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" + +// +// fake_resolver +// + +typedef struct { + // base class -- must be first + grpc_resolver base; + + // passed-in parameters + grpc_channel_args* channel_args; + + // If not NULL, the next set of resolution results to be returned to + // grpc_resolver_next_locked()'s closure. + grpc_channel_args* next_results; + + // pending next completion, or NULL + grpc_closure* next_completion; + // target result address for next completion + grpc_channel_args** target_result; +} fake_resolver; + +static void fake_resolver_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { + fake_resolver* r = (fake_resolver*)gr; + grpc_channel_args_destroy(exec_ctx, r->next_results); + grpc_channel_args_destroy(exec_ctx, r->channel_args); + gpr_free(r); +} + +static void fake_resolver_shutdown_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { + fake_resolver* r = (fake_resolver*)resolver; + if (r->next_completion != NULL) { + *r->target_result = NULL; + grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + r->next_completion = NULL; + } +} + +static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, + fake_resolver* r) { + if (r->next_completion != NULL && r->next_results != NULL) { + *r->target_result = + grpc_channel_args_union(r->next_results, r->channel_args); + grpc_channel_args_destroy(exec_ctx, r->next_results); + grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + r->next_completion = NULL; + r->next_results = NULL; + } +} + +static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver) { + fake_resolver* r = (fake_resolver*)resolver; + fake_resolver_maybe_finish_next_locked(exec_ctx, r); +} + +static void fake_resolver_next_locked(grpc_exec_ctx* exec_ctx, + grpc_resolver* resolver, + grpc_channel_args** target_result, + grpc_closure* on_complete) { + fake_resolver* r = (fake_resolver*)resolver; + GPR_ASSERT(!r->next_completion); + r->next_completion = on_complete; + r->target_result = target_result; + fake_resolver_maybe_finish_next_locked(exec_ctx, r); +} + +static const grpc_resolver_vtable fake_resolver_vtable = { + fake_resolver_destroy, fake_resolver_shutdown_locked, + fake_resolver_channel_saw_error_locked, fake_resolver_next_locked}; + +struct grpc_fake_resolver_response_generator { + fake_resolver* resolver; // Set by the fake_resolver constructor to itself. + grpc_channel_args* next_response; + gpr_refcount refcount; +}; + +grpc_fake_resolver_response_generator* +grpc_fake_resolver_response_generator_create() { + grpc_fake_resolver_response_generator* generator = + (grpc_fake_resolver_response_generator*)gpr_zalloc(sizeof(*generator)); + gpr_ref_init(&generator->refcount, 1); + return generator; +} + +grpc_fake_resolver_response_generator* +grpc_fake_resolver_response_generator_ref( + grpc_fake_resolver_response_generator* generator) { + gpr_ref(&generator->refcount); + return generator; +} + +void grpc_fake_resolver_response_generator_unref( + grpc_fake_resolver_response_generator* generator) { + if (gpr_unref(&generator->refcount)) { + gpr_free(generator); + } +} + +static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_fake_resolver_response_generator* generator = + (grpc_fake_resolver_response_generator*)arg; + fake_resolver* r = generator->resolver; + if (r->next_results != NULL) { + grpc_channel_args_destroy(exec_ctx, r->next_results); + } + r->next_results = generator->next_response; + fake_resolver_maybe_finish_next_locked(exec_ctx, r); +} + +void grpc_fake_resolver_response_generator_set_response( + grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator, + grpc_channel_args* next_response) { + GPR_ASSERT(generator->resolver != NULL); + generator->next_response = grpc_channel_args_copy(next_response); + grpc_closure_sched( + exec_ctx, + grpc_closure_create( + set_response_cb, generator, + grpc_combiner_scheduler(generator->resolver->base.combiner, false)), + GRPC_ERROR_NONE); +} + +static void* response_generator_arg_copy(void* p) { + return grpc_fake_resolver_response_generator_ref( + (grpc_fake_resolver_response_generator*)p); +} + +static void response_generator_arg_destroy(grpc_exec_ctx* exec_ctx, void* p) { + grpc_fake_resolver_response_generator_unref( + (grpc_fake_resolver_response_generator*)p); +} + +static int response_generator_cmp(void* a, void* b) { return GPR_ICMP(a, b); } + +static const grpc_arg_pointer_vtable response_generator_arg_vtable = { + response_generator_arg_copy, response_generator_arg_destroy, + response_generator_cmp}; + +grpc_arg grpc_fake_resolver_response_generator_arg( + grpc_fake_resolver_response_generator* generator) { + grpc_arg arg; + arg.type = GRPC_ARG_POINTER; + arg.key = GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR; + arg.value.pointer.p = generator; + arg.value.pointer.vtable = &response_generator_arg_vtable; + return arg; +} + +grpc_fake_resolver_response_generator* +grpc_fake_resolver_get_response_generator(const grpc_channel_args* args) { + const grpc_arg* arg = + grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) return NULL; + return (grpc_fake_resolver_response_generator*)arg->value.pointer.p; +} + +// +// fake_resolver_factory +// + +static void fake_resolver_factory_ref(grpc_resolver_factory* factory) {} + +static void fake_resolver_factory_unref(grpc_resolver_factory* factory) {} + +static grpc_resolver* fake_resolver_create(grpc_exec_ctx* exec_ctx, + grpc_resolver_factory* factory, + grpc_resolver_args* args) { + fake_resolver* r = (fake_resolver*)gpr_zalloc(sizeof(*r)); + r->channel_args = grpc_channel_args_copy(args->args); + grpc_resolver_init(&r->base, &fake_resolver_vtable, args->combiner); + grpc_fake_resolver_response_generator* response_generator = + grpc_fake_resolver_get_response_generator(args->args); + if (response_generator != NULL) response_generator->resolver = r; + return &r->base; +} + +static char* fake_resolver_get_default_authority(grpc_resolver_factory* factory, + grpc_uri* uri) { + const char* path = uri->path; + if (path[0] == '/') ++path; + return gpr_strdup(path); +} + +static const grpc_resolver_factory_vtable fake_resolver_factory_vtable = { + fake_resolver_factory_ref, fake_resolver_factory_unref, + fake_resolver_create, fake_resolver_get_default_authority, "fake"}; + +static grpc_resolver_factory fake_resolver_factory = { + &fake_resolver_factory_vtable}; + +void grpc_resolver_fake_init(void) { + grpc_register_resolver_type(&fake_resolver_factory); +} + +void grpc_resolver_fake_shutdown(void) {} diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h new file mode 100644 index 0000000000..373509b97f --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -0,0 +1,75 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H + +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/uri_parser.h" +#include "src/core/lib/channel/channel_args.h" + +#define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \ + "grpc.fake_resolver.response_generator" + +void grpc_resolver_fake_init(); + +// Instances of \a grpc_fake_resolver_response_generator are passed to the +// fake resolver in a channel argument (see \a +// grpc_fake_resolver_response_generator_arg) in order to inject and trigger +// custom resolutions. See also \a +// grpc_fake_resolver_response_generator_set_response. +typedef struct grpc_fake_resolver_response_generator + grpc_fake_resolver_response_generator; +grpc_fake_resolver_response_generator* +grpc_fake_resolver_response_generator_create(); + +// Instruct the fake resolver associated with the \a response_generator instance +// to trigger a new resolution for \a uri and \a args. +void grpc_fake_resolver_response_generator_set_response( + grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator, + grpc_channel_args* next_response); + +// Return a \a grpc_arg for a \a grpc_fake_resolver_response_generator instance. +grpc_arg grpc_fake_resolver_response_generator_arg( + grpc_fake_resolver_response_generator* generator); +// Return the \a grpc_fake_resolver_response_generator instance in \a args or +// NULL. +grpc_fake_resolver_response_generator* +grpc_fake_resolver_get_response_generator(const grpc_channel_args* args); + +grpc_fake_resolver_response_generator* +grpc_fake_resolver_response_generator_ref( + grpc_fake_resolver_response_generator* generator); +void grpc_fake_resolver_response_generator_unref( + grpc_fake_resolver_response_generator* generator); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H \ + */ diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index dd14bf1d02..1007916b40 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -307,7 +307,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_connector *connector, const grpc_subchannel_args *args) { - grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args); + grpc_subchannel_key *key = grpc_subchannel_key_create(args); grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key); if (c) { grpc_subchannel_key_destroy(exec_ctx, key); @@ -770,6 +770,11 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( return GET_CONNECTED_SUBCHANNEL(c, acq); } +const grpc_subchannel_key *grpc_subchannel_get_key( + const grpc_subchannel *subchannel) { + return subchannel->key; +} + grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, const grpc_connected_subchannel_call_args *args, diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index e433c33e40..e284001fa2 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -50,6 +50,7 @@ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_connected_subchannel grpc_connected_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; +typedef struct grpc_subchannel_key grpc_subchannel_key; #ifdef GRPC_STREAM_REFCOUNT_DEBUG #define GRPC_SUBCHANNEL_REF(p, r) \ @@ -155,6 +156,10 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_subchannel *subchannel); +/** return the subchannel index key for \a subchannel */ +const grpc_subchannel_key *grpc_subchannel_get_key( + const grpc_subchannel *subchannel); + /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call, diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index b25dbfcf51..2a707353c0 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -50,7 +50,6 @@ static gpr_avl g_subchannel_index; static gpr_mu g_mu; struct grpc_subchannel_key { - grpc_connector *connector; grpc_subchannel_args args; }; @@ -73,10 +72,9 @@ static grpc_exec_ctx *current_ctx() { } static grpc_subchannel_key *create_key( - grpc_connector *connector, const grpc_subchannel_args *args, + const grpc_subchannel_args *args, grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) { grpc_subchannel_key *k = gpr_malloc(sizeof(*k)); - k->connector = grpc_connector_ref(connector); k->args.filter_count = args->filter_count; if (k->args.filter_count > 0) { k->args.filters = @@ -91,19 +89,17 @@ static grpc_subchannel_key *create_key( } grpc_subchannel_key *grpc_subchannel_key_create( - grpc_connector *connector, const grpc_subchannel_args *args) { - return create_key(connector, args, grpc_channel_args_normalize); + const grpc_subchannel_args *args) { + return create_key(args, grpc_channel_args_normalize); } static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) { - return create_key(k->connector, &k->args, grpc_channel_args_copy); + return create_key(&k->args, grpc_channel_args_copy); } -static int subchannel_key_compare(grpc_subchannel_key *a, - grpc_subchannel_key *b) { - int c = GPR_ICMP(a->connector, b->connector); - if (c != 0) return c; - c = GPR_ICMP(a->args.filter_count, b->args.filter_count); +int grpc_subchannel_key_compare(const grpc_subchannel_key *a, + const grpc_subchannel_key *b) { + int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; if (a->args.filter_count > 0) { c = memcmp(a->args.filters, b->args.filters, @@ -115,7 +111,6 @@ static int subchannel_key_compare(grpc_subchannel_key *a, void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *k) { - grpc_connector_unref(exec_ctx, k->connector); gpr_free((grpc_channel_args *)k->args.filters); grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args); gpr_free(k); @@ -128,7 +123,7 @@ static void sck_avl_destroy(void *p) { static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); } static long sck_avl_compare(void *a, void *b) { - return subchannel_key_compare(a, b); + return grpc_subchannel_key_compare(a, b); } static void scv_avl_destroy(void *p) { diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index f673ade378..55e90bb645 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -34,17 +34,14 @@ #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H -#include "src/core/ext/filters/client_channel/connector.h" #include "src/core/ext/filters/client_channel/subchannel.h" /** \file Provides an index of active subchannels so that they can be shared amongst channels */ -typedef struct grpc_subchannel_key grpc_subchannel_key; - /** Create a key that can be used to uniquely identify a subchannel */ grpc_subchannel_key *grpc_subchannel_key_create( - grpc_connector *con, const grpc_subchannel_args *args); + const grpc_subchannel_args *args); /** Destroy a subchannel key */ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, @@ -69,6 +66,9 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed); +int grpc_subchannel_key_compare(const grpc_subchannel_key *a, + const grpc_subchannel_key *b); + /** Initialize the subchannel index (global) */ void grpc_subchannel_index_init(void); /** Shutdown the subchannel index (global) */ diff --git a/src/core/ext/filters/workarounds/workaround_utils.h b/src/core/ext/filters/workarounds/workaround_utils.h index 7cd70c12d8..0608d1e937 100644 --- a/src/core/ext/filters/workarounds/workaround_utils.h +++ b/src/core/ext/filters/workarounds/workaround_utils.h @@ -49,4 +49,4 @@ typedef bool (*user_agent_parser)(grpc_mdelem); void grpc_register_workaround(uint32_t id, user_agent_parser parser); -#endif +#endif /* GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_UTILS_H */ diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 247b134938..2df07c8ae6 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -129,9 +129,23 @@ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) { return grpc_channel_args_copy_and_add(src, NULL, 0); } -grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, +grpc_channel_args *grpc_channel_args_union(const grpc_channel_args *a, const grpc_channel_args *b) { - return grpc_channel_args_copy_and_add(a, b->args, b->num_args); + const size_t max_out = (a->num_args + b->num_args); + grpc_arg *uniques = gpr_malloc(sizeof(*uniques) * max_out); + for (size_t i = 0; i < a->num_args; ++i) uniques[i] = a->args[i]; + + size_t uniques_idx = a->num_args; + for (size_t i = 0; i < b->num_args; ++i) { + const char *b_key = b->args[i].key; + if (grpc_channel_args_find(a, b_key) == NULL) { // not found + uniques[uniques_idx++] = b->args[i]; + } + } + grpc_channel_args *result = + grpc_channel_args_copy_and_add(NULL, uniques, uniques_idx); + gpr_free(uniques); + return result; } static int cmp_arg(const grpc_arg *a, const grpc_arg *b) { diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index f0f603e251..128afd00d4 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -63,8 +63,8 @@ grpc_channel_args *grpc_channel_args_copy_and_add_and_remove( const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, const grpc_arg *to_add, size_t num_to_add); -/** Concatenate args from \a a and \a b into a new instance */ -grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, +/** Perform the union of \a a and \a b, prioritizing \a a entries */ +grpc_channel_args *grpc_channel_args_union(const grpc_channel_args *a, const grpc_channel_args *b); /** Destroy arguments created by \a grpc_channel_args_copy */ diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h index e81531053c..740e553b2a 100644 --- a/src/core/lib/iomgr/polling_entity.h +++ b/src/core/lib/iomgr/polling_entity.h @@ -38,9 +38,8 @@ #include "src/core/lib/iomgr/pollset_set.h" /* A grpc_polling_entity is a pollset-or-pollset_set container. It allows - * functions that - * accept a pollset XOR a pollset_set to do so through an abstract interface. - * No ownership is taken. */ + * functions that accept a pollset XOR a pollset_set to do so through an + * abstract interface. No ownership is taken. */ typedef struct grpc_polling_entity { union { @@ -64,18 +63,14 @@ grpc_pollset_set *grpc_polling_entity_pollset_set(grpc_polling_entity *pollent); bool grpc_polling_entity_is_empty(const grpc_polling_entity *pollent); /** Add the pollset or pollset_set in \a pollent to the destination pollset_set - * \a - * pss_dst */ + * \a * pss_dst */ void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_polling_entity *pollent, grpc_pollset_set *pss_dst); /** Delete the pollset or pollset_set in \a pollent from the destination - * pollset_set \a - * pss_dst */ + * pollset_set \a * pss_dst */ void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx, grpc_polling_entity *pollent, grpc_pollset_set *pss_dst); -/* pollset_set specific */ - #endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */ diff --git a/src/core/lib/security/transport/lb_targets_info.c b/src/core/lib/security/transport/lb_targets_info.c index e73483c039..8bb06b1f61 100644 --- a/src/core/lib/security/transport/lb_targets_info.c +++ b/src/core/lib/security/transport/lb_targets_info.c @@ -44,7 +44,9 @@ static void *targets_info_copy(void *p) { return grpc_slice_hash_table_ref(p); } static void targets_info_destroy(grpc_exec_ctx *exec_ctx, void *p) { grpc_slice_hash_table_unref(exec_ctx, p); } -static int targets_info_cmp(void *a, void *b) { return GPR_ICMP(a, b); } +static int targets_info_cmp(void *a, void *b) { + return grpc_slice_hash_table_cmp(a, b); +} static const grpc_arg_pointer_vtable server_to_balancer_names_vtable = { targets_info_copy, targets_info_destroy, targets_info_cmp}; diff --git a/src/core/lib/slice/slice_hash_table.c b/src/core/lib/slice/slice_hash_table.c index 444f22aa19..2d9ff61c95 100644 --- a/src/core/lib/slice/slice_hash_table.c +++ b/src/core/lib/slice/slice_hash_table.c @@ -43,6 +43,7 @@ struct grpc_slice_hash_table { gpr_refcount refs; void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value); + int (*value_cmp)(void* a, void* b); size_t size; size_t max_num_probes; grpc_slice_hash_table_entry* entries; @@ -72,10 +73,12 @@ static void grpc_slice_hash_table_add(grpc_slice_hash_table* table, grpc_slice_hash_table* grpc_slice_hash_table_create( size_t num_entries, grpc_slice_hash_table_entry* entries, - void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) { + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value), + int (*value_cmp)(void* a, void* b)) { grpc_slice_hash_table* table = gpr_zalloc(sizeof(*table)); gpr_ref_init(&table->refs, 1); table->destroy_value = destroy_value; + table->value_cmp = value_cmp; // Keep load factor low to improve performance of lookups. table->size = num_entries * 2; const size_t entry_size = sizeof(grpc_slice_hash_table_entry) * table->size; @@ -121,3 +124,37 @@ void* grpc_slice_hash_table_get(const grpc_slice_hash_table* table, } return NULL; // Not found. } + +static int pointer_cmp(void* a, void* b) { return GPR_ICMP(a, b); } +int grpc_slice_hash_table_cmp(const grpc_slice_hash_table* a, + const grpc_slice_hash_table* b) { + int (*const value_cmp_fn_a)(void* a, void* b) = + a->value_cmp != NULL ? a->value_cmp : pointer_cmp; + int (*const value_cmp_fn_b)(void* a, void* b) = + b->value_cmp != NULL ? b->value_cmp : pointer_cmp; + // Compare value_fns + const int value_fns_cmp = + GPR_ICMP((void*)value_cmp_fn_a, (void*)value_cmp_fn_b); + if (value_fns_cmp != 0) return value_fns_cmp; + // Compare sizes + if (a->size < b->size) return -1; + if (a->size > b->size) return 1; + // Compare rows. + for (size_t i = 0; i < a->size; ++i) { + if (is_empty(&a->entries[i])) { + if (!is_empty(&b->entries[i])) { + return -1; // a empty but b non-empty + } + continue; // both empty, no need to check key or value + } else if (is_empty(&b->entries[i])) { + return 1; // a non-empty but b empty + } + // neither entry is empty + const int key_cmp = grpc_slice_cmp(a->entries[i].key, b->entries[i].key); + if (key_cmp != 0) return key_cmp; + const int value_cmp = + value_cmp_fn_a(a->entries[i].value, b->entries[i].value); + if (value_cmp != 0) return value_cmp; + } + return 0; +} diff --git a/src/core/lib/slice/slice_hash_table.h b/src/core/lib/slice/slice_hash_table.h index 1e61c5eb11..07988abff4 100644 --- a/src/core/lib/slice/slice_hash_table.h +++ b/src/core/lib/slice/slice_hash_table.h @@ -54,11 +54,15 @@ typedef struct grpc_slice_hash_table_entry { } grpc_slice_hash_table_entry; /** Creates a new hash table of containing \a entries, which is an array - of length \a num_entries. Takes ownership of all keys and values in - \a entries. Values will be cleaned up via \a destroy_value(). */ + of length \a num_entries. Takes ownership of all keys and values in \a + entries. Values will be cleaned up via \a destroy_value(). If not NULL, \a + value_cmp will be used to compare values in the context of \a + grpc_slice_hash_table_cmp. If NULL, raw pointer (\a GPR_ICMP) comparison + will be used. */ grpc_slice_hash_table *grpc_slice_hash_table_create( size_t num_entries, grpc_slice_hash_table_entry *entries, - void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value)); + void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value), + int (*value_cmp)(void *a, void *b)); grpc_slice_hash_table *grpc_slice_hash_table_ref(grpc_slice_hash_table *table); void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx, @@ -69,4 +73,13 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx, void *grpc_slice_hash_table_get(const grpc_slice_hash_table *table, const grpc_slice key); +/** Compares \a a vs. \a b. + * A table is considered "smaller" (resp. "greater") if: + * - GPR_ICMP(a->value_cmp, b->value_cmp) < 1 (resp. > 1), + * - else, it contains fewer (resp. more) entries, + * - else, if strcmp(a_key, b_key) < 1 (resp. > 1), + * - else, if value_cmp(a_value, b_value) < 1 (resp. > 1). */ +int grpc_slice_hash_table_cmp(const grpc_slice_hash_table *a, + const grpc_slice_hash_table *b); + #endif /* GRPC_CORE_LIB_SLICE_SLICE_HASH_TABLE_H */ diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 6aecb7fa93..107818f4d1 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -229,7 +229,7 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table( grpc_slice_hash_table* method_config_table = NULL; if (entries != NULL) { method_config_table = - grpc_slice_hash_table_create(num_entries, entries, destroy_value); + grpc_slice_hash_table_create(num_entries, entries, destroy_value, NULL); gpr_free(entries); } return method_config_table; diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c index 510cf5d5a0..64e3c07510 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.c +++ b/src/core/plugin_registry/grpc_plugin_registry.c @@ -41,6 +41,8 @@ extern void grpc_deadline_filter_init(void); extern void grpc_deadline_filter_shutdown(void); extern void grpc_client_channel_init(void); extern void grpc_client_channel_shutdown(void); +extern void grpc_resolver_fake_init(void); +extern void grpc_resolver_fake_shutdown(void); extern void grpc_lb_policy_grpclb_init(void); extern void grpc_lb_policy_grpclb_shutdown(void); extern void grpc_lb_policy_pick_first_init(void); @@ -73,6 +75,8 @@ void grpc_register_built_in_plugins(void) { grpc_deadline_filter_shutdown); grpc_register_plugin(grpc_client_channel_init, grpc_client_channel_shutdown); + grpc_register_plugin(grpc_resolver_fake_init, + grpc_resolver_fake_shutdown); grpc_register_plugin(grpc_lb_policy_grpclb_init, grpc_lb_policy_grpclb_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c index e5eb68f934..76b17ed06d 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c @@ -47,6 +47,8 @@ extern void grpc_resolver_dns_native_init(void); extern void grpc_resolver_dns_native_shutdown(void); extern void grpc_resolver_sockaddr_init(void); extern void grpc_resolver_sockaddr_shutdown(void); +extern void grpc_resolver_fake_init(void); +extern void grpc_resolver_fake_shutdown(void); extern void grpc_load_reporting_plugin_init(void); extern void grpc_load_reporting_plugin_shutdown(void); extern void grpc_lb_policy_grpclb_init(void); @@ -79,6 +81,8 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_dns_native_shutdown); grpc_register_plugin(grpc_resolver_sockaddr_init, grpc_resolver_sockaddr_shutdown); + grpc_register_plugin(grpc_resolver_fake_init, + grpc_resolver_fake_shutdown); grpc_register_plugin(grpc_load_reporting_plugin_init, grpc_load_reporting_plugin_shutdown); grpc_register_plugin(grpc_lb_policy_grpclb_init, |