aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc109
1 files changed, 54 insertions, 55 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index eb494486b9..d6ff74ec7f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -24,6 +24,7 @@
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
@@ -42,11 +43,16 @@ namespace {
// pick_first LB policy
//
+constexpr char kPickFirst[] = "pick_first";
+
class PickFirst : public LoadBalancingPolicy {
public:
explicit PickFirst(const Args& args);
- void UpdateLocked(const grpc_channel_args& args) override;
+ const char* name() const override { return kPickFirst; }
+
+ void UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@@ -74,11 +80,9 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
- const grpc_lb_user_data_vtable* user_data_vtable,
- const grpc_lb_address& address, grpc_subchannel* subchannel,
+ const ServerAddress& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
- : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
- combiner) {}
+ : SubchannelData(subchannel_list, address, subchannel, combiner) {}
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
@@ -94,7 +98,7 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData> {
public:
PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
- const grpc_lb_addresses* addresses,
+ const ServerAddressList& addresses,
grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
@@ -159,7 +163,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p created.", this);
}
- UpdateLocked(*args.args);
+ UpdateLocked(*args.args, args.lb_config);
grpc_subchannel_index_ref();
}
@@ -234,7 +238,7 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
pending_picks_ = nullptr;
while (pick != nullptr) {
PickState* next = pick->next;
- if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
@@ -333,10 +337,11 @@ void PickFirst::UpdateChildRefsLocked() {
child_subchannels_ = std::move(cs);
}
-void PickFirst::UpdateLocked(const grpc_channel_args& args) {
+void PickFirst::UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) {
AutoChildRefsUpdater guard(this);
- const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
- if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
+ if (addresses == nullptr) {
if (subchannel_list_ == nullptr) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
@@ -352,19 +357,17 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
}
return;
}
- const grpc_lb_addresses* addresses =
- static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
- addresses->num_addresses);
+ addresses->size());
}
grpc_arg new_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, &new_arg, 1);
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
- this, &grpc_lb_pick_first_trace, addresses, combiner(),
+ this, &grpc_lb_pick_first_trace, *addresses, combiner(),
client_channel_factory(), *new_args);
grpc_channel_args_destroy(new_args);
if (subchannel_list->num_subchannels() == 0) {
@@ -378,6 +381,31 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
selected_ = nullptr;
return;
}
+ // If one of the subchannels in the new list is already in state
+ // READY, then select it immediately. This can happen when the
+ // currently selected subchannel is also present in the update. It
+ // can also happen if one of the subchannels in the update is already
+ // in the subchannel index because it's in use by another channel.
+ for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
+ PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc_connectivity_state state = sd->CheckConnectivityStateLocked(&error);
+ GRPC_ERROR_UNREF(error);
+ if (state == GRPC_CHANNEL_READY) {
+ subchannel_list_ = std::move(subchannel_list);
+ sd->ProcessUnselectedReadyLocked();
+ sd->StartConnectivityWatchLocked();
+ // If there was a previously pending update (which may or may
+ // not have contained the currently selected subchannel), drop
+ // it, so that it doesn't override what we've done here.
+ latest_pending_subchannel_list_.reset();
+ // Make sure that subsequent calls to ExitIdleLocked() don't cause
+ // us to start watching a subchannel other than the one we've
+ // selected.
+ started_picking_ = true;
+ return;
+ }
+ }
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
@@ -385,46 +413,14 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
- subchannel_list_->subchannel(0)
- ->CheckConnectivityStateAndStartWatchingLocked();
+ // Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
+ // here, since we've already checked the initial connectivity
+ // state of all subchannels above.
+ subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
}
} else {
- // We do have a selected subchannel.
- // Check if it's present in the new list. If so, we're done.
- for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
- PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
- if (sd->subchannel() == selected_->subchannel()) {
- // The currently selected subchannel is in the update: we are done.
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Pick First %p found already selected subchannel %p "
- "at update index %" PRIuPTR " of %" PRIuPTR "; update done",
- this, selected_->subchannel(), i,
- subchannel_list->num_subchannels());
- }
- // Make sure it's in state READY. It might not be if we grabbed
- // the combiner while a connectivity state notification
- // informing us otherwise is pending.
- // Note that CheckConnectivityStateLocked() also takes a ref to
- // the connected subchannel.
- grpc_error* error = GRPC_ERROR_NONE;
- if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
- selected_ = sd;
- subchannel_list_ = std::move(subchannel_list);
- sd->StartConnectivityWatchLocked();
- // If there was a previously pending update (which may or may
- // not have contained the currently selected subchannel), drop
- // it, so that it doesn't override what we've done here.
- latest_pending_subchannel_list_.reset();
- return;
- }
- GRPC_ERROR_UNREF(error);
- }
- }
- // Not keeping the previous selected subchannel, so set the latest
- // pending subchannel list to the new subchannel list. We will wait
- // for it to report READY before swapping it into the current
- // subchannel list.
+ // We do have a selected subchannel, so keep using it until one of
+ // the subchannels in the new list reports READY.
if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
@@ -438,8 +434,11 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
+ // Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
+ // here, since we've already checked the initial connectivity
+ // state of all subchannels above.
latest_pending_subchannel_list_->subchannel(0)
- ->CheckConnectivityStateAndStartWatchingLocked();
+ ->StartConnectivityWatchLocked();
}
}
}
@@ -627,7 +626,7 @@ class PickFirstFactory : public LoadBalancingPolicyFactory {
return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args));
}
- const char* name() const override { return "pick_first"; }
+ const char* name() const override { return kPickFirst; }
};
} // namespace