aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters')
-rw-r--r--src/core/ext/filters/census/grpc_context.cc38
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc49
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.cc4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h25
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc137
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc155
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc44
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h27
-rw-r--r--src/core/ext/filters/client_channel/resolver.h8
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc9
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc17
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h22
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc18
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc508
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h7
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc9
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc27
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h7
19 files changed, 878 insertions, 235 deletions
diff --git a/src/core/ext/filters/census/grpc_context.cc b/src/core/ext/filters/census/grpc_context.cc
new file mode 100644
index 0000000000..599a798dda
--- /dev/null
+++ b/src/core/ext/filters/census/grpc_context.cc
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/census.h>
+#include <grpc/grpc.h>
+#include "src/core/lib/surface/api_trace.h"
+#include "src/core/lib/surface/call.h"
+
+void grpc_census_call_set_context(grpc_call* call, census_context* context) {
+ GRPC_API_TRACE("grpc_census_call_set_context(call=%p, census_context=%p)", 2,
+ (call, context));
+ if (context != nullptr) {
+ grpc_call_context_set(call, GRPC_CONTEXT_TRACING, context, nullptr);
+ }
+}
+
+census_context* grpc_census_call_get_context(grpc_call* call) {
+ GRPC_API_TRACE("grpc_census_call_get_context(call=%p)", 1, (call));
+ return static_cast<census_context*>(
+ grpc_call_context_get(call, GRPC_CONTEXT_TRACING));
+}
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 024c9d737e..b06f09d8c7 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -571,15 +571,32 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
if (chand->lb_policy == nullptr) {
- GRPC_CLOSURE_SCHED(
- op->send_ping.on_initiate,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
- GRPC_CLOSURE_SCHED(
- op->send_ping.on_ack,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
+ grpc_error* error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
+ GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
} else {
- chand->lb_policy->PingOneLocked(op->send_ping.on_initiate,
- op->send_ping.on_ack);
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc_core::LoadBalancingPolicy::PickState pick_state;
+ pick_state.initial_metadata = nullptr;
+ pick_state.initial_metadata_flags = 0;
+ pick_state.on_complete = nullptr;
+ memset(&pick_state.subchannel_call_context, 0,
+ sizeof(pick_state.subchannel_call_context));
+ pick_state.user_data = nullptr;
+ // Pick must return synchronously, because pick_state.on_complete is null.
+ GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error));
+ if (pick_state.connected_subchannel != nullptr) {
+ pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
+ op->send_ping.on_ack);
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "LB policy dropped call on ping");
+ }
+ GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
+ }
op->bind_pollset = nullptr;
}
op->send_ping.on_initiate = nullptr;
@@ -605,6 +622,17 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
+
+ if (op->reset_connect_backoff) {
+ if (chand->resolver != nullptr) {
+ chand->resolver->ResetBackoffLocked();
+ chand->resolver->RequestReresolutionLocked();
+ }
+ if (chand->lb_policy != nullptr) {
+ chand->lb_policy->ResetBackoffLocked();
+ }
+ }
+
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
@@ -2684,14 +2712,15 @@ class LbPicker {
grpc_combiner_scheduler(chand->combiner));
calld->pick.on_complete = &calld->pick_closure;
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
- const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
+ grpc_error* error = GRPC_ERROR_NONE;
+ const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error);
if (GPR_LIKELY(pick_done)) {
// Pick completed synchronously.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
chand, calld);
}
- pick_done_locked(elem, GRPC_ERROR_NONE);
+ pick_done_locked(elem, error);
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
} else {
// Pick will be returned asynchronously.
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc
index 4c9c9a6bd6..86c765df52 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.cc
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc
@@ -105,8 +105,8 @@ grpc_arg ClientChannelNode::CreateChannelArg() {
RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel) {
- return MakePolymorphicRefCounted<ChannelNode, ClientChannelNode>(
- channel, channel_tracer_max_nodes, is_top_level_channel);
+ return MakeRefCounted<ClientChannelNode>(channel, channel_tracer_max_nodes,
+ is_top_level_channel);
}
} // namespace channelz
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 3150df8847..3c0a9c1118 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -71,6 +71,7 @@ class LoadBalancingPolicy
/// Storage for LB token in \a initial_metadata, or nullptr if not used.
grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously.
+ /// If null, pick will fail if a result is not available synchronously.
grpc_closure* on_complete;
/// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
@@ -99,10 +100,15 @@ class LoadBalancingPolicy
/// Finds an appropriate subchannel for a call, based on data in \a pick.
/// \a pick must remain alive until the pick is complete.
///
- /// If the pick succeeds and a result is known immediately, returns true.
- /// Otherwise, \a pick->on_complete will be invoked once the pick is
- /// complete with its error argument set to indicate success or failure.
- virtual bool PickLocked(PickState* pick) GRPC_ABSTRACT;
+ /// If a result is known immediately, returns true, setting \a *error
+ /// upon failure. Otherwise, \a pick->on_complete will be invoked once
+ /// the pick is complete with its error argument set to indicate success
+ /// or failure.
+ ///
+ /// If \a pick->on_complete is null and no result is known immediately,
+ /// a synchronous failure will be returned (i.e., \a *error will be
+ /// set and true will be returned).
+ virtual bool PickLocked(PickState* pick, grpc_error** error) GRPC_ABSTRACT;
/// Cancels \a pick.
/// The \a on_complete callback of the pending pick will be invoked with
@@ -133,18 +139,15 @@ class LoadBalancingPolicy
virtual void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy)
GRPC_ABSTRACT;
- /// Performs a connected subchannel ping via \a ConnectedSubchannel::Ping()
- /// against one of the connected subchannels managed by the policy.
- /// Note: This is intended only for use in tests.
- virtual void PingOneLocked(grpc_closure* on_initiate,
- grpc_closure* on_ack) GRPC_ABSTRACT;
-
/// Tries to enter a READY connectivity state.
/// TODO(roth): As part of restructuring how we handle IDLE state,
/// consider whether this method is still needed.
virtual void ExitIdleLocked() GRPC_ABSTRACT;
- /// populates child_subchannels and child_channels with the uuids of this
+ /// Resets connection backoff.
+ virtual void ResetBackoffLocked() GRPC_ABSTRACT;
+
+ /// Populates child_subchannels and child_channels with the uuids of this
/// LB policy's referenced children. This is not invoked from the
/// client_channel's combiner. The implementation is responsible for
/// providing its own synchronization.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 959c7441a3..25b0149393 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -92,6 +92,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -123,7 +124,7 @@ class GrpcLb : public LoadBalancingPolicy {
GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
- bool PickLocked(PickState* pick) override;
+ bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
@@ -133,8 +134,8 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
- void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ void ResetBackoffLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) override;
@@ -167,13 +168,6 @@ class GrpcLb : public LoadBalancingPolicy {
PendingPick* next = nullptr;
};
- /// A linked list of pending pings waiting for the RR policy to be created.
- struct PendingPing {
- grpc_closure* on_initiate;
- grpc_closure* on_ack;
- PendingPing* next = nullptr;
- };
-
/// Contains a call to the LB server and all the data related to the call.
class BalancerCallState
: public InternallyRefCountedWithTracing<BalancerCallState> {
@@ -272,14 +266,12 @@ class GrpcLb : public LoadBalancingPolicy {
void AddPendingPick(PendingPick* pp);
static void OnPendingPickComplete(void* arg, grpc_error* error);
- // Pending ping methods.
- void AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack);
-
// Methods for dealing with the RR policy.
void CreateOrUpdateRoundRobinPolicyLocked();
grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
void CreateRoundRobinPolicyLocked(const Args& args);
- bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp);
+ bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
+ grpc_error** error);
void UpdateConnectivityStateFromRoundRobinPolicyLocked(
grpc_error* rr_state_error);
static void OnRoundRobinConnectivityChangedLocked(void* arg,
@@ -342,9 +334,8 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
- // Pending picks and pings that are waiting on the RR policy's connectivity.
+ // Pending picks that are waiting on the RR policy's connectivity.
PendingPick* pending_picks_ = nullptr;
- PendingPing* pending_pings_ = nullptr;
// The RR policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> rr_policy_;
@@ -1080,7 +1071,6 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
GrpcLb::~GrpcLb() {
GPR_ASSERT(pending_picks_ == nullptr);
- GPR_ASSERT(pending_pings_ == nullptr);
gpr_mu_destroy(&lb_channel_mu_);
gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_);
@@ -1126,14 +1116,6 @@ void GrpcLb::ShutdownLocked() {
// Note: pp is deleted in this callback.
GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
}
- // Clear pending pings.
- PendingPing* pping;
- while ((pping = pending_pings_) != nullptr) {
- pending_pings_ = pping->next;
- GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
- GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
- Delete(pping);
- }
GRPC_ERROR_UNREF(error);
}
@@ -1147,9 +1129,10 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
pending_picks_ = pp->next;
pp->pick->on_complete = pp->original_on_complete;
pp->pick->user_data = nullptr;
- if (new_policy->PickLocked(pp->pick)) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (new_policy->PickLocked(pp->pick, &error)) {
// Synchronous return; schedule closure.
- GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->pick->on_complete, error);
}
Delete(pp);
}
@@ -1233,63 +1216,51 @@ void GrpcLb::ExitIdleLocked() {
}
}
-bool GrpcLb::PickLocked(PickState* pick) {
+void GrpcLb::ResetBackoffLocked() {
+ if (lb_channel_ != nullptr) {
+ grpc_channel_reset_connect_backoff(lb_channel_);
+ }
+ if (rr_policy_ != nullptr) {
+ rr_policy_->ResetBackoffLocked();
+ }
+}
+
+bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) {
PendingPick* pp = PendingPickCreate(pick);
bool pick_done = false;
if (rr_policy_ != nullptr) {
- const grpc_connectivity_state rr_connectivity_state =
- rr_policy_->CheckConnectivityLocked(nullptr);
- // The RR policy may have transitioned to SHUTDOWN but the callback
- // registered to capture this event (on_rr_connectivity_changed_) may not
- // have been invoked yet. We need to make sure we aren't trying to pick
- // from an RR policy instance that's in shutdown.
- if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
+ rr_policy_.get());
+ }
+ pick_done =
+ PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
+ } else { // rr_policy_ == NULL
+ if (pick->on_complete == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "No pick result available but synchronous result required.");
+ pick_done = true;
+ } else {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] NOT picking from from RR %p: RR conn state=%s",
- this, rr_policy_.get(),
- grpc_connectivity_state_name(rr_connectivity_state));
+ "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
+ this);
}
AddPendingPick(pp);
- pick_done = false;
- } else { // RR not in shutdown
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
- rr_policy_.get());
+ if (!started_picking_) {
+ StartPickingLocked();
}
- pick_done = PickFromRoundRobinPolicyLocked(false /* force_async */, pp);
- }
- } else { // rr_policy_ == NULL
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
- this);
- }
- AddPendingPick(pp);
- if (!started_picking_) {
- StartPickingLocked();
+ pick_done = false;
}
- pick_done = false;
}
return pick_done;
}
-void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
- if (rr_policy_ != nullptr) {
- rr_policy_->PingOneLocked(on_initiate, on_ack);
- } else {
- AddPendingPing(on_initiate, on_ack);
- if (!started_picking_) {
- StartPickingLocked();
- }
- }
-}
-
void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) {
// delegate to the RoundRobin to fill the children subchannels.
rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
- mu_guard guard(&lb_channel_mu_);
+ MutexLock lock(&lb_channel_mu_);
if (lb_channel_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_);
@@ -1599,18 +1570,6 @@ void GrpcLb::AddPendingPick(PendingPick* pp) {
}
//
-// PendingPing
-//
-
-void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
- PendingPing* pping = New<PendingPing>();
- pping->on_initiate = on_initiate;
- pping->on_ack = on_ack;
- pping->next = pending_pings_;
- pending_pings_ = pping;
-}
-
-//
// code for interacting with the RR policy
//
@@ -1619,7 +1578,8 @@ void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
// cleanups this callback would otherwise be responsible for.
// If \a force_async is true, then we will manually schedule the
// completion callback even if the pick is available immediately.
-bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
+bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
+ grpc_error** error) {
// Check for drops if we are not using fallback backend addresses.
if (serverlist_ != nullptr) {
// Look at the index into the serverlist to see if we should drop this call.
@@ -1653,11 +1613,12 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
- bool pick_done = rr_policy_->PickLocked(pp->pick);
+ bool pick_done = rr_policy_->PickLocked(pp->pick, error);
if (pick_done) {
PendingPickSetMetadataAndContext(pp);
if (force_async) {
- GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
+ *error = GRPC_ERROR_NONE;
pick_done = false;
}
Delete(pp);
@@ -1709,18 +1670,8 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
"[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
rr_policy_.get());
}
- PickFromRoundRobinPolicyLocked(true /* force_async */, pp);
- }
- // Send pending pings to RR policy.
- PendingPing* pping;
- while ((pping = pending_pings_)) {
- pending_pings_ = pping->next;
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
- this, rr_policy_.get());
- }
- rr_policy_->PingOneLocked(pping->on_initiate, pping->on_ack);
- Delete(pping);
+ grpc_error* error = GRPC_ERROR_NONE;
+ PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index d217dc0e63..9120abfa3c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -27,6 +27,7 @@
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -46,7 +47,7 @@ class PickFirst : public LoadBalancingPolicy {
explicit PickFirst(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
- bool PickLocked(PickState* pick) override;
+ bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
@@ -56,8 +57,8 @@ class PickFirst : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
- void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ void ResetBackoffLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* ignored) override;
@@ -80,6 +81,11 @@ class PickFirst : public LoadBalancingPolicy {
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
+
+ // Processes the connectivity change to READY for an unselected subchannel.
+ void ProcessUnselectedReadyLocked();
+
+ void CheckConnectivityStateAndStartWatchingLocked();
};
class PickFirstSubchannelList
@@ -173,9 +179,10 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
- if (new_policy->PickLocked(pick)) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure.
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pick->on_complete, error);
}
}
}
@@ -246,7 +253,8 @@ void PickFirst::StartPickingLocked() {
if (subchannel_list_ != nullptr) {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
- subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
+ subchannel_list_->subchannel(i)
+ ->CheckConnectivityStateAndStartWatchingLocked();
break;
}
}
@@ -259,18 +267,30 @@ void PickFirst::ExitIdleLocked() {
}
}
-bool PickFirst::PickLocked(PickState* pick) {
+void PickFirst::ResetBackoffLocked() {
+ subchannel_list_->ResetBackoffLocked();
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->ResetBackoffLocked();
+ }
+}
+
+bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
// If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) {
pick->connected_subchannel = selected_->connected_subchannel()->Ref();
return true;
}
// No subchannel selected yet, so handle asynchronously.
- if (!started_picking_) {
- StartPickingLocked();
+ if (pick->on_complete == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "No pick result available but synchronous result required.");
+ return true;
}
pick->next = pending_picks_;
pending_picks_ = pick;
+ if (!started_picking_) {
+ StartPickingLocked();
+ }
return false;
}
@@ -293,20 +313,9 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
-void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
- if (selected_ != nullptr) {
- selected_->connected_subchannel()->Ping(on_initiate, on_ack);
- } else {
- GRPC_CLOSURE_SCHED(on_initiate,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
- GRPC_CLOSURE_SCHED(on_ack,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
- }
-}
-
void PickFirst::FillChildRefsForChannelz(
ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
@@ -333,7 +342,7 @@ void PickFirst::UpdateChildRefsLocked() {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
@@ -384,7 +393,8 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
- subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
+ subchannel_list_->subchannel(0)
+ ->CheckConnectivityStateAndStartWatchingLocked();
}
} else {
// We do have a selected subchannel.
@@ -438,7 +448,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// subchannel in the new list.
if (started_picking_) {
latest_pending_subchannel_list_->subchannel(0)
- ->StartConnectivityWatchLocked();
+ ->CheckConnectivityStateAndStartWatchingLocked();
}
}
}
@@ -517,41 +527,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// select in place of the current one.
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
- // Case 2. Promote p->latest_pending_subchannel_list_ to
- // p->subchannel_list_.
- if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Pick First %p promoting pending subchannel list %p to "
- "replace %p",
- p, p->latest_pending_subchannel_list_.get(),
- p->subchannel_list_.get());
- }
- p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
- }
- // Cases 1 and 2.
- grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
- GRPC_ERROR_NONE, "connecting_ready");
- p->selected_ = this;
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
- subchannel());
- }
- // Drop all other subchannels, since we are now connected.
- p->DestroyUnselectedSubchannelsLocked();
- // Update any calls that were waiting for a pick.
- PickState* pick;
- while ((pick = p->pending_picks_)) {
- p->pending_picks_ = pick->next;
- pick->connected_subchannel =
- p->selected_->connected_subchannel()->Ref();
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Servicing pending pick with selected subchannel %p",
- p->selected_->subchannel());
- }
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
- }
+ ProcessUnselectedReadyLocked();
// Renew notification.
RenewConnectivityWatchLocked();
break;
@@ -572,7 +548,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "exhausted_subchannels");
}
- sd->StartConnectivityWatchLocked();
+ sd->CheckConnectivityStateAndStartWatchingLocked();
break;
}
case GRPC_CHANNEL_CONNECTING:
@@ -593,6 +569,67 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
GRPC_ERROR_UNREF(error);
}
+void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ // If we get here, there are two possible cases:
+ // 1. We do not currently have a selected subchannel, and the update is
+ // for a subchannel in p->subchannel_list_ that we're trying to
+ // connect to. The goal here is to find a subchannel that we can
+ // select.
+ // 2. We do currently have a selected subchannel, and the update is
+ // for a subchannel in p->latest_pending_subchannel_list_. The
+ // goal here is to find a subchannel from the update that we can
+ // select in place of the current one.
+ GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
+ subchannel_list() == p->latest_pending_subchannel_list_.get());
+ // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
+ if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
+ }
+ // Cases 1 and 2.
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "subchannel_ready");
+ p->selected_ = this;
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
+ }
+ // Drop all other subchannels, since we are now connected.
+ p->DestroyUnselectedSubchannelsLocked();
+ // Update any calls that were waiting for a pick.
+ PickState* pick;
+ while ((pick = p->pending_picks_)) {
+ p->pending_picks_ = pick->next;
+ pick->connected_subchannel = p->selected_->connected_subchannel()->Ref();
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p",
+ p->selected_->subchannel());
+ }
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ }
+}
+
+void PickFirst::PickFirstSubchannelData::
+ CheckConnectivityStateAndStartWatchingLocked() {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (p->selected_ != this &&
+ CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
+ // We must process the READY subchannel before we start watching it.
+ // Otherwise, we won't know it's READY because we will be waiting for its
+ // connectivity state to change from READY.
+ ProcessUnselectedReadyLocked();
+ }
+ GRPC_ERROR_UNREF(error);
+ StartConnectivityWatchLocked();
+}
+
//
// factory
//
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index fc56a4961f..c730b3bd2b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -36,6 +36,7 @@
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -57,7 +58,7 @@ class RoundRobin : public LoadBalancingPolicy {
explicit RoundRobin(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
- bool PickLocked(PickState* pick) override;
+ bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
@@ -67,8 +68,8 @@ class RoundRobin : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
- void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ void ResetBackoffLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* ignored) override;
@@ -253,9 +254,10 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
- if (new_policy->PickLocked(pick)) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure.
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pick->on_complete, error);
}
}
}
@@ -333,6 +335,13 @@ void RoundRobin::ExitIdleLocked() {
}
}
+void RoundRobin::ResetBackoffLocked() {
+ subchannel_list_->ResetBackoffLocked();
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->ResetBackoffLocked();
+ }
+}
+
bool RoundRobin::DoPickLocked(PickState* pick) {
const size_t next_ready_index =
subchannel_list_->GetNextReadySubchannelIndexLocked();
@@ -368,7 +377,7 @@ void RoundRobin::DrainPendingPicksLocked() {
}
}
-bool RoundRobin::PickLocked(PickState* pick) {
+bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_);
}
@@ -376,6 +385,11 @@ bool RoundRobin::PickLocked(PickState* pick) {
if (subchannel_list_ != nullptr) {
if (DoPickLocked(pick)) return true;
}
+ if (pick->on_complete == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "No pick result available but synchronous result required.");
+ return true;
+ }
/* no pick currently available. Save for later in list of pending picks */
pick->next = pending_picks_;
pending_picks_ = pick;
@@ -387,7 +401,7 @@ bool RoundRobin::PickLocked(PickState* pick) {
void RoundRobin::FillChildRefsForChannelz(
ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
@@ -414,7 +428,7 @@ void RoundRobin::UpdateChildRefsLocked() {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
@@ -647,22 +661,6 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
-void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- const size_t next_ready_index =
- subchannel_list_->GetNextReadySubchannelIndexLocked();
- if (next_ready_index < subchannel_list_->num_subchannels()) {
- RoundRobinSubchannelData* selected =
- subchannel_list_->subchannel(next_ready_index);
- selected->connected_subchannel()->Ping(on_initiate, on_ack);
- } else {
- GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Round Robin not connected"));
- GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Round Robin not connected"));
- }
-}
-
void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
AutoChildRefsUpdater guard(this);
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 018ac3bb86..0fa2f04e73 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -107,6 +107,11 @@ class SubchannelData {
// being unreffed.
virtual void UnrefSubchannelLocked(const char* reason);
+ // Resets the connection backoff.
+ // TODO(roth): This method should go away when we move the backoff
+ // code out of the subchannel and into the LB policies.
+ void ResetBackoffLocked();
+
// Starts watching the connectivity state of the subchannel.
// ProcessConnectivityChangeLocked() will be called when the
// connectivity state changes.
@@ -206,6 +211,11 @@ class SubchannelList
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
+ // Resets connection backoff of all subchannels.
+ // TODO(roth): We will probably need to rethink this as part of moving
+ // the backoff code out of subchannels and into LB policies.
+ void ResetBackoffLocked();
+
// Note: Caller must ensure that this is invoked inside of the combiner.
void Orphan() override {
ShutdownLocked();
@@ -300,6 +310,14 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType,
+ SubchannelDataType>::ResetBackoffLocked() {
+ if (subchannel_ != nullptr) {
+ grpc_subchannel_reset_backoff(subchannel_);
+ }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
SubchannelDataType>::StartConnectivityWatchLocked() {
if (subchannel_list_->tracer()->enabled()) {
gpr_log(GPR_INFO,
@@ -544,6 +562,15 @@ void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
}
}
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelList<SubchannelListType,
+ SubchannelDataType>::ResetBackoffLocked() {
+ for (size_t i = 0; i < subchannels_.size(); i++) {
+ SubchannelDataType* sd = &subchannels_[i];
+ sd->ResetBackoffLocked();
+ }
+}
+
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */
diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h
index c7e37e4468..48f2e89095 100644
--- a/src/core/ext/filters/client_channel/resolver.h
+++ b/src/core/ext/filters/client_channel/resolver.h
@@ -94,6 +94,14 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
/// throw away unselected subchannels.
virtual void RequestReresolutionLocked() GRPC_ABSTRACT;
+ /// Resets the re-resolution backoff, if any.
+ /// This needs to be implemented only by pull-based implementations;
+ /// for push-based implementations, it will be a no-op.
+ /// TODO(roth): Pull the backoff code out of resolver and into
+ /// client_channel, so that it can be shared across resolver
+ /// implementations. At that point, this method can go away.
+ virtual void ResetBackoffLocked() {}
+
void Orphan() override {
// Invoke ShutdownAndUnrefLocked() inside of the combiner.
GRPC_CLOSURE_SCHED(
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 7050e82121..f2bb5f3c71 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -66,6 +66,8 @@ class AresDnsResolver : public Resolver {
void RequestReresolutionLocked() override;
+ void ResetBackoffLocked() override;
+
void ShutdownLocked() override;
private:
@@ -187,6 +189,13 @@ void AresDnsResolver::RequestReresolutionLocked() {
}
}
+void AresDnsResolver::ResetBackoffLocked() {
+ if (have_next_resolution_timer_) {
+ grpc_timer_cancel(&next_resolution_timer_);
+ }
+ backoff_.Reset();
+}
+
void AresDnsResolver::ShutdownLocked() {
if (have_next_resolution_timer_) {
grpc_timer_cancel(&next_resolution_timer_);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
index 0068d0d5f4..fdbd07ebf5 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
@@ -74,6 +74,8 @@ struct grpc_ares_ev_driver {
bool shutting_down;
/** request object that's using this ev driver */
grpc_ares_request* request;
+ /** Owned by the ev_driver. Creates new GrpcPolledFd's */
+ grpc_core::UniquePtr<grpc_core::GrpcPolledFdFactory> polled_fd_factory;
};
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
@@ -93,7 +95,7 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
ares_destroy(ev_driver->channel);
grpc_ares_complete_request_locked(ev_driver->request);
- gpr_free(ev_driver);
+ grpc_core::Delete(ev_driver);
}
}
@@ -118,13 +120,11 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set,
grpc_combiner* combiner,
grpc_ares_request* request) {
- *ev_driver = static_cast<grpc_ares_ev_driver*>(
- gpr_malloc(sizeof(grpc_ares_ev_driver)));
+ *ev_driver = grpc_core::New<grpc_ares_ev_driver>();
ares_options opts;
memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN;
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
- grpc_core::ConfigureAresChannelLocked(&(*ev_driver)->channel);
gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
if (status != ARES_SUCCESS) {
char* err_msg;
@@ -142,6 +142,10 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
(*ev_driver)->working = false;
(*ev_driver)->shutting_down = false;
(*ev_driver)->request = request;
+ (*ev_driver)->polled_fd_factory =
+ grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner);
+ (*ev_driver)
+ ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel);
return GRPC_ERROR_NONE;
}
@@ -245,8 +249,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
// Create a new fd_node if sock[i] is not in the fd_node list.
if (fdn == nullptr) {
fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
- fdn->grpc_polled_fd = grpc_core::NewGrpcPolledFdLocked(
- socks[i], ev_driver->pollset_set);
+ fdn->grpc_polled_fd =
+ ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
+ socks[i], ev_driver->pollset_set, ev_driver->combiner);
gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName());
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
index 2c9db71011..671c537fe7 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
@@ -81,10 +81,24 @@ class GrpcPolledFd {
GRPC_ABSTRACT_BASE_CLASS
};
-/* Creates a new wrapped fd for the current platform */
-GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
- grpc_pollset_set* driver_pollset_set);
-void ConfigureAresChannelLocked(ares_channel* channel);
+/* A GrpcPolledFdFactory is 1-to-1 with and owned by the
+ * ares event driver. It knows how to create GrpcPolledFd's
+ * for the current platform, and the ares driver uses it for all of
+ * its fd's. */
+class GrpcPolledFdFactory {
+ public:
+ virtual ~GrpcPolledFdFactory() {}
+ /* Creates a new wrapped fd for the current platform */
+ virtual GrpcPolledFd* NewGrpcPolledFdLocked(
+ ares_socket_t as, grpc_pollset_set* driver_pollset_set,
+ grpc_combiner* combiner) GRPC_ABSTRACT;
+ /* Optionally configures the ares channel after creation */
+ virtual void ConfigureAresChannelLocked(ares_channel channel) GRPC_ABSTRACT;
+
+ GRPC_ABSTRACT_BASE_CLASS
+};
+
+UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner);
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index fffe9eda8e..aa58e1aaf5 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -86,12 +86,20 @@ class GrpcPolledFdPosix : public GrpcPolledFd {
grpc_pollset_set* driver_pollset_set_;
};
-GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
- grpc_pollset_set* driver_pollset_set) {
- return grpc_core::New<GrpcPolledFdPosix>(as, driver_pollset_set);
-}
+class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
+ public:
+ GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
+ grpc_pollset_set* driver_pollset_set,
+ grpc_combiner* combiner) override {
+ return New<GrpcPolledFdPosix>(as, driver_pollset_set);
+ }
-void ConfigureAresChannelLocked(ares_channel* channel) {}
+ void ConfigureAresChannelLocked(ares_channel channel) override {}
+};
+
+UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
+ return UniquePtr<GrpcPolledFdFactory>(New<GrpcPolledFdFactoryPosix>());
+}
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
index 5d65ae3ab3..02121aa0ab 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
@@ -21,38 +21,516 @@
#if GRPC_ARES == 1 && defined(GPR_WINDOWS)
#include <ares.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/log_windows.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
#include <string.h>
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/socket_windows.h"
+#include "src/core/lib/iomgr/tcp_windows.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
+
+/* TODO(apolcyn): remove this hack after fixing upstream.
+ * Our grpc/c-ares code on Windows uses the ares_set_socket_functions API,
+ * which uses "struct iovec" type, which on Windows is defined inside of
+ * a c-ares header that is not public.
+ * See https://github.com/c-ares/c-ares/issues/206. */
+struct iovec {
+ void* iov_base;
+ size_t iov_len;
+};
namespace grpc_core {
-/* TODO: fill in the body of GrpcPolledFdWindows to enable c-ares on Windows.
- This dummy implementation only allows grpc to compile on windows with
- GRPC_ARES=1. */
+/* c-ares creates its own sockets and is meant to read them when readable and
+ * write them when writeable. To fit this socket usage model into the grpc
+ * windows poller (which gives notifications when attempted reads and writes are
+ * actually fulfilled rather than possible), this GrpcPolledFdWindows class
+ * takes advantage of the ares_set_socket_functions API and acts as a virtual
+ * socket. It holds its own read and write buffers which are written to and read
+ * from c-ares and are used with the grpc windows poller, and it, e.g.,
+ * manufactures virtual socket error codes when it e.g. needs to tell the c-ares
+ * library to wait for an async read. */
class GrpcPolledFdWindows : public GrpcPolledFd {
public:
- GrpcPolledFdWindows() { abort(); }
- ~GrpcPolledFdWindows() { abort(); }
+ enum WriteState {
+ WRITE_IDLE,
+ WRITE_REQUESTED,
+ WRITE_PENDING,
+ WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
+ };
+
+ GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner)
+ : read_buf_(grpc_empty_slice()),
+ write_buf_(grpc_empty_slice()),
+ write_state_(WRITE_IDLE),
+ gotten_into_driver_list_(false) {
+ gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
+ winsocket_ = grpc_winsocket_create(as, name_);
+ combiner_ = GRPC_COMBINER_REF(combiner, name_);
+ GRPC_CLOSURE_INIT(&outer_read_closure_,
+ &GrpcPolledFdWindows::OnIocpReadable, this,
+ grpc_combiner_scheduler(combiner_));
+ GRPC_CLOSURE_INIT(&outer_write_closure_,
+ &GrpcPolledFdWindows::OnIocpWriteable, this,
+ grpc_combiner_scheduler(combiner_));
+ }
+
+ ~GrpcPolledFdWindows() {
+ GRPC_COMBINER_UNREF(combiner_, name_);
+ grpc_slice_unref_internal(read_buf_);
+ grpc_slice_unref_internal(write_buf_);
+ GPR_ASSERT(read_closure_ == nullptr);
+ GPR_ASSERT(write_closure_ == nullptr);
+ grpc_winsocket_destroy(winsocket_);
+ gpr_free(name_);
+ }
+
+ void ScheduleAndNullReadClosure(grpc_error* error) {
+ GRPC_CLOSURE_SCHED(read_closure_, error);
+ read_closure_ = nullptr;
+ }
+
+ void ScheduleAndNullWriteClosure(grpc_error* error) {
+ GRPC_CLOSURE_SCHED(write_closure_, error);
+ write_closure_ = nullptr;
+ }
+
void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
- abort();
+ GPR_ASSERT(read_closure_ == nullptr);
+ read_closure_ = read_closure;
+ GPR_ASSERT(GRPC_SLICE_LENGTH(read_buf_) == 0);
+ grpc_slice_unref_internal(read_buf_);
+ read_buf_ = GRPC_SLICE_MALLOC(4192);
+ WSABUF buffer;
+ buffer.buf = (char*)GRPC_SLICE_START_PTR(read_buf_);
+ buffer.len = GRPC_SLICE_LENGTH(read_buf_);
+ memset(&winsocket_->read_info.overlapped, 0, sizeof(OVERLAPPED));
+ recv_from_source_addr_len_ = sizeof(recv_from_source_addr_);
+ DWORD flags = 0;
+ if (WSARecvFrom(grpc_winsocket_wrapped_socket(winsocket_), &buffer, 1,
+ nullptr, &flags, (sockaddr*)recv_from_source_addr_,
+ &recv_from_source_addr_len_,
+ &winsocket_->read_info.overlapped, nullptr)) {
+ char* msg = gpr_format_message(WSAGetLastError());
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ GRPC_CARES_TRACE_LOG(
+ "RegisterForOnReadableLocked: WSARecvFrom error:|%s|. fd:|%s|", msg,
+ GetName());
+ gpr_free(msg);
+ if (WSAGetLastError() != WSA_IO_PENDING) {
+ ScheduleAndNullReadClosure(error);
+ return;
+ }
+ }
+ grpc_socket_notify_on_read(winsocket_, &outer_read_closure_);
}
+
void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
+ GRPC_CARES_TRACE_LOG(
+ "RegisterForOnWriteableLocked. fd:|%s|. Current write state: %d",
+ GetName(), write_state_);
+ GPR_ASSERT(write_closure_ == nullptr);
+ write_closure_ = write_closure;
+ switch (write_state_) {
+ case WRITE_IDLE:
+ ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
+ break;
+ case WRITE_REQUESTED:
+ write_state_ = WRITE_PENDING;
+ SendWriteBuf(nullptr, &winsocket_->write_info.overlapped);
+ grpc_socket_notify_on_write(winsocket_, &outer_write_closure_);
+ break;
+ case WRITE_PENDING:
+ case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
+ abort();
+ }
+ }
+
+ bool IsFdStillReadableLocked() override {
+ return GRPC_SLICE_LENGTH(read_buf_) > 0;
+ }
+
+ void ShutdownLocked(grpc_error* error) override {
+ grpc_winsocket_shutdown(winsocket_);
+ }
+
+ ares_socket_t GetWrappedAresSocketLocked() override {
+ return grpc_winsocket_wrapped_socket(winsocket_);
+ }
+
+ const char* GetName() override { return name_; }
+
+ ares_ssize_t RecvFrom(void* data, ares_socket_t data_len, int flags,
+ struct sockaddr* from, ares_socklen_t* from_len) {
+ GRPC_CARES_TRACE_LOG(
+ "RecvFrom called on fd:|%s|. Current read buf length:|%d|", GetName(),
+ GRPC_SLICE_LENGTH(read_buf_));
+ if (GRPC_SLICE_LENGTH(read_buf_) == 0) {
+ WSASetLastError(WSAEWOULDBLOCK);
+ return -1;
+ }
+ ares_ssize_t bytes_read = 0;
+ for (size_t i = 0; i < GRPC_SLICE_LENGTH(read_buf_) && i < data_len; i++) {
+ ((char*)data)[i] = GRPC_SLICE_START_PTR(read_buf_)[i];
+ bytes_read++;
+ }
+ read_buf_ = grpc_slice_sub_no_ref(read_buf_, bytes_read,
+ GRPC_SLICE_LENGTH(read_buf_));
+ /* c-ares overloads this recv_from virtual socket function to receive
+ * data on both UDP and TCP sockets, and from is nullptr for TCP. */
+ if (from != nullptr) {
+ GPR_ASSERT(*from_len <= recv_from_source_addr_len_);
+ memcpy(from, &recv_from_source_addr_, recv_from_source_addr_len_);
+ *from_len = recv_from_source_addr_len_;
+ }
+ return bytes_read;
+ }
+
+ grpc_slice FlattenIovec(const struct iovec* iov, int iov_count) {
+ int total = 0;
+ for (int i = 0; i < iov_count; i++) {
+ total += iov[i].iov_len;
+ }
+ grpc_slice out = GRPC_SLICE_MALLOC(total);
+ size_t cur = 0;
+ for (int i = 0; i < iov_count; i++) {
+ for (int k = 0; k < iov[i].iov_len; k++) {
+ GRPC_SLICE_START_PTR(out)[cur++] = ((char*)iov[i].iov_base)[k];
+ }
+ }
+ return out;
+ }
+
+ int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped) {
+ WSABUF buf;
+ buf.len = GRPC_SLICE_LENGTH(write_buf_);
+ buf.buf = (char*)GRPC_SLICE_START_PTR(write_buf_);
+ DWORD flags = 0;
+ int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
+ bytes_sent_ptr, flags, overlapped, nullptr);
+ GRPC_CARES_TRACE_LOG(
+ "WSASend: name:%s. buf len:%d. bytes sent: %d. overlapped %p. return "
+ "val: %d",
+ GetName(), buf.len, *bytes_sent_ptr, overlapped, out);
+ return out;
+ }
+
+ ares_ssize_t TrySendWriteBufSyncNonBlocking() {
+ GPR_ASSERT(write_state_ == WRITE_IDLE);
+ ares_ssize_t total_sent;
+ DWORD bytes_sent = 0;
+ if (SendWriteBuf(&bytes_sent, nullptr) != 0) {
+ char* msg = gpr_format_message(WSAGetLastError());
+ GRPC_CARES_TRACE_LOG(
+ "TrySendWriteBufSyncNonBlocking: SendWriteBuf error:|%s|. fd:|%s|",
+ msg, GetName());
+ gpr_free(msg);
+ if (WSAGetLastError() == WSA_IO_PENDING) {
+ WSASetLastError(WSAEWOULDBLOCK);
+ write_state_ = WRITE_REQUESTED;
+ }
+ }
+ write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent,
+ GRPC_SLICE_LENGTH(write_buf_));
+ return bytes_sent;
+ }
+
+ ares_ssize_t SendV(const struct iovec* iov, int iov_count) {
+ GRPC_CARES_TRACE_LOG("SendV called on fd:|%s|. Current write state: %d",
+ GetName(), write_state_);
+ switch (write_state_) {
+ case WRITE_IDLE:
+ GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
+ grpc_slice_unref_internal(write_buf_);
+ write_buf_ = FlattenIovec(iov, iov_count);
+ return TrySendWriteBufSyncNonBlocking();
+ case WRITE_REQUESTED:
+ case WRITE_PENDING:
+ WSASetLastError(WSAEWOULDBLOCK);
+ return -1;
+ case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
+ grpc_slice currently_attempted = FlattenIovec(iov, iov_count);
+ GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >=
+ GRPC_SLICE_LENGTH(write_buf_));
+ ares_ssize_t total_sent = 0;
+ for (size_t i = 0; i < GRPC_SLICE_LENGTH(write_buf_); i++) {
+ GPR_ASSERT(GRPC_SLICE_START_PTR(currently_attempted)[i] ==
+ GRPC_SLICE_START_PTR(write_buf_)[i]);
+ total_sent++;
+ }
+ grpc_slice_unref_internal(write_buf_);
+ write_buf_ =
+ grpc_slice_sub_no_ref(currently_attempted, total_sent,
+ GRPC_SLICE_LENGTH(currently_attempted));
+ write_state_ = WRITE_IDLE;
+ total_sent += TrySendWriteBufSyncNonBlocking();
+ return total_sent;
+ }
abort();
}
- bool IsFdStillReadableLocked() override { abort(); }
- void ShutdownLocked(grpc_error* error) override { abort(); }
- ares_socket_t GetWrappedAresSocketLocked() override { abort(); }
- const char* GetName() override { abort(); }
+
+ int Connect(const struct sockaddr* target, ares_socklen_t target_len) {
+ SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
+ GRPC_CARES_TRACE_LOG("Connect: fd:|%s|", GetName());
+ int out =
+ WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr);
+ if (out != 0) {
+ char* msg = gpr_format_message(WSAGetLastError());
+ GRPC_CARES_TRACE_LOG("Connect error code:|%d|, msg:|%s|. fd:|%s|",
+ WSAGetLastError(), msg, GetName());
+ gpr_free(msg);
+ // c-ares expects a posix-style connect API
+ out = -1;
+ }
+ return out;
+ }
+
+ static void OnIocpReadable(void* arg, grpc_error* error) {
+ GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
+ polled_fd->OnIocpReadableInner(error);
+ }
+
+ void OnIocpReadableInner(grpc_error* error) {
+ if (error == GRPC_ERROR_NONE) {
+ if (winsocket_->read_info.wsa_error != 0) {
+ /* WSAEMSGSIZE would be due to receiving more data
+ * than our read buffer's fixed capacity. Assume that
+ * the connection is TCP and read the leftovers
+ * in subsequent c-ares reads. */
+ if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
+ GRPC_ERROR_UNREF(error);
+ char* msg = gpr_format_message(winsocket_->read_info.wsa_error);
+ GRPC_CARES_TRACE_LOG(
+ "OnIocpReadableInner. winsocket error:|%s|. fd:|%s|", msg,
+ GetName());
+ error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ }
+ }
+ }
+ if (error == GRPC_ERROR_NONE) {
+ read_buf_ = grpc_slice_sub_no_ref(read_buf_, 0,
+ winsocket_->read_info.bytes_transfered);
+ } else {
+ grpc_slice_unref_internal(read_buf_);
+ read_buf_ = grpc_empty_slice();
+ }
+ GRPC_CARES_TRACE_LOG(
+ "OnIocpReadable finishing. read buf length now:|%d|. :fd:|%s|",
+ GRPC_SLICE_LENGTH(read_buf_), GetName());
+ ScheduleAndNullReadClosure(error);
+ }
+
+ static void OnIocpWriteable(void* arg, grpc_error* error) {
+ GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
+ polled_fd->OnIocpWriteableInner(error);
+ }
+
+ void OnIocpWriteableInner(grpc_error* error) {
+ GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
+ if (error == GRPC_ERROR_NONE) {
+ if (winsocket_->write_info.wsa_error != 0) {
+ char* msg = gpr_format_message(winsocket_->write_info.wsa_error);
+ GRPC_CARES_TRACE_LOG(
+ "OnIocpWriteableInner. winsocket error:|%s|. fd:|%s|", msg,
+ GetName());
+ GRPC_ERROR_UNREF(error);
+ error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ }
+ }
+ GPR_ASSERT(write_state_ == WRITE_PENDING);
+ if (error == GRPC_ERROR_NONE) {
+ write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
+ write_buf_ = grpc_slice_sub_no_ref(
+ write_buf_, 0, winsocket_->write_info.bytes_transfered);
+ } else {
+ grpc_slice_unref_internal(write_buf_);
+ write_buf_ = grpc_empty_slice();
+ }
+ ScheduleAndNullWriteClosure(error);
+ }
+
+ bool gotten_into_driver_list() const { return gotten_into_driver_list_; }
+ void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
+
+ grpc_combiner* combiner_;
+ char recv_from_source_addr_[200];
+ ares_socklen_t recv_from_source_addr_len_;
+ grpc_slice read_buf_;
+ grpc_slice write_buf_;
+ grpc_closure* read_closure_ = nullptr;
+ grpc_closure* write_closure_ = nullptr;
+ grpc_closure outer_read_closure_;
+ grpc_closure outer_write_closure_;
+ grpc_winsocket* winsocket_;
+ WriteState write_state_;
+ char* name_ = nullptr;
+ bool gotten_into_driver_list_;
};
-GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
- grpc_pollset_set* driver_pollset_set) {
- return nullptr;
-}
+struct SockToPolledFdEntry {
+ SockToPolledFdEntry(SOCKET s, GrpcPolledFdWindows* fd)
+ : socket(s), polled_fd(fd) {}
+ SOCKET socket;
+ GrpcPolledFdWindows* polled_fd;
+ SockToPolledFdEntry* next = nullptr;
+};
+
+/* A SockToPolledFdMap can make ares_socket_t types (SOCKET's on windows)
+ * to GrpcPolledFdWindow's, and is used to find the appropriate
+ * GrpcPolledFdWindows to handle a virtual socket call when c-ares makes that
+ * socket call on the ares_socket_t type. Instances are owned by and one-to-one
+ * with a GrpcPolledFdWindows factory and event driver */
+class SockToPolledFdMap {
+ public:
+ SockToPolledFdMap(grpc_combiner* combiner) {
+ combiner_ = GRPC_COMBINER_REF(combiner, "sock to polled fd map");
+ }
+
+ ~SockToPolledFdMap() {
+ GPR_ASSERT(head_ == nullptr);
+ GRPC_COMBINER_UNREF(combiner_, "sock to polled fd map");
+ }
+
+ void AddNewSocket(SOCKET s, GrpcPolledFdWindows* polled_fd) {
+ SockToPolledFdEntry* new_node = New<SockToPolledFdEntry>(s, polled_fd);
+ new_node->next = head_;
+ head_ = new_node;
+ }
+
+ GrpcPolledFdWindows* LookupPolledFd(SOCKET s) {
+ for (SockToPolledFdEntry* node = head_; node != nullptr;
+ node = node->next) {
+ if (node->socket == s) {
+ GPR_ASSERT(node->polled_fd != nullptr);
+ return node->polled_fd;
+ }
+ }
+ abort();
+ }
+
+ void RemoveEntry(SOCKET s) {
+ GPR_ASSERT(head_ != nullptr);
+ SockToPolledFdEntry** prev = &head_;
+ for (SockToPolledFdEntry* node = head_; node != nullptr;
+ node = node->next) {
+ if (node->socket == s) {
+ *prev = node->next;
+ Delete(node);
+ return;
+ }
+ prev = &node->next;
+ }
+ abort();
+ }
+
+ /* These virtual socket functions are called from within the c-ares
+ * library. These methods generally dispatch those socket calls to the
+ * appropriate methods. The virtual "socket" and "close" methods are
+ * special and instead create/add and remove/destroy GrpcPolledFdWindows
+ * objects.
+ */
+ static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
+ SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+ SOCKET s = WSASocket(af, type, protocol, nullptr, 0, WSA_FLAG_OVERLAPPED);
+ if (s == INVALID_SOCKET) {
+ return s;
+ }
+ grpc_tcp_set_non_block(s);
+ GrpcPolledFdWindows* polled_fd =
+ New<GrpcPolledFdWindows>(s, map->combiner_);
+ map->AddNewSocket(s, polled_fd);
+ return s;
+ }
+
+ static int Connect(ares_socket_t as, const struct sockaddr* target,
+ ares_socklen_t target_len, void* user_data) {
+ SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+ GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
+ return polled_fd->Connect(target, target_len);
+ }
+
+ static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov,
+ int iovec_count, void* user_data) {
+ SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+ GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
+ return polled_fd->SendV(iov, iovec_count);
+ }
+
+ static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
+ int flags, struct sockaddr* from,
+ ares_socklen_t* from_len, void* user_data) {
+ SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+ GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
+ return polled_fd->RecvFrom(data, data_len, flags, from, from_len);
+ }
+
+ static int CloseSocket(SOCKET s, void* user_data) {
+ SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+ GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(s);
+ map->RemoveEntry(s);
+ // If a gRPC polled fd has not made it in to the driver's list yet, then
+ // the driver has not and will never see this socket.
+ if (!polled_fd->gotten_into_driver_list()) {
+ polled_fd->ShutdownLocked(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Shut down c-ares fd before without it ever having made it into the "
+ "driver's list"));
+ return 0;
+ }
+ return 0;
+ }
+
+ private:
+ SockToPolledFdEntry* head_ = nullptr;
+ grpc_combiner* combiner_;
+};
+
+const struct ares_socket_functions custom_ares_sock_funcs = {
+ &SockToPolledFdMap::Socket /* socket */,
+ &SockToPolledFdMap::CloseSocket /* close */,
+ &SockToPolledFdMap::Connect /* connect */,
+ &SockToPolledFdMap::RecvFrom /* recvfrom */,
+ &SockToPolledFdMap::SendV /* sendv */,
+};
+
+class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
+ public:
+ GrpcPolledFdFactoryWindows(grpc_combiner* combiner)
+ : sock_to_polled_fd_map_(combiner) {}
+
+ GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
+ grpc_pollset_set* driver_pollset_set,
+ grpc_combiner* combiner) override {
+ GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
+ // Set a flag so that the virtual socket "close" method knows it
+ // doesn't need to call ShutdownLocked, since now the driver will.
+ polled_fd->set_gotten_into_driver_list();
+ return polled_fd;
+ }
-void ConfigureAresChannelLocked(ares_channel* channel) { abort(); }
+ void ConfigureAresChannelLocked(ares_channel channel) override {
+ ares_set_socket_functions(channel, &custom_ares_sock_funcs,
+ &sock_to_polled_fd_map_);
+ }
+
+ private:
+ SockToPolledFdMap sock_to_polled_fd_map_;
+};
+
+UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
+ return UniquePtr<GrpcPolledFdFactory>(
+ New<GrpcPolledFdFactoryWindows>(combiner));
+}
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index b3d6437e9a..485998f5e4 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -49,6 +49,8 @@ static gpr_mu g_init_mu;
grpc_core::TraceFlag grpc_trace_cares_address_sorting(false,
"cares_address_sorting");
+grpc_core::TraceFlag grpc_trace_cares_resolver(false, "cares_resolver");
+
struct grpc_ares_request {
/** indicates the DNS server to use, if specified */
struct ares_addr_port_node dns_server_addr;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index 17eaa7ccf0..ca5779e1d7 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -28,6 +28,13 @@
extern grpc_core::TraceFlag grpc_trace_cares_address_sorting;
+extern grpc_core::TraceFlag grpc_trace_cares_resolver;
+
+#define GRPC_CARES_TRACE_LOG(format, ...) \
+ if (grpc_trace_cares_resolver.enabled()) { \
+ gpr_log(GPR_DEBUG, "(c-ares resolver) " format, __VA_ARGS__); \
+ }
+
typedef struct grpc_ares_request grpc_ares_request;
/* Asynchronously resolve \a name. Use \a default_port if a port isn't
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index fae4c33a17..282caf215c 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -58,6 +58,8 @@ class NativeDnsResolver : public Resolver {
void RequestReresolutionLocked() override;
+ void ResetBackoffLocked() override;
+
void ShutdownLocked() override;
private:
@@ -158,6 +160,13 @@ void NativeDnsResolver::RequestReresolutionLocked() {
}
}
+void NativeDnsResolver::ResetBackoffLocked() {
+ if (have_next_resolution_timer_) {
+ grpc_timer_cancel(&next_resolution_timer_);
+ }
+ backoff_.Reset();
+}
+
void NativeDnsResolver::ShutdownLocked() {
if (have_next_resolution_timer_) {
grpc_timer_cancel(&next_resolution_timer_);
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 71ef8c518b..0e40f42e18 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -132,6 +132,8 @@ struct grpc_subchannel {
bool have_alarm;
/** have we started the backoff loop */
bool backoff_begun;
+ // reset_backoff() was called while alarm was pending
+ bool deferred_reset_backoff;
/** our alarm */
grpc_timer alarm;
@@ -402,6 +404,8 @@ static void continue_connect_locked(grpc_subchannel* c) {
c->next_attempt_deadline = c->backoff->NextAttemptTime();
args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
+ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "connecting");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->on_connected);
}
@@ -438,6 +442,9 @@ static void on_alarm(void* arg, grpc_error* error) {
if (c->disconnected) {
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
&error, 1);
+ } else if (c->deferred_reset_backoff) {
+ c->deferred_reset_backoff = false;
+ error = GRPC_ERROR_NONE;
} else {
GRPC_ERROR_REF(error);
}
@@ -473,8 +480,6 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
if (!c->backoff_begun) {
c->backoff_begun = true;
- grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_NONE, "connecting");
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
@@ -489,11 +494,6 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
- // During backoff, we prefer the connectivity state of CONNECTING instead of
- // TRANSIENT_FAILURE in order to prevent triggering re-resolution
- // continuously in pick_first.
- grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_NONE, "backoff");
}
}
@@ -675,6 +675,19 @@ static void on_subchannel_connected(void* arg, grpc_error* error) {
grpc_channel_args_destroy(delete_channel_args);
}
+void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) {
+ gpr_mu_lock(&subchannel->mu);
+ if (subchannel->have_alarm) {
+ subchannel->deferred_reset_backoff = true;
+ grpc_timer_cancel(&subchannel->alarm);
+ } else {
+ subchannel->backoff_begun = false;
+ subchannel->backoff->Reset();
+ maybe_start_connecting_locked(subchannel);
+ }
+ gpr_mu_unlock(&subchannel->mu);
+}
+
/*
* grpc_subchannel_call implementation
*/
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 9e53f7d542..a135035d62 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -145,6 +145,13 @@ grpc_subchannel_get_connected_subchannel(grpc_subchannel* c);
const grpc_subchannel_key* grpc_subchannel_get_key(
const grpc_subchannel* subchannel);
+// Resets the connection backoff of the subchannel.
+// TODO(roth): Move connection backoff out of subchannels and up into LB
+// policy code (probably by adding a SubchannelGroup between
+// SubchannelList and SubchannelData), at which point this method can
+// go away.
+void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel);
+
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call* subchannel_call,
grpc_transport_stream_op_batch* op);