Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c    43
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h    35
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c                         677
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h                          35
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c                  86
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h                  43
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c           88
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c             35
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h             35
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c              124
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h               37
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c                 508
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c               938
13 files changed, 1528 insertions, 1156 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index 67baa46de7..52c6e38c87 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2017, Google Inc.
- * All rights reserved.
+ * Copyright 2017 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -68,7 +53,7 @@ static void on_complete_for_send(grpc_exec_ctx *exec_ctx, void *arg,
if (error == GRPC_ERROR_NONE) {
calld->send_initial_metadata_succeeded = true;
}
- grpc_closure_run(exec_ctx, calld->original_on_complete_for_send,
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete_for_send,
GRPC_ERROR_REF(error));
}
@@ -78,7 +63,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
if (error == GRPC_ERROR_NONE) {
calld->recv_initial_metadata_succeeded = true;
}
- grpc_closure_run(exec_ctx, calld->original_recv_initial_metadata_ready,
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
@@ -119,7 +104,7 @@ static void start_transport_stream_op_batch(
// Intercept send_initial_metadata.
if (batch->send_initial_metadata) {
calld->original_on_complete_for_send = batch->on_complete;
- grpc_closure_init(&calld->on_complete_for_send, on_complete_for_send, calld,
+ GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, calld,
grpc_schedule_on_exec_ctx);
batch->on_complete = &calld->on_complete_for_send;
}
@@ -127,7 +112,7 @@ static void start_transport_stream_op_batch(
if (batch->recv_initial_metadata) {
calld->original_recv_initial_metadata_ready =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
- grpc_closure_init(&calld->recv_initial_metadata_ready,
+ GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, calld,
grpc_schedule_on_exec_ctx);
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
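
The hunks above intercept send_initial_metadata and recv_initial_metadata by swapping the batch's completion closure for a wrapper that records the outcome and then chains to the original closure. Below is a minimal standalone sketch of that interception pattern in plain C, assuming hypothetical names (completion_cb, call_data, on_complete_wrapper) rather than the actual gRPC closure API:

#include <stdbool.h>
#include <stdio.h>

typedef void (*completion_cb)(void *arg, bool ok);

typedef struct {
  completion_cb original_cb; /* completion callback supplied by the caller */
  void *original_arg;
  bool send_initial_metadata_succeeded; /* recorded before chaining */
} call_data;

/* Wrapper installed in place of the original completion callback. */
static void on_complete_wrapper(void *arg, bool ok) {
  call_data *calld = arg;
  if (ok) calld->send_initial_metadata_succeeded = true;
  calld->original_cb(calld->original_arg, ok); /* chain to the original */
}

static void user_on_complete(void *arg, bool ok) {
  (void)arg;
  printf("original on_complete ran, ok=%d\n", ok);
}

int main(void) {
  call_data calld = {user_on_complete, NULL, false};
  /* The filter stashes the original closure and substitutes its wrapper: */
  completion_cb batch_on_complete = on_complete_wrapper;
  batch_on_complete(&calld, true); /* simulate completion of the batch */
  printf("send_initial_metadata_succeeded=%d\n",
         calld.send_initial_metadata_succeeded);
  return 0;
}
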
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
index 28b313d874..51e30b20b8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2017, Google Inc.
- * All rights reserved.
+ * Copyright 2017 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index b7c0e929b7..5a5ff2902d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2016, Google Inc.
- * All rights reserved.
+ * Copyright 2016 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -115,6 +100,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -198,7 +184,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
wrapped_rr_closure_arg *wc_arg = arg;
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
if (wc_arg->rr_policy != NULL) {
/* if *target is NULL, no pick has been made by the RR policy (eg, all
@@ -270,7 +256,7 @@ static void add_pending_pick(pending_pick **root,
pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.free_when_done = pp;
- grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure,
wrapped_rr_closure, &pp->wrapped_on_complete_arg,
grpc_schedule_on_exec_ctx);
*root = pp;
@@ -289,7 +275,7 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
pping->wrapped_notify_arg.wrapped_closure = notify;
pping->wrapped_notify_arg.free_when_done = pping;
pping->next = *root;
- grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure,
wrapped_rr_closure, &pping->wrapped_notify_arg,
grpc_schedule_on_exec_ctx);
*root = pping;
@@ -315,6 +301,9 @@ typedef struct glb_lb_policy {
/** for communicating with the LB server */
grpc_channel *lb_channel;
+ /** response generator to inject address updates into \a lb_channel */
+ grpc_fake_resolver_response_generator *response_generator;
+
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy *rr_policy;
@@ -323,10 +312,18 @@ typedef struct glb_lb_policy {
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
+ /** connectivity state of the LB channel */
+ grpc_connectivity_state lb_channel_connectivity;
+
/** stores the deserialized response from the LB. May be NULL until one such
* response has arrived. */
grpc_grpclb_serverlist *serverlist;
+ /** Index into serverlist for next pick.
+ * If the server at this index is a drop, we return a drop.
+ * Otherwise, we delegate to the RR policy. */
+ size_t serverlist_index;
+
/** list of picks that are waiting on RR's policy connectivity */
pending_pick *pending_picks;
@@ -335,10 +332,27 @@ typedef struct glb_lb_policy {
bool shutting_down;
+ /** are we currently updating lb_call? */
+ bool updating_lb_call;
+
+ /** are we currently updating lb_channel? */
+ bool updating_lb_channel;
+
+ /** are we already watching the LB channel's connectivity? */
+ bool watching_lb_channel;
+
+ /** is \a lb_call_retry_timer active? */
+ bool retry_timer_active;
+
+ /** called upon changes to the LB channel's connectivity. */
+ grpc_closure lb_channel_on_connectivity_changed;
+
+ /** args from the latest update received while already updating, or NULL */
+ grpc_lb_policy_args *pending_update_args;
+
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
-
/* Finished sending initial request. */
grpc_closure lb_on_sent_initial_request;
@@ -402,6 +416,9 @@ struct rr_connectivity_data {
static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
bool log) {
+ if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+ return false;
+ }
const grpc_grpclb_ip_address *ip = &server->ip_address;
if (server->port >> 16 != 0) {
if (log) {
@@ -411,7 +428,6 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
}
return false;
}
-
if (ip->size != 4 && ip->size != 16) {
if (log) {
gpr_log(GPR_ERROR,
@@ -445,11 +461,12 @@ static const grpc_lb_user_data_vtable lb_token_vtable = {
static void parse_server(const grpc_grpclb_server *server,
grpc_resolved_address *addr) {
+ memset(addr, 0, sizeof(*addr));
+ if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
const grpc_grpclb_ip_address *ip = &server->ip_address;
- memset(addr, 0, sizeof(*addr));
if (ip->size == 4) {
addr->len = sizeof(struct sockaddr_in);
struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
@@ -525,10 +542,9 @@ static grpc_lb_addresses *process_serverlist_locked(
return lb_addresses;
}
-/* returns true if the new RR policy should replace the current one, if any */
-static bool update_lb_connectivity_status_locked(
+static void update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) {
+ grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
@@ -562,40 +578,73 @@ static bool update_lb_connectivity_status_locked(
* (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
- switch (new_rr_state) {
+ switch (rr_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
- GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE);
- return false; /* don't replace the RR policy */
+ GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
+ break;
case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
- GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE);
+ GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
}
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO,
- "Setting grpclb's state to %s from new RR policy %p state.",
- grpc_connectivity_state_name(new_rr_state),
- (void *)glb_policy->rr_policy);
+ gpr_log(
+ GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.",
+ grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy);
}
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- new_rr_state, GRPC_ERROR_REF(new_rr_state_error),
+ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
+ GRPC_ERROR_REF(rr_state_error),
"update_lb_connectivity_status_locked");
- return true;
}
-/* perform a pick over \a rr_policy. Given that a pick can return immediately
- * (ignoring its completion callback) we need to perform the cleanups this
- * callback would be otherwise resposible for */
+/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
+ * immediately (ignoring its completion callback), we need to perform the
+ * cleanups this callback would otherwise be responsible for.
+ * If \a force_async is true, then we will manually schedule the
+ * completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
- const grpc_lb_policy_pick_args *pick_args,
+ grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+ const grpc_lb_policy_pick_args *pick_args, bool force_async,
grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
- GPR_ASSERT(rr_policy != NULL);
+ // Look at the index into the serverlist to see if we should drop this call.
+ grpc_grpclb_server *server =
+ glb_policy->serverlist->servers[glb_policy->serverlist_index++];
+ if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
+ glb_policy->serverlist_index = 0; // Wrap-around.
+ }
+ if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+ // Not using the RR policy, so unref it.
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
+ (intptr_t)wc_arg->rr_policy);
+ }
+ GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+ // Update client load reporting stats to indicate the number of
+ // dropped calls. Note that we have to do this here instead of in
+ // the client_load_reporting filter, because we do not create a
+ // subchannel call (and therefore no client_load_reporting filter)
+ // for dropped calls.
+ grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
+ grpc_grpclb_client_stats_add_call_finished(
+ server->drop_for_rate_limiting, server->drop_for_load_balancing,
+ false /* failed_to_send */, false /* known_received */,
+ wc_arg->client_stats);
+ grpc_grpclb_client_stats_unref(wc_arg->client_stats);
+ if (force_async) {
+ GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+ GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+ gpr_free(wc_arg->free_when_done);
+ return false;
+ }
+ gpr_free(wc_arg->free_when_done);
+ return true;
+ }
+ // Pick via the RR policy.
const bool pick_done = grpc_lb_policy_pick_locked(
- exec_ctx, rr_policy, pick_args, target, wc_arg->context,
+ exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context,
(void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
@@ -604,17 +653,20 @@ static bool pick_from_internal_rr_locked(
(intptr_t)wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
-
/* add the load reporting initial metadata */
initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata,
pick_args->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
-
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT(wc_arg->client_stats != NULL);
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
-
+ if (force_async) {
+ GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+ GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+ gpr_free(wc_arg->free_when_done);
+ return false;
+ }
gpr_free(wc_arg->free_when_done);
}
/* else, the pending pick will be registered and taken care of by the
@@ -624,45 +676,38 @@ static bool pick_from_internal_rr_locked(
return pick_done;
}
-static grpc_lb_policy *create_rr_locked(
- grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
- glb_lb_policy *glb_policy) {
- GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
-
- grpc_lb_policy_args args;
- memset(&args, 0, sizeof(args));
- args.client_channel_factory = glb_policy->cc_factory;
- args.combiner = glb_policy->base.combiner;
+static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
+ grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args));
+ args->client_channel_factory = glb_policy->cc_factory;
+ args->combiner = glb_policy->base.combiner;
grpc_lb_addresses *addresses =
- process_serverlist_locked(exec_ctx, serverlist);
-
+ process_serverlist_locked(exec_ctx, glb_policy->serverlist);
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
- args.args = grpc_channel_args_copy_and_add_and_remove(
+ args->args = grpc_channel_args_copy_and_add_and_remove(
glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
1);
-
- grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- GPR_ASSERT(rr != NULL);
grpc_lb_addresses_destroy(exec_ctx, addresses);
- grpc_channel_args_destroy(exec_ctx, args.args);
- return rr;
+ return args;
+}
+
+static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_args *args) {
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ gpr_free(args);
}
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error);
-/* glb_policy->rr_policy may be NULL (initial handover) */
-static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
- GPR_ASSERT(glb_policy->serverlist != NULL &&
- glb_policy->serverlist->num_servers > 0);
-
- if (glb_policy->shutting_down) return;
+static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+ grpc_lb_policy_args *args) {
+ GPR_ASSERT(glb_policy->rr_policy == NULL);
grpc_lb_policy *new_rr_policy =
- create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
+ grpc_lb_policy_create(exec_ctx, "round_robin", args);
if (new_rr_policy == NULL) {
gpr_log(GPR_ERROR,
"Failure creating a RoundRobin policy for serverlist update with "
@@ -673,41 +718,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
(void *)glb_policy->rr_policy);
return;
}
-
- grpc_error *new_rr_state_error = NULL;
- const grpc_connectivity_state new_rr_state =
- grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy,
- &new_rr_state_error);
- /* Connectivity state is a function of the new RR policy just created */
- const bool replace_old_rr = update_lb_connectivity_status_locked(
- exec_ctx, glb_policy, new_rr_state, new_rr_state_error);
-
- if (!replace_old_rr) {
- /* dispose of the new RR policy that won't be used after all */
- GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace");
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO,
- "Keeping old RR policy (%p) despite new serverlist: new RR "
- "policy was in %s connectivity state.",
- (void *)glb_policy->rr_policy,
- grpc_connectivity_state_name(new_rr_state));
- }
- return;
- }
-
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)",
- (void *)new_rr_policy, (void *)glb_policy->rr_policy);
- }
-
- if (glb_policy->rr_policy != NULL) {
- /* if we are phasing out an existing RR instance, unref it. */
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover");
- }
-
- /* Finally update the RR policy to the newly created one */
glb_policy->rr_policy = new_rr_policy;
+ grpc_error *rr_state_error = NULL;
+ const grpc_connectivity_state rr_state =
+ grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
+ &rr_state_error);
+ /* Connectivity state is a function of the RR policy updated/created */
+ update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state,
+ rr_state_error);
+
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
@@ -719,14 +739,14 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
* It'll be deallocated in glb_rr_connectivity_changed() */
rr_connectivity_data *rr_connectivity =
gpr_zalloc(sizeof(rr_connectivity_data));
- grpc_closure_init(&rr_connectivity->on_change,
+ GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
glb_rr_connectivity_changed_locked, rr_connectivity,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
rr_connectivity->glb_policy = glb_policy;
- rr_connectivity->state = new_rr_state;
+ rr_connectivity->state = rr_state;
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
@@ -744,8 +764,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
- &pp->pick_args, pp->target,
+ pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
+ true /* force_async */, pp->target,
&pp->wrapped_on_complete_arg);
}
@@ -763,6 +783,31 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
}
}
+/* glb_policy->rr_policy may be NULL (initial handover) */
+static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
+ GPR_ASSERT(glb_policy->serverlist != NULL &&
+ glb_policy->serverlist->num_servers > 0);
+
+ if (glb_policy->shutting_down) return;
+
+ grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
+ if (glb_policy->rr_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)",
+ (void *)glb_policy->rr_policy);
+ }
+ grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
+ } else {
+ create_rr_locked(exec_ctx, glb_policy, args);
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)",
+ (void *)glb_policy->rr_policy);
+ }
+ }
+ lb_policy_args_destroy(exec_ctx, args);
+}
+
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
rr_connectivity_data *rr_connectivity = arg;
@@ -808,18 +853,24 @@ static grpc_slice_hash_table_entry targets_info_entry_create(
return entry;
}
-/* Returns the target URI for the LB service whose addresses are in \a
- * addresses. Using this URI, a bidirectional streaming channel will be created
- * for the reception of load balancing updates.
+static int balancer_name_cmp_fn(void *a, void *b) {
+ const char *a_str = a;
+ const char *b_str = b;
+ return strcmp(a_str, b_str);
+}
+
+/* Returns the channel args for the LB channel, used to create a bidirectional
+ * stream for the reception of load balancing updates.
*
- * The output argument \a targets_info will be updated to contain a mapping of
- * "LB server address" to "balancer name", as reported by the naming system.
- * This mapping will be propagated via the channel arguments of the
- * aforementioned LB streaming channel, to be used by the security connector for
- * secure naming checks. The user is responsible for freeing \a targets_info. */
-static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
- const grpc_lb_addresses *addresses,
- grpc_slice_hash_table **targets_info) {
+ * Inputs:
+ * - \a addresses: corresponding to the balancers.
+ * - \a response_generator: in order to propagate updates from the resolver
+ * above the grpclb policy.
+ * - \a args: other args inherited from the grpclb policy. */
+static grpc_channel_args *build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args) {
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
@@ -828,53 +879,54 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
* It's the resolver's responsibility to make sure this policy is only
* instantiated and used in that case. Otherwise, something has gone wrong. */
GPR_ASSERT(num_grpclb_addrs > 0);
-
+ grpc_lb_addresses *lb_addresses =
+ grpc_lb_addresses_create(num_grpclb_addrs, NULL);
grpc_slice_hash_table_entry *targets_info_entries =
- gpr_malloc(sizeof(*targets_info_entries) * num_grpclb_addrs);
-
- /* construct a target ipvX://ip1:port1,ip2:port2,... from the addresses in \a
- * addresses */
- /* TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
- size_t addr_index = 0;
+ gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs);
- for (size_t i = 0; i < addresses->num_addresses; i++) {
+ size_t lb_addresses_idx = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) continue;
if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
- if (addresses->addresses[i].is_balancer) {
- char *addr_str;
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_str, &addresses->addresses[i].address, true) > 0);
- targets_info_entries[addr_index] = targets_info_entry_create(
- addr_str, addresses->addresses[i].balancer_name);
- addr_strs[addr_index++] = addr_str;
- }
+ char *addr_str;
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_str, &addresses->addresses[i].address, true) > 0);
+ targets_info_entries[lb_addresses_idx] = targets_info_entry_create(
+ addr_str, addresses->addresses[i].balancer_name);
+ gpr_free(addr_str);
+
+ grpc_lb_addresses_set_address(
+ lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
+ addresses->addresses[i].address.len, false /* is balancer */,
+ addresses->addresses[i].balancer_name, NULL /* user data */);
}
- GPR_ASSERT(addr_index == num_grpclb_addrs);
-
- size_t uri_path_len;
- char *uri_path = gpr_strjoin_sep((const char **)addr_strs, num_grpclb_addrs,
- ",", &uri_path_len);
- for (size_t i = 0; i < num_grpclb_addrs; i++) gpr_free(addr_strs[i]);
- gpr_free(addr_strs);
-
- char *target_uri_str = NULL;
- /* TODO(dgq): Don't assume all addresses will share the scheme of the first
- * one */
- gpr_asprintf(&target_uri_str, "%s:%s",
- grpc_sockaddr_get_uri_scheme(&addresses->addresses[0].address),
- uri_path);
- gpr_free(uri_path);
-
- *targets_info = grpc_slice_hash_table_create(
- num_grpclb_addrs, targets_info_entries, destroy_balancer_name);
+ GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
+ grpc_slice_hash_table *targets_info =
+ grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries,
+ destroy_balancer_name, balancer_name_cmp_fn);
gpr_free(targets_info_entries);
- return target_uri_str;
+ grpc_channel_args *lb_channel_args =
+ grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info,
+ response_generator, args);
+
+ grpc_arg lb_channel_addresses_arg =
+ grpc_lb_addresses_create_channel_arg(lb_addresses);
+
+ grpc_channel_args *result = grpc_channel_args_copy_and_add(
+ lb_channel_args, &lb_channel_addresses_arg, 1);
+ grpc_slice_hash_table_unref(exec_ctx, targets_info);
+ grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ grpc_lb_addresses_destroy(exec_ctx, lb_addresses);
+ return result;
}
+static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error);
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
@@ -922,32 +974,37 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
- grpc_arg new_arg;
- new_arg.key = GRPC_ARG_LB_POLICY_NAME;
- new_arg.type = GRPC_ARG_STRING;
- new_arg.value.string = "grpclb";
+ grpc_arg new_arg =
+ grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
- grpc_slice_hash_table *targets_info = NULL;
/* Create a client channel over them to communicate with a LB service */
- char *lb_service_target_addresses =
- get_lb_uri_target_addresses(exec_ctx, addresses, &targets_info);
- grpc_channel_args *lb_channel_args =
- get_lb_channel_args(exec_ctx, targets_info, args->args);
+ glb_policy->response_generator =
+ grpc_fake_resolver_response_generator_create();
+ grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ exec_ctx, addresses, glb_policy->response_generator, args->args);
+ char *uri_str;
+ gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
- exec_ctx, lb_service_target_addresses, args->client_channel_factory,
- lb_channel_args);
- grpc_slice_hash_table_unref(exec_ctx, targets_info);
+ exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
+
+ /* Propagate initial resolution */
+ grpc_fake_resolver_response_generator_set_response(
+ exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
- gpr_free(lb_service_target_addresses);
+ gpr_free(uri_str);
if (glb_policy->lb_channel == NULL) {
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
gpr_free(glb_policy);
return NULL;
}
+
+ GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
+ glb_lb_channel_on_connectivity_changed_cb, glb_policy,
+ grpc_combiner_scheduler(args->combiner));
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
@@ -963,12 +1020,15 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (glb_policy->client_stats != NULL) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
}
- grpc_channel_destroy(glb_policy->lb_channel);
- glb_policy->lb_channel = NULL;
grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
if (glb_policy->serverlist != NULL) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
+ grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
+ if (glb_policy->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
+ gpr_free(glb_policy->pending_update_args);
+ }
gpr_free(glb_policy);
}
@@ -976,16 +1036,6 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
glb_policy->shutting_down = true;
- pending_pick *pp = glb_policy->pending_picks;
- glb_policy->pending_picks = NULL;
- pending_ping *pping = glb_policy->pending_pings;
- glb_policy->pending_pings = NULL;
- if (glb_policy->rr_policy) {
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
- }
- grpc_connectivity_state_set(
- exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
/* We need a copy of the lb_call pointer because we can't cancel the call
* while holding glb_policy->mu: lb_on_server_status_received, invoked due to
* the cancel, needs to acquire that same lock */
@@ -999,17 +1049,41 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_call_cancel(lb_call, NULL);
/* lb_on_server_status_received will pick up the cancel and clean up */
}
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ glb_policy->retry_timer_active = false;
+ }
+
+ pending_pick *pp = glb_policy->pending_picks;
+ glb_policy->pending_picks = NULL;
+ pending_ping *pping = glb_policy->pending_pings;
+ glb_policy->pending_pings = NULL;
+ if (glb_policy->rr_policy) {
+ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
+ }
+ // We destroy the LB channel here because
+ // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
+ // instance. Destroying the lb channel in glb_destroy would likely result in
+ // a callback invocation without a valid glb_policy arg.
+ if (glb_policy->lb_channel != NULL) {
+ grpc_channel_destroy(glb_policy->lb_channel);
+ glb_policy->lb_channel = NULL;
+ }
+ grpc_connectivity_state_set(
+ exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
+
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_NONE);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- grpc_closure_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
GRPC_ERROR_NONE);
pping = next;
}
@@ -1025,7 +1099,7 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1049,7 +1123,7 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
pending_pick *next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1084,7 +1158,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
- grpc_closure_sched(exec_ctx, on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, on_complete,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No mdelem storage for the LB token. Load reporting "
"won't work without it. Failing"));
@@ -1103,7 +1177,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
wrapped_rr_closure_arg *wc_arg = gpr_zalloc(sizeof(wrapped_rr_closure_arg));
- grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
+ GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
grpc_schedule_on_exec_ctx);
wc_arg->rr_policy = glb_policy->rr_policy;
wc_arg->target = target;
@@ -1115,8 +1189,9 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
wc_arg->initial_metadata = pick_args->initial_metadata;
wc_arg->free_when_done = wc_arg;
- pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
- pick_args, target, wc_arg);
+ pick_done =
+ pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
+ false /* force_async */, target, wc_arg);
} else {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG,
@@ -1173,9 +1248,9 @@ static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
const gpr_timespec next_client_load_report_time =
gpr_time_add(now, glb_policy->client_stats_report_interval);
- grpc_closure_init(&glb_policy->client_load_report_closure,
+ GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
send_client_load_report_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
next_client_load_report_time,
&glb_policy->client_load_report_closure, now);
@@ -1201,9 +1276,9 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = glb_policy->client_load_report_payload;
- grpc_closure_init(&glb_policy->client_load_report_closure,
+ GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
client_load_report_done_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_call_error call_error = grpc_call_start_batch_and_execute(
exec_ctx, glb_policy->lb_call, &op, 1,
&glb_policy->client_load_report_closure);
@@ -1271,6 +1346,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
+ GPR_ASSERT(glb_policy->lb_call == NULL);
GPR_ASSERT(!glb_policy->shutting_down);
/* Note the following LB call progresses every time there's activity in \a
@@ -1306,15 +1382,15 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_slice_unref_internal(exec_ctx, request_payload_slice);
grpc_grpclb_request_destroy(request);
- grpc_closure_init(&glb_policy->lb_on_sent_initial_request,
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
lb_on_sent_initial_request_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
- grpc_closure_init(&glb_policy->lb_on_server_status_received,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
- grpc_closure_init(&glb_policy->lb_on_response_received,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received,
lb_on_response_received_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
gpr_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
@@ -1356,8 +1432,10 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
lb_call_init_locked(exec_ctx, glb_policy);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)",
- (void *)glb_policy, (void *)glb_policy->lb_call);
+ gpr_log(GPR_INFO,
+ "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)",
+ (void *)glb_policy, (void *)glb_policy->lb_channel,
+ (void *)glb_policy->lb_call);
}
GPR_ASSERT(glb_policy->lb_call != NULL);
@@ -1517,7 +1595,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
* serverlist instance will be destroyed either upon the next
* update or in glb_destroy() */
glb_policy->serverlist = serverlist;
-
+ glb_policy->serverlist_index = 0;
rr_handover_locked(exec_ctx, glb_policy);
}
} else {
@@ -1561,8 +1639,8 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
glb_lb_policy *glb_policy = arg;
-
- if (!glb_policy->shutting_down) {
+ glb_policy->retry_timer_active = false;
+ if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Restarting call to LB server (grpclb %p)",
(void *)glb_policy);
@@ -1570,31 +1648,32 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(glb_policy->lb_call == NULL);
query_for_backends_locked(exec_ctx, glb_policy);
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "grpclb_on_retry_timer");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
}
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = arg;
-
GPR_ASSERT(glb_policy->lb_call != NULL);
-
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
char *status_details =
grpc_slice_to_c_string(glb_policy->lb_call_status_details);
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"Status from LB server received. Status = %d, Details = '%s', "
- "(call: %p)",
+ "(call: %p), error %p",
glb_policy->lb_call_status, status_details,
- (void *)glb_policy->lb_call);
+ (void *)glb_policy->lb_call, (void *)error);
gpr_free(status_details);
}
-
/* We need to perform cleanups no matter what. */
lb_call_destroy_locked(exec_ctx, glb_policy);
-
- if (!glb_policy->shutting_down) {
+ if (glb_policy->started_picking && glb_policy->updating_lb_call) {
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ }
+ if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
+ glb_policy->updating_lb_call = false;
+ } else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try =
@@ -1604,16 +1683,18 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
(void *)glb_policy);
gpr_timespec timeout = gpr_time_sub(next_try, now);
if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
- gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.",
+ gpr_log(GPR_DEBUG,
+ "... retry_timer_active in %" PRId64 ".%09d seconds.",
timeout.tv_sec, timeout.tv_nsec);
} else {
- gpr_log(GPR_DEBUG, "... retrying immediately.");
+ gpr_log(GPR_DEBUG, "... retry_timer_active immediately.");
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
- grpc_closure_init(
- &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked,
- glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
+ lb_call_on_retry_timer_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ glb_policy->retry_timer_active = true;
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry, now);
}
@@ -1621,6 +1702,138 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
"lb_on_server_status_received");
}
+static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
+
+ if (glb_policy->updating_lb_channel) {
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO,
+ "Update already in progress for grpclb %p. Deferring update.",
+ (void *)glb_policy);
+ }
+ if (glb_policy->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx,
+ glb_policy->pending_update_args->args);
+ gpr_free(glb_policy->pending_update_args);
+ }
+ glb_policy->pending_update_args =
+ gpr_zalloc(sizeof(*glb_policy->pending_update_args));
+ glb_policy->pending_update_args->client_channel_factory =
+ args->client_channel_factory;
+ glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
+ glb_policy->pending_update_args->combiner = args->combiner;
+ return;
+ }
+
+ glb_policy->updating_lb_channel = true;
+ // Propagate update to lb_channel (pick first).
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ if (glb_policy->lb_channel == NULL) {
+ // If we don't have a current channel to the LB, go into TRANSIENT
+ // FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "glb_update_missing");
+ } else {
+ // otherwise, keep using the current LB channel (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for grpclb %p update, "
+ "ignoring.",
+ (void *)glb_policy);
+ }
+ }
+ const grpc_lb_addresses *addresses = arg->value.pointer.p;
+ GPR_ASSERT(glb_policy->lb_channel != NULL);
+ grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ exec_ctx, addresses, glb_policy->response_generator, args->args);
+ /* Propagate updates to the LB channel through the fake resolver */
+ grpc_fake_resolver_response_generator_set_response(
+ exec_ctx, glb_policy->response_generator, lb_channel_args);
+ grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+
+ if (!glb_policy->watching_lb_channel) {
+ // Watch the LB channel connectivity for connection.
+ glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT;
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(glb_policy->lb_channel));
+ GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ glb_policy->watching_lb_channel = true;
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
+ grpc_client_channel_watch_connectivity_state(
+ exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset_set(
+ glb_policy->base.interested_parties),
+ &glb_policy->lb_channel_connectivity,
+ &glb_policy->lb_channel_on_connectivity_changed, NULL);
+ }
+}
+
+// Invoked as part of the update process. It continues watching the LB channel
+// until it shuts down or becomes READY. It's invoked even if the LB channel
+// stayed READY throughout the update (for example if the update is identical).
+static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error) {
+ glb_lb_policy *glb_policy = arg;
+ if (glb_policy->shutting_down) goto done;
+ // Re-initialize the lb_call. This should also take care of updating the
+ // embedded RR policy. Note that the current RR policy, if any, will stay in
+ // effect until an update from the new lb_call is received.
+ switch (glb_policy->lb_channel_connectivity) {
+ case GRPC_CHANNEL_INIT:
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ /* resub. */
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(glb_policy->lb_channel));
+ GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ grpc_client_channel_watch_connectivity_state(
+ exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset_set(
+ glb_policy->base.interested_parties),
+ &glb_policy->lb_channel_connectivity,
+ &glb_policy->lb_channel_on_connectivity_changed, NULL);
+ break;
+ }
+ case GRPC_CHANNEL_IDLE:
+ // lb channel inactive (probably shutdown prior to update). Restart lb
+ // call to kick the lb channel into gear.
+ GPR_ASSERT(glb_policy->lb_call == NULL);
+ /* fallthrough */
+ case GRPC_CHANNEL_READY:
+ if (glb_policy->lb_call != NULL) {
+ glb_policy->updating_lb_channel = false;
+ glb_policy->updating_lb_call = true;
+ grpc_call_cancel(glb_policy->lb_call, NULL);
+ // lb_on_server_status_received will pick up the cancel and reinit
+ // lb_call.
+ if (glb_policy->pending_update_args != NULL) {
+ const grpc_lb_policy_args *args = glb_policy->pending_update_args;
+ glb_policy->pending_update_args = NULL;
+ glb_update_locked(exec_ctx, &glb_policy->base, args);
+ }
+ } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ glb_policy->retry_timer_active = false;
+ }
+ start_picking_locked(exec_ctx, glb_policy);
+ }
+ /* fallthrough */
+ case GRPC_CHANNEL_SHUTDOWN:
+ done:
+ glb_policy->watching_lb_channel = false;
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ "watch_lb_channel_connectivity_cb_shutdown");
+ break;
+ }
+}
+
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
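
glb_update_locked above defers an update that arrives while another is still being applied, keeping only the most recent one in pending_update_args and replaying it once the in-flight update completes. The following is a minimal standalone sketch of that coalescing pattern, assuming hypothetical names (policy, begin_update, finish_update) rather than the real grpclb types:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static char *copy_string(const char *s) {
  size_t n = strlen(s) + 1;
  char *out = malloc(n);
  memcpy(out, s, n);
  return out;
}

typedef struct {
  bool updating; /* an update is currently being applied */
  char *pending; /* latest update received while updating, or NULL */
} policy;

/* Apply an update, or defer it if one is already in flight. */
static void begin_update(policy *p, const char *config) {
  if (p->updating) {
    free(p->pending); /* keep only the most recent pending update */
    p->pending = copy_string(config);
    printf("deferring update: %s\n", config);
    return;
  }
  p->updating = true;
  printf("applying update: %s\n", config);
}

/* Called once the in-flight update completes; replays the pending one. */
static void finish_update(policy *p) {
  p->updating = false;
  if (p->pending != NULL) {
    char *next = p->pending;
    p->pending = NULL;
    begin_update(p, next);
    free(next);
  }
}

int main(void) {
  policy p = {false, NULL};
  begin_update(&p, "balancers=A"); /* applied immediately */
  begin_update(&p, "balancers=B"); /* arrives mid-flight: deferred */
  begin_update(&p, "balancers=C"); /* replaces the deferred B */
  finish_update(&p);               /* A done; deferred C is applied next */
  finish_update(&p);               /* C done; nothing pending */
  return 0;
}
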
@@ -1631,7 +1844,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_ping_one_locked,
glb_exit_idle_locked,
glb_check_connectivity_locked,
- glb_notify_on_state_change_locked};
+ glb_notify_on_state_change_locked,
+ glb_update_locked};
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -1666,6 +1880,9 @@ static bool maybe_add_client_load_reporting_filter(
void grpc_lb_policy_grpclb_init() {
grpc_register_lb_policy(grpc_glb_lb_factory_create());
grpc_register_tracer("glb", &grpc_lb_glb_trace);
+#ifndef NDEBUG
+ grpc_register_tracer("lb_policy_refcount", &grpc_trace_lb_policy_refcount);
+#endif
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_client_load_reporting_filter,
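
Another mechanism introduced earlier in this file's diff is the serverlist_index walk in pick_from_internal_rr_locked: the index advances with wrap-around, entries flagged as drops short-circuit the pick (the real code also records a dropped-call stat there), and everything else is delegated to the round-robin policy. Below is a reduced standalone sketch of that walk; entry, serverlist, and pick_next are hypothetical stand-ins for the gRPC types:

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct {
  const char *address;
  bool drop; /* stands in for the drop_for_* flags */
} entry;

typedef struct {
  entry *servers;
  size_t num_servers;
  size_t index; /* next pick position, wraps around */
} serverlist;

/* Returns true if the call should be dropped; otherwise *picked is set. */
static bool pick_next(serverlist *sl, const entry **picked) {
  const entry *server = &sl->servers[sl->index++];
  if (sl->index == sl->num_servers) sl->index = 0; /* wrap-around */
  if (server->drop) return true; /* synthetic drop, no backend is picked */
  *picked = server;              /* the real code delegates to RR here */
  return false;
}

int main(void) {
  entry servers[] = {{"10.0.0.1:443", false},
                     {"10.0.0.9:443", true},   /* flagged as a drop */
                     {"10.0.0.2:443", false}};
  serverlist sl = {servers, 3, 0};
  for (int i = 0; i < 6; ++i) {
    const entry *picked = NULL;
    if (pick_next(&sl, &picked)) {
      printf("call %d: dropped\n", i);
    } else {
      printf("call %d: %s\n", i, picked->address);
    }
  }
  return 0;
}
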
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
index b069fae2f8..63ad66c5e9 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2016, Google Inc.
- * All rights reserved.
+ * Copyright 2016 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
index d6201f2387..f2967182e2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2017, Google Inc.
- * All rights reserved.
+ * Copyright 2017 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -50,28 +35,37 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
return lb_channel;
}
-grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
- grpc_slice_hash_table *targets_info,
- const grpc_channel_args *args) {
- /* We strip out the channel arg for the LB policy name, since we want
- * to use the default (pick_first) in this case.
+grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args) {
+ const grpc_arg to_add[] = {
+ grpc_fake_resolver_response_generator_arg(response_generator)};
+ /* We remove:
*
- * We also strip out the channel arg for the resolved addresses, since
- * that will be generated by the name resolver used in the LB channel.
- * Note that the LB channel will use the sockaddr resolver, so this
- * won't actually generate a query to DNS (or some other name service).
- * However, the addresses returned by the sockaddr resolver will have
- * is_balancer=false, whereas our own addresses have is_balancer=true.
- * We need the LB channel to return addresses with is_balancer=false
- * so that it does not wind up recursively using the grpclb LB policy,
- * as per the special case logic in client_channel.c.
+ * - The channel arg for the LB policy name, since we want to use the default
+ * (pick_first) in this case.
*
- * Lastly, we also strip out the channel arg for the server URI,
- * since that will be different for the LB channel than for the parent
- * channel (the client channel factory will re-add this arg with
- * the right value). */
+ * - The channel arg for the resolved addresses, since that will be generated
+ * by the name resolver used in the LB channel. Note that the LB channel
+ * will use the fake resolver, so this won't actually generate a query
+ * to DNS (or some other name service). However, the addresses returned by
+ * the fake resolver will have is_balancer=false, whereas our own
+ * addresses have is_balancer=true. We need the LB channel to return
+ * addresses with is_balancer=false so that it does not wind up recursively
+ * using the grpclb LB policy, as per the special case logic in
+ * client_channel.c.
+ *
+ * - The channel arg for the server URI, since that will be different for the
+ * LB channel than for the parent channel (the client channel factory will
+ * re-add this arg with the right value).
+ *
+ * - The fake resolver generator, because we are replacing it with the one
+ * from the grpclb policy, used to propagate updates to the LB channel. */
static const char *keys_to_remove[] = {
- GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
- return grpc_channel_args_copy_and_remove(args, keys_to_remove,
- GPR_ARRAY_SIZE(keys_to_remove));
+ GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI,
+ GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
+ return grpc_channel_args_copy_and_add_and_remove(
+ args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add,
+ GPR_ARRAY_SIZE(to_add));
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
index 9730c971d9..6120bf53f7 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2017, Google Inc.
- * All rights reserved.
+ * Copyright 2017 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -35,6 +20,7 @@
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/slice/slice_hash_table.h"
/** Create the channel used for communicating with an LB service.
@@ -49,9 +35,10 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
grpc_client_channel_factory *client_channel_factory,
grpc_channel_args *args);
-grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
- grpc_slice_hash_table *targets_info,
- const grpc_channel_args *args);
+grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H \
*/
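A hypothetical usage sketch of the renamed helper follows; targets_info, response_generator, and parent_args are assumed to already exist (their construction is outside this diff), and the returned args are assumed to be owned and destroyed by the caller, as the grpclb policy does:

#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/lib/channel/channel_args.h"

static void use_lb_channel_args(
    grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
    grpc_fake_resolver_response_generator *response_generator,
    const grpc_channel_args *parent_args) {
  grpc_channel_args *lb_channel_args =
      grpc_lb_policy_grpclb_build_lb_channel_args(
          exec_ctx, targets_info, response_generator, parent_args);
  /* ... pass lb_channel_args to grpc_lb_policy_grpclb_create_lb_channel()
   * when creating the LB channel ... */
  grpc_channel_args_destroy(exec_ctx, lb_channel_args);
}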
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
index a145cba63c..2681b2a079 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2017, Google Inc.
- * All rights reserved.
+ * Copyright 2017 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -76,32 +61,39 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
return lb_channel;
}
-grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
- grpc_slice_hash_table *targets_info,
- const grpc_channel_args *args) {
- const grpc_arg targets_info_arg =
- grpc_lb_targets_info_create_channel_arg(targets_info);
- /* We strip out the channel arg for the LB policy name, since we want
- * to use the default (pick_first) in this case.
+grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args) {
+ const grpc_arg to_add[] = {
+ grpc_lb_targets_info_create_channel_arg(targets_info),
+ grpc_fake_resolver_response_generator_arg(response_generator)};
+ /* We remove:
*
- * We also strip out the channel arg for the resolved addresses, since
- * that will be generated by the name resolver used in the LB channel.
- * Note that the LB channel will use the sockaddr resolver, so this
- * won't actually generate a query to DNS (or some other name service).
- * However, the addresses returned by the sockaddr resolver will have
- * is_balancer=false, whereas our own addresses have is_balancer=true.
- * We need the LB channel to return addresses with is_balancer=false
- * so that it does not wind up recursively using the grpclb LB policy,
- * as per the special case logic in client_channel.c.
+ * - The channel arg for the LB policy name, since we want to use the default
+ * (pick_first) in this case.
*
- * Lastly, we also strip out the channel arg for the server URI,
- * since that will be different for the LB channel than for the parent
- * channel (the client channel factory will re-add this arg with
- * the right value). */
+ * - The channel arg for the resolved addresses, since that will be generated
+ * by the name resolver used in the LB channel. Note that the LB channel
+ * will use the fake resolver, so this won't actually generate a query
+ * to DNS (or some other name service). However, the addresses returned by
+ * the fake resolver will have is_balancer=false, whereas our own
+ * addresses have is_balancer=true. We need the LB channel to return
+ * addresses with is_balancer=false so that it does not wind up recursively
+ * using the grpclb LB policy, as per the special case logic in
+ * client_channel.c.
+ *
+ * - The channel arg for the server URI, since that will be different for the
+ * LB channel than for the parent channel (the client channel factory will
+ * re-add this arg with the right value).
+ *
+ * - The fake resolver generator, because we are replacing it with the one
+ * from the grpclb policy, used to propagate updates to the LB channel. */
static const char *keys_to_remove[] = {
- GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
+ GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI,
+ GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
/* Add the targets info table to be used for secure naming */
return grpc_channel_args_copy_and_add_and_remove(
- args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &targets_info_arg,
- 1);
+ args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add,
+ GPR_ARRAY_SIZE(to_add));
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
index 444c03b9aa..c762443b7c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2017, Google Inc.
- * All rights reserved.
+ * Copyright 2017 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
index 0af4a919f8..4bb47d5c5c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2017, Google Inc.
- * All rights reserved.
+ * Copyright 2017 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index 81b6932fae..bec7c97a78 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2016, Google Inc.
- * All rights reserved.
+ * Copyright 2016 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -37,44 +22,39 @@
#include <grpc/support/alloc.h>
+/* invoked once for every Server in ServerList */
+static bool count_serverlist(pb_istream_t *stream, const pb_field_t *field,
+ void **arg) {
+ grpc_grpclb_serverlist *sl = *arg;
+ grpc_grpclb_server server;
+ if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
+ return false;
+ }
+ ++sl->num_servers;
+ return true;
+}
+
typedef struct decode_serverlist_arg {
- /* The first pass counts the number of servers in the server list. The second
- * one allocates and decodes. */
- bool first_pass;
   /* The decoding callback is invoked once per server in the serverlist.
    * Remember which index of the serverlist we are currently decoding */
size_t decoding_idx;
- /* Populated after the first pass. Number of server in the input serverlist */
- size_t num_servers;
/* The decoded serverlist */
- grpc_grpclb_server **servers;
+ grpc_grpclb_serverlist *serverlist;
} decode_serverlist_arg;
/* invoked once for every Server in ServerList */
static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
void **arg) {
decode_serverlist_arg *dec_arg = *arg;
- if (dec_arg->first_pass) { /* count how many server do we have */
- grpc_grpclb_server server;
- if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
- gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
- return false;
- }
- dec_arg->num_servers++;
- } else { /* second pass. Actually decode. */
- grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server));
- GPR_ASSERT(dec_arg->num_servers > 0);
- if (dec_arg->decoding_idx == 0) { /* first iteration of second pass */
- dec_arg->servers =
- gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
- }
- if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
- gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
- return false;
- }
- dec_arg->servers[dec_arg->decoding_idx++] = server;
+ GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx);
+ grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server));
+ if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
+ gpr_free(server);
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
+ return false;
}
-
+ dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server;
return true;
}
@@ -165,36 +145,38 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
grpc_slice encoded_grpc_grpclb_response) {
- bool status;
- decode_serverlist_arg arg;
pb_istream_t stream =
pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response),
GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response));
pb_istream_t stream_at_start = stream;
+ grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist));
grpc_grpclb_response res;
memset(&res, 0, sizeof(grpc_grpclb_response));
- memset(&arg, 0, sizeof(decode_serverlist_arg));
-
- res.server_list.servers.funcs.decode = decode_serverlist;
- res.server_list.servers.arg = &arg;
- arg.first_pass = true;
- status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
+ // First pass: count number of servers.
+ res.server_list.servers.funcs.decode = count_serverlist;
+ res.server_list.servers.arg = sl;
+ bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
+ gpr_free(sl);
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
-
- arg.first_pass = false;
- status =
- pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res);
- if (!status) {
- gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
- return NULL;
+ // Second pass: populate servers.
+ if (sl->num_servers > 0) {
+ sl->servers = gpr_zalloc(sizeof(grpc_grpclb_server *) * sl->num_servers);
+ decode_serverlist_arg decode_arg;
+ memset(&decode_arg, 0, sizeof(decode_arg));
+ decode_arg.serverlist = sl;
+ res.server_list.servers.funcs.decode = decode_serverlist;
+ res.server_list.servers.arg = &decode_arg;
+ status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields,
+ &res);
+ if (!status) {
+ grpc_grpclb_destroy_serverlist(sl);
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
+ return NULL;
+ }
}
-
- grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist));
- sl->num_servers = arg.num_servers;
- sl->servers = arg.servers;
if (res.server_list.has_expiration_interval) {
sl->expiration_interval = res.server_list.expiration_interval;
}
@@ -228,7 +210,7 @@ grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy(
bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs,
const grpc_grpclb_serverlist *rhs) {
- if ((lhs == NULL) || (rhs == NULL)) {
+ if (lhs == NULL || rhs == NULL) {
return false;
}
if (lhs->num_servers != rhs->num_servers) {
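For context on the two-pass parsing above: nanopb decodes repeated fields through a per-entry callback, and the first pass simply counts entries so that the second pass can allocate the server array exactly once. A minimal sketch of such a counting callback, where MyItem and MyItem_fields stand in for a nanopb-generated message type (hypothetical, not part of this change):

#include "pb_decode.h"

/* Invoked by pb_decode() once for every entry of the repeated field; it only
 * counts entries, mirroring count_serverlist above. */
static bool count_items(pb_istream_t *stream, const pb_field_t *field,
                        void **arg) {
  size_t *count = (size_t *)*arg;
  MyItem item;
  if (!pb_decode(stream, MyItem_fields, &item)) {
    return false; /* propagate the nanopb decoding error */
  }
  ++*count;
  return true;
}

The callback is installed on the containing message before decoding, e.g. msg.items.funcs.decode = count_items; msg.items.arg = &count; — which is exactly how count_serverlist and decode_serverlist are wired to res.server_list.servers above.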
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index 06873821bd..ef8d563edc 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2016, Google Inc.
- * All rights reserved.
+ * Copyright 2016 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -51,7 +36,7 @@ typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
typedef grpc_lb_v1_Server grpc_grpclb_server;
typedef grpc_lb_v1_Duration grpc_grpclb_duration;
-typedef struct grpc_grpclb_serverlist {
+typedef struct {
grpc_grpclb_server **servers;
size_t num_servers;
grpc_grpclb_duration expiration_interval;
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index b1c5dfc61c..d0acd7a901 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -37,11 +22,14 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
+grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false);
+
typedef struct pending_pick {
struct pending_pick *next;
uint32_t initial_metadata_flags;
@@ -54,7 +42,9 @@ typedef struct {
grpc_lb_policy base;
/** all our subchannels */
grpc_subchannel **subchannels;
+ grpc_subchannel **new_subchannels;
size_t num_subchannels;
+ size_t num_new_subchannels;
grpc_closure connectivity_changed;
@@ -63,10 +53,19 @@ typedef struct {
/** the selected channel */
grpc_connected_subchannel *selected;
+  /** the subchannel key for \a selected, or NULL if \a selected is not set */
+ const grpc_subchannel_key *selected_key;
+
/** have we started picking? */
- int started_picking;
+ bool started_picking;
/** are we shut down? */
- int shutdown;
+ bool shutdown;
+ /** are we updating the selected subchannel? */
+ bool updating_selected;
+ /** are we updating the subchannel candidates? */
+ bool updating_subchannels;
+ /** args from the latest update received while already updating, or NULL */
+ grpc_lb_policy_args *pending_update_args;
/** which subchannel are we watching? */
size_t checking_subchannel;
/** what is the connectivity of that channel? */
@@ -80,23 +79,31 @@ typedef struct {
static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- size_t i;
GPR_ASSERT(p->pending_picks == NULL);
- for (i = 0; i < p->num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy");
}
if (p->selected != NULL) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
+ "picked_first_destroy");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ if (p->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
+ gpr_free(p->pending_update_args);
+ }
gpr_free(p->subchannels);
+ gpr_free(p->new_subchannels);
gpr_free(p);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p);
+ }
}
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
- p->shutdown = 1;
+ p->shutdown = true;
pp = p->pending_picks;
p->pending_picks = NULL;
grpc_connectivity_state_set(
@@ -106,7 +113,7 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (p->selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
- } else if (p->num_subchannels > 0) {
+ } else if (p->num_subchannels > 0 && p->started_picking) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
@@ -114,7 +121,7 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
pp = next;
}
@@ -131,7 +138,7 @@ static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
gpr_free(pp);
@@ -156,7 +163,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_closure_sched(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
gpr_free(pp);
@@ -169,21 +176,25 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_UNREF(error);
}
-static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
- p->started_picking = 1;
- p->checking_subchannel = 0;
- p->checking_connectivity = GRPC_CHANNEL_IDLE;
- GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
+static void start_picking_locked(grpc_exec_ctx *exec_ctx,
+ pick_first_lb_policy *p) {
+ p->started_picking = true;
+ if (p->subchannels != NULL) {
+ GPR_ASSERT(p->num_subchannels > 0);
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel],
+ p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
+ }
}
static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
if (!p->started_picking) {
- start_picking(exec_ctx, p);
+ start_picking_locked(exec_ctx, p);
}
}
@@ -203,7 +214,7 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* No subchannel selected yet, so try again */
if (!p->started_picking) {
- start_picking(exec_ctx, p);
+ start_picking_locked(exec_ctx, p);
}
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
@@ -216,30 +227,314 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) {
- size_t i;
size_t num_subchannels = p->num_subchannels;
- grpc_subchannel **subchannels;
+ grpc_subchannel **subchannels = p->subchannels;
- subchannels = p->subchannels;
p->num_subchannels = 0;
p->subchannels = NULL;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
- for (i = 0; i < num_subchannels; i++) {
+ for (size_t i = 0; i < num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
}
-
gpr_free(subchannels);
}
+static grpc_connectivity_state pf_check_connectivity_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ return grpc_connectivity_state_get(&p->state_tracker, error);
+}
+
+static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_closure *notify) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
+ current, notify);
+}
+
+static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ if (p->selected) {
+ grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
+ } else {
+ GRPC_CLOSURE_SCHED(exec_ctx, closure,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
+ }
+}
+
+/* unsubscribe all subchannels */
+static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
+ pick_first_lb_policy *p) {
+ if (p->num_subchannels > 0) {
+ GPR_ASSERT(p->selected == NULL);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p",
+ (void *)p, (void *)p->subchannels[p->checking_subchannel]);
+ }
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
+ &p->connectivity_changed);
+ p->updating_subchannels = true;
+ } else if (p->selected != NULL) {
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG,
+ "Pick First %p unsubscribing from selected subchannel %p",
+ (void *)p, (void *)p->selected);
+ }
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
+ p->updating_selected = true;
+ }
+}
+
+/* Process an update of the policy's addresses. */
+static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ if (p->subchannels == NULL) {
+ // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "pf_update_missing");
+ } else {
+ // otherwise, keep using the current subchannel list (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for Pick First %p update, "
+ "ignoring.",
+ (void *)p);
+ }
+ return;
+ }
+ const grpc_lb_addresses *addresses = arg->value.pointer.p;
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (!addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) {
+ // Empty update. Unsubscribe from all current subchannels and put the
+ // channel in TRANSIENT_FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ "pf_update_empty");
+ stop_connectivity_watchers(exec_ctx, p);
+ return;
+ }
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
+ (void *)p, (unsigned long)num_addrs);
+ }
+ grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs);
+  /* We remove the following keys so that subchannel keys belonging to
+   * subchannels pointing to the same address will match. */
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ size_t sc_args_count = 0;
+
+ /* Create list of subchannel args for new addresses in \a args. */
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (addresses->addresses[i].is_balancer) continue;
+ if (addresses->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
+ args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
+ 1);
+ gpr_free(addr_arg.value.string);
+ sc_args[sc_args_count++].args = new_args;
+ }
+
+ /* Check if p->selected is amongst them. If so, we are done. */
+ if (p->selected != NULL) {
+ GPR_ASSERT(p->selected_key != NULL);
+ for (size_t i = 0; i < sc_args_count; i++) {
+ grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]);
+ const bool found_selected =
+ grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0;
+ grpc_subchannel_key_destroy(exec_ctx, ith_sc_key);
+ if (found_selected) {
+ // The currently selected subchannel is in the update: we are done.
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Pick First %p found already selected subchannel %p amongst "
+ "updates. Update done.",
+ (void *)p, (void *)p->selected);
+ }
+ for (size_t j = 0; j < sc_args_count; j++) {
+ grpc_channel_args_destroy(exec_ctx,
+ (grpc_channel_args *)sc_args[j].args);
+ }
+ gpr_free(sc_args);
+ return;
+ }
+ }
+ }
+ // We only check for already running updates here because if the previous
+ // steps were successful, the update can be considered done without any
+  // interference (i.e., no callbacks were scheduled).
+ if (p->updating_selected || p->updating_subchannels) {
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Update already in progress for pick first %p. Deferring update.",
+ (void *)p);
+ }
+ if (p->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
+ gpr_free(p->pending_update_args);
+ }
+ p->pending_update_args = gpr_zalloc(sizeof(*p->pending_update_args));
+ p->pending_update_args->client_channel_factory =
+ args->client_channel_factory;
+ p->pending_update_args->args = grpc_channel_args_copy(args->args);
+ p->pending_update_args->combiner = args->combiner;
+ return;
+ }
+ /* Create the subchannels for the new subchannel args/addresses. */
+ grpc_subchannel **new_subchannels =
+ gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
+ size_t num_new_subchannels = 0;
+ for (size_t i = 0; i < sc_args_count; i++) {
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args[i]);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ char *address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_INFO,
+ "Pick First %p created subchannel %p for address uri %s",
+ (void *)p, (void *)subchannel, address_uri);
+ gpr_free(address_uri);
+ }
+ grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args);
+ if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel;
+ }
+ gpr_free(sc_args);
+ if (num_new_subchannels == 0) {
+ gpr_free(new_subchannels);
+ // Empty update. Unsubscribe from all current subchannels and put the
+ // channel in TRANSIENT_FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"),
+ "pf_update_no_valid_addresses");
+ stop_connectivity_watchers(exec_ctx, p);
+ return;
+ }
+
+ /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */
+ stop_connectivity_watchers(exec_ctx, p);
+
+ /* Save new subchannels. The switch over will happen in
+ * pf_connectivity_changed_locked */
+ if (p->updating_selected || p->updating_subchannels) {
+ p->num_new_subchannels = num_new_subchannels;
+ p->new_subchannels = new_subchannels;
+ } else { /* nothing is updating. Get things moving from here */
+ p->num_subchannels = num_new_subchannels;
+ p->subchannels = new_subchannels;
+ p->new_subchannels = NULL;
+ p->num_new_subchannels = 0;
+ if (p->started_picking) {
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel],
+ p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
+ }
+ }
+}
+
static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "Pick First %p connectivity changed. Updating selected: %d; Updating "
+ "subchannels: %d; Checking %lu index (%lu total); State: %d; ",
+ (void *)p, p->updating_selected, p->updating_subchannels,
+ (unsigned long)p->checking_subchannel,
+ (unsigned long)p->num_subchannels, p->checking_connectivity);
+ }
+ bool restart = false;
+ if (p->updating_selected && error != GRPC_ERROR_NONE) {
+ /* Captured the unsubscription for p->selected */
+ GPR_ASSERT(p->selected != NULL);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
+ "pf_update_connectivity");
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p",
+ (void *)p, (void *)p->selected);
+ }
+ p->updating_selected = false;
+ if (p->num_new_subchannels == 0) {
+ p->selected = NULL;
+ return;
+ }
+ restart = true;
+ }
+ if (p->updating_subchannels && error != GRPC_ERROR_NONE) {
+ /* Captured the unsubscription for the checking subchannel */
+ GPR_ASSERT(p->selected == NULL);
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
+ "pf_update_connectivity");
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p,
+ (void *)p->subchannels[i]);
+ }
+ }
+ gpr_free(p->subchannels);
+ p->subchannels = NULL;
+ p->num_subchannels = 0;
+ p->updating_subchannels = false;
+ if (p->num_new_subchannels == 0) return;
+ restart = true;
+ }
+ if (restart) {
+ p->selected = NULL;
+ p->selected_key = NULL;
+ GPR_ASSERT(p->new_subchannels != NULL);
+ GPR_ASSERT(p->num_new_subchannels > 0);
+ p->num_subchannels = p->num_new_subchannels;
+ p->subchannels = p->new_subchannels;
+ p->num_new_subchannels = 0;
+ p->new_subchannels = NULL;
+ if (p->started_picking) {
+ /* If we were picking, continue to do so over the new subchannels,
+ * starting from the 0th index. */
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ /* reuses the weak ref from start_picking_locked */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel],
+ p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
+ }
+ if (p->pending_update_args != NULL) {
+ const grpc_lb_policy_args *args = p->pending_update_args;
+ p->pending_update_args = NULL;
+ pf_update_locked(exec_ctx, &p->base, args);
+ }
+ return;
+ }
GRPC_ERROR_REF(error);
-
if (p->shutdown) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
GRPC_ERROR_UNREF(error);
@@ -272,6 +567,13 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected_subchannel),
"picked_first");
+
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Pick First %p selected subchannel %p (connected %p)",
+ (void *)p, (void *)selected_subchannel, (void *)p->selected);
+ }
+ p->selected_key = grpc_subchannel_get_key(selected_subchannel);
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
destroy_subchannels_locked(exec_ctx, p);
@@ -279,7 +581,12 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Servicing pending pick with selected subchannel %p",
+ (void *)p->selected);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -290,7 +597,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
if (p->checking_subchannel == 0) {
- /* only trigger transient failure when we've tried all alternatives */
+ /* only trigger transient failure when we've tried all alternatives
+ */
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
@@ -332,7 +640,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
@@ -353,32 +661,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_UNREF(error);
}
-static grpc_connectivity_state pf_check_connectivity_locked(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- return grpc_connectivity_state_get(&p->state_tracker, error);
-}
-
-static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_closure *notify) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
- current, notify);
-}
-
-static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- if (p->selected) {
- grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
- } else {
- grpc_closure_sched(exec_ctx, closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
- }
-}
-
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@@ -388,7 +670,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_ping_one_locked,
pf_exit_idle_locked,
pf_check_connectivity_locked,
- pf_notify_on_state_change_locked};
+ pf_notify_on_state_change_locked,
+ pf_update_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -398,62 +681,14 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL);
-
- /* Find the number of backend addresses. We ignore balancer
- * addresses, since we don't know how to handle them. */
- const grpc_arg *arg =
- grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- return NULL;
- }
- grpc_lb_addresses *addresses = arg->value.pointer.p;
- size_t num_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (!addresses->addresses[i].is_balancer) ++num_addrs;
- }
- if (num_addrs == 0) return NULL;
-
pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
-
- p->subchannels = gpr_zalloc(sizeof(grpc_subchannel *) * num_addrs);
- grpc_subchannel_args sc_args;
- size_t subchannel_idx = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- /* Skip balancer addresses, since we only know how to handle backends. */
- if (addresses->addresses[i].is_balancer) continue;
-
- if (addresses->addresses[i].user_data != NULL) {
- gpr_log(GPR_ERROR,
- "This LB policy doesn't support user data. It will be ignored");
- }
-
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- grpc_arg addr_arg =
- grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
- args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
- 1);
- gpr_free(addr_arg.value.string);
- sc_args.args = new_args;
- grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
- exec_ctx, args->client_channel_factory, &sc_args);
- grpc_channel_args_destroy(exec_ctx, new_args);
-
- if (subchannel != NULL) {
- p->subchannels[subchannel_idx++] = subchannel;
- }
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p created.", (void *)p);
}
- if (subchannel_idx == 0) {
- gpr_free(p->subchannels);
- gpr_free(p);
- return NULL;
- }
- p->num_subchannels = subchannel_idx;
-
+ pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
- grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
- grpc_combiner_scheduler(args->combiner, false));
+ GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,
+ grpc_combiner_scheduler(args->combiner));
return &p->base;
}
@@ -472,6 +707,7 @@ static grpc_lb_policy_factory *pick_first_lb_factory_create() {
void grpc_lb_policy_pick_first_init() {
grpc_register_lb_policy(pick_first_lb_factory_create());
+ grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace);
}
void grpc_lb_policy_pick_first_shutdown() {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 6e7f410635..8e9d6b0f47 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -1,63 +1,28 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
/** Round Robin Policy.
*
- * This policy keeps:
- * - A circular list of ready (connected) subchannels, the *readylist*. An empty
- * readylist consists solely of its root (dummy) node.
- * - A pointer to the last element picked from the readylist, the *lastpick*.
- * Initially set to point to the readylist's root.
- *
- * Behavior:
- * - When a subchannel connects, it's *prepended* to the readylist's root node.
- * Ie, if readylist = A <-> B <-> ROOT <-> C
- * ^ ^
- * |____________________|
- * and subchannel D becomes connected, the addition of D to the readylist
- * results in readylist = A <-> B <-> D <-> ROOT <-> C
- * ^ ^
- * |__________________________|
- * - When a subchannel disconnects, it's removed from the readylist. If the
- * subchannel being removed was the most recently picked, the *lastpick*
- * pointer moves to the removed node's previous element. Note that if the
- * readylist only had one element, this is still legal, as the lastpick would
- * point to the dummy root node, for an empty readylist.
- * - Upon picking, *lastpick* is updated to point to the returned (connected)
- * subchannel. Note that it's possible that the selected subchannel becomes
- * disconnected in the interim between the selection and the actual usage of
- * the subchannel by the caller.
- */
+ * Before every pick, the \a get_next_ready_subchannel_index_locked function
+ * returns the index into p->subchannel_list->subchannels for the next
+ * subchannel, respecting the relative order of the addresses provided upon
+ * creation or updates. Note, however, that updates will start picking from
+ * the beginning of the updated list. */
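The per-pick selection described above is a simple wrap-around scan over the subchannel array, starting one past the last index handed out. A minimal standalone sketch of that arithmetic, using hypothetical toy_* names rather than the gRPC types in this diff:

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* ready[i] stands in for "subchannel i is in state READY". Returns the index
 * of the next READY entry after last_ready_index (wrapping around), or num if
 * none is READY -- the same convention as the function in the diff. */
static size_t toy_next_ready_index(const bool *ready, size_t num,
                                   size_t last_ready_index) {
  for (size_t i = 0; i < num; ++i) {
    const size_t index = (i + last_ready_index + 1) % num;
    if (ready[index]) return index;
  }
  return num;
}

int main(void) {
  const bool ready[4] = {true, false, true, true};
  size_t last = 3;
  for (int picks = 0; picks < 4; ++picks) {
    const size_t next = toy_next_ready_index(ready, 4, last);
    if (next == 4) break;
    printf("pick index %zu\n", next); /* prints 0, 2, 3, 0 */
    last = next; /* only advanced when the pick is actually used */
  }
  return 0;
}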
#include <string.h>
@@ -72,8 +37,6 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef struct round_robin_lb_policy round_robin_lb_policy;
-
grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false);
/** List of entities waiting for a pick.
@@ -99,26 +62,41 @@ typedef struct pending_pick {
grpc_closure *on_complete;
} pending_pick;
-/** List of subchannels in a connectivity READY state */
-typedef struct ready_list {
- grpc_subchannel *subchannel;
- /* references namesake entry in subchannel_data */
- void *user_data;
- struct ready_list *next;
- struct ready_list *prev;
-} ready_list;
+typedef struct rr_subchannel_list rr_subchannel_list;
+typedef struct round_robin_lb_policy {
+ /** base policy: must be first */
+ grpc_lb_policy base;
+
+ rr_subchannel_list *subchannel_list;
+
+ /** have we started picking? */
+ bool started_picking;
+ /** are we shutting down? */
+ bool shutdown;
+ /** List of picks that are waiting on connectivity */
+ pending_pick *pending_picks;
+
+ /** our connectivity state tracker */
+ grpc_connectivity_state_tracker state_tracker;
+
+ /** Index into subchannels for last pick. */
+ size_t last_ready_subchannel_index;
+
+ /** Latest version of the subchannel list.
+ * Subchannel connectivity callbacks will only promote updated subchannel
+ * lists if they equal \a latest_pending_subchannel_list. In other words,
+ * racing callbacks that reference outdated subchannel lists won't perform any
+ * update. */
+ rr_subchannel_list *latest_pending_subchannel_list;
+} round_robin_lb_policy;
typedef struct {
- /** index within policy->subchannels */
- size_t index;
- /** backpointer to owning policy */
- round_robin_lb_policy *policy;
+ /** backpointer to owning subchannel list */
+ rr_subchannel_list *subchannel_list;
/** subchannel itself */
grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */
grpc_closure connectivity_changed_closure;
- /** this subchannels current position in subchannel->ready_list */
- ready_list *ready_list_node;
/** last observed connectivity. Not updated by
* \a grpc_subchannel_notify_on_state_change. Used to determine the previous
* state while processing the new state in \a rr_connectivity_changed */
@@ -126,201 +104,193 @@ typedef struct {
/** current connectivity state. Updated by \a
* grpc_subchannel_notify_on_state_change */
grpc_connectivity_state curr_connectivity_state;
+ /** connectivity state to be updated by the watcher, not guarded by
+ * the combiner. Will be moved to curr_connectivity_state inside of
+ * the combiner by rr_connectivity_changed_locked(). */
+ grpc_connectivity_state pending_connectivity_state_unsafe;
/** the subchannel's target user data */
void *user_data;
/** vtable to operate over \a user_data */
const grpc_lb_user_data_vtable *user_data_vtable;
} subchannel_data;
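The pending_connectivity_state_unsafe field above captures a hand-off pattern: the connectivity watcher writes the observed state outside the combiner, and the combiner-run callback copies it into curr_connectivity_state before acting on it. A generic, single-threaded sketch of that hand-off, with a mutex standing in for the combiner (illustrative toy_* names only, not gRPC APIs):

#include <pthread.h>
#include <stdio.h>

typedef enum { TOY_IDLE, TOY_CONNECTING, TOY_READY } toy_state;

typedef struct {
  toy_state pending_unsafe; /* written by the watcher, outside the lock */
  toy_state curr;           /* only touched while holding mu */
  pthread_mutex_t mu;       /* stands in for the combiner in this sketch */
} toy_subchannel_data;

/* Watcher side: record the observed state. The real code relies on the
 * notification machinery scheduling the locked callback after this write. */
static void toy_watcher_report(toy_subchannel_data *sd, toy_state observed) {
  sd->pending_unsafe = observed;
}

/* Serialized side: copy the pending value under the lock, then act on it. */
static void toy_connectivity_changed_locked(toy_subchannel_data *sd) {
  pthread_mutex_lock(&sd->mu);
  sd->curr = sd->pending_unsafe;
  printf("current state is now %d\n", (int)sd->curr);
  pthread_mutex_unlock(&sd->mu);
}

int main(void) {
  static toy_subchannel_data sd = {TOY_IDLE, TOY_IDLE,
                                   PTHREAD_MUTEX_INITIALIZER};
  toy_watcher_report(&sd, TOY_READY);
  toy_connectivity_changed_locked(&sd);
  return 0;
}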
-struct round_robin_lb_policy {
- /** base policy: must be first */
- grpc_lb_policy base;
-
- /** total number of addresses received at creation time */
- size_t num_addresses;
+struct rr_subchannel_list {
+ /** backpointer to owning policy */
+ round_robin_lb_policy *policy;
/** all our subchannels */
size_t num_subchannels;
- subchannel_data **subchannels;
+ subchannel_data *subchannels;
- /** how many subchannels are in TRANSIENT_FAILURE */
+ /** how many subchannels are in state READY */
+ size_t num_ready;
+ /** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
- /** how many subchannels are IDLE */
+ /** how many subchannels are in state SHUTDOWN */
+ size_t num_shutdown;
+ /** how many subchannels are in state IDLE */
size_t num_idle;
- /** have we started picking? */
- int started_picking;
- /** are we shutting down? */
- int shutdown;
- /** List of picks that are waiting on connectivity */
- pending_pick *pending_picks;
+ /** There will be one ref for each entry in subchannels for which there is a
+ * pending connectivity state watcher callback. */
+ gpr_refcount refcount;
- /** our connectivity state tracker */
- grpc_connectivity_state_tracker state_tracker;
-
- /** (Dummy) root of the doubly linked list containing READY subchannels */
- ready_list ready_list;
- /** Last pick from the ready list. */
- ready_list *ready_list_last_pick;
+ /** Is this list shutting down? This may be true due to the shutdown of the
+ * policy itself or because a newer update has arrived while this one hadn't
+ * finished processing. */
+ bool shutting_down;
};
-/** Returns the next subchannel from the connected list or NULL if the list is
- * empty.
- *
- * Note that this function does *not* advance p->ready_list_last_pick. Use \a
- * advance_last_picked_locked() for that. */
-static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) {
- ready_list *selected;
- selected = p->ready_list_last_pick->next;
-
- while (selected != NULL) {
- if (selected == &p->ready_list) {
- GPR_ASSERT(selected->subchannel == NULL);
- /* skip dummy root */
- selected = selected->next;
- } else {
- GPR_ASSERT(selected->subchannel != NULL);
- return selected;
- }
+static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list) {
+ GPR_ASSERT(subchannel_list->shutting_down);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p",
+ (void *)subchannel_list->policy, (void *)subchannel_list);
}
- return NULL;
-}
-
-/** Advance the \a ready_list picking head. */
-static void advance_last_picked_locked(round_robin_lb_policy *p) {
- if (p->ready_list_last_pick->next != NULL) { /* non-empty list */
- p->ready_list_last_pick = p->ready_list_last_pick->next;
- if (p->ready_list_last_pick == &p->ready_list) {
- /* skip dummy root */
- p->ready_list_last_pick = p->ready_list_last_pick->next;
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &subchannel_list->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel,
+ "rr_subchannel_list_destroy");
+ }
+ sd->subchannel = NULL;
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
}
- } else { /* should be an empty list */
- GPR_ASSERT(p->ready_list_last_pick == &p->ready_list);
}
+ gpr_free(subchannel_list->subchannels);
+ gpr_free(subchannel_list);
+}
+static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ gpr_ref_non_zero(&subchannel_list->refcount);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, "
- "CSC %p)",
- (void *)p, (void *)p->ready_list_last_pick,
- (void *)p->ready_list_last_pick->subchannel,
- (void *)grpc_subchannel_get_connected_subchannel(
- p->ready_list_last_pick->subchannel));
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu",
+ (void *)subchannel_list->policy, (void *)subchannel_list,
+ (unsigned long)(count - 1), (unsigned long)count);
}
}
-/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
- * csc to the list of ready subchannels. */
-static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- subchannel_data *sd) {
- ready_list *new_elem = gpr_zalloc(sizeof(ready_list));
- new_elem->subchannel = sd->subchannel;
- new_elem->user_data = sd->user_data;
- if (p->ready_list.prev == NULL) {
- /* first element */
- new_elem->next = &p->ready_list;
- new_elem->prev = &p->ready_list;
- p->ready_list.next = new_elem;
- p->ready_list.prev = new_elem;
- } else {
- new_elem->next = &p->ready_list;
- new_elem->prev = p->ready_list.prev;
- p->ready_list.prev->next = new_elem;
- p->ready_list.prev = new_elem;
- }
+static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ const bool done = gpr_unref(&subchannel_list->refcount);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
- (void *)new_elem, (void *)sd->subchannel);
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu",
+ (void *)subchannel_list->policy, (void *)subchannel_list,
+ (unsigned long)(count + 1), (unsigned long)count);
+ }
+ if (done) {
+ rr_subchannel_list_destroy(exec_ctx, subchannel_list);
}
- return new_elem;
}
-/** Removes \a node from the list of connected subchannels */
-static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
- ready_list *node) {
- if (node == NULL) {
- return;
- }
- if (node == p->ready_list_last_pick) {
- p->ready_list_last_pick = p->ready_list_last_pick->prev;
+/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The
+ * watcher's callback will ultimately unref \a subchannel_list. */
+static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ GPR_ASSERT(!subchannel_list->shutting_down);
+ subchannel_list->shutting_down = true;
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &subchannel_list->subchannels[i];
+ if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
+ grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
+ NULL,
+ &sd->connectivity_changed_closure);
+ }
}
+ rr_subchannel_list_unref(exec_ctx, subchannel_list, reason);
+}
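The lifetime rule for rr_subchannel_list is: one ref held for each pending connectivity watcher plus the initial owner ref, with shutdown cancelling the watchers so that the last resulting callback performs the final unref and frees the list. A compressed, single-threaded sketch of that counting (hypothetical toy_* names; the real code uses gpr_refcount and runs under the combiner):

#include <stdio.h>
#include <stdlib.h>

typedef struct {
  int refcount;      /* one ref for the owner + one per pending watcher */
  int shutting_down; /* watchers check this in their (cancelled) callback */
} toy_list;

static toy_list *toy_list_create(int num_watchers) {
  toy_list *l = calloc(1, sizeof(*l));
  l->refcount = 1 + num_watchers;
  return l;
}

static void toy_list_unref(toy_list *l, const char *reason) {
  if (--l->refcount == 0) {
    printf("destroying list (last unref: %s)\n", reason);
    free(l);
  }
}

int main(void) {
  toy_list *l = toy_list_create(3); /* three subscribed watchers */
  l->shutting_down = 1;             /* analogue of rr_subchannel_list_shutdown */
  toy_list_unref(l, "owner");       /* shutdown drops the owner's ref */
  for (int i = 0; i < 3; ++i) {
    toy_list_unref(l, "watcher");   /* each cancellation callback unrefs once */
  }
  return 0;
}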
- /* removing last item */
- if (node->next == &p->ready_list && node->prev == &p->ready_list) {
- GPR_ASSERT(p->ready_list.next == node);
- GPR_ASSERT(p->ready_list.prev == node);
- p->ready_list.next = NULL;
- p->ready_list.prev = NULL;
- } else {
- node->prev->next = node->next;
- node->next->prev = node->prev;
+/** Returns the index into p->subchannel_list->subchannels of the next
+ * subchannel in READY state, or p->subchannel_list->num_subchannels if no
+ * subchannel is READY.
+ *
+ * Note that this function does *not* update p->last_ready_subchannel_index.
+ * The caller must do that if it returns a pick. */
+static size_t get_next_ready_subchannel_index_locked(
+ const round_robin_lb_policy *p) {
+ GPR_ASSERT(p->subchannel_list != NULL);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO,
+ "[RR %p] getting next ready subchannel (out of %lu), "
+ "last_ready_subchannel_index=%lu",
+ (void *)p, (unsigned long)p->subchannel_list->num_subchannels,
+ (unsigned long)p->last_ready_subchannel_index);
+ }
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
+ const size_t index = (i + p->last_ready_subchannel_index + 1) %
+ p->subchannel_list->num_subchannels;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
+ "state=%d",
+ (void *)p,
+ (void *)p->subchannel_list->subchannels[index].subchannel,
+ (void *)p->subchannel_list, (unsigned long)index,
+ p->subchannel_list->subchannels[index].curr_connectivity_state);
+ }
+ if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
+ GRPC_CHANNEL_READY) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] found next ready subchannel (%p) at index %lu of "
+ "subchannel_list %p",
+ (void *)p,
+ (void *)p->subchannel_list->subchannels[index].subchannel,
+ (unsigned long)index, (void *)p->subchannel_list);
+ }
+ return index;
+ }
}
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
- (void *)node->subchannel);
+ gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void *)p);
}
-
- node->next = NULL;
- node->prev = NULL;
- node->subchannel = NULL;
-
- gpr_free(node);
+ return p->subchannel_list->num_subchannels;
}
-static bool is_ready_list_empty(round_robin_lb_policy *p) {
- return p->ready_list.prev == NULL;
+// Sets p->last_ready_subchannel_index to last_ready_index.
+static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
+ size_t last_ready_index) {
+ GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels);
+ p->last_ready_subchannel_index = last_ready_index;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
+ (void *)p, (unsigned long)last_ready_index,
+ (void *)p->subchannel_list->subchannels[last_ready_index].subchannel,
+ (void *)grpc_subchannel_get_connected_subchannel(
+ p->subchannel_list->subchannels[last_ready_index].subchannel));
+ }
}
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- ready_list *elem;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol);
}
-
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
- if (sd->user_data != NULL) {
- GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
- }
- gpr_free(sd);
- }
-
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
- gpr_free(p->subchannels);
-
- elem = p->ready_list.next;
- while (elem != NULL && elem != &p->ready_list) {
- ready_list *tmp;
- tmp = elem->next;
- elem->next = NULL;
- elem->prev = NULL;
- elem->subchannel = NULL;
- gpr_free(elem);
- elem = tmp;
- }
-
gpr_free(p);
}
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- size_t i;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol);
}
-
- p->shutdown = 1;
+ p->shutdown = true;
+ pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
gpr_free(pp);
@@ -328,25 +298,22 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
- for (i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
- &sd->connectivity_changed_closure);
- }
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_shutdown");
+ p->subchannel_list = NULL;
}
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
gpr_free(pp);
@@ -364,15 +331,14 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
*pp->target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
gpr_free(pp);
@@ -387,21 +353,15 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
- size_t i;
- p->started_picking = 1;
-
- for (i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- /* use some sentinel value outside of the range of grpc_connectivity_state
- * to signal an undefined previous state. We won't be referring to this
- * value again and it'll be overwritten after the first call to
- * rr_connectivity_changed */
- sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+ p->started_picking = true;
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannel_list->subchannels[i];
GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
+ rr_subchannel_list_ref(sd->subchannel_list, "start_picking");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
}
@@ -418,65 +378,79 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- ready_list *selected;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
}
-
- if ((selected = peek_next_connected_locked(p))) {
- /* readily available, report right away */
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "rr_picked");
-
- if (user_data != NULL) {
- *user_data = selected->user_data;
- }
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
- (void *)*target, (void *)selected);
- }
- /* only advance the last picked pointer if the selection was used */
- advance_last_picked_locked(p);
- return 1;
- } else {
- /* no pick currently available. Save for later in list of pending picks */
- if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ if (p->subchannel_list != NULL) {
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->subchannel_list->num_subchannels) {
+ /* readily available, report right away */
+ subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index];
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(sd->subchannel),
+ "rr_picked");
+ if (user_data != NULL) {
+ *user_data = sd->user_data;
+ }
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, "
+ "INDEX %lu)",
+ (void *)p, (void *)sd->subchannel, (void *)*target,
+ (void *)sd->subchannel_list, (unsigned long)next_ready_index);
+ }
+ /* only advance the last picked pointer if the selection was used */
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
+ return 1;
}
- pp = gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->on_complete = on_complete;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->user_data = user_data;
- p->pending_picks = pp;
- return 0;
}
+ /* no pick currently available. Save for later in list of pending picks */
+ if (!p->started_picking) {
+ start_picking_locked(exec_ctx, p);
+ }
+ pending_pick *pp = gpr_malloc(sizeof(*pp));
+ pp->next = p->pending_picks;
+ pp->target = target;
+ pp->on_complete = on_complete;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->user_data = user_data;
+ p->pending_picks = pp;
+ return 0;
}
-static void update_state_counters(subchannel_data *sd) {
- round_robin_lb_policy *p = sd->policy;
-
- /* update p->num_transient_failures (resp. p->num_idle): if the previous
- * state was TRANSIENT_FAILURE (resp. IDLE), decrement
- * p->num_transient_failures (resp. p->num_idle). */
- if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- GPR_ASSERT(p->num_transient_failures > 0);
- --p->num_transient_failures;
+static void update_state_counters_locked(subchannel_data *sd) {
+ rr_subchannel_list *subchannel_list = sd->subchannel_list;
+ if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
+ GPR_ASSERT(subchannel_list->num_ready > 0);
+ --subchannel_list->num_ready;
+ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(subchannel_list->num_transient_failures > 0);
+ --subchannel_list->num_transient_failures;
+ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ GPR_ASSERT(subchannel_list->num_shutdown > 0);
+ --subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
- GPR_ASSERT(p->num_idle > 0);
- --p->num_idle;
+ GPR_ASSERT(subchannel_list->num_idle > 0);
+ --subchannel_list->num_idle;
+ }
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ ++subchannel_list->num_ready;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ ++subchannel_list->num_transient_failures;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ ++subchannel_list->num_shutdown;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
+ ++subchannel_list->num_idle;
}
}
-/* sd is the subchannel_data associted with the updated subchannel.
- * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
- * or SHUTDOWN */
-static grpc_connectivity_state update_lb_connectivity_status(
+/** Sets the policy's connectivity status based on that of the passed-in \a sd
+ * (the subchannel_data associated with the updated subchannel) and the
+ * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
+ * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
+ * connectivity status set. */
+static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
@@ -488,76 +462,156 @@ static grpc_connectivity_state update_lb_connectivity_status(
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
- * CHECK: p->num_subchannels = 0.
+ * CHECK: p->subchannel_list->num_shutdown ==
+ * p->subchannel_list->num_subchannels.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
- * CHECK: p->num_transient_failures == p->num_subchannels.
+   * CHECK: p->subchannel_list->num_transient_failures ==
+   *        p->subchannel_list->num_subchannels.
*
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
- * CHECK: p->num_idle == p->num_subchannels.
+   * CHECK: p->subchannel_list->num_idle ==
+   *        p->subchannel_list->num_subchannels.
*/
- round_robin_lb_policy *p = sd->policy;
- if (!is_ready_list_empty(p)) { /* 1) READY */
+ grpc_connectivity_state new_state = sd->curr_connectivity_state;
+ rr_subchannel_list *subchannel_list = sd->subchannel_list;
+ round_robin_lb_policy *p = subchannel_list->policy;
+ if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
- return GRPC_CHANNEL_READY;
+ new_state = GRPC_CHANNEL_READY;
} else if (sd->curr_connectivity_state ==
GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
- return GRPC_CHANNEL_CONNECTING;
- } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */
+ new_state = GRPC_CHANNEL_CONNECTING;
+ } else if (p->subchannel_list->num_shutdown ==
+ p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
- return GRPC_CHANNEL_SHUTDOWN;
- } else if (p->num_transient_failures ==
- p->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+ new_state = GRPC_CHANNEL_SHUTDOWN;
+ } else if (subchannel_list->num_transient_failures ==
+ p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
- return GRPC_CHANNEL_TRANSIENT_FAILURE;
- } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */
+ new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ } else if (subchannel_list->num_idle ==
+ p->subchannel_list->num_subchannels) { /* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
- return GRPC_CHANNEL_IDLE;
+ new_state = GRPC_CHANNEL_IDLE;
}
- /* no change */
- return sd->curr_connectivity_state;
+ GRPC_ERROR_UNREF(error);
+ return new_state;
}
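The priority rules in the comment above reduce to a small pure function of the per-state counters. A standalone sketch under that reading (toy_* names, not the gRPC implementation):

#include <stddef.h>
#include <stdio.h>

typedef enum {
  TOY_IDLE,
  TOY_CONNECTING,
  TOY_READY,
  TOY_TRANSIENT_FAILURE,
  TOY_SHUTDOWN
} toy_state;

/* Aggregates per-subchannel counters into a policy-wide state, following the
 * same priority order as the five rules documented above. */
static toy_state toy_aggregate(size_t num_subchannels, size_t num_ready,
                               size_t num_shutdown,
                               size_t num_transient_failures, size_t num_idle,
                               toy_state triggering_state) {
  if (num_ready > 0) return TOY_READY;                           /* rule 1 */
  if (triggering_state == TOY_CONNECTING) return TOY_CONNECTING; /* rule 2 */
  if (num_shutdown == num_subchannels) return TOY_SHUTDOWN;      /* rule 3 */
  if (num_transient_failures == num_subchannels)
    return TOY_TRANSIENT_FAILURE;                                /* rule 4 */
  if (num_idle == num_subchannels) return TOY_IDLE;              /* rule 5 */
  return triggering_state; /* otherwise: no policy-level change */
}

int main(void) {
  /* 3 subchannels, none READY, all in TRANSIENT_FAILURE -> rule 4 fires. */
  printf("%d\n", (int)toy_aggregate(3, 0, 0, 3, 0, TOY_TRANSIENT_FAILURE));
  return 0;
}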
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
- round_robin_lb_policy *p = sd->policy;
- pending_pick *pp;
-
- GRPC_ERROR_REF(error);
-
+ round_robin_lb_policy *p = sd->subchannel_list->policy;
+ // If the policy is shutting down, unref and return.
if (p->shutdown) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- GRPC_ERROR_UNREF(error);
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
+ return;
+ }
+ if (sd->subchannel_list->shutting_down) {
+ // the subchannel list associated with sd has been discarded. This callback
+ // corresponds to the unsubscription.
+ GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown");
return;
}
- switch (sd->curr_connectivity_state) {
- case GRPC_CHANNEL_INIT:
- GPR_UNREACHABLE_CODE(return );
- case GRPC_CHANNEL_READY:
- /* add the newly connected subchannel to the list of connected ones.
- * Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd);
+ // Dispose of outdated subchannel lists.
+ if (sd->subchannel_list != p->subchannel_list &&
+ sd->subchannel_list != p->latest_pending_subchannel_list) {
+ // sd belongs to an outdated subchannel_list: get rid of it.
+    rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_outdated");
+ return;
+ }
+ // Now that we're inside the combiner, copy the pending connectivity
+ // state (which was set by the connectivity state watcher) to
+ // curr_connectivity_state, which is what we use inside of the combiner.
+ sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] connectivity changed for subchannel %p: "
+ "prev_state=%d new_state=%d",
+ (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state,
+ sd->curr_connectivity_state);
+ }
+ // Update state counters and determine new overall state.
+ update_state_counters_locked(sd);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ const grpc_connectivity_state new_policy_connectivity_state =
+ update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
+  // If the sd's new state is SHUTDOWN, unref the subchannel, and if the
+  // policy's new state is SHUTDOWN, clean up.
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
+ sd->subchannel = NULL;
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
+ if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ /* the policy is shutting down. Flush all the pending picks... */
+ pending_pick *pp;
+ while ((pp = p->pending_picks)) {
+ p->pending_picks = pp->next;
+ *pp->target = NULL;
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ gpr_free(pp);
+ }
+ }
+ /* unref the "rr_connectivity" weak ref from start_picking */
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
+ "rr_connectivity_sd_shutdown");
+ } else { // sd not in SHUTDOWN
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (sd->subchannel_list != p->subchannel_list) {
+ // promote sd->subchannel_list to p->subchannel_list.
+ // sd->subchannel_list must be equal to
+ // p->latest_pending_subchannel_list because we have already filtered
+ // for sds belonging to outdated subchannel lists.
+ GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(!sd->subchannel_list->shutting_down);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ const unsigned long num_subchannels =
+ p->subchannel_list != NULL
+ ? (unsigned long)p->subchannel_list->num_subchannels
+ : 0;
+ gpr_log(GPR_DEBUG,
+ "[RR %p] phasing out subchannel list %p (size %lu) in favor "
+ "of %p (size %lu)",
+ (void *)p, (void *)p->subchannel_list, num_subchannels,
+                  (void *)sd->subchannel_list,
+                  (unsigned long)sd->subchannel_list->num_subchannels);
+ }
+ if (p->subchannel_list != NULL) {
+ // dispose of the current subchannel_list
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_update_connectivity");
+ }
+ p->subchannel_list = sd->subchannel_list;
+ p->latest_pending_subchannel_list = NULL;
+ }
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
- ready_list *selected = peek_next_connected_locked(p);
- GPR_ASSERT(selected != NULL);
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
+ subchannel_data *selected =
+ &p->subchannel_list->subchannels[next_ready_index];
if (p->pending_picks != NULL) {
/* if the selected subchannel is going to be used for the pending
* picks, update the last picked pointer */
- advance_last_picked_locked(p);
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
}
+ pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
@@ -568,72 +622,21 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
- "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- (void *)selected->subchannel, (void *)selected);
+ "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)",
+ (void *)selected->subchannel,
+ (unsigned long)next_ready_index);
}
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_IDLE:
- ++p->num_idle;
- /* fallthrough */
- case GRPC_CHANNEL_CONNECTING:
- update_state_counters(sd);
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- ++p->num_transient_failures;
- /* remove from ready list if still present */
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_SHUTDOWN:
- update_state_counters(sd);
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- --p->num_subchannels;
- GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
- p->subchannels[p->num_subchannels]);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
- p->subchannels[sd->index]->index = sd->index;
- if (update_lb_connectivity_status(exec_ctx, sd, error) ==
- GRPC_CHANNEL_SHUTDOWN) {
- /* the policy is shutting down. Flush all the pending picks... */
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
- }
- }
- gpr_free(sd);
- /* unref the "rr_connectivity" weak ref from start_picking */
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- break;
+ }
+ /* renew notification: reuses the "rr_connectivity" weak ref on the policy
+ * as well as the sd->subchannel_list ref. */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
- GRPC_ERROR_UNREF(error);
}
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -654,66 +657,83 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- ready_list *selected;
- grpc_connected_subchannel *target;
- if ((selected = peek_next_connected_locked(p))) {
- target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->subchannel_list->num_subchannels) {
+ subchannel_data *selected =
+ &p->subchannel_list->subchannels[next_ready_index];
+ grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
}
}
-static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown_locked,
- rr_pick_locked,
- rr_cancel_pick_locked,
- rr_cancel_picks_locked,
- rr_ping_one_locked,
- rr_exit_idle_locked,
- rr_check_connectivity_locked,
- rr_notify_on_state_change_locked};
-
-static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
-
-static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
-
-static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_factory *factory,
- grpc_lb_policy_args *args) {
- GPR_ASSERT(args->client_channel_factory != NULL);
-
- /* Find the number of backend addresses. We ignore balancer
- * addresses, since we don't know how to handle them. */
+static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)policy;
+ /* Find the number of backend addresses. We ignore balancer addresses, since
+ * we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- return NULL;
+ if (p->subchannel_list == NULL) {
+ // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "rr_update_missing");
+ } else {
+ // otherwise, keep using the current subchannel list (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for Round Robin %p update, "
+ "ignoring.",
+ (void *)p);
+ }
+ return;
}
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
if (!addresses->addresses[i].is_balancer) ++num_addrs;
}
- if (num_addrs == 0) return NULL;
-
- round_robin_lb_policy *p = gpr_zalloc(sizeof(*p));
-
- p->num_addresses = num_addrs;
- p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs);
-
+ if (num_addrs == 0) {
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ "rr_update_empty");
+ if (p->subchannel_list != NULL) {
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_update");
+ p->subchannel_list = NULL;
+ }
+ return;
+ }
+ size_t subchannel_index = 0;
+ rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list));
+ subchannel_list->policy = p;
+ subchannel_list->subchannels =
+ gpr_zalloc(sizeof(subchannel_data) * num_addrs);
+ subchannel_list->num_subchannels = num_addrs;
+ gpr_ref_init(&subchannel_list->refcount, 1);
+ p->latest_pending_subchannel_list = subchannel_list;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels",
+ (void *)subchannel_list, (unsigned long)num_addrs);
+ }
grpc_subchannel_args sc_args;
- size_t subchannel_idx = 0;
+ /* We need to remove the LB addresses in order to be able to compare the
+ * subchannel keys of subchannels from a different batch of addresses. */
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ /* Create subchannels for addresses in the update. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */
if (addresses->addresses[i].is_balancer) continue;
-
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+ GPR_ASSERT(i < num_addrs);
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
@@ -727,50 +747,84 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s",
- (void *)subchannel, address_uri);
+ gpr_log(GPR_DEBUG,
+ "index %lu: Created subchannel %p for address uri %s into "
+ "subchannel_list %p",
+ (unsigned long)subchannel_index, (void *)subchannel, address_uri,
+ (void *)subchannel_list);
gpr_free(address_uri);
}
grpc_channel_args_destroy(exec_ctx, new_args);
- if (subchannel != NULL) {
- subchannel_data *sd = gpr_zalloc(sizeof(*sd));
- p->subchannels[subchannel_idx] = sd;
- sd->policy = p;
- sd->index = subchannel_idx;
- sd->subchannel = subchannel;
- sd->user_data_vtable = addresses->user_data_vtable;
- if (sd->user_data_vtable != NULL) {
- sd->user_data =
- sd->user_data_vtable->copy(addresses->addresses[i].user_data);
- }
- ++subchannel_idx;
- grpc_closure_init(&sd->connectivity_changed_closure,
- rr_connectivity_changed_locked, sd,
- grpc_combiner_scheduler(args->combiner, false));
+ subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++];
+ sd->subchannel_list = subchannel_list;
+ sd->subchannel = subchannel;
+ GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
+ rr_connectivity_changed_locked, sd,
+ grpc_combiner_scheduler(args->combiner));
+ /* use some sentinel value outside of the range of
+ * grpc_connectivity_state to signal an undefined previous state. We
+ * won't be referring to this value again and it'll be overwritten after
+ * the first call to rr_connectivity_changed_locked */
+ sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+ sd->user_data_vtable = addresses->user_data_vtable;
+ if (sd->user_data_vtable != NULL) {
+ sd->user_data =
+ sd->user_data_vtable->copy(addresses->addresses[i].user_data);
+ }
+ if (p->started_picking) {
+ rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking");
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update");
+      /* Watch every new subchannel. A subchannel list becomes active the
+       * moment one of its subchannels is READY. At that moment, we swap
+       * p->subchannel_list for sd->subchannel_list, provided the subchannel
+       * list is still valid (i.e., isn't shutting down). */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
}
- if (subchannel_idx == 0) {
- /* couldn't create any subchannel. Bail out */
- gpr_free(p->subchannels);
- gpr_free(p);
- return NULL;
+ if (!p->started_picking) {
+    // The policy isn't picking yet. Save the update for later, disposing of
+    // the previous version, if any.
+ if (p->subchannel_list != NULL) {
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "rr_update_before_started_picking");
+ }
+ p->subchannel_list = subchannel_list;
}
- p->num_subchannels = subchannel_idx;
+}
- /* The (dummy node) root of the ready list */
- p->ready_list.subchannel = NULL;
- p->ready_list.prev = NULL;
- p->ready_list.next = NULL;
- p->ready_list_last_pick = &p->ready_list;
+static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
+ rr_destroy,
+ rr_shutdown_locked,
+ rr_pick_locked,
+ rr_cancel_pick_locked,
+ rr_cancel_picks_locked,
+ rr_ping_one_locked,
+ rr_exit_idle_locked,
+ rr_check_connectivity_locked,
+ rr_notify_on_state_change_locked,
+ rr_update_locked};
+
+static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
+
+static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
+static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args) {
+ GPR_ASSERT(args->client_channel_factory != NULL);
+ round_robin_lb_policy *p = gpr_zalloc(sizeof(*p));
+ rr_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels",
- (void *)p, (unsigned long)p->num_subchannels);
+ gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p,
+ (unsigned long)p->subchannel_list->num_subchannels);
}
return &p->base;
}
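Taken together, rr_update_locked and rr_connectivity_changed_locked implement a two-list handover: an update only stages a new list in latest_pending_subchannel_list, and the staged list is promoted to subchannel_list once one of its members reports READY, while callbacks from any other outdated list are discarded. A compressed, single-threaded sketch of that handover (hypothetical toy_* names, not gRPC types):

#include <stdio.h>

typedef struct { const char *name; } toy_sublist;

typedef struct {
  toy_sublist *current;        /* analogue of p->subchannel_list */
  toy_sublist *latest_pending; /* analogue of p->latest_pending_subchannel_list */
} toy_policy;

/* An update never replaces the serving list directly; it only stages it. */
static void toy_update(toy_policy *p, toy_sublist *fresh) {
  p->latest_pending = fresh;
}

/* Called when a subchannel in `from` reports READY: promote its list if it is
 * the staged one; callbacks for any other (outdated) list are ignored. */
static void toy_on_ready(toy_policy *p, toy_sublist *from) {
  if (from == p->current) return;        /* already the serving list */
  if (from != p->latest_pending) return; /* outdated list: drop it */
  printf("promoting %s over %s\n", from->name,
         p->current != NULL ? p->current->name : "(none)");
  p->current = from;
  p->latest_pending = NULL;
}

int main(void) {
  toy_sublist a = {"list A"}, b = {"list B"};
  toy_policy p = {NULL, NULL};
  toy_update(&p, &a);
  toy_on_ready(&p, &a); /* promotes list A */
  toy_update(&p, &b);
  toy_on_ready(&p, &a); /* list A is already current: no-op */
  toy_on_ready(&p, &b); /* promotes list B */
  return 0;
}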