aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-02-08 10:26:25 -0800
committerGravatar Vijay Pai <vpai@google.com>2018-02-08 10:26:25 -0800
commit25b61fd60e9fd24c3b23eac9396e22745f1cf51d (patch)
tree5b19761cfcd1e44fefffda9dad02670324f15ee1 /src/core/ext/filters/client_channel
parentb6cf123717b8f6e405f62dd12cb6c43664e0680a (diff)
parent7bd5e18fea0201fed3edd74e3c3d7caf9040609c (diff)
Merge branch 'master' into gpr_review_tls
Diffstat (limited to 'src/core/ext/filters/client_channel')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc13
-rw-r--r--src/core/ext/filters/client_channel/http_proxy.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc82
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc16
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h2
-rw-r--r--src/core/ext/filters/client_channel/parse_address.cc3
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc91
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc4
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc96
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc71
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h15
-rw-r--r--src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc2
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc14
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h4
16 files changed, 285 insertions, 135 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index a8a7a37be0..6b93644430 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -1095,6 +1095,7 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) {
chand, calld);
}
async_pick_done_locked(elem, GRPC_ERROR_REF(error));
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
}
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
@@ -1134,6 +1135,7 @@ static bool pick_callback_start_locked(grpc_call_element* elem) {
GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
grpc_combiner_scheduler(chand->combiner));
calld->pick.on_complete = &calld->lb_pick_closure;
+ GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
const bool pick_done =
grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick);
if (pick_done) {
@@ -1142,6 +1144,7 @@ static bool pick_callback_start_locked(grpc_call_element* elem) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
chand, calld);
}
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
} else {
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
grpc_call_combiner_set_notify_on_cancel(
@@ -1333,12 +1336,12 @@ static void on_complete(void* arg, grpc_error* error) {
static void cc_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data;
if (chand->deadline_checking_enabled) {
grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
}
- GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
// If we've previously been cancelled, immediately fail any new batches.
if (calld->error != GRPC_ERROR_NONE) {
if (grpc_client_channel_trace.enabled()) {
@@ -1347,7 +1350,7 @@ static void cc_start_transport_stream_op_batch(
}
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(calld->error), calld->call_combiner);
- goto done;
+ return;
}
if (batch->cancel_stream) {
// Stash a copy of cancel_error in our call data, so that we can use
@@ -1369,7 +1372,7 @@ static void cc_start_transport_stream_op_batch(
waiting_for_pick_batches_add(calld, batch);
waiting_for_pick_batches_fail(elem, GRPC_ERROR_REF(calld->error));
}
- goto done;
+ return;
}
// Intercept on_complete for recv_trailing_metadata so that we can
// check retry throttle status.
@@ -1391,7 +1394,7 @@ static void cc_start_transport_stream_op_batch(
calld, calld->subchannel_call);
}
grpc_subchannel_call_process_op(calld->subchannel_call, batch);
- goto done;
+ return;
}
// We do not yet have a subchannel call.
// Add the batch to the waiting-for-pick list.
@@ -1417,8 +1420,6 @@ static void cc_start_transport_stream_op_batch(
GRPC_CALL_COMBINER_STOP(calld->call_combiner,
"batch does not include send_initial_metadata");
}
-done:
- GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
diff --git a/src/core/ext/filters/client_channel/http_proxy.cc b/src/core/ext/filters/client_channel/http_proxy.cc
index 7c5f79fb30..d42376413d 100644
--- a/src/core/ext/filters/client_channel/http_proxy.cc
+++ b/src/core/ext/filters/client_channel/http_proxy.cc
@@ -22,7 +22,6 @@
#include <string.h>
#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
@@ -31,6 +30,7 @@
#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/slice/b64.h"
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
index 4596f90745..d6b759227e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
@@ -98,7 +98,7 @@ static void destroy_call_elem(grpc_call_element* elem,
static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = (call_data*)elem->call_data;
- GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0);
+ GPR_TIMER_SCOPE("clr_start_transport_stream_op_batch", 0);
if (calld->client_stats != nullptr) {
// Intercept send_initial_metadata.
if (batch->send_initial_metadata) {
@@ -120,7 +120,6 @@ static void start_transport_stream_op_batch(
}
// Chain to next filter.
grpc_call_next_op(elem, batch);
- GPR_TIMER_END("clr_start_transport_stream_op_batch", 0);
}
const grpc_channel_filter grpc_client_load_reporting_filter = {
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 1709e5622e..5e24bdd4e7 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -87,7 +87,6 @@
#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
@@ -106,6 +105,7 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 24c381a46d..ab6d3e6a03 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -328,18 +328,11 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
* 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
- * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests
- * re-resolution).
- * CHECK: subchannel_list->num_shutdown ==
- * subchannel_list->num_subchannels.
- *
- * 4) RULE: ALL subchannels are SHUTDOWN or TRANSIENT_FAILURE => policy is
- * TRANSIENT_FAILURE.
- * CHECK: subchannel_list->num_shutdown +
- * subchannel_list->num_transient_failures ==
+ * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
+ * TRANSIENT_FAILURE.
+ * CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*/
- // TODO(juanlishen): For rule 4, we may want to re-resolve instead.
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
@@ -351,22 +344,12 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
/* 2) CONNECTING */
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "rr_connecting");
- } else if (subchannel_list->num_shutdown ==
+ } else if (subchannel_list->num_transient_failures ==
subchannel_list->num_subchannels) {
- /* 3) IDLE and re-resolve */
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE,
- "rr_exhausted_subchannels+reresolve");
- p->started_picking = false;
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
- GRPC_ERROR_NONE);
- } else if (subchannel_list->num_shutdown +
- subchannel_list->num_transient_failures ==
- subchannel_list->num_subchannels) {
- /* 4) TRANSIENT_FAILURE */
- grpc_connectivity_state_set(&p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "rr_transient_failure");
+ /* 3) TRANSIENT_FAILURE */
+ grpc_connectivity_state_set(
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "rr_exhausted_subchannels");
}
GRPC_ERROR_UNREF(error);
}
@@ -387,6 +370,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
p->shutdown, sd->subchannel_list->shutting_down,
grpc_error_string(error));
}
+ GPR_ASSERT(sd->subchannel != nullptr);
// If the policy is shutting down, unref and return.
if (p->shutdown) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
@@ -412,14 +396,19 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- // Update state counters and new overall state.
- update_state_counters_locked(sd);
- update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
// If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
// subchannel, if any.
switch (sd->curr_connectivity_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
sd->connected_subchannel.reset();
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
+ "Requesting re-resolution",
+ p, sd->subchannel);
+ }
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
+ GRPC_ERROR_NONE);
break;
}
case GRPC_CHANNEL_READY: {
@@ -442,8 +431,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
gpr_log(GPR_DEBUG,
"[RR %p] phasing out subchannel list %p (size %lu) in favor "
"of %p (size %lu)",
- (void*)p, (void*)p->subchannel_list, num_subchannels,
- (void*)sd->subchannel_list, num_subchannels);
+ p, p->subchannel_list, num_subchannels, sd->subchannel_list,
+ num_subchannels);
}
if (p->subchannel_list != nullptr) {
// dispose of the current subchannel_list
@@ -455,7 +444,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
- * p->pending_picks. This preemptively replicates rr_pick()'s actions. */
+ * p->pending_picks. This preemptively replicates rr_pick()'s actions.
+ */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
grpc_lb_subchannel_data* selected =
@@ -488,6 +478,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:; // fallthrough
}
+ // Update state counters and new overall state.
+ update_state_counters_locked(sd);
+ // Only update connectivity based on the selected subchannel list.
+ if (sd->subchannel_list == p->subchannel_list) {
+ update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
+ }
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
@@ -562,6 +558,30 @@ static void rr_update_locked(grpc_lb_policy* policy,
return;
}
if (p->started_picking) {
+ for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
+ const grpc_connectivity_state subchannel_state =
+ grpc_subchannel_check_connectivity(
+ subchannel_list->subchannels[i].subchannel, nullptr);
+ // Override the default setting of IDLE for connectivity notification
+ // purposes if the subchannel is already in transient failure. Otherwise
+ // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE
+ // discrepancy, attempt to re-resolve and end up here again.
+ // TODO(roth): As part of C++-ifying the subchannel_list API, design a
+ // better API for notifying the LB policy of subchannel states, which can
+ // be used both for the subchannel's initial state and for subsequent
+ // state changes. This will allow us to handle this more generally instead
+ // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any
+ // pending picks across all READY subchannels rather than sending them all
+ // to the first one).
+ if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ subchannel_list->subchannels[i].pending_connectivity_state_unsafe =
+ subchannel_list->subchannels[i].curr_connectivity_state =
+ subchannel_list->subchannels[i].prev_connectivity_state =
+ subchannel_state;
+ --subchannel_list->num_idle;
+ ++subchannel_list->num_transient_failures;
+ }
+ }
if (p->latest_pending_subchannel_list != nullptr) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
index fa2ffcc796..75f7ca2d12 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
@@ -54,13 +54,15 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
void grpc_lb_subchannel_data_start_connectivity_watch(
grpc_lb_subchannel_data* sd) {
if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): requesting connectivity change notification",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- (size_t)(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel);
+ gpr_log(
+ GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): requesting connectivity change "
+ "notification (from %s)",
+ sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
+ sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
+ sd->subchannel_list->num_subchannels, sd->subchannel,
+ grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe));
}
sd->connectivity_notification_pending = true;
grpc_subchannel_notify_on_state_change(
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 3377605263..91537f3afe 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -101,8 +101,6 @@ struct grpc_lb_subchannel_list {
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
- /** how many subchannels are in state SHUTDOWN */
- size_t num_shutdown;
/** how many subchannels are in state IDLE */
size_t num_idle;
diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc
index c3309e36a3..4b6905eaa3 100644
--- a/src/core/ext/filters/client_channel/parse_address.cc
+++ b/src/core/ext/filters/client_channel/parse_address.cc
@@ -26,9 +26,10 @@
#endif
#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+
+#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#ifdef GRPC_HAVE_UNIX_SOCKET
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 6ba5f932f0..f2f25bc7c0 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -25,7 +25,6 @@
#include <unistd.h>
#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
@@ -35,6 +34,7 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -66,8 +66,8 @@ typedef struct {
grpc_pollset_set* interested_parties;
/** Closures used by the combiner */
- grpc_closure dns_ares_on_retry_timer_locked;
- grpc_closure dns_ares_on_resolved_locked;
+ grpc_closure dns_ares_on_next_resolution_timer_closure;
+ grpc_closure dns_ares_on_resolved_closure;
/** Combiner guarding the rest of the state */
grpc_combiner* combiner;
@@ -85,12 +85,15 @@ typedef struct {
grpc_channel_args** target_result;
/** current (fully resolved) result */
grpc_channel_args* resolved_result;
- /** retry timer */
- bool have_retry_timer;
- grpc_timer retry_timer;
+ /** next resolution timer */
+ bool have_next_resolution_timer;
+ grpc_timer next_resolution_timer;
/** retry backoff state */
grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
-
+ /** min resolution period. Max one resolution will happen per period */
+ grpc_millis min_time_between_resolutions;
+ /** when was the last resolution? -1 if no resolution has happened yet */
+ grpc_millis last_resolution_timestamp;
/** currently resolving addresses */
grpc_lb_addresses* lb_addresses;
/** currently resolving service config */
@@ -100,6 +103,7 @@ typedef struct {
static void dns_ares_destroy(grpc_resolver* r);
static void dns_ares_start_resolving_locked(ares_dns_resolver* r);
+static void dns_ares_maybe_start_resolving_locked(ares_dns_resolver* r);
static void dns_ares_maybe_finish_next_locked(ares_dns_resolver* r);
static void dns_ares_shutdown_locked(grpc_resolver* r);
@@ -114,8 +118,8 @@ static const grpc_resolver_vtable dns_ares_resolver_vtable = {
static void dns_ares_shutdown_locked(grpc_resolver* resolver) {
ares_dns_resolver* r = (ares_dns_resolver*)resolver;
- if (r->have_retry_timer) {
- grpc_timer_cancel(&r->retry_timer);
+ if (r->have_next_resolution_timer) {
+ grpc_timer_cancel(&r->next_resolution_timer);
}
if (r->pending_request != nullptr) {
grpc_cancel_ares_request(r->pending_request);
@@ -131,20 +135,20 @@ static void dns_ares_shutdown_locked(grpc_resolver* resolver) {
static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) {
ares_dns_resolver* r = (ares_dns_resolver*)resolver;
if (!r->resolving) {
- r->backoff->Reset();
- dns_ares_start_resolving_locked(r);
+ dns_ares_maybe_start_resolving_locked(r);
}
}
-static void dns_ares_on_retry_timer_locked(void* arg, grpc_error* error) {
+static void dns_ares_on_next_resolution_timer_locked(void* arg,
+ grpc_error* error) {
ares_dns_resolver* r = (ares_dns_resolver*)arg;
- r->have_retry_timer = false;
+ r->have_next_resolution_timer = false;
if (error == GRPC_ERROR_NONE) {
if (!r->resolving) {
dns_ares_start_resolving_locked(r);
}
}
- GRPC_RESOLVER_UNREF(&r->base, "retry-timer");
+ GRPC_RESOLVER_UNREF(&r->base, "next_resolution_timer");
}
static bool value_in_json_array(grpc_json* array, const char* value) {
@@ -261,6 +265,9 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) {
if (service_config != nullptr) grpc_service_config_destroy(service_config);
gpr_free(service_config_string);
grpc_lb_addresses_destroy(r->lb_addresses);
+ // Reset backoff state so that we start from the beginning when the
+ // next request gets triggered.
+ r->backoff->Reset();
} else {
const char* msg = grpc_error_string(error);
gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg);
@@ -268,21 +275,22 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) {
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
- GPR_ASSERT(!r->have_retry_timer);
- r->have_retry_timer = true;
- GRPC_RESOLVER_REF(&r->base, "retry-timer");
+ GPR_ASSERT(!r->have_next_resolution_timer);
+ r->have_next_resolution_timer = true;
+ GRPC_RESOLVER_REF(&r->base, "next_resolution_timer");
if (timeout > 0) {
gpr_log(GPR_DEBUG, "retrying in %" PRIdPTR " milliseconds", timeout);
} else {
gpr_log(GPR_DEBUG, "retrying immediately");
}
- grpc_timer_init(&r->retry_timer, next_try,
- &r->dns_ares_on_retry_timer_locked);
+ grpc_timer_init(&r->next_resolution_timer, next_try,
+ &r->dns_ares_on_next_resolution_timer_closure);
}
if (r->resolved_result != nullptr) {
grpc_channel_args_destroy(r->resolved_result);
}
r->resolved_result = result;
+ r->last_resolution_timestamp = grpc_core::ExecCtx::Get()->Now();
r->resolved_version++;
dns_ares_maybe_finish_next_locked(r);
GRPC_RESOLVER_UNREF(&r->base, "dns-resolving");
@@ -297,8 +305,7 @@ static void dns_ares_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
- r->backoff->Reset();
- dns_ares_start_resolving_locked(r);
+ dns_ares_maybe_start_resolving_locked(r);
} else {
dns_ares_maybe_finish_next_locked(r);
}
@@ -312,7 +319,7 @@ static void dns_ares_start_resolving_locked(ares_dns_resolver* r) {
r->service_config_json = nullptr;
r->pending_request = grpc_dns_lookup_ares(
r->dns_server, r->name_to_resolve, r->default_port, r->interested_parties,
- &r->dns_ares_on_resolved_locked, &r->lb_addresses,
+ &r->dns_ares_on_resolved_closure, &r->lb_addresses,
true /* check_grpclb */,
r->request_service_config ? &r->service_config_json : nullptr);
}
@@ -330,6 +337,35 @@ static void dns_ares_maybe_finish_next_locked(ares_dns_resolver* r) {
}
}
+static void dns_ares_maybe_start_resolving_locked(ares_dns_resolver* r) {
+ if (r->last_resolution_timestamp >= 0) {
+ const grpc_millis earliest_next_resolution =
+ r->last_resolution_timestamp + r->min_time_between_resolutions;
+ const grpc_millis ms_until_next_resolution =
+ earliest_next_resolution - grpc_core::ExecCtx::Get()->Now();
+ if (ms_until_next_resolution > 0) {
+ const grpc_millis last_resolution_ago =
+ grpc_core::ExecCtx::Get()->Now() - r->last_resolution_timestamp;
+ gpr_log(GPR_DEBUG,
+ "In cooldown from last resolution (from %" PRIdPTR
+ " ms ago). Will resolve again in %" PRIdPTR " ms",
+ last_resolution_ago, ms_until_next_resolution);
+ if (!r->have_next_resolution_timer) {
+ r->have_next_resolution_timer = true;
+ GRPC_RESOLVER_REF(&r->base, "next_resolution_timer_cooldown");
+ grpc_timer_init(&r->next_resolution_timer, ms_until_next_resolution,
+ &r->dns_ares_on_next_resolution_timer_closure);
+ }
+ // TODO(dgq): remove the following two lines once Pick First stops
+ // discarding subchannels after selecting.
+ ++r->resolved_version;
+ dns_ares_maybe_finish_next_locked(r);
+ return;
+ }
+ }
+ dns_ares_start_resolving_locked(r);
+}
+
static void dns_ares_destroy(grpc_resolver* gr) {
gpr_log(GPR_DEBUG, "dns_ares_destroy");
ares_dns_resolver* r = (ares_dns_resolver*)gr;
@@ -374,12 +410,17 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args,
.set_jitter(GRPC_DNS_RECONNECT_JITTER)
.set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
r->backoff.Init(grpc_core::BackOff(backoff_options));
- GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked,
- dns_ares_on_retry_timer_locked, r,
+ GRPC_CLOSURE_INIT(&r->dns_ares_on_next_resolution_timer_closure,
+ dns_ares_on_next_resolution_timer_locked, r,
grpc_combiner_scheduler(r->base.combiner));
- GRPC_CLOSURE_INIT(&r->dns_ares_on_resolved_locked,
+ GRPC_CLOSURE_INIT(&r->dns_ares_on_resolved_closure,
dns_ares_on_resolved_locked, r,
grpc_combiner_scheduler(r->base.combiner));
+ const grpc_arg* period_arg = grpc_channel_args_find(
+ args->args, GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS);
+ r->min_time_between_resolutions =
+ grpc_channel_arg_get_integer(period_arg, {1000, 0, INT_MAX});
+ r->last_resolution_timestamp = -1;
return &r->base;
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index 2b35bdb605..d76c8069d5 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -28,7 +28,6 @@
#include <ares.h>
#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
@@ -36,6 +35,7 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
+#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
@@ -505,7 +505,7 @@ static void on_dns_lookup_done_cb(void* arg, grpc_error* error) {
}
}
GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error));
- grpc_lb_addresses_destroy(r->lb_addrs);
+ if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs);
gpr_free(r);
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 62f03d52c0..478810d263 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -19,17 +19,19 @@
#include <grpc/support/port_platform.h>
#include <inttypes.h>
-#include <string.h>
+#include <climits>
+#include <cstring>
#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -65,13 +67,16 @@ typedef struct {
grpc_channel_args** target_result;
/** current (fully resolved) result */
grpc_channel_args* resolved_result;
- /** retry timer */
- bool have_retry_timer;
- grpc_timer retry_timer;
- grpc_closure on_retry;
+ /** next resolution timer */
+ bool have_next_resolution_timer;
+ grpc_timer next_resolution_timer;
+ grpc_closure next_resolution_closure;
/** retry backoff state */
grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
-
+ /** min resolution period. Max one resolution will happen per period */
+ grpc_millis min_time_between_resolutions;
+ /** when was the last resolution? -1 if no resolution has happened yet */
+ grpc_millis last_resolution_timestamp;
/** currently resolving addresses */
grpc_resolved_addresses* addresses;
} dns_resolver;
@@ -79,6 +84,7 @@ typedef struct {
static void dns_destroy(grpc_resolver* r);
static void dns_start_resolving_locked(dns_resolver* r);
+static void maybe_start_resolving_locked(dns_resolver* r);
static void dns_maybe_finish_next_locked(dns_resolver* r);
static void dns_shutdown_locked(grpc_resolver* r);
@@ -92,8 +98,8 @@ static const grpc_resolver_vtable dns_resolver_vtable = {
static void dns_shutdown_locked(grpc_resolver* resolver) {
dns_resolver* r = (dns_resolver*)resolver;
- if (r->have_retry_timer) {
- grpc_timer_cancel(&r->retry_timer);
+ if (r->have_next_resolution_timer) {
+ grpc_timer_cancel(&r->next_resolution_timer);
}
if (r->next_completion != nullptr) {
*r->target_result = nullptr;
@@ -106,8 +112,7 @@ static void dns_shutdown_locked(grpc_resolver* resolver) {
static void dns_channel_saw_error_locked(grpc_resolver* resolver) {
dns_resolver* r = (dns_resolver*)resolver;
if (!r->resolving) {
- r->backoff->Reset();
- dns_start_resolving_locked(r);
+ maybe_start_resolving_locked(r);
}
}
@@ -119,24 +124,19 @@ static void dns_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
- r->backoff->Reset();
- dns_start_resolving_locked(r);
+ maybe_start_resolving_locked(r);
} else {
dns_maybe_finish_next_locked(r);
}
}
-static void dns_on_retry_timer_locked(void* arg, grpc_error* error) {
+static void dns_on_next_resolution_timer_locked(void* arg, grpc_error* error) {
dns_resolver* r = (dns_resolver*)arg;
-
- r->have_retry_timer = false;
- if (error == GRPC_ERROR_NONE) {
- if (!r->resolving) {
- dns_start_resolving_locked(r);
- }
+ r->have_next_resolution_timer = false;
+ if (error == GRPC_ERROR_NONE && !r->resolving) {
+ dns_start_resolving_locked(r);
}
-
- GRPC_RESOLVER_UNREF(&r->base, "retry-timer");
+ GRPC_RESOLVER_UNREF(&r->base, "next_resolution_timer");
}
static void dns_on_resolved_locked(void* arg, grpc_error* error) {
@@ -160,22 +160,24 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) {
result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1);
grpc_resolved_addresses_destroy(r->addresses);
grpc_lb_addresses_destroy(addresses);
+ // Reset backoff state so that we start from the beginning when the
+ // next request gets triggered.
+ r->backoff->Reset();
} else {
grpc_millis next_try = r->backoff->NextAttemptTime();
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
- GPR_ASSERT(!r->have_retry_timer);
- r->have_retry_timer = true;
- GRPC_RESOLVER_REF(&r->base, "retry-timer");
+ GPR_ASSERT(!r->have_next_resolution_timer);
+ r->have_next_resolution_timer = true;
+ GRPC_RESOLVER_REF(&r->base, "next_resolution_timer");
if (timeout > 0) {
gpr_log(GPR_DEBUG, "retrying in %" PRIdPTR " milliseconds", timeout);
} else {
gpr_log(GPR_DEBUG, "retrying immediately");
}
- GRPC_CLOSURE_INIT(&r->on_retry, dns_on_retry_timer_locked, r,
- grpc_combiner_scheduler(r->base.combiner));
- grpc_timer_init(&r->retry_timer, next_try, &r->on_retry);
+ grpc_timer_init(&r->next_resolution_timer, next_try,
+ &r->next_resolution_closure);
}
if (r->resolved_result != nullptr) {
grpc_channel_args_destroy(r->resolved_result);
@@ -188,6 +190,35 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) {
GRPC_RESOLVER_UNREF(&r->base, "dns-resolving");
}
+static void maybe_start_resolving_locked(dns_resolver* r) {
+ if (r->last_resolution_timestamp >= 0) {
+ const grpc_millis earliest_next_resolution =
+ r->last_resolution_timestamp + r->min_time_between_resolutions;
+ const grpc_millis ms_until_next_resolution =
+ earliest_next_resolution - grpc_core::ExecCtx::Get()->Now();
+ if (ms_until_next_resolution > 0) {
+ const grpc_millis last_resolution_ago =
+ grpc_core::ExecCtx::Get()->Now() - r->last_resolution_timestamp;
+ gpr_log(GPR_DEBUG,
+ "In cooldown from last resolution (from %" PRIdPTR
+ " ms ago). Will resolve again in %" PRIdPTR " ms",
+ last_resolution_ago, ms_until_next_resolution);
+ if (!r->have_next_resolution_timer) {
+ r->have_next_resolution_timer = true;
+ GRPC_RESOLVER_REF(&r->base, "next_resolution_timer_cooldown");
+ grpc_timer_init(&r->next_resolution_timer, ms_until_next_resolution,
+ &r->next_resolution_closure);
+ }
+ // TODO(dgq): remove the following two lines once Pick First stops
+ // discarding subchannels after selecting.
+ ++r->resolved_version;
+ dns_maybe_finish_next_locked(r);
+ return;
+ }
+ }
+ dns_start_resolving_locked(r);
+}
+
static void dns_start_resolving_locked(dns_resolver* r) {
GRPC_RESOLVER_REF(&r->base, "dns-resolving");
GPR_ASSERT(!r->resolving);
@@ -198,6 +229,7 @@ static void dns_start_resolving_locked(dns_resolver* r) {
GRPC_CLOSURE_CREATE(dns_on_resolved_locked, r,
grpc_combiner_scheduler(r->base.combiner)),
&r->addresses);
+ r->last_resolution_timestamp = grpc_core::ExecCtx::Get()->Now();
}
static void dns_maybe_finish_next_locked(dns_resolver* r) {
@@ -250,6 +282,14 @@ static grpc_resolver* dns_create(grpc_resolver_args* args,
.set_jitter(GRPC_DNS_RECONNECT_JITTER)
.set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
r->backoff.Init(grpc_core::BackOff(backoff_options));
+ const grpc_arg* period_arg = grpc_channel_args_find(
+ args->args, GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS);
+ r->min_time_between_resolutions =
+ grpc_channel_arg_get_integer(period_arg, {1000, 0, INT_MAX});
+ r->last_resolution_timestamp = -1;
+ GRPC_CLOSURE_INIT(&r->next_resolution_closure,
+ dns_on_next_resolution_timer_locked, r,
+ grpc_combiner_scheduler(r->base.combiner));
return &r->base;
}
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index eaa5e6ac49..f457917775 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -24,7 +24,6 @@
#include <string.h>
#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
@@ -32,6 +31,7 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -47,10 +47,10 @@
//
typedef struct {
- // base class -- must be first
+ // Base class -- must be first
grpc_resolver base;
- // passed-in parameters
+ // Passed-in parameters
grpc_channel_args* channel_args;
// If not NULL, the next set of resolution results to be returned to
@@ -61,9 +61,16 @@ typedef struct {
// fake_resolver_channel_saw_error_locked().
grpc_channel_args* results_upon_error;
- // pending next completion, or NULL
+ // TODO(juanlishen): This can go away once pick_first is changed to not throw
+ // away its subchannels, since that will eliminate its dependence on
+ // channel_saw_error_locked() causing an immediate resolver return.
+ // A copy of the most-recently used resolution results.
+ grpc_channel_args* last_used_results;
+
+ // Pending next completion, or NULL
grpc_closure* next_completion;
- // target result address for next completion
+
+ // Target result address for next completion
grpc_channel_args** target_result;
} fake_resolver;
@@ -71,6 +78,7 @@ static void fake_resolver_destroy(grpc_resolver* gr) {
fake_resolver* r = (fake_resolver*)gr;
grpc_channel_args_destroy(r->next_results);
grpc_channel_args_destroy(r->results_upon_error);
+ grpc_channel_args_destroy(r->last_used_results);
grpc_channel_args_destroy(r->channel_args);
gpr_free(r);
}
@@ -98,9 +106,15 @@ static void fake_resolver_maybe_finish_next_locked(fake_resolver* r) {
static void fake_resolver_channel_saw_error_locked(grpc_resolver* resolver) {
fake_resolver* r = (fake_resolver*)resolver;
- if (r->next_results == nullptr && r->results_upon_error != nullptr) {
- // Pretend we re-resolved.
+ // A resolution must have been returned before an error is seen.
+ GPR_ASSERT(r->last_used_results != nullptr);
+ grpc_channel_args_destroy(r->next_results);
+ if (r->results_upon_error != nullptr) {
r->next_results = grpc_channel_args_copy(r->results_upon_error);
+ } else {
+ // If results_upon_error is unavailable, re-resolve with the most-recently
+ // used results to avoid a no-op re-resolution.
+ r->next_results = grpc_channel_args_copy(r->last_used_results);
}
fake_resolver_maybe_finish_next_locked(r);
}
@@ -149,35 +163,56 @@ void grpc_fake_resolver_response_generator_unref(
typedef struct set_response_closure_arg {
grpc_closure set_response_closure;
grpc_fake_resolver_response_generator* generator;
- grpc_channel_args* next_response;
+ grpc_channel_args* response;
+ bool upon_error;
} set_response_closure_arg;
-static void set_response_closure_fn(void* arg, grpc_error* error) {
+static void set_response_closure_locked(void* arg, grpc_error* error) {
set_response_closure_arg* closure_arg = (set_response_closure_arg*)arg;
grpc_fake_resolver_response_generator* generator = closure_arg->generator;
fake_resolver* r = generator->resolver;
- if (r->next_results != nullptr) {
+ if (!closure_arg->upon_error) {
grpc_channel_args_destroy(r->next_results);
- }
- r->next_results = closure_arg->next_response;
- if (r->results_upon_error != nullptr) {
+ r->next_results = closure_arg->response;
+ grpc_channel_args_destroy(r->last_used_results);
+ r->last_used_results = grpc_channel_args_copy(closure_arg->response);
+ fake_resolver_maybe_finish_next_locked(r);
+ } else {
grpc_channel_args_destroy(r->results_upon_error);
+ r->results_upon_error = closure_arg->response;
}
- r->results_upon_error = grpc_channel_args_copy(closure_arg->next_response);
gpr_free(closure_arg);
- fake_resolver_maybe_finish_next_locked(r);
}
void grpc_fake_resolver_response_generator_set_response(
grpc_fake_resolver_response_generator* generator,
- grpc_channel_args* next_response) {
+ grpc_channel_args* response) {
+ GPR_ASSERT(generator->resolver != nullptr);
+ GPR_ASSERT(response != nullptr);
+ set_response_closure_arg* closure_arg =
+ (set_response_closure_arg*)gpr_zalloc(sizeof(*closure_arg));
+ closure_arg->generator = generator;
+ closure_arg->response = grpc_channel_args_copy(response);
+ closure_arg->upon_error = false;
+ GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
+ set_response_closure_locked, closure_arg,
+ grpc_combiner_scheduler(
+ generator->resolver->base.combiner)),
+ GRPC_ERROR_NONE);
+}
+
+void grpc_fake_resolver_response_generator_set_response_upon_error(
+ grpc_fake_resolver_response_generator* generator,
+ grpc_channel_args* response) {
GPR_ASSERT(generator->resolver != nullptr);
set_response_closure_arg* closure_arg =
(set_response_closure_arg*)gpr_zalloc(sizeof(*closure_arg));
closure_arg->generator = generator;
- closure_arg->next_response = grpc_channel_args_copy(next_response);
+ closure_arg->response =
+ response != nullptr ? grpc_channel_args_copy(response) : nullptr;
+ closure_arg->upon_error = true;
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
- set_response_closure_fn, closure_arg,
+ set_response_closure_locked, closure_arg,
grpc_combiner_scheduler(
generator->resolver->base.combiner)),
GRPC_ERROR_NONE);
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
index a8977e5980..94f9a8e6ca 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
@@ -36,11 +36,20 @@ typedef struct grpc_fake_resolver_response_generator
grpc_fake_resolver_response_generator*
grpc_fake_resolver_response_generator_create();
-// Instruct the fake resolver associated with the \a response_generator instance
-// to trigger a new resolution for \a uri and \a args.
+// Set next response of the fake resolver associated with the \a
+// response_generator instance and trigger a new resolution.
void grpc_fake_resolver_response_generator_set_response(
grpc_fake_resolver_response_generator* generator,
- grpc_channel_args* next_response);
+ grpc_channel_args* response);
+
+// Set results_upon_error of the fake resolver associated with the \a
+// response_generator instance. When fake_resolver_channel_saw_error_locked() is
+// called, results_upon_error will be returned as long as it's non-NULL,
+// otherwise the last value set by
+// grpc_fake_resolver_response_generator_set_response() will be returned.
+void grpc_fake_resolver_response_generator_set_response_upon_error(
+ grpc_fake_resolver_response_generator* generator,
+ grpc_channel_args* response);
// Return a \a grpc_arg for a \a grpc_fake_resolver_response_generator instance.
grpc_arg grpc_fake_resolver_response_generator_arg(
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
index 99ad78e23c..784935eb20 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
@@ -22,7 +22,6 @@
#include <string.h>
#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
@@ -30,6 +29,7 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index cad8578511..6bf710c948 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -653,14 +653,13 @@ static void on_subchannel_connected(void* arg, grpc_error* error) {
*/
static void subchannel_call_destroy(void* call, grpc_error* error) {
+ GPR_TIMER_SCOPE("grpc_subchannel_call_unref.destroy", 0);
grpc_subchannel_call* c = (grpc_subchannel_call*)call;
GPR_ASSERT(c->schedule_closure_after_destroy != nullptr);
- GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_core::ConnectedSubchannel* connection = c->connection;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
c->schedule_closure_after_destroy);
connection->Unref(DEBUG_LOCATION, "subchannel_call");
- GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call,
@@ -682,12 +681,11 @@ void grpc_subchannel_call_unref(
void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
grpc_transport_stream_op_batch* batch) {
- GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0);
+ GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0);
grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
- GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
@@ -740,8 +738,9 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
}
namespace grpc_core {
+
ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
- : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount),
+ : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount),
channel_stack_(channel_stack) {}
ConnectedSubchannel::~ConnectedSubchannel() {
@@ -776,7 +775,9 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
args.arena,
sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size);
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
- Ref(DEBUG_LOCATION, "subchannel_call");
+ RefCountedPtr<ConnectedSubchannel> connection =
+ Ref(DEBUG_LOCATION, "subchannel_call");
+ connection.release(); // Ref is passed to the grpc_subchannel_call object.
(*call)->connection = this;
const grpc_call_element_args call_args = {
callstk, /* call_stack */
@@ -798,4 +799,5 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
return GRPC_ERROR_NONE;
}
+
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index b7593ec911..d2b45ae9c8 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -68,7 +68,8 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
#endif
namespace grpc_core {
-class ConnectedSubchannel : public grpc_core::RefCountedWithTracing {
+
+class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
public:
struct CallArgs {
grpc_polling_entity* pollent;
@@ -93,6 +94,7 @@ class ConnectedSubchannel : public grpc_core::RefCountedWithTracing {
private:
grpc_channel_stack* channel_stack_;
};
+
} // namespace grpc_core
grpc_subchannel* grpc_subchannel_ref(