Diffstat (limited to 'src/core')
 src/core/ext/filters/client_channel/client_channel.cc | 38
 src/core/ext/filters/client_channel/client_channel_channelz.cc | 20
 src/core/ext/filters/client_channel/client_channel_channelz.h | 6
 src/core/ext/filters/client_channel/lb_policy.h | 20
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 146
 src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 70
 src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | 90
 src/core/ext/filters/client_channel/lb_policy/subchannel_list.h | 13
 src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc | 5
 src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc | 6
 src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc | 59
 src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc | 9
 src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h | 4
 src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc | 29
 src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc | 29
 src/core/ext/filters/client_channel/subchannel.cc | 18
 src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 2
 src/core/lib/channel/channel_stack.h | 2
 src/core/lib/channel/channel_trace.cc | 2
 src/core/lib/channel/channel_trace.h | 2
 src/core/lib/channel/channelz.cc | 64
 src/core/lib/channel/channelz.h | 17
 src/core/lib/channel/channelz_registry.cc | 92
 src/core/lib/channel/channelz_registry.h | 95
 src/core/lib/gpr/arena.cc | 88
 src/core/lib/gprpp/orphanable.h | 6
 src/core/lib/gprpp/ref_counted.h | 6
 src/core/lib/gprpp/ref_counted_ptr.h | 78
 src/core/lib/iomgr/ev_epoll1_linux.cc | 22
 src/core/lib/iomgr/ev_epollex_linux.cc | 31
 src/core/lib/iomgr/ev_epollsig_linux.cc | 38
 src/core/lib/iomgr/ev_poll_posix.cc | 67
 src/core/lib/iomgr/ev_posix.cc | 6
 src/core/lib/iomgr/ev_posix.h | 20
 src/core/lib/iomgr/executor.cc | 220
 src/core/lib/iomgr/executor.h | 45
 src/core/lib/iomgr/lockfree_event.cc | 6
 src/core/lib/iomgr/resolve_address_posix.cc | 5
 src/core/lib/iomgr/resolve_address_windows.cc | 5
 src/core/lib/iomgr/socket_windows.cc | 29
 src/core/lib/iomgr/socket_windows.h | 4
 src/core/lib/json/json.cc | 3
 src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc | 5
 src/core/lib/security/security_connector/security_connector.cc | 9
 src/core/lib/surface/call.cc | 3
 src/core/lib/surface/channel.cc | 9
 src/core/lib/surface/init.cc | 4
 src/core/lib/surface/version.cc | 4
 src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc | 8
49 files changed, 1015 insertions(+), 544 deletions(-)
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 024c9d737e..fead8feb17 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -571,15 +571,32 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
if (chand->lb_policy == nullptr) {
- GRPC_CLOSURE_SCHED(
- op->send_ping.on_initiate,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
- GRPC_CLOSURE_SCHED(
- op->send_ping.on_ack,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
+ grpc_error* error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
+ GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
} else {
- chand->lb_policy->PingOneLocked(op->send_ping.on_initiate,
- op->send_ping.on_ack);
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc_core::LoadBalancingPolicy::PickState pick_state;
+ pick_state.initial_metadata = nullptr;
+ pick_state.initial_metadata_flags = 0;
+ pick_state.on_complete = nullptr;
+ memset(&pick_state.subchannel_call_context, 0,
+ sizeof(pick_state.subchannel_call_context));
+ pick_state.user_data = nullptr;
+ // Pick must return synchronously, because pick_state.on_complete is null.
+ GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error));
+ if (pick_state.connected_subchannel != nullptr) {
+ pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
+ op->send_ping.on_ack);
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "LB policy dropped call on ping");
+ }
+ GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
+ }
op->bind_pollset = nullptr;
}
op->send_ping.on_initiate = nullptr;
@@ -2684,14 +2701,15 @@ class LbPicker {
grpc_combiner_scheduler(chand->combiner));
calld->pick.on_complete = &calld->pick_closure;
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
- const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
+ grpc_error* error = GRPC_ERROR_NONE;
+ const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error);
if (GPR_LIKELY(pick_done)) {
// Pick completed synchronously.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
chand, calld);
}
- pick_done_locked(elem, GRPC_ERROR_NONE);
+ pick_done_locked(elem, error);
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
} else {
// Pick will be returned asynchronously.
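Note: the first hunk above replaces the removed PingOneLocked() path by re-implementing channel-level ping on top of PickLocked(). A null pick_state.on_complete forces a synchronous result, so the transport op either pings the picked subchannel or fails both ping closures immediately. A condensed sketch of that control flow, using only names from the hunk (not a drop-in replacement):

    grpc_error* error = GRPC_ERROR_NONE;
    grpc_core::LoadBalancingPolicy::PickState pick_state;
    pick_state.on_complete = nullptr;  // null => pick must complete synchronously
    GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error));
    if (pick_state.connected_subchannel != nullptr) {
      // Ping whichever subchannel the policy selected.
      pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
                                            op->send_ping.on_ack);
    } else {
      // No subchannel available: fail both ping closures with the pick error.
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    }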
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc
index 235b8f3207..86c765df52 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.cc
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc
@@ -41,8 +41,9 @@ static const grpc_arg_pointer_vtable client_channel_channelz_vtable = {
client_channel_channelz_cmp};
ClientChannelNode::ClientChannelNode(grpc_channel* channel,
- size_t channel_tracer_max_nodes)
- : ChannelNode(channel, channel_tracer_max_nodes) {
+ size_t channel_tracer_max_nodes,
+ bool is_top_level_channel)
+ : ChannelNode(channel, channel_tracer_max_nodes, is_top_level_channel) {
client_channel_ =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
GPR_ASSERT(client_channel_->filter == &grpc_client_channel_filter);
@@ -69,7 +70,7 @@ void ClientChannelNode::PopulateChildRefs(grpc_json* json) {
grpc_json* json_iterator = nullptr;
grpc_client_channel_populate_child_refs(client_channel_, &child_subchannels,
&child_channels);
- if (child_subchannels.size() > 0) {
+ if (!child_subchannels.empty()) {
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "subchannelRef", nullptr, GRPC_JSON_ARRAY, false);
for (size_t i = 0; i < child_subchannels.size(); ++i) {
@@ -80,16 +81,16 @@ void ClientChannelNode::PopulateChildRefs(grpc_json* json) {
child_subchannels[i]);
}
}
- if (child_channels.size() > 0) {
+ if (!child_channels.empty()) {
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false);
json_iterator = nullptr;
- for (size_t i = 0; i < child_subchannels.size(); ++i) {
+ for (size_t i = 0; i < child_channels.size(); ++i) {
json_iterator =
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_add_number_string_child(json_iterator, nullptr, "channelId",
- child_subchannels[i]);
+ child_channels[i]);
}
}
}
@@ -102,9 +103,10 @@ grpc_arg ClientChannelNode::CreateChannelArg() {
}
RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes) {
- return MakePolymorphicRefCounted<ChannelNode, ClientChannelNode>(
- channel, channel_tracer_max_nodes);
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel) {
+ return MakeRefCounted<ClientChannelNode>(channel, channel_tracer_max_nodes,
+ is_top_level_channel);
}
} // namespace channelz
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h
index 0547109d36..6f27b5c8b7 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.h
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.h
@@ -40,7 +40,8 @@ namespace channelz {
class ClientChannelNode : public ChannelNode {
public:
static RefCountedPtr<ChannelNode> MakeClientChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes);
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
// Override this functionality since client_channels have a notion of
// channel connectivity.
@@ -56,7 +57,8 @@ class ClientChannelNode : public ChannelNode {
protected:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes);
+ ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
virtual ~ClientChannelNode() {}
private:
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 3150df8847..31c08246ae 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -71,6 +71,7 @@ class LoadBalancingPolicy
/// Storage for LB token in \a initial_metadata, or nullptr if not used.
grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously.
+ /// If null, pick will fail if a result is not available synchronously.
grpc_closure* on_complete;
/// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
@@ -99,10 +100,15 @@ class LoadBalancingPolicy
/// Finds an appropriate subchannel for a call, based on data in \a pick.
/// \a pick must remain alive until the pick is complete.
///
- /// If the pick succeeds and a result is known immediately, returns true.
- /// Otherwise, \a pick->on_complete will be invoked once the pick is
- /// complete with its error argument set to indicate success or failure.
- virtual bool PickLocked(PickState* pick) GRPC_ABSTRACT;
+ /// If a result is known immediately, returns true, setting \a *error
+ /// upon failure. Otherwise, \a pick->on_complete will be invoked once
+ /// the pick is complete with its error argument set to indicate success
+ /// or failure.
+ ///
+ /// If \a pick->on_complete is null and no result is known immediately,
+ /// a synchronous failure will be returned (i.e., \a *error will be
+ /// set and true will be returned).
+ virtual bool PickLocked(PickState* pick, grpc_error** error) GRPC_ABSTRACT;
/// Cancels \a pick.
/// The \a on_complete callback of the pending pick will be invoked with
@@ -133,12 +139,6 @@ class LoadBalancingPolicy
virtual void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy)
GRPC_ABSTRACT;
- /// Performs a connected subchannel ping via \a ConnectedSubchannel::Ping()
- /// against one of the connected subchannels managed by the policy.
- /// Note: This is intended only for use in tests.
- virtual void PingOneLocked(grpc_closure* on_initiate,
- grpc_closure* on_ack) GRPC_ABSTRACT;
-
/// Tries to enter a READY connectivity state.
/// TODO(roth): As part of restructuring how we handle IDLE state,
/// consider whether this method is still needed.
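Note: the tightened contract above lets every caller treat a true return as "the pick is resolved; consult *error", whether or not an on_complete closure was supplied. A minimal caller-side sketch, modeled on the HandOffPendingPicksLocked() hunks later in this diff (assumes pick->on_complete is non-null):

    grpc_error* error = GRPC_ERROR_NONE;
    if (policy->PickLocked(pick, &error)) {
      // Completed synchronously: error carries success or failure.
      GRPC_CLOSURE_SCHED(pick->on_complete, error);
    }
    // Otherwise the policy holds on to the pick and will invoke
    // pick->on_complete itself once a result is available.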
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index f757d6057c..2d1f777474 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -123,7 +123,7 @@ class GrpcLb : public LoadBalancingPolicy {
GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
- bool PickLocked(PickState* pick) override;
+ bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
@@ -133,11 +133,9 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
- void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
- // TODO(ncteisen): implement this in a follow up PR
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
- ChildRefsList* child_channels) override {}
+ ChildRefsList* child_channels) override;
private:
/// Linked list of pending pick requests. It stores all information needed to
@@ -168,13 +166,6 @@ class GrpcLb : public LoadBalancingPolicy {
PendingPick* next = nullptr;
};
- /// A linked list of pending pings waiting for the RR policy to be created.
- struct PendingPing {
- grpc_closure* on_initiate;
- grpc_closure* on_ack;
- PendingPing* next = nullptr;
- };
-
/// Contains a call to the LB server and all the data related to the call.
class BalancerCallState
: public InternallyRefCountedWithTracing<BalancerCallState> {
@@ -273,14 +264,12 @@ class GrpcLb : public LoadBalancingPolicy {
void AddPendingPick(PendingPick* pp);
static void OnPendingPickComplete(void* arg, grpc_error* error);
- // Pending ping methods.
- void AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack);
-
// Methods for dealing with the RR policy.
void CreateOrUpdateRoundRobinPolicyLocked();
grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
void CreateRoundRobinPolicyLocked(const Args& args);
- bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp);
+ bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
+ grpc_error** error);
void UpdateConnectivityStateFromRoundRobinPolicyLocked(
grpc_error* rr_state_error);
static void OnRoundRobinConnectivityChangedLocked(void* arg,
@@ -301,6 +290,9 @@ class GrpcLb : public LoadBalancingPolicy {
// The channel for communicating with the LB server.
grpc_channel* lb_channel_ = nullptr;
+ // Mutex to protect the channel to the LB server. This is used when
+ // processing a channelz request.
+ gpr_mu lb_channel_mu_;
grpc_connectivity_state lb_channel_connectivity_;
grpc_closure lb_channel_on_connectivity_changed_;
// Are we already watching the LB channel's connectivity?
@@ -340,9 +332,8 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
- // Pending picks and pings that are waiting on the RR policy's connectivity.
+ // Pending picks that are waiting on the RR policy's connectivity.
PendingPick* pending_picks_ = nullptr;
- PendingPing* pending_pings_ = nullptr;
// The RR policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> rr_policy_;
@@ -1007,6 +998,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
// A channel arg indicating the target is a grpclb load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
+ // A channel arg indicating this is an internal channel, i.e., it is
+ // owned by components in Core, not by the user application.
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
};
// Construct channel args.
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
@@ -1036,6 +1031,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1000)) {
// Initialization.
+ gpr_mu_init(&lb_channel_mu_);
grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
@@ -1073,7 +1069,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
GrpcLb::~GrpcLb() {
GPR_ASSERT(pending_picks_ == nullptr);
- GPR_ASSERT(pending_pings_ == nullptr);
+ gpr_mu_destroy(&lb_channel_mu_);
gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_);
grpc_connectivity_state_destroy(&state_tracker_);
@@ -1103,8 +1099,10 @@ void GrpcLb::ShutdownLocked() {
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked.
if (lb_channel_ != nullptr) {
+ gpr_mu_lock(&lb_channel_mu_);
grpc_channel_destroy(lb_channel_);
lb_channel_ = nullptr;
+ gpr_mu_unlock(&lb_channel_mu_);
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "grpclb_shutdown");
@@ -1116,14 +1114,6 @@ void GrpcLb::ShutdownLocked() {
// Note: pp is deleted in this callback.
GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
}
- // Clear pending pings.
- PendingPing* pping;
- while ((pping = pending_pings_) != nullptr) {
- pending_pings_ = pping->next;
- GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
- GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
- Delete(pping);
- }
GRPC_ERROR_UNREF(error);
}
@@ -1137,9 +1127,10 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
pending_picks_ = pp->next;
pp->pick->on_complete = pp->original_on_complete;
pp->pick->user_data = nullptr;
- if (new_policy->PickLocked(pp->pick)) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (new_policy->PickLocked(pp->pick, &error)) {
// Synchronous return; schedule closure.
- GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->pick->on_complete, error);
}
Delete(pp);
}
@@ -1223,54 +1214,47 @@ void GrpcLb::ExitIdleLocked() {
}
}
-bool GrpcLb::PickLocked(PickState* pick) {
+bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) {
PendingPick* pp = PendingPickCreate(pick);
bool pick_done = false;
if (rr_policy_ != nullptr) {
- const grpc_connectivity_state rr_connectivity_state =
- rr_policy_->CheckConnectivityLocked(nullptr);
- // The RR policy may have transitioned to SHUTDOWN but the callback
- // registered to capture this event (on_rr_connectivity_changed_) may not
- // have been invoked yet. We need to make sure we aren't trying to pick
- // from an RR policy instance that's in shutdown.
- if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
+ rr_policy_.get());
+ }
+ pick_done =
+ PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
+ } else { // rr_policy_ == NULL
+ if (pick->on_complete == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "No pick result available but synchronous result required.");
+ pick_done = true;
+ } else {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] NOT picking from from RR %p: RR conn state=%s",
- this, rr_policy_.get(),
- grpc_connectivity_state_name(rr_connectivity_state));
+ "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
+ this);
}
AddPendingPick(pp);
- pick_done = false;
- } else { // RR not in shutdown
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
- rr_policy_.get());
+ if (!started_picking_) {
+ StartPickingLocked();
}
- pick_done = PickFromRoundRobinPolicyLocked(false /* force_async */, pp);
- }
- } else { // rr_policy_ == NULL
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
- this);
- }
- AddPendingPick(pp);
- if (!started_picking_) {
- StartPickingLocked();
+ pick_done = false;
}
- pick_done = false;
}
return pick_done;
}
-void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
- if (rr_policy_ != nullptr) {
- rr_policy_->PingOneLocked(on_initiate, on_ack);
- } else {
- AddPendingPing(on_initiate, on_ack);
- if (!started_picking_) {
- StartPickingLocked();
+void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+ ChildRefsList* child_channels) {
+ // Delegate to the RoundRobin policy to fill the child subchannels.
+ rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
+ mu_guard guard(&lb_channel_mu_);
+ if (lb_channel_ != nullptr) {
+ grpc_core::channelz::ChannelNode* channel_node =
+ grpc_channel_get_channelz_node(lb_channel_);
+ if (channel_node != nullptr) {
+ child_channels->push_back(channel_node->channel_uuid());
}
}
}
@@ -1318,9 +1302,11 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
if (lb_channel_ == nullptr) {
char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", server_name_);
+ gpr_mu_lock(&lb_channel_mu_);
lb_channel_ = grpc_client_channel_factory_create_channel(
client_channel_factory(), uri_str,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
+ gpr_mu_unlock(&lb_channel_mu_);
GPR_ASSERT(lb_channel_ != nullptr);
gpr_free(uri_str);
}
@@ -1573,18 +1559,6 @@ void GrpcLb::AddPendingPick(PendingPick* pp) {
}
//
-// PendingPing
-//
-
-void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
- PendingPing* pping = New<PendingPing>();
- pping->on_initiate = on_initiate;
- pping->on_ack = on_ack;
- pping->next = pending_pings_;
- pending_pings_ = pping;
-}
-
-//
// code for interacting with the RR policy
//
@@ -1593,7 +1567,8 @@ void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
// cleanups this callback would otherwise be responsible for.
// If \a force_async is true, then we will manually schedule the
// completion callback even if the pick is available immediately.
-bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
+bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
+ grpc_error** error) {
// Check for drops if we are not using fallback backend addresses.
if (serverlist_ != nullptr) {
// Look at the index into the serverlist to see if we should drop this call.
@@ -1627,11 +1602,12 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
- bool pick_done = rr_policy_->PickLocked(pp->pick);
+ bool pick_done = rr_policy_->PickLocked(pp->pick, error);
if (pick_done) {
PendingPickSetMetadataAndContext(pp);
if (force_async) {
- GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
+ *error = GRPC_ERROR_NONE;
pick_done = false;
}
Delete(pp);
@@ -1683,18 +1659,8 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
"[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
rr_policy_.get());
}
- PickFromRoundRobinPolicyLocked(true /* force_async */, pp);
- }
- // Send pending pings to RR policy.
- PendingPing* pping;
- while ((pping = pending_pings_)) {
- pending_pings_ = pping->next;
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
- this, rr_policy_.get());
- }
- rr_policy_->PingOneLocked(pping->on_initiate, pping->on_ack);
- Delete(pping);
+ grpc_error* error = GRPC_ERROR_NONE;
+ PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
}
}
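Note: one subtlety in the PickFromRoundRobinPolicyLocked() hunk above: when force_async is set, ownership of *error moves to the scheduled closure, so the out-parameter is reset before the pick is reported as asynchronous. Annotated sketch of those three lines from the hunk:

    if (force_async) {
      GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);  // closure now owns the error
      *error = GRPC_ERROR_NONE;  // caller must not see (or unref) it again
      pick_done = false;         // completion is reported via the closure
    }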
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index c50deb9679..46acbf628b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -46,7 +46,7 @@ class PickFirst : public LoadBalancingPolicy {
explicit PickFirst(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
- bool PickLocked(PickState* pick) override;
+ bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
@@ -56,10 +56,9 @@ class PickFirst : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
- void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
- ChildRefsList* child_channels) override;
+ ChildRefsList* ignored) override;
private:
~PickFirst();
@@ -173,15 +172,16 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
- if (new_policy->PickLocked(pick)) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure.
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pick->on_complete, error);
}
}
}
void PickFirst::ShutdownLocked() {
- AutoChildRefsUpdater(this);
+ AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
@@ -259,13 +259,18 @@ void PickFirst::ExitIdleLocked() {
}
}
-bool PickFirst::PickLocked(PickState* pick) {
+bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
// If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) {
pick->connected_subchannel = selected_->connected_subchannel()->Ref();
return true;
}
// No subchannel selected yet, so handle asynchronously.
+ if (pick->on_complete == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "No pick result available but synchronous result required.");
+ return true;
+ }
if (!started_picking_) {
StartPickingLocked();
}
@@ -293,17 +298,6 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
-void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
- if (selected_ != nullptr) {
- selected_->connected_subchannel()->Ping(on_initiate, on_ack);
- } else {
- GRPC_CLOSURE_SCHED(on_initiate,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
- GRPC_CLOSURE_SCHED(on_ack,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
- }
-}
-
void PickFirst::FillChildRefsForChannelz(
ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
mu_guard guard(&child_refs_mu_);
@@ -327,30 +321,10 @@ void PickFirst::FillChildRefsForChannelz(
void PickFirst::UpdateChildRefsLocked() {
ChildRefsList cs;
if (subchannel_list_ != nullptr) {
- for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
- if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
- grpc_core::channelz::SubchannelNode* subchannel_node =
- grpc_subchannel_get_channelz_node(
- subchannel_list_->subchannel(i)->subchannel());
- if (subchannel_node != nullptr) {
- cs.push_back(subchannel_node->subchannel_uuid());
- }
- }
- }
+ subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
- for (size_t i = 0; i < latest_pending_subchannel_list_->num_subchannels();
- ++i) {
- if (latest_pending_subchannel_list_->subchannel(i)->subchannel() !=
- nullptr) {
- grpc_core::channelz::SubchannelNode* subchannel_node =
- grpc_subchannel_get_channelz_node(
- latest_pending_subchannel_list_->subchannel(i)->subchannel());
- if (subchannel_node != nullptr) {
- cs.push_back(subchannel_node->subchannel_uuid());
- }
- }
- }
+ latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
mu_guard guard(&child_refs_mu_);
@@ -471,6 +445,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
+ GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
if (grpc_lb_pick_first_trace.enabled()) {
@@ -500,14 +475,12 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
"update"),
"selected_not_ready+switch_to_update");
} else {
- // TODO(juanlishen): we re-resolve when the selected subchannel goes to
- // TRANSIENT_FAILURE because we used to shut down in this case before
- // re-resolution is introduced. But we need to investigate whether we
- // really want to take any action instead of waiting for the selected
- // subchannel reconnecting.
- GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- // If the selected channel goes bad, request a re-resolution.
+ // If the selected subchannel goes bad, request a re-resolution. We also
+ // set the channel state to IDLE and reset started_picking_. The reason
+ // is that if the new state is TRANSIENT_FAILURE due to a GOAWAY
+ // reception we don't want to connect to the re-resolved backends until
+ // we leave the IDLE state.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"selected_changed+reresolve");
@@ -588,9 +561,10 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
+ p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "connecting_transient_failure");
+ GRPC_ERROR_REF(error), "exhausted_subchannels");
}
sd->StartConnectivityWatchLocked();
break;
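Note: the small-looking change in ShutdownLocked() above is a real bug fix. `AutoChildRefsUpdater(this);` constructs an unnamed temporary, which C++ destroys at the end of that statement, so the "update child refs on scope exit" ran immediately, before shutdown had mutated anything. Naming the guard defers its destructor to the end of the enclosing scope:

    AutoChildRefsUpdater(this);        // temporary: updates child refs right away
    AutoChildRefsUpdater guard(this);  // named: updates child refs when the scope exits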
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 09634a2ad4..9c3a15c67b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -57,7 +57,7 @@ class RoundRobin : public LoadBalancingPolicy {
explicit RoundRobin(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
- bool PickLocked(PickState* pick) override;
+ bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
@@ -67,11 +67,9 @@ class RoundRobin : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
- void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
- // TODO(ncteisen): implement this in a follow up PR
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
- ChildRefsList* child_channels) override {}
+ ChildRefsList* ignored) override;
private:
~RoundRobin();
@@ -183,11 +181,24 @@ class RoundRobin : public LoadBalancingPolicy {
size_t last_ready_index_ = -1; // Index into list of last pick.
};
+ // Helper class to ensure that any function that modifies the child refs
+ // data structures will update the channelz snapshot data structures before
+ // returning.
+ class AutoChildRefsUpdater {
+ public:
+ explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
+ ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
+
+ private:
+ RoundRobin* rr_;
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
bool DoPickLocked(PickState* pick);
void DrainPendingPicksLocked();
+ void UpdateChildRefsLocked();
/** list of subchannels */
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
@@ -205,10 +216,16 @@ class RoundRobin : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
+ /// Lock and data used to capture snapshots of this channel's child
+ /// channels and subchannels. This data is consumed by channelz.
+ gpr_mu child_refs_mu_;
+ ChildRefsList child_subchannels_;
+ ChildRefsList child_channels_;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
+ gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"round_robin");
UpdateLocked(*args.args);
@@ -223,6 +240,7 @@ RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
+ gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
@@ -234,14 +252,16 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
- if (new_policy->PickLocked(pick)) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure.
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pick->on_complete, error);
}
}
}
void RoundRobin::ShutdownLocked() {
+ AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
@@ -348,7 +368,7 @@ void RoundRobin::DrainPendingPicksLocked() {
}
}
-bool RoundRobin::PickLocked(PickState* pick) {
+bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_);
}
@@ -356,6 +376,11 @@ bool RoundRobin::PickLocked(PickState* pick) {
if (subchannel_list_ != nullptr) {
if (DoPickLocked(pick)) return true;
}
+ if (pick->on_complete == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "No pick result available but synchronous result required.");
+ return true;
+ }
/* no pick currently available. Save for later in list of pending picks */
pick->next = pending_picks_;
pending_picks_ = pick;
@@ -365,6 +390,39 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}
+void RoundRobin::FillChildRefsForChannelz(
+ ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
+ mu_guard guard(&child_refs_mu_);
+ for (size_t i = 0; i < child_subchannels_.size(); ++i) {
+ // TODO(ncteisen): implement a de-dup loop that is not O(n^2). Might
+ // have to implement lightweight set. For now, we don't care about
+ // performance when channelz requests are made.
+ bool found = false;
+ for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
+ if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ child_subchannels_to_fill->push_back(child_subchannels_[i]);
+ }
+ }
+}
+
+void RoundRobin::UpdateChildRefsLocked() {
+ ChildRefsList cs;
+ if (subchannel_list_ != nullptr) {
+ subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
+ }
+ // atomically update the data that channelz will actually be looking at.
+ mu_guard guard(&child_refs_mu_);
+ child_subchannels_ = std::move(cs);
+}
+
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
@@ -455,6 +513,7 @@ void RoundRobin::RoundRobinSubchannelList::
void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
+ AutoChildRefsUpdater guard(p);
if (num_ready_ > 0) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
@@ -593,24 +652,9 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
-void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- const size_t next_ready_index =
- subchannel_list_->GetNextReadySubchannelIndexLocked();
- if (next_ready_index < subchannel_list_->num_subchannels()) {
- RoundRobinSubchannelData* selected =
- subchannel_list_->subchannel(next_ready_index);
- selected->connected_subchannel()->Ping(on_initiate, on_ack);
- } else {
- GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Round Robin not connected"));
- GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Round Robin not connected"));
- }
-}
-
void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ AutoChildRefsUpdater guard(this);
if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
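Note: on the de-dup TODO in FillChildRefsForChannelz() above: a set-backed pass is the natural O(n log n) replacement once performance matters. A self-contained sketch, using plain STL containers purely for illustration (ChildRefsList itself is a core-internal container type, not std::vector):

    #include <set>
    #include <vector>

    // Append each uuid from src that dst does not already contain.
    void FillDeduped(const std::vector<intptr_t>& src,
                     std::vector<intptr_t>* dst) {
      std::set<intptr_t> seen(dst->begin(), dst->end());
      for (intptr_t uuid : src) {
        if (seen.insert(uuid).second) dst->push_back(uuid);  // newly seen
      }
    }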
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 7e2046bcdc..018ac3bb86 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -189,6 +189,19 @@ class SubchannelList
// Returns true if the subchannel list is shutting down.
bool shutting_down() const { return shutting_down_; }
+ // Populates refs_list with the uuids of this SubchannelList's subchannels.
+ void PopulateChildRefsList(ChildRefsList* refs_list) {
+ for (size_t i = 0; i < subchannels_.size(); ++i) {
+ if (subchannels_[i].subchannel() != nullptr) {
+ grpc_core::channelz::SubchannelNode* subchannel_node =
+ grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
+ if (subchannel_node != nullptr) {
+ refs_list->push_back(subchannel_node->subchannel_uuid());
+ }
+ }
+ }
+ }
+
// Accessors.
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index f4f6444c5f..7050e82121 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -23,7 +23,6 @@
#include <limits.h>
#include <stdio.h>
#include <string.h>
-#include <unistd.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
@@ -142,8 +141,8 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args)
channel_args_ = grpc_channel_args_copy(args.args);
const grpc_arg* arg = grpc_channel_args_find(
channel_args_, GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION);
- request_service_config_ = !grpc_channel_arg_get_integer(
- arg, (grpc_integer_options){false, false, true});
+ grpc_integer_options integer_options = {false, false, true};
+ request_service_config_ = !grpc_channel_arg_get_integer(arg, integer_options);
arg = grpc_channel_args_find(channel_args_,
GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS);
min_time_between_resolutions_ =
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
index c886795608..0068d0d5f4 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
@@ -18,11 +18,10 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
-#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
+#if GRPC_ARES == 1 && !defined(GRPC_UV)
#include <ares.h>
#include <string.h>
-#include <sys/ioctl.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
@@ -32,7 +31,6 @@
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -314,4 +312,4 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
}
}
-#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */
+#endif /* GRPC_ARES == 1 && !defined(GRPC_UV) */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
new file mode 100644
index 0000000000..5d65ae3ab3
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+#if GRPC_ARES == 1 && defined(GPR_WINDOWS)
+
+#include <ares.h>
+#include <string.h>
+#include "src/core/lib/gprpp/memory.h"
+
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
+
+namespace grpc_core {
+
+/* TODO: fill in the body of GrpcPolledFdWindows to enable c-ares on Windows.
+ This dummy implementation only allows grpc to compile on Windows with
+ GRPC_ARES=1. */
+class GrpcPolledFdWindows : public GrpcPolledFd {
+ public:
+ GrpcPolledFdWindows() { abort(); }
+ ~GrpcPolledFdWindows() { abort(); }
+ void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
+ abort();
+ }
+ void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
+ abort();
+ }
+ bool IsFdStillReadableLocked() override { abort(); }
+ void ShutdownLocked(grpc_error* error) override { abort(); }
+ ares_socket_t GetWrappedAresSocketLocked() override { abort(); }
+ const char* GetName() override { abort(); }
+};
+
+GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
+ grpc_pollset_set* driver_pollset_set) {
+ return nullptr;
+}
+
+void ConfigureAresChannelLocked(ares_channel* channel) { abort(); }
+
+} // namespace grpc_core
+
+#endif /* GRPC_ARES == 1 && defined(GPR_WINDOWS) */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index 497ad998af..b3d6437e9a 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -22,7 +22,6 @@
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/iomgr/sockaddr.h"
-#include "src/core/lib/iomgr/socket_utils_posix.h"
#include <string.h>
#include <sys/types.h>
@@ -215,7 +214,7 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts,
memset(&addr, 0, addr_len);
memcpy(&addr.sin6_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in6_addr));
- addr.sin6_family = static_cast<sa_family_t>(hostent->h_addrtype);
+ addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin6_port = hr->port;
grpc_lb_addresses_set_address(
*lb_addresses, i, &addr, addr_len,
@@ -236,7 +235,7 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts,
memset(&addr, 0, addr_len);
memcpy(&addr.sin_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in_addr));
- addr.sin_family = static_cast<sa_family_t>(hostent->h_addrtype);
+ addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin_port = hr->port;
grpc_lb_addresses_set_address(
*lb_addresses, i, &addr, addr_len,
@@ -281,7 +280,7 @@ static void on_srv_query_done_locked(void* arg, int status, int timeouts,
grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
srv_it = srv_it->next) {
- if (grpc_ipv6_loopback_available()) {
+ if (grpc_ares_query_ipv6()) {
grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked(
r, srv_it->host, htons(srv_it->port), true /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6,
@@ -452,7 +451,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
}
}
r->pending_queries = 1;
- if (grpc_ipv6_loopback_available()) {
+ if (grpc_ares_query_ipv6()) {
hr = create_hostbyname_request_locked(r, host, strhtons(port),
false /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked,
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index ce26f5d524..17eaa7ccf0 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -70,6 +70,10 @@ void grpc_ares_cleanup(void);
* and destroys the grpc_ares_request */
void grpc_ares_complete_request_locked(grpc_ares_request* request);
+/* Indicates whether or not AAAA queries should be attempted. */
+/* E.g., return false if ipv6 is known to not be available. */
+bool grpc_ares_query_ipv6();
+
/* Exposed only for testing */
void grpc_cares_wrapper_test_only_address_sorting_sort(
grpc_lb_addresses* lb_addrs);
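Note: grpc_ares_query_ipv6() gets this single declaration, with per-platform definitions in the two new wrapper files later in this diff; both currently defer to grpc_ipv6_loopback_available(). A port that never has IPv6 connectivity could satisfy the same contract with a constant stub (hypothetical, not part of this diff):

    /* Hypothetical no-IPv6 port: never attempt AAAA queries. */
    bool grpc_ares_query_ipv6() { return false; }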
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc
new file mode 100644
index 0000000000..23c0fec74f
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
+
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
+#include "src/core/lib/iomgr/socket_utils_posix.h"
+
+bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); }
+
+#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc
new file mode 100644
index 0000000000..ee827e284e
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+#if GRPC_ARES == 1 && defined(GPR_WINDOWS)
+
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
+#include "src/core/lib/iomgr/socket_windows.h"
+
+bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); }
+
+#endif /* GRPC_ARES == 1 && defined(GPR_WINDOWS) */
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 9d608c3c55..71ef8c518b 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -390,8 +390,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
}
grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
- grpc_subchannel* s) {
- return s->channelz_subchannel.get();
+ grpc_subchannel* subchannel) {
+ return subchannel->channelz_subchannel.get();
}
static void continue_connect_locked(grpc_subchannel* c) {
@@ -402,8 +402,6 @@ static void continue_connect_locked(grpc_subchannel* c) {
c->next_attempt_deadline = c->backoff->NextAttemptTime();
args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
- grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->on_connected);
}
@@ -459,27 +457,24 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
/* Don't try to connect if we're already disconnected */
return;
}
-
if (c->connecting) {
/* Already connecting: don't restart */
return;
}
-
if (c->connected_subchannel != nullptr) {
/* Already connected: don't restart */
return;
}
-
if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
/* Nobody is interested in connecting: so don't just yet */
return;
}
-
c->connecting = true;
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
-
if (!c->backoff_begun) {
c->backoff_begun = true;
+ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "connecting");
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
@@ -494,6 +489,11 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
+ // During backoff, we prefer the connectivity state of CONNECTING instead of
+ // TRANSIENT_FAILURE in order to prevent triggering re-resolution
+ // continuously in pick_first.
+ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "backoff");
}
}
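Note: the net effect of the two connectivity hunks above is that the CONNECTING report moves out of continue_connect_locked() and is issued on both connect paths in maybe_start_connecting_locked(), including while the backoff timer is pending. Condensed sketch of the resulting structure (elides the alarm setup and logging):

    if (!c->backoff_begun) {
      c->backoff_begun = true;
      grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
                                  GRPC_ERROR_NONE, "connecting");  // first attempt
      continue_connect_locked(c);  // no longer sets the state itself
    } else {
      grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
      // Report CONNECTING (not TRANSIENT_FAILURE) while backing off, so
      // pick_first does not trigger re-resolution continuously between attempts.
      grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
                                  GRPC_ERROR_NONE, "backoff");
    }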
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index bd6fec6fbe..9ad271753c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1208,7 +1208,7 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_error_add_child(closure->error_data.error, error);
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
- if (s->seen_error || (t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
+ if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
GRPC_CLOSURE_RUN(closure, closure->error_data.error);
} else {
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 7581f937b6..35c3fb01ea 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -82,7 +82,7 @@ typedef struct {
typedef struct {
grpc_call_stats stats;
grpc_status_code final_status;
- const char** error_string;
+ const char* error_string;
} grpc_call_final_info;
/* Channel filters specify:
diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc
index c17885a713..b3443310ac 100644
--- a/src/core/lib/channel/channel_trace.cc
+++ b/src/core/lib/channel/channel_trace.cc
@@ -174,7 +174,7 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const {
}
}
-grpc_json* ChannelTrace::RenderJSON() const {
+grpc_json* ChannelTrace::RenderJson() const {
if (!max_list_size_)
return nullptr; // tracing is disabled if max_events == 0
grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT);
diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h
index 0dd162a777..596af7402f 100644
--- a/src/core/lib/channel/channel_trace.h
+++ b/src/core/lib/channel/channel_trace.h
@@ -71,7 +71,7 @@ class ChannelTrace {
// Creates and returns the raw grpc_json object, so a parent channelz
// object may incorporate the json before rendering.
- grpc_json* RenderJSON() const;
+ grpc_json* RenderJson() const;
private:
// Types of objects that can be references by trace events.
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index 79a9220503..9d6002ed8a 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -41,18 +41,22 @@
namespace grpc_core {
namespace channelz {
-ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes)
- : channel_(channel), target_(nullptr), channel_uuid_(-1) {
+ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel)
+ : channel_(channel),
+ target_(nullptr),
+ channel_uuid_(-1),
+ is_top_level_channel_(is_top_level_channel) {
trace_.Init(channel_tracer_max_nodes);
target_ = UniquePtr<char>(grpc_channel_get_target(channel_));
- channel_uuid_ = ChannelzRegistry::Register(this);
+ channel_uuid_ = ChannelzRegistry::RegisterChannelNode(this);
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
ChannelNode::~ChannelNode() {
trace_.Destroy();
- ChannelzRegistry::Unregister(channel_uuid_);
+ ChannelzRegistry::UnregisterChannelNode(channel_uuid_);
}
void ChannelNode::RecordCallStarted() {
@@ -65,7 +69,7 @@ void ChannelNode::PopulateConnectivityState(grpc_json* json) {}
void ChannelNode::PopulateChildRefs(grpc_json* json) {}
-char* ChannelNode::RenderJSON() {
+grpc_json* ChannelNode::RenderJson() {
// We need to track these three json objects to build our object
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
@@ -86,30 +90,32 @@ char* ChannelNode::RenderJSON() {
json = data;
json_iterator = nullptr;
PopulateConnectivityState(json);
+ GPR_ASSERT(target_.get() != nullptr);
json_iterator = grpc_json_create_child(
json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false);
// fill in the channel trace if applicable
- grpc_json* trace = trace_->RenderJSON();
+ grpc_json* trace = trace_->RenderJson();
if (trace != nullptr) {
- // we manuall link up and fill the child since it was created for us in
- // ChannelTrace::RenderJSON
+ // we manually link up and fill the child since it was created for us in
+ // ChannelTrace::RenderJson
+ trace->key = "trace"; // this object is named trace in channelz.proto
json_iterator = grpc_json_link_child(json, trace, json_iterator);
- trace->parent = json;
- trace->value = nullptr;
- trace->key = "trace";
- trace->owns_value = false;
}
// reset the parent to be the data object.
json = data;
json_iterator = nullptr;
- // We use -1 as sentinel values since proto default value for integers is
- // zero, and the confuses the parser into thinking the value weren't present
- json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "callsStarted", calls_started_);
- json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "callsSucceeded", calls_succeeded_);
- json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "callsFailed", calls_failed_);
+ if (calls_started_ != 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "callsStarted", calls_started_);
+ }
+ if (calls_succeeded_ != 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "callsSucceeded", calls_succeeded_);
+ }
+ if (calls_failed_ != 0) {
+ json_iterator = grpc_json_add_number_string_child(
+ json, json_iterator, "callsFailed", calls_failed_);
+ }
gpr_timespec ts =
grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME);
json_iterator =
@@ -118,25 +124,29 @@ char* ChannelNode::RenderJSON() {
json = top_level_json;
json_iterator = nullptr;
PopulateChildRefs(json);
+ return top_level_json;
+}
- // render and return the over json object
- char* json_str = grpc_json_dump_to_string(top_level_json, 0);
- grpc_json_destroy(top_level_json);
+char* ChannelNode::RenderJsonString() {
+ grpc_json* json = RenderJson();
+ char* json_str = grpc_json_dump_to_string(json, 0);
+ grpc_json_destroy(json);
return json_str;
}
RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes) {
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel) {
return MakeRefCounted<grpc_core::channelz::ChannelNode>(
- channel, channel_tracer_max_nodes);
+ channel, channel_tracer_max_nodes, is_top_level_channel);
}
SubchannelNode::SubchannelNode() {
- subchannel_uuid_ = ChannelzRegistry::Register(this);
+ subchannel_uuid_ = ChannelzRegistry::RegisterSubchannelNode(this);
}
SubchannelNode::~SubchannelNode() {
- ChannelzRegistry::Unregister(subchannel_uuid_);
+ ChannelzRegistry::UnregisterSubchannelNode(subchannel_uuid_);
}
} // namespace channelz
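
The counter-rendering change above follows proto3 semantics, where an absent integer field is indistinguishable from 0: rather than emitting -1 sentinels, zero-valued counters are simply omitted. A minimal sketch of the pattern, assuming the grpc_json helpers used in this file (render_call_counts is a hypothetical free function, not part of this change):

    grpc_json* render_call_counts(gpr_atm started, gpr_atm succeeded,
                                  gpr_atm failed) {
      grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT);
      grpc_json* it = nullptr;
      // Omit zero-valued counters: a proto3 parser reads a missing field
      // as 0, so "absent" and "0" round-trip to the same value.
      if (started != 0) {
        it = grpc_json_add_number_string_child(json, it, "callsStarted",
                                               started);
      }
      if (succeeded != 0) {
        it = grpc_json_add_number_string_child(json, it, "callsSucceeded",
                                               succeeded);
      }
      if (failed != 0) {
        it = grpc_json_add_number_string_child(json, it, "callsFailed",
                                               failed);
      }
      return json;
    }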
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index 7184af93ec..07eb73d626 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -35,6 +35,10 @@
#define GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC \
"grpc.channelz_channel_node_creation_func"
+// Channel arg key to signal that the channel is an internal channel.
+#define GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL \
+ "grpc.channelz_channel_is_internal_channel"
+
namespace grpc_core {
namespace channelz {
@@ -45,7 +49,8 @@ class ChannelNodePeer;
class ChannelNode : public RefCounted<ChannelNode> {
public:
static RefCountedPtr<ChannelNode> MakeChannelNode(
- grpc_channel* channel, size_t channel_tracer_max_nodes);
+ grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
void RecordCallStarted();
void RecordCallFailed() {
@@ -55,7 +60,8 @@ class ChannelNode : public RefCounted<ChannelNode> {
gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1)));
}
- char* RenderJSON();
+ grpc_json* RenderJson();
+ char* RenderJsonString();
// helper for getting and populating connectivity state. It is virtual
// because it allows the client_channel specific code to live in ext/
@@ -74,11 +80,13 @@ class ChannelNode : public RefCounted<ChannelNode> {
bool ChannelIsDestroyed() { return channel_ == nullptr; }
intptr_t channel_uuid() { return channel_uuid_; }
+ bool is_top_level_channel() { return is_top_level_channel_; }
protected:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes);
+ ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
+ bool is_top_level_channel);
virtual ~ChannelNode();
private:
@@ -92,6 +100,7 @@ class ChannelNode : public RefCounted<ChannelNode> {
gpr_atm calls_failed_ = 0;
gpr_atm last_call_started_millis_ = 0;
intptr_t channel_uuid_;
+ bool is_top_level_channel_ = true;
ManualConstructor<ChannelTrace> trace_;
};
@@ -116,7 +125,7 @@ class SubchannelNode : public RefCounted<SubchannelNode> {
// Creation functions
typedef RefCountedPtr<ChannelNode> (*ChannelNodeCreationFunc)(grpc_channel*,
- size_t);
+ size_t, bool);
} // namespace channelz
} // namespace grpc_core
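
For reference, a caller would mark a channel as internal (and thus excluded from GetTopChannels) through an ordinary integer channel arg. A minimal sketch, assuming the standard grpc_arg/grpc_channel_args C types; the surrounding channel-creation setup is elided:

    grpc_arg arg;
    arg.type = GRPC_ARG_INTEGER;
    arg.key = const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL);
    arg.value.integer = 1;  // non-zero => not a top-level channel
    grpc_channel_args args = {1, &arg};
    // Pass &args at channel creation; the channelz ChannelNode is then
    // constructed with is_top_level_channel == false.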
diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc
index 023ede552a..38496b3d78 100644
--- a/src/core/lib/channel/channelz_registry.cc
+++ b/src/core/lib/channel/channelz_registry.cc
@@ -19,16 +19,19 @@
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/lib/channel/channel_trace.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
#include <cstring>
namespace grpc_core {
+namespace channelz {
namespace {
// singleton instance of the registry.
@@ -49,12 +52,93 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); }
ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); }
-void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
+intptr_t ChannelzRegistry::InternalRegisterEntry(const RegistryEntry& entry) {
+ mu_guard guard(&mu_);
+ entities_.push_back(entry);
+ intptr_t uuid = entities_.size();
+ return uuid;
+}
+
+void ChannelzRegistry::InternalUnregisterEntry(intptr_t uuid, EntityType type) {
GPR_ASSERT(uuid >= 1);
- gpr_mu_lock(&mu_);
+ mu_guard guard(&mu_);
GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size());
- entities_[uuid - 1] = nullptr;
- gpr_mu_unlock(&mu_);
+ GPR_ASSERT(entities_[uuid - 1].type == type);
+ entities_[uuid - 1].object = nullptr;
+ entities_[uuid - 1].type = EntityType::kUnset;
+}
+
+void* ChannelzRegistry::InternalGetEntry(intptr_t uuid, EntityType type) {
+ mu_guard guard(&mu_);
+ if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
+ return nullptr;
+ }
+ if (entities_[uuid - 1].type == type) {
+ return entities_[uuid - 1].object;
+ } else {
+ return nullptr;
+ }
+}
+
+char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ InlinedVector<ChannelNode*, 10> top_level_channels;
+  // uuids index into entities off by one (idx 0 is really uuid 1, since 0 is
+  // reserved). However, we want to support requests coming in with
+  // start_channel_id=0, which signifies "give me everything." Hence the
+  // funky-looking line below.
+ size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1;
+ for (size_t i = start_idx; i < entities_.size(); ++i) {
+ if (entities_[i].type == EntityType::kChannelNode) {
+ ChannelNode* channel_node =
+ static_cast<ChannelNode*>(entities_[i].object);
+ if (channel_node->is_top_level_channel()) {
+ top_level_channels.push_back(channel_node);
+ }
+ }
+ }
+ if (top_level_channels.size() > 0) {
+ // create list of channels
+ grpc_json* array_parent = grpc_json_create_child(
+ nullptr, json, "channel", nullptr, GRPC_JSON_ARRAY, false);
+ for (size_t i = 0; i < top_level_channels.size(); ++i) {
+ grpc_json* channel_json = top_level_channels[i]->RenderJson();
+ json_iterator =
+ grpc_json_link_child(array_parent, channel_json, json_iterator);
+ }
+ }
+ // For now we do not have any pagination rules. In the future we could
+ // pick a constant for max_channels_sent for a GetTopChannels request.
+ // Tracking: https://github.com/grpc/grpc/issues/16019.
+ json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
+ GRPC_JSON_TRUE, false);
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
}
+} // namespace channelz
} // namespace grpc_core
+
+char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
+ return grpc_core::channelz::ChannelzRegistry::GetTopChannels(
+ start_channel_id);
+}
+
+char* grpc_channelz_get_channel(intptr_t channel_id) {
+ grpc_core::channelz::ChannelNode* channel_node =
+ grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id);
+ if (channel_node == nullptr) {
+ return nullptr;
+ }
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* channel_json = channel_node->RenderJson();
+ channel_json->key = "channel";
+ grpc_json_link_child(json, channel_json, nullptr);
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
+}
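
Both exported entry points above return a heap-allocated JSON string that the caller owns. A minimal usage sketch (error handling elided; gpr_free releases the string):

    char* json = grpc_channelz_get_top_channels(0 /* start_channel_id: all */);
    if (json != nullptr) {
      gpr_log(GPR_INFO, "channelz: %s", json);
      gpr_free(json);
    }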
diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h
index a5a187a054..5d7c936726 100644
--- a/src/core/lib/channel/channelz_registry.h
+++ b/src/core/lib/channel/channelz_registry.h
@@ -22,11 +22,13 @@
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/lib/channel/channel_trace.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include <stdint.h>
namespace grpc_core {
+namespace channelz {
// singleton registry object to track all objects that are needed to support
// channelz bookkeeping. All objects share globally distributed uuids.
@@ -35,26 +37,56 @@ class ChannelzRegistry {
// To be called in grpc_init()
static void Init();
- // To be callen in grpc_shutdown();
+ // To be called in grpc_shutdown();
static void Shutdown();
- // globally registers a channelz Object. Returns its unique uuid
- template <typename Object>
- static intptr_t Register(Object* object) {
- return Default()->InternalRegister(object);
+ // Register/Unregister/Get for ChannelNode
+ static intptr_t RegisterChannelNode(ChannelNode* channel_node) {
+ RegistryEntry entry(channel_node, EntityType::kChannelNode);
+ return Default()->InternalRegisterEntry(entry);
+ }
+ static void UnregisterChannelNode(intptr_t uuid) {
+ Default()->InternalUnregisterEntry(uuid, EntityType::kChannelNode);
+ }
+ static ChannelNode* GetChannelNode(intptr_t uuid) {
+ void* gotten = Default()->InternalGetEntry(uuid, EntityType::kChannelNode);
+ return gotten == nullptr ? nullptr : static_cast<ChannelNode*>(gotten);
}
- // globally unregisters the object that is associated to uuid.
- static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); }
+ // Register/Unregister/Get for SubchannelNode
+  static intptr_t RegisterSubchannelNode(SubchannelNode* subchannel_node) {
+    RegistryEntry entry(subchannel_node, EntityType::kSubchannelNode);
+ return Default()->InternalRegisterEntry(entry);
+ }
+ static void UnregisterSubchannelNode(intptr_t uuid) {
+ Default()->InternalUnregisterEntry(uuid, EntityType::kSubchannelNode);
+ }
+ static SubchannelNode* GetSubchannelNode(intptr_t uuid) {
+ void* gotten =
+ Default()->InternalGetEntry(uuid, EntityType::kSubchannelNode);
+ return gotten == nullptr ? nullptr : static_cast<SubchannelNode*>(gotten);
+ }
- // if object with uuid has previously been registered, returns the
- // Object associated with that uuid. Else returns nullptr.
- template <typename Object>
- static Object* Get(intptr_t uuid) {
- return Default()->InternalGet<Object>(uuid);
+ // Returns the allocated JSON string that represents the proto
+ // GetTopChannelsResponse as per channelz.proto.
+ static char* GetTopChannels(intptr_t start_channel_id) {
+ return Default()->InternalGetTopChannels(start_channel_id);
}
private:
+ enum class EntityType {
+ kChannelNode,
+ kSubchannelNode,
+ kUnset,
+ };
+
+ struct RegistryEntry {
+ RegistryEntry(void* object_in, EntityType type_in)
+ : object(object_in), type(type_in) {}
+ void* object;
+ EntityType type;
+ };
+
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
@@ -64,40 +96,25 @@ class ChannelzRegistry {
// Returns the singleton instance of ChannelzRegistry.
static ChannelzRegistry* Default();
- // globally registers a channelz Object. Returns its unique uuid
- template <typename Object>
- intptr_t InternalRegister(Object* object) {
- gpr_mu_lock(&mu_);
- entities_.push_back(static_cast<void*>(object));
- intptr_t uuid = entities_.size();
- gpr_mu_unlock(&mu_);
- return uuid;
- }
+  // globally registers a RegistryEntry. Returns its unique uuid
+ intptr_t InternalRegisterEntry(const RegistryEntry& entry);
- // globally unregisters the object that is associated to uuid.
- void InternalUnregister(intptr_t uuid);
-
- // if object with uuid has previously been registered, returns the
- // Object associated with that uuid. Else returns nullptr.
- template <typename Object>
- Object* InternalGet(intptr_t uuid) {
- gpr_mu_lock(&mu_);
- if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
- gpr_mu_unlock(&mu_);
- return nullptr;
- }
- Object* ret = static_cast<Object*>(entities_[uuid - 1]);
- gpr_mu_unlock(&mu_);
- return ret;
- }
+  // globally unregisters the object that is associated with uuid. Also
+  // sanity-checks that an object doesn't try to unregister the wrong type.
+ void InternalUnregisterEntry(intptr_t uuid, EntityType type);
+
+ // if object with uuid has previously been registered as the correct type,
+ // returns the void* associated with that uuid. Else returns nullptr.
+ void* InternalGetEntry(intptr_t uuid, EntityType type);
- // private members
+ char* InternalGetTopChannels(intptr_t start_channel_id);
// protects entities_ and uuid_
gpr_mu mu_;
- InlinedVector<void*, 20> entities_;
+ InlinedVector<RegistryEntry, 20> entities_;
};
+} // namespace channelz
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_CHANNEL_CHANNELZ_REGISTRY_H */
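
The EntityType tag turns the registry into a checked map: looking an entry up as the wrong kind now fails instead of handing back a mistyped pointer. A sketch of the contract, assuming uuid was returned by RegisterChannelNode():

    using grpc_core::channelz::ChannelzRegistry;
    ChannelNode* c = ChannelzRegistry::GetChannelNode(uuid);       // the node
    SubchannelNode* s = ChannelzRegistry::GetSubchannelNode(uuid); // nullptr
    // The entry is tagged kChannelNode, so the kSubchannelNode lookup
    // returns nullptr rather than a reinterpreted pointer.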
diff --git a/src/core/lib/gpr/arena.cc b/src/core/lib/gpr/arena.cc
index 141de69426..e30b297aea 100644
--- a/src/core/lib/gpr/arena.cc
+++ b/src/core/lib/gpr/arena.cc
@@ -25,6 +25,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
#include "src/core/lib/gpr/alloc.h"
@@ -36,8 +37,6 @@
#ifdef SIMPLE_ARENA_FOR_DEBUGGING
-#include <grpc/support/sync.h>
-
struct gpr_arena {
gpr_mu mu;
void** ptrs;
@@ -78,14 +77,17 @@ void* gpr_arena_alloc(gpr_arena* arena, size_t size) {
// would allow us to use the alignment actually needed by the caller.
typedef struct zone {
- size_t size_begin;
- size_t size_end;
- gpr_atm next_atm;
+ size_t size_begin; // All the space we have set aside for allocations up
+ // until this zone.
+ size_t size_end; // size_end = size_begin plus all the space we set aside for
+ // allocations in zone z itself.
+ zone* next;
} zone;
struct gpr_arena {
gpr_atm size_so_far;
zone initial_zone;
+ gpr_mu arena_growth_mutex;
};
static void* zalloc_aligned(size_t size) {
@@ -99,15 +101,17 @@ gpr_arena* gpr_arena_create(size_t initial_size) {
gpr_arena* a = static_cast<gpr_arena*>(zalloc_aligned(
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + initial_size));
a->initial_zone.size_end = initial_size;
+ gpr_mu_init(&a->arena_growth_mutex);
return a;
}
size_t gpr_arena_destroy(gpr_arena* arena) {
+ gpr_mu_destroy(&arena->arena_growth_mutex);
gpr_atm size = gpr_atm_no_barrier_load(&arena->size_so_far);
- zone* z = (zone*)gpr_atm_no_barrier_load(&arena->initial_zone.next_atm);
+ zone* z = arena->initial_zone.next;
gpr_free_aligned(arena);
while (z) {
- zone* next_z = (zone*)gpr_atm_no_barrier_load(&z->next_atm);
+ zone* next_z = z->next;
gpr_free_aligned(z);
z = next_z;
}
@@ -116,37 +120,55 @@ size_t gpr_arena_destroy(gpr_arena* arena) {
void* gpr_arena_alloc(gpr_arena* arena, size_t size) {
size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(size);
- size_t start = static_cast<size_t>(
+ size_t previous_size_of_arena_allocations = static_cast<size_t>(
gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size));
+ size_t updated_size_of_arena_allocations =
+ previous_size_of_arena_allocations + size;
zone* z = &arena->initial_zone;
- while (start > z->size_end) {
- zone* next_z = (zone*)gpr_atm_acq_load(&z->next_atm);
- if (next_z == nullptr) {
- size_t next_z_size =
- static_cast<size_t>(gpr_atm_no_barrier_load(&arena->size_so_far));
- next_z = static_cast<zone*>(zalloc_aligned(
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) + next_z_size));
- next_z->size_begin = z->size_end;
- next_z->size_end = z->size_end + next_z_size;
- if (!gpr_atm_rel_cas(&z->next_atm, static_cast<gpr_atm>(NULL),
- (gpr_atm)next_z)) {
- gpr_free_aligned(next_z);
- next_z = (zone*)gpr_atm_acq_load(&z->next_atm);
+  // Check whether the allocation cannot be satisfied within the initial zone.
+  // This is true only in the uncommon case because of our arena sizing
+  // hysteresis (that is, most calls should have a large enough initial zone
+  // and will not need to grow the arena).
+ if (updated_size_of_arena_allocations > z->size_end) {
+ // Find a zone to fit this allocation
+ gpr_mu_lock(&arena->arena_growth_mutex);
+ while (updated_size_of_arena_allocations > z->size_end) {
+ if (z->next == nullptr) {
+ // Note that we do an extra increment of size_so_far to prevent multiple
+ // simultaneous callers from stepping on each other. However, this extra
+ // increment means some space in the arena is wasted.
+ // So whenever we need to allocate x bytes and there are x - n (where
+ // n > 0) remaining in the current zone, we will waste x bytes (x - n
+ // in the current zone and n in the new zone).
+ previous_size_of_arena_allocations = static_cast<size_t>(
+ gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size));
+ updated_size_of_arena_allocations =
+ previous_size_of_arena_allocations + size;
+ size_t next_z_size = updated_size_of_arena_allocations;
+ z->next = static_cast<zone*>(zalloc_aligned(
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) + next_z_size));
+ z->next->size_begin = z->size_end;
+ z->next->size_end = z->size_end + next_z_size;
}
+ z = z->next;
}
- z = next_z;
- }
- if (start + size > z->size_end) {
- return gpr_arena_alloc(arena, size);
+ gpr_mu_unlock(&arena->arena_growth_mutex);
}
- GPR_ASSERT(start >= z->size_begin);
- GPR_ASSERT(start + size <= z->size_end);
- char* ptr = (z == &arena->initial_zone)
- ? reinterpret_cast<char*>(arena) +
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena))
- : reinterpret_cast<char*>(z) +
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone));
- return ptr + start - z->size_begin;
+ GPR_ASSERT(previous_size_of_arena_allocations >= z->size_begin);
+ GPR_ASSERT(updated_size_of_arena_allocations <= z->size_end);
+ // Skip the first part of the zone, which just contains tracking information.
+ // For the initial zone, this is the gpr_arena struct and for any other zone,
+ // it's the zone struct.
+ char* start_of_allocation_space =
+ (z == &arena->initial_zone)
+ ? reinterpret_cast<char*>(arena) +
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena))
+ : reinterpret_cast<char*>(z) +
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone));
+ // previous_size_of_arena_allocations - size_begin is how many bytes have been
+ // allocated into the current zone
+ return start_of_allocation_space + previous_size_of_arena_allocations -
+ z->size_begin;
}
#endif // SIMPLE_ARENA_FOR_DEBUGGING
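
To summarize the lifecycle the rewritten allocator implements, a minimal usage sketch against the gpr_arena API touched above (sizes are arbitrary illustrations):

    gpr_arena* arena = gpr_arena_create(1024);  // initial zone: 1 KiB
    void* a = gpr_arena_alloc(arena, 512);      // fast path: fits the
                                                // initial zone, lock-free
    void* b = gpr_arena_alloc(arena, 2048);     // slow path: takes
                                                // arena_growth_mutex and
                                                // chains a new zone
    size_t total = gpr_arena_destroy(arena);    // frees every zone at once,
                                                // returns bytes allocated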
diff --git a/src/core/lib/gprpp/orphanable.h b/src/core/lib/gprpp/orphanable.h
index d0ec9b6461..3123e3f5a3 100644
--- a/src/core/lib/gprpp/orphanable.h
+++ b/src/core/lib/gprpp/orphanable.h
@@ -86,7 +86,8 @@ class InternallyRefCounted : public Orphanable {
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
// Allow RefCountedPtr<> to access Unref() and IncrementRefCount().
- friend class RefCountedPtr<Child>;
+ template <typename T>
+ friend class RefCountedPtr;
InternallyRefCounted() { gpr_ref_init(&refs_, 1); }
virtual ~InternallyRefCounted() {}
@@ -129,7 +130,8 @@ class InternallyRefCountedWithTracing : public Orphanable {
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
// Allow RefCountedPtr<> to access Unref() and IncrementRefCount().
- friend class RefCountedPtr<Child>;
+ template <typename T>
+ friend class RefCountedPtr;
InternallyRefCountedWithTracing()
: InternallyRefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {}
diff --git a/src/core/lib/gprpp/ref_counted.h b/src/core/lib/gprpp/ref_counted.h
index ddac5bd475..03c293f6ed 100644
--- a/src/core/lib/gprpp/ref_counted.h
+++ b/src/core/lib/gprpp/ref_counted.h
@@ -73,7 +73,8 @@ class RefCounted {
private:
// Allow RefCountedPtr<> to access IncrementRefCount().
- friend class RefCountedPtr<Child>;
+ template <typename T>
+ friend class RefCountedPtr;
void IncrementRefCount() { gpr_ref(&refs_); }
@@ -152,7 +153,8 @@ class RefCountedWithTracing {
private:
// Allow RefCountedPtr<> to access IncrementRefCount().
- friend class RefCountedPtr<Child>;
+ template <typename T>
+ friend class RefCountedPtr;
void IncrementRefCount() { gpr_ref(&refs_); }
diff --git a/src/core/lib/gprpp/ref_counted_ptr.h b/src/core/lib/gprpp/ref_counted_ptr.h
index 534d3d03cb..c2dfbdd90f 100644
--- a/src/core/lib/gprpp/ref_counted_ptr.h
+++ b/src/core/lib/gprpp/ref_counted_ptr.h
@@ -36,25 +36,49 @@ class RefCountedPtr {
RefCountedPtr(std::nullptr_t) {}
// If value is non-null, we take ownership of a ref to it.
- explicit RefCountedPtr(T* value) { value_ = value; }
+ template <typename Y>
+ explicit RefCountedPtr(Y* value) {
+ value_ = value;
+ }
- // Move support.
+ // Move ctors.
RefCountedPtr(RefCountedPtr&& other) {
value_ = other.value_;
other.value_ = nullptr;
}
+ template <typename Y>
+ RefCountedPtr(RefCountedPtr<Y>&& other) {
+ value_ = other.value_;
+ other.value_ = nullptr;
+ }
+
+ // Move assignment.
RefCountedPtr& operator=(RefCountedPtr&& other) {
if (value_ != nullptr) value_->Unref();
value_ = other.value_;
other.value_ = nullptr;
return *this;
}
+ template <typename Y>
+ RefCountedPtr& operator=(RefCountedPtr<Y>&& other) {
+ if (value_ != nullptr) value_->Unref();
+ value_ = other.value_;
+ other.value_ = nullptr;
+ return *this;
+ }
- // Copy support.
+ // Copy ctors.
RefCountedPtr(const RefCountedPtr& other) {
if (other.value_ != nullptr) other.value_->IncrementRefCount();
value_ = other.value_;
}
+ template <typename Y>
+ RefCountedPtr(const RefCountedPtr<Y>& other) {
+ if (other.value_ != nullptr) other.value_->IncrementRefCount();
+ value_ = other.value_;
+ }
+
+ // Copy assignment.
RefCountedPtr& operator=(const RefCountedPtr& other) {
// Note: Order of reffing and unreffing is important here in case value_
// and other.value_ are the same object.
@@ -63,17 +87,32 @@ class RefCountedPtr {
value_ = other.value_;
return *this;
}
+ template <typename Y>
+ RefCountedPtr& operator=(const RefCountedPtr<Y>& other) {
+ // Note: Order of reffing and unreffing is important here in case value_
+ // and other.value_ are the same object.
+ if (other.value_ != nullptr) other.value_->IncrementRefCount();
+ if (value_ != nullptr) value_->Unref();
+ value_ = other.value_;
+ return *this;
+ }
~RefCountedPtr() {
if (value_ != nullptr) value_->Unref();
}
// If value is non-null, we take ownership of a ref to it.
- void reset(T* value = nullptr) {
+ template <typename Y>
+ void reset(Y* value) {
if (value_ != nullptr) value_->Unref();
value_ = value;
}
+ void reset() {
+ if (value_ != nullptr) value_->Unref();
+ value_ = nullptr;
+ }
+
// TODO(roth): This method exists solely as a transition mechanism to allow
// us to pass a ref to idiomatic C code that does not use RefCountedPtr<>.
// Once all of our code has been converted to idiomatic C++, this
@@ -89,16 +128,34 @@ class RefCountedPtr {
T& operator*() const { return *value_; }
T* operator->() const { return value_; }
- bool operator==(const RefCountedPtr& other) const {
+ template <typename Y>
+ bool operator==(const RefCountedPtr<Y>& other) const {
return value_ == other.value_;
}
- bool operator==(const T* other) const { return value_ == other; }
- bool operator!=(const RefCountedPtr& other) const {
+
+ template <typename Y>
+ bool operator==(const Y* other) const {
+ return value_ == other;
+ }
+
+ bool operator==(std::nullptr_t) const { return value_ == nullptr; }
+
+ template <typename Y>
+ bool operator!=(const RefCountedPtr<Y>& other) const {
return value_ != other.value_;
}
- bool operator!=(const T* other) const { return value_ != other; }
+
+ template <typename Y>
+ bool operator!=(const Y* other) const {
+ return value_ != other;
+ }
+
+ bool operator!=(std::nullptr_t) const { return value_ != nullptr; }
private:
+ template <typename Y>
+ friend class RefCountedPtr;
+
T* value_ = nullptr;
};
@@ -107,11 +164,6 @@ inline RefCountedPtr<T> MakeRefCounted(Args&&... args) {
return RefCountedPtr<T>(New<T>(std::forward<Args>(args)...));
}
-template <typename Parent, typename Child, typename... Args>
-inline RefCountedPtr<Parent> MakePolymorphicRefCounted(Args&&... args) {
- return RefCountedPtr<Parent>(New<Child>(std::forward<Args>(args)...));
-}
-
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_REF_COUNTED_PTR_H */
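
The templated constructors and assignment operators, together with the template friend declarations added in ref_counted.h and orphanable.h, exist to allow conversions between compatible instantiations, e.g. RefCountedPtr<Derived> to RefCountedPtr<Base>. That is also why MakePolymorphicRefCounted could be deleted: the templated explicit constructor now covers it. A minimal sketch under those assumptions (Base/Derived are illustrative names):

    class Base : public grpc_core::RefCounted<Base> {
     public:
      virtual ~Base() {}
    };
    class Derived : public Base {};

    grpc_core::RefCountedPtr<Derived> d =
        grpc_core::MakeRefCounted<Derived>();
    // Uses the new RefCountedPtr(RefCountedPtr<Y>&&) converting move ctor:
    grpc_core::RefCountedPtr<Base> b = std::move(d);
    // The old MakePolymorphicRefCounted<Base, Derived>(...) is now spelled:
    grpc_core::RefCountedPtr<Base> b2(grpc_core::New<Derived>());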
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 86a0243d2e..66e0f1fd6d 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -140,10 +140,6 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
- /* The pollset that last noticed that the fd is readable. The actual type
- * stored in this is (grpc_pollset *) */
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
};
@@ -293,7 +289,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
new_fd->error_closure->InitEvent();
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -376,11 +371,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
gpr_mu_unlock(&fd_freelist_mu);
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
@@ -397,11 +387,7 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
@@ -642,7 +628,7 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) {
}
if (read_ev || cancel || err_fallback) {
- fd_become_readable(fd, pollset);
+ fd_become_readable(fd);
}
if (write_ev || cancel || err_fallback) {
@@ -1217,8 +1203,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 7b368410cf..96eae30345 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -135,7 +135,7 @@ struct pollable {
// underlying epoll set (i.e whenever fd_orphan() is called).
//
// Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a
- // lot of complexity since an fd can be present in multiple pollalbles. So our
+ // lot of complexity since an fd can be present in multiple pollables. So our
// implementation ONLY DOES (1) and NOT (2).
//
// The cache_fd.salt variable helps here to maintain correctness (it serves as
@@ -220,10 +220,6 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
- // The pollset that last noticed that the fd is readable. The actual type
- // stored in this is (grpc_pollset *)
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
// Do we need to track EPOLLERR events separately?
@@ -353,7 +349,6 @@ static void invalidate_fd(grpc_fd* fd) {
memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu));
fd->pollable_obj = nullptr;
fd->on_done_closure = nullptr;
- gpr_atm_no_barrier_store(&fd->read_notifier_pollset, 0);
memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object));
fd->track_err = false;
}
@@ -445,7 +440,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->error_closure->InitEvent();
new_fd->freelist_next = nullptr;
new_fd->on_done_closure = nullptr;
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
char* fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@@ -514,11 +508,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
UNREF_BY(fd, 2, reason); /* Drop the reference */
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
@@ -875,17 +864,7 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) {
return static_cast<int>(delta);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
@@ -983,7 +962,7 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
fd_has_errors(fd);
}
if (read_ev || cancel || err_fallback) {
- fd_become_readable(fd, pollset);
+ fd_become_readable(fd);
}
if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
@@ -1636,8 +1615,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 2189801c18..5695ac795d 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -137,10 +137,6 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
- /* The pollset that last noticed that the fd is readable. The actual type
- * stored in this is (grpc_pollset *) */
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
/* Do we need to track EPOLLERR events separately? */
@@ -845,7 +841,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->write_closure->InitEvent();
new_fd->error_closure->InitEvent();
new_fd->track_err = track_err;
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
new_fd->on_done_closure = nullptr;
@@ -927,11 +922,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
GRPC_ERROR_UNREF(error);
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
@@ -958,6 +948,12 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
+
+static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1115,22 +1111,6 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) {
return static_cast<int>(delta);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
-
-static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
-
-static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
-
static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
@@ -1283,7 +1263,7 @@ static void pollset_work_and_unlock(grpc_pollset* pollset,
fd_has_errors(fd);
}
if (read_ev || cancel || err_fallback) {
- fd_become_readable(fd, pollset);
+ fd_become_readable(fd);
}
if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
@@ -1667,8 +1647,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index c9c09881a2..fb4c71ef71 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -108,9 +108,6 @@ struct grpc_fd {
grpc_closure* on_done_closure;
grpc_iomgr_object iomgr_object;
-
- /* The pollset that last noticed and notified that the fd is readable */
- grpc_pollset* read_notifier_pollset;
};
/* Begin polling on an fd.
@@ -131,8 +128,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
MUST NOT be called with a pollset lock taken
if got_read or got_write are 1, also does the become_{readable,writable} as
appropriate. */
-static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write,
- grpc_pollset* read_notifier_pollset);
+static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd* fd);
@@ -346,7 +342,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
r->closed = 0;
r->released = 0;
gpr_atm_no_barrier_store(&r->pollhup, 0);
- r->read_notifier_pollset = nullptr;
char* name2;
gpr_asprintf(&name2, "%s fd=%d", name, fd);
@@ -359,17 +354,6 @@ static bool fd_is_orphaned(grpc_fd* fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-/* Return the read-notifier pollset */
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- grpc_pollset* notifier = nullptr;
-
- gpr_mu_lock(&fd->mu);
- notifier = fd->read_notifier_pollset;
- gpr_mu_unlock(&fd->mu);
-
- return notifier;
-}
-
static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) {
gpr_mu_lock(&watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
@@ -512,11 +496,6 @@ static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
}
}
-static void set_read_notifier_pollset_locked(
- grpc_fd* fd, grpc_pollset* read_notifier_pollset) {
- fd->read_notifier_pollset = read_notifier_pollset;
-}
-
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
gpr_mu_lock(&fd->mu);
/* only shutdown once */
@@ -553,8 +532,28 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
- gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
- abort();
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ }
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
+}
+
+static void fd_set_readable(grpc_fd* fd) {
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(fd, &fd->read_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_set_writable(grpc_fd* fd) {
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_set_error(grpc_fd* fd) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ }
}
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
@@ -608,8 +607,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
return mask;
}
-static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write,
- grpc_pollset* read_notifier_pollset) {
+static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
int was_polling = 0;
int kick = 0;
grpc_fd* fd = watcher->fd;
@@ -645,9 +643,6 @@ static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write,
if (set_ready_locked(fd, &fd->read_closure)) {
kick = 1;
}
- if (read_notifier_pollset != nullptr) {
- set_read_notifier_pollset_locked(fd, read_notifier_pollset);
- }
}
if (got_write) {
if (set_ready_locked(fd, &fd->write_closure)) {
@@ -997,16 +992,16 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
for (i = 1; i < pfd_count; i++) {
if (watchers[i].fd == nullptr) {
- fd_end_poll(&watchers[i], 0, 0, nullptr);
+ fd_end_poll(&watchers[i], 0, 0);
} else {
// Wake up all the file descriptors; if we have an invalid one,
// we can identify it on the next pollset_work()
- fd_end_poll(&watchers[i], 1, 1, pollset);
+ fd_end_poll(&watchers[i], 1, 1);
}
}
} else if (r == 0) {
for (i = 1; i < pfd_count; i++) {
- fd_end_poll(&watchers[i], 0, 0, nullptr);
+ fd_end_poll(&watchers[i], 0, 0);
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
@@ -1018,7 +1013,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
}
for (i = 1; i < pfd_count; i++) {
if (watchers[i].fd == nullptr) {
- fd_end_poll(&watchers[i], 0, 0, nullptr);
+ fd_end_poll(&watchers[i], 0, 0);
} else {
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
@@ -1032,7 +1027,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
}
fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
- pfds[i].revents & POLLOUT_CHECK, pollset);
+ pfds[i].revents & POLLOUT_CHECK);
}
}
}
@@ -1723,8 +1718,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_set_readable,
+ fd_set_writable,
+ fd_set_error,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 1139b3273a..0e45fc42ca 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -239,6 +239,12 @@ void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_error(fd, closure);
}
+void grpc_fd_set_readable(grpc_fd* fd) { g_event_engine->fd_set_readable(fd); }
+
+void grpc_fd_set_writable(grpc_fd* fd) { g_event_engine->fd_set_writable(fd); }
+
+void grpc_fd_set_error(grpc_fd* fd) { g_event_engine->fd_set_error(fd); }
+
static size_t pollset_size(void) { return g_event_engine->pollset_size; }
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index b4c17fc80d..8d0bcc0710 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -51,8 +51,10 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure);
+ void (*fd_set_readable)(grpc_fd* fd);
+ void (*fd_set_writable)(grpc_fd* fd);
+ void (*fd_set_error)(grpc_fd* fd);
bool (*fd_is_shutdown)(grpc_fd* fd);
- grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd);
void (*pollset_init)(grpc_pollset* pollset, gpr_mu** mu);
void (*pollset_shutdown)(grpc_pollset* pollset, grpc_closure* closure);
@@ -142,8 +144,20 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure);
* needs to have been set on grpc_fd_create */
void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure);
-/* Return the read notifier pollset from the fd */
-grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd);
+/* Forcibly set the fd to be readable, resulting in the closure registered with
+ * grpc_fd_notify_on_read being invoked.
+ */
+void grpc_fd_set_readable(grpc_fd* fd);
+
+/* Forcibly set the fd to be writable, resulting in the closure registered with
+ * grpc_fd_notify_on_write being invoked.
+ */
+void grpc_fd_set_writable(grpc_fd* fd);
+
+/* Forcibly set the fd to have errored, resulting in the closure registered with
+ * grpc_fd_notify_on_error being invoked.
+ */
+void grpc_fd_set_error(grpc_fd* fd);
/* pollset_posix functions */
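
With the read-notifier pollset gone, polling engines instead expose the force-ready hooks declared above. A sketch of how a caller drives them, assuming fd came from grpc_fd_create() and on_read is a grpc_closure set up elsewhere:

    // Arm the read closure, then force it to fire without a poller:
    grpc_fd_notify_on_read(fd, &on_read);
    grpc_fd_set_readable(fd);  // schedules on_read just as a real POLLIN would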
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index 1ad13b831d..45d96b80eb 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -40,19 +40,25 @@
gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \
}
+#define EXECUTOR_TRACE0(str) \
+ if (executor_trace.enabled()) { \
+ gpr_log(GPR_INFO, "EXECUTOR " str); \
+ }
+
grpc_core::TraceFlag executor_trace(false, "executor");
GPR_TLS_DECL(g_this_thread_state);
-GrpcExecutor::GrpcExecutor(const char* executor_name) : name_(executor_name) {
+GrpcExecutor::GrpcExecutor(const char* name) : name_(name) {
adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
- gpr_atm_no_barrier_store(&num_threads_, 0);
+ gpr_atm_rel_store(&num_threads_, 0);
max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
}
void GrpcExecutor::Init() { SetThreading(true); }
-size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
+size_t GrpcExecutor::RunClosures(const char* executor_name,
+ grpc_closure_list list) {
size_t n = 0;
grpc_closure* c = list.head;
@@ -60,11 +66,11 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
grpc_closure* next = c->next_data.next;
grpc_error* error = c->error_data.error;
#ifndef NDEBUG
- EXECUTOR_TRACE("run %p [created by %s:%d]", c, c->file_created,
- c->line_created);
+ EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c,
+ c->file_created, c->line_created);
c->scheduled = false;
#else
- EXECUTOR_TRACE("run %p", c);
+ EXECUTOR_TRACE("(%s) run %p", executor_name, c);
#endif
c->cb(c->cb_arg, error);
GRPC_ERROR_UNREF(error);
@@ -77,17 +83,21 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
}
bool GrpcExecutor::IsThreaded() const {
- return gpr_atm_no_barrier_load(&num_threads_) > 0;
+ return gpr_atm_acq_load(&num_threads_) > 0;
}
void GrpcExecutor::SetThreading(bool threading) {
- gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
+ gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
+ EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading);
if (threading) {
- if (curr_num_threads > 0) return;
+ if (curr_num_threads > 0) {
+ EXECUTOR_TRACE("(%s) SetThreading(true). curr_num_threads > 0", name_);
+ return;
+ }
GPR_ASSERT(num_threads_ == 0);
- gpr_atm_no_barrier_store(&num_threads_, 1);
+ gpr_atm_rel_store(&num_threads_, 1);
gpr_tls_init(&g_this_thread_state);
thd_state_ = static_cast<ThreadState*>(
gpr_zalloc(sizeof(ThreadState) * max_threads_));
@@ -96,6 +106,7 @@ void GrpcExecutor::SetThreading(bool threading) {
gpr_mu_init(&thd_state_[i].mu);
gpr_cv_init(&thd_state_[i].cv);
thd_state_[i].id = i;
+ thd_state_[i].name = name_;
thd_state_[i].thd = grpc_core::Thread();
thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
}
@@ -104,7 +115,10 @@ void GrpcExecutor::SetThreading(bool threading) {
grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
thd_state_[0].thd.Start();
} else { // !threading
- if (curr_num_threads == 0) return;
+ if (curr_num_threads == 0) {
+ EXECUTOR_TRACE("(%s) SetThreading(false). curr_num_threads == 0", name_);
+ return;
+ }
for (size_t i = 0; i < max_threads_; i++) {
gpr_mu_lock(&thd_state_[i].mu);
@@ -121,20 +135,22 @@ void GrpcExecutor::SetThreading(bool threading) {
curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
for (gpr_atm i = 0; i < curr_num_threads; i++) {
thd_state_[i].thd.Join();
- EXECUTOR_TRACE(" Thread %" PRIdPTR " of %" PRIdPTR " joined", i,
- curr_num_threads);
+ EXECUTOR_TRACE("(%s) Thread %" PRIdPTR " of %" PRIdPTR " joined", name_,
+ i + 1, curr_num_threads);
}
- gpr_atm_no_barrier_store(&num_threads_, 0);
+ gpr_atm_rel_store(&num_threads_, 0);
for (size_t i = 0; i < max_threads_; i++) {
gpr_mu_destroy(&thd_state_[i].mu);
gpr_cv_destroy(&thd_state_[i].cv);
- RunClosures(thd_state_[i].elems);
+ RunClosures(thd_state_[i].name, thd_state_[i].elems);
}
gpr_free(thd_state_);
gpr_tls_destroy(&g_this_thread_state);
}
+
+ EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
}
void GrpcExecutor::Shutdown() { SetThreading(false); }
@@ -147,8 +163,8 @@ void GrpcExecutor::ThreadMain(void* arg) {
size_t subtract_depth = 0;
for (;;) {
- EXECUTOR_TRACE("[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", ts->id,
- subtract_depth);
+ EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
+ ts->name, ts->id, subtract_depth);
gpr_mu_lock(&ts->mu);
ts->depth -= subtract_depth;
@@ -159,7 +175,7 @@ void GrpcExecutor::ThreadMain(void* arg) {
}
if (ts->shutdown) {
- EXECUTOR_TRACE("[%" PRIdPTR "]: shutdown", ts->id);
+ EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: shutdown", ts->name, ts->id);
gpr_mu_unlock(&ts->mu);
break;
}
@@ -169,10 +185,10 @@ void GrpcExecutor::ThreadMain(void* arg) {
ts->elems = GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
- EXECUTOR_TRACE("[%" PRIdPTR "]: execute", ts->id);
+ EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: execute", ts->name, ts->id);
grpc_core::ExecCtx::Get()->InvalidateNow();
- subtract_depth = RunClosures(closures);
+ subtract_depth = RunClosures(ts->name, closures);
}
}
@@ -188,16 +204,16 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
do {
retry_push = false;
size_t cur_thread_count =
- static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));
+ static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
// If the number of threads is zero (i.e. either the executor is not
// threaded or already shut down), then queue the closure on the exec
// context itself
if (cur_thread_count == 0) {
#ifndef NDEBUG
- EXECUTOR_TRACE("schedule %p (created %s:%d) inline", closure,
+ EXECUTOR_TRACE("(%s) schedule %p (created %s:%d) inline", name_, closure,
closure->file_created, closure->line_created);
#else
- EXECUTOR_TRACE("schedule %p inline", closure);
+ EXECUTOR_TRACE("(%s) schedule %p inline", name_, closure);
#endif
grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
closure, error);
@@ -213,18 +229,18 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
}
ThreadState* orig_ts = ts;
-
bool try_new_thread = false;
+
for (;;) {
#ifndef NDEBUG
EXECUTOR_TRACE(
- "try to schedule %p (%s) (created %s:%d) to thread "
+ "(%s) try to schedule %p (%s) (created %s:%d) to thread "
"%" PRIdPTR,
- closure, is_short ? "short" : "long", closure->file_created,
+ name_, closure, is_short ? "short" : "long", closure->file_created,
closure->line_created, ts->id);
#else
- EXECUTOR_TRACE("try to schedule %p (%s) to thread %" PRIdPTR, closure,
- is_short ? "short" : "long", ts->id);
+ EXECUTOR_TRACE("(%s) try to schedule %p (%s) to thread %" PRIdPTR, name_,
+ closure, is_short ? "short" : "long", ts->id);
#endif
gpr_mu_lock(&ts->mu);
@@ -236,18 +252,22 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
size_t idx = ts->id;
ts = &thd_state_[(idx + 1) % cur_thread_count];
if (ts == orig_ts) {
- // We cycled through all the threads. Retry enqueue again (by creating
- // a new thread)
+ // We cycled through all the threads. Retry enqueue again by creating
+ // a new thread
+ //
+ // TODO (sreek): There is a potential issue here. We are
+ // unconditionally setting try_new_thread to true. What if the
+ // executor is shut down, or cur_thread_count is already equal to
+ // max_threads?
+ // (Fortunately, this is not an issue yet (as of July 2018) because
+ // there is only one instance of a long job in gRPC, so we will
+ // not hit this code path.)
retry_push = true;
- // TODO (sreek): What if the executor is shutdown OR if
- // cur_thread_count is already equal to max_threads ? (currently - as
- // of July 2018, we do not run in to this issue because there is only
- // one instance of long job in gRPC. This has to be fixed soon)
try_new_thread = true;
break;
}
- continue;
+ continue; // Try the next thread-state
}
// == Found the thread state (i.e thread) to enqueue this closure! ==
@@ -277,13 +297,11 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
}
if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
- cur_thread_count =
- static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));
+ cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
if (cur_thread_count < max_threads_) {
- // Increment num_threads (Safe to do a no_barrier_store instead of a
- // cas because we always increment num_threads under the
- // 'adding_thread_lock')
- gpr_atm_no_barrier_store(&num_threads_, cur_thread_count + 1);
+ // Increment num_threads (safe to do a store instead of a cas because we
+ // always increment num_threads under the 'adding_thread_lock')
+ gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);
thd_state_[cur_thread_count].thd = grpc_core::Thread(
name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]);
@@ -298,60 +316,118 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
} while (retry_push);
}
-static GrpcExecutor* global_executor;
+static GrpcExecutor* executors[GRPC_NUM_EXECUTORS];
-void enqueue_long(grpc_closure* closure, grpc_error* error) {
- global_executor->Enqueue(closure, error, false /* is_short */);
+void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
+ executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
+ true /* is_short */);
}
-void enqueue_short(grpc_closure* closure, grpc_error* error) {
- global_executor->Enqueue(closure, error, true /* is_short */);
+void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
+ executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
+ false /* is_short */);
}
-// Short-Job executor scheduler
-static const grpc_closure_scheduler_vtable global_executor_vtable_short = {
- enqueue_short, enqueue_short, "executor-short"};
-static grpc_closure_scheduler global_scheduler_short = {
- &global_executor_vtable_short};
+void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
+ executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
+ true /* is_short */);
+}
-// Long-job executor scheduler
-static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
- enqueue_long, enqueue_long, "executor-long"};
-static grpc_closure_scheduler global_scheduler_long = {
- &global_executor_vtable_long};
+void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
+ executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
+ false /* is_short */);
+}
+
+static const grpc_closure_scheduler_vtable
+ vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
+ {{&default_enqueue_short, &default_enqueue_short, "def-ex-short"},
+ {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
+ {{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"},
+ {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}};
+
+static grpc_closure_scheduler
+ schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
+ {{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]},
+ {&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}},
+ {{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]},
+ {&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}};
// grpc_executor_init() and grpc_executor_shutdown() functions are called in the
// grpc_init() and grpc_shutdown() code paths, which are protected by a
// global mutex. So it is okay to assume that these functions are thread-safe
void grpc_executor_init() {
- if (global_executor != nullptr) {
- // grpc_executor_init() already called once (and grpc_executor_shutdown()
- // wasn't called)
+ EXECUTOR_TRACE0("grpc_executor_init() enter");
+
+ // Return early if grpc_executor_init() has already been called
+ if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) {
+ GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr);
return;
}
- global_executor = grpc_core::New<GrpcExecutor>("global-executor");
- global_executor->Init();
+ executors[GRPC_DEFAULT_EXECUTOR] =
+ grpc_core::New<GrpcExecutor>("default-executor");
+ executors[GRPC_RESOLVER_EXECUTOR] =
+ grpc_core::New<GrpcExecutor>("resolver-executor");
+
+ executors[GRPC_DEFAULT_EXECUTOR]->Init();
+ executors[GRPC_RESOLVER_EXECUTOR]->Init();
+
+ EXECUTOR_TRACE0("grpc_executor_init() done");
+}
+
+grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
+ GrpcExecutorJobType job_type) {
+ return &schedulers_[executor_type][job_type];
+}
+
+grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
+ return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type);
}
void grpc_executor_shutdown() {
- // Shutdown already called
- if (global_executor == nullptr) {
+ EXECUTOR_TRACE0("grpc_executor_shutdown() enter");
+
+ // Return early if grpc_executor_shutdown() has already been called
+ if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) {
+ GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr);
return;
}
- global_executor->Shutdown();
- grpc_core::Delete<GrpcExecutor>(global_executor);
- global_executor = nullptr;
+ executors[GRPC_DEFAULT_EXECUTOR]->Shutdown();
+ executors[GRPC_RESOLVER_EXECUTOR]->Shutdown();
+
+ // Delete the executor objects.
+ //
+ // NOTE: It is important to call Shutdown() on all executors before
+ // calling Delete() because it is possible for one executor (that is not
+ // shutdown yet) to call Enqueue() on a different executor which is already
+ // shutdown. This is legal and in such cases, the Enqueue() operation
+ // effectively "fails" and enqueues that closure on the calling thread's
+ // exec_ctx.
+ //
+ // By ensuring that all executors are shutdown first, we are also ensuring
+ // that no thread is active across all executors.
+
+ grpc_core::Delete<GrpcExecutor>(executors[GRPC_DEFAULT_EXECUTOR]);
+ grpc_core::Delete<GrpcExecutor>(executors[GRPC_RESOLVER_EXECUTOR]);
+ executors[GRPC_DEFAULT_EXECUTOR] = nullptr;
+ executors[GRPC_RESOLVER_EXECUTOR] = nullptr;
+
+ EXECUTOR_TRACE0("grpc_executor_shutdown() done");
}
-bool grpc_executor_is_threaded() { return global_executor->IsThreaded(); }
+bool grpc_executor_is_threaded(GrpcExecutorType executor_type) {
+ GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS);
+ return executors[executor_type]->IsThreaded();
+}
-void grpc_executor_set_threading(bool enable) {
- global_executor->SetThreading(enable);
+bool grpc_executor_is_threaded() {
+ return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR);
}
-grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
- return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
- : &global_scheduler_long;
+void grpc_executor_set_threading(bool enable) {
+ EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable);
+ for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
+ executors[i]->SetThreading(enable);
+ }
}
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 395fc52863..8829138c5f 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -27,7 +27,8 @@
typedef struct {
gpr_mu mu;
- size_t id; // For debugging purposes
+ size_t id; // For debugging purposes
+ const char* name; // Thread state name
gpr_cv cv;
grpc_closure_list elems;
size_t depth; // Number of closures in the closure list
@@ -36,7 +37,11 @@ typedef struct {
grpc_core::Thread thd;
} ThreadState;
-typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } GrpcExecutorJobType;
+typedef enum {
+ GRPC_EXECUTOR_SHORT = 0,
+ GRPC_EXECUTOR_LONG,
+ GRPC_NUM_EXECUTOR_JOB_TYPES // Add new values above this
+} GrpcExecutorJobType;
class GrpcExecutor {
public:
@@ -58,7 +63,7 @@ class GrpcExecutor {
void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short);
private:
- static size_t RunClosures(grpc_closure_list list);
+ static size_t RunClosures(const char* executor_name, grpc_closure_list list);
static void ThreadMain(void* arg);
const char* name_;
@@ -70,14 +75,42 @@ class GrpcExecutor {
// == Global executor functions ==
+typedef enum {
+ GRPC_DEFAULT_EXECUTOR = 0,
+ GRPC_RESOLVER_EXECUTOR,
+
+ GRPC_NUM_EXECUTORS // Add new values above this
+} GrpcExecutorType;
+
+// TODO(sreek): Currently we have two executors (available globally): the
+// default executor and the resolver executor.
+//
+// Some of the functions below operate on the DEFAULT executor only, while
+// others operate on ALL the executors. This is a bit confusing and should be
+// cleaned up in the future (by making all of the following functions take an
+// executor_type and/or a job_type).
+
+// Initialize ALL the executors
void grpc_executor_init();
+// Shutdown ALL the executors
+void grpc_executor_shutdown();
+
+// Set the threading mode for ALL the executors
+void grpc_executor_set_threading(bool enable);
+
+// Get the DEFAULT executor scheduler for the given job_type
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type);
-void grpc_executor_shutdown();
+// Get the executor scheduler for a given executor_type and a job_type
+grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
+ GrpcExecutorJobType job_type);
-bool grpc_executor_is_threaded();
+// Returns true if the given executor is running in threaded mode (i.e., if
+// grpc_executor_set_threading(true) was previously called on that executor)
+bool grpc_executor_is_threaded(GrpcExecutorType executor_type);
-void grpc_executor_set_threading(bool enable);
+// Returns true if the DEFAULT executor is threaded
+bool grpc_executor_is_threaded();
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
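A minimal usage sketch of the two-argument overload declared above (the callback and its scheduling site are hypothetical; compare the resolve_address changes below, which do the same thing):

    static void on_resolved(void* arg, grpc_error* error) { /* ... */ }

    grpc_closure c;
    GRPC_CLOSURE_INIT(&c, on_resolved, nullptr,
                      grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR,
                                              GRPC_EXECUTOR_SHORT));
    GRPC_CLOSURE_SCHED(&c, GRPC_ERROR_NONE);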
diff --git a/src/core/lib/iomgr/lockfree_event.cc b/src/core/lib/iomgr/lockfree_event.cc
index 5b6b79fa91..085fea40a4 100644
--- a/src/core/lib/iomgr/lockfree_event.cc
+++ b/src/core/lib/iomgr/lockfree_event.cc
@@ -89,7 +89,11 @@ void LockfreeEvent::DestroyEvent() {
void LockfreeEvent::NotifyOn(grpc_closure* closure) {
while (true) {
- gpr_atm curr = gpr_atm_no_barrier_load(&state_);
+ /* This load needs to be an acquire load because 'state_' can hold a
+  * shutdown error that we may need to reference. Acquire semantics make
+  * sure the shutdown error has been fully initialized before we
+  * reference it. */
+ gpr_atm curr = gpr_atm_acq_load(&state_);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this,
(void*)curr, closure);
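The acquire load only helps if the writer publishes the shutdown error with a matching release store; release/acquire pairing is what makes the error's fields visible before its pointer is. A standalone sketch of the pairing with the gpr atomics (illustration only, not the LockfreeEvent state machine):

    static gpr_atm g_published;

    void publish(grpc_error* err) {
      // Release: all prior writes to *err happen-before this store.
      gpr_atm_rel_store(&g_published, reinterpret_cast<gpr_atm>(err));
    }

    grpc_error* observe() {
      // Acquire: pairs with the release store above, so reading *err is safe.
      return reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&g_published));
    }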
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index 7a825643e1..c285d7eca6 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -166,8 +166,9 @@ static void posix_resolve_address(const char* name, const char* default_port,
grpc_closure* on_done,
grpc_resolved_addresses** addrs) {
request* r = static_cast<request*>(gpr_malloc(sizeof(request)));
- GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
- grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
+ GRPC_CLOSURE_INIT(
+ &r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc
index 71c92615ad..3e977dca2d 100644
--- a/src/core/lib/iomgr/resolve_address_windows.cc
+++ b/src/core/lib/iomgr/resolve_address_windows.cc
@@ -151,8 +151,9 @@ static void windows_resolve_address(const char* name, const char* default_port,
grpc_closure* on_done,
grpc_resolved_addresses** addresses) {
request* r = (request*)gpr_malloc(sizeof(request));
- GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
- grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
+ GRPC_CLOSURE_INIT(
+ &r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
diff --git a/src/core/lib/iomgr/socket_windows.cc b/src/core/lib/iomgr/socket_windows.cc
index 2e23409582..4ad31cb35d 100644
--- a/src/core/lib/iomgr/socket_windows.cc
+++ b/src/core/lib/iomgr/socket_windows.cc
@@ -36,6 +36,7 @@
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_windows.h"
+#include "src/core/lib/iomgr/sockaddr_windows.h"
#include "src/core/lib/iomgr/socket_windows.h"
grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name) {
@@ -148,4 +149,32 @@ void grpc_socket_become_ready(grpc_winsocket* socket,
if (should_destroy) destroy(socket);
}
+static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;
+static bool g_ipv6_loopback_available = false;
+
+static void probe_ipv6_once(void) {
+ SOCKET s = socket(AF_INET6, SOCK_STREAM, 0);
+ g_ipv6_loopback_available = false;
+ if (s == INVALID_SOCKET) {
+ gpr_log(GPR_INFO, "Disabling AF_INET6 sockets because socket() failed.");
+ } else {
+ grpc_sockaddr_in6 addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin6_family = AF_INET6;
+ addr.sin6_addr.s6_addr[15] = 1; /* [::1]:0 */
+ if (bind(s, reinterpret_cast<grpc_sockaddr*>(&addr), sizeof(addr)) == 0) {
+ g_ipv6_loopback_available = true;
+ } else {
+ gpr_log(GPR_INFO,
+ "Disabling AF_INET6 sockets because ::1 is not available.");
+ }
+ closesocket(s);
+ }
+}
+
+int grpc_ipv6_loopback_available(void) {
+ gpr_once_init(&g_probe_ipv6_once, probe_ipv6_once);
+ return g_ipv6_loopback_available;
+}
+
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index 7bd01eded5..b09b9da562 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -108,6 +108,10 @@ void grpc_socket_notify_on_read(grpc_winsocket* winsocket,
void grpc_socket_become_ready(grpc_winsocket* winsocket,
grpc_winsocket_callback_info* ci);
+/* Returns true if this system can create AF_INET6 sockets bound to ::1.
+ The value is probed once, and cached for the life of the process. */
+int grpc_ipv6_loopback_available(void);
+
#endif
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */
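The probe runs at most once per process (gpr_once) and the cached answer is what callers branch on. A minimal sketch of the intended call site, with a hypothetical helper name:

    // Prefer IPv6 when ::1 is bindable on this machine.
    int pick_listen_family() {
      return grpc_ipv6_loopback_available() ? AF_INET6 : AF_INET;
    }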
diff --git a/src/core/lib/json/json.cc b/src/core/lib/json/json.cc
index 6d359573f8..e78b73cefd 100644
--- a/src/core/lib/json/json.cc
+++ b/src/core/lib/json/json.cc
@@ -58,6 +58,8 @@ void grpc_json_destroy(grpc_json* json) {
grpc_json* grpc_json_link_child(grpc_json* parent, grpc_json* child,
grpc_json* sibling) {
+ // Link the child to its parent.
+ child->parent = parent;
// first child case.
if (parent->child == nullptr) {
GPR_ASSERT(sibling == nullptr);
@@ -81,7 +83,6 @@ grpc_json* grpc_json_create_child(grpc_json* sibling, grpc_json* parent,
grpc_json* child = grpc_json_create(type);
grpc_json_link_child(parent, child, sibling);
child->owns_value = owns_value;
- child->parent = parent;
child->value = value;
child->key = key;
return child;
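Setting child->parent inside grpc_json_link_child (instead of only in grpc_json_create_child) means callers that link pre-built nodes directly also get a correct parent pointer. A minimal sketch of the case that previously left parent unset:

    grpc_json* root = grpc_json_create(GRPC_JSON_OBJECT);
    grpc_json* child = grpc_json_create(GRPC_JSON_STRING);
    grpc_json_link_child(root, child, /*sibling=*/nullptr);
    // child->parent == root now holds even though grpc_json_create_child()
    // was never called.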
diff --git a/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc b/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc
index 7c4d7a71cd..8454fd7558 100644
--- a/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc
+++ b/src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc
@@ -41,8 +41,9 @@ namespace internal {
bool check_bios_data(const char* bios_data_file) {
char* bios_data = read_bios_file(bios_data_file);
- bool result = (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GOOGLE)) ||
- (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GCE));
+ bool result =
+ bios_data && ((!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GOOGLE)) ||
+ (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GCE)));
gpr_free(bios_data);
return result;
}
diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc
index cc72bb6164..59cf3a0af1 100644
--- a/src/core/lib/security/security_connector/security_connector.cc
+++ b/src/core/lib/security/security_connector/security_connector.cc
@@ -57,6 +57,10 @@ static const char* installed_roots_path =
INSTALL_PREFIX "/share/grpc/roots.pem";
#endif
+#ifndef TSI_OPENSSL_ALPN_SUPPORT
+#define TSI_OPENSSL_ALPN_SUPPORT 1
+#endif
+
/* -- Overridden default roots. -- */
static grpc_ssl_roots_override_callback ssl_roots_override_cb = nullptr;
@@ -850,7 +854,8 @@ grpc_auth_context* grpc_ssl_peer_to_auth_context(const tsi_peer* peer) {
static grpc_error* ssl_check_peer(grpc_security_connector* sc,
const char* peer_name, const tsi_peer* peer,
grpc_auth_context** auth_context) {
- /* Check the ALPN. */
+#if TSI_OPENSSL_ALPN_SUPPORT
+ /* Check the ALPN if ALPN is supported. */
const tsi_peer_property* p =
tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL);
if (p == nullptr) {
@@ -861,7 +866,7 @@ static grpc_error* ssl_check_peer(grpc_security_connector* sc,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Cannot check peer: invalid ALPN value.");
}
-
+#endif /* TSI_OPENSSL_ALPN_SUPPORT */
/* Check the peer name if specified. */
if (peer_name != nullptr && !grpc_ssl_host_matches_name(peer, peer_name)) {
char* msg;
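The #ifndef above is a default-on feature flag: unless a build defines TSI_OPENSSL_ALPN_SUPPORT to 0, the ALPN check still compiles in, so behavior is unchanged for existing builds. The same guard-with-default idiom in isolation (FEATURE_X is a hypothetical flag):

    #ifndef FEATURE_X
    #define FEATURE_X 1  // default: enabled unless the build says otherwise
    #endif

    #if FEATURE_X
    /* feature-specific checks go here */
    #endif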
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 88e015ce22..dbad5ded4d 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -529,6 +529,7 @@ void grpc_call_internal_unref(grpc_call* c REF_ARG) {
static void release_call(void* call, grpc_error* error) {
grpc_call* c = static_cast<grpc_call*>(call);
grpc_channel* channel = c->channel;
+ gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string)));
grpc_call_combiner_destroy(&c->call_combiner);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
@@ -561,7 +562,7 @@ static void destroy_call(void* call, grpc_error* error) {
}
get_final_status(c, set_status_value_directly, &c->final_info.final_status,
- nullptr, c->final_info.error_string);
+ nullptr, &(c->final_info.error_string));
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
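These two call.cc hunks are an ownership pair: destroy_call now passes the address of final_info.error_string so get_final_status can allocate the message, and release_call frees it (the const_cast is needed because the field is declared const char*). The pattern in isolation, with hypothetical names:

    // Callee allocates the string; the caller owns it and must gpr_free() it.
    static void fill_error_string(const char** out) {
      *out = gpr_strdup("deadline exceeded");
    }

    const char* s = nullptr;
    fill_error_string(&s);
    gpr_free(const_cast<char*>(s));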
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 8f3ad6c191..7cbd61adef 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -105,6 +105,7 @@ grpc_channel* grpc_channel_create_with_builder(
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
size_t channel_tracer_max_nodes = 0; // default to off
bool channelz_enabled = false;
+ bool internal_channel = false;
// this creates the default ChannelNode. Different types of channels may
// override this to ensure a correct ChannelNode is created.
grpc_core::channelz::ChannelNodeCreationFunc channel_node_create_func =
@@ -158,13 +159,17 @@ grpc_channel* grpc_channel_create_with_builder(
channel_node_create_func =
reinterpret_cast<grpc_core::channelz::ChannelNodeCreationFunc>(
args->args[i].value.pointer.p);
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL)) {
+ internal_channel = grpc_channel_arg_get_bool(&args->args[i], false);
}
}
grpc_channel_args_destroy(args);
if (channelz_enabled) {
- channel->channelz_channel =
- channel_node_create_func(channel, channel_tracer_max_nodes);
+ bool is_top_level_channel = channel->is_client && !internal_channel;
+ channel->channelz_channel = channel_node_create_func(
+ channel, channel_tracer_max_nodes, is_top_level_channel);
channel->channelz_channel->trace()->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
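GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL lets library-created plumbing channels opt out of being listed as top-level in channelz. A minimal sketch of setting the arg at channel-creation time, assuming the standard channel-args helpers:

    grpc_arg arg = grpc_channel_arg_integer_create(
        const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1);
    grpc_channel_args args = {1, &arg};
    // Pass &args when creating the channel; with channelz enabled, the
    // channel is then registered as internal rather than top-level.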
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 16be81e9c2..0ad82fed99 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -127,7 +127,7 @@ void grpc_init(void) {
grpc_slice_intern_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
- grpc_core::ChannelzRegistry::Init();
+ grpc_core::channelz::ChannelzRegistry::Init();
grpc_security_pre_init();
grpc_core::ExecCtx::GlobalInit();
grpc_iomgr_init();
@@ -176,7 +176,7 @@ void grpc_shutdown(void) {
grpc_mdctx_global_shutdown();
grpc_handshaker_factory_registry_shutdown();
grpc_slice_intern_shutdown();
- grpc_core::ChannelzRegistry::Shutdown();
+ grpc_core::channelz::ChannelzRegistry::Shutdown();
grpc_stats_shutdown();
grpc_core::Fork::GlobalShutdown();
}
diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc
index f4b1bb0cd8..e92fe2c5a1 100644
--- a/src/core/lib/surface/version.cc
+++ b/src/core/lib/surface/version.cc
@@ -23,6 +23,6 @@
#include <grpc/grpc.h>
-const char* grpc_version_string(void) { return "6.0.0"; }
+const char* grpc_version_string(void) { return "6.0.0-dev"; }
-const char* grpc_g_stands_for(void) { return "gladiolus"; }
+const char* grpc_g_stands_for(void) { return "glider"; }
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
index 06b999899a..1df1021bb1 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
@@ -462,6 +462,14 @@ void alts_tsi_handshaker_handle_response(alts_tsi_handshaker* handshaker,
set_unused_bytes(result, &handshaker->recv_bytes, resp->bytes_consumed);
}
grpc_status_code code = static_cast<grpc_status_code>(resp->status.code);
+ if (code != GRPC_STATUS_OK) {
+ grpc_slice* details = static_cast<grpc_slice*>(resp->status.details.arg);
+ if (details != nullptr) {
+ char* error_details = grpc_slice_to_c_string(*details);
+ gpr_log(GPR_ERROR, "Error from handshaker service: %s", error_details);
+ gpr_free(error_details);
+ }
+ }
grpc_gcp_handshaker_resp_destroy(resp);
cb(alts_tsi_utils_convert_to_tsi_result(code), user_data, bytes_to_send,
bytes_to_send_size, result);