aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc83
1 files changed, 47 insertions, 36 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index a9a5965ed1..ba40febd53 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -122,10 +122,14 @@ TraceFlag grpc_lb_glb_trace(false, "glb");
namespace {
+constexpr char kGrpclb[] = "grpclb";
+
class GrpcLb : public LoadBalancingPolicy {
public:
explicit GrpcLb(const Args& args);
+ const char* name() const override { return kGrpclb; }
+
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
@@ -361,7 +365,9 @@ void lb_token_destroy(void* token) {
}
}
int lb_token_cmp(void* token1, void* token2) {
- return GPR_ICMP(token1, token2);
+ // Always indicate a match, since we don't want this channel arg to
+ // affect the subchannel's key in the index.
+ return 0;
}
const grpc_arg_pointer_vtable lb_token_arg_vtable = {
lb_token_copy, lb_token_destroy, lb_token_cmp};
@@ -422,7 +428,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
- void* lb_token;
+ grpc_mdelem lb_token;
if (server->has_load_balance_token) {
const size_t lb_token_max_length =
GPR_ARRAY_SIZE(server->load_balance_token);
@@ -430,9 +436,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
strnlen(server->load_balance_token, lb_token_max_length);
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
- lb_token =
- (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
- .payload;
+ lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
} else {
char* uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
@@ -440,14 +444,16 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
"be used instead",
uri);
gpr_free(uri);
- lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY;
}
// Add address.
grpc_arg arg = grpc_channel_arg_pointer_create(
- const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
- &lb_token_arg_vtable);
+ const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
+ (void*)lb_token.payload, &lb_token_arg_vtable);
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
addresses.emplace_back(addr, args);
+ // Clean up.
+ GRPC_MDELEM_UNREF(lb_token);
}
return addresses;
}
@@ -525,8 +531,7 @@ void GrpcLb::BalancerCallState::Orphan() {
void GrpcLb::BalancerCallState::StartQuery() {
GPR_ASSERT(lb_call_ != nullptr);
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
+ gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
grpclb_policy_.get(), this, lb_call_);
}
// Create the ops.
@@ -670,8 +675,9 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_call_, &op, 1, &client_load_report_closure_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
- gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
- call_error);
+ gpr_log(GPR_ERROR,
+ "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
+ grpclb_policy_.get(), this, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
@@ -732,15 +738,17 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
&initial_response->client_stats_report_interval));
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; "
- "client load reporting interval = %" PRId64 " milliseconds",
- grpclb_policy, lb_calld->client_stats_report_interval_);
+ "[grpclb %p] lb_calld=%p: Received initial LB response "
+ "message; client load reporting interval = %" PRId64
+ " milliseconds",
+ grpclb_policy, lb_calld,
+ lb_calld->client_stats_report_interval_);
}
} else if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; client load "
- "reporting NOT enabled",
- grpclb_policy);
+ "[grpclb %p] lb_calld=%p: Received initial LB response message; "
+ "client load reporting NOT enabled",
+ grpclb_policy, lb_calld);
}
grpc_grpclb_initial_response_destroy(initial_response);
lb_calld->seen_initial_response_ = true;
@@ -750,15 +758,17 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
GPR_ASSERT(lb_calld->lb_call_ != nullptr);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
- grpclb_policy, serverlist->num_servers);
+ "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
+ " servers received",
+ grpclb_policy, lb_calld, serverlist->num_servers);
for (size_t i = 0; i < serverlist->num_servers; ++i) {
grpc_resolved_address addr;
ParseServer(serverlist->servers[i], &addr);
char* ipport;
grpc_sockaddr_to_string(&ipport, &addr, false);
- gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
- grpclb_policy, i, ipport);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] lb_calld=%p: Serverlist[%" PRIuPTR "]: %s",
+ grpclb_policy, lb_calld, i, ipport);
gpr_free(ipport);
}
}
@@ -778,9 +788,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Incoming server list identical to current, "
- "ignoring.",
- grpclb_policy);
+ "[grpclb %p] lb_calld=%p: Incoming server list identical to "
+ "current, ignoring.",
+ grpclb_policy, lb_calld);
}
grpc_grpclb_destroy_serverlist(serverlist);
} else { // New serverlist.
@@ -806,8 +816,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
char* response_slice_str =
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
gpr_log(GPR_ERROR,
- "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
- grpclb_policy, response_slice_str);
+ "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
+ "Ignoring.",
+ grpclb_policy, lb_calld, response_slice_str);
gpr_free(response_slice_str);
}
grpc_slice_unref_internal(response_slice);
@@ -838,9 +849,9 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
char* status_details =
grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
gpr_log(GPR_INFO,
- "[grpclb %p] Status from LB server received. Status = %d, details "
- "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
- grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
+ "[grpclb %p] lb_calld=%p: Status from LB server received. "
+ "Status = %d, details = '%s', (lb_call: %p), error '%s'",
+ grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details,
lb_calld->lb_call_, grpc_error_string(error));
gpr_free(status_details);
}
@@ -1129,7 +1140,7 @@ void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
pending_picks_ = nullptr;
while (pp != nullptr) {
PendingPick* next = pp->next;
- if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
// Note: pp is deleted in this callback.
GRPC_CLOSURE_SCHED(&pp->on_complete,
@@ -1592,6 +1603,10 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
this);
return;
}
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
+ rr_policy_.get());
+ }
// TODO(roth): We currently track this ref manually. Once the new
// ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
@@ -1685,10 +1700,6 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
lb_policy_args.client_channel_factory = client_channel_factory();
lb_policy_args.args = args;
CreateRoundRobinPolicyLocked(lb_policy_args);
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
- rr_policy_.get());
- }
}
grpc_channel_args_destroy(args);
}
@@ -1812,7 +1823,7 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args));
}
- const char* name() const override { return "grpclb"; }
+ const char* name() const override { return kGrpclb; }
};
} // namespace