diff options
author | 2017-06-26 22:01:11 +0200 | |
---|---|---|
committer | 2017-06-26 22:01:11 +0200 | |
commit | 5d80dc4985ef01a468e027df75912bfc1e95579d (patch) | |
tree | 17bf626f6c6709661435924a74ada235e777c221 /src/core/ext/filters/client_channel/lb_policy | |
parent | e7c2458d66b4e4210fb4c5d7e1670aef6b4c9381 (diff) | |
parent | 8bec6a93163861e467005a23c997eb93b793710b (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into import
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
13 files changed, 1304 insertions, 914 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c index 67baa46de7..52c6e38c87 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -68,7 +53,7 @@ static void on_complete_for_send(grpc_exec_ctx *exec_ctx, void *arg, if (error == GRPC_ERROR_NONE) { calld->send_initial_metadata_succeeded = true; } - grpc_closure_run(exec_ctx, calld->original_on_complete_for_send, + GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete_for_send, GRPC_ERROR_REF(error)); } @@ -78,7 +63,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, if (error == GRPC_ERROR_NONE) { calld->recv_initial_metadata_succeeded = true; } - grpc_closure_run(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } @@ -119,7 +104,7 @@ static void start_transport_stream_op_batch( // Intercept send_initial_metadata. if (batch->send_initial_metadata) { calld->original_on_complete_for_send = batch->on_complete; - grpc_closure_init(&calld->on_complete_for_send, on_complete_for_send, calld, + GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, calld, grpc_schedule_on_exec_ctx); batch->on_complete = &calld->on_complete_for_send; } @@ -127,7 +112,7 @@ static void start_transport_stream_op_batch( if (batch->recv_initial_metadata) { calld->original_recv_initial_metadata_ready = batch->payload->recv_initial_metadata.recv_initial_metadata_ready; - grpc_closure_init(&calld->recv_initial_metadata_ready, + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, calld, grpc_schedule_on_exec_ctx); batch->payload->recv_initial_metadata.recv_initial_metadata_ready = diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h index 28b313d874..51e30b20b8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index d2a2856a18..5a5ff2902d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -1,33 +1,18 @@ /* * - * Copyright 2016, Google Inc. - * All rights reserved. + * Copyright 2016 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -115,6 +100,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" @@ -198,7 +184,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, wrapped_rr_closure_arg *wc_arg = arg; GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); if (wc_arg->rr_policy != NULL) { /* if *target is NULL, no pick has been made by the RR policy (eg, all @@ -270,7 +256,7 @@ static void add_pending_pick(pending_pick **root, pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->wrapped_on_complete_arg.free_when_done = pp; - grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure, wrapped_rr_closure, &pp->wrapped_on_complete_arg, grpc_schedule_on_exec_ctx); *root = pp; @@ -289,7 +275,7 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { pping->wrapped_notify_arg.wrapped_closure = notify; pping->wrapped_notify_arg.free_when_done = pping; pping->next = *root; - grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure, + GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure, wrapped_rr_closure, &pping->wrapped_notify_arg, grpc_schedule_on_exec_ctx); *root = pping; @@ -315,6 +301,9 @@ typedef struct glb_lb_policy { /** for communicating with the LB server */ grpc_channel *lb_channel; + /** response generator to inject address updates into \a lb_channel */ + grpc_fake_resolver_response_generator *response_generator; + /** the RR policy to use of the backend servers returned by the LB server */ grpc_lb_policy *rr_policy; @@ -323,6 +312,9 @@ typedef struct glb_lb_policy { /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; + /** connectivity state of the LB channel */ + grpc_connectivity_state lb_channel_connectivity; + /** stores the deserialized response from the LB. May be NULL until one such * response has arrived. */ grpc_grpclb_serverlist *serverlist; @@ -340,10 +332,27 @@ typedef struct glb_lb_policy { bool shutting_down; + /** are we currently updating lb_call? */ + bool updating_lb_call; + + /** are we currently updating lb_channel? */ + bool updating_lb_channel; + + /** are we already watching the LB channel's connectivity? */ + bool watching_lb_channel; + + /** is \a lb_call_retry_timer active? */ + bool retry_timer_active; + + /** called upon changes to the LB channel's connectivity. */ + grpc_closure lb_channel_on_connectivity_changed; + + /** args from the latest update received while already updating, or NULL */ + grpc_lb_policy_args *pending_update_args; + /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ - /* Finished sending initial request. */ grpc_closure lb_on_sent_initial_request; @@ -533,10 +542,9 @@ static grpc_lb_addresses *process_serverlist_locked( return lb_addresses; } -/* returns true if the new RR policy should replace the current one, if any */ -static bool update_lb_connectivity_status_locked( +static void update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, - grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { + grpc_connectivity_state rr_state, grpc_error *rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); @@ -570,28 +578,26 @@ static bool update_lb_connectivity_status_locked( * (*) This function mustn't be called during shutting down. */ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); - switch (new_rr_state) { + switch (rr_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: - GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE); - return false; /* don't replace the RR policy */ + GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); + break; case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_READY: - GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE); + GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); } if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Setting grpclb's state to %s from new RR policy %p state.", - grpc_connectivity_state_name(new_rr_state), - (void *)glb_policy->rr_policy); + gpr_log( + GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.", + grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy); } - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - new_rr_state, GRPC_ERROR_REF(new_rr_state_error), + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state, + GRPC_ERROR_REF(rr_state_error), "update_lb_connectivity_status_locked"); - return true; } /* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return @@ -629,7 +635,7 @@ static bool pick_from_internal_rr_locked( grpc_grpclb_client_stats_unref(wc_arg->client_stats); if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(wc_arg->free_when_done); return false; } @@ -657,7 +663,7 @@ static bool pick_from_internal_rr_locked( wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(wc_arg->free_when_done); return false; } @@ -670,45 +676,38 @@ static bool pick_from_internal_rr_locked( return pick_done; } -static grpc_lb_policy *create_rr_locked( - grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist, - glb_lb_policy *glb_policy) { - GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - - grpc_lb_policy_args args; - memset(&args, 0, sizeof(args)); - args.client_channel_factory = glb_policy->cc_factory; - args.combiner = glb_policy->base.combiner; +static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args)); + args->client_channel_factory = glb_policy->cc_factory; + args->combiner = glb_policy->base.combiner; grpc_lb_addresses *addresses = - process_serverlist_locked(exec_ctx, serverlist); - + process_serverlist_locked(exec_ctx, glb_policy->serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); - args.args = grpc_channel_args_copy_and_add_and_remove( + args->args = grpc_channel_args_copy_and_add_and_remove( glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); - - grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - GPR_ASSERT(rr != NULL); grpc_lb_addresses_destroy(exec_ctx, addresses); - grpc_channel_args_destroy(exec_ctx, args.args); - return rr; + return args; +} + +static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_args *args) { + grpc_channel_args_destroy(exec_ctx, args->args); + gpr_free(args); } static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -/* glb_policy->rr_policy may be NULL (initial handover) */ -static void rr_handover_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { - GPR_ASSERT(glb_policy->serverlist != NULL && - glb_policy->serverlist->num_servers > 0); - - if (glb_policy->shutting_down) return; +static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + grpc_lb_policy_args *args) { + GPR_ASSERT(glb_policy->rr_policy == NULL); grpc_lb_policy *new_rr_policy = - create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); + grpc_lb_policy_create(exec_ctx, "round_robin", args); if (new_rr_policy == NULL) { gpr_log(GPR_ERROR, "Failure creating a RoundRobin policy for serverlist update with " @@ -719,41 +718,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, (void *)glb_policy->rr_policy); return; } - - grpc_error *new_rr_state_error = NULL; - const grpc_connectivity_state new_rr_state = - grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy, - &new_rr_state_error); - /* Connectivity state is a function of the new RR policy just created */ - const bool replace_old_rr = update_lb_connectivity_status_locked( - exec_ctx, glb_policy, new_rr_state, new_rr_state_error); - - if (!replace_old_rr) { - /* dispose of the new RR policy that won't be used after all */ - GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace"); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Keeping old RR policy (%p) despite new serverlist: new RR " - "policy was in %s connectivity state.", - (void *)glb_policy->rr_policy, - grpc_connectivity_state_name(new_rr_state)); - } - return; - } - - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)", - (void *)new_rr_policy, (void *)glb_policy->rr_policy); - } - - if (glb_policy->rr_policy != NULL) { - /* if we are phasing out an existing RR instance, unref it. */ - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover"); - } - - /* Finally update the RR policy to the newly created one */ glb_policy->rr_policy = new_rr_policy; + grpc_error *rr_state_error = NULL; + const grpc_connectivity_state rr_state = + grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy, + &rr_state_error); + /* Connectivity state is a function of the RR policy updated/created */ + update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state, + rr_state_error); + /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ @@ -765,14 +739,14 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = gpr_zalloc(sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, + GRPC_CLOSURE_INIT(&rr_connectivity->on_change, glb_rr_connectivity_changed_locked, rr_connectivity, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); rr_connectivity->glb_policy = glb_policy; - rr_connectivity->state = new_rr_state; + rr_connectivity->state = rr_state; /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched"); grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); @@ -809,6 +783,31 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, } } +/* glb_policy->rr_policy may be NULL (initial handover) */ +static void rr_handover_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + GPR_ASSERT(glb_policy->serverlist != NULL && + glb_policy->serverlist->num_servers > 0); + + if (glb_policy->shutting_down) return; + + grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); + if (glb_policy->rr_policy != NULL) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)", + (void *)glb_policy->rr_policy); + } + grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args); + } else { + create_rr_locked(exec_ctx, glb_policy, args); + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)", + (void *)glb_policy->rr_policy); + } + } + lb_policy_args_destroy(exec_ctx, args); +} + static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { rr_connectivity_data *rr_connectivity = arg; @@ -854,18 +853,24 @@ static grpc_slice_hash_table_entry targets_info_entry_create( return entry; } -/* Returns the target URI for the LB service whose addresses are in \a - * addresses. Using this URI, a bidirectional streaming channel will be created - * for the reception of load balancing updates. +static int balancer_name_cmp_fn(void *a, void *b) { + const char *a_str = a; + const char *b_str = b; + return strcmp(a_str, b_str); +} + +/* Returns the channel args for the LB channel, used to create a bidirectional + * stream for the reception of load balancing updates. * - * The output argument \a targets_info will be updated to contain a mapping of - * "LB server address" to "balancer name", as reported by the naming system. - * This mapping will be propagated via the channel arguments of the - * aforementioned LB streaming channel, to be used by the security connector for - * secure naming checks. The user is responsible for freeing \a targets_info. */ -static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx, - const grpc_lb_addresses *addresses, - grpc_slice_hash_table **targets_info) { + * Inputs: + * - \a addresses: corresponding to the balancers. + * - \a response_generator: in order to propagate updates from the resolver + * above the grpclb policy. + * - \a args: other args inherited from the grpclb policy. */ +static grpc_channel_args *build_lb_channel_args( + grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args) { size_t num_grpclb_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; @@ -874,53 +879,54 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx, * It's the resolver's responsibility to make sure this policy is only * instantiated and used in that case. Otherwise, something has gone wrong. */ GPR_ASSERT(num_grpclb_addrs > 0); - + grpc_lb_addresses *lb_addresses = + grpc_lb_addresses_create(num_grpclb_addrs, NULL); grpc_slice_hash_table_entry *targets_info_entries = - gpr_malloc(sizeof(*targets_info_entries) * num_grpclb_addrs); + gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs); - /* construct a target ipvX://ip1:port1,ip2:port2,... from the addresses in \a - * addresses */ - /* TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); - size_t addr_index = 0; - - for (size_t i = 0; i < addresses->num_addresses; i++) { + size_t lb_addresses_idx = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (!addresses->addresses[i].is_balancer) continue; if (addresses->addresses[i].user_data != NULL) { gpr_log(GPR_ERROR, "This LB policy doesn't support user data. It will be ignored"); } - if (addresses->addresses[i].is_balancer) { - char *addr_str; - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_str, &addresses->addresses[i].address, true) > 0); - targets_info_entries[addr_index] = targets_info_entry_create( - addr_str, addresses->addresses[i].balancer_name); - addr_strs[addr_index++] = addr_str; - } + char *addr_str; + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_str, &addresses->addresses[i].address, true) > 0); + targets_info_entries[lb_addresses_idx] = targets_info_entry_create( + addr_str, addresses->addresses[i].balancer_name); + gpr_free(addr_str); + + grpc_lb_addresses_set_address( + lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, + addresses->addresses[i].address.len, false /* is balancer */, + addresses->addresses[i].balancer_name, NULL /* user data */); } - GPR_ASSERT(addr_index == num_grpclb_addrs); - - size_t uri_path_len; - char *uri_path = gpr_strjoin_sep((const char **)addr_strs, num_grpclb_addrs, - ",", &uri_path_len); - for (size_t i = 0; i < num_grpclb_addrs; i++) gpr_free(addr_strs[i]); - gpr_free(addr_strs); - - char *target_uri_str = NULL; - /* TODO(dgq): Don't assume all addresses will share the scheme of the first - * one */ - gpr_asprintf(&target_uri_str, "%s:%s", - grpc_sockaddr_get_uri_scheme(&addresses->addresses[0].address), - uri_path); - gpr_free(uri_path); - - *targets_info = grpc_slice_hash_table_create( - num_grpclb_addrs, targets_info_entries, destroy_balancer_name); + GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); + grpc_slice_hash_table *targets_info = + grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries, + destroy_balancer_name, balancer_name_cmp_fn); gpr_free(targets_info_entries); - return target_uri_str; + grpc_channel_args *lb_channel_args = + grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info, + response_generator, args); + + grpc_arg lb_channel_addresses_arg = + grpc_lb_addresses_create_channel_arg(lb_addresses); + + grpc_channel_args *result = grpc_channel_args_copy_and_add( + lb_channel_args, &lb_channel_addresses_arg, 1); + grpc_slice_hash_table_unref(exec_ctx, targets_info); + grpc_channel_args_destroy(exec_ctx, lb_channel_args); + grpc_lb_addresses_destroy(exec_ctx, lb_addresses); + return result; } +static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error); static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { @@ -968,32 +974,37 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. - grpc_arg new_arg; - new_arg.key = GRPC_ARG_LB_POLICY_NAME; - new_arg.type = GRPC_ARG_STRING; - new_arg.value.string = "grpclb"; + grpc_arg new_arg = + grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb"); static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; glb_policy->args = grpc_channel_args_copy_and_add_and_remove( args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); - grpc_slice_hash_table *targets_info = NULL; /* Create a client channel over them to communicate with a LB service */ - char *lb_service_target_addresses = - get_lb_uri_target_addresses(exec_ctx, addresses, &targets_info); - grpc_channel_args *lb_channel_args = - get_lb_channel_args(exec_ctx, targets_info, args->args); + glb_policy->response_generator = + grpc_fake_resolver_response_generator_create(); + grpc_channel_args *lb_channel_args = build_lb_channel_args( + exec_ctx, addresses, glb_policy->response_generator, args->args); + char *uri_str; + gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( - exec_ctx, lb_service_target_addresses, args->client_channel_factory, - lb_channel_args); - grpc_slice_hash_table_unref(exec_ctx, targets_info); + exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); + + /* Propagate initial resolution */ + grpc_fake_resolver_response_generator_set_response( + exec_ctx, glb_policy->response_generator, lb_channel_args); grpc_channel_args_destroy(exec_ctx, lb_channel_args); - gpr_free(lb_service_target_addresses); + gpr_free(uri_str); if (glb_policy->lb_channel == NULL) { gpr_free((void *)glb_policy->server_name); grpc_channel_args_destroy(exec_ctx, glb_policy->args); gpr_free(glb_policy); return NULL; } + + GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, + glb_lb_channel_on_connectivity_changed_cb, glb_policy, + grpc_combiner_scheduler(args->combiner)); grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); @@ -1009,12 +1020,15 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->client_stats != NULL) { grpc_grpclb_client_stats_unref(glb_policy->client_stats); } - grpc_channel_destroy(glb_policy->lb_channel); - glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); if (glb_policy->serverlist != NULL) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } + grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); + if (glb_policy->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); + gpr_free(glb_policy->pending_update_args); + } gpr_free(glb_policy); } @@ -1022,16 +1036,6 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_policy->shutting_down = true; - pending_pick *pp = glb_policy->pending_picks; - glb_policy->pending_picks = NULL; - pending_ping *pping = glb_policy->pending_pings; - glb_policy->pending_pings = NULL; - if (glb_policy->rr_policy) { - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); - } - grpc_connectivity_state_set( - exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); /* We need a copy of the lb_call pointer because we can't cancell the call * while holding glb_policy->mu: lb_on_server_status_received, invoked due to * the cancel, needs to acquire that same lock */ @@ -1045,17 +1049,41 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_call_cancel(lb_call, NULL); /* lb_on_server_status_received will pick up the cancel and clean up */ } + if (glb_policy->retry_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + glb_policy->retry_timer_active = false; + } + + pending_pick *pp = glb_policy->pending_picks; + glb_policy->pending_picks = NULL; + pending_ping *pping = glb_policy->pending_pings; + glb_policy->pending_pings = NULL; + if (glb_policy->rr_policy) { + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); + } + // We destroy the LB channel here because + // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy + // instance. Destroying the lb channel in glb_destroy would likely result in + // a callback invocation without a valid glb_policy arg. + if (glb_policy->lb_channel != NULL) { + grpc_channel_destroy(glb_policy->lb_channel); + glb_policy->lb_channel = NULL; + } + grpc_connectivity_state_set( + exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); + while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_NONE); pp = next; } while (pping != NULL) { pending_ping *next = pping->next; - grpc_closure_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, GRPC_ERROR_NONE); pping = next; } @@ -1071,7 +1099,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1095,7 +1123,7 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, pending_pick *next = pp->next; if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1130,7 +1158,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; - grpc_closure_sched(exec_ctx, on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, on_complete, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "No mdelem storage for the LB token. Load reporting " "won't work without it. Failing")); @@ -1149,7 +1177,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, wrapped_rr_closure_arg *wc_arg = gpr_zalloc(sizeof(wrapped_rr_closure_arg)); - grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, + GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, grpc_schedule_on_exec_ctx); wc_arg->rr_policy = glb_policy->rr_policy; wc_arg->target = target; @@ -1220,9 +1248,9 @@ static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); const gpr_timespec next_client_load_report_time = gpr_time_add(now, glb_policy->client_stats_report_interval); - grpc_closure_init(&glb_policy->client_load_report_closure, + GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, send_client_load_report_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, next_client_load_report_time, &glb_policy->client_load_report_closure, now); @@ -1248,9 +1276,9 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, memset(&op, 0, sizeof(op)); op.op = GRPC_OP_SEND_MESSAGE; op.data.send_message.send_message = glb_policy->client_load_report_payload; - grpc_closure_init(&glb_policy->client_load_report_closure, + GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, client_load_report_done_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_call_error call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); @@ -1318,6 +1346,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); + GPR_ASSERT(glb_policy->lb_call == NULL); GPR_ASSERT(!glb_policy->shutting_down); /* Note the following LB call progresses every time there's activity in \a @@ -1353,15 +1382,15 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_slice_unref_internal(exec_ctx, request_payload_slice); grpc_grpclb_request_destroy(request); - grpc_closure_init(&glb_policy->lb_on_sent_initial_request, + GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request, lb_on_sent_initial_request_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); - grpc_closure_init(&glb_policy->lb_on_server_status_received, + grpc_combiner_scheduler(glb_policy->base.combiner)); + GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); - grpc_closure_init(&glb_policy->lb_on_response_received, + grpc_combiner_scheduler(glb_policy->base.combiner)); + GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received, lb_on_response_received_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); gpr_backoff_init(&glb_policy->lb_call_backoff_state, GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, @@ -1403,8 +1432,10 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, lb_call_init_locked(exec_ctx, glb_policy); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", - (void *)glb_policy, (void *)glb_policy->lb_call); + gpr_log(GPR_INFO, + "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)", + (void *)glb_policy, (void *)glb_policy->lb_channel, + (void *)glb_policy->lb_call); } GPR_ASSERT(glb_policy->lb_call != NULL); @@ -1608,8 +1639,8 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - - if (!glb_policy->shutting_down) { + glb_policy->retry_timer_active = false; + if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", (void *)glb_policy); @@ -1617,31 +1648,32 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(glb_policy->lb_call == NULL); query_for_backends_locked(exec_ctx, glb_policy); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "grpclb_on_retry_timer"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer"); } static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - GPR_ASSERT(glb_policy->lb_call != NULL); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { char *status_details = grpc_slice_to_c_string(glb_policy->lb_call_status_details); - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "Status from LB server received. Status = %d, Details = '%s', " - "(call: %p)", + "(call: %p), error %p", glb_policy->lb_call_status, status_details, - (void *)glb_policy->lb_call); + (void *)glb_policy->lb_call, (void *)error); gpr_free(status_details); } - /* We need to perform cleanups no matter what. */ lb_call_destroy_locked(exec_ctx, glb_policy); - - if (!glb_policy->shutting_down) { + if (glb_policy->started_picking && glb_policy->updating_lb_call) { + if (glb_policy->retry_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + } + if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy); + glb_policy->updating_lb_call = false; + } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = @@ -1651,16 +1683,18 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, (void *)glb_policy); gpr_timespec timeout = gpr_time_sub(next_try, now); if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { - gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.", + gpr_log(GPR_DEBUG, + "... retry_timer_active in %" PRId64 ".%09d seconds.", timeout.tv_sec, timeout.tv_nsec); } else { - gpr_log(GPR_DEBUG, "... retrying immediately."); + gpr_log(GPR_DEBUG, "... retry_timer_active immediately."); } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); - grpc_closure_init( - &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, - glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); + GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, + lb_call_on_retry_timer_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); + glb_policy->retry_timer_active = true; grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry, now); } @@ -1668,6 +1702,138 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, "lb_on_server_status_received"); } +static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + glb_lb_policy *glb_policy = (glb_lb_policy *)policy; + + if (glb_policy->updating_lb_channel) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "Update already in progress for grpclb %p. Deferring update.", + (void *)glb_policy); + } + if (glb_policy->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, + glb_policy->pending_update_args->args); + gpr_free(glb_policy->pending_update_args); + } + glb_policy->pending_update_args = + gpr_zalloc(sizeof(*glb_policy->pending_update_args)); + glb_policy->pending_update_args->client_channel_factory = + args->client_channel_factory; + glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args); + glb_policy->pending_update_args->combiner = args->combiner; + return; + } + + glb_policy->updating_lb_channel = true; + // Propagate update to lb_channel (pick first). + const grpc_arg *arg = + grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + if (glb_policy->lb_channel == NULL) { + // If we don't have a current channel to the LB, go into TRANSIENT + // FAILURE. + grpc_connectivity_state_set( + exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "glb_update_missing"); + } else { + // otherwise, keep using the current LB channel (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for grpclb %p update, " + "ignoring.", + (void *)glb_policy); + } + } + const grpc_lb_addresses *addresses = arg->value.pointer.p; + GPR_ASSERT(glb_policy->lb_channel != NULL); + grpc_channel_args *lb_channel_args = build_lb_channel_args( + exec_ctx, addresses, glb_policy->response_generator, args->args); + /* Propagate updates to the LB channel through the fake resolver */ + grpc_fake_resolver_response_generator_set_response( + exec_ctx, glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(exec_ctx, lb_channel_args); + + if (!glb_policy->watching_lb_channel) { + // Watch the LB channel connectivity for connection. + glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT; + grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(glb_policy->lb_channel)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + glb_policy->watching_lb_channel = true; + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity"); + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset_set( + glb_policy->base.interested_parties), + &glb_policy->lb_channel_connectivity, + &glb_policy->lb_channel_on_connectivity_changed, NULL); + } +} + +// Invoked as part of the update process. It continues watching the LB channel +// until it shuts down or becomes READY. It's invoked even if the LB channel +// stayed READY throughout the update (for example if the update is identical). +static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + if (glb_policy->shutting_down) goto done; + // Re-initialize the lb_call. This should also take care of updating the + // embedded RR policy. Note that the current RR policy, if any, will stay in + // effect until an update from the new lb_call is received. + switch (glb_policy->lb_channel_connectivity) { + case GRPC_CHANNEL_INIT: + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + /* resub. */ + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(glb_policy->lb_channel)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset_set( + glb_policy->base.interested_parties), + &glb_policy->lb_channel_connectivity, + &glb_policy->lb_channel_on_connectivity_changed, NULL); + break; + } + case GRPC_CHANNEL_IDLE: + // lb channel inactive (probably shutdown prior to update). Restart lb + // call to kick the lb channel into gear. + GPR_ASSERT(glb_policy->lb_call == NULL); + /* fallthrough */ + case GRPC_CHANNEL_READY: + if (glb_policy->lb_call != NULL) { + glb_policy->updating_lb_channel = false; + glb_policy->updating_lb_call = true; + grpc_call_cancel(glb_policy->lb_call, NULL); + // lb_on_server_status_received will pick up the cancel and reinit + // lb_call. + if (glb_policy->pending_update_args != NULL) { + const grpc_lb_policy_args *args = glb_policy->pending_update_args; + glb_policy->pending_update_args = NULL; + glb_update_locked(exec_ctx, &glb_policy->base, args); + } + } else if (glb_policy->started_picking && !glb_policy->shutting_down) { + if (glb_policy->retry_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + glb_policy->retry_timer_active = false; + } + start_picking_locked(exec_ctx, glb_policy); + } + /* fallthrough */ + case GRPC_CHANNEL_SHUTDOWN: + done: + glb_policy->watching_lb_channel = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "watch_lb_channel_connectivity_cb_shutdown"); + break; + } +} + /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_destroy, @@ -1678,7 +1844,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_ping_one_locked, glb_exit_idle_locked, glb_check_connectivity_locked, - glb_notify_on_state_change_locked}; + glb_notify_on_state_change_locked, + glb_update_locked}; static void glb_factory_ref(grpc_lb_policy_factory *factory) {} @@ -1713,6 +1880,9 @@ static bool maybe_add_client_load_reporting_filter( void grpc_lb_policy_grpclb_init() { grpc_register_lb_policy(grpc_glb_lb_factory_create()); grpc_register_tracer("glb", &grpc_lb_glb_trace); +#ifndef NDEBUG + grpc_register_tracer("lb_policy_refcount", &grpc_trace_lb_policy_refcount); +#endif grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_client_load_reporting_filter, diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h index b069fae2f8..63ad66c5e9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h @@ -1,33 +1,18 @@ /* * - * Copyright 2016, Google Inc. - * All rights reserved. + * Copyright 2016 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c index d6201f2387..f2967182e2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -50,28 +35,37 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel( return lb_channel; } -grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx, - grpc_slice_hash_table *targets_info, - const grpc_channel_args *args) { - /* We strip out the channel arg for the LB policy name, since we want - * to use the default (pick_first) in this case. +grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args( + grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args) { + const grpc_arg to_add[] = { + grpc_fake_resolver_response_generator_arg(response_generator)}; + /* We remove: * - * We also strip out the channel arg for the resolved addresses, since - * that will be generated by the name resolver used in the LB channel. - * Note that the LB channel will use the sockaddr resolver, so this - * won't actually generate a query to DNS (or some other name service). - * However, the addresses returned by the sockaddr resolver will have - * is_balancer=false, whereas our own addresses have is_balancer=true. - * We need the LB channel to return addresses with is_balancer=false - * so that it does not wind up recursively using the grpclb LB policy, - * as per the special case logic in client_channel.c. + * - The channel arg for the LB policy name, since we want to use the default + * (pick_first) in this case. * - * Lastly, we also strip out the channel arg for the server URI, - * since that will be different for the LB channel than for the parent - * channel (the client channel factory will re-add this arg with - * the right value). */ + * - The channel arg for the resolved addresses, since that will be generated + * by the name resolver used in the LB channel. Note that the LB channel + * will use the fake resolver, so this won't actually generate a query + * to DNS (or some other name service). However, the addresses returned by + * the fake resolver will have is_balancer=false, whereas our own + * addresses have is_balancer=true. We need the LB channel to return + * addresses with is_balancer=false so that it does not wind up recursively + * using the grpclb LB policy, as per the special case logic in + * client_channel.c. + * + * - The channel arg for the server URI, since that will be different for the + * LB channel than for the parent channel (the client channel factory will + * re-add this arg with the right value). + * + * - The fake resolver generator, because we are replacing it with the one + * from the grpclb policy, used to propagate updates to the LB channel. */ static const char *keys_to_remove[] = { - GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI}; - return grpc_channel_args_copy_and_remove(args, keys_to_remove, - GPR_ARRAY_SIZE(keys_to_remove)); + GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI, + GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR}; + return grpc_channel_args_copy_and_add_and_remove( + args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add, + GPR_ARRAY_SIZE(to_add)); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h index 9730c971d9..6120bf53f7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -35,6 +20,7 @@ #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H #include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/slice/slice_hash_table.h" /** Create the channel used for communicating with an LB service. @@ -49,9 +35,10 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel( grpc_client_channel_factory *client_channel_factory, grpc_channel_args *args); -grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx, - grpc_slice_hash_table *targets_info, - const grpc_channel_args *args); +grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args( + grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H \ */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c index a145cba63c..2681b2a079 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -76,32 +61,39 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel( return lb_channel; } -grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx, - grpc_slice_hash_table *targets_info, - const grpc_channel_args *args) { - const grpc_arg targets_info_arg = - grpc_lb_targets_info_create_channel_arg(targets_info); - /* We strip out the channel arg for the LB policy name, since we want - * to use the default (pick_first) in this case. +grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args( + grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args) { + const grpc_arg to_add[] = { + grpc_lb_targets_info_create_channel_arg(targets_info), + grpc_fake_resolver_response_generator_arg(response_generator)}; + /* We remove: * - * We also strip out the channel arg for the resolved addresses, since - * that will be generated by the name resolver used in the LB channel. - * Note that the LB channel will use the sockaddr resolver, so this - * won't actually generate a query to DNS (or some other name service). - * However, the addresses returned by the sockaddr resolver will have - * is_balancer=false, whereas our own addresses have is_balancer=true. - * We need the LB channel to return addresses with is_balancer=false - * so that it does not wind up recursively using the grpclb LB policy, - * as per the special case logic in client_channel.c. + * - The channel arg for the LB policy name, since we want to use the default + * (pick_first) in this case. * - * Lastly, we also strip out the channel arg for the server URI, - * since that will be different for the LB channel than for the parent - * channel (the client channel factory will re-add this arg with - * the right value). */ + * - The channel arg for the resolved addresses, since that will be generated + * by the name resolver used in the LB channel. Note that the LB channel + * will use the fake resolver, so this won't actually generate a query + * to DNS (or some other name service). However, the addresses returned by + * the fake resolver will have is_balancer=false, whereas our own + * addresses have is_balancer=true. We need the LB channel to return + * addresses with is_balancer=false so that it does not wind up recursively + * using the grpclb LB policy, as per the special case logic in + * client_channel.c. + * + * - The channel arg for the server URI, since that will be different for the + * LB channel than for the parent channel (the client channel factory will + * re-add this arg with the right value). + * + * - The fake resolver generator, because we are replacing it with the one + * from the grpclb policy, used to propagate updates to the LB channel. */ static const char *keys_to_remove[] = { - GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI}; + GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI, + GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR}; /* Add the targets info table to be used for secure naming */ return grpc_channel_args_copy_and_add_and_remove( - args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &targets_info_arg, - 1); + args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add, + GPR_ARRAY_SIZE(to_add)); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c index 444c03b9aa..c762443b7c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h index 0af4a919f8..4bb47d5c5c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 90e7c2efe5..bec7c97a78 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -1,33 +1,18 @@ /* * - * Copyright 2016, Google Inc. - * All rights reserved. + * Copyright 2016 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index 7f596ce1f1..ef8d563edc 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -1,33 +1,18 @@ /* * - * Copyright 2016, Google Inc. - * All rights reserved. + * Copyright 2016 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index b1c5dfc61c..307e3bad67 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -1,33 +1,18 @@ /* * - * Copyright 2015, Google Inc. - * All rights reserved. + * Copyright 2015 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -37,11 +22,14 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" +grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false); + typedef struct pending_pick { struct pending_pick *next; uint32_t initial_metadata_flags; @@ -54,7 +42,9 @@ typedef struct { grpc_lb_policy base; /** all our subchannels */ grpc_subchannel **subchannels; + grpc_subchannel **new_subchannels; size_t num_subchannels; + size_t num_new_subchannels; grpc_closure connectivity_changed; @@ -63,10 +53,19 @@ typedef struct { /** the selected channel */ grpc_connected_subchannel *selected; + /** the subchannel key for \a selected, or NULL if \a selected not set */ + const grpc_subchannel_key *selected_key; + /** have we started picking? */ - int started_picking; + bool started_picking; /** are we shut down? */ - int shutdown; + bool shutdown; + /** are we updating the selected subchannel? */ + bool updating_selected; + /** are we updating the subchannel candidates? */ + bool updating_subchannels; + /** args from the latest update received while already updating, or NULL */ + grpc_lb_policy_args *pending_update_args; /** which subchannel are we watching? */ size_t checking_subchannel; /** what is the connectivity of that channel? */ @@ -80,23 +79,28 @@ typedef struct { static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - size_t i; GPR_ASSERT(p->pending_picks == NULL); - for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); + for (size_t i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy"); } if (p->selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, + "picked_first_destroy"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + if (p->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); + gpr_free(p->pending_update_args); + } gpr_free(p->subchannels); + gpr_free(p->new_subchannels); gpr_free(p); } static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - p->shutdown = 1; + p->shutdown = true; pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set( @@ -106,7 +110,7 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (p->selected != NULL) { grpc_connected_subchannel_notify_on_state_change( exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); - } else if (p->num_subchannels > 0) { + } else if (p->num_subchannels > 0 && p->started_picking) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); @@ -114,7 +118,7 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); pp = next; } @@ -131,7 +135,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); gpr_free(pp); @@ -156,7 +160,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); gpr_free(pp); @@ -169,21 +173,25 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { - p->started_picking = 1; - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); +static void start_picking_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + p->started_picking = true; + if (p->subchannels != NULL) { + GPR_ASSERT(p->num_subchannels > 0); + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); + } } static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } } @@ -203,7 +211,7 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* No subchannel selected yet, so try again */ if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; @@ -216,30 +224,290 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { - size_t i; size_t num_subchannels = p->num_subchannels; - grpc_subchannel **subchannels; + grpc_subchannel **subchannels = p->subchannels; - subchannels = p->subchannels; p->num_subchannels = 0; p->subchannels = NULL; GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); - for (i = 0; i < num_subchannels; i++) { + for (size_t i = 0; i < num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); } - gpr_free(subchannels); } +static grpc_connectivity_state pf_check_connectivity_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + return grpc_connectivity_state_get(&p->state_tracker, error); +} + +static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, + current, notify); +} + +static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + if (p->selected) { + grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + } else { + GRPC_CLOSURE_SCHED(exec_ctx, closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); + } +} + +/* unsubscribe all subchannels */ +static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + if (p->num_subchannels > 0) { + GPR_ASSERT(p->selected == NULL); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, + &p->connectivity_changed); + p->updating_subchannels = true; + } else if (p->selected != NULL) { + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); + p->updating_selected = true; + } +} + +/* true upon success */ +static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + pick_first_lb_policy *p = (pick_first_lb_policy *)policy; + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ + const grpc_arg *arg = + grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + if (p->subchannels == NULL) { + // If we don't have a current subchannel list, go into TRANSIENT FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "pf_update_missing"); + } else { + // otherwise, keep using the current subchannel list (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for Pick First %p update, " + "ignoring.", + (void *)p); + } + return; + } + const grpc_lb_addresses *addresses = arg->value.pointer.p; + size_t num_addrs = 0; + for (size_t i = 0; i < addresses->num_addresses; i++) { + if (!addresses->addresses[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) { + // Empty update. Unsubscribe from all current subchannels and put the + // channel in TRANSIENT_FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "pf_update_empty"); + stop_connectivity_watchers(exec_ctx, p); + return; + } + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", + (void *)p, (unsigned long)num_addrs); + } + grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs); + /* We remove the following keys in order for subchannel keys belonging to + * subchannels point to the same address to match. */ + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + size_t sc_args_count = 0; + + /* Create list of subchannel args for new addresses in \a args. */ + for (size_t i = 0; i < addresses->num_addresses; i++) { + if (addresses->addresses[i].is_balancer) continue; + if (addresses->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + grpc_arg addr_arg = + grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( + args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, + 1); + gpr_free(addr_arg.value.string); + sc_args[sc_args_count++].args = new_args; + } + + /* Check if p->selected is amongst them. If so, we are done. */ + if (p->selected != NULL) { + GPR_ASSERT(p->selected_key != NULL); + for (size_t i = 0; i < sc_args_count; i++) { + grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]); + const bool found_selected = + grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0; + grpc_subchannel_key_destroy(exec_ctx, ith_sc_key); + if (found_selected) { + // The currently selected subchannel is in the update: we are done. + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "Pick First %p found already selected subchannel %p amongst " + "updates. Update done.", + (void *)p, (void *)p->selected); + } + for (size_t j = 0; j < sc_args_count; j++) { + grpc_channel_args_destroy(exec_ctx, + (grpc_channel_args *)sc_args[j].args); + } + gpr_free(sc_args); + return; + } + } + } + // We only check for already running updates here because if the previous + // steps were successful, the update can be considered done without any + // interference (ie, no callbacks were scheduled). + if (p->updating_selected || p->updating_subchannels) { + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "Update already in progress for pick first %p. Deferring update.", + (void *)p); + } + if (p->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); + gpr_free(p->pending_update_args); + } + p->pending_update_args = gpr_zalloc(sizeof(*p->pending_update_args)); + p->pending_update_args->client_channel_factory = + args->client_channel_factory; + p->pending_update_args->args = grpc_channel_args_copy(args->args); + p->pending_update_args->combiner = args->combiner; + return; + } + /* Create the subchannels for the new subchannel args/addresses. */ + grpc_subchannel **new_subchannels = + gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); + size_t num_new_subchannels = 0; + for (size_t i = 0; i < sc_args_count; i++) { + grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( + exec_ctx, args->client_channel_factory, &sc_args[i]); + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + char *address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_INFO, + "Pick First %p created subchannel %p for address uri %s", + (void *)p, (void *)subchannel, address_uri); + gpr_free(address_uri); + } + grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args); + if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel; + } + gpr_free(sc_args); + if (num_new_subchannels == 0) { + gpr_free(new_subchannels); + // Empty update. Unsubscribe from all current subchannels and put the + // channel in TRANSIENT_FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"), + "pf_update_no_valid_addresses"); + stop_connectivity_watchers(exec_ctx, p); + return; + } + + /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */ + stop_connectivity_watchers(exec_ctx, p); + + /* Save new subchannels. The switch over will happen in + * pf_connectivity_changed_locked */ + if (p->updating_selected || p->updating_subchannels) { + p->num_new_subchannels = num_new_subchannels; + p->new_subchannels = new_subchannels; + } else { /* nothing is updating. Get things moving from here */ + p->num_subchannels = num_new_subchannels; + p->subchannels = new_subchannels; + p->new_subchannels = NULL; + p->num_new_subchannels = 0; + if (p->started_picking) { + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); + } + } +} + static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { pick_first_lb_policy *p = arg; grpc_subchannel *selected_subchannel; pending_pick *pp; + bool restart = false; + if (p->updating_selected && error == GRPC_ERROR_CANCELLED) { + /* Captured the unsubscription for p->selected */ + GPR_ASSERT(p->selected != NULL); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, + "pf_update_connectivity"); + p->updating_selected = false; + if (p->num_new_subchannels == 0) { + p->selected = NULL; + return; + } + restart = true; + } + if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) { + /* Captured the unsubscription for the checking subchannel */ + GPR_ASSERT(p->selected == NULL); + for (size_t i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], + "pf_update_connectivity"); + } + gpr_free(p->subchannels); + p->subchannels = NULL; + p->num_subchannels = 0; + p->updating_subchannels = false; + if (p->num_new_subchannels == 0) return; + restart = true; + } + if (restart) { + p->selected = NULL; + p->selected_key = NULL; + + GPR_ASSERT(p->new_subchannels != NULL); + GPR_ASSERT(p->num_new_subchannels > 0); + p->num_subchannels = p->num_new_subchannels; + p->subchannels = p->new_subchannels; + p->num_new_subchannels = 0; + p->new_subchannels = NULL; + + if (p->started_picking) { + /* If we were picking, continue to do so over the new subchannels, + * starting from the 0th index. */ + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + /* reuses the weak ref from start_picking_locked */ + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); + } + if (p->pending_update_args != NULL) { + const grpc_lb_policy_args *args = p->pending_update_args; + p->pending_update_args = NULL; + pf_update_locked(exec_ctx, &p->base, args); + } + return; + } GRPC_ERROR_REF(error); - if (p->shutdown) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_ERROR_UNREF(error); @@ -272,6 +540,11 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected_subchannel), "picked_first"); + + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected); + } + p->selected_key = grpc_subchannel_get_key(selected_subchannel); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); destroy_subchannels_locked(exec_ctx, p); @@ -279,7 +552,12 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "Servicing pending pick with selected subchannel %p", + (void *)p->selected); + } + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( @@ -332,7 +610,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, @@ -353,32 +631,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(error); } -static grpc_connectivity_state pf_check_connectivity_locked( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - return grpc_connectivity_state_get(&p->state_tracker, error); -} - -static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, - current, notify); -} - -static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - if (p->selected) { - grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); - } else { - grpc_closure_sched(exec_ctx, closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); - } -} - static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown_locked, @@ -388,7 +640,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_ping_one_locked, pf_exit_idle_locked, pf_check_connectivity_locked, - pf_notify_on_state_change_locked}; + pf_notify_on_state_change_locked, + pf_update_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} @@ -398,62 +651,11 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { GPR_ASSERT(args->client_channel_factory != NULL); - - /* Find the number of backend addresses. We ignore balancer - * addresses, since we don't know how to handle them. */ - const grpc_arg *arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; - } - grpc_lb_addresses *addresses = arg->value.pointer.p; - size_t num_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - if (!addresses->addresses[i].is_balancer) ++num_addrs; - } - if (num_addrs == 0) return NULL; - pick_first_lb_policy *p = gpr_zalloc(sizeof(*p)); - - p->subchannels = gpr_zalloc(sizeof(grpc_subchannel *) * num_addrs); - grpc_subchannel_args sc_args; - size_t subchannel_idx = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - /* Skip balancer addresses, since we only know how to handle backends. */ - if (addresses->addresses[i].is_balancer) continue; - - if (addresses->addresses[i].user_data != NULL) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); - } - - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, - 1); - gpr_free(addr_arg.value.string); - sc_args.args = new_args; - grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args); - grpc_channel_args_destroy(exec_ctx, new_args); - - if (subchannel != NULL) { - p->subchannels[subchannel_idx++] = subchannel; - } - } - if (subchannel_idx == 0) { - gpr_free(p->subchannels); - gpr_free(p); - return NULL; - } - p->num_subchannels = subchannel_idx; - + pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); - grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p, - grpc_combiner_scheduler(args->combiner, false)); + GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, + grpc_combiner_scheduler(args->combiner)); return &p->base; } @@ -472,6 +674,7 @@ static grpc_lb_policy_factory *pick_first_lb_factory_create() { void grpc_lb_policy_pick_first_init() { grpc_register_lb_policy(pick_first_lb_factory_create()); + grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace); } void grpc_lb_policy_pick_first_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 7ee6ffb787..3c8520cc1c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -1,63 +1,28 @@ /* * - * Copyright 2015, Google Inc. - * All rights reserved. + * Copyright 2015 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ /** Round Robin Policy. * - * This policy keeps: - * - A circular list of ready (connected) subchannels, the *readylist*. An empty - * readylist consists solely of its root (dummy) node. - * - A pointer to the last element picked from the readylist, the *lastpick*. - * Initially set to point to the readylist's root. - * - * Behavior: - * - When a subchannel connects, it's *prepended* to the readylist's root node. - * Ie, if readylist = A <-> B <-> ROOT <-> C - * ^ ^ - * |____________________| - * and subchannel D becomes connected, the addition of D to the readylist - * results in readylist = A <-> B <-> D <-> ROOT <-> C - * ^ ^ - * |__________________________| - * - When a subchannel disconnects, it's removed from the readylist. If the - * subchannel being removed was the most recently picked, the *lastpick* - * pointer moves to the removed node's previous element. Note that if the - * readylist only had one element, this is still legal, as the lastpick would - * point to the dummy root node, for an empty readylist. - * - Upon picking, *lastpick* is updated to point to the returned (connected) - * subchannel. Note that it's possible that the selected subchannel becomes - * disconnected in the interim between the selection and the actual usage of - * the subchannel by the caller. - */ + * Before every pick, the \a get_next_ready_subchannel_index_locked function + * returns the p->subchannel_list->subchannels index for next subchannel, + * respecting the relative + * order of the addresses provided upon creation or updates. Note however that + * updates will start picking from the beginning of the updated list. */ #include <string.h> @@ -72,8 +37,6 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" -typedef struct round_robin_lb_policy round_robin_lb_policy; - grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false); /** List of entities waiting for a pick. @@ -99,9 +62,37 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; +typedef struct rr_subchannel_list rr_subchannel_list; +typedef struct round_robin_lb_policy { + /** base policy: must be first */ + grpc_lb_policy base; + + rr_subchannel_list *subchannel_list; + + /** have we started picking? */ + bool started_picking; + /** are we shutting down? */ + bool shutdown; + /** List of picks that are waiting on connectivity */ + pending_pick *pending_picks; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; + + /** Index into subchannels for last pick. */ + size_t last_ready_subchannel_index; + + /** Latest version of the subchannel list. + * Subchannel connectivity callbacks will only promote updated subchannel + * lists if they equal \a latest_pending_subchannel_list. In other words, + * racing callbacks that reference outdated subchannel lists won't perform any + * update. */ + rr_subchannel_list *latest_pending_subchannel_list; +} round_robin_lb_policy; + typedef struct { - /** backpointer to owning policy */ - round_robin_lb_policy *policy; + /** backpointer to owning subchannel list */ + rr_subchannel_list *subchannel_list; /** subchannel itself */ grpc_subchannel *subchannel; /** notification that connectivity has changed on subchannel */ @@ -123,12 +114,9 @@ typedef struct { const grpc_lb_user_data_vtable *user_data_vtable; } subchannel_data; -struct round_robin_lb_policy { - /** base policy: must be first */ - grpc_lb_policy base; - - /** total number of addresses received at creation time */ - size_t num_addresses; +struct rr_subchannel_list { + /** backpointer to owning policy */ + round_robin_lb_policy *policy; /** all our subchannels */ size_t num_subchannels; @@ -141,67 +129,143 @@ struct round_robin_lb_policy { /** how many subchannels are in state IDLE */ size_t num_idle; - /** have we started picking? */ - bool started_picking; - /** are we shutting down? */ - bool shutdown; - /** List of picks that are waiting on connectivity */ - pending_pick *pending_picks; - - /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; + /** There will be one ref for each entry in subchannels for which there is a + * pending connectivity state watcher callback. */ + gpr_refcount refcount; - // Index into subchannels for last pick. - size_t last_ready_subchannel_index; + /** Is this list shutting down? This may be true due to the shutdown of the + * policy itself or because a newer update has arrived while this one hadn't + * finished processing. */ + bool shutting_down; }; -/** Returns the index into p->subchannels of the next subchannel in - * READY state, or p->num_subchannels if no subchannel is READY. +static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list) { + GPR_ASSERT(subchannel_list->shutting_down); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p", + (void *)subchannel_list->policy, (void *)subchannel_list); + } + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, + "rr_subchannel_list_destroy"); + } + sd->subchannel = NULL; + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } + } + gpr_free(subchannel_list->subchannels); + gpr_free(subchannel_list); +} + +static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list, + const char *reason) { + gpr_ref_non_zero(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu", + (void *)subchannel_list->policy, (void *)subchannel_list, + (unsigned long)(count - 1), (unsigned long)count); + } +} + +static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list, + const char *reason) { + const bool done = gpr_unref(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu", + (void *)subchannel_list->policy, (void *)subchannel_list, + (unsigned long)(count + 1), (unsigned long)count); + } + if (done) { + rr_subchannel_list_destroy(exec_ctx, subchannel_list); + } +} + +/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The + * watcher's callback will ultimately unref \a subchannel_list. */ +static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list, + const char *reason) { + GPR_ASSERT(!subchannel_list->shutting_down); + subchannel_list->shutting_down = true; + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, + NULL, + &sd->connectivity_changed_closure); + } + } + rr_subchannel_list_unref(exec_ctx, subchannel_list, reason); +} + +/** Returns the index into p->subchannel_list->subchannels of the next + * subchannel in READY state, or p->subchannel_list->num_subchannels if no + * subchannel is READY. * * Note that this function does *not* update p->last_ready_subchannel_index. * The caller must do that if it returns a pick. */ static size_t get_next_ready_subchannel_index_locked( const round_robin_lb_policy *p) { + GPR_ASSERT(p->subchannel_list != NULL); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR: %p] getting next ready subchannel, " + "[RR %p] getting next ready subchannel (out of %lu), " "last_ready_subchannel_index=%lu", - p, (unsigned long)p->last_ready_subchannel_index); + (void *)p, (unsigned long)p->subchannel_list->num_subchannels, + (unsigned long)p->last_ready_subchannel_index); } - for (size_t i = 0; i < p->num_subchannels; ++i) { - const size_t index = - (i + p->last_ready_subchannel_index + 1) % p->num_subchannels; + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + const size_t index = (i + p->last_ready_subchannel_index + 1) % + p->subchannel_list->num_subchannels; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p, - (unsigned long)index, - p->subchannels[index].curr_connectivity_state); + gpr_log(GPR_DEBUG, + "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " + "state=%d", + (void *)p, + (void *)p->subchannel_list->subchannels[index].subchannel, + (void *)p->subchannel_list, (unsigned long)index, + p->subchannel_list->subchannels[index].curr_connectivity_state); } - if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { + if (p->subchannel_list->subchannels[index].curr_connectivity_state == + GRPC_CHANNEL_READY) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu", - p, (unsigned long)index); + gpr_log(GPR_DEBUG, + "[RR %p] found next ready subchannel (%p) at index %lu of " + "subchannel_list %p", + (void *)p, + (void *)p->subchannel_list->subchannels[index].subchannel, + (unsigned long)index, (void *)p->subchannel_list); } return index; } } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p); + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void *)p); } - return p->num_subchannels; + return p->subchannel_list->num_subchannels; } // Sets p->last_ready_subchannel_index to last_ready_index. static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, size_t last_ready_index) { - GPR_ASSERT(last_ready_index < p->num_subchannels); + GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels); p->last_ready_subchannel_index = last_ready_index; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", - (void *)p, (unsigned long)last_ready_index, - (void *)p->subchannels[last_ready_index].subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->subchannels[last_ready_index].subchannel)); + gpr_log( + GPR_DEBUG, + "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", + (void *)p, (unsigned long)last_ready_index, + (void *)p->subchannel_list->subchannels[last_ready_index].subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->subchannel_list->subchannels[last_ready_index].subchannel)); } } @@ -210,18 +274,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); } - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); - } - } - } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); - gpr_free(p->subchannels); gpr_free(p); } @@ -235,7 +288,7 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); gpr_free(pp); @@ -243,14 +296,9 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, - NULL, - &sd->connectivity_changed_closure); - } - } + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_shutdown"); + p->subchannel_list = NULL; } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -263,7 +311,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { *target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); gpr_free(pp); @@ -288,7 +336,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); gpr_free(pp); @@ -304,15 +352,14 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { p->started_picking = true; - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); - } + for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &p->subchannel_list->subchannels[i]; + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); + rr_subchannel_list_ref(sd->subchannel_list, "start_picking"); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } } @@ -332,63 +379,70 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); } - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->num_subchannels) { - /* readily available, report right away */ - subchannel_data *sd = &p->subchannels[next_ready_index]; - *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked"); - if (user_data != NULL) { - *user_data = sd->user_data; - } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)", - (void *)*target, (unsigned long)next_ready_index); - } - /* only advance the last picked pointer if the selection was used */ - update_last_ready_subchannel_index_locked(p, next_ready_index); - return 1; - } else { - /* no pick currently available. Save for later in list of pending picks */ - if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + if (p->subchannel_list != NULL) { + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->subchannel_list->num_subchannels) { + /* readily available, report right away */ + subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index]; + *target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "rr_picked"); + if (user_data != NULL) { + *user_data = sd->user_data; + } + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_DEBUG, + "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, " + "INDEX %lu)", + (void *)p, (void *)sd->subchannel, (void *)*target, + (void *)sd->subchannel_list, (unsigned long)next_ready_index); + } + /* only advance the last picked pointer if the selection was used */ + update_last_ready_subchannel_index_locked(p, next_ready_index); + return 1; } - pending_pick *pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->on_complete = on_complete; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->user_data = user_data; - p->pending_picks = pp; - return 0; } + /* no pick currently available. Save for later in list of pending picks */ + if (!p->started_picking) { + start_picking_locked(exec_ctx, p); + } + pending_pick *pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->target = target; + pp->on_complete = on_complete; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->user_data = user_data; + p->pending_picks = pp; + return 0; } static void update_state_counters_locked(subchannel_data *sd) { - round_robin_lb_policy *p = sd->policy; + rr_subchannel_list *subchannel_list = sd->subchannel_list; if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { - GPR_ASSERT(p->num_ready > 0); - --p->num_ready; + GPR_ASSERT(subchannel_list->num_ready > 0); + --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GPR_ASSERT(p->num_transient_failures > 0); - --p->num_transient_failures; + GPR_ASSERT(subchannel_list->num_transient_failures > 0); + --subchannel_list->num_transient_failures; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { - GPR_ASSERT(p->num_idle > 0); - --p->num_idle; + GPR_ASSERT(subchannel_list->num_idle > 0); + --subchannel_list->num_idle; } if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { - ++p->num_ready; + ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ++p->num_transient_failures; + ++subchannel_list->num_transient_failures; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { - ++p->num_idle; + ++subchannel_list->num_idle; } } -/* sd is the subchannel_data associted with the updated subchannel. - * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE - * or SHUTDOWN */ +/** Sets the policy's connectivity status based on that of the passed-in \a sd + * (the subchannel_data associted with the updated subchannel) and the + * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be + * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the + * connectivity status set. */ static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we @@ -401,17 +455,18 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( * CHECK: sd->curr_connectivity_state == CONNECTING. * * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. - * CHECK: p->num_subchannels = 0. + * CHECK: p->subchannel_list->num_subchannels = 0. * * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is * TRANSIENT_FAILURE. - * CHECK: p->num_transient_failures == p->num_subchannels. + * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels. * * 5) RULE: ALL subchannels are IDLE => policy is IDLE. - * CHECK: p->num_idle == p->num_subchannels. + * CHECK: p->num_idle == p->subchannel_list->num_subchannels. */ - round_robin_lb_policy *p = sd->policy; - if (p->num_ready > 0) { /* 1) READY */ + rr_subchannel_list *subchannel_list = sd->subchannel_list; + round_robin_lb_policy *p = subchannel_list->policy; + if (subchannel_list->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); return GRPC_CHANNEL_READY; @@ -421,18 +476,19 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); return GRPC_CHANNEL_CONNECTING; - } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */ + } else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); return GRPC_CHANNEL_SHUTDOWN; - } else if (p->num_transient_failures == - p->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + } else if (subchannel_list->num_transient_failures == + p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_transient_failure"); return GRPC_CHANNEL_TRANSIENT_FAILURE; - } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */ + } else if (subchannel_list->num_idle == + p->subchannel_list->num_subchannels) { /* 5) IDLE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "rr_idle"); return GRPC_CHANNEL_IDLE; @@ -444,7 +500,28 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; - round_robin_lb_policy *p = sd->policy; + round_robin_lb_policy *p = sd->subchannel_list->policy; + // If the policy is shutting down, unref and return. + if (p->shutdown) { + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); + return; + } + if (sd->subchannel_list->shutting_down) { + // the subchannel list associated with sd has been discarded. This callback + // corresponds to the unsubscription. + GPR_ASSERT(error == GRPC_ERROR_CANCELLED); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); + return; + } + // Dispose of outdated subchannel lists. + if (sd->subchannel_list != p->subchannel_list && + sd->subchannel_list != p->latest_pending_subchannel_list) { + // sd belongs to an outdated subchannel_list: get rid of it. + rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated"); + return; + } // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. @@ -453,21 +530,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "[RR %p] connectivity changed for subchannel %p: " "prev_state=%d new_state=%d", - p, sd->subchannel, sd->prev_connectivity_state, + (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state, sd->curr_connectivity_state); } - // If we're shutting down, unref and return. - if (p->shutdown) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - return; - } // Update state counters and determine new overall state. update_state_counters_locked(sd); sd->prev_connectivity_state = sd->curr_connectivity_state; - grpc_connectivity_state new_connectivity_state = + const grpc_connectivity_state new_policy_connectivity_state = update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); - // If the new state is SHUTDOWN, unref the subchannel, and if the new - // overall state is SHUTDOWN, clean up. + // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new + // policy's state is SHUTDOWN, clean up. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); sd->subchannel = NULL; @@ -475,26 +547,53 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(exec_ctx, sd->user_data); } - if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { /* the policy is shutting down. Flush all the pending picks... */ pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } } /* unref the "rr_connectivity" weak ref from start_picking */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - } else { + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, + "rr_connectivity_sd_shutdown"); + } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + if (sd->subchannel_list != p->subchannel_list) { + // promote sd->subchannel_list to p->subchannel_list. + // sd->subchannel_list must be equal to + // p->latest_pending_subchannel_list because we have already filtered + // for sds belonging to outdated subchannel lists. + GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(!sd->subchannel_list->shutting_down); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] phasing out subchannel list %p (size %lu) in favor " + "of %p (size %lu)", + (void *)p, (void *)p->subchannel_list, + (unsigned long)p->subchannel_list->num_subchannels, + (void *)sd->subchannel_list, + (unsigned long)sd->subchannel_list->num_subchannels); + } + if (p->subchannel_list != NULL) { + // dispose of the current subchannel_list + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_update_connectivity"); + } + p->subchannel_list = sd->subchannel_list; + p->latest_pending_subchannel_list = NULL; + } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - GPR_ASSERT(next_ready_index < p->num_subchannels); - subchannel_data *selected = &p->subchannels[next_ready_index]; + GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); + subchannel_data *selected = + &p->subchannel_list->subchannels[next_ready_index]; if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ @@ -515,11 +614,12 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, (void *)selected->subchannel, (unsigned long)next_ready_index); } - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } } - /* renew notification: reuses the "rr_connectivity" weak ref */ + /* renew notification: reuses the "rr_connectivity" weak ref on the policy + * as well as the sd->subchannel_list ref. */ grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -546,65 +646,82 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->num_subchannels) { - subchannel_data *selected = &p->subchannels[next_ready_index]; + if (next_ready_index < p->subchannel_list->num_subchannels) { + subchannel_data *selected = + &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); } else { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); } } -static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown_locked, - rr_pick_locked, - rr_cancel_pick_locked, - rr_cancel_picks_locked, - rr_ping_one_locked, - rr_exit_idle_locked, - rr_check_connectivity_locked, - rr_notify_on_state_change_locked}; - -static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} - -static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} - -static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - GPR_ASSERT(args->client_channel_factory != NULL); - - /* Find the number of backend addresses. We ignore balancer - * addresses, since we don't know how to handle them. */ +static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + round_robin_lb_policy *p = (round_robin_lb_policy *)policy; + /* Find the number of backend addresses. We ignore balancer addresses, since + * we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; + if (p->subchannel_list == NULL) { + // If we don't have a current subchannel list, go into TRANSIENT FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "rr_update_missing"); + } else { + // otherwise, keep using the current subchannel list (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for Round Robin %p update, " + "ignoring.", + (void *)p); + } + return; } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { if (!addresses->addresses[i].is_balancer) ++num_addrs; } - if (num_addrs == 0) return NULL; - - round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); - - p->num_addresses = num_addrs; - p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); - - grpc_subchannel_args sc_args; + if (num_addrs == 0) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "rr_update_empty"); + if (p->subchannel_list != NULL) { + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_update"); + p->subchannel_list = NULL; + } + return; + } size_t subchannel_index = 0; + rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); + subchannel_list->policy = p; + subchannel_list->subchannels = + gpr_zalloc(sizeof(subchannel_data) * num_addrs); + subchannel_list->num_subchannels = num_addrs; + gpr_ref_init(&subchannel_list->refcount, 1); + p->latest_pending_subchannel_list = subchannel_list; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", + (void *)subchannel_list, (unsigned long)num_addrs); + } + grpc_subchannel_args sc_args; + /* We need to remove the LB addresses in order to be able to compare the + * subchannel keys of subchannels from a different batch of addresses. */ + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + /* Create subchannels for addresses in the update. */ for (size_t i = 0; i < addresses->num_addresses; i++) { /* Skip balancer addresses, since we only know how to handle backends. */ if (addresses->addresses[i].is_balancer) continue; - - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + GPR_ASSERT(i < num_addrs); memset(&sc_args, 0, sizeof(grpc_subchannel_args)); grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); @@ -618,52 +735,84 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s", - (unsigned long)subchannel_index, (void *)subchannel, address_uri); + gpr_log(GPR_DEBUG, + "index %lu: Created subchannel %p for address uri %s into " + "subchannel_list %p", + (unsigned long)subchannel_index, (void *)subchannel, address_uri, + (void *)subchannel_list); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); - if (subchannel != NULL) { - subchannel_data *sd = &p->subchannels[subchannel_index]; - sd->policy = p; - sd->subchannel = subchannel; - /* use some sentinel value outside of the range of grpc_connectivity_state - * to signal an undefined previous state. We won't be referring to this - * value again and it'll be overwritten after the first call to - * rr_connectivity_changed */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - sd->user_data_vtable = addresses->user_data_vtable; - if (sd->user_data_vtable != NULL) { - sd->user_data = - sd->user_data_vtable->copy(addresses->addresses[i].user_data); - } - grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed_locked, sd, - grpc_combiner_scheduler(args->combiner, false)); - ++subchannel_index; + subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++]; + sd->subchannel_list = subchannel_list; + sd->subchannel = subchannel; + GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, + rr_connectivity_changed_locked, sd, + grpc_combiner_scheduler(args->combiner)); + /* use some sentinel value outside of the range of + * grpc_connectivity_state to signal an undefined previous state. We + * won't be referring to this value again and it'll be overwritten after + * the first call to rr_connectivity_changed_locked */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + sd->user_data_vtable = addresses->user_data_vtable; + if (sd->user_data_vtable != NULL) { + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); + } + if (p->started_picking) { + rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update"); + /* 2. Watch every new subchannel. A subchannel list becomes active the + * moment one of its subchannels is READY. At that moment, we swap + * p->subchannel_list for sd->subchannel_list, provided the subchannel + * list is still valid (ie, isn't shutting down) */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } } - if (subchannel_index == 0) { - /* couldn't create any subchannel. Bail out */ - gpr_free(p->subchannels); - gpr_free(p); - return NULL; + if (!p->started_picking) { + // The policy isn't picking yet. Save the update for later, disposing of + // previous version if any. + if (p->subchannel_list != NULL) { + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "rr_update_before_started_picking"); + } + p->subchannel_list = subchannel_list; } - p->num_subchannels = subchannel_index; +} + +static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { + rr_destroy, + rr_shutdown_locked, + rr_pick_locked, + rr_cancel_pick_locked, + rr_cancel_picks_locked, + rr_ping_one_locked, + rr_exit_idle_locked, + rr_check_connectivity_locked, + rr_notify_on_state_change_locked, + rr_update_locked}; + +static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} - // Initialize the last pick index to the last subchannel, so that the - // first pick will start at the beginning of the list. - p->last_ready_subchannel_index = subchannel_index - 1; +static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} +static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + GPR_ASSERT(args->client_channel_factory != NULL); + round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); + rr_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", - (void *)p, (unsigned long)p->num_subchannels); + gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p, + (unsigned long)p->subchannel_list->num_subchannels); } return &p->base; } |