diff options
Diffstat (limited to 'src/core')
49 files changed, 1015 insertions, 544 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 024c9d737e..fead8feb17 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -571,15 +571,32 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { if (chand->lb_policy == nullptr) { - GRPC_CLOSURE_SCHED( - op->send_ping.on_initiate, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); - GRPC_CLOSURE_SCHED( - op->send_ping.on_ack, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); + grpc_error* error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"); + GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); } else { - chand->lb_policy->PingOneLocked(op->send_ping.on_initiate, - op->send_ping.on_ack); + grpc_error* error = GRPC_ERROR_NONE; + grpc_core::LoadBalancingPolicy::PickState pick_state; + pick_state.initial_metadata = nullptr; + pick_state.initial_metadata_flags = 0; + pick_state.on_complete = nullptr; + memset(&pick_state.subchannel_call_context, 0, + sizeof(pick_state.subchannel_call_context)); + pick_state.user_data = nullptr; + // Pick must return synchronously, because pick_state.on_complete is null. + GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error)); + if (pick_state.connected_subchannel != nullptr) { + pick_state.connected_subchannel->Ping(op->send_ping.on_initiate, + op->send_ping.on_ack); + } else { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "LB policy dropped call on ping"); + } + GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); + } op->bind_pollset = nullptr; } op->send_ping.on_initiate = nullptr; @@ -2684,14 +2701,15 @@ class LbPicker { grpc_combiner_scheduler(chand->combiner)); calld->pick.on_complete = &calld->pick_closure; GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); - const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); + grpc_error* error = GRPC_ERROR_NONE; + const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error); if (GPR_LIKELY(pick_done)) { // Pick completed synchronously. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", chand, calld); } - pick_done_locked(elem, GRPC_ERROR_NONE); + pick_done_locked(elem, error); GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); } else { // Pick will be returned asynchronously. diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index 235b8f3207..86c765df52 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -41,8 +41,9 @@ static const grpc_arg_pointer_vtable client_channel_channelz_vtable = { client_channel_channelz_cmp}; ClientChannelNode::ClientChannelNode(grpc_channel* channel, - size_t channel_tracer_max_nodes) - : ChannelNode(channel, channel_tracer_max_nodes) { + size_t channel_tracer_max_nodes, + bool is_top_level_channel) + : ChannelNode(channel, channel_tracer_max_nodes, is_top_level_channel) { client_channel_ = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); GPR_ASSERT(client_channel_->filter == &grpc_client_channel_filter); @@ -69,7 +70,7 @@ void ClientChannelNode::PopulateChildRefs(grpc_json* json) { grpc_json* json_iterator = nullptr; grpc_client_channel_populate_child_refs(client_channel_, &child_subchannels, &child_channels); - if (child_subchannels.size() > 0) { + if (!child_subchannels.empty()) { grpc_json* array_parent = grpc_json_create_child( nullptr, json, "subchannelRef", nullptr, GRPC_JSON_ARRAY, false); for (size_t i = 0; i < child_subchannels.size(); ++i) { @@ -80,16 +81,16 @@ void ClientChannelNode::PopulateChildRefs(grpc_json* json) { child_subchannels[i]); } } - if (child_channels.size() > 0) { + if (!child_channels.empty()) { grpc_json* array_parent = grpc_json_create_child( nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false); json_iterator = nullptr; - for (size_t i = 0; i < child_subchannels.size(); ++i) { + for (size_t i = 0; i < child_channels.size(); ++i) { json_iterator = grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr, GRPC_JSON_OBJECT, false); grpc_json_add_number_string_child(json_iterator, nullptr, "channelId", - child_subchannels[i]); + child_channels[i]); } } } @@ -102,9 +103,10 @@ grpc_arg ClientChannelNode::CreateChannelArg() { } RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode( - grpc_channel* channel, size_t channel_tracer_max_nodes) { - return MakePolymorphicRefCounted<ChannelNode, ClientChannelNode>( - channel, channel_tracer_max_nodes); + grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel) { + return MakeRefCounted<ClientChannelNode>(channel, channel_tracer_max_nodes, + is_top_level_channel); } } // namespace channelz diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h index 0547109d36..6f27b5c8b7 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.h +++ b/src/core/ext/filters/client_channel/client_channel_channelz.h @@ -40,7 +40,8 @@ namespace channelz { class ClientChannelNode : public ChannelNode { public: static RefCountedPtr<ChannelNode> MakeClientChannelNode( - grpc_channel* channel, size_t channel_tracer_max_nodes); + grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); // Override this functionality since client_channels have a notion of // channel connectivity. @@ -56,7 +57,8 @@ class ClientChannelNode : public ChannelNode { protected: GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes); + ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); virtual ~ClientChannelNode() {} private: diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 3150df8847..31c08246ae 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -71,6 +71,7 @@ class LoadBalancingPolicy /// Storage for LB token in \a initial_metadata, or nullptr if not used. grpc_linked_mdelem lb_token_mdelem_storage; /// Closure to run when pick is complete, if not completed synchronously. + /// If null, pick will fail if a result is not available synchronously. grpc_closure* on_complete; /// Will be set to the selected subchannel, or nullptr on failure or when /// the LB policy decides to drop the call. @@ -99,10 +100,15 @@ class LoadBalancingPolicy /// Finds an appropriate subchannel for a call, based on data in \a pick. /// \a pick must remain alive until the pick is complete. /// - /// If the pick succeeds and a result is known immediately, returns true. - /// Otherwise, \a pick->on_complete will be invoked once the pick is - /// complete with its error argument set to indicate success or failure. - virtual bool PickLocked(PickState* pick) GRPC_ABSTRACT; + /// If a result is known immediately, returns true, setting \a *error + /// upon failure. Otherwise, \a pick->on_complete will be invoked once + /// the pick is complete with its error argument set to indicate success + /// or failure. + /// + /// If \a pick->on_complete is null and no result is known immediately, + /// a synchronous failure will be returned (i.e., \a *error will be + /// set and true will be returned). + virtual bool PickLocked(PickState* pick, grpc_error** error) GRPC_ABSTRACT; /// Cancels \a pick. /// The \a on_complete callback of the pending pick will be invoked with @@ -133,12 +139,6 @@ class LoadBalancingPolicy virtual void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) GRPC_ABSTRACT; - /// Performs a connected subchannel ping via \a ConnectedSubchannel::Ping() - /// against one of the connected subchannels managed by the policy. - /// Note: This is intended only for use in tests. - virtual void PingOneLocked(grpc_closure* on_initiate, - grpc_closure* on_ack) GRPC_ABSTRACT; - /// Tries to enter a READY connectivity state. /// TODO(roth): As part of restructuring how we handle IDLE state, /// consider whether this method is still needed. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index f757d6057c..2d1f777474 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -123,7 +123,7 @@ class GrpcLb : public LoadBalancingPolicy { GrpcLb(const grpc_lb_addresses* addresses, const Args& args); void UpdateLocked(const grpc_channel_args& args) override; - bool PickLocked(PickState* pick) override; + bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, @@ -133,11 +133,9 @@ class GrpcLb : public LoadBalancingPolicy { grpc_connectivity_state CheckConnectivityLocked( grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; - void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; - // TODO(ncteisen): implement this in a follow up PR void FillChildRefsForChannelz(ChildRefsList* child_subchannels, - ChildRefsList* child_channels) override {} + ChildRefsList* child_channels) override; private: /// Linked list of pending pick requests. It stores all information needed to @@ -168,13 +166,6 @@ class GrpcLb : public LoadBalancingPolicy { PendingPick* next = nullptr; }; - /// A linked list of pending pings waiting for the RR policy to be created. - struct PendingPing { - grpc_closure* on_initiate; - grpc_closure* on_ack; - PendingPing* next = nullptr; - }; - /// Contains a call to the LB server and all the data related to the call. class BalancerCallState : public InternallyRefCountedWithTracing<BalancerCallState> { @@ -273,14 +264,12 @@ class GrpcLb : public LoadBalancingPolicy { void AddPendingPick(PendingPick* pp); static void OnPendingPickComplete(void* arg, grpc_error* error); - // Pending ping methods. - void AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack); - // Methods for dealing with the RR policy. void CreateOrUpdateRoundRobinPolicyLocked(); grpc_channel_args* CreateRoundRobinPolicyArgsLocked(); void CreateRoundRobinPolicyLocked(const Args& args); - bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp); + bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, + grpc_error** error); void UpdateConnectivityStateFromRoundRobinPolicyLocked( grpc_error* rr_state_error); static void OnRoundRobinConnectivityChangedLocked(void* arg, @@ -301,6 +290,9 @@ class GrpcLb : public LoadBalancingPolicy { // The channel for communicating with the LB server. grpc_channel* lb_channel_ = nullptr; + // Mutex to protect the channel to the LB server. This is used when + // processing a channelz request. + gpr_mu lb_channel_mu_; grpc_connectivity_state lb_channel_connectivity_; grpc_closure lb_channel_on_connectivity_changed_; // Are we already watching the LB channel's connectivity? @@ -340,9 +332,8 @@ class GrpcLb : public LoadBalancingPolicy { grpc_timer lb_fallback_timer_; grpc_closure lb_on_fallback_; - // Pending picks and pings that are waiting on the RR policy's connectivity. + // Pending picks that are waiting on the RR policy's connectivity. PendingPick* pending_picks_ = nullptr; - PendingPing* pending_pings_ = nullptr; // The RR policy to use for the backends. OrphanablePtr<LoadBalancingPolicy> rr_policy_; @@ -1007,6 +998,10 @@ grpc_channel_args* BuildBalancerChannelArgs( // A channel arg indicating the target is a grpclb load balancer. grpc_channel_arg_integer_create( const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1), + // A channel arg indicating this is an internal channels, aka it is + // owned by components in Core, not by the user application. + grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1), }; // Construct channel args. grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( @@ -1036,6 +1031,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { // Initialization. + gpr_mu_init(&lb_channel_mu_); grpc_subchannel_index_ref(); GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this, @@ -1073,7 +1069,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, GrpcLb::~GrpcLb() { GPR_ASSERT(pending_picks_ == nullptr); - GPR_ASSERT(pending_pings_ == nullptr); + gpr_mu_destroy(&lb_channel_mu_); gpr_free((void*)server_name_); grpc_channel_args_destroy(args_); grpc_connectivity_state_destroy(&state_tracker_); @@ -1103,8 +1099,10 @@ void GrpcLb::ShutdownLocked() { // OnBalancerChannelConnectivityChangedLocked(), and we need to be // alive when that callback is invoked. if (lb_channel_ != nullptr) { + gpr_mu_lock(&lb_channel_mu_); grpc_channel_destroy(lb_channel_); lb_channel_ = nullptr; + gpr_mu_unlock(&lb_channel_mu_); } grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "grpclb_shutdown"); @@ -1116,14 +1114,6 @@ void GrpcLb::ShutdownLocked() { // Note: pp is deleted in this callback. GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); } - // Clear pending pings. - PendingPing* pping; - while ((pping = pending_pings_) != nullptr) { - pending_pings_ = pping->next; - GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error)); - GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error)); - Delete(pping); - } GRPC_ERROR_UNREF(error); } @@ -1137,9 +1127,10 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { pending_picks_ = pp->next; pp->pick->on_complete = pp->original_on_complete; pp->pick->user_data = nullptr; - if (new_policy->PickLocked(pp->pick)) { + grpc_error* error = GRPC_ERROR_NONE; + if (new_policy->PickLocked(pp->pick, &error)) { // Synchronous return; schedule closure. - GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->pick->on_complete, error); } Delete(pp); } @@ -1223,54 +1214,47 @@ void GrpcLb::ExitIdleLocked() { } } -bool GrpcLb::PickLocked(PickState* pick) { +bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) { PendingPick* pp = PendingPickCreate(pick); bool pick_done = false; if (rr_policy_ != nullptr) { - const grpc_connectivity_state rr_connectivity_state = - rr_policy_->CheckConnectivityLocked(nullptr); - // The RR policy may have transitioned to SHUTDOWN but the callback - // registered to capture this event (on_rr_connectivity_changed_) may not - // have been invoked yet. We need to make sure we aren't trying to pick - // from an RR policy instance that's in shutdown. - if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this, + rr_policy_.get()); + } + pick_done = + PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error); + } else { // rr_policy_ == NULL + if (pick->on_complete == nullptr) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No pick result available but synchronous result required."); + pick_done = true; + } else { if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] NOT picking from from RR %p: RR conn state=%s", - this, rr_policy_.get(), - grpc_connectivity_state_name(rr_connectivity_state)); + "[grpclb %p] No RR policy. Adding to grpclb's pending picks", + this); } AddPendingPick(pp); - pick_done = false; - } else { // RR not in shutdown - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this, - rr_policy_.get()); + if (!started_picking_) { + StartPickingLocked(); } - pick_done = PickFromRoundRobinPolicyLocked(false /* force_async */, pp); - } - } else { // rr_policy_ == NULL - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] No RR policy. Adding to grpclb's pending picks", - this); - } - AddPendingPick(pp); - if (!started_picking_) { - StartPickingLocked(); + pick_done = false; } - pick_done = false; } return pick_done; } -void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { - if (rr_policy_ != nullptr) { - rr_policy_->PingOneLocked(on_initiate, on_ack); - } else { - AddPendingPing(on_initiate, on_ack); - if (!started_picking_) { - StartPickingLocked(); +void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels, + ChildRefsList* child_channels) { + // delegate to the RoundRobin to fill the children subchannels. + rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); + mu_guard guard(&lb_channel_mu_); + if (lb_channel_ != nullptr) { + grpc_core::channelz::ChannelNode* channel_node = + grpc_channel_get_channelz_node(lb_channel_); + if (channel_node != nullptr) { + child_channels->push_back(channel_node->channel_uuid()); } } } @@ -1318,9 +1302,11 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { if (lb_channel_ == nullptr) { char* uri_str; gpr_asprintf(&uri_str, "fake:///%s", server_name_); + gpr_mu_lock(&lb_channel_mu_); lb_channel_ = grpc_client_channel_factory_create_channel( client_channel_factory(), uri_str, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args); + gpr_mu_unlock(&lb_channel_mu_); GPR_ASSERT(lb_channel_ != nullptr); gpr_free(uri_str); } @@ -1573,18 +1559,6 @@ void GrpcLb::AddPendingPick(PendingPick* pp) { } // -// PendingPing -// - -void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) { - PendingPing* pping = New<PendingPing>(); - pping->on_initiate = on_initiate; - pping->on_ack = on_ack; - pping->next = pending_pings_; - pending_pings_ = pping; -} - -// // code for interacting with the RR policy // @@ -1593,7 +1567,8 @@ void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) { // cleanups this callback would otherwise be responsible for. // If \a force_async is true, then we will manually schedule the // completion callback even if the pick is available immediately. -bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) { +bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, + grpc_error** error) { // Check for drops if we are not using fallback backend addresses. if (serverlist_ != nullptr) { // Look at the index into the serverlist to see if we should drop this call. @@ -1627,11 +1602,12 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) { GPR_ASSERT(pp->pick->user_data == nullptr); pp->pick->user_data = (void**)&pp->lb_token; // Pick via the RR policy. - bool pick_done = rr_policy_->PickLocked(pp->pick); + bool pick_done = rr_policy_->PickLocked(pp->pick, error); if (pick_done) { PendingPickSetMetadataAndContext(pp); if (force_async) { - GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->original_on_complete, *error); + *error = GRPC_ERROR_NONE; pick_done = false; } Delete(pp); @@ -1683,18 +1659,8 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { "[grpclb %p] Pending pick about to (async) PICK from RR %p", this, rr_policy_.get()); } - PickFromRoundRobinPolicyLocked(true /* force_async */, pp); - } - // Send pending pings to RR policy. - PendingPing* pping; - while ((pping = pending_pings_)) { - pending_pings_ = pping->next; - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", - this, rr_policy_.get()); - } - rr_policy_->PingOneLocked(pping->on_initiate, pping->on_ack); - Delete(pping); + grpc_error* error = GRPC_ERROR_NONE; + PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index c50deb9679..46acbf628b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -46,7 +46,7 @@ class PickFirst : public LoadBalancingPolicy { explicit PickFirst(const Args& args); void UpdateLocked(const grpc_channel_args& args) override; - bool PickLocked(PickState* pick) override; + bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, @@ -56,10 +56,9 @@ class PickFirst : public LoadBalancingPolicy { grpc_connectivity_state CheckConnectivityLocked( grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; - void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; void FillChildRefsForChannelz(ChildRefsList* child_subchannels, - ChildRefsList* child_channels) override; + ChildRefsList* ignored) override; private: ~PickFirst(); @@ -173,15 +172,16 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { PickState* pick; while ((pick = pending_picks_) != nullptr) { pending_picks_ = pick->next; - if (new_policy->PickLocked(pick)) { + grpc_error* error = GRPC_ERROR_NONE; + if (new_policy->PickLocked(pick, &error)) { // Synchronous return, schedule closure. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pick->on_complete, error); } } } void PickFirst::ShutdownLocked() { - AutoChildRefsUpdater(this); + AutoChildRefsUpdater guard(this); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p Shutting down", this); @@ -259,13 +259,18 @@ void PickFirst::ExitIdleLocked() { } } -bool PickFirst::PickLocked(PickState* pick) { +bool PickFirst::PickLocked(PickState* pick, grpc_error** error) { // If we have a selected subchannel already, return synchronously. if (selected_ != nullptr) { pick->connected_subchannel = selected_->connected_subchannel()->Ref(); return true; } // No subchannel selected yet, so handle asynchronously. + if (pick->on_complete == nullptr) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No pick result available but synchronous result required."); + return true; + } if (!started_picking_) { StartPickingLocked(); } @@ -293,17 +298,6 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current, notify); } -void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { - if (selected_ != nullptr) { - selected_->connected_subchannel()->Ping(on_initiate, on_ack); - } else { - GRPC_CLOSURE_SCHED(on_initiate, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); - GRPC_CLOSURE_SCHED(on_ack, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); - } -} - void PickFirst::FillChildRefsForChannelz( ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) { mu_guard guard(&child_refs_mu_); @@ -327,30 +321,10 @@ void PickFirst::FillChildRefsForChannelz( void PickFirst::UpdateChildRefsLocked() { ChildRefsList cs; if (subchannel_list_ != nullptr) { - for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { - if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { - grpc_core::channelz::SubchannelNode* subchannel_node = - grpc_subchannel_get_channelz_node( - subchannel_list_->subchannel(i)->subchannel()); - if (subchannel_node != nullptr) { - cs.push_back(subchannel_node->subchannel_uuid()); - } - } - } + subchannel_list_->PopulateChildRefsList(&cs); } if (latest_pending_subchannel_list_ != nullptr) { - for (size_t i = 0; i < latest_pending_subchannel_list_->num_subchannels(); - ++i) { - if (latest_pending_subchannel_list_->subchannel(i)->subchannel() != - nullptr) { - grpc_core::channelz::SubchannelNode* subchannel_node = - grpc_subchannel_get_channelz_node( - latest_pending_subchannel_list_->subchannel(i)->subchannel()); - if (subchannel_node != nullptr) { - cs.push_back(subchannel_node->subchannel_uuid()); - } - } - } + latest_pending_subchannel_list_->PopulateChildRefsList(&cs); } // atomically update the data that channelz will actually be looking at. mu_guard guard(&child_refs_mu_); @@ -471,6 +445,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // latest pending subchannel lists. GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || subchannel_list() == p->latest_pending_subchannel_list_.get()); + GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); // Handle updates for the currently selected subchannel. if (p->selected_ == this) { if (grpc_lb_pick_first_trace.enabled()) { @@ -500,14 +475,12 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( "update"), "selected_not_ready+switch_to_update"); } else { - // TODO(juanlishen): we re-resolve when the selected subchannel goes to - // TRANSIENT_FAILURE because we used to shut down in this case before - // re-resolution is introduced. But we need to investigate whether we - // really want to take any action instead of waiting for the selected - // subchannel reconnecting. - GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - // If the selected channel goes bad, request a re-resolution. + // If the selected subchannel goes bad, request a re-resolution. We also + // set the channel state to IDLE and reset started_picking_. The reason + // is that if the new state is TRANSIENT_FAILURE due to a GOAWAY + // reception we don't want to connect to the re-resolved backends until + // we leave the IDLE state. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "selected_changed+reresolve"); @@ -588,9 +561,10 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) { + p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE); grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "connecting_transient_failure"); + GRPC_ERROR_REF(error), "exhausted_subchannels"); } sd->StartConnectivityWatchLocked(); break; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 09634a2ad4..9c3a15c67b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -57,7 +57,7 @@ class RoundRobin : public LoadBalancingPolicy { explicit RoundRobin(const Args& args); void UpdateLocked(const grpc_channel_args& args) override; - bool PickLocked(PickState* pick) override; + bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, @@ -67,11 +67,9 @@ class RoundRobin : public LoadBalancingPolicy { grpc_connectivity_state CheckConnectivityLocked( grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; - void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; - // TODO(ncteisen): implement this in a follow up PR void FillChildRefsForChannelz(ChildRefsList* child_subchannels, - ChildRefsList* child_channels) override {} + ChildRefsList* ignored) override; private: ~RoundRobin(); @@ -183,11 +181,24 @@ class RoundRobin : public LoadBalancingPolicy { size_t last_ready_index_ = -1; // Index into list of last pick. }; + // Helper class to ensure that any function that modifies the child refs + // data structures will update the channelz snapshot data structures before + // returning. + class AutoChildRefsUpdater { + public: + explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {} + ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); } + + private: + RoundRobin* rr_; + }; + void ShutdownLocked() override; void StartPickingLocked(); bool DoPickLocked(PickState* pick); void DrainPendingPicksLocked(); + void UpdateChildRefsLocked(); /** list of subchannels */ OrphanablePtr<RoundRobinSubchannelList> subchannel_list_; @@ -205,10 +216,16 @@ class RoundRobin : public LoadBalancingPolicy { PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker_; + /// Lock and data used to capture snapshots of this channel's child + /// channels and subchannels. This data is consumed by channelz. + gpr_mu child_refs_mu_; + ChildRefsList child_subchannels_; + ChildRefsList child_channels_; }; RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { GPR_ASSERT(args.client_channel_factory != nullptr); + gpr_mu_init(&child_refs_mu_); grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "round_robin"); UpdateLocked(*args.args); @@ -223,6 +240,7 @@ RoundRobin::~RoundRobin() { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); } + gpr_mu_destroy(&child_refs_mu_); GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); GPR_ASSERT(pending_picks_ == nullptr); @@ -234,14 +252,16 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { PickState* pick; while ((pick = pending_picks_) != nullptr) { pending_picks_ = pick->next; - if (new_policy->PickLocked(pick)) { + grpc_error* error = GRPC_ERROR_NONE; + if (new_policy->PickLocked(pick, &error)) { // Synchronous return, schedule closure. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pick->on_complete, error); } } } void RoundRobin::ShutdownLocked() { + AutoChildRefsUpdater guard(this); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Shutting down", this); @@ -348,7 +368,7 @@ void RoundRobin::DrainPendingPicksLocked() { } } -bool RoundRobin::PickLocked(PickState* pick) { +bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_); } @@ -356,6 +376,11 @@ bool RoundRobin::PickLocked(PickState* pick) { if (subchannel_list_ != nullptr) { if (DoPickLocked(pick)) return true; } + if (pick->on_complete == nullptr) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No pick result available but synchronous result required."); + return true; + } /* no pick currently available. Save for later in list of pending picks */ pick->next = pending_picks_; pending_picks_ = pick; @@ -365,6 +390,39 @@ bool RoundRobin::PickLocked(PickState* pick) { return false; } +void RoundRobin::FillChildRefsForChannelz( + ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) { + mu_guard guard(&child_refs_mu_); + for (size_t i = 0; i < child_subchannels_.size(); ++i) { + // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might + // have to implement lightweight set. For now, we don't care about + // performance when channelz requests are made. + bool found = false; + for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) { + if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) { + found = true; + break; + } + } + if (!found) { + child_subchannels_to_fill->push_back(child_subchannels_[i]); + } + } +} + +void RoundRobin::UpdateChildRefsLocked() { + ChildRefsList cs; + if (subchannel_list_ != nullptr) { + subchannel_list_->PopulateChildRefsList(&cs); + } + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->PopulateChildRefsList(&cs); + } + // atomically update the data that channelz will actually be looking at. + mu_guard guard(&child_refs_mu_); + child_subchannels_ = std::move(cs); +} + void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { if (num_subchannels() == 0) return; // Check current state of each subchannel synchronously, since any @@ -455,6 +513,7 @@ void RoundRobin::RoundRobinSubchannelList:: void RoundRobin::RoundRobinSubchannelList:: UpdateRoundRobinStateFromSubchannelStateCountsLocked() { RoundRobin* p = static_cast<RoundRobin*>(policy()); + AutoChildRefsUpdater guard(p); if (num_ready_ > 0) { if (p->subchannel_list_.get() != this) { // Promote this list to p->subchannel_list_. @@ -593,24 +652,9 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, notify); } -void RoundRobin::PingOneLocked(grpc_closure* on_initiate, - grpc_closure* on_ack) { - const size_t next_ready_index = - subchannel_list_->GetNextReadySubchannelIndexLocked(); - if (next_ready_index < subchannel_list_->num_subchannels()) { - RoundRobinSubchannelData* selected = - subchannel_list_->subchannel(next_ready_index); - selected->connected_subchannel()->Ping(on_initiate, on_ack); - } else { - GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Round Robin not connected")); - GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Round Robin not connected")); - } -} - void RoundRobin::UpdateLocked(const grpc_channel_args& args) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); + AutoChildRefsUpdater guard(this); if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 7e2046bcdc..018ac3bb86 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -189,6 +189,19 @@ class SubchannelList // Returns true if the subchannel list is shutting down. bool shutting_down() const { return shutting_down_; } + // Populates refs_list with the uuids of this SubchannelLists's subchannels. + void PopulateChildRefsList(ChildRefsList* refs_list) { + for (size_t i = 0; i < subchannels_.size(); ++i) { + if (subchannels_[i].subchannel() != nullptr) { + grpc_core::channelz::SubchannelNode* subchannel_node = + grpc_subchannel_get_channelz_node(subchannels_[i].subchannel()); + if (subchannel_node != nullptr) { + refs_list->push_back(subchannel_node->subchannel_uuid()); + } + } + } + } + // Accessors. LoadBalancingPolicy* policy() const { return policy_; } TraceFlag* tracer() const { return tracer_; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index f4f6444c5f..7050e82121 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -23,7 +23,6 @@ #include <limits.h> #include <stdio.h> #include <string.h> -#include <unistd.h> #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> @@ -142,8 +141,8 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args) channel_args_ = grpc_channel_args_copy(args.args); const grpc_arg* arg = grpc_channel_args_find( channel_args_, GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION); - request_service_config_ = !grpc_channel_arg_get_integer( - arg, (grpc_integer_options){false, false, true}); + grpc_integer_options integer_options = {false, false, true}; + request_service_config_ = !grpc_channel_arg_get_integer(arg, integer_options); arg = grpc_channel_args_find(channel_args_, GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS); min_time_between_resolutions_ = diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc index c886795608..0068d0d5f4 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -18,11 +18,10 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/port.h" -#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) +#if GRPC_ARES == 1 && !defined(GRPC_UV) #include <ares.h> #include <string.h> -#include <sys/ioctl.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" @@ -32,7 +31,6 @@ #include <grpc/support/time.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -314,4 +312,4 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { } } -#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ +#endif /* GRPC_ARES == 1 && !defined(GRPC_UV) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc new file mode 100644 index 0000000000..5d65ae3ab3 --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -0,0 +1,59 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" +#if GRPC_ARES == 1 && defined(GPR_WINDOWS) + +#include <ares.h> +#include <string.h> +#include "src/core/lib/gprpp/memory.h" + +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" + +namespace grpc_core { + +/* TODO: fill in the body of GrpcPolledFdWindows to enable c-ares on Windows. + This dummy implementation only allows grpc to compile on windows with + GRPC_ARES=1. */ +class GrpcPolledFdWindows : public GrpcPolledFd { + public: + GrpcPolledFdWindows() { abort(); } + ~GrpcPolledFdWindows() { abort(); } + void RegisterForOnReadableLocked(grpc_closure* read_closure) override { + abort(); + } + void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { + abort(); + } + bool IsFdStillReadableLocked() override { abort(); } + void ShutdownLocked(grpc_error* error) override { abort(); } + ares_socket_t GetWrappedAresSocketLocked() override { abort(); } + const char* GetName() override { abort(); } +}; + +GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as, + grpc_pollset_set* driver_pollset_set) { + return nullptr; +} + +void ConfigureAresChannelLocked(ares_channel* channel) { abort(); } + +} // namespace grpc_core + +#endif /* GRPC_ARES == 1 && defined(GPR_WINDOWS) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 497ad998af..b3d6437e9a 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -22,7 +22,6 @@ #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/iomgr/sockaddr.h" -#include "src/core/lib/iomgr/socket_utils_posix.h" #include <string.h> #include <sys/types.h> @@ -215,7 +214,7 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts, memset(&addr, 0, addr_len); memcpy(&addr.sin6_addr, hostent->h_addr_list[i - prev_naddr], sizeof(struct in6_addr)); - addr.sin6_family = static_cast<sa_family_t>(hostent->h_addrtype); + addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype); addr.sin6_port = hr->port; grpc_lb_addresses_set_address( *lb_addresses, i, &addr, addr_len, @@ -236,7 +235,7 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts, memset(&addr, 0, addr_len); memcpy(&addr.sin_addr, hostent->h_addr_list[i - prev_naddr], sizeof(struct in_addr)); - addr.sin_family = static_cast<sa_family_t>(hostent->h_addrtype); + addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype); addr.sin_port = hr->port; grpc_lb_addresses_set_address( *lb_addresses, i, &addr, addr_len, @@ -281,7 +280,7 @@ static void on_srv_query_done_locked(void* arg, int status, int timeouts, grpc_ares_ev_driver_get_channel_locked(r->ev_driver); for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; srv_it = srv_it->next) { - if (grpc_ipv6_loopback_available()) { + if (grpc_ares_query_ipv6()) { grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( r, srv_it->host, htons(srv_it->port), true /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, @@ -452,7 +451,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( } } r->pending_queries = 1; - if (grpc_ipv6_loopback_available()) { + if (grpc_ares_query_ipv6()) { hr = create_hostbyname_request_locked(r, host, strhtons(port), false /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked, diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index ce26f5d524..17eaa7ccf0 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -70,6 +70,10 @@ void grpc_ares_cleanup(void); * and destroys the grpc_ares_request */ void grpc_ares_complete_request_locked(grpc_ares_request* request); +/* Indicates whether or not AAAA queries should be attempted. */ +/* E.g., return false if ipv6 is known to not be available. */ +bool grpc_ares_query_ipv6(); + /* Exposed only for testing */ void grpc_cares_wrapper_test_only_address_sorting_sort( grpc_lb_addresses* lb_addrs); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc new file mode 100644 index 0000000000..23c0fec74f --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc @@ -0,0 +1,29 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" +#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) + +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" + +bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } + +#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc new file mode 100644 index 0000000000..ee827e284e --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc @@ -0,0 +1,29 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" +#if GRPC_ARES == 1 && defined(GPR_WINDOWS) + +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/lib/iomgr/socket_windows.h" + +bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } + +#endif /* GRPC_ARES == 1 && defined(GPR_WINDOWS) */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 9d608c3c55..71ef8c518b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -390,8 +390,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, } grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node( - grpc_subchannel* s) { - return s->channelz_subchannel.get(); + grpc_subchannel* subchannel) { + return subchannel->channelz_subchannel.get(); } static void continue_connect_locked(grpc_subchannel* c) { @@ -402,8 +402,6 @@ static void continue_connect_locked(grpc_subchannel* c) { c->next_attempt_deadline = c->backoff->NextAttemptTime(); args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; - grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_NONE, "state_change"); grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->on_connected); } @@ -459,27 +457,24 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { /* Don't try to connect if we're already disconnected */ return; } - if (c->connecting) { /* Already connecting: don't restart */ return; } - if (c->connected_subchannel != nullptr) { /* Already connected: don't restart */ return; } - if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) { /* Nobody is interested in connecting: so don't just yet */ return; } - c->connecting = true; GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); - if (!c->backoff_begun) { c->backoff_begun = true; + grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_NONE, "connecting"); continue_connect_locked(c); } else { GPR_ASSERT(!c->have_alarm); @@ -494,6 +489,11 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); + // During backoff, we prefer the connectivity state of CONNECTING instead of + // TRANSIENT_FAILURE in order to prevent triggering re-resolution + // continuously in pick_first. + grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_NONE, "backoff"); } } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index bd6fec6fbe..9ad271753c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1208,7 +1208,7 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, grpc_error_add_child(closure->error_data.error, error); } if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) { - if (s->seen_error || (t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) || + if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) || !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) { GRPC_CLOSURE_RUN(closure, closure->error_data.error); } else { diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 7581f937b6..35c3fb01ea 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -82,7 +82,7 @@ typedef struct { typedef struct { grpc_call_stats stats; grpc_status_code final_status; - const char** error_string; + const char* error_string; } grpc_call_final_info; /* Channel filters specify: diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc index c17885a713..b3443310ac 100644 --- a/src/core/lib/channel/channel_trace.cc +++ b/src/core/lib/channel/channel_trace.cc @@ -174,7 +174,7 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const { } } -grpc_json* ChannelTrace::RenderJSON() const { +grpc_json* ChannelTrace::RenderJson() const { if (!max_list_size_) return nullptr; // tracing is disabled if max_events == 0 grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT); diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h index 0dd162a777..596af7402f 100644 --- a/src/core/lib/channel/channel_trace.h +++ b/src/core/lib/channel/channel_trace.h @@ -71,7 +71,7 @@ class ChannelTrace { // Creates and returns the raw grpc_json object, so a parent channelz // object may incorporate the json before rendering. - grpc_json* RenderJSON() const; + grpc_json* RenderJson() const; private: // Types of objects that can be references by trace events. diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 79a9220503..9d6002ed8a 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -41,18 +41,22 @@ namespace grpc_core { namespace channelz { -ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes) - : channel_(channel), target_(nullptr), channel_uuid_(-1) { +ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel) + : channel_(channel), + target_(nullptr), + channel_uuid_(-1), + is_top_level_channel_(is_top_level_channel) { trace_.Init(channel_tracer_max_nodes); target_ = UniquePtr<char>(grpc_channel_get_target(channel_)); - channel_uuid_ = ChannelzRegistry::Register(this); + channel_uuid_ = ChannelzRegistry::RegisterChannelNode(this); gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now()); } ChannelNode::~ChannelNode() { trace_.Destroy(); - ChannelzRegistry::Unregister(channel_uuid_); + ChannelzRegistry::UnregisterChannelNode(channel_uuid_); } void ChannelNode::RecordCallStarted() { @@ -65,7 +69,7 @@ void ChannelNode::PopulateConnectivityState(grpc_json* json) {} void ChannelNode::PopulateChildRefs(grpc_json* json) {} -char* ChannelNode::RenderJSON() { +grpc_json* ChannelNode::RenderJson() { // We need to track these three json objects to build our object grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* json = top_level_json; @@ -86,30 +90,32 @@ char* ChannelNode::RenderJSON() { json = data; json_iterator = nullptr; PopulateConnectivityState(json); + GPR_ASSERT(target_.get() != nullptr); json_iterator = grpc_json_create_child( json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false); // fill in the channel trace if applicable - grpc_json* trace = trace_->RenderJSON(); + grpc_json* trace = trace_->RenderJson(); if (trace != nullptr) { - // we manuall link up and fill the child since it was created for us in - // ChannelTrace::RenderJSON + // we manually link up and fill the child since it was created for us in + // ChannelTrace::RenderJson + trace->key = "trace"; // this object is named trace in channelz.proto json_iterator = grpc_json_link_child(json, trace, json_iterator); - trace->parent = json; - trace->value = nullptr; - trace->key = "trace"; - trace->owns_value = false; } // reset the parent to be the data object. json = data; json_iterator = nullptr; - // We use -1 as sentinel values since proto default value for integers is - // zero, and the confuses the parser into thinking the value weren't present - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsStarted", calls_started_); - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsSucceeded", calls_succeeded_); - json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsFailed", calls_failed_); + if (calls_started_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsStarted", calls_started_); + } + if (calls_succeeded_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsSucceeded", calls_succeeded_); + } + if (calls_failed_) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "callsFailed", calls_failed_); + } gpr_timespec ts = grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); json_iterator = @@ -118,25 +124,29 @@ char* ChannelNode::RenderJSON() { json = top_level_json; json_iterator = nullptr; PopulateChildRefs(json); + return top_level_json; +} - // render and return the over json object - char* json_str = grpc_json_dump_to_string(top_level_json, 0); - grpc_json_destroy(top_level_json); +char* ChannelNode::RenderJsonString() { + grpc_json* json = RenderJson(); + char* json_str = grpc_json_dump_to_string(json, 0); + grpc_json_destroy(json); return json_str; } RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode( - grpc_channel* channel, size_t channel_tracer_max_nodes) { + grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel) { return MakeRefCounted<grpc_core::channelz::ChannelNode>( - channel, channel_tracer_max_nodes); + channel, channel_tracer_max_nodes, is_top_level_channel); } SubchannelNode::SubchannelNode() { - subchannel_uuid_ = ChannelzRegistry::Register(this); + subchannel_uuid_ = ChannelzRegistry::RegisterSubchannelNode(this); } SubchannelNode::~SubchannelNode() { - ChannelzRegistry::Unregister(subchannel_uuid_); + ChannelzRegistry::UnregisterSubchannelNode(subchannel_uuid_); } } // namespace channelz diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index 7184af93ec..07eb73d626 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -35,6 +35,10 @@ #define GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC \ "grpc.channelz_channel_node_creation_func" +// Channel arg key to signal that the channel is an internal channel. +#define GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL \ + "grpc.channelz_channel_is_internal_channel" + namespace grpc_core { namespace channelz { @@ -45,7 +49,8 @@ class ChannelNodePeer; class ChannelNode : public RefCounted<ChannelNode> { public: static RefCountedPtr<ChannelNode> MakeChannelNode( - grpc_channel* channel, size_t channel_tracer_max_nodes); + grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); void RecordCallStarted(); void RecordCallFailed() { @@ -55,7 +60,8 @@ class ChannelNode : public RefCounted<ChannelNode> { gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1))); } - char* RenderJSON(); + grpc_json* RenderJson(); + char* RenderJsonString(); // helper for getting and populating connectivity state. It is virtual // because it allows the client_channel specific code to live in ext/ @@ -74,11 +80,13 @@ class ChannelNode : public RefCounted<ChannelNode> { bool ChannelIsDestroyed() { return channel_ == nullptr; } intptr_t channel_uuid() { return channel_uuid_; } + bool is_top_level_channel() { return is_top_level_channel_; } protected: GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes); + ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, + bool is_top_level_channel); virtual ~ChannelNode(); private: @@ -92,6 +100,7 @@ class ChannelNode : public RefCounted<ChannelNode> { gpr_atm calls_failed_ = 0; gpr_atm last_call_started_millis_ = 0; intptr_t channel_uuid_; + bool is_top_level_channel_ = true; ManualConstructor<ChannelTrace> trace_; }; @@ -116,7 +125,7 @@ class SubchannelNode : public RefCounted<SubchannelNode> { // Creation functions typedef RefCountedPtr<ChannelNode> (*ChannelNodeCreationFunc)(grpc_channel*, - size_t); + size_t, bool); } // namespace channelz } // namespace grpc_core diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index 023ede552a..38496b3d78 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -19,16 +19,19 @@ #include <grpc/impl/codegen/port_platform.h> #include "src/core/lib/channel/channel_trace.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/channelz_registry.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include <cstring> namespace grpc_core { +namespace channelz { namespace { // singleton instance of the registry. @@ -49,12 +52,93 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); } ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); } -void ChannelzRegistry::InternalUnregister(intptr_t uuid) { +intptr_t ChannelzRegistry::InternalRegisterEntry(const RegistryEntry& entry) { + mu_guard guard(&mu_); + entities_.push_back(entry); + intptr_t uuid = entities_.size(); + return uuid; +} + +void ChannelzRegistry::InternalUnregisterEntry(intptr_t uuid, EntityType type) { GPR_ASSERT(uuid >= 1); - gpr_mu_lock(&mu_); + mu_guard guard(&mu_); GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size()); - entities_[uuid - 1] = nullptr; - gpr_mu_unlock(&mu_); + GPR_ASSERT(entities_[uuid - 1].type == type); + entities_[uuid - 1].object = nullptr; + entities_[uuid - 1].type = EntityType::kUnset; +} + +void* ChannelzRegistry::InternalGetEntry(intptr_t uuid, EntityType type) { + mu_guard guard(&mu_); + if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) { + return nullptr; + } + if (entities_[uuid - 1].type == type) { + return entities_[uuid - 1].object; + } else { + return nullptr; + } +} + +char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + InlinedVector<ChannelNode*, 10> top_level_channels; + // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is + // reserved). However, we want to support requests coming in with + // start_channel_id=0, which signifies "give me everything." Hence this + // funky looking line below. + size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1; + for (size_t i = start_idx; i < entities_.size(); ++i) { + if (entities_[i].type == EntityType::kChannelNode) { + ChannelNode* channel_node = + static_cast<ChannelNode*>(entities_[i].object); + if (channel_node->is_top_level_channel()) { + top_level_channels.push_back(channel_node); + } + } + } + if (top_level_channels.size() > 0) { + // create list of channels + grpc_json* array_parent = grpc_json_create_child( + nullptr, json, "channel", nullptr, GRPC_JSON_ARRAY, false); + for (size_t i = 0; i < top_level_channels.size(); ++i) { + grpc_json* channel_json = top_level_channels[i]->RenderJson(); + json_iterator = + grpc_json_link_child(array_parent, channel_json, json_iterator); + } + } + // For now we do not have any pagination rules. In the future we could + // pick a constant for max_channels_sent for a GetTopChannels request. + // Tracking: https://github.com/grpc/grpc/issues/16019. + json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, + GRPC_JSON_TRUE, false); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; } +} // namespace channelz } // namespace grpc_core + +char* grpc_channelz_get_top_channels(intptr_t start_channel_id) { + return grpc_core::channelz::ChannelzRegistry::GetTopChannels( + start_channel_id); +} + +char* grpc_channelz_get_channel(intptr_t channel_id) { + grpc_core::channelz::ChannelNode* channel_node = + grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id); + if (channel_node == nullptr) { + return nullptr; + } + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* channel_json = channel_node->RenderJson(); + channel_json->key = "channel"; + grpc_json_link_child(json, channel_json, nullptr); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h index a5a187a054..5d7c936726 100644 --- a/src/core/lib/channel/channelz_registry.h +++ b/src/core/lib/channel/channelz_registry.h @@ -22,11 +22,13 @@ #include <grpc/impl/codegen/port_platform.h> #include "src/core/lib/channel/channel_trace.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/inlined_vector.h" #include <stdint.h> namespace grpc_core { +namespace channelz { // singleton registry object to track all objects that are needed to support // channelz bookkeeping. All objects share globally distributed uuids. @@ -35,26 +37,56 @@ class ChannelzRegistry { // To be called in grpc_init() static void Init(); - // To be callen in grpc_shutdown(); + // To be called in grpc_shutdown(); static void Shutdown(); - // globally registers a channelz Object. Returns its unique uuid - template <typename Object> - static intptr_t Register(Object* object) { - return Default()->InternalRegister(object); + // Register/Unregister/Get for ChannelNode + static intptr_t RegisterChannelNode(ChannelNode* channel_node) { + RegistryEntry entry(channel_node, EntityType::kChannelNode); + return Default()->InternalRegisterEntry(entry); + } + static void UnregisterChannelNode(intptr_t uuid) { + Default()->InternalUnregisterEntry(uuid, EntityType::kChannelNode); + } + static ChannelNode* GetChannelNode(intptr_t uuid) { + void* gotten = Default()->InternalGetEntry(uuid, EntityType::kChannelNode); + return gotten == nullptr ? nullptr : static_cast<ChannelNode*>(gotten); } - // globally unregisters the object that is associated to uuid. - static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); } + // Register/Unregister/Get for SubchannelNode + static intptr_t RegisterSubchannelNode(SubchannelNode* channel_node) { + RegistryEntry entry(channel_node, EntityType::kSubchannelNode); + return Default()->InternalRegisterEntry(entry); + } + static void UnregisterSubchannelNode(intptr_t uuid) { + Default()->InternalUnregisterEntry(uuid, EntityType::kSubchannelNode); + } + static SubchannelNode* GetSubchannelNode(intptr_t uuid) { + void* gotten = + Default()->InternalGetEntry(uuid, EntityType::kSubchannelNode); + return gotten == nullptr ? nullptr : static_cast<SubchannelNode*>(gotten); + } - // if object with uuid has previously been registered, returns the - // Object associated with that uuid. Else returns nullptr. - template <typename Object> - static Object* Get(intptr_t uuid) { - return Default()->InternalGet<Object>(uuid); + // Returns the allocated JSON string that represents the proto + // GetTopChannelsResponse as per channelz.proto. + static char* GetTopChannels(intptr_t start_channel_id) { + return Default()->InternalGetTopChannels(start_channel_id); } private: + enum class EntityType { + kChannelNode, + kSubchannelNode, + kUnset, + }; + + struct RegistryEntry { + RegistryEntry(void* object_in, EntityType type_in) + : object(object_in), type(type_in) {} + void* object; + EntityType type; + }; + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE @@ -64,40 +96,25 @@ class ChannelzRegistry { // Returned the singleton instance of ChannelzRegistry; static ChannelzRegistry* Default(); - // globally registers a channelz Object. Returns its unique uuid - template <typename Object> - intptr_t InternalRegister(Object* object) { - gpr_mu_lock(&mu_); - entities_.push_back(static_cast<void*>(object)); - intptr_t uuid = entities_.size(); - gpr_mu_unlock(&mu_); - return uuid; - } + // globally registers an Entry. Returns its unique uuid + intptr_t InternalRegisterEntry(const RegistryEntry& entry); - // globally unregisters the object that is associated to uuid. - void InternalUnregister(intptr_t uuid); - - // if object with uuid has previously been registered, returns the - // Object associated with that uuid. Else returns nullptr. - template <typename Object> - Object* InternalGet(intptr_t uuid) { - gpr_mu_lock(&mu_); - if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) { - gpr_mu_unlock(&mu_); - return nullptr; - } - Object* ret = static_cast<Object*>(entities_[uuid - 1]); - gpr_mu_unlock(&mu_); - return ret; - } + // globally unregisters the object that is associated to uuid. Also does + // sanity check that an object doesn't try to unregister the wrong type. + void InternalUnregisterEntry(intptr_t uuid, EntityType type); + + // if object with uuid has previously been registered as the correct type, + // returns the void* associated with that uuid. Else returns nullptr. + void* InternalGetEntry(intptr_t uuid, EntityType type); - // private members + char* InternalGetTopChannels(intptr_t start_channel_id); // protects entities_ and uuid_ gpr_mu mu_; - InlinedVector<void*, 20> entities_; + InlinedVector<RegistryEntry, 20> entities_; }; +} // namespace channelz } // namespace grpc_core #endif /* GRPC_CORE_LIB_CHANNEL_CHANNELZ_REGISTRY_H */ diff --git a/src/core/lib/gpr/arena.cc b/src/core/lib/gpr/arena.cc index 141de69426..e30b297aea 100644 --- a/src/core/lib/gpr/arena.cc +++ b/src/core/lib/gpr/arena.cc @@ -25,6 +25,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include "src/core/lib/gpr/alloc.h" @@ -36,8 +37,6 @@ #ifdef SIMPLE_ARENA_FOR_DEBUGGING -#include <grpc/support/sync.h> - struct gpr_arena { gpr_mu mu; void** ptrs; @@ -78,14 +77,17 @@ void* gpr_arena_alloc(gpr_arena* arena, size_t size) { // would allow us to use the alignment actually needed by the caller. typedef struct zone { - size_t size_begin; - size_t size_end; - gpr_atm next_atm; + size_t size_begin; // All the space we have set aside for allocations up + // until this zone. + size_t size_end; // size_end = size_begin plus all the space we set aside for + // allocations in zone z itself. + zone* next; } zone; struct gpr_arena { gpr_atm size_so_far; zone initial_zone; + gpr_mu arena_growth_mutex; }; static void* zalloc_aligned(size_t size) { @@ -99,15 +101,17 @@ gpr_arena* gpr_arena_create(size_t initial_size) { gpr_arena* a = static_cast<gpr_arena*>(zalloc_aligned( GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + initial_size)); a->initial_zone.size_end = initial_size; + gpr_mu_init(&a->arena_growth_mutex); return a; } size_t gpr_arena_destroy(gpr_arena* arena) { + gpr_mu_destroy(&arena->arena_growth_mutex); gpr_atm size = gpr_atm_no_barrier_load(&arena->size_so_far); - zone* z = (zone*)gpr_atm_no_barrier_load(&arena->initial_zone.next_atm); + zone* z = arena->initial_zone.next; gpr_free_aligned(arena); while (z) { - zone* next_z = (zone*)gpr_atm_no_barrier_load(&z->next_atm); + zone* next_z = z->next; gpr_free_aligned(z); z = next_z; } @@ -116,37 +120,55 @@ size_t gpr_arena_destroy(gpr_arena* arena) { void* gpr_arena_alloc(gpr_arena* arena, size_t size) { size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(size); - size_t start = static_cast<size_t>( + size_t previous_size_of_arena_allocations = static_cast<size_t>( gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size)); + size_t updated_size_of_arena_allocations = + previous_size_of_arena_allocations + size; zone* z = &arena->initial_zone; - while (start > z->size_end) { - zone* next_z = (zone*)gpr_atm_acq_load(&z->next_atm); - if (next_z == nullptr) { - size_t next_z_size = - static_cast<size_t>(gpr_atm_no_barrier_load(&arena->size_so_far)); - next_z = static_cast<zone*>(zalloc_aligned( - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) + next_z_size)); - next_z->size_begin = z->size_end; - next_z->size_end = z->size_end + next_z_size; - if (!gpr_atm_rel_cas(&z->next_atm, static_cast<gpr_atm>(NULL), - (gpr_atm)next_z)) { - gpr_free_aligned(next_z); - next_z = (zone*)gpr_atm_acq_load(&z->next_atm); + // Check to see if the allocation isn't able to end in the initial zone. + // This statement is true only in the uncommon case because of our arena + // sizing historesis (that is, most calls should have a large enough initial + // zone and will not need to grow the arena). + if (updated_size_of_arena_allocations > z->size_end) { + // Find a zone to fit this allocation + gpr_mu_lock(&arena->arena_growth_mutex); + while (updated_size_of_arena_allocations > z->size_end) { + if (z->next == nullptr) { + // Note that we do an extra increment of size_so_far to prevent multiple + // simultaneous callers from stepping on each other. However, this extra + // increment means some space in the arena is wasted. + // So whenever we need to allocate x bytes and there are x - n (where + // n > 0) remaining in the current zone, we will waste x bytes (x - n + // in the current zone and n in the new zone). + previous_size_of_arena_allocations = static_cast<size_t>( + gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size)); + updated_size_of_arena_allocations = + previous_size_of_arena_allocations + size; + size_t next_z_size = updated_size_of_arena_allocations; + z->next = static_cast<zone*>(zalloc_aligned( + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) + next_z_size)); + z->next->size_begin = z->size_end; + z->next->size_end = z->size_end + next_z_size; } + z = z->next; } - z = next_z; - } - if (start + size > z->size_end) { - return gpr_arena_alloc(arena, size); + gpr_mu_unlock(&arena->arena_growth_mutex); } - GPR_ASSERT(start >= z->size_begin); - GPR_ASSERT(start + size <= z->size_end); - char* ptr = (z == &arena->initial_zone) - ? reinterpret_cast<char*>(arena) + - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) - : reinterpret_cast<char*>(z) + - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)); - return ptr + start - z->size_begin; + GPR_ASSERT(previous_size_of_arena_allocations >= z->size_begin); + GPR_ASSERT(updated_size_of_arena_allocations <= z->size_end); + // Skip the first part of the zone, which just contains tracking information. + // For the initial zone, this is the gpr_arena struct and for any other zone, + // it's the zone struct. + char* start_of_allocation_space = + (z == &arena->initial_zone) + ? reinterpret_cast<char*>(arena) + + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + : reinterpret_cast<char*>(z) + + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)); + // previous_size_of_arena_allocations - size_begin is how many bytes have been + // allocated into the current zone + return start_of_allocation_space + previous_size_of_arena_allocations - + z->size_begin; } #endif // SIMPLE_ARENA_FOR_DEBUGGING diff --git a/src/core/lib/gprpp/orphanable.h b/src/core/lib/gprpp/orphanable.h index d0ec9b6461..3123e3f5a3 100644 --- a/src/core/lib/gprpp/orphanable.h +++ b/src/core/lib/gprpp/orphanable.h @@ -86,7 +86,8 @@ class InternallyRefCounted : public Orphanable { GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE // Allow RefCountedPtr<> to access Unref() and IncrementRefCount(). - friend class RefCountedPtr<Child>; + template <typename T> + friend class RefCountedPtr; InternallyRefCounted() { gpr_ref_init(&refs_, 1); } virtual ~InternallyRefCounted() {} @@ -129,7 +130,8 @@ class InternallyRefCountedWithTracing : public Orphanable { GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE // Allow RefCountedPtr<> to access Unref() and IncrementRefCount(). - friend class RefCountedPtr<Child>; + template <typename T> + friend class RefCountedPtr; InternallyRefCountedWithTracing() : InternallyRefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {} diff --git a/src/core/lib/gprpp/ref_counted.h b/src/core/lib/gprpp/ref_counted.h index ddac5bd475..03c293f6ed 100644 --- a/src/core/lib/gprpp/ref_counted.h +++ b/src/core/lib/gprpp/ref_counted.h @@ -73,7 +73,8 @@ class RefCounted { private: // Allow RefCountedPtr<> to access IncrementRefCount(). - friend class RefCountedPtr<Child>; + template <typename T> + friend class RefCountedPtr; void IncrementRefCount() { gpr_ref(&refs_); } @@ -152,7 +153,8 @@ class RefCountedWithTracing { private: // Allow RefCountedPtr<> to access IncrementRefCount(). - friend class RefCountedPtr<Child>; + template <typename T> + friend class RefCountedPtr; void IncrementRefCount() { gpr_ref(&refs_); } diff --git a/src/core/lib/gprpp/ref_counted_ptr.h b/src/core/lib/gprpp/ref_counted_ptr.h index 534d3d03cb..c2dfbdd90f 100644 --- a/src/core/lib/gprpp/ref_counted_ptr.h +++ b/src/core/lib/gprpp/ref_counted_ptr.h @@ -36,25 +36,49 @@ class RefCountedPtr { RefCountedPtr(std::nullptr_t) {} // If value is non-null, we take ownership of a ref to it. - explicit RefCountedPtr(T* value) { value_ = value; } + template <typename Y> + explicit RefCountedPtr(Y* value) { + value_ = value; + } - // Move support. + // Move ctors. RefCountedPtr(RefCountedPtr&& other) { value_ = other.value_; other.value_ = nullptr; } + template <typename Y> + RefCountedPtr(RefCountedPtr<Y>&& other) { + value_ = other.value_; + other.value_ = nullptr; + } + + // Move assignment. RefCountedPtr& operator=(RefCountedPtr&& other) { if (value_ != nullptr) value_->Unref(); value_ = other.value_; other.value_ = nullptr; return *this; } + template <typename Y> + RefCountedPtr& operator=(RefCountedPtr<Y>&& other) { + if (value_ != nullptr) value_->Unref(); + value_ = other.value_; + other.value_ = nullptr; + return *this; + } - // Copy support. + // Copy ctors. RefCountedPtr(const RefCountedPtr& other) { if (other.value_ != nullptr) other.value_->IncrementRefCount(); value_ = other.value_; } + template <typename Y> + RefCountedPtr(const RefCountedPtr<Y>& other) { + if (other.value_ != nullptr) other.value_->IncrementRefCount(); + value_ = other.value_; + } + + // Copy assignment. RefCountedPtr& operator=(const RefCountedPtr& other) { // Note: Order of reffing and unreffing is important here in case value_ // and other.value_ are the same object. @@ -63,17 +87,32 @@ class RefCountedPtr { value_ = other.value_; return *this; } + template <typename Y> + RefCountedPtr& operator=(const RefCountedPtr<Y>& other) { + // Note: Order of reffing and unreffing is important here in case value_ + // and other.value_ are the same object. + if (other.value_ != nullptr) other.value_->IncrementRefCount(); + if (value_ != nullptr) value_->Unref(); + value_ = other.value_; + return *this; + } ~RefCountedPtr() { if (value_ != nullptr) value_->Unref(); } // If value is non-null, we take ownership of a ref to it. - void reset(T* value = nullptr) { + template <typename Y> + void reset(Y* value) { if (value_ != nullptr) value_->Unref(); value_ = value; } + void reset() { + if (value_ != nullptr) value_->Unref(); + value_ = nullptr; + } + // TODO(roth): This method exists solely as a transition mechanism to allow // us to pass a ref to idiomatic C code that does not use RefCountedPtr<>. // Once all of our code has been converted to idiomatic C++, this @@ -89,16 +128,34 @@ class RefCountedPtr { T& operator*() const { return *value_; } T* operator->() const { return value_; } - bool operator==(const RefCountedPtr& other) const { + template <typename Y> + bool operator==(const RefCountedPtr<Y>& other) const { return value_ == other.value_; } - bool operator==(const T* other) const { return value_ == other; } - bool operator!=(const RefCountedPtr& other) const { + + template <typename Y> + bool operator==(const Y* other) const { + return value_ == other; + } + + bool operator==(std::nullptr_t) const { return value_ == nullptr; } + + template <typename Y> + bool operator!=(const RefCountedPtr<Y>& other) const { return value_ != other.value_; } - bool operator!=(const T* other) const { return value_ != other; } + + template <typename Y> + bool operator!=(const Y* other) const { + return value_ != other; + } + + bool operator!=(std::nullptr_t) const { return value_ != nullptr; } private: + template <typename Y> + friend class RefCountedPtr; + T* value_ = nullptr; }; @@ -107,11 +164,6 @@ inline RefCountedPtr<T> MakeRefCounted(Args&&... args) { return RefCountedPtr<T>(New<T>(std::forward<Args>(args)...)); } -template <typename Parent, typename Child, typename... Args> -inline RefCountedPtr<Parent> MakePolymorphicRefCounted(Args&&... args) { - return RefCountedPtr<Parent>(New<Child>(std::forward<Args>(args)...)); -} - } // namespace grpc_core #endif /* GRPC_CORE_LIB_GPRPP_REF_COUNTED_PTR_H */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 86a0243d2e..66e0f1fd6d 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -140,10 +140,6 @@ struct grpc_fd { struct grpc_fd* freelist_next; - /* The pollset that last noticed that the fd is readable. The actual type - * stored in this is (grpc_pollset *) */ - gpr_atm read_notifier_pollset; - grpc_iomgr_object iomgr_object; }; @@ -293,7 +289,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); new_fd->error_closure->InitEvent(); - gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -376,11 +371,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, gpr_mu_unlock(&fd_freelist_mu); } -static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) { - gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset); - return (grpc_pollset*)notifier; -} - static bool fd_is_shutdown(grpc_fd* fd) { return fd->read_closure->IsShutdown(); } @@ -397,11 +387,7 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { fd->error_closure->NotifyOn(closure); } -static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { - fd->read_closure->SetReady(); - /* Use release store to match with acquire load in fd_get_read_notifier */ - gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier); -} +static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); } static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } @@ -642,7 +628,7 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) { } if (read_ev || cancel || err_fallback) { - fd_become_readable(fd, pollset); + fd_become_readable(fd); } if (write_ev || cancel || err_fallback) { @@ -1217,8 +1203,10 @@ static const grpc_event_engine_vtable vtable = { fd_notify_on_read, fd_notify_on_write, fd_notify_on_error, + fd_become_readable, + fd_become_writable, + fd_has_errors, fd_is_shutdown, - fd_get_read_notifier_pollset, pollset_init, pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 7b368410cf..96eae30345 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -135,7 +135,7 @@ struct pollable { // underlying epoll set (i.e whenever fd_orphan() is called). // // Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a - // lot of complexity since an fd can be present in multiple pollalbles. So our + // lot of complexity since an fd can be present in multiple pollables. So our // implementation ONLY DOES (1) and NOT (2). // // The cache_fd.salt variable helps here to maintain correctness (it serves as @@ -220,10 +220,6 @@ struct grpc_fd { struct grpc_fd* freelist_next; grpc_closure* on_done_closure; - // The pollset that last noticed that the fd is readable. The actual type - // stored in this is (grpc_pollset *) - gpr_atm read_notifier_pollset; - grpc_iomgr_object iomgr_object; // Do we need to track EPOLLERR events separately? @@ -353,7 +349,6 @@ static void invalidate_fd(grpc_fd* fd) { memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu)); fd->pollable_obj = nullptr; fd->on_done_closure = nullptr; - gpr_atm_no_barrier_store(&fd->read_notifier_pollset, 0); memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object)); fd->track_err = false; } @@ -445,7 +440,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { new_fd->error_closure->InitEvent(); new_fd->freelist_next = nullptr; new_fd->on_done_closure = nullptr; - gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); char* fd_name; gpr_asprintf(&fd_name, "%s fd=%d", name, fd); @@ -514,11 +508,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, UNREF_BY(fd, 2, reason); /* Drop the reference */ } -static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) { - gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset); - return (grpc_pollset*)notifier; -} - static bool fd_is_shutdown(grpc_fd* fd) { return fd->read_closure->IsShutdown(); } @@ -875,17 +864,7 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) { return static_cast<int>(delta); } -static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { - fd->read_closure->SetReady(); - - /* Note, it is possible that fd_become_readable might be called twice with - different 'notifier's when an fd becomes readable and it is in two epoll - sets (This can happen briefly during polling island merges). In such cases - it does not really matter which notifer is set as the read_notifier_pollset - (They would both point to the same polling island anyway) */ - /* Use release store to match with acquire load in fd_get_read_notifier */ - gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier); -} +static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); } static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } @@ -983,7 +962,7 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, fd_has_errors(fd); } if (read_ev || cancel || err_fallback) { - fd_become_readable(fd, pollset); + fd_become_readable(fd); } if (write_ev || cancel || err_fallback) { fd_become_writable(fd); @@ -1636,8 +1615,10 @@ static const grpc_event_engine_vtable vtable = { fd_notify_on_read, fd_notify_on_write, fd_notify_on_error, + fd_become_readable, + fd_become_writable, + fd_has_errors, fd_is_shutdown, - fd_get_read_notifier_pollset, pollset_init, pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 2189801c18..5695ac795d 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -137,10 +137,6 @@ struct grpc_fd { struct grpc_fd* freelist_next; grpc_closure* on_done_closure; - /* The pollset that last noticed that the fd is readable. The actual type - * stored in this is (grpc_pollset *) */ - gpr_atm read_notifier_pollset; - grpc_iomgr_object iomgr_object; /* Do we need to track EPOLLERR events separately? */ @@ -845,7 +841,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { new_fd->write_closure->InitEvent(); new_fd->error_closure->InitEvent(); new_fd->track_err = track_err; - gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; new_fd->on_done_closure = nullptr; @@ -927,11 +922,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, GRPC_ERROR_UNREF(error); } -static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) { - gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset); - return (grpc_pollset*)notifier; -} - static bool fd_is_shutdown(grpc_fd* fd) { return fd->read_closure->IsShutdown(); } @@ -958,6 +948,12 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { fd->error_closure->NotifyOn(closure); } +static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); } + +static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } + +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + /******************************************************************************* * Pollset Definitions */ @@ -1115,22 +1111,6 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) { return static_cast<int>(delta); } -static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { - fd->read_closure->SetReady(); - - /* Note, it is possible that fd_become_readable might be called twice with - different 'notifier's when an fd becomes readable and it is in two epoll - sets (This can happen briefly during polling island merges). In such cases - it does not really matter which notifer is set as the read_notifier_pollset - (They would both point to the same polling island anyway) */ - /* Use release store to match with acquire load in fd_get_read_notifier */ - gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier); -} - -static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } - -static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } - static void pollset_release_polling_island(grpc_pollset* ps, const char* reason) { if (ps->po.pi != nullptr) { @@ -1283,7 +1263,7 @@ static void pollset_work_and_unlock(grpc_pollset* pollset, fd_has_errors(fd); } if (read_ev || cancel || err_fallback) { - fd_become_readable(fd, pollset); + fd_become_readable(fd); } if (write_ev || cancel || err_fallback) { fd_become_writable(fd); @@ -1667,8 +1647,10 @@ static const grpc_event_engine_vtable vtable = { fd_notify_on_read, fd_notify_on_write, fd_notify_on_error, + fd_become_readable, + fd_become_writable, + fd_has_errors, fd_is_shutdown, - fd_get_read_notifier_pollset, pollset_init, pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index c9c09881a2..fb4c71ef71 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -108,9 +108,6 @@ struct grpc_fd { grpc_closure* on_done_closure; grpc_iomgr_object iomgr_object; - - /* The pollset that last noticed and notified that the fd is readable */ - grpc_pollset* read_notifier_pollset; }; /* Begin polling on an fd. @@ -131,8 +128,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset, MUST NOT be called with a pollset lock taken if got_read or got_write are 1, also does the become_{readable,writable} as appropriate. */ -static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write, - grpc_pollset* read_notifier_pollset); +static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write); /* Return 1 if this fd is orphaned, 0 otherwise */ static bool fd_is_orphaned(grpc_fd* fd); @@ -346,7 +342,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { r->closed = 0; r->released = 0; gpr_atm_no_barrier_store(&r->pollhup, 0); - r->read_notifier_pollset = nullptr; char* name2; gpr_asprintf(&name2, "%s fd=%d", name, fd); @@ -359,17 +354,6 @@ static bool fd_is_orphaned(grpc_fd* fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -/* Return the read-notifier pollset */ -static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) { - grpc_pollset* notifier = nullptr; - - gpr_mu_lock(&fd->mu); - notifier = fd->read_notifier_pollset; - gpr_mu_unlock(&fd->mu); - - return notifier; -} - static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); @@ -512,11 +496,6 @@ static int set_ready_locked(grpc_fd* fd, grpc_closure** st) { } } -static void set_read_notifier_pollset_locked( - grpc_fd* fd, grpc_pollset* read_notifier_pollset) { - fd->read_notifier_pollset = read_notifier_pollset; -} - static void fd_shutdown(grpc_fd* fd, grpc_error* why) { gpr_mu_lock(&fd->mu); /* only shutdown once */ @@ -553,8 +532,28 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { } static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { - gpr_log(GPR_ERROR, "Polling engine does not support tracking errors."); - abort(); + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_ERROR, "Polling engine does not support tracking errors."); + } + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED); +} + +static void fd_set_readable(grpc_fd* fd) { + gpr_mu_lock(&fd->mu); + set_ready_locked(fd, &fd->read_closure); + gpr_mu_unlock(&fd->mu); +} + +static void fd_set_writable(grpc_fd* fd) { + gpr_mu_lock(&fd->mu); + set_ready_locked(fd, &fd->write_closure); + gpr_mu_unlock(&fd->mu); +} + +static void fd_set_error(grpc_fd* fd) { + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_ERROR, "Polling engine does not support tracking errors."); + } } static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset, @@ -608,8 +607,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset, return mask; } -static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write, - grpc_pollset* read_notifier_pollset) { +static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) { int was_polling = 0; int kick = 0; grpc_fd* fd = watcher->fd; @@ -645,9 +643,6 @@ static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write, if (set_ready_locked(fd, &fd->read_closure)) { kick = 1; } - if (read_notifier_pollset != nullptr) { - set_read_notifier_pollset_locked(fd, read_notifier_pollset); - } } if (got_write) { if (set_ready_locked(fd, &fd->write_closure)) { @@ -997,16 +992,16 @@ static grpc_error* pollset_work(grpc_pollset* pollset, for (i = 1; i < pfd_count; i++) { if (watchers[i].fd == nullptr) { - fd_end_poll(&watchers[i], 0, 0, nullptr); + fd_end_poll(&watchers[i], 0, 0); } else { // Wake up all the file descriptors, if we have an invalid one // we can identify it on the next pollset_work() - fd_end_poll(&watchers[i], 1, 1, pollset); + fd_end_poll(&watchers[i], 1, 1); } } } else if (r == 0) { for (i = 1; i < pfd_count; i++) { - fd_end_poll(&watchers[i], 0, 0, nullptr); + fd_end_poll(&watchers[i], 0, 0); } } else { if (pfds[0].revents & POLLIN_CHECK) { @@ -1018,7 +1013,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, } for (i = 1; i < pfd_count; i++) { if (watchers[i].fd == nullptr) { - fd_end_poll(&watchers[i], 0, 0, nullptr); + fd_end_poll(&watchers[i], 0, 0); } else { if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset, @@ -1032,7 +1027,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1); } fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK, - pfds[i].revents & POLLOUT_CHECK, pollset); + pfds[i].revents & POLLOUT_CHECK); } } } @@ -1723,8 +1718,10 @@ static const grpc_event_engine_vtable vtable = { fd_notify_on_read, fd_notify_on_write, fd_notify_on_error, + fd_set_readable, + fd_set_writable, + fd_set_error, fd_is_shutdown, - fd_get_read_notifier_pollset, pollset_init, pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 1139b3273a..0e45fc42ca 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -239,6 +239,12 @@ void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { g_event_engine->fd_notify_on_error(fd, closure); } +void grpc_fd_set_readable(grpc_fd* fd) { g_event_engine->fd_set_readable(fd); } + +void grpc_fd_set_writable(grpc_fd* fd) { g_event_engine->fd_set_writable(fd); } + +void grpc_fd_set_error(grpc_fd* fd) { g_event_engine->fd_set_error(fd); } + static size_t pollset_size(void) { return g_event_engine->pollset_size; } static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index b4c17fc80d..8d0bcc0710 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -51,8 +51,10 @@ typedef struct grpc_event_engine_vtable { void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure); void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure); void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure); + void (*fd_set_readable)(grpc_fd* fd); + void (*fd_set_writable)(grpc_fd* fd); + void (*fd_set_error)(grpc_fd* fd); bool (*fd_is_shutdown)(grpc_fd* fd); - grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd); void (*pollset_init)(grpc_pollset* pollset, gpr_mu** mu); void (*pollset_shutdown)(grpc_pollset* pollset, grpc_closure* closure); @@ -142,8 +144,20 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure); * needs to have been set on grpc_fd_create */ void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure); -/* Return the read notifier pollset from the fd */ -grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd); +/* Forcibly set the fd to be readable, resulting in the closure registered with + * grpc_fd_notify_on_read being invoked. + */ +void grpc_fd_set_readable(grpc_fd* fd); + +/* Forcibly set the fd to be writable, resulting in the closure registered with + * grpc_fd_notify_on_write being invoked. + */ +void grpc_fd_set_writable(grpc_fd* fd); + +/* Forcibly set the fd to have errored, resulting in the closure registered with + * grpc_fd_notify_on_error being invoked. + */ +void grpc_fd_set_error(grpc_fd* fd); /* pollset_posix functions */ diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 1ad13b831d..45d96b80eb 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -40,19 +40,25 @@ gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \ } +#define EXECUTOR_TRACE0(str) \ + if (executor_trace.enabled()) { \ + gpr_log(GPR_INFO, "EXECUTOR " str); \ + } + grpc_core::TraceFlag executor_trace(false, "executor"); GPR_TLS_DECL(g_this_thread_state); -GrpcExecutor::GrpcExecutor(const char* executor_name) : name_(executor_name) { +GrpcExecutor::GrpcExecutor(const char* name) : name_(name) { adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER; - gpr_atm_no_barrier_store(&num_threads_, 0); + gpr_atm_rel_store(&num_threads_, 0); max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores()); } void GrpcExecutor::Init() { SetThreading(true); } -size_t GrpcExecutor::RunClosures(grpc_closure_list list) { +size_t GrpcExecutor::RunClosures(const char* executor_name, + grpc_closure_list list) { size_t n = 0; grpc_closure* c = list.head; @@ -60,11 +66,11 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) { grpc_closure* next = c->next_data.next; grpc_error* error = c->error_data.error; #ifndef NDEBUG - EXECUTOR_TRACE("run %p [created by %s:%d]", c, c->file_created, - c->line_created); + EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c, + c->file_created, c->line_created); c->scheduled = false; #else - EXECUTOR_TRACE("run %p", c); + EXECUTOR_TRACE("(%s) run %p", executor_name, c); #endif c->cb(c->cb_arg, error); GRPC_ERROR_UNREF(error); @@ -77,17 +83,21 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) { } bool GrpcExecutor::IsThreaded() const { - return gpr_atm_no_barrier_load(&num_threads_) > 0; + return gpr_atm_acq_load(&num_threads_) > 0; } void GrpcExecutor::SetThreading(bool threading) { - gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_); + gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_); + EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading); if (threading) { - if (curr_num_threads > 0) return; + if (curr_num_threads > 0) { + EXECUTOR_TRACE("(%s) SetThreading(true). curr_num_threads == 0", name_); + return; + } GPR_ASSERT(num_threads_ == 0); - gpr_atm_no_barrier_store(&num_threads_, 1); + gpr_atm_rel_store(&num_threads_, 1); gpr_tls_init(&g_this_thread_state); thd_state_ = static_cast<ThreadState*>( gpr_zalloc(sizeof(ThreadState) * max_threads_)); @@ -96,6 +106,7 @@ void GrpcExecutor::SetThreading(bool threading) { gpr_mu_init(&thd_state_[i].mu); gpr_cv_init(&thd_state_[i].cv); thd_state_[i].id = i; + thd_state_[i].name = name_; thd_state_[i].thd = grpc_core::Thread(); thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT; } @@ -104,7 +115,10 @@ void GrpcExecutor::SetThreading(bool threading) { grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]); thd_state_[0].thd.Start(); } else { // !threading - if (curr_num_threads == 0) return; + if (curr_num_threads == 0) { + EXECUTOR_TRACE("(%s) SetThreading(false). curr_num_threads == 0", name_); + return; + } for (size_t i = 0; i < max_threads_; i++) { gpr_mu_lock(&thd_state_[i].mu); @@ -121,20 +135,22 @@ void GrpcExecutor::SetThreading(bool threading) { curr_num_threads = gpr_atm_no_barrier_load(&num_threads_); for (gpr_atm i = 0; i < curr_num_threads; i++) { thd_state_[i].thd.Join(); - EXECUTOR_TRACE(" Thread %" PRIdPTR " of %" PRIdPTR " joined", i, - curr_num_threads); + EXECUTOR_TRACE("(%s) Thread %" PRIdPTR " of %" PRIdPTR " joined", name_, + i + 1, curr_num_threads); } - gpr_atm_no_barrier_store(&num_threads_, 0); + gpr_atm_rel_store(&num_threads_, 0); for (size_t i = 0; i < max_threads_; i++) { gpr_mu_destroy(&thd_state_[i].mu); gpr_cv_destroy(&thd_state_[i].cv); - RunClosures(thd_state_[i].elems); + RunClosures(thd_state_[i].name, thd_state_[i].elems); } gpr_free(thd_state_); gpr_tls_destroy(&g_this_thread_state); } + + EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading); } void GrpcExecutor::Shutdown() { SetThreading(false); } @@ -147,8 +163,8 @@ void GrpcExecutor::ThreadMain(void* arg) { size_t subtract_depth = 0; for (;;) { - EXECUTOR_TRACE("[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", ts->id, - subtract_depth); + EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", + ts->name, ts->id, subtract_depth); gpr_mu_lock(&ts->mu); ts->depth -= subtract_depth; @@ -159,7 +175,7 @@ void GrpcExecutor::ThreadMain(void* arg) { } if (ts->shutdown) { - EXECUTOR_TRACE("[%" PRIdPTR "]: shutdown", ts->id); + EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: shutdown", ts->name, ts->id); gpr_mu_unlock(&ts->mu); break; } @@ -169,10 +185,10 @@ void GrpcExecutor::ThreadMain(void* arg) { ts->elems = GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); - EXECUTOR_TRACE("[%" PRIdPTR "]: execute", ts->id); + EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: execute", ts->name, ts->id); grpc_core::ExecCtx::Get()->InvalidateNow(); - subtract_depth = RunClosures(closures); + subtract_depth = RunClosures(ts->name, closures); } } @@ -188,16 +204,16 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, do { retry_push = false; size_t cur_thread_count = - static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_)); + static_cast<size_t>(gpr_atm_acq_load(&num_threads_)); // If the number of threads is zero(i.e either the executor is not threaded // or already shutdown), then queue the closure on the exec context itself if (cur_thread_count == 0) { #ifndef NDEBUG - EXECUTOR_TRACE("schedule %p (created %s:%d) inline", closure, + EXECUTOR_TRACE("(%s) schedule %p (created %s:%d) inline", name_, closure, closure->file_created, closure->line_created); #else - EXECUTOR_TRACE("schedule %p inline", closure); + EXECUTOR_TRACE("(%s) schedule %p inline", name_, closure); #endif grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure, error); @@ -213,18 +229,18 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, } ThreadState* orig_ts = ts; - bool try_new_thread = false; + for (;;) { #ifndef NDEBUG EXECUTOR_TRACE( - "try to schedule %p (%s) (created %s:%d) to thread " + "(%s) try to schedule %p (%s) (created %s:%d) to thread " "%" PRIdPTR, - closure, is_short ? "short" : "long", closure->file_created, + name_, closure, is_short ? "short" : "long", closure->file_created, closure->line_created, ts->id); #else - EXECUTOR_TRACE("try to schedule %p (%s) to thread %" PRIdPTR, closure, - is_short ? "short" : "long", ts->id); + EXECUTOR_TRACE("(%s) try to schedule %p (%s) to thread %" PRIdPTR, name_, + closure, is_short ? "short" : "long", ts->id); #endif gpr_mu_lock(&ts->mu); @@ -236,18 +252,22 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, size_t idx = ts->id; ts = &thd_state_[(idx + 1) % cur_thread_count]; if (ts == orig_ts) { - // We cycled through all the threads. Retry enqueue again (by creating - // a new thread) + // We cycled through all the threads. Retry enqueue again by creating + // a new thread + // + // TODO (sreek): There is a potential issue here. We are + // unconditionally setting try_new_thread to true here. What if the + // executor is shutdown OR if cur_thread_count is already equal to + // max_threads ? + // (Fortunately, this is not an issue yet (as of july 2018) because + // there is only one instance of long job in gRPC and hence we will + // not hit this code path) retry_push = true; - // TODO (sreek): What if the executor is shutdown OR if - // cur_thread_count is already equal to max_threads ? (currently - as - // of July 2018, we do not run in to this issue because there is only - // one instance of long job in gRPC. This has to be fixed soon) try_new_thread = true; break; } - continue; + continue; // Try the next thread-state } // == Found the thread state (i.e thread) to enqueue this closure! == @@ -277,13 +297,11 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, } if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) { - cur_thread_count = - static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_)); + cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_)); if (cur_thread_count < max_threads_) { - // Increment num_threads (Safe to do a no_barrier_store instead of a - // cas because we always increment num_threads under the - // 'adding_thread_lock') - gpr_atm_no_barrier_store(&num_threads_, cur_thread_count + 1); + // Increment num_threads (safe to do a store instead of a cas because we + // always increment num_threads under the 'adding_thread_lock') + gpr_atm_rel_store(&num_threads_, cur_thread_count + 1); thd_state_[cur_thread_count].thd = grpc_core::Thread( name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]); @@ -298,60 +316,118 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, } while (retry_push); } -static GrpcExecutor* global_executor; +static GrpcExecutor* executors[GRPC_NUM_EXECUTORS]; -void enqueue_long(grpc_closure* closure, grpc_error* error) { - global_executor->Enqueue(closure, error, false /* is_short */); +void default_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, + true /* is_short */); } -void enqueue_short(grpc_closure* closure, grpc_error* error) { - global_executor->Enqueue(closure, error, true /* is_short */); +void default_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, + false /* is_short */); } -// Short-Job executor scheduler -static const grpc_closure_scheduler_vtable global_executor_vtable_short = { - enqueue_short, enqueue_short, "executor-short"}; -static grpc_closure_scheduler global_scheduler_short = { - &global_executor_vtable_short}; +void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, + true /* is_short */); +} -// Long-job executor scheduler -static const grpc_closure_scheduler_vtable global_executor_vtable_long = { - enqueue_long, enqueue_long, "executor-long"}; -static grpc_closure_scheduler global_scheduler_long = { - &global_executor_vtable_long}; +void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, + false /* is_short */); +} + +static const grpc_closure_scheduler_vtable + vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = { + {{&default_enqueue_short, &default_enqueue_short, "def-ex-short"}, + {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}}, + {{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"}, + {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}}; + +static grpc_closure_scheduler + schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = { + {{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]}, + {&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}}, + {{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]}, + {&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}}; // grpc_executor_init() and grpc_executor_shutdown() functions are called in the // the grpc_init() and grpc_shutdown() code paths which are protected by a // global mutex. So it is okay to assume that these functions are thread-safe void grpc_executor_init() { - if (global_executor != nullptr) { - // grpc_executor_init() already called once (and grpc_executor_shutdown() - // wasn't called) + EXECUTOR_TRACE0("grpc_executor_init() enter"); + + // Return if grpc_executor_init() is already called earlier + if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) { + GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr); return; } - global_executor = grpc_core::New<GrpcExecutor>("global-executor"); - global_executor->Init(); + executors[GRPC_DEFAULT_EXECUTOR] = + grpc_core::New<GrpcExecutor>("default-executor"); + executors[GRPC_RESOLVER_EXECUTOR] = + grpc_core::New<GrpcExecutor>("resolver-executor"); + + executors[GRPC_DEFAULT_EXECUTOR]->Init(); + executors[GRPC_RESOLVER_EXECUTOR]->Init(); + + EXECUTOR_TRACE0("grpc_executor_init() done"); +} + +grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, + GrpcExecutorJobType job_type) { + return &schedulers_[executor_type][job_type]; +} + +grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) { + return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type); } void grpc_executor_shutdown() { - // Shutdown already called - if (global_executor == nullptr) { + EXECUTOR_TRACE0("grpc_executor_shutdown() enter"); + + // Return if grpc_executor_shutdown() is already called earlier + if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) { + GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr); return; } - global_executor->Shutdown(); - grpc_core::Delete<GrpcExecutor>(global_executor); - global_executor = nullptr; + executors[GRPC_DEFAULT_EXECUTOR]->Shutdown(); + executors[GRPC_RESOLVER_EXECUTOR]->Shutdown(); + + // Delete the executor objects. + // + // NOTE: It is important to call Shutdown() on all executors first before + // calling Delete() because it is possible for one executor (that is not + // shutdown yet) to call Enqueue() on a different executor which is already + // shutdown. This is legal and in such cases, the Enqueue() operation + // effectively "fails" and enqueues that closure on the calling thread's + // exec_ctx. + // + // By ensuring that all executors are shutdown first, we are also ensuring + // that no thread is active across all executors. + + grpc_core::Delete<GrpcExecutor>(executors[GRPC_DEFAULT_EXECUTOR]); + grpc_core::Delete<GrpcExecutor>(executors[GRPC_RESOLVER_EXECUTOR]); + executors[GRPC_DEFAULT_EXECUTOR] = nullptr; + executors[GRPC_RESOLVER_EXECUTOR] = nullptr; + + EXECUTOR_TRACE0("grpc_executor_shutdown() done"); } -bool grpc_executor_is_threaded() { return global_executor->IsThreaded(); } +bool grpc_executor_is_threaded(GrpcExecutorType executor_type) { + GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS); + return executors[executor_type]->IsThreaded(); +} -void grpc_executor_set_threading(bool enable) { - global_executor->SetThreading(enable); +bool grpc_executor_is_threaded() { + return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR); } -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) { - return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short - : &global_scheduler_long; +void grpc_executor_set_threading(bool enable) { + EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable); + for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) { + executors[i]->SetThreading(enable); + } } diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 395fc52863..8829138c5f 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -27,7 +27,8 @@ typedef struct { gpr_mu mu; - size_t id; // For debugging purposes + size_t id; // For debugging purposes + const char* name; // Thread state name gpr_cv cv; grpc_closure_list elems; size_t depth; // Number of closures in the closure list @@ -36,7 +37,11 @@ typedef struct { grpc_core::Thread thd; } ThreadState; -typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } GrpcExecutorJobType; +typedef enum { + GRPC_EXECUTOR_SHORT = 0, + GRPC_EXECUTOR_LONG, + GRPC_NUM_EXECUTOR_JOB_TYPES // Add new values above this +} GrpcExecutorJobType; class GrpcExecutor { public: @@ -58,7 +63,7 @@ class GrpcExecutor { void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short); private: - static size_t RunClosures(grpc_closure_list list); + static size_t RunClosures(const char* executor_name, grpc_closure_list list); static void ThreadMain(void* arg); const char* name_; @@ -70,14 +75,42 @@ class GrpcExecutor { // == Global executor functions == +typedef enum { + GRPC_DEFAULT_EXECUTOR = 0, + GRPC_RESOLVER_EXECUTOR, + + GRPC_NUM_EXECUTORS // Add new values above this +} GrpcExecutorType; + +// TODO(sreek): Currently we have two executors (available globally): The +// default executor and the resolver executor. +// +// Some of the functions below operate on the DEFAULT executor only while some +// operate of ALL the executors. This is a bit confusing and should be cleaned +// up in future (where we make all the following functions take executor_type +// and/or job_type) + +// Initialize ALL the executors void grpc_executor_init(); +// Shutdown ALL the executors +void grpc_executor_shutdown(); + +// Set the threading mode for ALL the executors +void grpc_executor_set_threading(bool enable); + +// Get the DEFAULT executor scheduler for the given job_type grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type); -void grpc_executor_shutdown(); +// Get the executor scheduler for a given executor_type and a job_type +grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, + GrpcExecutorJobType job_type); -bool grpc_executor_is_threaded(); +// Return if a given executor is running in threaded mode (i.e if +// grpc_executor_set_threading(true) was called previously on that executor) +bool grpc_executor_is_threaded(GrpcExecutorType executor_type); -void grpc_executor_set_threading(bool enable); +// Return if the DEFAULT executor is threaded +bool grpc_executor_is_threaded(); #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/lockfree_event.cc b/src/core/lib/iomgr/lockfree_event.cc index 5b6b79fa91..085fea40a4 100644 --- a/src/core/lib/iomgr/lockfree_event.cc +++ b/src/core/lib/iomgr/lockfree_event.cc @@ -89,7 +89,11 @@ void LockfreeEvent::DestroyEvent() { void LockfreeEvent::NotifyOn(grpc_closure* closure) { while (true) { - gpr_atm curr = gpr_atm_no_barrier_load(&state_); + /* This load needs to be an acquire load because this can be a shutdown + * error that we might need to reference. Adding acquire semantics makes + * sure that the shutdown error has been initialized properly before us + * referencing it. */ + gpr_atm curr = gpr_atm_acq_load(&state_); if (grpc_polling_trace.enabled()) { gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this, (void*)curr, closure); diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 7a825643e1..c285d7eca6 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -166,8 +166,9 @@ static void posix_resolve_address(const char* name, const char* default_port, grpc_closure* on_done, grpc_resolved_addresses** addrs) { request* r = static_cast<request*>(gpr_malloc(sizeof(request))); - GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); + GRPC_CLOSURE_INIT( + &r->request_closure, do_request_thread, r, + grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index 71c92615ad..3e977dca2d 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -151,8 +151,9 @@ static void windows_resolve_address(const char* name, const char* default_port, grpc_closure* on_done, grpc_resolved_addresses** addresses) { request* r = (request*)gpr_malloc(sizeof(request)); - GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); + GRPC_CLOSURE_INIT( + &r->request_closure, do_request_thread, r, + grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/socket_windows.cc b/src/core/lib/iomgr/socket_windows.cc index 2e23409582..4ad31cb35d 100644 --- a/src/core/lib/iomgr/socket_windows.cc +++ b/src/core/lib/iomgr/socket_windows.cc @@ -36,6 +36,7 @@ #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_windows.h" +#include "src/core/lib/iomgr/sockaddr_windows.h" #include "src/core/lib/iomgr/socket_windows.h" grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name) { @@ -148,4 +149,32 @@ void grpc_socket_become_ready(grpc_winsocket* socket, if (should_destroy) destroy(socket); } +static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT; +static bool g_ipv6_loopback_available = false; + +static void probe_ipv6_once(void) { + SOCKET s = socket(AF_INET6, SOCK_STREAM, 0); + g_ipv6_loopback_available = 0; + if (s == INVALID_SOCKET) { + gpr_log(GPR_INFO, "Disabling AF_INET6 sockets because socket() failed."); + } else { + grpc_sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_addr.s6_addr[15] = 1; /* [::1]:0 */ + if (bind(s, reinterpret_cast<grpc_sockaddr*>(&addr), sizeof(addr)) == 0) { + g_ipv6_loopback_available = 1; + } else { + gpr_log(GPR_INFO, + "Disabling AF_INET6 sockets because ::1 is not available."); + } + closesocket(s); + } +} + +int grpc_ipv6_loopback_available(void) { + gpr_once_init(&g_probe_ipv6_once, probe_ipv6_once); + return g_ipv6_loopback_available; +} + #endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h index 7bd01eded5..b09b9da562 100644 --- a/src/core/lib/iomgr/socket_windows.h +++ b/src/core/lib/iomgr/socket_windows.h @@ -108,6 +108,10 @@ void grpc_socket_notify_on_read(grpc_winsocket* winsocket, void grpc_socket_become_ready(grpc_winsocket* winsocket, grpc_winsocket_callback_info* ci); +/* Returns true if this system can create AF_INET6 sockets bound to ::1. + The value is probed once, and cached for the life of the process. */ +int grpc_ipv6_loopback_available(void); + #endif #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */ diff --git a/src/core/lib/json/json.cc b/src/core/lib/json/json.cc index 6d359573f8..e78b73cefd 100644 --- a/src/core/lib/json/json.cc +++ b/src/core/lib/json/json.cc @@ -58,6 +58,8 @@ void grpc_json_destroy(grpc_json* json) { grpc_json* grpc_json_link_child(grpc_json* parent, grpc_json* child, grpc_json* sibling) { + // link child up to parent + child->parent = parent; // first child case. if (parent->child == nullptr) { GPR_ASSERT(sibling == nullptr); @@ -81,7 +83,6 @@ grpc_json* grpc_json_create_child(grpc_json* sibling, grpc_json* parent, grpc_json* child = grpc_json_create(type); grpc_json_link_child(parent, child, sibling); child->owns_value = owns_value; - child->parent = parent; child->value = value; child->key = key; return child; diff --git a/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc b/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc index 7c4d7a71cd..8454fd7558 100644 --- a/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc +++ b/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc @@ -41,8 +41,9 @@ namespace internal { bool check_bios_data(const char* bios_data_file) { char* bios_data = read_bios_file(bios_data_file); - bool result = (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GOOGLE)) || - (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GCE)); + bool result = + bios_data && ((!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GOOGLE)) || + (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GCE))); gpr_free(bios_data); return result; } diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc index cc72bb6164..59cf3a0af1 100644 --- a/src/core/lib/security/security_connector/security_connector.cc +++ b/src/core/lib/security/security_connector/security_connector.cc @@ -57,6 +57,10 @@ static const char* installed_roots_path = INSTALL_PREFIX "/share/grpc/roots.pem"; #endif +#ifndef TSI_OPENSSL_ALPN_SUPPORT +#define TSI_OPENSSL_ALPN_SUPPORT 1 +#endif + /* -- Overridden default roots. -- */ static grpc_ssl_roots_override_callback ssl_roots_override_cb = nullptr; @@ -850,7 +854,8 @@ grpc_auth_context* grpc_ssl_peer_to_auth_context(const tsi_peer* peer) { static grpc_error* ssl_check_peer(grpc_security_connector* sc, const char* peer_name, const tsi_peer* peer, grpc_auth_context** auth_context) { - /* Check the ALPN. */ +#if TSI_OPENSSL_ALPN_SUPPORT + /* Check the ALPN if ALPN is supported. */ const tsi_peer_property* p = tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL); if (p == nullptr) { @@ -861,7 +866,7 @@ static grpc_error* ssl_check_peer(grpc_security_connector* sc, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Cannot check peer: invalid ALPN value."); } - +#endif /* TSI_OPENSSL_ALPN_SUPPORT */ /* Check the peer name if specified. */ if (peer_name != nullptr && !grpc_ssl_host_matches_name(peer, peer_name)) { char* msg; diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 88e015ce22..dbad5ded4d 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -529,6 +529,7 @@ void grpc_call_internal_unref(grpc_call* c REF_ARG) { static void release_call(void* call, grpc_error* error) { grpc_call* c = static_cast<grpc_call*>(call); grpc_channel* channel = c->channel; + gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string))); grpc_call_combiner_destroy(&c->call_combiner); grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(channel, "call"); @@ -561,7 +562,7 @@ static void destroy_call(void* call, grpc_error* error) { } get_final_status(c, set_status_value_directly, &c->final_info.final_status, - nullptr, c->final_info.error_string); + nullptr, &(c->final_info.error_string)); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 8f3ad6c191..7cbd61adef 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -105,6 +105,7 @@ grpc_channel* grpc_channel_create_with_builder( channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); size_t channel_tracer_max_nodes = 0; // default to off bool channelz_enabled = false; + bool internal_channel = false; // this creates the default ChannelNode. Different types of channels may // override this to ensure a correct ChannelNode is created. grpc_core::channelz::ChannelNodeCreationFunc channel_node_create_func = @@ -158,13 +159,17 @@ grpc_channel* grpc_channel_create_with_builder( channel_node_create_func = reinterpret_cast<grpc_core::channelz::ChannelNodeCreationFunc>( args->args[i].value.pointer.p); + } else if (0 == strcmp(args->args[i].key, + GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL)) { + internal_channel = grpc_channel_arg_get_bool(&args->args[i], false); } } grpc_channel_args_destroy(args); if (channelz_enabled) { - channel->channelz_channel = - channel_node_create_func(channel, channel_tracer_max_nodes); + bool is_top_level_channel = channel->is_client && !internal_channel; + channel->channelz_channel = channel_node_create_func( + channel, channel_tracer_max_nodes, is_top_level_channel); channel->channelz_channel->trace()->AddTraceEvent( grpc_core::channelz::ChannelTrace::Severity::Info, grpc_slice_from_static_string("Channel created")); diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 16be81e9c2..0ad82fed99 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -127,7 +127,7 @@ void grpc_init(void) { grpc_slice_intern_init(); grpc_mdctx_global_init(); grpc_channel_init_init(); - grpc_core::ChannelzRegistry::Init(); + grpc_core::channelz::ChannelzRegistry::Init(); grpc_security_pre_init(); grpc_core::ExecCtx::GlobalInit(); grpc_iomgr_init(); @@ -176,7 +176,7 @@ void grpc_shutdown(void) { grpc_mdctx_global_shutdown(); grpc_handshaker_factory_registry_shutdown(); grpc_slice_intern_shutdown(); - grpc_core::ChannelzRegistry::Shutdown(); + grpc_core::channelz::ChannelzRegistry::Shutdown(); grpc_stats_shutdown(); grpc_core::Fork::GlobalShutdown(); } diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc index f4b1bb0cd8..e92fe2c5a1 100644 --- a/src/core/lib/surface/version.cc +++ b/src/core/lib/surface/version.cc @@ -23,6 +23,6 @@ #include <grpc/grpc.h> -const char* grpc_version_string(void) { return "6.0.0"; } +const char* grpc_version_string(void) { return "6.0.0-dev"; } -const char* grpc_g_stands_for(void) { return "gladiolus"; } +const char* grpc_g_stands_for(void) { return "glider"; } diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc index 06b999899a..1df1021bb1 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc @@ -462,6 +462,14 @@ void alts_tsi_handshaker_handle_response(alts_tsi_handshaker* handshaker, set_unused_bytes(result, &handshaker->recv_bytes, resp->bytes_consumed); } grpc_status_code code = static_cast<grpc_status_code>(resp->status.code); + if (code != GRPC_STATUS_OK) { + grpc_slice* details = static_cast<grpc_slice*>(resp->status.details.arg); + if (details != nullptr) { + char* error_details = grpc_slice_to_c_string(*details); + gpr_log(GPR_ERROR, "Error from handshaker service:%s", error_details); + gpr_free(error_details); + } + } grpc_gcp_handshaker_resp_destroy(resp); cb(alts_tsi_utils_convert_to_tsi_result(code), user_data, bytes_to_send, bytes_to_send_size, result); |