diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 93 |
1 files changed, 53 insertions, 40 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 0b2a30818e..263b51ae89 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -76,6 +76,7 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" @@ -158,9 +159,8 @@ class GrpcLb : public LoadBalancingPolicy { // The LB token associated with the pick. This is set via user_data in // the pick. grpc_mdelem lb_token; - // Stats for client-side load reporting. Note that this holds a - // reference, which must be either passed on via context or unreffed. - grpc_grpclb_client_stats* client_stats = nullptr; + // Stats for client-side load reporting. + RefCountedPtr<GrpcLbClientStats> client_stats; // Next pending pick. PendingPick* next = nullptr; }; @@ -185,10 +185,15 @@ class GrpcLb : public LoadBalancingPolicy { void StartQuery(); - grpc_grpclb_client_stats* client_stats() const { return client_stats_; } + GrpcLbClientStats* client_stats() const { return client_stats_.get(); } + bool seen_initial_response() const { return seen_initial_response_; } private: + // So Delete() can access our private dtor. + template <typename T> + friend void grpc_core::Delete(T*); + ~BalancerCallState(); GrpcLb* grpclb_policy() const { @@ -232,7 +237,7 @@ class GrpcLb : public LoadBalancingPolicy { // The stats for client-side load reporting associated with this LB call. // Created after the first serverlist is received. - grpc_grpclb_client_stats* client_stats_ = nullptr; + RefCountedPtr<GrpcLbClientStats> client_stats_; grpc_millis client_stats_report_interval_ = 0; grpc_timer client_load_report_timer_; bool client_load_report_timer_callback_pending_ = false; @@ -394,7 +399,7 @@ grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) { bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) { if (server->drop) return false; const grpc_grpclb_ip_address* ip = &server->ip_address; - if (server->port >> 16 != 0) { + if (GPR_UNLIKELY(server->port >> 16 != 0)) { if (log) { gpr_log(GPR_ERROR, "Invalid port '%d' at index %lu of serverlist. Ignoring.", @@ -402,7 +407,7 @@ bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) { } return false; } - if (ip->size != 4 && ip->size != 16) { + if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) { if (log) { gpr_log(GPR_ERROR, "Expected IP to be 4 or 16 bytes, got %d at index %lu of " @@ -543,9 +548,6 @@ GrpcLb::BalancerCallState::~BalancerCallState() { grpc_byte_buffer_destroy(send_message_payload_); grpc_byte_buffer_destroy(recv_message_payload_); grpc_slice_unref_internal(lb_call_status_details_); - if (client_stats_ != nullptr) { - grpc_grpclb_client_stats_unref(client_stats_); - } } void GrpcLb::BalancerCallState::Orphan() { @@ -668,22 +670,22 @@ void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked( bool GrpcLb::BalancerCallState::LoadReportCountersAreZero( grpc_grpclb_request* request) { - grpc_grpclb_dropped_call_counts* drop_entries = - static_cast<grpc_grpclb_dropped_call_counts*>( + GrpcLbClientStats::DroppedCallCounts* drop_entries = + static_cast<GrpcLbClientStats::DroppedCallCounts*>( request->client_stats.calls_finished_with_drop.arg); return request->client_stats.num_calls_started == 0 && request->client_stats.num_calls_finished == 0 && request->client_stats.num_calls_finished_with_client_failed_to_send == 0 && request->client_stats.num_calls_finished_known_received == 0 && - (drop_entries == nullptr || drop_entries->num_entries == 0); + (drop_entries == nullptr || drop_entries->size() == 0); } void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { // Construct message payload. GPR_ASSERT(send_message_payload_ == nullptr); grpc_grpclb_request* request = - grpc_grpclb_load_report_request_create_locked(client_stats_); + grpc_grpclb_load_report_request_create_locked(client_stats_.get()); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. if (LoadReportCountersAreZero(request)) { @@ -710,7 +712,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { this, grpc_combiner_scheduler(grpclb_policy()->combiner())); grpc_call_error call_error = grpc_call_start_batch_and_execute( lb_call_, &op, 1, &client_load_report_closure_); - if (call_error != GRPC_CALL_OK) { + if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(), call_error); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -774,7 +776,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Received initial LB response message; " - "client load reporting interval = %" PRIdPTR " milliseconds", + "client load reporting interval = %" PRId64 " milliseconds", grpclb_policy, lb_calld->client_stats_report_interval_); } } else if (grpc_lb_glb_trace.enabled()) { @@ -809,7 +811,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( // serverlist returned from the current LB call. if (lb_calld->client_stats_report_interval_ > 0 && lb_calld->client_stats_ == nullptr) { - lb_calld->client_stats_ = grpc_grpclb_client_stats_create(); + lb_calld->client_stats_.reset(New<GrpcLbClientStats>()); // TODO(roth): We currently track this ref manually. Once the // ClosureRef API is ready, we should pass the RefCountedPtr<> along // with the callback. @@ -932,7 +934,7 @@ grpc_lb_addresses* ExtractBalancerAddresses( size_t lb_addresses_idx = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (!addresses->addresses[i].is_balancer) continue; - if (addresses->addresses[i].user_data != nullptr) { + if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) { gpr_log(GPR_ERROR, "This LB policy doesn't support user data. It will be ignored"); } @@ -999,6 +1001,9 @@ grpc_channel_args* BuildBalancerChannelArgs( // address updates into the LB channel. grpc_core::FakeResolverResponseGenerator::MakeChannelArg( response_generator), + // A channel arg indicating the target is a grpclb load balancer. + grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1), }; // Construct channel args. grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( @@ -1243,7 +1248,7 @@ bool GrpcLb::PickLocked(PickState* pick) { } } else { // rr_policy_ == NULL if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "[grpclb %p] No RR policy. Adding to grpclb's pending picks", this); } @@ -1280,7 +1285,7 @@ void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { // Ignore this update. gpr_log( GPR_ERROR, @@ -1409,14 +1414,13 @@ void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { void GrpcLb::StartBalancerCallRetryTimerLocked() { grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", this); + gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this); grpc_millis timeout = next_try - ExecCtx::Get()->Now(); if (timeout > 0) { - gpr_log(GPR_DEBUG, - "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", this, - timeout); + gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.", + this, timeout); } else { - gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.", + gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.", this); } } @@ -1509,7 +1513,7 @@ grpc_error* AddLbTokenToInitialMetadata( // Destroy function used when embedding client stats in call context. void DestroyClientStats(void* arg) { - grpc_grpclb_client_stats_unref(static_cast<grpc_grpclb_client_stats*>(arg)); + static_cast<GrpcLbClientStats*>(arg)->Unref(); } void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) { @@ -1517,7 +1521,7 @@ void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) { * policy (e.g., all addresses failed to connect). There won't be any * user_data/token available */ if (pp->pick->connected_subchannel != nullptr) { - if (!GRPC_MDISNULL(pp->lb_token)) { + if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) { AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token), &pp->pick->lb_token_mdelem_storage, pp->pick->initial_metadata); @@ -1530,14 +1534,12 @@ void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) { // Pass on client stats via context. Passes ownership of the reference. if (pp->client_stats != nullptr) { pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = - pp->client_stats; + pp->client_stats.release(); pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = DestroyClientStats; } } else { - if (pp->client_stats != nullptr) { - grpc_grpclb_client_stats_unref(pp->client_stats); - } + pp->client_stats.reset(); } } @@ -1603,8 +1605,8 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) { // subchannel call (and therefore no client_load_reporting filter) // for dropped calls. if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { - grpc_grpclb_client_stats_add_call_dropped_locked( - server->load_balance_token, lb_calld_->client_stats()); + lb_calld_->client_stats()->AddCallDroppedLocked( + server->load_balance_token); } if (force_async) { GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); @@ -1617,7 +1619,7 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) { } // Set client_stats and user_data. if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { - pp->client_stats = grpc_grpclb_client_stats_ref(lb_calld_->client_stats()); + pp->client_stats = lb_calld_->client_stats()->Ref(); } GPR_ASSERT(pp->pick->user_data == nullptr); pp->pick->user_data = (void**)&pp->lb_token; @@ -1642,7 +1644,7 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { GPR_ASSERT(rr_policy_ == nullptr); rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( "round_robin", args); - if (rr_policy_ == nullptr) { + if (GPR_UNLIKELY(rr_policy_ == nullptr)) { gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy", this); return; @@ -1695,9 +1697,11 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { grpc_lb_addresses* addresses; + bool is_backend_from_grpclb_load_balancer = false; if (serverlist_ != nullptr) { GPR_ASSERT(serverlist_->num_servers > 0); addresses = ProcessServerlist(serverlist_); + is_backend_from_grpclb_load_balancer = true; } else { // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't // received any serverlist from the balancer, we use the fallback backends @@ -1711,9 +1715,18 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; - const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); + const grpc_arg args_to_add[] = { + grpc_lb_addresses_create_channel_arg(addresses), + // A channel arg indicating if the target is a backend inferred from a + // grpclb load balancer. + grpc_channel_arg_integer_create( + const_cast<char*>( + GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER), + is_backend_from_grpclb_load_balancer), + }; grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( - args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); + args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, + GPR_ARRAY_SIZE(args_to_add)); grpc_lb_addresses_destroy(addresses); return args; } @@ -1724,7 +1737,7 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { GPR_ASSERT(args != nullptr); if (rr_policy_ != nullptr) { if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", this, + gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this, rr_policy_.get()); } rr_policy_->UpdateLocked(*args); @@ -1735,7 +1748,7 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { lb_policy_args.args = args; CreateRoundRobinPolicyLocked(lb_policy_args); if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", this, + gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, rr_policy_.get()); } } @@ -1751,7 +1764,7 @@ void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg, } if (grpc_lb_glb_trace.enabled()) { gpr_log( - GPR_DEBUG, + GPR_INFO, "[grpclb %p] Re-resolution requested from the internal RR policy (%p).", grpclb_policy, grpclb_policy->rr_policy_.get()); } |