aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc84
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds.cc34
2 files changed, 39 insertions, 79 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index dc0e1f89ce..7e70f1c28b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -749,7 +749,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
- // Empty payload means the LB call was cancelled.
+ // Null payload means the LB call was cancelled.
if (lb_calld != grpclb_policy->lb_calld_.get() ||
lb_calld->recv_message_payload_ == nullptr) {
lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
@@ -803,54 +803,45 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
gpr_free(ipport);
}
}
- /* update serverlist */
- if (serverlist->num_servers > 0) {
- // Start sending client load report only after we start using the
- // serverlist returned from the current LB call.
- if (lb_calld->client_stats_report_interval_ > 0 &&
- lb_calld->client_stats_ == nullptr) {
- lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
- // TODO(roth): We currently track this ref manually. Once the
- // ClosureRef API is ready, we should pass the RefCountedPtr<> along
- // with the callback.
- auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
- self.release();
- lb_calld->ScheduleNextClientLoadReportLocked();
- }
- if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
- serverlist)) {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Incoming server list identical to current, "
- "ignoring.",
- grpclb_policy);
- }
- grpc_grpclb_destroy_serverlist(serverlist);
- } else { /* new serverlist */
- if (grpclb_policy->serverlist_ != nullptr) {
- /* dispose of the old serverlist */
- grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
- } else {
- /* or dispose of the fallback */
- grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
- grpclb_policy->fallback_backend_addresses_ = nullptr;
- if (grpclb_policy->fallback_timer_callback_pending_) {
- grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
- }
- }
- // and update the copy in the GrpcLb instance. This
- // serverlist instance will be destroyed either upon the next
- // update or when the GrpcLb instance is destroyed.
- grpclb_policy->serverlist_ = serverlist;
- grpclb_policy->serverlist_index_ = 0;
- grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
- }
- } else {
+ // Start sending client load report only after we start using the
+ // serverlist returned from the current LB call.
+ if (lb_calld->client_stats_report_interval_ > 0 &&
+ lb_calld->client_stats_ == nullptr) {
+ lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
+ self.release();
+ lb_calld->ScheduleNextClientLoadReportLocked();
+ }
+ // Check if the serverlist differs from the previous one.
+ if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) {
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Incoming server list identical to current, "
+ "ignoring.",
grpclb_policy);
}
grpc_grpclb_destroy_serverlist(serverlist);
+ } else { // New serverlist.
+ if (grpclb_policy->serverlist_ != nullptr) {
+ // Dispose of the old serverlist.
+ grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
+ } else {
+ // Dispose of the fallback.
+ grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
+ grpclb_policy->fallback_backend_addresses_ = nullptr;
+ if (grpclb_policy->fallback_timer_callback_pending_) {
+ grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
+ }
+ }
+ // Update the serverlist in the GrpcLb instance. This serverlist
+ // instance will be destroyed either upon the next update or when the
+ // GrpcLb instance is destroyed.
+ grpclb_policy->serverlist_ = serverlist;
+ grpclb_policy->serverlist_index_ = 0;
+ grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
}
} else {
// No valid initial response or serverlist found.
@@ -1583,7 +1574,7 @@ void GrpcLb::AddPendingPick(PendingPick* pp) {
bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
grpc_error** error) {
// Check for drops if we are not using fallback backend addresses.
- if (serverlist_ != nullptr) {
+ if (serverlist_ != nullptr && serverlist_->num_servers > 0) {
// Look at the index into the serverlist to see if we should drop this call.
grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
if (serverlist_index_ == serverlist_->num_servers) {
@@ -1681,7 +1672,6 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
grpc_lb_addresses* addresses;
bool is_backend_from_grpclb_load_balancer = false;
if (serverlist_ != nullptr) {
- GPR_ASSERT(serverlist_->num_servers > 0);
addresses = ProcessServerlist(serverlist_);
is_backend_from_grpclb_load_balancer = true;
} else {
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 29cd904375..89b86b38a6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -199,7 +199,6 @@ class XdsLb : public LoadBalancingPolicy {
static bool LoadReportCountersAreZero(xds_grpclb_request* request);
static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
- static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
@@ -668,6 +667,7 @@ bool XdsLb::BalancerCallState::LoadReportCountersAreZero(
(drop_entries == nullptr || drop_entries->empty());
}
+// TODO(vpowar): Use LRS to send the client Load Report.
void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
// Construct message payload.
GPR_ASSERT(send_message_payload_ == nullptr);
@@ -685,38 +685,8 @@ void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
} else {
last_client_load_report_counters_were_zero_ = false;
}
- grpc_slice request_payload_slice = xds_grpclb_request_encode(request);
- send_message_payload_ =
- grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(request_payload_slice);
+ // TODO(vpowar): Send the report on LRS stream.
xds_grpclb_request_destroy(request);
- // Send the report.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = send_message_payload_;
- GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
- this, grpc_combiner_scheduler(xdslb_policy()->combiner()));
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- lb_call_, &op, 1, &client_load_report_closure_);
- if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
- gpr_log(GPR_ERROR, "[xdslb %p] call_error=%d", xdslb_policy_.get(),
- call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- }
-}
-
-void XdsLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
- grpc_error* error) {
- BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
- XdsLb* xdslb_policy = lb_calld->xdslb_policy();
- grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
- lb_calld->send_message_payload_ = nullptr;
- if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) {
- lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
- return;
- }
- lb_calld->ScheduleNextClientLoadReportLocked();
}
void XdsLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,