diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
6 files changed, 71 insertions, 56 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index a9a5965ed1..ba40febd53 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -122,10 +122,14 @@ TraceFlag grpc_lb_glb_trace(false, "glb"); namespace { +constexpr char kGrpclb[] = "grpclb"; + class GrpcLb : public LoadBalancingPolicy { public: explicit GrpcLb(const Args& args); + const char* name() const override { return kGrpclb; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; @@ -361,7 +365,9 @@ void lb_token_destroy(void* token) { } } int lb_token_cmp(void* token1, void* token2) { - return GPR_ICMP(token1, token2); + // Always indicate a match, since we don't want this channel arg to + // affect the subchannel's key in the index. + return 0; } const grpc_arg_pointer_vtable lb_token_arg_vtable = { lb_token_copy, lb_token_destroy, lb_token_cmp}; @@ -422,7 +428,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { grpc_resolved_address addr; ParseServer(server, &addr); // LB token processing. - void* lb_token; + grpc_mdelem lb_token; if (server->has_load_balance_token) { const size_t lb_token_max_length = GPR_ARRAY_SIZE(server->load_balance_token); @@ -430,9 +436,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { strnlen(server->load_balance_token, lb_token_max_length); grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( server->load_balance_token, lb_token_length); - lb_token = - (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) - .payload; + lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr); } else { char* uri = grpc_sockaddr_to_uri(&addr); gpr_log(GPR_INFO, @@ -440,14 +444,16 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { "be used instead", uri); gpr_free(uri); - lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; + lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY; } // Add address. grpc_arg arg = grpc_channel_arg_pointer_create( - const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token, - &lb_token_arg_vtable); + const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), + (void*)lb_token.payload, &lb_token_arg_vtable); grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); addresses.emplace_back(addr, args); + // Clean up. + GRPC_MDELEM_UNREF(lb_token); } return addresses; } @@ -525,8 +531,7 @@ void GrpcLb::BalancerCallState::Orphan() { void GrpcLb::BalancerCallState::StartQuery() { GPR_ASSERT(lb_call_ != nullptr); if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)", + gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p", grpclb_policy_.get(), this, lb_call_); } // Create the ops. @@ -670,8 +675,9 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { grpc_call_error call_error = grpc_call_start_batch_and_execute( lb_call_, &op, 1, &client_load_report_closure_); if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { - gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(), - call_error); + gpr_log(GPR_ERROR, + "[grpclb %p] lb_calld=%p call_error=%d sending client load report", + grpclb_policy_.get(), this, call_error); GPR_ASSERT(GRPC_CALL_OK == call_error); } } @@ -732,15 +738,17 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( &initial_response->client_stats_report_interval)); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; " - "client load reporting interval = %" PRId64 " milliseconds", - grpclb_policy, lb_calld->client_stats_report_interval_); + "[grpclb %p] lb_calld=%p: Received initial LB response " + "message; client load reporting interval = %" PRId64 + " milliseconds", + grpclb_policy, lb_calld, + lb_calld->client_stats_report_interval_); } } else if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; client load " - "reporting NOT enabled", - grpclb_policy); + "[grpclb %p] lb_calld=%p: Received initial LB response message; " + "client load reporting NOT enabled", + grpclb_policy, lb_calld); } grpc_grpclb_initial_response_destroy(initial_response); lb_calld->seen_initial_response_ = true; @@ -750,15 +758,17 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( GPR_ASSERT(lb_calld->lb_call_ != nullptr); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Serverlist with %" PRIuPTR " servers received", - grpclb_policy, serverlist->num_servers); + "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR + " servers received", + grpclb_policy, lb_calld, serverlist->num_servers); for (size_t i = 0; i < serverlist->num_servers; ++i) { grpc_resolved_address addr; ParseServer(serverlist->servers[i], &addr); char* ipport; grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s", - grpclb_policy, i, ipport); + gpr_log(GPR_INFO, + "[grpclb %p] lb_calld=%p: Serverlist[%" PRIuPTR "]: %s", + grpclb_policy, lb_calld, i, ipport); gpr_free(ipport); } } @@ -778,9 +788,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) { if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Incoming server list identical to current, " - "ignoring.", - grpclb_policy); + "[grpclb %p] lb_calld=%p: Incoming server list identical to " + "current, ignoring.", + grpclb_policy, lb_calld); } grpc_grpclb_destroy_serverlist(serverlist); } else { // New serverlist. @@ -806,8 +816,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( char* response_slice_str = grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); gpr_log(GPR_ERROR, - "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", - grpclb_policy, response_slice_str); + "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. " + "Ignoring.", + grpclb_policy, lb_calld, response_slice_str); gpr_free(response_slice_str); } grpc_slice_unref_internal(response_slice); @@ -838,9 +849,9 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( char* status_details = grpc_slice_to_c_string(lb_calld->lb_call_status_details_); gpr_log(GPR_INFO, - "[grpclb %p] Status from LB server received. Status = %d, details " - "= '%s', (lb_calld: %p, lb_call: %p), error '%s'", - grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld, + "[grpclb %p] lb_calld=%p: Status from LB server received. " + "Status = %d, details = '%s', (lb_call: %p), error '%s'", + grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details, lb_calld->lb_call_, grpc_error_string(error)); gpr_free(status_details); } @@ -1129,7 +1140,7 @@ void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pp != nullptr) { PendingPick* next = pp->next; - if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { // Note: pp is deleted in this callback. GRPC_CLOSURE_SCHED(&pp->on_complete, @@ -1592,6 +1603,10 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { this); return; } + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, + rr_policy_.get()); + } // TODO(roth): We currently track this ref manually. Once the new // ClosureRef API is done, pass the RefCountedPtr<> along with the closure. auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested"); @@ -1685,10 +1700,6 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { lb_policy_args.client_channel_factory = client_channel_factory(); lb_policy_args.args = args; CreateRoundRobinPolicyLocked(lb_policy_args); - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, - rr_policy_.get()); - } } grpc_channel_args_destroy(args); } @@ -1812,7 +1823,7 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args)); } - const char* name() const override { return "grpclb"; } + const char* name() const override { return kGrpclb; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc index 6e8fbdcab7..657ff69312 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc @@ -88,22 +88,18 @@ grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args( // bearer token credentials. grpc_channel_credentials* channel_credentials = grpc_channel_credentials_find_in_args(args); - grpc_channel_credentials* creds_sans_call_creds = nullptr; + grpc_core::RefCountedPtr<grpc_channel_credentials> creds_sans_call_creds; if (channel_credentials != nullptr) { creds_sans_call_creds = - grpc_channel_credentials_duplicate_without_call_credentials( - channel_credentials); + channel_credentials->duplicate_without_call_credentials(); GPR_ASSERT(creds_sans_call_creds != nullptr); args_to_remove[num_args_to_remove++] = GRPC_ARG_CHANNEL_CREDENTIALS; args_to_add[num_args_to_add++] = - grpc_channel_credentials_to_arg(creds_sans_call_creds); + grpc_channel_credentials_to_arg(creds_sans_call_creds.get()); } grpc_channel_args* result = grpc_channel_args_copy_and_add_and_remove( args, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); // Clean up. grpc_channel_args_destroy(args); - if (creds_sans_call_creds != nullptr) { - grpc_channel_credentials_unref(creds_sans_call_creds); - } return result; } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 74c17612a2..d6ff74ec7f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -43,10 +43,14 @@ namespace { // pick_first LB policy // +constexpr char kPickFirst[] = "pick_first"; + class PickFirst : public LoadBalancingPolicy { public: explicit PickFirst(const Args& args); + const char* name() const override { return kPickFirst; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; @@ -234,7 +238,7 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pick != nullptr) { PickState* next = pick->next; - if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -622,7 +626,7 @@ class PickFirstFactory : public LoadBalancingPolicyFactory { return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args)); } - const char* name() const override { return "pick_first"; } + const char* name() const override { return kPickFirst; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 63089afbd7..3bcb33ef11 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -53,10 +53,14 @@ namespace { // round_robin LB policy // +constexpr char kRoundRobin[] = "round_robin"; + class RoundRobin : public LoadBalancingPolicy { public: explicit RoundRobin(const Args& args); + const char* name() const override { return kRoundRobin; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; @@ -291,7 +295,7 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pick != nullptr) { PickState* next = pick->next; - if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, @@ -700,7 +704,7 @@ class RoundRobinFactory : public LoadBalancingPolicyFactory { return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args)); } - const char* name() const override { return "round_robin"; } + const char* name() const override { return kRoundRobin; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 3c25de2386..8787f5bcc2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -115,10 +115,14 @@ TraceFlag grpc_lb_xds_trace(false, "xds"); namespace { +constexpr char kXds[] = "xds_experimental"; + class XdsLb : public LoadBalancingPolicy { public: explicit XdsLb(const Args& args); + const char* name() const override { return kXds; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; @@ -1053,7 +1057,7 @@ void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pp != nullptr) { PendingPick* next = pp->next; - if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { // Note: pp is deleted in this callback. GRPC_CLOSURE_SCHED(&pp->on_complete, @@ -1651,7 +1655,7 @@ class XdsFactory : public LoadBalancingPolicyFactory { return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(args)); } - const char* name() const override { return "xds_experimental"; } + const char* name() const override { return kXds; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc index 9a11f8e39f..55c646e6ee 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc @@ -87,22 +87,18 @@ grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( // bearer token credentials. grpc_channel_credentials* channel_credentials = grpc_channel_credentials_find_in_args(args); - grpc_channel_credentials* creds_sans_call_creds = nullptr; + grpc_core::RefCountedPtr<grpc_channel_credentials> creds_sans_call_creds; if (channel_credentials != nullptr) { creds_sans_call_creds = - grpc_channel_credentials_duplicate_without_call_credentials( - channel_credentials); + channel_credentials->duplicate_without_call_credentials(); GPR_ASSERT(creds_sans_call_creds != nullptr); args_to_remove[num_args_to_remove++] = GRPC_ARG_CHANNEL_CREDENTIALS; args_to_add[num_args_to_add++] = - grpc_channel_credentials_to_arg(creds_sans_call_creds); + grpc_channel_credentials_to_arg(creds_sans_call_creds.get()); } grpc_channel_args* result = grpc_channel_args_copy_and_add_and_remove( args, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); // Clean up. grpc_channel_args_destroy(args); - if (creds_sans_call_creds != nullptr) { - grpc_channel_credentials_unref(creds_sans_call_creds); - } return result; } |