aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds.cc420
1 files changed, 192 insertions, 228 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 38f8072e8d..89b86b38a6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -26,30 +26,26 @@
/// channel that uses pick_first to select from the list of balancer
/// addresses.
///
-/// The first time the policy gets a request for a pick, a ping, or to exit
-/// the idle state, \a StartPickingLocked() is called. This method is
-/// responsible for instantiating the internal *streaming* call to the LB
-/// server (whichever address pick_first chose). The call will be complete
-/// when either the balancer sends status or when we cancel the call (e.g.,
-/// because we are shutting down). In needed, we retry the call. If we
-/// received at least one valid message from the server, a new call attempt
-/// will be made immediately; otherwise, we apply back-off delays between
-/// attempts.
+/// The first time the xDS policy gets a request for a pick or to exit the idle
+/// state, \a StartPickingLocked() is called. This method is responsible for
+/// instantiating the internal *streaming* call to the LB server (whichever
+/// address pick_first chose). The call will be complete when either the
+/// balancer sends status or when we cancel the call (e.g., because we are
+/// shutting down). In needed, we retry the call. If we received at least one
+/// valid message from the server, a new call attempt will be made immediately;
+/// otherwise, we apply back-off delays between attempts.
///
-/// We maintain an internal round_robin policy instance for distributing
+/// We maintain an internal child policy (round_robin) instance for distributing
/// requests across backends. Whenever we receive a new serverlist from
-/// the balancer, we update the round_robin policy with the new list of
-/// addresses. If we cannot communicate with the balancer on startup,
-/// however, we may enter fallback mode, in which case we will populate
-/// the RR policy's addresses from the backend addresses returned by the
-/// resolver.
+/// the balancer, we update the child policy with the new list of
+/// addresses.
///
-/// Once an RR policy instance is in place (and getting updated as described),
-/// calls for a pick, a ping, or a cancellation will be serviced right
-/// away by forwarding them to the RR instance. Any time there's no RR
-/// policy available (i.e., right after the creation of the gRPCLB policy),
-/// pick and ping requests are added to a list of pending picks and pings
-/// to be flushed and serviced when the RR policy instance becomes available.
+/// Once a child policy instance is in place (and getting updated as
+/// described), calls for a pick, or a cancellation will be serviced right away
+/// by forwarding them to the child policy instance. Any time there's no child
+/// policy available (i.e., right after the creation of the xDS policy), pick
+/// requests are added to a list of pending picks to be flushed and serviced
+/// when the child policy instance becomes available.
///
/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
/// high level design and details.
@@ -122,7 +118,8 @@ class XdsLb : public LoadBalancingPolicy {
public:
XdsLb(const grpc_lb_addresses* addresses, const Args& args);
- void UpdateLocked(const grpc_channel_args& args) override;
+ void UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@@ -141,10 +138,10 @@ class XdsLb : public LoadBalancingPolicy {
private:
/// Linked list of pending pick requests. It stores all information needed to
- /// eventually call (Round Robin's) pick() on them. They mainly stay pending
- /// waiting for the RR policy to be created.
+ /// eventually call pick() on them. They mainly stay pending waiting for the
+ /// child policy to be created.
///
- /// Note that when a pick is sent to the RR policy, we inject our own
+ /// Note that when a pick is sent to the child policy, we inject our own
/// on_complete callback, so that we can intercept the result before
/// invoking the original on_complete callback. This allows us to set the
/// LB token metadata and add client_stats to the call context.
@@ -202,7 +199,6 @@ class XdsLb : public LoadBalancingPolicy {
static bool LoadReportCountersAreZero(xds_grpclb_request* request);
static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
- static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
@@ -266,18 +262,18 @@ class XdsLb : public LoadBalancingPolicy {
void AddPendingPick(PendingPick* pp);
static void OnPendingPickComplete(void* arg, grpc_error* error);
- // Methods for dealing with the RR policy.
- void CreateOrUpdateRoundRobinPolicyLocked();
- grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
- void CreateRoundRobinPolicyLocked(const Args& args);
- bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
- grpc_error** error);
- void UpdateConnectivityStateFromRoundRobinPolicyLocked(
- grpc_error* rr_state_error);
- static void OnRoundRobinConnectivityChangedLocked(void* arg,
- grpc_error* error);
- static void OnRoundRobinRequestReresolutionLocked(void* arg,
- grpc_error* error);
+ // Methods for dealing with the child policy.
+ void CreateOrUpdateChildPolicyLocked();
+ grpc_channel_args* CreateChildPolicyArgsLocked();
+ void CreateChildPolicyLocked(const Args& args);
+ bool PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
+ grpc_error** error);
+ void UpdateConnectivityStateFromChildPolicyLocked(
+ grpc_error* child_state_error);
+ static void OnChildPolicyConnectivityChangedLocked(void* arg,
+ grpc_error* error);
+ static void OnChildPolicyRequestReresolutionLocked(void* arg,
+ grpc_error* error);
// Who the client is trying to communicate with.
const char* server_name_ = nullptr;
@@ -330,14 +326,14 @@ class XdsLb : public LoadBalancingPolicy {
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
- // Pending picks that are waiting on the RR policy's connectivity.
+ // Pending picks that are waiting on the xDS policy's connectivity.
PendingPick* pending_picks_ = nullptr;
- // The RR policy to use for the backends.
- OrphanablePtr<LoadBalancingPolicy> rr_policy_;
- grpc_connectivity_state rr_connectivity_state_;
- grpc_closure on_rr_connectivity_changed_;
- grpc_closure on_rr_request_reresolution_;
+ // The policy to use for the backends.
+ OrphanablePtr<LoadBalancingPolicy> child_policy_;
+ grpc_connectivity_state child_connectivity_state_;
+ grpc_closure on_child_connectivity_changed_;
+ grpc_closure on_child_request_reresolution_;
};
//
@@ -444,7 +440,7 @@ grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) {
grpc_lb_addresses* lb_addresses =
grpc_lb_addresses_create(num_valid, &lb_token_vtable);
/* second pass: actually populate the addresses and LB tokens (aka user data
- * to the outside world) to be read by the RR policy during its creation.
+ * to the outside world) to be read by the child policy during its creation.
* Given that the validity tests are very cheap, they are performed again
* instead of marking the valid ones during the first pass, as this would
* incurr in an allocation due to the arbitrary number of server */
@@ -671,6 +667,7 @@ bool XdsLb::BalancerCallState::LoadReportCountersAreZero(
(drop_entries == nullptr || drop_entries->empty());
}
+// TODO(vpowar): Use LRS to send the client Load Report.
void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
// Construct message payload.
GPR_ASSERT(send_message_payload_ == nullptr);
@@ -688,38 +685,8 @@ void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
} else {
last_client_load_report_counters_were_zero_ = false;
}
- grpc_slice request_payload_slice = xds_grpclb_request_encode(request);
- send_message_payload_ =
- grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(request_payload_slice);
+ // TODO(vpowar): Send the report on LRS stream.
xds_grpclb_request_destroy(request);
- // Send the report.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = send_message_payload_;
- GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
- this, grpc_combiner_scheduler(xdslb_policy()->combiner()));
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- lb_call_, &op, 1, &client_load_report_closure_);
- if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
- gpr_log(GPR_ERROR, "[xdslb %p] call_error=%d", xdslb_policy_.get(),
- call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- }
-}
-
-void XdsLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
- grpc_error* error) {
- BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
- XdsLb* xdslb_policy = lb_calld->xdslb_policy();
- grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
- lb_calld->send_message_payload_ = nullptr;
- if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) {
- lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
- return;
- }
- lb_calld->ScheduleNextClientLoadReportLocked();
}
void XdsLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
@@ -833,7 +800,7 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
// serverlist instance will be destroyed either upon the next
// update or when the XdsLb instance is destroyed.
xdslb_policy->serverlist_ = serverlist;
- xdslb_policy->CreateOrUpdateRoundRobinPolicyLocked();
+ xdslb_policy->CreateOrUpdateChildPolicyLocked();
}
} else {
if (grpc_lb_xds_trace.enabled()) {
@@ -866,7 +833,7 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
&lb_calld->lb_on_balancer_message_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
} else {
- lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
+ lb_calld->Unref(DEBUG_LOCATION, "on_message_received+xds_shutdown");
}
}
@@ -944,7 +911,7 @@ grpc_lb_addresses* ExtractBalancerAddresses(
* - \a addresses: corresponding to the balancers.
* - \a response_generator: in order to propagate updates from the resolver
* above the grpclb policy.
- * - \a args: other args inherited from the grpclb policy. */
+ * - \a args: other args inherited from the xds policy. */
grpc_channel_args* BuildBalancerChannelArgs(
const grpc_lb_addresses* addresses,
FakeResolverResponseGenerator* response_generator,
@@ -966,10 +933,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
// resolver will have is_balancer=false, whereas our own addresses have
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
- // grpclb LB policy, as per the special case logic in client_channel.c.
+ // xds LB policy, as per the special case logic in client_channel.c.
GRPC_ARG_LB_ADDRESSES,
// The fake resolver response generator, because we are replacing it
- // with the one from the grpclb policy, used to propagate updates to
+ // with the one from the xds policy, used to propagate updates to
// the LB channel.
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
// The LB channel should use the authority indicated by the target
@@ -991,7 +958,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
response_generator),
- // A channel arg indicating the target is a grpclb load balancer.
+ // A channel arg indicating the target is a xds load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1),
// A channel arg indicating this is an internal channels, aka it is
@@ -1014,6 +981,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// ctor and dtor
//
+// TODO(vishalpowar): Use lb_config in args to configure LB policy.
XdsLb::XdsLb(const grpc_lb_addresses* addresses,
const LoadBalancingPolicy::Args& args)
: LoadBalancingPolicy(args),
@@ -1031,11 +999,11 @@ XdsLb::XdsLb(const grpc_lb_addresses* addresses,
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&XdsLb::OnBalancerChannelConnectivityChangedLocked, this,
grpc_combiner_scheduler(args.combiner));
- GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
- &XdsLb::OnRoundRobinConnectivityChangedLocked, this,
+ GRPC_CLOSURE_INIT(&on_child_connectivity_changed_,
+ &XdsLb::OnChildPolicyConnectivityChangedLocked, this,
grpc_combiner_scheduler(args.combiner));
- GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
- &XdsLb::OnRoundRobinRequestReresolutionLocked, this,
+ GRPC_CLOSURE_INIT(&on_child_request_reresolution_,
+ &XdsLb::OnChildPolicyRequestReresolutionLocked, this,
grpc_combiner_scheduler(args.combiner));
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "xds");
// Record server name.
@@ -1087,7 +1055,7 @@ void XdsLb::ShutdownLocked() {
if (fallback_timer_callback_pending_) {
grpc_timer_cancel(&lb_fallback_timer_);
}
- rr_policy_.reset();
+ child_policy_.reset();
TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_CANCELLED);
// We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to
@@ -1100,7 +1068,7 @@ void XdsLb::ShutdownLocked() {
gpr_mu_unlock(&lb_channel_mu_);
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_REF(error), "grpclb_shutdown");
+ GRPC_ERROR_REF(error), "xds_shutdown");
// Clear pending picks.
PendingPick* pp;
while ((pp = pending_picks_) != nullptr) {
@@ -1133,13 +1101,13 @@ void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
// Cancel a specific pending pick.
//
-// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (rr_policy_) available, it'll be
-// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
-// that point onwards, it'll be RR's responsibility. For cancellations, that
-// implies the pick needs also be cancelled by the RR instance.
-// - Otherwise, without an RR instance, picks stay pending at this policy's
-// level (grpclb), inside the pending_picks_ list. To cancel these,
+// A pick progresses as follows:
+// - If there's a child policy available, it'll be handed over to child policy
+// (in CreateChildPolicyLocked()). From that point onwards, it'll be the
+// child policy's responsibility. For cancellations, that implies the pick
+// needs to be also cancelled by the child policy instance.
+// - Otherwise, without a child policy instance, picks stay pending at this
+// policy's level (xds), inside the pending_picks_ list. To cancel these,
// we invoke the completion closure and set the pick's connected
// subchannel to nullptr right here.
void XdsLb::CancelPickLocked(PickState* pick, grpc_error* error) {
@@ -1159,21 +1127,21 @@ void XdsLb::CancelPickLocked(PickState* pick, grpc_error* error) {
}
pp = next;
}
- if (rr_policy_ != nullptr) {
- rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
+ if (child_policy_ != nullptr) {
+ child_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
// Cancel all pending picks.
//
-// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (rr_policy_) available, it'll be
-// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
-// that point onwards, it'll be RR's responsibility. For cancellations, that
-// implies the pick needs also be cancelled by the RR instance.
-// - Otherwise, without an RR instance, picks stay pending at this policy's
-// level (grpclb), inside the pending_picks_ list. To cancel these,
+// A pick progresses as follows:
+// - If there's a child policy available, it'll be handed over to child policy
+// (in CreateChildPolicyLocked()). From that point onwards, it'll be the
+// child policy's responsibility. For cancellations, that implies the pick
+// needs to be also cancelled by the child policy instance.
+// - Otherwise, without a child policy instance, picks stay pending at this
+// policy's level (xds), inside the pending_picks_ list. To cancel these,
// we invoke the completion closure and set the pick's connected
// subchannel to nullptr right here.
void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@@ -1195,10 +1163,10 @@ void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
}
pp = next;
}
- if (rr_policy_ != nullptr) {
- rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
- initial_metadata_flags_eq,
- GRPC_ERROR_REF(error));
+ if (child_policy_ != nullptr) {
+ child_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
+ initial_metadata_flags_eq,
+ GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -1213,22 +1181,21 @@ void XdsLb::ResetBackoffLocked() {
if (lb_channel_ != nullptr) {
grpc_channel_reset_connect_backoff(lb_channel_);
}
- if (rr_policy_ != nullptr) {
- rr_policy_->ResetBackoffLocked();
+ if (child_policy_ != nullptr) {
+ child_policy_->ResetBackoffLocked();
}
}
bool XdsLb::PickLocked(PickState* pick, grpc_error** error) {
PendingPick* pp = PendingPickCreate(pick);
bool pick_done = false;
- if (rr_policy_ != nullptr) {
+ if (child_policy_ != nullptr) {
if (grpc_lb_xds_trace.enabled()) {
- gpr_log(GPR_INFO, "[xdslb %p] about to PICK from RR %p", this,
- rr_policy_.get());
+ gpr_log(GPR_INFO, "[xdslb %p] about to PICK from policy %p", this,
+ child_policy_.get());
}
- pick_done =
- PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
- } else { // rr_policy_ == NULL
+ pick_done = PickFromChildPolicyLocked(false /* force_async */, pp, error);
+ } else { // child_policy_ == NULL
if (pick->on_complete == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No pick result available but synchronous result required.");
@@ -1236,7 +1203,7 @@ bool XdsLb::PickLocked(PickState* pick, grpc_error** error) {
} else {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
- "[xdslb %p] No RR policy. Adding to grpclb's pending picks",
+ "[xdslb %p] No child policy. Adding to xds's pending picks",
this);
}
AddPendingPick(pp);
@@ -1251,8 +1218,8 @@ bool XdsLb::PickLocked(PickState* pick, grpc_error** error) {
void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
- // delegate to the RoundRobin to fill the children subchannels.
- rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
+ // delegate to the child_policy_ to fill the children subchannels.
+ child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
MutexLock lock(&lb_channel_mu_);
if (lb_channel_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
@@ -1319,13 +1286,15 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args);
}
-void XdsLb::UpdateLocked(const grpc_channel_args& args) {
+// TODO(vishalpowar): Use lb_config to configure LB policy.
+void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
ProcessChannelArgsLocked(args);
- // If fallback is configured and the RR policy already exists, update
- // it with the new fallback addresses.
- if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
- CreateOrUpdateRoundRobinPolicyLocked();
- }
+ // Update the existing child policy.
+ // Note: We have disabled fallback mode in the code, so this child policy must
+ // have been created from a serverlist.
+ // TODO(vpowar): Handle the fallback_address changes when we add support for
+ // fallback in xDS.
+ if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// Start watching the LB channel connectivity for connection, if not
// already doing so.
if (!watching_lb_channel_) {
@@ -1393,11 +1362,10 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
if (xdslb_policy->serverlist_ == nullptr && !xdslb_policy->shutting_down_ &&
error == GRPC_ERROR_NONE) {
if (grpc_lb_xds_trace.enabled()) {
- gpr_log(GPR_INFO, "[xdslb %p] Falling back to use backends from resolver",
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Fallback timer fired. Not using fallback backends",
xdslb_policy);
}
- GPR_ASSERT(xdslb_policy->fallback_backend_addresses_ != nullptr);
- xdslb_policy->CreateOrUpdateRoundRobinPolicyLocked();
}
xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
@@ -1447,8 +1415,8 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
if (xdslb_policy->shutting_down_) goto done;
// Re-initialize the lb_call. This should also take care of updating the
- // embedded RR policy. Note that the current RR policy, if any, will stay in
- // effect until an update from the new lb_call is received.
+ // child policy. Note that the current child policy, if any, will
+ // stay in effect until an update from the new lb_call is received.
switch (xdslb_policy->lb_channel_connectivity_) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
@@ -1507,8 +1475,8 @@ void DestroyClientStats(void* arg) {
}
void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
- /* if connected_subchannel is nullptr, no pick has been made by the RR
- * policy (e.g., all addresses failed to connect). There won't be any
+ /* if connected_subchannel is nullptr, no pick has been made by the
+ * child policy (e.g., all addresses failed to connect). There won't be any
* user_data/token available */
if (pp->pick->connected_subchannel != nullptr) {
if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
@@ -1534,8 +1502,8 @@ void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
}
/* The \a on_complete closure passed as part of the pick requires keeping a
- * reference to its associated round robin instance. We wrap this closure in
- * order to unref the round robin instance upon its invocation */
+ * reference to its associated child policy instance. We wrap this closure in
+ * order to unref the child policy instance upon its invocation */
void XdsLb::OnPendingPickComplete(void* arg, grpc_error* error) {
PendingPick* pp = static_cast<PendingPick*>(arg);
PendingPickSetMetadataAndContext(pp);
@@ -1560,24 +1528,24 @@ void XdsLb::AddPendingPick(PendingPick* pp) {
}
//
-// code for interacting with the RR policy
+// code for interacting with the child policy
//
-// Performs a pick over \a rr_policy_. Given that a pick can return
+// Performs a pick over \a child_policy_. Given that a pick can return
// immediately (ignoring its completion callback), we need to perform the
// cleanups this callback would otherwise be responsible for.
// If \a force_async is true, then we will manually schedule the
// completion callback even if the pick is available immediately.
-bool XdsLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
- grpc_error** error) {
+bool XdsLb::PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
+ grpc_error** error) {
// Set client_stats and user_data.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
pp->client_stats = lb_calld_->client_stats()->Ref();
}
GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token;
- // Pick via the RR policy.
- bool pick_done = rr_policy_->PickLocked(pp->pick, error);
+ // Pick via the child policy.
+ bool pick_done = child_policy_->PickLocked(pp->pick, error);
if (pick_done) {
PendingPickSetMetadataAndContext(pp);
if (force_async) {
@@ -1588,72 +1556,67 @@ bool XdsLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
Delete(pp);
}
// else, the pending pick will be registered and taken care of by the
- // pending pick list inside the RR policy. Eventually,
+ // pending pick list inside the child policy. Eventually,
// OnPendingPickComplete() will be called, which will (among other
// things) add the LB token to the call's initial metadata.
return pick_done;
}
-void XdsLb::CreateRoundRobinPolicyLocked(const Args& args) {
- GPR_ASSERT(rr_policy_ == nullptr);
- rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+void XdsLb::CreateChildPolicyLocked(const Args& args) {
+ GPR_ASSERT(child_policy_ == nullptr);
+ child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
"round_robin", args);
- if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
- gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a RoundRobin policy", this);
+ if (GPR_UNLIKELY(child_policy_ == nullptr)) {
+ gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this);
return;
}
// TODO(roth): We currently track this ref manually. Once the new
// ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
- auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+ auto self = Ref(DEBUG_LOCATION, "on_child_reresolution_requested");
self.release();
- rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
- grpc_error* rr_state_error = nullptr;
- rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
- // Connectivity state is a function of the RR policy updated/created.
- UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
- // Add the gRPC LB's interested_parties pollset_set to that of the newly
- // created RR policy. This will make the RR policy progress upon activity on
- // gRPC LB, which in turn is tied to the application's call.
- grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
+ child_policy_->SetReresolutionClosureLocked(&on_child_request_reresolution_);
+ grpc_error* child_state_error = nullptr;
+ child_connectivity_state_ =
+ child_policy_->CheckConnectivityLocked(&child_state_error);
+ // Connectivity state is a function of the child policy updated/created.
+ UpdateConnectivityStateFromChildPolicyLocked(child_state_error);
+ // Add the xDS's interested_parties pollset_set to that of the newly created
+ // child policy. This will make the child policy progress upon activity on
+ // xDS LB, which in turn is tied to the application's call.
+ grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
interested_parties());
- // Subscribe to changes to the connectivity of the new RR.
+ // Subscribe to changes to the connectivity of the new child policy.
// TODO(roth): We currently track this ref manually. Once the new
// ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
- self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
+ self = Ref(DEBUG_LOCATION, "on_child_connectivity_changed");
self.release();
- rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
- &on_rr_connectivity_changed_);
- rr_policy_->ExitIdleLocked();
- // Send pending picks to RR policy.
+ child_policy_->NotifyOnStateChangeLocked(&child_connectivity_state_,
+ &on_child_connectivity_changed_);
+ child_policy_->ExitIdleLocked();
+ // Send pending picks to child policy.
PendingPick* pp;
while ((pp = pending_picks_)) {
pending_picks_ = pp->next;
if (grpc_lb_xds_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[xdslb %p] Pending pick about to (async) PICK from RR %p", this,
- rr_policy_.get());
+ gpr_log(
+ GPR_INFO,
+ "[xdslb %p] Pending pick about to (async) PICK from child policy %p",
+ this, child_policy_.get());
}
grpc_error* error = GRPC_ERROR_NONE;
- PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
+ PickFromChildPolicyLocked(true /* force_async */, pp, &error);
}
}
-grpc_channel_args* XdsLb::CreateRoundRobinPolicyArgsLocked() {
+grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
grpc_lb_addresses* addresses;
bool is_backend_from_grpclb_load_balancer = false;
- if (serverlist_ != nullptr) {
- GPR_ASSERT(serverlist_->num_servers > 0);
- addresses = ProcessServerlist(serverlist_);
- is_backend_from_grpclb_load_balancer = true;
- } else {
- // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
- // received any serverlist from the balancer, we use the fallback backends
- // returned by the resolver. Note that the fallback backend list may be
- // empty, in which case the new round_robin policy will keep the requested
- // picks pending.
- GPR_ASSERT(fallback_backend_addresses_ != nullptr);
- addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
- }
+ // This should never be invoked if we do not have serverlist_, as fallback
+ // mode is disabled for xDS plugin.
+ GPR_ASSERT(serverlist_ != nullptr);
+ GPR_ASSERT(serverlist_->num_servers > 0);
+ addresses = ProcessServerlist(serverlist_);
+ is_backend_from_grpclb_load_balancer = true;
GPR_ASSERT(addresses != nullptr);
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
@@ -1673,66 +1636,68 @@ grpc_channel_args* XdsLb::CreateRoundRobinPolicyArgsLocked() {
return args;
}
-void XdsLb::CreateOrUpdateRoundRobinPolicyLocked() {
+void XdsLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
- grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
+ grpc_channel_args* args = CreateChildPolicyArgsLocked();
GPR_ASSERT(args != nullptr);
- if (rr_policy_ != nullptr) {
+ if (child_policy_ != nullptr) {
if (grpc_lb_xds_trace.enabled()) {
- gpr_log(GPR_INFO, "[xdslb %p] Updating RR policy %p", this,
- rr_policy_.get());
+ gpr_log(GPR_INFO, "[xdslb %p] Updating the child policy %p", this,
+ child_policy_.get());
}
- rr_policy_->UpdateLocked(*args);
+ // TODO(vishalpowar): Pass the correct LB config.
+ child_policy_->UpdateLocked(*args, nullptr);
} else {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.client_channel_factory = client_channel_factory();
lb_policy_args.args = args;
- CreateRoundRobinPolicyLocked(lb_policy_args);
+ CreateChildPolicyLocked(lb_policy_args);
if (grpc_lb_xds_trace.enabled()) {
- gpr_log(GPR_INFO, "[xdslb %p] Created new RR policy %p", this,
- rr_policy_.get());
+ gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this,
+ child_policy_.get());
}
}
grpc_channel_args_destroy(args);
}
-void XdsLb::OnRoundRobinRequestReresolutionLocked(void* arg,
- grpc_error* error) {
+void XdsLb::OnChildPolicyRequestReresolutionLocked(void* arg,
+ grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
if (xdslb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
- xdslb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+ xdslb_policy->Unref(DEBUG_LOCATION, "on_child_reresolution_requested");
return;
}
if (grpc_lb_xds_trace.enabled()) {
- gpr_log(
- GPR_INFO,
- "[xdslb %p] Re-resolution requested from the internal RR policy (%p).",
- xdslb_policy, xdslb_policy->rr_policy_.get());
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Re-resolution requested from child policy "
+ "(%p).",
+ xdslb_policy, xdslb_policy->child_policy_.get());
}
// If we are talking to a balancer, we expect to get updated addresses form
- // the balancer, so we can ignore the re-resolution request from the RR
- // policy. Otherwise, handle the re-resolution request using the
- // grpclb policy's original re-resolution closure.
+ // the balancer, so we can ignore the re-resolution request from the child
+ // policy.
+ // Otherwise, handle the re-resolution request using the xds policy's
+ // original re-resolution closure.
if (xdslb_policy->lb_calld_ == nullptr ||
!xdslb_policy->lb_calld_->seen_initial_response()) {
xdslb_policy->TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_NONE);
}
- // Give back the wrapper closure to the RR policy.
- xdslb_policy->rr_policy_->SetReresolutionClosureLocked(
- &xdslb_policy->on_rr_request_reresolution_);
+ // Give back the wrapper closure to the child policy.
+ xdslb_policy->child_policy_->SetReresolutionClosureLocked(
+ &xdslb_policy->on_child_request_reresolution_);
}
-void XdsLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
- grpc_error* rr_state_error) {
+void XdsLb::UpdateConnectivityStateFromChildPolicyLocked(
+ grpc_error* child_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&state_tracker_);
/* The new connectivity status is a function of the previous one and the new
- * input coming from the status of the RR policy.
+ * input coming from the status of the child policy.
*
- * current state (grpclb's)
+ * current state (xds's)
* |
- * v || I | C | R | TF | SD | <- new state (RR's)
+ * v || I | C | R | TF | SD | <- new state (child policy's)
* ===++====+=====+=====+======+======+
* I || I | C | R | [I] | [I] |
* ---++----+-----+-----+------+------+
@@ -1745,52 +1710,51 @@ void XdsLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
* SD || NA | NA | NA | NA | NA | (*)
* ---++----+-----+-----+------+------+
*
- * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
- * is the current state of grpclb, which is left untouched.
+ * A [STATE] indicates that the old child policy is kept. In those cases,
+ * STATE is the current state of xds, which is left untouched.
*
* In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
- * the previous RR instance.
+ * the previous child policy instance.
*
* Note that the status is never updated to SHUTDOWN as a result of calling
* this function. Only glb_shutdown() has the power to set that state.
*
* (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
- switch (rr_connectivity_state_) {
+ switch (child_connectivity_state_) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
- GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
+ GPR_ASSERT(child_state_error != GRPC_ERROR_NONE);
break;
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
- GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
+ GPR_ASSERT(child_state_error == GRPC_ERROR_NONE);
}
if (grpc_lb_xds_trace.enabled()) {
- gpr_log(
- GPR_INFO,
- "[xdslb %p] Setting grpclb's state to %s from new RR policy %p state.",
- this, grpc_connectivity_state_name(rr_connectivity_state_),
- rr_policy_.get());
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Setting xds's state to %s from child policy %p state.",
+ this, grpc_connectivity_state_name(child_connectivity_state_),
+ child_policy_.get());
}
- grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
- rr_state_error,
+ grpc_connectivity_state_set(&state_tracker_, child_connectivity_state_,
+ child_state_error,
"update_lb_connectivity_status_locked");
}
-void XdsLb::OnRoundRobinConnectivityChangedLocked(void* arg,
- grpc_error* error) {
+void XdsLb::OnChildPolicyConnectivityChangedLocked(void* arg,
+ grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
if (xdslb_policy->shutting_down_) {
- xdslb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
+ xdslb_policy->Unref(DEBUG_LOCATION, "on_child_connectivity_changed");
return;
}
- xdslb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
+ xdslb_policy->UpdateConnectivityStateFromChildPolicyLocked(
GRPC_ERROR_REF(error));
- // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
- xdslb_policy->rr_policy_->NotifyOnStateChangeLocked(
- &xdslb_policy->rr_connectivity_state_,
- &xdslb_policy->on_rr_connectivity_changed_);
+ // Resubscribe. Reuse the "on_child_connectivity_changed" ref.
+ xdslb_policy->child_policy_->NotifyOnStateChangeLocked(
+ &xdslb_policy->child_connectivity_state_,
+ &xdslb_policy->on_child_connectivity_changed_);
}
//