diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc | 26 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 37 |
2 files changed, 37 insertions, 26 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc index cc259bcdbf..399bb452f4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc @@ -37,16 +37,27 @@ static void destroy_channel_elem(grpc_channel_element* elem) {} namespace { struct call_data { + call_data(const grpc_call_element_args& args) { + if (args.context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) { + // Get stats object from context and take a ref. + client_stats = static_cast<grpc_core::GrpcLbClientStats*>( + args.context[GRPC_GRPCLB_CLIENT_STATS].value) + ->Ref(); + // Record call started. + client_stats->AddCallStarted(); + } + } + // Stats object to update. grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats; // State for intercepting send_initial_metadata. grpc_closure on_complete_for_send; grpc_closure* original_on_complete_for_send; - bool send_initial_metadata_succeeded; + bool send_initial_metadata_succeeded = false; // State for intercepting recv_initial_metadata. grpc_closure recv_initial_metadata_ready; grpc_closure* original_recv_initial_metadata_ready; - bool recv_initial_metadata_succeeded; + bool recv_initial_metadata_succeeded = false; }; } // namespace @@ -70,16 +81,8 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); - // Get stats object from context and take a ref. GPR_ASSERT(args->context != nullptr); - if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) { - calld->client_stats = static_cast<grpc_core::GrpcLbClientStats*>( - args->context[GRPC_GRPCLB_CLIENT_STATS].value) - ->Ref(); - // Record call started. - calld->client_stats->AddCallStarted(); - } + new (elem->call_data) call_data(*args); return GRPC_ERROR_NONE; } @@ -97,6 +100,7 @@ static void destroy_call_elem(grpc_call_element* elem, // TODO(roth): Eliminate this once filter stack is converted to C++. calld->client_stats.reset(); } + calld->~call_data(); } static void start_transport_stream_op_batch( diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 25b0149393..dbb90b438c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -136,8 +136,9 @@ class GrpcLb : public LoadBalancingPolicy { void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; - void FillChildRefsForChannelz(ChildRefsList* child_subchannels, - ChildRefsList* child_channels) override; + void FillChildRefsForChannelz( + channelz::ChildRefsList* child_subchannels, + channelz::ChildRefsList* child_channels) override; private: /// Linked list of pending pick requests. It stores all information needed to @@ -852,10 +853,12 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( } } else { // No valid initial response or serverlist found. + char* response_slice_str = + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); gpr_log(GPR_ERROR, "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", - grpclb_policy, - grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + grpclb_policy, response_slice_str); + gpr_free(response_slice_str); } grpc_slice_unref_internal(response_slice); if (!grpclb_policy->shutting_down_) { @@ -1256,8 +1259,9 @@ bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) { return pick_done; } -void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels, - ChildRefsList* child_channels) { +void GrpcLb::FillChildRefsForChannelz( + channelz::ChildRefsList* child_subchannels, + channelz::ChildRefsList* child_channels) { // delegate to the RoundRobin to fill the children subchannels. rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); MutexLock lock(&lb_channel_mu_); @@ -1265,7 +1269,7 @@ void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels, grpc_core::channelz::ChannelNode* channel_node = grpc_channel_get_channelz_node(lb_channel_); if (channel_node != nullptr) { - child_channels->push_back(channel_node->channel_uuid()); + child_channels->push_back(channel_node->uuid()); } } } @@ -1329,11 +1333,8 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { void GrpcLb::UpdateLocked(const grpc_channel_args& args) { ProcessChannelArgsLocked(args); - // If fallback is configured and the RR policy already exists, update - // it with the new fallback addresses. - if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) { - CreateOrUpdateRoundRobinPolicyLocked(); - } + // Update the existing RR policy. + if (rr_policy_ != nullptr) CreateOrUpdateRoundRobinPolicyLocked(); // Start watching the LB channel connectivity for connection, if not // already doing so. if (!watching_lb_channel_) { @@ -1487,7 +1488,7 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg, grpclb_policy->lb_call_backoff_.Reset(); grpclb_policy->StartBalancerCallLocked(); } - // Fall through. + // fallthrough case GRPC_CHANNEL_SHUTDOWN: done: grpclb_policy->watching_lb_channel_ = false; @@ -1695,7 +1696,7 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; - const grpc_arg args_to_add[] = { + grpc_arg args_to_add[3] = { grpc_lb_addresses_create_channel_arg(addresses), // A channel arg indicating if the target is a backend inferred from a // grpclb load balancer. @@ -1704,9 +1705,15 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER), is_backend_from_grpclb_load_balancer), }; + size_t num_args_to_add = 2; + if (is_backend_from_grpclb_load_balancer) { + args_to_add[2] = grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); + ++num_args_to_add; + } grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, - GPR_ARRAY_SIZE(args_to_add)); + num_args_to_add); grpc_lb_addresses_destroy(addresses); return args; } |