aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar David G. Quintas <dgq@google.com>2017-06-07 12:29:59 -0700
committerGravatar GitHub <noreply@github.com>2017-06-07 12:29:59 -0700
commit3f02e8cb4ba8882ee710b6af766bc7c6b74594e4 (patch)
treed462b1e433a4ef8e7bf06e48895f52f6094c0d30 /src/core
parentb89a100d69f9d975ba58a6e152354ad9614b7f96 (diff)
parent87d5a3130dc94898aed40e74e998a72d21156ae5 (diff)
Merge pull request #11159 from dgquintas/lb_update_server5
Implement LB policy updates
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c10
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c73
-rw-r--r--src/core/ext/filters/client_channel/client_channel.h6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.c6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c504
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c51
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c53
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c426
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c626
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.h4
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c255
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h75
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c7
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h5
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.c21
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.h8
-rw-r--r--src/core/ext/filters/workarounds/workaround_utils.h2
-rw-r--r--src/core/lib/channel/channel_args.c18
-rw-r--r--src/core/lib/channel/channel_args.h4
-rw-r--r--src/core/lib/iomgr/polling_entity.h13
-rw-r--r--src/core/lib/security/transport/lb_targets_info.c4
-rw-r--r--src/core/lib/slice/slice_hash_table.c39
-rw-r--r--src/core/lib/slice/slice_hash_table.h19
-rw-r--r--src/core/lib/transport/service_config.c2
-rw-r--r--src/core/plugin_registry/grpc_plugin_registry.c4
-rw-r--r--src/core/plugin_registry/grpc_unsecure_plugin_registry.c4
28 files changed, 1640 insertions, 618 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index 04666edbec..aa464f3ab7 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -126,9 +126,10 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
} else {
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(w->channel));
- grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem,
- grpc_cq_pollset(w->cq), NULL,
- &w->on_complete, NULL);
+ grpc_client_channel_watch_connectivity_state(
+ exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL,
+ &w->on_complete, NULL);
}
gpr_mu_lock(&w->mu);
@@ -245,7 +246,8 @@ void grpc_channel_watch_connectivity_state(
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
grpc_client_channel_watch_connectivity_state(
- &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state,
+ &exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state,
&w->on_complete, &w->watcher_timer_init);
} else {
abort();
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 8cebbe9eca..ab71467d73 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -389,7 +389,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand = arg;
char *lb_policy_name = NULL;
grpc_lb_policy *lb_policy = NULL;
- grpc_lb_policy *old_lb_policy;
+ grpc_lb_policy *old_lb_policy = NULL;
grpc_slice_hash_table *method_params_table = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
@@ -399,6 +399,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
+ bool lb_policy_updated = false;
if (chand->resolver_result != NULL) {
// Find LB policy name.
const grpc_arg *channel_arg =
@@ -438,14 +439,27 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.combiner = chand->combiner;
- lb_policy =
- grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
- if (lb_policy != NULL) {
- GRPC_LB_POLICY_REF(lb_policy, "config_change");
- GRPC_ERROR_UNREF(state_error);
- state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
- &state_error);
+
+ const bool lb_policy_type_changed =
+ (chand->info_lb_policy_name == NULL) ||
+ (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0);
+ if (chand->lb_policy != NULL && !lb_policy_type_changed) {
+ // update
+ lb_policy_updated = true;
+ grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
+ } else {
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_REF(lb_policy, "config_change");
+ GRPC_ERROR_UNREF(state_error);
+ state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
+ &state_error);
+ old_lb_policy = chand->lb_policy;
+ chand->lb_policy = lb_policy;
+ }
}
+
// Find service config.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
@@ -492,8 +506,6 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
gpr_free(chand->info_lb_policy_name);
chand->info_lb_policy_name = lb_policy_name;
}
- old_lb_policy = chand->lb_policy;
- chand->lb_policy = lb_policy;
if (service_config_json != NULL) {
gpr_free(chand->info_service_config_json);
chand->info_service_config_json = service_config_json;
@@ -516,17 +528,21 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
"Channel disconnected", &error, 1));
grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
}
- if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+ if (!lb_policy_updated && lb_policy != NULL &&
+ chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
exit_idle = true;
chand->exit_idle_when_lb_policy_arrives = false;
}
if (error == GRPC_ERROR_NONE && chand->resolver) {
- set_channel_connectivity_state_locked(
- exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
- if (lb_policy != NULL) {
- watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
+ if (!lb_policy_updated) {
+ set_channel_connectivity_state_locked(exec_ctx, chand, state,
+ GRPC_ERROR_REF(state_error),
+ "new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
+ }
}
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next_locked(exec_ctx, chand->resolver,
@@ -546,7 +562,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
"resolver_gone");
}
- if (exit_idle) {
+ if (!lb_policy_updated && lb_policy != NULL && exit_idle) {
grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
}
@@ -555,9 +571,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_pollset_set_del_pollset_set(
exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
+ old_lb_policy = NULL;
}
- if (lb_policy != NULL) {
+ if (!lb_policy_updated && lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
}
@@ -1447,7 +1464,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
typedef struct external_connectivity_watcher {
channel_data *chand;
- grpc_pollset *pollset;
+ grpc_polling_entity pollent;
grpc_closure *on_complete;
grpc_closure *watcher_timer_init;
grpc_connectivity_state *state;
@@ -1522,8 +1539,8 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
- grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
- w->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
+ w->chand->interested_parties);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
external_connectivity_watcher_list_remove(w->chand, w);
@@ -1550,8 +1567,8 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
}
- grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
- w->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
+ w->chand->interested_parties);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
@@ -1559,18 +1576,18 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *closure,
- grpc_closure *watcher_timer_init) {
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_polling_entity pollent, grpc_connectivity_state *state,
+ grpc_closure *closure, grpc_closure *watcher_timer_init) {
channel_data *chand = elem->channel_data;
external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
w->chand = chand;
- w->pollset = pollset;
+ w->pollent = pollent;
w->on_complete = closure;
w->state = state;
w->watcher_timer_init = watcher_timer_init;
-
- grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
+ chand->interested_parties);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
grpc_closure_sched(
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 356a7ab0c1..4f8987f0da 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -57,9 +57,9 @@ int grpc_client_channel_num_external_connectivity_watchers(
grpc_channel_element *elem);
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *on_complete,
- grpc_closure *watcher_timer_init);
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_polling_entity pollent, grpc_connectivity_state *state,
+ grpc_closure *on_complete, grpc_closure *watcher_timer_init);
/* Debug helper: pull the subchannel call from a call stack element */
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c
index 112ba40658..7ac2cb1775 100644
--- a/src/core/ext/filters/client_channel/lb_policy.c
+++ b/src/core/ext/filters/client_channel/lb_policy.c
@@ -166,3 +166,9 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
return policy->vtable->check_connectivity_locked(exec_ctx, policy,
connectivity_error);
}
+
+void grpc_lb_policy_update_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ const grpc_lb_policy_args *lb_policy_args) {
+ policy->vtable->update_locked(exec_ctx, policy, lb_policy_args);
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index fb4aa084a6..07e7953009 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -42,6 +42,7 @@
is expected to be extended to contain some parameters) */
typedef struct grpc_lb_policy grpc_lb_policy;
typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable;
+typedef struct grpc_lb_policy_args grpc_lb_policy_args;
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
@@ -105,9 +106,12 @@ struct grpc_lb_policy_vtable {
grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_closure *closure);
+
+ void (*update_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args);
};
-/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/
+//#define GRPC_LB_POLICY_REFCOUNT_DEBUG
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
/* Strong references: the policy will shutdown when they reach zero */
@@ -207,4 +211,9 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_error **connectivity_error);
+/** Update \a policy with \a lb_policy_args. */
+void grpc_lb_policy_update_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ const grpc_lb_policy_args *lb_policy_args);
+
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index d2a2856a18..6eb0a9ef21 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -115,6 +115,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -315,6 +316,9 @@ typedef struct glb_lb_policy {
/** for communicating with the LB server */
grpc_channel *lb_channel;
+ /** response generator to inject address updates into \a lb_channel */
+ grpc_fake_resolver_response_generator *response_generator;
+
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy *rr_policy;
@@ -323,6 +327,9 @@ typedef struct glb_lb_policy {
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
+ /** connectivity state of the LB channel */
+ grpc_connectivity_state lb_channel_connectivity;
+
/** stores the deserialized response from the LB. May be NULL until one such
* response has arrived. */
grpc_grpclb_serverlist *serverlist;
@@ -340,10 +347,27 @@ typedef struct glb_lb_policy {
bool shutting_down;
+ /** are we currently updating lb_call? */
+ bool updating_lb_call;
+
+ /** are we currently updating lb_channel? */
+ bool updating_lb_channel;
+
+ /** are we already watching the LB channel's connectivity? */
+ bool watching_lb_channel;
+
+ /** is \a lb_call_retry_timer active? */
+ bool retry_timer_active;
+
+ /** called upon changes to the LB channel's connectivity. */
+ grpc_closure lb_channel_on_connectivity_changed;
+
+ /** args from the latest update received while already updating, or NULL */
+ grpc_lb_policy_args *pending_update_args;
+
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
-
/* Finished sending initial request. */
grpc_closure lb_on_sent_initial_request;
@@ -533,10 +557,9 @@ static grpc_lb_addresses *process_serverlist_locked(
return lb_addresses;
}
-/* returns true if the new RR policy should replace the current one, if any */
-static bool update_lb_connectivity_status_locked(
+static void update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) {
+ grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
@@ -570,28 +593,26 @@ static bool update_lb_connectivity_status_locked(
* (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
- switch (new_rr_state) {
+ switch (rr_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
- GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE);
- return false; /* don't replace the RR policy */
+ GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
+ break;
case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
- GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE);
+ GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
}
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO,
- "Setting grpclb's state to %s from new RR policy %p state.",
- grpc_connectivity_state_name(new_rr_state),
- (void *)glb_policy->rr_policy);
+ gpr_log(
+ GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.",
+ grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy);
}
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- new_rr_state, GRPC_ERROR_REF(new_rr_state_error),
+ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
+ GRPC_ERROR_REF(rr_state_error),
"update_lb_connectivity_status_locked");
- return true;
}
/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
@@ -670,45 +691,38 @@ static bool pick_from_internal_rr_locked(
return pick_done;
}
-static grpc_lb_policy *create_rr_locked(
- grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
- glb_lb_policy *glb_policy) {
- GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
-
- grpc_lb_policy_args args;
- memset(&args, 0, sizeof(args));
- args.client_channel_factory = glb_policy->cc_factory;
- args.combiner = glb_policy->base.combiner;
+static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
+ grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args));
+ args->client_channel_factory = glb_policy->cc_factory;
+ args->combiner = glb_policy->base.combiner;
grpc_lb_addresses *addresses =
- process_serverlist_locked(exec_ctx, serverlist);
-
+ process_serverlist_locked(exec_ctx, glb_policy->serverlist);
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
- args.args = grpc_channel_args_copy_and_add_and_remove(
+ args->args = grpc_channel_args_copy_and_add_and_remove(
glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
1);
-
- grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- GPR_ASSERT(rr != NULL);
grpc_lb_addresses_destroy(exec_ctx, addresses);
- grpc_channel_args_destroy(exec_ctx, args.args);
- return rr;
+ return args;
+}
+
+static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_args *args) {
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ gpr_free(args);
}
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error);
-/* glb_policy->rr_policy may be NULL (initial handover) */
-static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
- GPR_ASSERT(glb_policy->serverlist != NULL &&
- glb_policy->serverlist->num_servers > 0);
-
- if (glb_policy->shutting_down) return;
+static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+ grpc_lb_policy_args *args) {
+ GPR_ASSERT(glb_policy->rr_policy == NULL);
grpc_lb_policy *new_rr_policy =
- create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
+ grpc_lb_policy_create(exec_ctx, "round_robin", args);
if (new_rr_policy == NULL) {
gpr_log(GPR_ERROR,
"Failure creating a RoundRobin policy for serverlist update with "
@@ -719,41 +733,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
(void *)glb_policy->rr_policy);
return;
}
-
- grpc_error *new_rr_state_error = NULL;
- const grpc_connectivity_state new_rr_state =
- grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy,
- &new_rr_state_error);
- /* Connectivity state is a function of the new RR policy just created */
- const bool replace_old_rr = update_lb_connectivity_status_locked(
- exec_ctx, glb_policy, new_rr_state, new_rr_state_error);
-
- if (!replace_old_rr) {
- /* dispose of the new RR policy that won't be used after all */
- GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace");
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO,
- "Keeping old RR policy (%p) despite new serverlist: new RR "
- "policy was in %s connectivity state.",
- (void *)glb_policy->rr_policy,
- grpc_connectivity_state_name(new_rr_state));
- }
- return;
- }
-
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)",
- (void *)new_rr_policy, (void *)glb_policy->rr_policy);
- }
-
- if (glb_policy->rr_policy != NULL) {
- /* if we are phasing out an existing RR instance, unref it. */
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover");
- }
-
- /* Finally update the RR policy to the newly created one */
glb_policy->rr_policy = new_rr_policy;
+ grpc_error *rr_state_error = NULL;
+ const grpc_connectivity_state rr_state =
+ grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
+ &rr_state_error);
+ /* Connectivity state is a function of the RR policy updated/created */
+ update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state,
+ rr_state_error);
+
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
@@ -769,10 +758,10 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
glb_rr_connectivity_changed_locked, rr_connectivity,
grpc_combiner_scheduler(glb_policy->base.combiner, false));
rr_connectivity->glb_policy = glb_policy;
- rr_connectivity->state = new_rr_state;
+ rr_connectivity->state = rr_state;
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
@@ -809,6 +798,31 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
}
}
+/* glb_policy->rr_policy may be NULL (initial handover) */
+static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
+ GPR_ASSERT(glb_policy->serverlist != NULL &&
+ glb_policy->serverlist->num_servers > 0);
+
+ if (glb_policy->shutting_down) return;
+
+ grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
+ if (glb_policy->rr_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)",
+ (void *)glb_policy->rr_policy);
+ }
+ grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
+ } else {
+ create_rr_locked(exec_ctx, glb_policy, args);
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)",
+ (void *)glb_policy->rr_policy);
+ }
+ }
+ lb_policy_args_destroy(exec_ctx, args);
+}
+
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
rr_connectivity_data *rr_connectivity = arg;
@@ -854,18 +868,24 @@ static grpc_slice_hash_table_entry targets_info_entry_create(
return entry;
}
-/* Returns the target URI for the LB service whose addresses are in \a
- * addresses. Using this URI, a bidirectional streaming channel will be created
- * for the reception of load balancing updates.
+static int balancer_name_cmp_fn(void *a, void *b) {
+ const char *a_str = a;
+ const char *b_str = b;
+ return strcmp(a_str, b_str);
+}
+
+/* Returns the channel args for the LB channel, used to create a bidirectional
+ * stream for the reception of load balancing updates.
*
- * The output argument \a targets_info will be updated to contain a mapping of
- * "LB server address" to "balancer name", as reported by the naming system.
- * This mapping will be propagated via the channel arguments of the
- * aforementioned LB streaming channel, to be used by the security connector for
- * secure naming checks. The user is responsible for freeing \a targets_info. */
-static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
- const grpc_lb_addresses *addresses,
- grpc_slice_hash_table **targets_info) {
+ * Inputs:
+ * - \a addresses: corresponding to the balancers.
+ * - \a response_generator: in order to propagate updates from the resolver
+ * above the grpclb policy.
+ * - \a args: other args inherited from the grpclb policy. */
+static grpc_channel_args *build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args) {
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
@@ -874,53 +894,54 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
* It's the resolver's responsibility to make sure this policy is only
* instantiated and used in that case. Otherwise, something has gone wrong. */
GPR_ASSERT(num_grpclb_addrs > 0);
-
+ grpc_lb_addresses *lb_addresses =
+ grpc_lb_addresses_create(num_grpclb_addrs, NULL);
grpc_slice_hash_table_entry *targets_info_entries =
- gpr_malloc(sizeof(*targets_info_entries) * num_grpclb_addrs);
+ gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs);
- /* construct a target ipvX://ip1:port1,ip2:port2,... from the addresses in \a
- * addresses */
- /* TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
- size_t addr_index = 0;
-
- for (size_t i = 0; i < addresses->num_addresses; i++) {
+ size_t lb_addresses_idx = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) continue;
if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
- if (addresses->addresses[i].is_balancer) {
- char *addr_str;
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_str, &addresses->addresses[i].address, true) > 0);
- targets_info_entries[addr_index] = targets_info_entry_create(
- addr_str, addresses->addresses[i].balancer_name);
- addr_strs[addr_index++] = addr_str;
- }
+ char *addr_str;
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_str, &addresses->addresses[i].address, true) > 0);
+ targets_info_entries[lb_addresses_idx] = targets_info_entry_create(
+ addr_str, addresses->addresses[i].balancer_name);
+ gpr_free(addr_str);
+
+ grpc_lb_addresses_set_address(
+ lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
+ addresses->addresses[i].address.len, false /* is balancer */,
+ addresses->addresses[i].balancer_name, NULL /* user data */);
}
- GPR_ASSERT(addr_index == num_grpclb_addrs);
-
- size_t uri_path_len;
- char *uri_path = gpr_strjoin_sep((const char **)addr_strs, num_grpclb_addrs,
- ",", &uri_path_len);
- for (size_t i = 0; i < num_grpclb_addrs; i++) gpr_free(addr_strs[i]);
- gpr_free(addr_strs);
-
- char *target_uri_str = NULL;
- /* TODO(dgq): Don't assume all addresses will share the scheme of the first
- * one */
- gpr_asprintf(&target_uri_str, "%s:%s",
- grpc_sockaddr_get_uri_scheme(&addresses->addresses[0].address),
- uri_path);
- gpr_free(uri_path);
-
- *targets_info = grpc_slice_hash_table_create(
- num_grpclb_addrs, targets_info_entries, destroy_balancer_name);
+ GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
+ grpc_slice_hash_table *targets_info =
+ grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries,
+ destroy_balancer_name, balancer_name_cmp_fn);
gpr_free(targets_info_entries);
- return target_uri_str;
+ grpc_channel_args *lb_channel_args =
+ grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info,
+ response_generator, args);
+
+ grpc_arg lb_channel_addresses_arg =
+ grpc_lb_addresses_create_channel_arg(lb_addresses);
+
+ grpc_channel_args *result = grpc_channel_args_copy_and_add(
+ lb_channel_args, &lb_channel_addresses_arg, 1);
+ grpc_slice_hash_table_unref(exec_ctx, targets_info);
+ grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ grpc_lb_addresses_destroy(exec_ctx, lb_addresses);
+ return result;
}
+static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error);
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
@@ -976,24 +997,31 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
- grpc_slice_hash_table *targets_info = NULL;
/* Create a client channel over them to communicate with a LB service */
- char *lb_service_target_addresses =
- get_lb_uri_target_addresses(exec_ctx, addresses, &targets_info);
- grpc_channel_args *lb_channel_args =
- get_lb_channel_args(exec_ctx, targets_info, args->args);
+ glb_policy->response_generator =
+ grpc_fake_resolver_response_generator_create();
+ grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ exec_ctx, addresses, glb_policy->response_generator, args->args);
+ char *uri_str;
+ gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
- exec_ctx, lb_service_target_addresses, args->client_channel_factory,
- lb_channel_args);
- grpc_slice_hash_table_unref(exec_ctx, targets_info);
+ exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
+
+ /* Propagate initial resolution */
+ grpc_fake_resolver_response_generator_set_response(
+ exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
- gpr_free(lb_service_target_addresses);
+ gpr_free(uri_str);
if (glb_policy->lb_channel == NULL) {
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
gpr_free(glb_policy);
return NULL;
}
+
+ grpc_closure_init(&glb_policy->lb_channel_on_connectivity_changed,
+ glb_lb_channel_on_connectivity_changed_cb, glb_policy,
+ grpc_combiner_scheduler(args->combiner, false));
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
@@ -1009,12 +1037,15 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (glb_policy->client_stats != NULL) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
}
- grpc_channel_destroy(glb_policy->lb_channel);
- glb_policy->lb_channel = NULL;
grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
if (glb_policy->serverlist != NULL) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
+ grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
+ if (glb_policy->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
+ gpr_free(glb_policy->pending_update_args);
+ }
gpr_free(glb_policy);
}
@@ -1022,16 +1053,6 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
glb_policy->shutting_down = true;
- pending_pick *pp = glb_policy->pending_picks;
- glb_policy->pending_picks = NULL;
- pending_ping *pping = glb_policy->pending_pings;
- glb_policy->pending_pings = NULL;
- if (glb_policy->rr_policy) {
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
- }
- grpc_connectivity_state_set(
- exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
/* We need a copy of the lb_call pointer because we can't cancell the call
* while holding glb_policy->mu: lb_on_server_status_received, invoked due to
* the cancel, needs to acquire that same lock */
@@ -1045,6 +1066,30 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_call_cancel(lb_call, NULL);
/* lb_on_server_status_received will pick up the cancel and clean up */
}
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ glb_policy->retry_timer_active = false;
+ }
+
+ pending_pick *pp = glb_policy->pending_picks;
+ glb_policy->pending_picks = NULL;
+ pending_ping *pping = glb_policy->pending_pings;
+ glb_policy->pending_pings = NULL;
+ if (glb_policy->rr_policy) {
+ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
+ }
+ // We destroy the LB channel here because
+ // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
+ // instance. Destroying the lb channel in glb_destroy would likely result in
+ // a callback invocation without a valid glb_policy arg.
+ if (glb_policy->lb_channel != NULL) {
+ grpc_channel_destroy(glb_policy->lb_channel);
+ glb_policy->lb_channel = NULL;
+ }
+ grpc_connectivity_state_set(
+ exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
+
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
@@ -1318,6 +1363,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
+ GPR_ASSERT(glb_policy->lb_call == NULL);
GPR_ASSERT(!glb_policy->shutting_down);
/* Note the following LB call progresses every time there's activity in \a
@@ -1403,8 +1449,10 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
lb_call_init_locked(exec_ctx, glb_policy);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)",
- (void *)glb_policy, (void *)glb_policy->lb_call);
+ gpr_log(GPR_INFO,
+ "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)",
+ (void *)glb_policy, (void *)glb_policy->lb_channel,
+ (void *)glb_policy->lb_call);
}
GPR_ASSERT(glb_policy->lb_call != NULL);
@@ -1608,8 +1656,8 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
glb_lb_policy *glb_policy = arg;
-
- if (!glb_policy->shutting_down) {
+ glb_policy->retry_timer_active = false;
+ if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
(void *)glb_policy);
@@ -1617,31 +1665,32 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(glb_policy->lb_call == NULL);
query_for_backends_locked(exec_ctx, glb_policy);
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "grpclb_on_retry_timer");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
}
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = arg;
-
GPR_ASSERT(glb_policy->lb_call != NULL);
-
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
char *status_details =
grpc_slice_to_c_string(glb_policy->lb_call_status_details);
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"Status from LB server received. Status = %d, Details = '%s', "
- "(call: %p)",
+ "(call: %p), error %p",
glb_policy->lb_call_status, status_details,
- (void *)glb_policy->lb_call);
+ (void *)glb_policy->lb_call, (void *)error);
gpr_free(status_details);
}
-
/* We need to perform cleanups no matter what. */
lb_call_destroy_locked(exec_ctx, glb_policy);
-
- if (!glb_policy->shutting_down) {
+ if (glb_policy->started_picking && glb_policy->updating_lb_call) {
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ }
+ if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
+ glb_policy->updating_lb_call = false;
+ } else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try =
@@ -1651,16 +1700,18 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
(void *)glb_policy);
gpr_timespec timeout = gpr_time_sub(next_try, now);
if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
- gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.",
+ gpr_log(GPR_DEBUG,
+ "... retry_timer_active in %" PRId64 ".%09d seconds.",
timeout.tv_sec, timeout.tv_nsec);
} else {
- gpr_log(GPR_DEBUG, "... retrying immediately.");
+ gpr_log(GPR_DEBUG, "... retry_timer_active immediately.");
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
grpc_closure_init(
&glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked,
glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ glb_policy->retry_timer_active = true;
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry, now);
}
@@ -1668,6 +1719,138 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
"lb_on_server_status_received");
}
+static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
+
+ if (glb_policy->updating_lb_channel) {
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO,
+ "Update already in progress for grpclb %p. Deferring update.",
+ (void *)glb_policy);
+ }
+ if (glb_policy->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx,
+ glb_policy->pending_update_args->args);
+ gpr_free(glb_policy->pending_update_args);
+ }
+ glb_policy->pending_update_args =
+ gpr_zalloc(sizeof(*glb_policy->pending_update_args));
+ glb_policy->pending_update_args->client_channel_factory =
+ args->client_channel_factory;
+ glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
+ glb_policy->pending_update_args->combiner = args->combiner;
+ return;
+ }
+
+ glb_policy->updating_lb_channel = true;
+ // Propagate update to lb_channel (pick first).
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ if (glb_policy->lb_channel == NULL) {
+ // If we don't have a current channel to the LB, go into TRANSIENT
+ // FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "glb_update_missing");
+ } else {
+ // otherwise, keep using the current LB channel (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for grpclb %p update, "
+ "ignoring.",
+ (void *)glb_policy);
+ }
+ }
+ const grpc_lb_addresses *addresses = arg->value.pointer.p;
+ GPR_ASSERT(glb_policy->lb_channel != NULL);
+ grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ exec_ctx, addresses, glb_policy->response_generator, args->args);
+ /* Propagate updates to the LB channel through the fake resolver */
+ grpc_fake_resolver_response_generator_set_response(
+ exec_ctx, glb_policy->response_generator, lb_channel_args);
+ grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+
+ if (!glb_policy->watching_lb_channel) {
+ // Watch the LB channel connectivity for connection.
+ glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT;
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(glb_policy->lb_channel));
+ GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ glb_policy->watching_lb_channel = true;
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
+ grpc_client_channel_watch_connectivity_state(
+ exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset_set(
+ glb_policy->base.interested_parties),
+ &glb_policy->lb_channel_connectivity,
+ &glb_policy->lb_channel_on_connectivity_changed, NULL);
+ }
+}
+
+// Invoked as part of the update process. It continues watching the LB channel
+// until it shuts down or becomes READY. It's invoked even if the LB channel
+// stayed READY throughout the update (for example if the update is identical).
+static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error) {
+ glb_lb_policy *glb_policy = arg;
+ if (glb_policy->shutting_down) goto done;
+ // Re-initialize the lb_call. This should also take care of updating the
+ // embedded RR policy. Note that the current RR policy, if any, will stay in
+ // effect until an update from the new lb_call is received.
+ switch (glb_policy->lb_channel_connectivity) {
+ case GRPC_CHANNEL_INIT:
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ /* resub. */
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(glb_policy->lb_channel));
+ GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ grpc_client_channel_watch_connectivity_state(
+ exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset_set(
+ glb_policy->base.interested_parties),
+ &glb_policy->lb_channel_connectivity,
+ &glb_policy->lb_channel_on_connectivity_changed, NULL);
+ break;
+ }
+ case GRPC_CHANNEL_IDLE:
+ // lb channel inactive (probably shutdown prior to update). Restart lb
+ // call to kick the lb channel into gear.
+ GPR_ASSERT(glb_policy->lb_call == NULL);
+ /* fallthrough */
+ case GRPC_CHANNEL_READY:
+ if (glb_policy->lb_call != NULL) {
+ glb_policy->updating_lb_channel = false;
+ glb_policy->updating_lb_call = true;
+ grpc_call_cancel(glb_policy->lb_call, NULL);
+ // lb_on_server_status_received will pick up the cancel and reinit
+ // lb_call.
+ if (glb_policy->pending_update_args != NULL) {
+ const grpc_lb_policy_args *args = glb_policy->pending_update_args;
+ glb_policy->pending_update_args = NULL;
+ glb_update_locked(exec_ctx, &glb_policy->base, args);
+ }
+ } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ glb_policy->retry_timer_active = false;
+ }
+ start_picking_locked(exec_ctx, glb_policy);
+ }
+ /* fallthrough */
+ case GRPC_CHANNEL_SHUTDOWN:
+ done:
+ glb_policy->watching_lb_channel = false;
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ "watch_lb_channel_connectivity_cb_shutdown");
+ break;
+ }
+}
+
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@@ -1678,7 +1861,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_ping_one_locked,
glb_exit_idle_locked,
glb_check_connectivity_locked,
- glb_notify_on_state_change_locked};
+ glb_notify_on_state_change_locked,
+ glb_update_locked};
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
index d6201f2387..a190c2075d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
@@ -50,28 +50,37 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
return lb_channel;
}
-grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
- grpc_slice_hash_table *targets_info,
- const grpc_channel_args *args) {
- /* We strip out the channel arg for the LB policy name, since we want
- * to use the default (pick_first) in this case.
+grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args) {
+ const grpc_arg to_add[] = {
+ grpc_fake_resolver_response_generator_arg(response_generator)};
+ /* We remove:
*
- * We also strip out the channel arg for the resolved addresses, since
- * that will be generated by the name resolver used in the LB channel.
- * Note that the LB channel will use the sockaddr resolver, so this
- * won't actually generate a query to DNS (or some other name service).
- * However, the addresses returned by the sockaddr resolver will have
- * is_balancer=false, whereas our own addresses have is_balancer=true.
- * We need the LB channel to return addresses with is_balancer=false
- * so that it does not wind up recursively using the grpclb LB policy,
- * as per the special case logic in client_channel.c.
+ * - The channel arg for the LB policy name, since we want to use the default
+ * (pick_first) in this case.
*
- * Lastly, we also strip out the channel arg for the server URI,
- * since that will be different for the LB channel than for the parent
- * channel (the client channel factory will re-add this arg with
- * the right value). */
+ * - The channel arg for the resolved addresses, since that will be generated
+ * by the name resolver used in the LB channel. Note that the LB channel
+ * will use the fake resolver, so this won't actually generate a query
+ * to DNS (or some other name service). However, the addresses returned by
+ * the fake resolver will have is_balancer=false, whereas our own
+ * addresses have is_balancer=true. We need the LB channel to return
+ * addresses with is_balancer=false so that it does not wind up recursively
+ * using the grpclb LB policy, as per the special case logic in
+ * client_channel.c.
+ *
+ * - The channel arg for the server URI, since that will be different for the
+ * LB channel than for the parent channel (the client channel factory will
+ * re-add this arg with the right value).
+ *
+ * - The fake resolver generator, because we are replacing it with the one
+ * from the grpclb policy, used to propagate updates to the LB channel. */
static const char *keys_to_remove[] = {
- GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
- return grpc_channel_args_copy_and_remove(args, keys_to_remove,
- GPR_ARRAY_SIZE(keys_to_remove));
+ GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI,
+ GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
+ return grpc_channel_args_copy_and_add_and_remove(
+ args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add,
+ GPR_ARRAY_SIZE(to_add));
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
index 9730c971d9..dfb682ad16 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
@@ -35,6 +35,7 @@
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/slice/slice_hash_table.h"
/** Create the channel used for communicating with an LB service.
@@ -49,9 +50,10 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
grpc_client_channel_factory *client_channel_factory,
grpc_channel_args *args);
-grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
- grpc_slice_hash_table *targets_info,
- const grpc_channel_args *args);
+grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H \
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
index a145cba63c..21effd75e3 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
@@ -76,32 +76,39 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
return lb_channel;
}
-grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
- grpc_slice_hash_table *targets_info,
- const grpc_channel_args *args) {
- const grpc_arg targets_info_arg =
- grpc_lb_targets_info_create_channel_arg(targets_info);
- /* We strip out the channel arg for the LB policy name, since we want
- * to use the default (pick_first) in this case.
+grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args) {
+ const grpc_arg to_add[] = {
+ grpc_lb_targets_info_create_channel_arg(targets_info),
+ grpc_fake_resolver_response_generator_arg(response_generator)};
+ /* We remove:
*
- * We also strip out the channel arg for the resolved addresses, since
- * that will be generated by the name resolver used in the LB channel.
- * Note that the LB channel will use the sockaddr resolver, so this
- * won't actually generate a query to DNS (or some other name service).
- * However, the addresses returned by the sockaddr resolver will have
- * is_balancer=false, whereas our own addresses have is_balancer=true.
- * We need the LB channel to return addresses with is_balancer=false
- * so that it does not wind up recursively using the grpclb LB policy,
- * as per the special case logic in client_channel.c.
+ * - The channel arg for the LB policy name, since we want to use the default
+ * (pick_first) in this case.
*
- * Lastly, we also strip out the channel arg for the server URI,
- * since that will be different for the LB channel than for the parent
- * channel (the client channel factory will re-add this arg with
- * the right value). */
+ * - The channel arg for the resolved addresses, since that will be generated
+ * by the name resolver used in the LB channel. Note that the LB channel
+ * will use the fake resolver, so this won't actually generate a query
+ * to DNS (or some other name service). However, the addresses returned by
+ * the fake resolver will have is_balancer=false, whereas our own
+ * addresses have is_balancer=true. We need the LB channel to return
+ * addresses with is_balancer=false so that it does not wind up recursively
+ * using the grpclb LB policy, as per the special case logic in
+ * client_channel.c.
+ *
+ * - The channel arg for the server URI, since that will be different for the
+ * LB channel than for the parent channel (the client channel factory will
+ * re-add this arg with the right value).
+ *
+ * - The fake resolver generator, because we are replacing it with the one
+ * from the grpclb policy, used to propagate updates to the LB channel. */
static const char *keys_to_remove[] = {
- GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
+ GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI,
+ GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
/* Add the targets info table to be used for secure naming */
return grpc_channel_args_copy_and_add_and_remove(
- args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &targets_info_arg,
- 1);
+ args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add,
+ GPR_ARRAY_SIZE(to_add));
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index b1c5dfc61c..b6f86255df 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -37,11 +37,14 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
+grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false);
+
typedef struct pending_pick {
struct pending_pick *next;
uint32_t initial_metadata_flags;
@@ -54,7 +57,9 @@ typedef struct {
grpc_lb_policy base;
/** all our subchannels */
grpc_subchannel **subchannels;
+ grpc_subchannel **new_subchannels;
size_t num_subchannels;
+ size_t num_new_subchannels;
grpc_closure connectivity_changed;
@@ -63,10 +68,19 @@ typedef struct {
/** the selected channel */
grpc_connected_subchannel *selected;
+ /** the subchannel key for \a selected, or NULL if \a selected not set */
+ const grpc_subchannel_key *selected_key;
+
/** have we started picking? */
- int started_picking;
+ bool started_picking;
/** are we shut down? */
- int shutdown;
+ bool shutdown;
+ /** are we updating the selected subchannel? */
+ bool updating_selected;
+ /** are we updating the subchannel candidates? */
+ bool updating_subchannels;
+ /** args from the latest update received while already updating, or NULL */
+ grpc_lb_policy_args *pending_update_args;
/** which subchannel are we watching? */
size_t checking_subchannel;
/** what is the connectivity of that channel? */
@@ -80,23 +94,28 @@ typedef struct {
static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- size_t i;
GPR_ASSERT(p->pending_picks == NULL);
- for (i = 0; i < p->num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy");
}
if (p->selected != NULL) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
+ "picked_first_destroy");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ if (p->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
+ gpr_free(p->pending_update_args);
+ }
gpr_free(p->subchannels);
+ gpr_free(p->new_subchannels);
gpr_free(p);
}
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
- p->shutdown = 1;
+ p->shutdown = true;
pp = p->pending_picks;
p->pending_picks = NULL;
grpc_connectivity_state_set(
@@ -106,7 +125,7 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (p->selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
- } else if (p->num_subchannels > 0) {
+ } else if (p->num_subchannels > 0 && p->started_picking) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
@@ -169,21 +188,25 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_UNREF(error);
}
-static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
- p->started_picking = 1;
- p->checking_subchannel = 0;
- p->checking_connectivity = GRPC_CHANNEL_IDLE;
- GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
+static void start_picking_locked(grpc_exec_ctx *exec_ctx,
+ pick_first_lb_policy *p) {
+ p->started_picking = true;
+ if (p->subchannels != NULL) {
+ GPR_ASSERT(p->num_subchannels > 0);
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel],
+ p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
+ }
}
static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
if (!p->started_picking) {
- start_picking(exec_ctx, p);
+ start_picking_locked(exec_ctx, p);
}
}
@@ -203,7 +226,7 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* No subchannel selected yet, so try again */
if (!p->started_picking) {
- start_picking(exec_ctx, p);
+ start_picking_locked(exec_ctx, p);
}
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
@@ -216,30 +239,290 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) {
- size_t i;
size_t num_subchannels = p->num_subchannels;
- grpc_subchannel **subchannels;
+ grpc_subchannel **subchannels = p->subchannels;
- subchannels = p->subchannels;
p->num_subchannels = 0;
p->subchannels = NULL;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
- for (i = 0; i < num_subchannels; i++) {
+ for (size_t i = 0; i < num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
}
-
gpr_free(subchannels);
}
+static grpc_connectivity_state pf_check_connectivity_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ return grpc_connectivity_state_get(&p->state_tracker, error);
+}
+
+static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_closure *notify) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
+ current, notify);
+}
+
+static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ if (p->selected) {
+ grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
+ } else {
+ grpc_closure_sched(exec_ctx, closure,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
+ }
+}
+
+/* unsubscribe all subchannels */
+static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
+ pick_first_lb_policy *p) {
+ if (p->num_subchannels > 0) {
+ GPR_ASSERT(p->selected == NULL);
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
+ &p->connectivity_changed);
+ p->updating_subchannels = true;
+ } else if (p->selected != NULL) {
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
+ p->updating_selected = true;
+ }
+}
+
+/* true upon success */
+static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ if (p->subchannels == NULL) {
+ // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "pf_update_missing");
+ } else {
+ // otherwise, keep using the current subchannel list (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for Pick First %p update, "
+ "ignoring.",
+ (void *)p);
+ }
+ return;
+ }
+ const grpc_lb_addresses *addresses = arg->value.pointer.p;
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (!addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) {
+ // Empty update. Unsubscribe from all current subchannels and put the
+ // channel in TRANSIENT_FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ "pf_update_empty");
+ stop_connectivity_watchers(exec_ctx, p);
+ return;
+ }
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
+ (void *)p, (unsigned long)num_addrs);
+ }
+ grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs);
+ /* We remove the following keys in order for subchannel keys belonging to
+ * subchannels point to the same address to match. */
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ size_t sc_args_count = 0;
+
+ /* Create list of subchannel args for new addresses in \a args. */
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (addresses->addresses[i].is_balancer) continue;
+ if (addresses->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
+ args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
+ 1);
+ gpr_free(addr_arg.value.string);
+ sc_args[sc_args_count++].args = new_args;
+ }
+
+ /* Check if p->selected is amongst them. If so, we are done. */
+ if (p->selected != NULL) {
+ GPR_ASSERT(p->selected_key != NULL);
+ for (size_t i = 0; i < sc_args_count; i++) {
+ grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]);
+ const bool found_selected =
+ grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0;
+ grpc_subchannel_key_destroy(exec_ctx, ith_sc_key);
+ if (found_selected) {
+ // The currently selected subchannel is in the update: we are done.
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Pick First %p found already selected subchannel %p amongst "
+ "updates. Update done.",
+ (void *)p, (void *)p->selected);
+ }
+ for (size_t j = 0; j < sc_args_count; j++) {
+ grpc_channel_args_destroy(exec_ctx,
+ (grpc_channel_args *)sc_args[j].args);
+ }
+ gpr_free(sc_args);
+ return;
+ }
+ }
+ }
+ // We only check for already running updates here because if the previous
+ // steps were successful, the update can be considered done without any
+ // interference (ie, no callbacks were scheduled).
+ if (p->updating_selected || p->updating_subchannels) {
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Update already in progress for pick first %p. Deferring update.",
+ (void *)p);
+ }
+ if (p->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
+ gpr_free(p->pending_update_args);
+ }
+ p->pending_update_args = gpr_zalloc(sizeof(*p->pending_update_args));
+ p->pending_update_args->client_channel_factory =
+ args->client_channel_factory;
+ p->pending_update_args->args = grpc_channel_args_copy(args->args);
+ p->pending_update_args->combiner = args->combiner;
+ return;
+ }
+ /* Create the subchannels for the new subchannel args/addresses. */
+ grpc_subchannel **new_subchannels =
+ gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
+ size_t num_new_subchannels = 0;
+ for (size_t i = 0; i < sc_args_count; i++) {
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args[i]);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ char *address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_INFO,
+ "Pick First %p created subchannel %p for address uri %s",
+ (void *)p, (void *)subchannel, address_uri);
+ gpr_free(address_uri);
+ }
+ grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args);
+ if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel;
+ }
+ gpr_free(sc_args);
+ if (num_new_subchannels == 0) {
+ gpr_free(new_subchannels);
+ // Empty update. Unsubscribe from all current subchannels and put the
+ // channel in TRANSIENT_FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"),
+ "pf_update_no_valid_addresses");
+ stop_connectivity_watchers(exec_ctx, p);
+ return;
+ }
+
+ /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */
+ stop_connectivity_watchers(exec_ctx, p);
+
+ /* Save new subchannels. The switch over will happen in
+ * pf_connectivity_changed_locked */
+ if (p->updating_selected || p->updating_subchannels) {
+ p->num_new_subchannels = num_new_subchannels;
+ p->new_subchannels = new_subchannels;
+ } else { /* nothing is updating. Get things moving from here */
+ p->num_subchannels = num_new_subchannels;
+ p->subchannels = new_subchannels;
+ p->new_subchannels = NULL;
+ p->num_new_subchannels = 0;
+ if (p->started_picking) {
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel],
+ p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
+ }
+ }
+}
+
static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
+ bool restart = false;
+ if (p->updating_selected && error == GRPC_ERROR_CANCELLED) {
+ /* Captured the unsubscription for p->selected */
+ GPR_ASSERT(p->selected != NULL);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
+ "pf_update_connectivity");
+ p->updating_selected = false;
+ if (p->num_new_subchannels == 0) {
+ p->selected = NULL;
+ return;
+ }
+ restart = true;
+ }
+ if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) {
+ /* Captured the unsubscription for the checking subchannel */
+ GPR_ASSERT(p->selected == NULL);
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
+ "pf_update_connectivity");
+ }
+ gpr_free(p->subchannels);
+ p->subchannels = NULL;
+ p->num_subchannels = 0;
+ p->updating_subchannels = false;
+ if (p->num_new_subchannels == 0) return;
+ restart = true;
+ }
+ if (restart) {
+ p->selected = NULL;
+ p->selected_key = NULL;
+
+ GPR_ASSERT(p->new_subchannels != NULL);
+ GPR_ASSERT(p->num_new_subchannels > 0);
+ p->num_subchannels = p->num_new_subchannels;
+ p->subchannels = p->new_subchannels;
+ p->num_new_subchannels = 0;
+ p->new_subchannels = NULL;
+
+ if (p->started_picking) {
+ /* If we were picking, continue to do so over the new subchannels,
+ * starting from the 0th index. */
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ /* reuses the weak ref from start_picking_locked */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel],
+ p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
+ }
+ if (p->pending_update_args != NULL) {
+ const grpc_lb_policy_args *args = p->pending_update_args;
+ p->pending_update_args = NULL;
+ pf_update_locked(exec_ctx, &p->base, args);
+ }
+ return;
+ }
GRPC_ERROR_REF(error);
-
if (p->shutdown) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
GRPC_ERROR_UNREF(error);
@@ -272,6 +555,11 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected_subchannel),
"picked_first");
+
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected);
+ }
+ p->selected_key = grpc_subchannel_get_key(selected_subchannel);
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
destroy_subchannels_locked(exec_ctx, p);
@@ -279,6 +567,11 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Servicing pending pick with selected subchannel %p",
+ (void *)p->selected);
+ }
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
@@ -353,32 +646,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_UNREF(error);
}
-static grpc_connectivity_state pf_check_connectivity_locked(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- return grpc_connectivity_state_get(&p->state_tracker, error);
-}
-
-static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_closure *notify) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
- current, notify);
-}
-
-static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- if (p->selected) {
- grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
- } else {
- grpc_closure_sched(exec_ctx, closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
- }
-}
-
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@@ -388,7 +655,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_ping_one_locked,
pf_exit_idle_locked,
pf_check_connectivity_locked,
- pf_notify_on_state_change_locked};
+ pf_notify_on_state_change_locked,
+ pf_update_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -398,59 +666,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL);
-
- /* Find the number of backend addresses. We ignore balancer
- * addresses, since we don't know how to handle them. */
- const grpc_arg *arg =
- grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- return NULL;
- }
- grpc_lb_addresses *addresses = arg->value.pointer.p;
- size_t num_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (!addresses->addresses[i].is_balancer) ++num_addrs;
- }
- if (num_addrs == 0) return NULL;
-
pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
-
- p->subchannels = gpr_zalloc(sizeof(grpc_subchannel *) * num_addrs);
- grpc_subchannel_args sc_args;
- size_t subchannel_idx = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- /* Skip balancer addresses, since we only know how to handle backends. */
- if (addresses->addresses[i].is_balancer) continue;
-
- if (addresses->addresses[i].user_data != NULL) {
- gpr_log(GPR_ERROR,
- "This LB policy doesn't support user data. It will be ignored");
- }
-
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- grpc_arg addr_arg =
- grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
- args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
- 1);
- gpr_free(addr_arg.value.string);
- sc_args.args = new_args;
- grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
- exec_ctx, args->client_channel_factory, &sc_args);
- grpc_channel_args_destroy(exec_ctx, new_args);
-
- if (subchannel != NULL) {
- p->subchannels[subchannel_idx++] = subchannel;
- }
- }
- if (subchannel_idx == 0) {
- gpr_free(p->subchannels);
- gpr_free(p);
- return NULL;
- }
- p->num_subchannels = subchannel_idx;
-
+ pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
grpc_combiner_scheduler(args->combiner, false));
@@ -472,6 +689,7 @@ static grpc_lb_policy_factory *pick_first_lb_factory_create() {
void grpc_lb_policy_pick_first_init() {
grpc_register_lb_policy(pick_first_lb_factory_create());
+ grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace);
}
void grpc_lb_policy_pick_first_shutdown() {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 7ee6ffb787..c9758eef88 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -33,31 +33,11 @@
/** Round Robin Policy.
*
- * This policy keeps:
- * - A circular list of ready (connected) subchannels, the *readylist*. An empty
- * readylist consists solely of its root (dummy) node.
- * - A pointer to the last element picked from the readylist, the *lastpick*.
- * Initially set to point to the readylist's root.
- *
- * Behavior:
- * - When a subchannel connects, it's *prepended* to the readylist's root node.
- * Ie, if readylist = A <-> B <-> ROOT <-> C
- * ^ ^
- * |____________________|
- * and subchannel D becomes connected, the addition of D to the readylist
- * results in readylist = A <-> B <-> D <-> ROOT <-> C
- * ^ ^
- * |__________________________|
- * - When a subchannel disconnects, it's removed from the readylist. If the
- * subchannel being removed was the most recently picked, the *lastpick*
- * pointer moves to the removed node's previous element. Note that if the
- * readylist only had one element, this is still legal, as the lastpick would
- * point to the dummy root node, for an empty readylist.
- * - Upon picking, *lastpick* is updated to point to the returned (connected)
- * subchannel. Note that it's possible that the selected subchannel becomes
- * disconnected in the interim between the selection and the actual usage of
- * the subchannel by the caller.
- */
+ * Before every pick, the \a get_next_ready_subchannel_index_locked function
+ * returns the p->subchannel_list->subchannels index for next subchannel,
+ * respecting the relative
+ * order of the addresses provided upon creation or updates. Note however that
+ * updates will start picking from the beginning of the updated list. */
#include <string.h>
@@ -72,8 +52,6 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef struct round_robin_lb_policy round_robin_lb_policy;
-
grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false);
/** List of entities waiting for a pick.
@@ -99,9 +77,37 @@ typedef struct pending_pick {
grpc_closure *on_complete;
} pending_pick;
+typedef struct rr_subchannel_list rr_subchannel_list;
+typedef struct round_robin_lb_policy {
+ /** base policy: must be first */
+ grpc_lb_policy base;
+
+ rr_subchannel_list *subchannel_list;
+
+ /** have we started picking? */
+ bool started_picking;
+ /** are we shutting down? */
+ bool shutdown;
+ /** List of picks that are waiting on connectivity */
+ pending_pick *pending_picks;
+
+ /** our connectivity state tracker */
+ grpc_connectivity_state_tracker state_tracker;
+
+ /** Index into subchannels for last pick. */
+ size_t last_ready_subchannel_index;
+
+ /** Latest version of the subchannel list.
+ * Subchannel connectivity callbacks will only promote updated subchannel
+ * lists if they equal \a latest_pending_subchannel_list. In other words,
+ * racing callbacks that reference outdated subchannel lists won't perform any
+ * update. */
+ rr_subchannel_list *latest_pending_subchannel_list;
+} round_robin_lb_policy;
+
typedef struct {
- /** backpointer to owning policy */
- round_robin_lb_policy *policy;
+ /** backpointer to owning subchannel list */
+ rr_subchannel_list *subchannel_list;
/** subchannel itself */
grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */
@@ -123,12 +129,9 @@ typedef struct {
const grpc_lb_user_data_vtable *user_data_vtable;
} subchannel_data;
-struct round_robin_lb_policy {
- /** base policy: must be first */
- grpc_lb_policy base;
-
- /** total number of addresses received at creation time */
- size_t num_addresses;
+struct rr_subchannel_list {
+ /** backpointer to owning policy */
+ round_robin_lb_policy *policy;
/** all our subchannels */
size_t num_subchannels;
@@ -141,67 +144,143 @@ struct round_robin_lb_policy {
/** how many subchannels are in state IDLE */
size_t num_idle;
- /** have we started picking? */
- bool started_picking;
- /** are we shutting down? */
- bool shutdown;
- /** List of picks that are waiting on connectivity */
- pending_pick *pending_picks;
-
- /** our connectivity state tracker */
- grpc_connectivity_state_tracker state_tracker;
+ /** There will be one ref for each entry in subchannels for which there is a
+ * pending connectivity state watcher callback. */
+ gpr_refcount refcount;
- // Index into subchannels for last pick.
- size_t last_ready_subchannel_index;
+ /** Is this list shutting down? This may be true due to the shutdown of the
+ * policy itself or because a newer update has arrived while this one hadn't
+ * finished processing. */
+ bool shutting_down;
};
-/** Returns the index into p->subchannels of the next subchannel in
- * READY state, or p->num_subchannels if no subchannel is READY.
+static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list) {
+ GPR_ASSERT(subchannel_list->shutting_down);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p",
+ (void *)subchannel_list->policy, (void *)subchannel_list);
+ }
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &subchannel_list->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel,
+ "rr_subchannel_list_destroy");
+ }
+ sd->subchannel = NULL;
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
+ }
+ gpr_free(subchannel_list->subchannels);
+ gpr_free(subchannel_list);
+}
+
+static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ gpr_ref_non_zero(&subchannel_list->refcount);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu",
+ (void *)subchannel_list->policy, (void *)subchannel_list,
+ (unsigned long)(count - 1), (unsigned long)count);
+ }
+}
+
+static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ const bool done = gpr_unref(&subchannel_list->refcount);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu",
+ (void *)subchannel_list->policy, (void *)subchannel_list,
+ (unsigned long)(count + 1), (unsigned long)count);
+ }
+ if (done) {
+ rr_subchannel_list_destroy(exec_ctx, subchannel_list);
+ }
+}
+
+/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The
+ * watcher's callback will ultimately unref \a subchannel_list. */
+static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ GPR_ASSERT(!subchannel_list->shutting_down);
+ subchannel_list->shutting_down = true;
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &subchannel_list->subchannels[i];
+ if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
+ grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
+ NULL,
+ &sd->connectivity_changed_closure);
+ }
+ }
+ rr_subchannel_list_unref(exec_ctx, subchannel_list, reason);
+}
+
+/** Returns the index into p->subchannel_list->subchannels of the next
+ * subchannel in READY state, or p->subchannel_list->num_subchannels if no
+ * subchannel is READY.
*
* Note that this function does *not* update p->last_ready_subchannel_index.
* The caller must do that if it returns a pick. */
static size_t get_next_ready_subchannel_index_locked(
const round_robin_lb_policy *p) {
+ GPR_ASSERT(p->subchannel_list != NULL);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
- "[RR: %p] getting next ready subchannel, "
+ "[RR %p] getting next ready subchannel (out of %lu), "
"last_ready_subchannel_index=%lu",
- p, (unsigned long)p->last_ready_subchannel_index);
+ (void *)p, (unsigned long)p->subchannel_list->num_subchannels,
+ (unsigned long)p->last_ready_subchannel_index);
}
- for (size_t i = 0; i < p->num_subchannels; ++i) {
- const size_t index =
- (i + p->last_ready_subchannel_index + 1) % p->num_subchannels;
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
+ const size_t index = (i + p->last_ready_subchannel_index + 1) %
+ p->subchannel_list->num_subchannels;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p,
- (unsigned long)index,
- p->subchannels[index].curr_connectivity_state);
+ gpr_log(GPR_DEBUG,
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
+ "state=%d",
+ (void *)p,
+ (void *)p->subchannel_list->subchannels[index].subchannel,
+ (void *)p->subchannel_list, (unsigned long)index,
+ p->subchannel_list->subchannels[index].curr_connectivity_state);
}
- if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
+ GRPC_CHANNEL_READY) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu",
- p, (unsigned long)index);
+ gpr_log(GPR_DEBUG,
+ "[RR %p] found next ready subchannel (%p) at index %lu of "
+ "subchannel_list %p",
+ (void *)p,
+ (void *)p->subchannel_list->subchannels[index].subchannel,
+ (unsigned long)index, (void *)p->subchannel_list);
}
return index;
}
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p);
+ gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void *)p);
}
- return p->num_subchannels;
+ return p->subchannel_list->num_subchannels;
}
// Sets p->last_ready_subchannel_index to last_ready_index.
static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
size_t last_ready_index) {
- GPR_ASSERT(last_ready_index < p->num_subchannels);
+ GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels);
p->last_ready_subchannel_index = last_ready_index;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
- (void *)p, (unsigned long)last_ready_index,
- (void *)p->subchannels[last_ready_index].subchannel,
- (void *)grpc_subchannel_get_connected_subchannel(
- p->subchannels[last_ready_index].subchannel));
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
+ (void *)p, (unsigned long)last_ready_index,
+ (void *)p->subchannel_list->subchannels[last_ready_index].subchannel,
+ (void *)grpc_subchannel_get_connected_subchannel(
+ p->subchannel_list->subchannels[last_ready_index].subchannel));
}
}
@@ -210,18 +289,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol);
}
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannels[i];
- if (sd->subchannel != NULL) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
- if (sd->user_data != NULL) {
- GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
- }
- }
- }
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
- gpr_free(p->subchannels);
gpr_free(p);
}
@@ -243,14 +311,9 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannels[i];
- if (sd->subchannel != NULL) {
- grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
- NULL,
- &sd->connectivity_changed_closure);
- }
- }
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_shutdown");
+ p->subchannel_list = NULL;
}
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
@@ -304,15 +367,14 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
p->started_picking = true;
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannels[i];
- if (sd->subchannel != NULL) {
- GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->pending_connectivity_state_unsafe,
- &sd->connectivity_changed_closure);
- }
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannel_list->subchannels[i];
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
+ rr_subchannel_list_ref(sd->subchannel_list, "start_picking");
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
}
@@ -332,63 +394,70 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
}
- const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- if (next_ready_index < p->num_subchannels) {
- /* readily available, report right away */
- subchannel_data *sd = &p->subchannels[next_ready_index];
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked");
- if (user_data != NULL) {
- *user_data = sd->user_data;
- }
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)",
- (void *)*target, (unsigned long)next_ready_index);
- }
- /* only advance the last picked pointer if the selection was used */
- update_last_ready_subchannel_index_locked(p, next_ready_index);
- return 1;
- } else {
- /* no pick currently available. Save for later in list of pending picks */
- if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ if (p->subchannel_list != NULL) {
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->subchannel_list->num_subchannels) {
+ /* readily available, report right away */
+ subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index];
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(sd->subchannel),
+ "rr_picked");
+ if (user_data != NULL) {
+ *user_data = sd->user_data;
+ }
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, "
+ "INDEX %lu)",
+ (void *)p, (void *)sd->subchannel, (void *)*target,
+ (void *)sd->subchannel_list, (unsigned long)next_ready_index);
+ }
+ /* only advance the last picked pointer if the selection was used */
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
+ return 1;
}
- pending_pick *pp = gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->on_complete = on_complete;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->user_data = user_data;
- p->pending_picks = pp;
- return 0;
}
+ /* no pick currently available. Save for later in list of pending picks */
+ if (!p->started_picking) {
+ start_picking_locked(exec_ctx, p);
+ }
+ pending_pick *pp = gpr_malloc(sizeof(*pp));
+ pp->next = p->pending_picks;
+ pp->target = target;
+ pp->on_complete = on_complete;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->user_data = user_data;
+ p->pending_picks = pp;
+ return 0;
}
static void update_state_counters_locked(subchannel_data *sd) {
- round_robin_lb_policy *p = sd->policy;
+ rr_subchannel_list *subchannel_list = sd->subchannel_list;
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
- GPR_ASSERT(p->num_ready > 0);
- --p->num_ready;
+ GPR_ASSERT(subchannel_list->num_ready > 0);
+ --subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- GPR_ASSERT(p->num_transient_failures > 0);
- --p->num_transient_failures;
+ GPR_ASSERT(subchannel_list->num_transient_failures > 0);
+ --subchannel_list->num_transient_failures;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
- GPR_ASSERT(p->num_idle > 0);
- --p->num_idle;
+ GPR_ASSERT(subchannel_list->num_idle > 0);
+ --subchannel_list->num_idle;
}
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
- ++p->num_ready;
+ ++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- ++p->num_transient_failures;
+ ++subchannel_list->num_transient_failures;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
- ++p->num_idle;
+ ++subchannel_list->num_idle;
}
}
-/* sd is the subchannel_data associted with the updated subchannel.
- * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
- * or SHUTDOWN */
+/** Sets the policy's connectivity status based on that of the passed-in \a sd
+ * (the subchannel_data associted with the updated subchannel) and the
+ * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
+ * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
+ * connectivity status set. */
static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we
@@ -401,17 +470,18 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
- * CHECK: p->num_subchannels = 0.
+ * CHECK: p->subchannel_list->num_subchannels = 0.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
- * CHECK: p->num_transient_failures == p->num_subchannels.
+ * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels.
*
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
- * CHECK: p->num_idle == p->num_subchannels.
+ * CHECK: p->num_idle == p->subchannel_list->num_subchannels.
*/
- round_robin_lb_policy *p = sd->policy;
- if (p->num_ready > 0) { /* 1) READY */
+ rr_subchannel_list *subchannel_list = sd->subchannel_list;
+ round_robin_lb_policy *p = subchannel_list->policy;
+ if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
return GRPC_CHANNEL_READY;
@@ -421,18 +491,19 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
return GRPC_CHANNEL_CONNECTING;
- } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */
+ } else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
return GRPC_CHANNEL_SHUTDOWN;
- } else if (p->num_transient_failures ==
- p->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+ } else if (subchannel_list->num_transient_failures ==
+ p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
return GRPC_CHANNEL_TRANSIENT_FAILURE;
- } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */
+ } else if (subchannel_list->num_idle ==
+ p->subchannel_list->num_subchannels) { /* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
return GRPC_CHANNEL_IDLE;
@@ -444,7 +515,28 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
- round_robin_lb_policy *p = sd->policy;
+ round_robin_lb_policy *p = sd->subchannel_list->policy;
+ // If the policy is shutting down, unref and return.
+ if (p->shutdown) {
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
+ return;
+ }
+ if (sd->subchannel_list->shutting_down) {
+ // the subchannel list associated with sd has been discarded. This callback
+ // corresponds to the unsubscription.
+ GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown");
+ return;
+ }
+ // Dispose of outdated subchannel lists.
+ if (sd->subchannel_list != p->subchannel_list &&
+ sd->subchannel_list != p->latest_pending_subchannel_list) {
+ // sd belongs to an outdated subchannel_list: get rid of it.
+ rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated");
+ return;
+ }
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
@@ -453,21 +545,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p: "
"prev_state=%d new_state=%d",
- p, sd->subchannel, sd->prev_connectivity_state,
+ (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state,
sd->curr_connectivity_state);
}
- // If we're shutting down, unref and return.
- if (p->shutdown) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- return;
- }
// Update state counters and determine new overall state.
update_state_counters_locked(sd);
sd->prev_connectivity_state = sd->curr_connectivity_state;
- grpc_connectivity_state new_connectivity_state =
+ const grpc_connectivity_state new_policy_connectivity_state =
update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
- // If the new state is SHUTDOWN, unref the subchannel, and if the new
- // overall state is SHUTDOWN, clean up.
+ // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
+ // policy's state is SHUTDOWN, clean up.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
sd->subchannel = NULL;
@@ -475,7 +562,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
}
- if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */
pending_pick *pp;
while ((pp = p->pending_picks)) {
@@ -486,15 +573,42 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
}
/* unref the "rr_connectivity" weak ref from start_picking */
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- } else {
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
+ "rr_connectivity_sd_shutdown");
+ } else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (sd->subchannel_list != p->subchannel_list) {
+ // promote sd->subchannel_list to p->subchannel_list.
+ // sd->subchannel_list must be equal to
+ // p->latest_pending_subchannel_list because we have already filtered
+ // for sds belonging to outdated subchannel lists.
+ GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(!sd->subchannel_list->shutting_down);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] phasing out subchannel list %p (size %lu) in favor "
+ "of %p (size %lu)",
+ (void *)p, (void *)p->subchannel_list,
+ (unsigned long)p->subchannel_list->num_subchannels,
+ (void *)sd->subchannel_list,
+ (unsigned long)sd->subchannel_list->num_subchannels);
+ }
+ if (p->subchannel_list != NULL) {
+ // dispose of the current subchannel_list
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_update_connectivity");
+ }
+ p->subchannel_list = sd->subchannel_list;
+ p->latest_pending_subchannel_list = NULL;
+ }
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- GPR_ASSERT(next_ready_index < p->num_subchannels);
- subchannel_data *selected = &p->subchannels[next_ready_index];
+ GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
+ subchannel_data *selected =
+ &p->subchannel_list->subchannels[next_ready_index];
if (p->pending_picks != NULL) {
/* if the selected subchannel is going to be used for the pending
* picks, update the last picked pointer */
@@ -519,7 +633,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(pp);
}
}
- /* renew notification: reuses the "rr_connectivity" weak ref */
+ /* renew notification: reuses the "rr_connectivity" weak ref on the policy
+ * as well as the sd->subchannel_list ref. */
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe,
@@ -546,8 +661,9 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- if (next_ready_index < p->num_subchannels) {
- subchannel_data *selected = &p->subchannels[next_ready_index];
+ if (next_ready_index < p->subchannel_list->num_subchannels) {
+ subchannel_data *selected =
+ &p->subchannel_list->subchannels[next_ready_index];
grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked");
@@ -559,52 +675,68 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
-static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown_locked,
- rr_pick_locked,
- rr_cancel_pick_locked,
- rr_cancel_picks_locked,
- rr_ping_one_locked,
- rr_exit_idle_locked,
- rr_check_connectivity_locked,
- rr_notify_on_state_change_locked};
-
-static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
-
-static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
-
-static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_factory *factory,
- grpc_lb_policy_args *args) {
- GPR_ASSERT(args->client_channel_factory != NULL);
-
- /* Find the number of backend addresses. We ignore balancer
- * addresses, since we don't know how to handle them. */
+static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)policy;
+ /* Find the number of backend addresses. We ignore balancer addresses, since
+ * we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- return NULL;
+ if (p->subchannel_list == NULL) {
+ // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "rr_update_missing");
+ } else {
+ // otherwise, keep using the current subchannel list (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for Round Robin %p update, "
+ "ignoring.",
+ (void *)p);
+ }
+ return;
}
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
if (!addresses->addresses[i].is_balancer) ++num_addrs;
}
- if (num_addrs == 0) return NULL;
-
- round_robin_lb_policy *p = gpr_zalloc(sizeof(*p));
-
- p->num_addresses = num_addrs;
- p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs);
-
- grpc_subchannel_args sc_args;
+ if (num_addrs == 0) {
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ "rr_update_empty");
+ if (p->subchannel_list != NULL) {
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_update");
+ p->subchannel_list = NULL;
+ }
+ return;
+ }
size_t subchannel_index = 0;
+ rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list));
+ subchannel_list->policy = p;
+ subchannel_list->subchannels =
+ gpr_zalloc(sizeof(subchannel_data) * num_addrs);
+ subchannel_list->num_subchannels = num_addrs;
+ gpr_ref_init(&subchannel_list->refcount, 1);
+ p->latest_pending_subchannel_list = subchannel_list;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels",
+ (void *)subchannel_list, (unsigned long)num_addrs);
+ }
+ grpc_subchannel_args sc_args;
+ /* We need to remove the LB addresses in order to be able to compare the
+ * subchannel keys of subchannels from a different batch of addresses. */
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ /* Create subchannels for addresses in the update. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */
if (addresses->addresses[i].is_balancer) continue;
-
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+ GPR_ASSERT(i < num_addrs);
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
@@ -618,52 +750,84 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s",
- (unsigned long)subchannel_index, (void *)subchannel, address_uri);
+ gpr_log(GPR_DEBUG,
+ "index %lu: Created subchannel %p for address uri %s into "
+ "subchannel_list %p",
+ (unsigned long)subchannel_index, (void *)subchannel, address_uri,
+ (void *)subchannel_list);
gpr_free(address_uri);
}
grpc_channel_args_destroy(exec_ctx, new_args);
- if (subchannel != NULL) {
- subchannel_data *sd = &p->subchannels[subchannel_index];
- sd->policy = p;
- sd->subchannel = subchannel;
- /* use some sentinel value outside of the range of grpc_connectivity_state
- * to signal an undefined previous state. We won't be referring to this
- * value again and it'll be overwritten after the first call to
- * rr_connectivity_changed */
- sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
- sd->user_data_vtable = addresses->user_data_vtable;
- if (sd->user_data_vtable != NULL) {
- sd->user_data =
- sd->user_data_vtable->copy(addresses->addresses[i].user_data);
- }
- grpc_closure_init(&sd->connectivity_changed_closure,
- rr_connectivity_changed_locked, sd,
- grpc_combiner_scheduler(args->combiner, false));
- ++subchannel_index;
+ subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++];
+ sd->subchannel_list = subchannel_list;
+ sd->subchannel = subchannel;
+ grpc_closure_init(&sd->connectivity_changed_closure,
+ rr_connectivity_changed_locked, sd,
+ grpc_combiner_scheduler(args->combiner, false));
+ /* use some sentinel value outside of the range of
+ * grpc_connectivity_state to signal an undefined previous state. We
+ * won't be referring to this value again and it'll be overwritten after
+ * the first call to rr_connectivity_changed_locked */
+ sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+ sd->user_data_vtable = addresses->user_data_vtable;
+ if (sd->user_data_vtable != NULL) {
+ sd->user_data =
+ sd->user_data_vtable->copy(addresses->addresses[i].user_data);
+ }
+ if (p->started_picking) {
+ rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking");
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update");
+ /* 2. Watch every new subchannel. A subchannel list becomes active the
+ * moment one of its subchannels is READY. At that moment, we swap
+ * p->subchannel_list for sd->subchannel_list, provided the subchannel
+ * list is still valid (ie, isn't shutting down) */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
}
- if (subchannel_index == 0) {
- /* couldn't create any subchannel. Bail out */
- gpr_free(p->subchannels);
- gpr_free(p);
- return NULL;
+ if (!p->started_picking) {
+ // The policy isn't picking yet. Save the update for later, disposing of
+ // previous version if any.
+ if (p->subchannel_list != NULL) {
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "rr_update_before_started_picking");
+ }
+ p->subchannel_list = subchannel_list;
}
- p->num_subchannels = subchannel_index;
+}
+
+static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
+ rr_destroy,
+ rr_shutdown_locked,
+ rr_pick_locked,
+ rr_cancel_pick_locked,
+ rr_cancel_picks_locked,
+ rr_ping_one_locked,
+ rr_exit_idle_locked,
+ rr_check_connectivity_locked,
+ rr_notify_on_state_change_locked,
+ rr_update_locked};
+
+static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
- // Initialize the last pick index to the last subchannel, so that the
- // first pick will start at the beginning of the list.
- p->last_ready_subchannel_index = subchannel_index - 1;
+static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
+static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args) {
+ GPR_ASSERT(args->client_channel_factory != NULL);
+ round_robin_lb_policy *p = gpr_zalloc(sizeof(*p));
+ rr_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels",
- (void *)p, (unsigned long)p->num_subchannels);
+ gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p,
+ (unsigned long)p->subchannel_list->num_subchannels);
}
return &p->base;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 9d6c0fc139..5c6464bc87 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -118,11 +118,11 @@ grpc_lb_addresses *grpc_lb_addresses_find_channel_arg(
const grpc_channel_args *channel_args);
/** Arguments passed to LB policies. */
-typedef struct grpc_lb_policy_args {
+struct grpc_lb_policy_args {
grpc_client_channel_factory *client_channel_factory;
grpc_channel_args *args;
grpc_combiner *combiner;
-} grpc_lb_policy_args;
+};
struct grpc_lb_policy_factory_vtable {
void (*ref)(grpc_lb_policy_factory *factory);
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
new file mode 100644
index 0000000000..cb9b1f07c2
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
@@ -0,0 +1,255 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+// This is similar to the sockaddr resolver, except that it supports a
+// bunch of query args that are useful for dependency injection in tests.
+
+#include <limits.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/unix_sockets_posix.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/support/string.h"
+
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+
+//
+// fake_resolver
+//
+
+typedef struct {
+ // base class -- must be first
+ grpc_resolver base;
+
+ // passed-in parameters
+ grpc_channel_args* channel_args;
+
+ // If not NULL, the next set of resolution results to be returned to
+ // grpc_resolver_next_locked()'s closure.
+ grpc_channel_args* next_results;
+
+ // pending next completion, or NULL
+ grpc_closure* next_completion;
+ // target result address for next completion
+ grpc_channel_args** target_result;
+} fake_resolver;
+
+static void fake_resolver_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) {
+ fake_resolver* r = (fake_resolver*)gr;
+ grpc_channel_args_destroy(exec_ctx, r->next_results);
+ grpc_channel_args_destroy(exec_ctx, r->channel_args);
+ gpr_free(r);
+}
+
+static void fake_resolver_shutdown_locked(grpc_exec_ctx* exec_ctx,
+ grpc_resolver* resolver) {
+ fake_resolver* r = (fake_resolver*)resolver;
+ if (r->next_completion != NULL) {
+ *r->target_result = NULL;
+ grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
+ r->next_completion = NULL;
+ }
+}
+
+static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
+ fake_resolver* r) {
+ if (r->next_completion != NULL && r->next_results != NULL) {
+ *r->target_result =
+ grpc_channel_args_union(r->next_results, r->channel_args);
+ grpc_channel_args_destroy(exec_ctx, r->next_results);
+ grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
+ r->next_completion = NULL;
+ r->next_results = NULL;
+ }
+}
+
+static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
+ grpc_resolver* resolver) {
+ fake_resolver* r = (fake_resolver*)resolver;
+ fake_resolver_maybe_finish_next_locked(exec_ctx, r);
+}
+
+static void fake_resolver_next_locked(grpc_exec_ctx* exec_ctx,
+ grpc_resolver* resolver,
+ grpc_channel_args** target_result,
+ grpc_closure* on_complete) {
+ fake_resolver* r = (fake_resolver*)resolver;
+ GPR_ASSERT(!r->next_completion);
+ r->next_completion = on_complete;
+ r->target_result = target_result;
+ fake_resolver_maybe_finish_next_locked(exec_ctx, r);
+}
+
+static const grpc_resolver_vtable fake_resolver_vtable = {
+ fake_resolver_destroy, fake_resolver_shutdown_locked,
+ fake_resolver_channel_saw_error_locked, fake_resolver_next_locked};
+
+struct grpc_fake_resolver_response_generator {
+ fake_resolver* resolver; // Set by the fake_resolver constructor to itself.
+ grpc_channel_args* next_response;
+ gpr_refcount refcount;
+};
+
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_response_generator_create() {
+ grpc_fake_resolver_response_generator* generator =
+ (grpc_fake_resolver_response_generator*)gpr_zalloc(sizeof(*generator));
+ gpr_ref_init(&generator->refcount, 1);
+ return generator;
+}
+
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_response_generator_ref(
+ grpc_fake_resolver_response_generator* generator) {
+ gpr_ref(&generator->refcount);
+ return generator;
+}
+
+void grpc_fake_resolver_response_generator_unref(
+ grpc_fake_resolver_response_generator* generator) {
+ if (gpr_unref(&generator->refcount)) {
+ gpr_free(generator);
+ }
+}
+
+static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_fake_resolver_response_generator* generator =
+ (grpc_fake_resolver_response_generator*)arg;
+ fake_resolver* r = generator->resolver;
+ if (r->next_results != NULL) {
+ grpc_channel_args_destroy(exec_ctx, r->next_results);
+ }
+ r->next_results = generator->next_response;
+ fake_resolver_maybe_finish_next_locked(exec_ctx, r);
+}
+
+void grpc_fake_resolver_response_generator_set_response(
+ grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator,
+ grpc_channel_args* next_response) {
+ GPR_ASSERT(generator->resolver != NULL);
+ generator->next_response = grpc_channel_args_copy(next_response);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_create(
+ set_response_cb, generator,
+ grpc_combiner_scheduler(generator->resolver->base.combiner, false)),
+ GRPC_ERROR_NONE);
+}
+
+static void* response_generator_arg_copy(void* p) {
+ return grpc_fake_resolver_response_generator_ref(
+ (grpc_fake_resolver_response_generator*)p);
+}
+
+static void response_generator_arg_destroy(grpc_exec_ctx* exec_ctx, void* p) {
+ grpc_fake_resolver_response_generator_unref(
+ (grpc_fake_resolver_response_generator*)p);
+}
+
+static int response_generator_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
+
+static const grpc_arg_pointer_vtable response_generator_arg_vtable = {
+ response_generator_arg_copy, response_generator_arg_destroy,
+ response_generator_cmp};
+
+grpc_arg grpc_fake_resolver_response_generator_arg(
+ grpc_fake_resolver_response_generator* generator) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_POINTER;
+ arg.key = GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR;
+ arg.value.pointer.p = generator;
+ arg.value.pointer.vtable = &response_generator_arg_vtable;
+ return arg;
+}
+
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_get_response_generator(const grpc_channel_args* args) {
+ const grpc_arg* arg =
+ grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) return NULL;
+ return (grpc_fake_resolver_response_generator*)arg->value.pointer.p;
+}
+
+//
+// fake_resolver_factory
+//
+
+static void fake_resolver_factory_ref(grpc_resolver_factory* factory) {}
+
+static void fake_resolver_factory_unref(grpc_resolver_factory* factory) {}
+
+static grpc_resolver* fake_resolver_create(grpc_exec_ctx* exec_ctx,
+ grpc_resolver_factory* factory,
+ grpc_resolver_args* args) {
+ fake_resolver* r = (fake_resolver*)gpr_zalloc(sizeof(*r));
+ r->channel_args = grpc_channel_args_copy(args->args);
+ grpc_resolver_init(&r->base, &fake_resolver_vtable, args->combiner);
+ grpc_fake_resolver_response_generator* response_generator =
+ grpc_fake_resolver_get_response_generator(args->args);
+ if (response_generator != NULL) response_generator->resolver = r;
+ return &r->base;
+}
+
+static char* fake_resolver_get_default_authority(grpc_resolver_factory* factory,
+ grpc_uri* uri) {
+ const char* path = uri->path;
+ if (path[0] == '/') ++path;
+ return gpr_strdup(path);
+}
+
+static const grpc_resolver_factory_vtable fake_resolver_factory_vtable = {
+ fake_resolver_factory_ref, fake_resolver_factory_unref,
+ fake_resolver_create, fake_resolver_get_default_authority, "fake"};
+
+static grpc_resolver_factory fake_resolver_factory = {
+ &fake_resolver_factory_vtable};
+
+void grpc_resolver_fake_init(void) {
+ grpc_register_resolver_type(&fake_resolver_factory);
+}
+
+void grpc_resolver_fake_shutdown(void) {}
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
new file mode 100644
index 0000000000..373509b97f
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
@@ -0,0 +1,75 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H
+
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
+#include "src/core/lib/channel/channel_args.h"
+
+#define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \
+ "grpc.fake_resolver.response_generator"
+
+void grpc_resolver_fake_init();
+
+// Instances of \a grpc_fake_resolver_response_generator are passed to the
+// fake resolver in a channel argument (see \a
+// grpc_fake_resolver_response_generator_arg) in order to inject and trigger
+// custom resolutions. See also \a
+// grpc_fake_resolver_response_generator_set_response.
+typedef struct grpc_fake_resolver_response_generator
+ grpc_fake_resolver_response_generator;
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_response_generator_create();
+
+// Instruct the fake resolver associated with the \a response_generator instance
+// to trigger a new resolution for \a uri and \a args.
+void grpc_fake_resolver_response_generator_set_response(
+ grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator,
+ grpc_channel_args* next_response);
+
+// Return a \a grpc_arg for a \a grpc_fake_resolver_response_generator instance.
+grpc_arg grpc_fake_resolver_response_generator_arg(
+ grpc_fake_resolver_response_generator* generator);
+// Return the \a grpc_fake_resolver_response_generator instance in \a args or
+// NULL.
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_get_response_generator(const grpc_channel_args* args);
+
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_response_generator_ref(
+ grpc_fake_resolver_response_generator* generator);
+void grpc_fake_resolver_response_generator_unref(
+ grpc_fake_resolver_response_generator* generator);
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H \
+ */
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index dd14bf1d02..1007916b40 100644
--- a/src/core/ext/filters/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -307,7 +307,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
const grpc_subchannel_args *args) {
- grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args);
+ grpc_subchannel_key *key = grpc_subchannel_key_create(args);
grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key);
if (c) {
grpc_subchannel_key_destroy(exec_ctx, key);
@@ -770,6 +770,11 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
return GET_CONNECTED_SUBCHANNEL(c, acq);
}
+const grpc_subchannel_key *grpc_subchannel_get_key(
+ const grpc_subchannel *subchannel) {
+ return subchannel->key;
+}
+
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
const grpc_connected_subchannel_call_args *args,
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index e433c33e40..e284001fa2 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -50,6 +50,7 @@ typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_connected_subchannel grpc_connected_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
+typedef struct grpc_subchannel_key grpc_subchannel_key;
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(p, r) \
@@ -155,6 +156,10 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_subchannel *subchannel);
+/** return the subchannel index key for \a subchannel */
+const grpc_subchannel_key *grpc_subchannel_get_key(
+ const grpc_subchannel *subchannel);
+
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call,
diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c
index b25dbfcf51..2a707353c0 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.c
+++ b/src/core/ext/filters/client_channel/subchannel_index.c
@@ -50,7 +50,6 @@ static gpr_avl g_subchannel_index;
static gpr_mu g_mu;
struct grpc_subchannel_key {
- grpc_connector *connector;
grpc_subchannel_args args;
};
@@ -73,10 +72,9 @@ static grpc_exec_ctx *current_ctx() {
}
static grpc_subchannel_key *create_key(
- grpc_connector *connector, const grpc_subchannel_args *args,
+ const grpc_subchannel_args *args,
grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) {
grpc_subchannel_key *k = gpr_malloc(sizeof(*k));
- k->connector = grpc_connector_ref(connector);
k->args.filter_count = args->filter_count;
if (k->args.filter_count > 0) {
k->args.filters =
@@ -91,19 +89,17 @@ static grpc_subchannel_key *create_key(
}
grpc_subchannel_key *grpc_subchannel_key_create(
- grpc_connector *connector, const grpc_subchannel_args *args) {
- return create_key(connector, args, grpc_channel_args_normalize);
+ const grpc_subchannel_args *args) {
+ return create_key(args, grpc_channel_args_normalize);
}
static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) {
- return create_key(k->connector, &k->args, grpc_channel_args_copy);
+ return create_key(&k->args, grpc_channel_args_copy);
}
-static int subchannel_key_compare(grpc_subchannel_key *a,
- grpc_subchannel_key *b) {
- int c = GPR_ICMP(a->connector, b->connector);
- if (c != 0) return c;
- c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
+int grpc_subchannel_key_compare(const grpc_subchannel_key *a,
+ const grpc_subchannel_key *b) {
+ int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
if (a->args.filter_count > 0) {
c = memcmp(a->args.filters, b->args.filters,
@@ -115,7 +111,6 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *k) {
- grpc_connector_unref(exec_ctx, k->connector);
gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args);
gpr_free(k);
@@ -128,7 +123,7 @@ static void sck_avl_destroy(void *p) {
static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); }
static long sck_avl_compare(void *a, void *b) {
- return subchannel_key_compare(a, b);
+ return grpc_subchannel_key_compare(a, b);
}
static void scv_avl_destroy(void *p) {
diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h
index f673ade378..55e90bb645 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.h
+++ b/src/core/ext/filters/client_channel/subchannel_index.h
@@ -34,17 +34,14 @@
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
-#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
/** \file Provides an index of active subchannels so that they can be
shared amongst channels */
-typedef struct grpc_subchannel_key grpc_subchannel_key;
-
/** Create a key that can be used to uniquely identify a subchannel */
grpc_subchannel_key *grpc_subchannel_key_create(
- grpc_connector *con, const grpc_subchannel_args *args);
+ const grpc_subchannel_args *args);
/** Destroy a subchannel key */
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
@@ -69,6 +66,9 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed);
+int grpc_subchannel_key_compare(const grpc_subchannel_key *a,
+ const grpc_subchannel_key *b);
+
/** Initialize the subchannel index (global) */
void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
diff --git a/src/core/ext/filters/workarounds/workaround_utils.h b/src/core/ext/filters/workarounds/workaround_utils.h
index 7cd70c12d8..0608d1e937 100644
--- a/src/core/ext/filters/workarounds/workaround_utils.h
+++ b/src/core/ext/filters/workarounds/workaround_utils.h
@@ -49,4 +49,4 @@ typedef bool (*user_agent_parser)(grpc_mdelem);
void grpc_register_workaround(uint32_t id, user_agent_parser parser);
-#endif
+#endif /* GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_UTILS_H */
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 247b134938..2df07c8ae6 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -129,9 +129,23 @@ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) {
return grpc_channel_args_copy_and_add(src, NULL, 0);
}
-grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+grpc_channel_args *grpc_channel_args_union(const grpc_channel_args *a,
const grpc_channel_args *b) {
- return grpc_channel_args_copy_and_add(a, b->args, b->num_args);
+ const size_t max_out = (a->num_args + b->num_args);
+ grpc_arg *uniques = gpr_malloc(sizeof(*uniques) * max_out);
+ for (size_t i = 0; i < a->num_args; ++i) uniques[i] = a->args[i];
+
+ size_t uniques_idx = a->num_args;
+ for (size_t i = 0; i < b->num_args; ++i) {
+ const char *b_key = b->args[i].key;
+ if (grpc_channel_args_find(a, b_key) == NULL) { // not found
+ uniques[uniques_idx++] = b->args[i];
+ }
+ }
+ grpc_channel_args *result =
+ grpc_channel_args_copy_and_add(NULL, uniques, uniques_idx);
+ gpr_free(uniques);
+ return result;
}
static int cmp_arg(const grpc_arg *a, const grpc_arg *b) {
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index f0f603e251..128afd00d4 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -63,8 +63,8 @@ grpc_channel_args *grpc_channel_args_copy_and_add_and_remove(
const grpc_channel_args *src, const char **to_remove, size_t num_to_remove,
const grpc_arg *to_add, size_t num_to_add);
-/** Concatenate args from \a a and \a b into a new instance */
-grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+/** Perform the union of \a a and \a b, prioritizing \a a entries */
+grpc_channel_args *grpc_channel_args_union(const grpc_channel_args *a,
const grpc_channel_args *b);
/** Destroy arguments created by \a grpc_channel_args_copy */
diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h
index e81531053c..740e553b2a 100644
--- a/src/core/lib/iomgr/polling_entity.h
+++ b/src/core/lib/iomgr/polling_entity.h
@@ -38,9 +38,8 @@
#include "src/core/lib/iomgr/pollset_set.h"
/* A grpc_polling_entity is a pollset-or-pollset_set container. It allows
- * functions that
- * accept a pollset XOR a pollset_set to do so through an abstract interface.
- * No ownership is taken. */
+ * functions that accept a pollset XOR a pollset_set to do so through an
+ * abstract interface. No ownership is taken. */
typedef struct grpc_polling_entity {
union {
@@ -64,18 +63,14 @@ grpc_pollset_set *grpc_polling_entity_pollset_set(grpc_polling_entity *pollent);
bool grpc_polling_entity_is_empty(const grpc_polling_entity *pollent);
/** Add the pollset or pollset_set in \a pollent to the destination pollset_set
- * \a
- * pss_dst */
+ * \a * pss_dst */
void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_polling_entity *pollent,
grpc_pollset_set *pss_dst);
/** Delete the pollset or pollset_set in \a pollent from the destination
- * pollset_set \a
- * pss_dst */
+ * pollset_set \a * pss_dst */
void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_polling_entity *pollent,
grpc_pollset_set *pss_dst);
-/* pollset_set specific */
-
#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */
diff --git a/src/core/lib/security/transport/lb_targets_info.c b/src/core/lib/security/transport/lb_targets_info.c
index e73483c039..8bb06b1f61 100644
--- a/src/core/lib/security/transport/lb_targets_info.c
+++ b/src/core/lib/security/transport/lb_targets_info.c
@@ -44,7 +44,9 @@ static void *targets_info_copy(void *p) { return grpc_slice_hash_table_ref(p); }
static void targets_info_destroy(grpc_exec_ctx *exec_ctx, void *p) {
grpc_slice_hash_table_unref(exec_ctx, p);
}
-static int targets_info_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
+static int targets_info_cmp(void *a, void *b) {
+ return grpc_slice_hash_table_cmp(a, b);
+}
static const grpc_arg_pointer_vtable server_to_balancer_names_vtable = {
targets_info_copy, targets_info_destroy, targets_info_cmp};
diff --git a/src/core/lib/slice/slice_hash_table.c b/src/core/lib/slice/slice_hash_table.c
index 444f22aa19..2d9ff61c95 100644
--- a/src/core/lib/slice/slice_hash_table.c
+++ b/src/core/lib/slice/slice_hash_table.c
@@ -43,6 +43,7 @@
struct grpc_slice_hash_table {
gpr_refcount refs;
void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value);
+ int (*value_cmp)(void* a, void* b);
size_t size;
size_t max_num_probes;
grpc_slice_hash_table_entry* entries;
@@ -72,10 +73,12 @@ static void grpc_slice_hash_table_add(grpc_slice_hash_table* table,
grpc_slice_hash_table* grpc_slice_hash_table_create(
size_t num_entries, grpc_slice_hash_table_entry* entries,
- void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) {
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value),
+ int (*value_cmp)(void* a, void* b)) {
grpc_slice_hash_table* table = gpr_zalloc(sizeof(*table));
gpr_ref_init(&table->refs, 1);
table->destroy_value = destroy_value;
+ table->value_cmp = value_cmp;
// Keep load factor low to improve performance of lookups.
table->size = num_entries * 2;
const size_t entry_size = sizeof(grpc_slice_hash_table_entry) * table->size;
@@ -121,3 +124,37 @@ void* grpc_slice_hash_table_get(const grpc_slice_hash_table* table,
}
return NULL; // Not found.
}
+
+static int pointer_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
+int grpc_slice_hash_table_cmp(const grpc_slice_hash_table* a,
+ const grpc_slice_hash_table* b) {
+ int (*const value_cmp_fn_a)(void* a, void* b) =
+ a->value_cmp != NULL ? a->value_cmp : pointer_cmp;
+ int (*const value_cmp_fn_b)(void* a, void* b) =
+ b->value_cmp != NULL ? b->value_cmp : pointer_cmp;
+ // Compare value_fns
+ const int value_fns_cmp =
+ GPR_ICMP((void*)value_cmp_fn_a, (void*)value_cmp_fn_b);
+ if (value_fns_cmp != 0) return value_fns_cmp;
+ // Compare sizes
+ if (a->size < b->size) return -1;
+ if (a->size > b->size) return 1;
+ // Compare rows.
+ for (size_t i = 0; i < a->size; ++i) {
+ if (is_empty(&a->entries[i])) {
+ if (!is_empty(&b->entries[i])) {
+ return -1; // a empty but b non-empty
+ }
+ continue; // both empty, no need to check key or value
+ } else if (is_empty(&b->entries[i])) {
+ return 1; // a non-empty but b empty
+ }
+ // neither entry is empty
+ const int key_cmp = grpc_slice_cmp(a->entries[i].key, b->entries[i].key);
+ if (key_cmp != 0) return key_cmp;
+ const int value_cmp =
+ value_cmp_fn_a(a->entries[i].value, b->entries[i].value);
+ if (value_cmp != 0) return value_cmp;
+ }
+ return 0;
+}
diff --git a/src/core/lib/slice/slice_hash_table.h b/src/core/lib/slice/slice_hash_table.h
index 1e61c5eb11..07988abff4 100644
--- a/src/core/lib/slice/slice_hash_table.h
+++ b/src/core/lib/slice/slice_hash_table.h
@@ -54,11 +54,15 @@ typedef struct grpc_slice_hash_table_entry {
} grpc_slice_hash_table_entry;
/** Creates a new hash table of containing \a entries, which is an array
- of length \a num_entries. Takes ownership of all keys and values in
- \a entries. Values will be cleaned up via \a destroy_value(). */
+ of length \a num_entries. Takes ownership of all keys and values in \a
+ entries. Values will be cleaned up via \a destroy_value(). If not NULL, \a
+ value_cmp will be used to compare values in the context of \a
+ grpc_slice_hash_table_cmp. If NULL, raw pointer (\a GPR_ICMP) comparison
+ will be used. */
grpc_slice_hash_table *grpc_slice_hash_table_create(
size_t num_entries, grpc_slice_hash_table_entry *entries,
- void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value));
+ void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value),
+ int (*value_cmp)(void *a, void *b));
grpc_slice_hash_table *grpc_slice_hash_table_ref(grpc_slice_hash_table *table);
void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx,
@@ -69,4 +73,13 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx,
void *grpc_slice_hash_table_get(const grpc_slice_hash_table *table,
const grpc_slice key);
+/** Compares \a a vs. \a b.
+ * A table is considered "smaller" (resp. "greater") if:
+ * - GPR_ICMP(a->value_cmp, b->value_cmp) < 1 (resp. > 1),
+ * - else, it contains fewer (resp. more) entries,
+ * - else, if strcmp(a_key, b_key) < 1 (resp. > 1),
+ * - else, if value_cmp(a_value, b_value) < 1 (resp. > 1). */
+int grpc_slice_hash_table_cmp(const grpc_slice_hash_table *a,
+ const grpc_slice_hash_table *b);
+
#endif /* GRPC_CORE_LIB_SLICE_SLICE_HASH_TABLE_H */
diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c
index 6aecb7fa93..107818f4d1 100644
--- a/src/core/lib/transport/service_config.c
+++ b/src/core/lib/transport/service_config.c
@@ -229,7 +229,7 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table(
grpc_slice_hash_table* method_config_table = NULL;
if (entries != NULL) {
method_config_table =
- grpc_slice_hash_table_create(num_entries, entries, destroy_value);
+ grpc_slice_hash_table_create(num_entries, entries, destroy_value, NULL);
gpr_free(entries);
}
return method_config_table;
diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c
index 510cf5d5a0..64e3c07510 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_plugin_registry.c
@@ -41,6 +41,8 @@ extern void grpc_deadline_filter_init(void);
extern void grpc_deadline_filter_shutdown(void);
extern void grpc_client_channel_init(void);
extern void grpc_client_channel_shutdown(void);
+extern void grpc_resolver_fake_init(void);
+extern void grpc_resolver_fake_shutdown(void);
extern void grpc_lb_policy_grpclb_init(void);
extern void grpc_lb_policy_grpclb_shutdown(void);
extern void grpc_lb_policy_pick_first_init(void);
@@ -73,6 +75,8 @@ void grpc_register_built_in_plugins(void) {
grpc_deadline_filter_shutdown);
grpc_register_plugin(grpc_client_channel_init,
grpc_client_channel_shutdown);
+ grpc_register_plugin(grpc_resolver_fake_init,
+ grpc_resolver_fake_shutdown);
grpc_register_plugin(grpc_lb_policy_grpclb_init,
grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
index e5eb68f934..76b17ed06d 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
@@ -47,6 +47,8 @@ extern void grpc_resolver_dns_native_init(void);
extern void grpc_resolver_dns_native_shutdown(void);
extern void grpc_resolver_sockaddr_init(void);
extern void grpc_resolver_sockaddr_shutdown(void);
+extern void grpc_resolver_fake_init(void);
+extern void grpc_resolver_fake_shutdown(void);
extern void grpc_load_reporting_plugin_init(void);
extern void grpc_load_reporting_plugin_shutdown(void);
extern void grpc_lb_policy_grpclb_init(void);
@@ -79,6 +81,8 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_dns_native_shutdown);
grpc_register_plugin(grpc_resolver_sockaddr_init,
grpc_resolver_sockaddr_shutdown);
+ grpc_register_plugin(grpc_resolver_fake_init,
+ grpc_resolver_fake_shutdown);
grpc_register_plugin(grpc_load_reporting_plugin_init,
grpc_load_reporting_plugin_shutdown);
grpc_register_plugin(grpc_lb_policy_grpclb_init,