diff options
author | Vijay Pai <vpai@google.com> | 2018-02-08 10:26:25 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-02-08 10:26:25 -0800 |
commit | 25b61fd60e9fd24c3b23eac9396e22745f1cf51d (patch) | |
tree | 5b19761cfcd1e44fefffda9dad02670324f15ee1 /src/core/ext/filters/client_channel | |
parent | b6cf123717b8f6e405f62dd12cb6c43664e0680a (diff) | |
parent | 7bd5e18fea0201fed3edd74e3c3d7caf9040609c (diff) |
Merge branch 'master' into gpr_review_tls
Diffstat (limited to 'src/core/ext/filters/client_channel')
16 files changed, 285 insertions, 135 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index a8a7a37be0..6b93644430 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1095,6 +1095,7 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) { chand, calld); } async_pick_done_locked(elem, GRPC_ERROR_REF(error)); + GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); } // Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). @@ -1134,6 +1135,7 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); calld->pick.on_complete = &calld->lb_pick_closure; + GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); const bool pick_done = grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick); if (pick_done) { @@ -1142,6 +1144,7 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", chand, calld); } + GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); } else { GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); grpc_call_combiner_set_notify_on_cancel( @@ -1333,12 +1336,12 @@ static void on_complete(void* arg, grpc_error* error) { static void cc_start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0); call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; if (chand->deadline_checking_enabled) { grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch); } - GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); // If we've previously been cancelled, immediately fail any new batches. if (calld->error != GRPC_ERROR_NONE) { if (grpc_client_channel_trace.enabled()) { @@ -1347,7 +1350,7 @@ static void cc_start_transport_stream_op_batch( } grpc_transport_stream_op_batch_finish_with_failure( batch, GRPC_ERROR_REF(calld->error), calld->call_combiner); - goto done; + return; } if (batch->cancel_stream) { // Stash a copy of cancel_error in our call data, so that we can use @@ -1369,7 +1372,7 @@ static void cc_start_transport_stream_op_batch( waiting_for_pick_batches_add(calld, batch); waiting_for_pick_batches_fail(elem, GRPC_ERROR_REF(calld->error)); } - goto done; + return; } // Intercept on_complete for recv_trailing_metadata so that we can // check retry throttle status. @@ -1391,7 +1394,7 @@ static void cc_start_transport_stream_op_batch( calld, calld->subchannel_call); } grpc_subchannel_call_process_op(calld->subchannel_call, batch); - goto done; + return; } // We do not yet have a subchannel call. // Add the batch to the waiting-for-pick list. @@ -1417,8 +1420,6 @@ static void cc_start_transport_stream_op_batch( GRPC_CALL_COMBINER_STOP(calld->call_combiner, "batch does not include send_initial_metadata"); } -done: - GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ diff --git a/src/core/ext/filters/client_channel/http_proxy.cc b/src/core/ext/filters/client_channel/http_proxy.cc index 7c5f79fb30..d42376413d 100644 --- a/src/core/ext/filters/client_channel/http_proxy.cc +++ b/src/core/ext/filters/client_channel/http_proxy.cc @@ -22,7 +22,6 @@ #include <string.h> #include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -31,6 +30,7 @@ #include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/slice/b64.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc index 4596f90745..d6b759227e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc @@ -98,7 +98,7 @@ static void destroy_call_elem(grpc_call_element* elem, static void start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; - GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0); + GPR_TIMER_SCOPE("clr_start_transport_stream_op_batch", 0); if (calld->client_stats != nullptr) { // Intercept send_initial_metadata. if (batch->send_initial_metadata) { @@ -120,7 +120,6 @@ static void start_transport_stream_op_batch( } // Chain to next filter. grpc_call_next_op(elem, batch); - GPR_TIMER_END("clr_start_transport_stream_op_batch", 0); } const grpc_channel_filter grpc_client_load_reporting_filter = { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 1709e5622e..5e24bdd4e7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -87,7 +87,6 @@ #include <grpc/byte_buffer_reader.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> #include <grpc/support/string_util.h> #include <grpc/support/time.h> @@ -106,6 +105,7 @@ #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 24c381a46d..ab6d3e6a03 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -328,18 +328,11 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. * CHECK: sd->curr_connectivity_state == CONNECTING. * - * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests - * re-resolution). - * CHECK: subchannel_list->num_shutdown == - * subchannel_list->num_subchannels. - * - * 4) RULE: ALL subchannels are SHUTDOWN or TRANSIENT_FAILURE => policy is - * TRANSIENT_FAILURE. - * CHECK: subchannel_list->num_shutdown + - * subchannel_list->num_transient_failures == + * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is + * TRANSIENT_FAILURE. + * CHECK: subchannel_list->num_transient_failures == * subchannel_list->num_subchannels. */ - // TODO(juanlishen): For rule 4, we may want to re-resolve instead. grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy; GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); @@ -351,22 +344,12 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, /* 2) CONNECTING */ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); - } else if (subchannel_list->num_shutdown == + } else if (subchannel_list->num_transient_failures == subchannel_list->num_subchannels) { - /* 3) IDLE and re-resolve */ - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, - GRPC_ERROR_NONE, - "rr_exhausted_subchannels+reresolve"); - p->started_picking = false; - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, - GRPC_ERROR_NONE); - } else if (subchannel_list->num_shutdown + - subchannel_list->num_transient_failures == - subchannel_list->num_subchannels) { - /* 4) TRANSIENT_FAILURE */ - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "rr_transient_failure"); + /* 3) TRANSIENT_FAILURE */ + grpc_connectivity_state_set( + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "rr_exhausted_subchannels"); } GRPC_ERROR_UNREF(error); } @@ -387,6 +370,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { p->shutdown, sd->subchannel_list->shutting_down, grpc_error_string(error)); } + GPR_ASSERT(sd->subchannel != nullptr); // If the policy is shutting down, unref and return. if (p->shutdown) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); @@ -412,14 +396,19 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - // Update state counters and new overall state. - update_state_counters_locked(sd); - update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* // subchannel, if any. switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: { sd->connected_subchannel.reset(); + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_DEBUG, + "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " + "Requesting re-resolution", + p, sd->subchannel); + } + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, + GRPC_ERROR_NONE); break; } case GRPC_CHANNEL_READY: { @@ -442,8 +431,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "[RR %p] phasing out subchannel list %p (size %lu) in favor " "of %p (size %lu)", - (void*)p, (void*)p->subchannel_list, num_subchannels, - (void*)sd->subchannel_list, num_subchannels); + p, p->subchannel_list, num_subchannels, sd->subchannel_list, + num_subchannels); } if (p->subchannel_list != nullptr) { // dispose of the current subchannel_list @@ -455,7 +444,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ + * p->pending_picks. This preemptively replicates rr_pick()'s actions. + */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); grpc_lb_subchannel_data* selected = @@ -488,6 +478,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE:; // fallthrough } + // Update state counters and new overall state. + update_state_counters_locked(sd); + // Only update connectivity based on the selected subchannel list. + if (sd->subchannel_list == p->subchannel_list) { + update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); + } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); } @@ -562,6 +558,30 @@ static void rr_update_locked(grpc_lb_policy* policy, return; } if (p->started_picking) { + for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { + const grpc_connectivity_state subchannel_state = + grpc_subchannel_check_connectivity( + subchannel_list->subchannels[i].subchannel, nullptr); + // Override the default setting of IDLE for connectivity notification + // purposes if the subchannel is already in transient failure. Otherwise + // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE + // discrepancy, attempt to re-resolve and end up here again. + // TODO(roth): As part of C++-ifying the subchannel_list API, design a + // better API for notifying the LB policy of subchannel states, which can + // be used both for the subchannel's initial state and for subsequent + // state changes. This will allow us to handle this more generally instead + // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any + // pending picks across all READY subchannels rather than sending them all + // to the first one). + if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + subchannel_list->subchannels[i].pending_connectivity_state_unsafe = + subchannel_list->subchannels[i].curr_connectivity_state = + subchannel_list->subchannels[i].prev_connectivity_state = + subchannel_state; + --subchannel_list->num_idle; + ++subchannel_list->num_transient_failures; + } + } if (p->latest_pending_subchannel_list != nullptr) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index fa2ffcc796..75f7ca2d12 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -54,13 +54,15 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, void grpc_lb_subchannel_data_start_connectivity_watch( grpc_lb_subchannel_data* sd) { if (sd->subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): requesting connectivity change notification", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - (size_t)(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); + gpr_log( + GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): requesting connectivity change " + "notification (from %s)", + sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel, + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe)); } sd->connectivity_notification_pending = true; grpc_subchannel_notify_on_state_change( diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 3377605263..91537f3afe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -101,8 +101,6 @@ struct grpc_lb_subchannel_list { size_t num_ready; /** how many subchannels are in state TRANSIENT_FAILURE */ size_t num_transient_failures; - /** how many subchannels are in state SHUTDOWN */ - size_t num_shutdown; /** how many subchannels are in state IDLE */ size_t num_idle; diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc index c3309e36a3..4b6905eaa3 100644 --- a/src/core/ext/filters/client_channel/parse_address.cc +++ b/src/core/ext/filters/client_channel/parse_address.cc @@ -26,9 +26,10 @@ #endif #include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> + +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #ifdef GRPC_HAVE_UNIX_SOCKET diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 6ba5f932f0..f2f25bc7c0 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -25,7 +25,6 @@ #include <unistd.h> #include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> #include <grpc/support/string_util.h> #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" @@ -35,6 +34,7 @@ #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" @@ -66,8 +66,8 @@ typedef struct { grpc_pollset_set* interested_parties; /** Closures used by the combiner */ - grpc_closure dns_ares_on_retry_timer_locked; - grpc_closure dns_ares_on_resolved_locked; + grpc_closure dns_ares_on_next_resolution_timer_closure; + grpc_closure dns_ares_on_resolved_closure; /** Combiner guarding the rest of the state */ grpc_combiner* combiner; @@ -85,12 +85,15 @@ typedef struct { grpc_channel_args** target_result; /** current (fully resolved) result */ grpc_channel_args* resolved_result; - /** retry timer */ - bool have_retry_timer; - grpc_timer retry_timer; + /** next resolution timer */ + bool have_next_resolution_timer; + grpc_timer next_resolution_timer; /** retry backoff state */ grpc_core::ManualConstructor<grpc_core::BackOff> backoff; - + /** min resolution period. Max one resolution will happen per period */ + grpc_millis min_time_between_resolutions; + /** when was the last resolution? -1 if no resolution has happened yet */ + grpc_millis last_resolution_timestamp; /** currently resolving addresses */ grpc_lb_addresses* lb_addresses; /** currently resolving service config */ @@ -100,6 +103,7 @@ typedef struct { static void dns_ares_destroy(grpc_resolver* r); static void dns_ares_start_resolving_locked(ares_dns_resolver* r); +static void dns_ares_maybe_start_resolving_locked(ares_dns_resolver* r); static void dns_ares_maybe_finish_next_locked(ares_dns_resolver* r); static void dns_ares_shutdown_locked(grpc_resolver* r); @@ -114,8 +118,8 @@ static const grpc_resolver_vtable dns_ares_resolver_vtable = { static void dns_ares_shutdown_locked(grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; - if (r->have_retry_timer) { - grpc_timer_cancel(&r->retry_timer); + if (r->have_next_resolution_timer) { + grpc_timer_cancel(&r->next_resolution_timer); } if (r->pending_request != nullptr) { grpc_cancel_ares_request(r->pending_request); @@ -131,20 +135,20 @@ static void dns_ares_shutdown_locked(grpc_resolver* resolver) { static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; if (!r->resolving) { - r->backoff->Reset(); - dns_ares_start_resolving_locked(r); + dns_ares_maybe_start_resolving_locked(r); } } -static void dns_ares_on_retry_timer_locked(void* arg, grpc_error* error) { +static void dns_ares_on_next_resolution_timer_locked(void* arg, + grpc_error* error) { ares_dns_resolver* r = (ares_dns_resolver*)arg; - r->have_retry_timer = false; + r->have_next_resolution_timer = false; if (error == GRPC_ERROR_NONE) { if (!r->resolving) { dns_ares_start_resolving_locked(r); } } - GRPC_RESOLVER_UNREF(&r->base, "retry-timer"); + GRPC_RESOLVER_UNREF(&r->base, "next_resolution_timer"); } static bool value_in_json_array(grpc_json* array, const char* value) { @@ -261,6 +265,9 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) { if (service_config != nullptr) grpc_service_config_destroy(service_config); gpr_free(service_config_string); grpc_lb_addresses_destroy(r->lb_addresses); + // Reset backoff state so that we start from the beginning when the + // next request gets triggered. + r->backoff->Reset(); } else { const char* msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); @@ -268,21 +275,22 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) { grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); - GPR_ASSERT(!r->have_retry_timer); - r->have_retry_timer = true; - GRPC_RESOLVER_REF(&r->base, "retry-timer"); + GPR_ASSERT(!r->have_next_resolution_timer); + r->have_next_resolution_timer = true; + GRPC_RESOLVER_REF(&r->base, "next_resolution_timer"); if (timeout > 0) { gpr_log(GPR_DEBUG, "retrying in %" PRIdPTR " milliseconds", timeout); } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_timer_init(&r->retry_timer, next_try, - &r->dns_ares_on_retry_timer_locked); + grpc_timer_init(&r->next_resolution_timer, next_try, + &r->dns_ares_on_next_resolution_timer_closure); } if (r->resolved_result != nullptr) { grpc_channel_args_destroy(r->resolved_result); } r->resolved_result = result; + r->last_resolution_timestamp = grpc_core::ExecCtx::Get()->Now(); r->resolved_version++; dns_ares_maybe_finish_next_locked(r); GRPC_RESOLVER_UNREF(&r->base, "dns-resolving"); @@ -297,8 +305,7 @@ static void dns_ares_next_locked(grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - r->backoff->Reset(); - dns_ares_start_resolving_locked(r); + dns_ares_maybe_start_resolving_locked(r); } else { dns_ares_maybe_finish_next_locked(r); } @@ -312,7 +319,7 @@ static void dns_ares_start_resolving_locked(ares_dns_resolver* r) { r->service_config_json = nullptr; r->pending_request = grpc_dns_lookup_ares( r->dns_server, r->name_to_resolve, r->default_port, r->interested_parties, - &r->dns_ares_on_resolved_locked, &r->lb_addresses, + &r->dns_ares_on_resolved_closure, &r->lb_addresses, true /* check_grpclb */, r->request_service_config ? &r->service_config_json : nullptr); } @@ -330,6 +337,35 @@ static void dns_ares_maybe_finish_next_locked(ares_dns_resolver* r) { } } +static void dns_ares_maybe_start_resolving_locked(ares_dns_resolver* r) { + if (r->last_resolution_timestamp >= 0) { + const grpc_millis earliest_next_resolution = + r->last_resolution_timestamp + r->min_time_between_resolutions; + const grpc_millis ms_until_next_resolution = + earliest_next_resolution - grpc_core::ExecCtx::Get()->Now(); + if (ms_until_next_resolution > 0) { + const grpc_millis last_resolution_ago = + grpc_core::ExecCtx::Get()->Now() - r->last_resolution_timestamp; + gpr_log(GPR_DEBUG, + "In cooldown from last resolution (from %" PRIdPTR + " ms ago). Will resolve again in %" PRIdPTR " ms", + last_resolution_ago, ms_until_next_resolution); + if (!r->have_next_resolution_timer) { + r->have_next_resolution_timer = true; + GRPC_RESOLVER_REF(&r->base, "next_resolution_timer_cooldown"); + grpc_timer_init(&r->next_resolution_timer, ms_until_next_resolution, + &r->dns_ares_on_next_resolution_timer_closure); + } + // TODO(dgq): remove the following two lines once Pick First stops + // discarding subchannels after selecting. + ++r->resolved_version; + dns_ares_maybe_finish_next_locked(r); + return; + } + } + dns_ares_start_resolving_locked(r); +} + static void dns_ares_destroy(grpc_resolver* gr) { gpr_log(GPR_DEBUG, "dns_ares_destroy"); ares_dns_resolver* r = (ares_dns_resolver*)gr; @@ -374,12 +410,17 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args, .set_jitter(GRPC_DNS_RECONNECT_JITTER) .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); r->backoff.Init(grpc_core::BackOff(backoff_options)); - GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, - dns_ares_on_retry_timer_locked, r, + GRPC_CLOSURE_INIT(&r->dns_ares_on_next_resolution_timer_closure, + dns_ares_on_next_resolution_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); - GRPC_CLOSURE_INIT(&r->dns_ares_on_resolved_locked, + GRPC_CLOSURE_INIT(&r->dns_ares_on_resolved_closure, dns_ares_on_resolved_locked, r, grpc_combiner_scheduler(r->base.combiner)); + const grpc_arg* period_arg = grpc_channel_args_find( + args->args, GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS); + r->min_time_between_resolutions = + grpc_channel_arg_get_integer(period_arg, {1000, 0, INT_MAX}); + r->last_resolution_timestamp = -1; return &r->base; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 2b35bdb605..d76c8069d5 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -28,7 +28,6 @@ #include <ares.h> #include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/time.h> @@ -36,6 +35,7 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/executor.h" @@ -505,7 +505,7 @@ static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { } } GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); - grpc_lb_addresses_destroy(r->lb_addrs); + if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs); gpr_free(r); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 62f03d52c0..478810d263 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -19,17 +19,19 @@ #include <grpc/support/port_platform.h> #include <inttypes.h> -#include <string.h> +#include <climits> +#include <cstring> #include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> #include <grpc/support/string_util.h> +#include <grpc/support/time.h> #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" @@ -65,13 +67,16 @@ typedef struct { grpc_channel_args** target_result; /** current (fully resolved) result */ grpc_channel_args* resolved_result; - /** retry timer */ - bool have_retry_timer; - grpc_timer retry_timer; - grpc_closure on_retry; + /** next resolution timer */ + bool have_next_resolution_timer; + grpc_timer next_resolution_timer; + grpc_closure next_resolution_closure; /** retry backoff state */ grpc_core::ManualConstructor<grpc_core::BackOff> backoff; - + /** min resolution period. Max one resolution will happen per period */ + grpc_millis min_time_between_resolutions; + /** when was the last resolution? -1 if no resolution has happened yet */ + grpc_millis last_resolution_timestamp; /** currently resolving addresses */ grpc_resolved_addresses* addresses; } dns_resolver; @@ -79,6 +84,7 @@ typedef struct { static void dns_destroy(grpc_resolver* r); static void dns_start_resolving_locked(dns_resolver* r); +static void maybe_start_resolving_locked(dns_resolver* r); static void dns_maybe_finish_next_locked(dns_resolver* r); static void dns_shutdown_locked(grpc_resolver* r); @@ -92,8 +98,8 @@ static const grpc_resolver_vtable dns_resolver_vtable = { static void dns_shutdown_locked(grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; - if (r->have_retry_timer) { - grpc_timer_cancel(&r->retry_timer); + if (r->have_next_resolution_timer) { + grpc_timer_cancel(&r->next_resolution_timer); } if (r->next_completion != nullptr) { *r->target_result = nullptr; @@ -106,8 +112,7 @@ static void dns_shutdown_locked(grpc_resolver* resolver) { static void dns_channel_saw_error_locked(grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; if (!r->resolving) { - r->backoff->Reset(); - dns_start_resolving_locked(r); + maybe_start_resolving_locked(r); } } @@ -119,24 +124,19 @@ static void dns_next_locked(grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - r->backoff->Reset(); - dns_start_resolving_locked(r); + maybe_start_resolving_locked(r); } else { dns_maybe_finish_next_locked(r); } } -static void dns_on_retry_timer_locked(void* arg, grpc_error* error) { +static void dns_on_next_resolution_timer_locked(void* arg, grpc_error* error) { dns_resolver* r = (dns_resolver*)arg; - - r->have_retry_timer = false; - if (error == GRPC_ERROR_NONE) { - if (!r->resolving) { - dns_start_resolving_locked(r); - } + r->have_next_resolution_timer = false; + if (error == GRPC_ERROR_NONE && !r->resolving) { + dns_start_resolving_locked(r); } - - GRPC_RESOLVER_UNREF(&r->base, "retry-timer"); + GRPC_RESOLVER_UNREF(&r->base, "next_resolution_timer"); } static void dns_on_resolved_locked(void* arg, grpc_error* error) { @@ -160,22 +160,24 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) { result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1); grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(addresses); + // Reset backoff state so that we start from the beginning when the + // next request gets triggered. + r->backoff->Reset(); } else { grpc_millis next_try = r->backoff->NextAttemptTime(); grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); - GPR_ASSERT(!r->have_retry_timer); - r->have_retry_timer = true; - GRPC_RESOLVER_REF(&r->base, "retry-timer"); + GPR_ASSERT(!r->have_next_resolution_timer); + r->have_next_resolution_timer = true; + GRPC_RESOLVER_REF(&r->base, "next_resolution_timer"); if (timeout > 0) { gpr_log(GPR_DEBUG, "retrying in %" PRIdPTR " milliseconds", timeout); } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - GRPC_CLOSURE_INIT(&r->on_retry, dns_on_retry_timer_locked, r, - grpc_combiner_scheduler(r->base.combiner)); - grpc_timer_init(&r->retry_timer, next_try, &r->on_retry); + grpc_timer_init(&r->next_resolution_timer, next_try, + &r->next_resolution_closure); } if (r->resolved_result != nullptr) { grpc_channel_args_destroy(r->resolved_result); @@ -188,6 +190,35 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) { GRPC_RESOLVER_UNREF(&r->base, "dns-resolving"); } +static void maybe_start_resolving_locked(dns_resolver* r) { + if (r->last_resolution_timestamp >= 0) { + const grpc_millis earliest_next_resolution = + r->last_resolution_timestamp + r->min_time_between_resolutions; + const grpc_millis ms_until_next_resolution = + earliest_next_resolution - grpc_core::ExecCtx::Get()->Now(); + if (ms_until_next_resolution > 0) { + const grpc_millis last_resolution_ago = + grpc_core::ExecCtx::Get()->Now() - r->last_resolution_timestamp; + gpr_log(GPR_DEBUG, + "In cooldown from last resolution (from %" PRIdPTR + " ms ago). Will resolve again in %" PRIdPTR " ms", + last_resolution_ago, ms_until_next_resolution); + if (!r->have_next_resolution_timer) { + r->have_next_resolution_timer = true; + GRPC_RESOLVER_REF(&r->base, "next_resolution_timer_cooldown"); + grpc_timer_init(&r->next_resolution_timer, ms_until_next_resolution, + &r->next_resolution_closure); + } + // TODO(dgq): remove the following two lines once Pick First stops + // discarding subchannels after selecting. + ++r->resolved_version; + dns_maybe_finish_next_locked(r); + return; + } + } + dns_start_resolving_locked(r); +} + static void dns_start_resolving_locked(dns_resolver* r) { GRPC_RESOLVER_REF(&r->base, "dns-resolving"); GPR_ASSERT(!r->resolving); @@ -198,6 +229,7 @@ static void dns_start_resolving_locked(dns_resolver* r) { GRPC_CLOSURE_CREATE(dns_on_resolved_locked, r, grpc_combiner_scheduler(r->base.combiner)), &r->addresses); + r->last_resolution_timestamp = grpc_core::ExecCtx::Get()->Now(); } static void dns_maybe_finish_next_locked(dns_resolver* r) { @@ -250,6 +282,14 @@ static grpc_resolver* dns_create(grpc_resolver_args* args, .set_jitter(GRPC_DNS_RECONNECT_JITTER) .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); r->backoff.Init(grpc_core::BackOff(backoff_options)); + const grpc_arg* period_arg = grpc_channel_args_find( + args->args, GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS); + r->min_time_between_resolutions = + grpc_channel_arg_get_integer(period_arg, {1000, 0, INT_MAX}); + r->last_resolution_timestamp = -1; + GRPC_CLOSURE_INIT(&r->next_resolution_closure, + dns_on_next_resolution_timer_locked, r, + grpc_combiner_scheduler(r->base.combiner)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index eaa5e6ac49..f457917775 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -24,7 +24,6 @@ #include <string.h> #include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> @@ -32,6 +31,7 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/combiner.h" @@ -47,10 +47,10 @@ // typedef struct { - // base class -- must be first + // Base class -- must be first grpc_resolver base; - // passed-in parameters + // Passed-in parameters grpc_channel_args* channel_args; // If not NULL, the next set of resolution results to be returned to @@ -61,9 +61,16 @@ typedef struct { // fake_resolver_channel_saw_error_locked(). grpc_channel_args* results_upon_error; - // pending next completion, or NULL + // TODO(juanlishen): This can go away once pick_first is changed to not throw + // away its subchannels, since that will eliminate its dependence on + // channel_saw_error_locked() causing an immediate resolver return. + // A copy of the most-recently used resolution results. + grpc_channel_args* last_used_results; + + // Pending next completion, or NULL grpc_closure* next_completion; - // target result address for next completion + + // Target result address for next completion grpc_channel_args** target_result; } fake_resolver; @@ -71,6 +78,7 @@ static void fake_resolver_destroy(grpc_resolver* gr) { fake_resolver* r = (fake_resolver*)gr; grpc_channel_args_destroy(r->next_results); grpc_channel_args_destroy(r->results_upon_error); + grpc_channel_args_destroy(r->last_used_results); grpc_channel_args_destroy(r->channel_args); gpr_free(r); } @@ -98,9 +106,15 @@ static void fake_resolver_maybe_finish_next_locked(fake_resolver* r) { static void fake_resolver_channel_saw_error_locked(grpc_resolver* resolver) { fake_resolver* r = (fake_resolver*)resolver; - if (r->next_results == nullptr && r->results_upon_error != nullptr) { - // Pretend we re-resolved. + // A resolution must have been returned before an error is seen. + GPR_ASSERT(r->last_used_results != nullptr); + grpc_channel_args_destroy(r->next_results); + if (r->results_upon_error != nullptr) { r->next_results = grpc_channel_args_copy(r->results_upon_error); + } else { + // If results_upon_error is unavailable, re-resolve with the most-recently + // used results to avoid a no-op re-resolution. + r->next_results = grpc_channel_args_copy(r->last_used_results); } fake_resolver_maybe_finish_next_locked(r); } @@ -149,35 +163,56 @@ void grpc_fake_resolver_response_generator_unref( typedef struct set_response_closure_arg { grpc_closure set_response_closure; grpc_fake_resolver_response_generator* generator; - grpc_channel_args* next_response; + grpc_channel_args* response; + bool upon_error; } set_response_closure_arg; -static void set_response_closure_fn(void* arg, grpc_error* error) { +static void set_response_closure_locked(void* arg, grpc_error* error) { set_response_closure_arg* closure_arg = (set_response_closure_arg*)arg; grpc_fake_resolver_response_generator* generator = closure_arg->generator; fake_resolver* r = generator->resolver; - if (r->next_results != nullptr) { + if (!closure_arg->upon_error) { grpc_channel_args_destroy(r->next_results); - } - r->next_results = closure_arg->next_response; - if (r->results_upon_error != nullptr) { + r->next_results = closure_arg->response; + grpc_channel_args_destroy(r->last_used_results); + r->last_used_results = grpc_channel_args_copy(closure_arg->response); + fake_resolver_maybe_finish_next_locked(r); + } else { grpc_channel_args_destroy(r->results_upon_error); + r->results_upon_error = closure_arg->response; } - r->results_upon_error = grpc_channel_args_copy(closure_arg->next_response); gpr_free(closure_arg); - fake_resolver_maybe_finish_next_locked(r); } void grpc_fake_resolver_response_generator_set_response( grpc_fake_resolver_response_generator* generator, - grpc_channel_args* next_response) { + grpc_channel_args* response) { + GPR_ASSERT(generator->resolver != nullptr); + GPR_ASSERT(response != nullptr); + set_response_closure_arg* closure_arg = + (set_response_closure_arg*)gpr_zalloc(sizeof(*closure_arg)); + closure_arg->generator = generator; + closure_arg->response = grpc_channel_args_copy(response); + closure_arg->upon_error = false; + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, + set_response_closure_locked, closure_arg, + grpc_combiner_scheduler( + generator->resolver->base.combiner)), + GRPC_ERROR_NONE); +} + +void grpc_fake_resolver_response_generator_set_response_upon_error( + grpc_fake_resolver_response_generator* generator, + grpc_channel_args* response) { GPR_ASSERT(generator->resolver != nullptr); set_response_closure_arg* closure_arg = (set_response_closure_arg*)gpr_zalloc(sizeof(*closure_arg)); closure_arg->generator = generator; - closure_arg->next_response = grpc_channel_args_copy(next_response); + closure_arg->response = + response != nullptr ? grpc_channel_args_copy(response) : nullptr; + closure_arg->upon_error = true; GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, - set_response_closure_fn, closure_arg, + set_response_closure_locked, closure_arg, grpc_combiner_scheduler( generator->resolver->base.combiner)), GRPC_ERROR_NONE); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index a8977e5980..94f9a8e6ca 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -36,11 +36,20 @@ typedef struct grpc_fake_resolver_response_generator grpc_fake_resolver_response_generator* grpc_fake_resolver_response_generator_create(); -// Instruct the fake resolver associated with the \a response_generator instance -// to trigger a new resolution for \a uri and \a args. +// Set next response of the fake resolver associated with the \a +// response_generator instance and trigger a new resolution. void grpc_fake_resolver_response_generator_set_response( grpc_fake_resolver_response_generator* generator, - grpc_channel_args* next_response); + grpc_channel_args* response); + +// Set results_upon_error of the fake resolver associated with the \a +// response_generator instance. When fake_resolver_channel_saw_error_locked() is +// called, results_upon_error will be returned as long as it's non-NULL, +// otherwise the last value set by +// grpc_fake_resolver_response_generator_set_response() will be returned. +void grpc_fake_resolver_response_generator_set_response_upon_error( + grpc_fake_resolver_response_generator* generator, + grpc_channel_args* response); // Return a \a grpc_arg for a \a grpc_fake_resolver_response_generator instance. grpc_arg grpc_fake_resolver_response_generator_arg( diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc index 99ad78e23c..784935eb20 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc @@ -22,7 +22,6 @@ #include <string.h> #include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> @@ -30,6 +29,7 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index cad8578511..6bf710c948 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -653,14 +653,13 @@ static void on_subchannel_connected(void* arg, grpc_error* error) { */ static void subchannel_call_destroy(void* call, grpc_error* error) { + GPR_TIMER_SCOPE("grpc_subchannel_call_unref.destroy", 0); grpc_subchannel_call* c = (grpc_subchannel_call*)call; GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); - GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_core::ConnectedSubchannel* connection = c->connection; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); connection->Unref(DEBUG_LOCATION, "subchannel_call"); - GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call, @@ -682,12 +681,11 @@ void grpc_subchannel_call_unref( void grpc_subchannel_call_process_op(grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) { - GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0); + GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0); grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); top_elem->filter->start_transport_stream_op_batch(top_elem, batch); - GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> @@ -740,8 +738,9 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { } namespace grpc_core { + ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) - : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount), + : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount), channel_stack_(channel_stack) {} ConnectedSubchannel::~ConnectedSubchannel() { @@ -776,7 +775,9 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, args.arena, sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size); grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); - Ref(DEBUG_LOCATION, "subchannel_call"); + RefCountedPtr<ConnectedSubchannel> connection = + Ref(DEBUG_LOCATION, "subchannel_call"); + connection.release(); // Ref is passed to the grpc_subchannel_call object. (*call)->connection = this; const grpc_call_element_args call_args = { callstk, /* call_stack */ @@ -798,4 +799,5 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); return GRPC_ERROR_NONE; } + } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index b7593ec911..d2b45ae9c8 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -68,7 +68,8 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; #endif namespace grpc_core { -class ConnectedSubchannel : public grpc_core::RefCountedWithTracing { + +class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { public: struct CallArgs { grpc_polling_entity* pollent; @@ -93,6 +94,7 @@ class ConnectedSubchannel : public grpc_core::RefCountedWithTracing { private: grpc_channel_stack* channel_stack_; }; + } // namespace grpc_core grpc_subchannel* grpc_subchannel_ref( |