diff options
author | 2018-01-19 12:20:15 -0800 | |
---|---|---|
committer | 2018-01-19 12:20:15 -0800 | |
commit | fb061c329ac5086b62619b118191574f3fd27a79 (patch) | |
tree | 6a855770d7218425a2cd53bc378800c30d6c8525 /src/core/ext/filters | |
parent | 032e9b32dc5978a042bdda5c3031ae6cbd928973 (diff) | |
parent | 9813858908da2e7deaa58d91f4030e6881e364e3 (diff) |
Merge remote-tracking branch 'upstream/master' into fix-stream-compression-config-interface
Diffstat (limited to 'src/core/ext/filters')
34 files changed, 903 insertions, 1120 deletions
diff --git a/src/core/ext/filters/client_channel/OWNERS b/src/core/ext/filters/client_channel/OWNERS index 773bc73179..8f5e92808e 100644 --- a/src/core/ext/filters/client_channel/OWNERS +++ b/src/core/ext/filters/client_channel/OWNERS @@ -1,4 +1,4 @@ set noparent @markdroth @dgquintas -@ctiller +@a11r diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc index bfc549e709..906a72b662 100644 --- a/src/core/ext/filters/client_channel/backup_poller.cc +++ b/src/core/ext/filters/client_channel/backup_poller.cc @@ -23,17 +23,18 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> #include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/support/env.h" -#include "src/core/lib/support/string.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" #define DEFAULT_POLL_INTERVAL_MS 5000 -typedef struct backup_poller { +namespace { +struct backup_poller { grpc_timer polling_timer; grpc_closure run_poller_closure; grpc_closure shutdown_closure; @@ -42,7 +43,8 @@ typedef struct backup_poller { bool shutting_down; // guarded by pollset_mu gpr_refcount refs; gpr_refcount shutdown_refs; -} backup_poller; +}; +} // namespace static gpr_once g_once = GPR_ONCE_INIT; static gpr_mu g_poller_mu; diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc index 20693ba419..a827aa30ec 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.cc +++ b/src/core/ext/filters/client_channel/channel_connectivity.cc @@ -58,7 +58,8 @@ typedef enum { CALLING_BACK_AND_FINISHED, } callback_phase; -typedef struct { +namespace { +struct state_watcher { gpr_mu mu; callback_phase phase; grpc_closure on_complete; @@ -71,7 +72,8 @@ typedef struct { grpc_channel* channel; grpc_error* error; void* tag; -} state_watcher; +}; +} // namespace static void delete_state_watcher(state_watcher* w) { grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index e99022a91b..a8a7a37be0 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -41,12 +41,12 @@ #include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/string.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -553,6 +553,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { } grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); + grpc_lb_policy_shutdown_locked(chand->lb_policy, new_lb_policy); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } chand->lb_policy = new_lb_policy; @@ -658,6 +659,7 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { if (chand->lb_policy != nullptr) { grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); + grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); chand->lb_policy = nullptr; } @@ -792,6 +794,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { if (chand->lb_policy != nullptr) { grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); + grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } gpr_free(chand->info_lb_policy_name); @@ -852,12 +855,10 @@ typedef struct client_channel_call_data { grpc_subchannel_call* subchannel_call; grpc_error* error; - grpc_lb_policy* lb_policy; // Holds ref while LB pick is pending. + grpc_lb_policy_pick_state pick; grpc_closure lb_pick_closure; grpc_closure lb_pick_cancel_closure; - grpc_connected_subchannel* connected_subchannel; - grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity* pollent; grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES]; @@ -866,8 +867,6 @@ typedef struct client_channel_call_data { grpc_transport_stream_op_batch* initial_metadata_batch; - grpc_linked_mdelem lb_token_mdelem; - grpc_closure on_complete; grpc_closure* original_on_complete; } call_data; @@ -1004,17 +1003,17 @@ static void create_subchannel_call_locked(grpc_call_element* elem, grpc_error* error) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - const grpc_connected_subchannel_call_args call_args = { - calld->pollent, // pollent - calld->path, // path - calld->call_start_time, // start_time - calld->deadline, // deadline - calld->arena, // arena - calld->subchannel_call_context, // context - calld->call_combiner // call_combiner + const grpc_core::ConnectedSubchannel::CallArgs call_args = { + calld->pollent, // pollent + calld->path, // path + calld->call_start_time, // start_time + calld->deadline, // deadline + calld->arena, // arena + calld->pick.subchannel_call_context, // context + calld->call_combiner // call_combiner }; - grpc_error* new_error = grpc_connected_subchannel_create_call( - calld->connected_subchannel, &call_args, &calld->subchannel_call); + grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( + call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); @@ -1032,7 +1031,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - if (calld->connected_subchannel == nullptr) { + if (calld->pick.connected_subchannel == nullptr) { // Failed to create subchannel. GRPC_ERROR_UNREF(calld->error); calld->error = error == GRPC_ERROR_NONE @@ -1071,13 +1070,16 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - if (calld->lb_policy != nullptr) { + // Note: chand->lb_policy may have changed since we started our pick, + // in which case we will be cancelling the pick on a policy other than + // the one we started it on. However, this will just be a no-op. + if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", - chand, calld, calld->lb_policy); + chand, calld, chand->lb_policy); } - grpc_lb_policy_cancel_pick_locked( - calld->lb_policy, &calld->connected_subchannel, GRPC_ERROR_REF(error)); + grpc_lb_policy_cancel_pick_locked(chand->lb_policy, &calld->pick, + GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); } @@ -1092,9 +1094,6 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", chand, calld); } - GPR_ASSERT(calld->lb_policy != nullptr); - GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel"); - calld->lb_policy = nullptr; async_pick_done_locked(elem, GRPC_ERROR_REF(error)); } @@ -1128,26 +1127,21 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; } } - const grpc_lb_policy_pick_args inputs = { + calld->pick.initial_metadata = calld->initial_metadata_batch->payload->send_initial_metadata - .send_initial_metadata, - initial_metadata_flags, &calld->lb_token_mdelem}; - // Keep a ref to the LB policy in calld while the pick is pending. - GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); - calld->lb_policy = chand->lb_policy; + .send_initial_metadata; + calld->pick.initial_metadata_flags = initial_metadata_flags; GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); - const bool pick_done = grpc_lb_policy_pick_locked( - chand->lb_policy, &inputs, &calld->connected_subchannel, - calld->subchannel_call_context, nullptr, &calld->lb_pick_closure); + calld->pick.on_complete = &calld->lb_pick_closure; + const bool pick_done = + grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", chand, calld); } - GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel"); - calld->lb_policy = nullptr; } else { GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); grpc_call_combiner_set_notify_on_cancel( @@ -1289,7 +1283,7 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - GPR_ASSERT(calld->connected_subchannel == nullptr); + GPR_ASSERT(calld->pick.connected_subchannel == nullptr); if (chand->lb_policy != nullptr) { // We already have an LB policy, so ask it for a pick. if (pick_callback_start_locked(elem)) { @@ -1467,15 +1461,14 @@ static void cc_destroy_call_elem(grpc_call_element* elem, GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, "client_channel_destroy_call"); } - GPR_ASSERT(calld->lb_policy == nullptr); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); - if (calld->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->connected_subchannel, "picked"); + if (calld->pick.connected_subchannel != nullptr) { + calld->pick.connected_subchannel.reset(); } for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { - if (calld->subchannel_call_context[i].value != nullptr) { - calld->subchannel_call_context[i].destroy( - calld->subchannel_call_context[i].value); + if (calld->pick.subchannel_call_context[i].value != nullptr) { + calld->pick.subchannel_call_context[i].destroy( + calld->pick.subchannel_call_context[i].value); } } GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index 556a3bc6a1..6bfd038887 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -30,11 +30,11 @@ #include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker_registry.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/env.h" -#include "src/core/lib/support/string.h" typedef struct http_connect_handshaker { // Base class. Must be first. diff --git a/src/core/ext/filters/client_channel/http_proxy.cc b/src/core/ext/filters/client_channel/http_proxy.cc index 2eafeee702..037c65822a 100644 --- a/src/core/ext/filters/client_channel/http_proxy.cc +++ b/src/core/ext/filters/client_channel/http_proxy.cc @@ -30,9 +30,9 @@ #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/slice/b64.h" -#include "src/core/lib/support/env.h" -#include "src/core/lib/support/string.h" /** * Parses the 'http_proxy' env var and returns the proxy hostname to resolve or diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 7a5a8dec34..cc4fe7ec62 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -19,8 +19,6 @@ #include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/lib/iomgr/combiner.h" -#define WEAK_REF_BITS 16 - grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount( false, "lb_policy_refcount"); @@ -28,91 +26,60 @@ void grpc_lb_policy_init(grpc_lb_policy* policy, const grpc_lb_policy_vtable* vtable, grpc_combiner* combiner) { policy->vtable = vtable; - gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); + gpr_ref_init(&policy->refs, 1); policy->interested_parties = grpc_pollset_set_create(); policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy"); } #ifndef NDEBUG -#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason -#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char* purpose -#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason -#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose +void grpc_lb_policy_ref(grpc_lb_policy* lb_policy, const char* file, int line, + const char* reason) { + if (grpc_trace_lb_policy_refcount.enabled()) { + gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "LB_POLICY:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy, + old_refs, old_refs + 1, reason); + } #else -#define REF_FUNC_EXTRA_ARGS -#define REF_MUTATE_EXTRA_ARGS -#define REF_FUNC_PASS_ARGS(new_reason) -#define REF_MUTATE_PASS_ARGS(x) +void grpc_lb_policy_ref(grpc_lb_policy* lb_policy) { #endif + gpr_ref(&lb_policy->refs); +} -static gpr_atm ref_mutate(grpc_lb_policy* c, gpr_atm delta, - int barrier REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) - : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifndef NDEBUG +void grpc_lb_policy_unref(grpc_lb_policy* lb_policy, const char* file, int line, + const char* reason) { if (grpc_trace_lb_policy_refcount.enabled()) { + gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "LB_POLICY: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c, - purpose, old_val, old_val + delta, reason); + "LB_POLICY:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy, + old_refs, old_refs - 1, reason); } +#else +void grpc_lb_policy_unref(grpc_lb_policy* lb_policy) { #endif - return old_val; -} - -void grpc_lb_policy_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { - ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); -} - -static void shutdown_locked(void* arg, grpc_error* error) { - grpc_lb_policy* policy = (grpc_lb_policy*)arg; - policy->vtable->shutdown_locked(policy); - GRPC_LB_POLICY_WEAK_UNREF(policy, "strong-unref"); -} - -void grpc_lb_policy_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { - gpr_atm old_val = - ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS), - 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); - gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); - gpr_atm check = 1 << WEAK_REF_BITS; - if ((old_val & mask) == check) { - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(shutdown_locked, policy, - grpc_combiner_scheduler(policy->combiner)), - GRPC_ERROR_NONE); - } else { - grpc_lb_policy_weak_unref(policy REF_FUNC_PASS_ARGS("strong-unref")); + if (gpr_unref(&lb_policy->refs)) { + grpc_pollset_set_destroy(lb_policy->interested_parties); + grpc_combiner* combiner = lb_policy->combiner; + lb_policy->vtable->destroy(lb_policy); + GRPC_COMBINER_UNREF(combiner, "lb_policy"); } } -void grpc_lb_policy_weak_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { - ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF")); -} - -void grpc_lb_policy_weak_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { - gpr_atm old_val = - ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); - if (old_val == 1) { - grpc_pollset_set_destroy(policy->interested_parties); - grpc_combiner* combiner = policy->combiner; - policy->vtable->destroy(policy); - GRPC_COMBINER_UNREF(combiner, "lb_policy"); - } +void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy, + grpc_lb_policy* new_policy) { + policy->vtable->shutdown_locked(policy, new_policy); } int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, - void** user_data, grpc_closure* on_complete) { - return policy->vtable->pick_locked(policy, pick_args, target, context, - user_data, on_complete); + grpc_lb_policy_pick_state* pick) { + return policy->vtable->pick_locked(policy, pick); } void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, - grpc_connected_subchannel** target, + grpc_lb_policy_pick_state* pick, grpc_error* error) { - policy->vtable->cancel_pick_locked(policy, target, error); + policy->vtable->cancel_pick_locked(policy, pick, error); } void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy, diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 3572c97ed1..e19726efb3 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -20,6 +20,7 @@ #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/lib/gpr++/ref_counted_ptr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/connectivity_state.h" @@ -33,7 +34,7 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount; struct grpc_lb_policy { const grpc_lb_policy_vtable* vtable; - gpr_atm ref_pair; + gpr_refcount refs; /* owned pointer to interested parties in load balancing decisions */ grpc_pollset_set* interested_parties; /* combiner under which lb_policy actions take place */ @@ -42,32 +43,42 @@ struct grpc_lb_policy { grpc_closure* request_reresolution; }; -/** Extra arguments for an LB pick */ -typedef struct grpc_lb_policy_pick_args { - /** Initial metadata associated with the picking call. */ +/// State used for an LB pick. +typedef struct grpc_lb_policy_pick_state { + /// Initial metadata associated with the picking call. grpc_metadata_batch* initial_metadata; - /** Bitmask used for selective cancelling. See \a - * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in - * grpc_types.h */ + /// Bitmask used for selective cancelling. See \a + /// grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in + /// grpc_types.h. uint32_t initial_metadata_flags; - /** Storage for LB token in \a initial_metadata, or NULL if not used */ - grpc_linked_mdelem* lb_token_mdelem_storage; -} grpc_lb_policy_pick_args; + /// Storage for LB token in \a initial_metadata, or NULL if not used. + grpc_linked_mdelem lb_token_mdelem_storage; + /// Closure to run when pick is complete, if not completed synchronously. + grpc_closure* on_complete; + /// Will be set to the selected subchannel, or nullptr on failure or when + /// the LB policy decides to drop the call. + grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; + /// Will be populated with context to pass to the subchannel call, if needed. + grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; + /// Upon success, \a *user_data will be set to whatever opaque information + /// may need to be propagated from the LB policy, or NULL if not needed. + void** user_data; + /// Next pointer. For internal use by LB policy. + struct grpc_lb_policy_pick_state* next; +} grpc_lb_policy_pick_state; struct grpc_lb_policy_vtable { void (*destroy)(grpc_lb_policy* policy); - void (*shutdown_locked)(grpc_lb_policy* policy); + + /// \see grpc_lb_policy_shutdown_locked(). + void (*shutdown_locked)(grpc_lb_policy* policy, grpc_lb_policy* new_policy); /** \see grpc_lb_policy_pick */ - int (*pick_locked)(grpc_lb_policy* policy, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, void** user_data, - grpc_closure* on_complete); + int (*pick_locked)(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick); /** \see grpc_lb_policy_cancel_pick */ void (*cancel_pick_locked)(grpc_lb_policy* policy, - grpc_connected_subchannel** target, + grpc_lb_policy_pick_state* pick, grpc_error* error); /** \see grpc_lb_policy_cancel_picks */ @@ -103,37 +114,19 @@ struct grpc_lb_policy_vtable { }; #ifndef NDEBUG - -/* Strong references: the policy will shutdown when they reach zero */ #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(p, r) \ grpc_lb_policy_unref((p), __FILE__, __LINE__, (r)) - -/* Weak references: they don't prevent the shutdown of the LB policy. When no - * strong references are left but there are still weak ones, shutdown is called. - * Once the weak reference also reaches zero, the LB policy is destroyed. */ -#define GRPC_LB_POLICY_WEAK_REF(p, r) \ - grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_LB_POLICY_WEAK_UNREF(p, r) \ - grpc_lb_policy_weak_unref((p), __FILE__, __LINE__, (r)) void grpc_lb_policy_ref(grpc_lb_policy* policy, const char* file, int line, const char* reason); void grpc_lb_policy_unref(grpc_lb_policy* policy, const char* file, int line, const char* reason); -void grpc_lb_policy_weak_ref(grpc_lb_policy* policy, const char* file, int line, - const char* reason); -void grpc_lb_policy_weak_unref(grpc_lb_policy* policy, const char* file, - int line, const char* reason); -#else +#else // !NDEBUG #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p)) -#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p)) -#define GRPC_LB_POLICY_WEAK_UNREF(p, r) grpc_lb_policy_weak_unref((p)) void grpc_lb_policy_ref(grpc_lb_policy* policy); void grpc_lb_policy_unref(grpc_lb_policy* policy); -void grpc_lb_policy_weak_ref(grpc_lb_policy* policy); -void grpc_lb_policy_weak_unref(grpc_lb_policy* policy); #endif /** called by concrete implementations to initialize the base struct */ @@ -141,40 +134,37 @@ void grpc_lb_policy_init(grpc_lb_policy* policy, const grpc_lb_policy_vtable* vtable, grpc_combiner* combiner); -/** Finds an appropriate subchannel for a call, based on \a pick_args. - - \a target will be set to the selected subchannel, or NULL on failure - or when the LB policy decides to drop the call. +/// Shuts down \a policy. +/// If \a new_policy is non-null, any pending picks will be restarted +/// on that policy; otherwise, they will be failed. +void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy, + grpc_lb_policy* new_policy); - Upon success, \a user_data will be set to whatever opaque information - may need to be propagated from the LB policy, or NULL if not needed. - \a context will be populated with context to pass to the subchannel - call, if needed. +/** Finds an appropriate subchannel for a call, based on data in \a pick. + \a pick must remain alive until the pick is complete. If the pick succeeds and a result is known immediately, a non-zero - value will be returned. Otherwise, \a on_complete will be invoked + value will be returned. Otherwise, \a pick->on_complete will be invoked once the pick is complete with its error argument set to indicate success or failure. Any IO should be done under the \a interested_parties \a grpc_pollset_set in the \a grpc_lb_policy struct. */ int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, - void** user_data, grpc_closure* on_complete); + grpc_lb_policy_pick_state* pick); -/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) +/** Perform a connected subchannel ping (see \a + grpc_core::ConnectedSubchannel::Ping) against one of the connected subchannels managed by \a policy. */ void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, grpc_closure* on_initiate, grpc_closure* on_ack); -/** Cancel picks for \a target. +/** Cancel picks for \a pick. The \a on_complete callback of the pending picks will be invoked with \a *target set to NULL. */ void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, - grpc_connected_subchannel** target, + grpc_lb_policy_pick_state* pick, grpc_error* error); /** Cancel all pending picks for which their \a initial_metadata_flags (as given diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc index 3eedb08ecc..1708d81e61 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc @@ -32,7 +32,8 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, static void destroy_channel_elem(grpc_channel_element* elem) {} -typedef struct { +namespace { +struct call_data { // Stats object to update. grpc_grpclb_client_stats* client_stats; // State for intercepting send_initial_metadata. @@ -43,7 +44,8 @@ typedef struct { grpc_closure recv_initial_metadata_ready; grpc_closure* original_recv_initial_metadata_ready; bool recv_initial_metadata_succeeded; -} call_data; +}; +} // namespace static void on_complete_for_send(void* arg, grpc_error* error) { call_data* calld = (call_data*)arg; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 3c64213fb9..6c29cd8218 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -54,7 +54,7 @@ * operations in progress over the old RR instance. This is done by * decreasing the reference count on the old policy. The moment no more * references are held on the old RR policy, it'll be destroyed and \a - * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN + * on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN * state. At this point we can transition to a new RR instance safely, which * is done once again via \a rr_handover_locked(). * @@ -106,6 +106,8 @@ #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/gpr++/manual_constructor.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -113,13 +115,11 @@ #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/static_metadata.h" -#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -128,185 +128,48 @@ grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb"); -/* add lb_token of selected subchannel (address) to the call's initial - * metadata */ -static grpc_error* initial_metadata_add_lb_token( - grpc_metadata_batch* initial_metadata, - grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) { - GPR_ASSERT(lb_token_mdelem_storage != nullptr); - GPR_ASSERT(!GRPC_MDISNULL(lb_token)); - return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, - lb_token); -} - -static void destroy_client_stats(void* arg) { - grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg); -} - -typedef struct wrapped_rr_closure_arg { - /* the closure instance using this struct as argument */ - grpc_closure wrapper_closure; - - /* the original closure. Usually a on_complete/notify cb for pick() and ping() - * calls against the internal RR instance, respectively. */ - grpc_closure* wrapped_closure; - - /* the pick's initial metadata, kept in order to append the LB token for the - * pick */ - grpc_metadata_batch* initial_metadata; - - /* the picked target, used to determine which LB token to add to the pick's - * initial metadata */ - grpc_connected_subchannel** target; - - /* the context to be populated for the subchannel call */ - grpc_call_context_element* context; - - /* Stats for client-side load reporting. Note that this holds a - * reference, which must be either passed on via context or unreffed. */ +struct glb_lb_policy; + +namespace { + +/// Linked list of pending pick requests. It stores all information needed to +/// eventually call (Round Robin's) pick() on them. They mainly stay pending +/// waiting for the RR policy to be created. +/// +/// Note that when a pick is sent to the RR policy, we inject our own +/// on_complete callback, so that we can intercept the result before +/// invoking the original on_complete callback. This allows us to set the +/// LB token metadata and add client_stats to the call context. +/// See \a pending_pick_complete() for details. +struct pending_pick { + // Our on_complete closure and the original one. + grpc_closure on_complete; + grpc_closure* original_on_complete; + // The original pick. + grpc_lb_policy_pick_state* pick; + // Stats for client-side load reporting. Note that this holds a + // reference, which must be either passed on via context or unreffed. grpc_grpclb_client_stats* client_stats; - - /* the LB token associated with the pick */ + // The LB token associated with the pick. This is set via user_data in + // the pick. grpc_mdelem lb_token; - - /* storage for the lb token initial metadata mdelem */ - grpc_linked_mdelem* lb_token_mdelem_storage; - - /* The RR instance related to the closure */ - grpc_lb_policy* rr_policy; - - /* The grpclb instance that created the wrapping. This instance is not owned, - * reference counts are untouched. It's used only for logging purposes. */ - grpc_lb_policy* glb_policy; - - /* heap memory to be freed upon closure execution. */ - void* free_when_done; -} wrapped_rr_closure_arg; - -/* The \a on_complete closure passed as part of the pick requires keeping a - * reference to its associated round robin instance. We wrap this closure in - * order to unref the round robin instance upon its invocation */ -static void wrapped_rr_closure(void* arg, grpc_error* error) { - wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg; - - GPR_ASSERT(wc_arg->wrapped_closure != nullptr); - GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); - - if (wc_arg->rr_policy != nullptr) { - /* if *target is nullptr, no pick has been made by the RR policy (eg, all - * addresses failed to connect). There won't be any user_data/token - * available */ - if (*wc_arg->target != nullptr) { - if (!GRPC_MDISNULL(wc_arg->lb_token)) { - initial_metadata_add_lb_token(wc_arg->initial_metadata, - wc_arg->lb_token_mdelem_storage, - GRPC_MDELEM_REF(wc_arg->lb_token)); - } else { - gpr_log( - GPR_ERROR, - "[grpclb %p] No LB token for connected subchannel pick %p (from RR " - "instance %p).", - wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy); - abort(); - } - // Pass on client stats via context. Passes ownership of the reference. - GPR_ASSERT(wc_arg->client_stats != nullptr); - wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; - wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; - } else { - grpc_grpclb_client_stats_unref(wc_arg->client_stats); - } - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy, - wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure"); - } - GPR_ASSERT(wc_arg->free_when_done != nullptr); - gpr_free(wc_arg->free_when_done); -} - -/* Linked list of pending pick requests. It stores all information needed to - * eventually call (Round Robin's) pick() on them. They mainly stay pending - * waiting for the RR policy to be created/updated. - * - * One particularity is the wrapping of the user-provided \a on_complete closure - * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in - * order to correctly unref the RR policy instance upon completion of the pick. - * See \a wrapped_rr_closure for details. */ -typedef struct pending_pick { + // The grpclb instance that created the wrapping. This instance is not owned, + // reference counts are untouched. It's used only for logging purposes. + glb_lb_policy* glb_policy; + // Next pending pick. struct pending_pick* next; +}; - /* original pick()'s arguments */ - grpc_lb_policy_pick_args pick_args; - - /* output argument where to store the pick()ed connected subchannel, or - * nullptr upon error. */ - grpc_connected_subchannel** target; - - /* args for wrapped_on_complete */ - wrapped_rr_closure_arg wrapped_on_complete_arg; -} pending_pick; - -static void add_pending_pick(pending_pick** root, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, - grpc_closure* on_complete) { - pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp)); - pp->next = *root; - pp->pick_args = *pick_args; - pp->target = target; - pp->wrapped_on_complete_arg.wrapped_closure = on_complete; - pp->wrapped_on_complete_arg.target = target; - pp->wrapped_on_complete_arg.context = context; - pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; - pp->wrapped_on_complete_arg.lb_token_mdelem_storage = - pick_args->lb_token_mdelem_storage; - pp->wrapped_on_complete_arg.free_when_done = pp; - GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure, - wrapped_rr_closure, &pp->wrapped_on_complete_arg, - grpc_schedule_on_exec_ctx); - *root = pp; -} - -/* Same as the \a pending_pick struct but for ping operations */ -typedef struct pending_ping { +/// A linked list of pending pings waiting for the RR policy to be created. +struct pending_ping { + grpc_closure* on_initiate; + grpc_closure* on_ack; struct pending_ping* next; +}; - /* args for sending the ping */ - wrapped_rr_closure_arg* on_initiate; - wrapped_rr_closure_arg* on_ack; -} pending_ping; - -static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate, - grpc_closure* on_ack) { - pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); - if (on_initiate != nullptr) { - pping->on_initiate = - (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate)); - pping->on_initiate->wrapped_closure = on_initiate; - pping->on_initiate->free_when_done = pping->on_initiate; - GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure, - &pping->on_initiate, grpc_schedule_on_exec_ctx); - } - if (on_ack != nullptr) { - pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack)); - pping->on_ack->wrapped_closure = on_ack; - pping->on_ack->free_when_done = pping->on_ack; - GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure, - &pping->on_ack, grpc_schedule_on_exec_ctx); - } - pping->next = *root; - *root = pping; -} +} // namespace -/* - * glb_lb_policy - */ -typedef struct rr_connectivity_data rr_connectivity_data; - -typedef struct glb_lb_policy { +struct glb_lb_policy { /** base policy: must be first */ grpc_lb_policy base; @@ -331,6 +194,9 @@ typedef struct glb_lb_policy { /** the RR policy to use of the backend servers returned by the LB server */ grpc_lb_policy* rr_policy; + grpc_closure on_rr_connectivity_changed; + grpc_connectivity_state rr_connectivity_state; + bool started_picking; /** our connectivity state tracker */ @@ -365,11 +231,11 @@ typedef struct glb_lb_policy { /** are we already watching the LB channel's connectivity? */ bool watching_lb_channel; - /** is \a lb_call_retry_timer active? */ - bool retry_timer_active; + /** is the callback associated with \a lb_call_retry_timer pending? */ + bool retry_timer_callback_pending; - /** is \a lb_fallback_timer active? */ - bool fallback_timer_active; + /** is the callback associated with \a lb_fallback_timer pending? */ + bool fallback_timer_callback_pending; /** called upon changes to the LB channel's connectivity. */ grpc_closure lb_channel_on_connectivity_changed; @@ -377,6 +243,9 @@ typedef struct glb_lb_policy { /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ + /* Finished sending initial request. */ + grpc_closure lb_on_sent_initial_request; + /* Status from the LB server has been received. This signals the end of the LB * call. */ grpc_closure lb_on_server_status_received; @@ -408,7 +277,7 @@ typedef struct glb_lb_policy { grpc_slice lb_call_status_details; /** LB call retry backoff state */ - grpc_backoff lb_call_backoff_state; + grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff; /** LB call retry timer */ grpc_timer lb_call_retry_timer; @@ -416,6 +285,7 @@ typedef struct glb_lb_policy { /** LB fallback timer */ grpc_timer lb_fallback_timer; + bool initial_request_sent; bool seen_initial_response; /* Stats for client-side load reporting. Should be unreffed and @@ -424,22 +294,94 @@ typedef struct glb_lb_policy { /* Interval and timer for next client load report. */ grpc_millis client_stats_report_interval; grpc_timer client_load_report_timer; - bool client_load_report_timer_pending; + bool client_load_report_timer_callback_pending; bool last_client_load_report_counters_were_zero; /* Closure used for either the load report timer or the callback for * completion of sending the load report. */ grpc_closure client_load_report_closure; /* Client load report message payload. */ grpc_byte_buffer* client_load_report_payload; -} glb_lb_policy; - -/* Keeps track and reacts to changes in connectivity of the RR instance */ -struct rr_connectivity_data { - grpc_closure on_change; - grpc_connectivity_state state; - glb_lb_policy* glb_policy; }; +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static grpc_error* initial_metadata_add_lb_token( + grpc_metadata_batch* initial_metadata, + grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) { + GPR_ASSERT(lb_token_mdelem_storage != nullptr); + GPR_ASSERT(!GRPC_MDISNULL(lb_token)); + return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} + +static void destroy_client_stats(void* arg) { + grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg); +} + +static void pending_pick_set_metadata_and_context(pending_pick* pp) { + /* if connected_subchannel is nullptr, no pick has been made by the RR + * policy (e.g., all addresses failed to connect). There won't be any + * user_data/token available */ + if (pp->pick->connected_subchannel != nullptr) { + if (!GRPC_MDISNULL(pp->lb_token)) { + initial_metadata_add_lb_token(pp->pick->initial_metadata, + &pp->pick->lb_token_mdelem_storage, + GRPC_MDELEM_REF(pp->lb_token)); + } else { + gpr_log(GPR_ERROR, + "[grpclb %p] No LB token for connected subchannel pick %p", + pp->glb_policy, pp->pick); + abort(); + } + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(pp->client_stats != nullptr); + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = + pp->client_stats; + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = + destroy_client_stats; + } else { + if (pp->client_stats != nullptr) { + grpc_grpclb_client_stats_unref(pp->client_stats); + } + } +} + +/* The \a on_complete closure passed as part of the pick requires keeping a + * reference to its associated round robin instance. We wrap this closure in + * order to unref the round robin instance upon its invocation */ +static void pending_pick_complete(void* arg, grpc_error* error) { + pending_pick* pp = (pending_pick*)arg; + pending_pick_set_metadata_and_context(pp); + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); + gpr_free(pp); +} + +static pending_pick* pending_pick_create(glb_lb_policy* glb_policy, + grpc_lb_policy_pick_state* pick) { + pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp)); + pp->pick = pick; + pp->glb_policy = glb_policy; + GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp, + grpc_schedule_on_exec_ctx); + pp->original_on_complete = pick->on_complete; + pp->pick->on_complete = &pp->on_complete; + return pp; +} + +static void pending_pick_add(pending_pick** root, pending_pick* new_pp) { + new_pp->next = *root; + *root = new_pp; +} + +static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate, + grpc_closure* on_ack) { + pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); + pping->on_initiate = on_initiate; + pping->on_ack = on_ack; + pping->next = *root; + *root = pping; +} + static bool is_server_valid(const grpc_grpclb_server* server, size_t idx, bool log) { if (server->drop) return false; @@ -551,7 +493,6 @@ static grpc_lb_addresses* process_serverlist_locked( gpr_free(uri); user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; } - grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, false /* is_balancer */, nullptr /* balancer_name */, user_data); @@ -592,7 +533,6 @@ static void update_lb_connectivity_status_locked( grpc_error* rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); - /* The new connectivity status is a function of the previous one and the new * input coming from the status of the RR policy. * @@ -622,7 +562,6 @@ static void update_lb_connectivity_status_locked( * * (*) This function mustn't be called during shutting down. */ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); - switch (rr_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: @@ -633,7 +572,6 @@ static void update_lb_connectivity_status_locked( case GRPC_CHANNEL_READY: GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); } - if (grpc_lb_glb_trace.enabled()) { gpr_log( GPR_INFO, @@ -651,10 +589,8 @@ static void update_lb_connectivity_status_locked( * cleanups this callback would otherwise be responsible for. * If \a force_async is true, then we will manually schedule the * completion callback even if the pick is available immediately. */ -static bool pick_from_internal_rr_locked( - glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args, - bool force_async, grpc_connected_subchannel** target, - wrapped_rr_closure_arg* wc_arg) { +static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy, + bool force_async, pending_pick* pp) { // Check for drops if we are not using fallback backend addresses. if (glb_policy->serverlist != nullptr) { // Look at the index into the serverlist to see if we should drop this call. @@ -664,57 +600,36 @@ static bool pick_from_internal_rr_locked( glb_policy->serverlist_index = 0; // Wrap-around. } if (server->drop) { - // Not using the RR policy, so unref it. - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy, - wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); // Update client load reporting stats to indicate the number of // dropped calls. Note that we have to do this here instead of in // the client_load_reporting filter, because we do not create a // subchannel call (and therefore no client_load_reporting filter) // for dropped calls. - GPR_ASSERT(wc_arg->client_stats != nullptr); + GPR_ASSERT(glb_policy->client_stats != nullptr); grpc_grpclb_client_stats_add_call_dropped_locked( - server->load_balance_token, wc_arg->client_stats); - grpc_grpclb_client_stats_unref(wc_arg->client_stats); + server->load_balance_token, glb_policy->client_stats); if (force_async) { - GPR_ASSERT(wc_arg->wrapped_closure != nullptr); - GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); - gpr_free(wc_arg->free_when_done); + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); + gpr_free(pp); return false; } - gpr_free(wc_arg->free_when_done); + gpr_free(pp); return true; } } + // Set client_stats and user_data. + pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats); + GPR_ASSERT(pp->pick->user_data == nullptr); + pp->pick->user_data = (void**)&pp->lb_token; // Pick via the RR policy. - const bool pick_done = grpc_lb_policy_pick_locked( - wc_arg->rr_policy, pick_args, target, wc_arg->context, - (void**)&wc_arg->lb_token, &wc_arg->wrapper_closure); + bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick); if (pick_done) { - /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy, - wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); - /* add the load reporting initial metadata */ - initial_metadata_add_lb_token(pick_args->initial_metadata, - pick_args->lb_token_mdelem_storage, - GRPC_MDELEM_REF(wc_arg->lb_token)); - // Pass on client stats via context. Passes ownership of the reference. - GPR_ASSERT(wc_arg->client_stats != nullptr); - wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; - wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + pending_pick_set_metadata_and_context(pp); if (force_async) { - GPR_ASSERT(wc_arg->wrapped_closure != nullptr); - GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); - gpr_free(wc_arg->free_when_done); - return false; + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); + pick_done = false; } - gpr_free(wc_arg->free_when_done); + gpr_free(pp); } /* else, the pending pick will be registered and taken care of by the * pending pick list inside the RR policy (glb_policy->rr_policy). @@ -756,7 +671,7 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) { gpr_free(args); } -static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error); +static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error); static void create_rr_locked(glb_lb_policy* glb_policy, grpc_lb_policy_args* args) { GPR_ASSERT(glb_policy->rr_policy == nullptr); @@ -778,72 +693,46 @@ static void create_rr_locked(glb_lb_policy* glb_policy, glb_policy->base.request_reresolution = nullptr; glb_policy->rr_policy = new_rr_policy; grpc_error* rr_state_error = nullptr; - const grpc_connectivity_state rr_state = - grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy, - &rr_state_error); + glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked( + glb_policy->rr_policy, &rr_state_error); /* Connectivity state is a function of the RR policy updated/created */ - update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error); + update_lb_connectivity_status_locked( + glb_policy, glb_policy->rr_connectivity_state, rr_state_error); /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); - - /* Allocate the data for the tracking of the new RR policy's connectivity. - * It'll be deallocated in glb_rr_connectivity_changed() */ - rr_connectivity_data* rr_connectivity = - (rr_connectivity_data*)gpr_zalloc(sizeof(rr_connectivity_data)); - GRPC_CLOSURE_INIT(&rr_connectivity->on_change, - glb_rr_connectivity_changed_locked, rr_connectivity, + GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed, + on_rr_connectivity_changed_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - rr_connectivity->glb_policy = glb_policy; - rr_connectivity->state = rr_state; - /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb"); - grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, - &rr_connectivity->state, - &rr_connectivity->on_change); + GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb"); + grpc_lb_policy_notify_on_state_change_locked( + glb_policy->rr_policy, &glb_policy->rr_connectivity_state, + &glb_policy->on_rr_connectivity_changed); grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy); - - /* Update picks and pings in wait */ + // Send pending picks to RR policy. pending_pick* pp; while ((pp = glb_policy->pending_picks)) { glb_policy->pending_picks = pp->next; - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); - pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; - pp->wrapped_on_complete_arg.client_stats = - grpc_grpclb_client_stats_ref(glb_policy->client_stats); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending pick about to (async) PICK from RR %p", glb_policy, glb_policy->rr_policy); } - pick_from_internal_rr_locked(glb_policy, &pp->pick_args, - true /* force_async */, pp->target, - &pp->wrapped_on_complete_arg); + pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp); } - + // Send pending pings to RR policy. pending_ping* pping; while ((pping = glb_policy->pending_pings)) { glb_policy->pending_pings = pping->next; - grpc_closure* on_initiate = nullptr; - grpc_closure* on_ack = nullptr; - if (pping->on_initiate != nullptr) { - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); - pping->on_initiate->rr_policy = glb_policy->rr_policy; - on_initiate = &pping->on_initiate->wrapper_closure; - } - if (pping->on_ack != nullptr) { - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); - pping->on_ack->rr_policy = glb_policy->rr_policy; - on_ack = &pping->on_ack->wrapper_closure; - } if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", glb_policy, glb_policy->rr_policy); } - grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); + grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate, + pping->on_ack); gpr_free(pping); } } @@ -869,31 +758,28 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) { lb_policy_args_destroy(args); } -static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error) { - rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg; - glb_lb_policy* glb_policy = rr_connectivity->glb_policy; +static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) { + glb_lb_policy* glb_policy = (glb_lb_policy*)arg; if (glb_policy->shutting_down) { - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); - gpr_free(rr_connectivity); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); return; } - if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) { + if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { /* An RR policy that has transitioned into the SHUTDOWN connectivity state * should not be considered for picks or updates: the SHUTDOWN state is a * sink, policies can't transition back from it. .*/ GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown"); glb_policy->rr_policy = nullptr; - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); - gpr_free(rr_connectivity); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); return; } /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ - update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state, - GRPC_ERROR_REF(error)); - /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */ - grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, - &rr_connectivity->state, - &rr_connectivity->on_change); + update_lb_connectivity_status_locked( + glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error)); + /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */ + grpc_lb_policy_notify_on_state_change_locked( + glb_policy->rr_policy, &glb_policy->rr_connectivity_state, + &glb_policy->on_rr_connectivity_changed); } static void destroy_balancer_name(void* balancer_name) { @@ -1001,38 +887,27 @@ static void glb_destroy(grpc_lb_policy* pol) { gpr_free(glb_policy); } -static void glb_shutdown_locked(grpc_lb_policy* pol) { +static void glb_shutdown_locked(grpc_lb_policy* pol, + grpc_lb_policy* new_policy) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); glb_policy->shutting_down = true; - - /* We need a copy of the lb_call pointer because we can't cancell the call - * while holding glb_policy->mu: lb_on_server_status_received, invoked due to - * the cancel, needs to acquire that same lock */ - grpc_call* lb_call = glb_policy->lb_call; - /* glb_policy->lb_call and this local lb_call must be consistent at this point * because glb_policy->lb_call is only assigned in lb_call_init_locked as part * of query_for_backends_locked, which can only be invoked while * glb_policy->shutting_down is false. */ - if (lb_call != nullptr) { - grpc_call_cancel(lb_call, nullptr); + if (glb_policy->lb_call != nullptr) { + grpc_call_cancel(glb_policy->lb_call, nullptr); /* lb_on_server_status_received will pick up the cancel and clean up */ } - if (glb_policy->retry_timer_active) { + if (glb_policy->retry_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_call_retry_timer); - glb_policy->retry_timer_active = false; } - if (glb_policy->fallback_timer_active) { + if (glb_policy->fallback_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_fallback_timer); - glb_policy->fallback_timer_active = false; } - - pending_pick* pp = glb_policy->pending_picks; - glb_policy->pending_picks = nullptr; - pending_ping* pping = glb_policy->pending_pings; - glb_policy->pending_pings = nullptr; if (glb_policy->rr_policy != nullptr) { + grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr); GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); } else { grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); @@ -1047,28 +922,35 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { } grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "glb_shutdown"); - + // Clear pending picks. + pending_pick* pp = glb_policy->pending_picks; + glb_policy->pending_picks = nullptr; while (pp != nullptr) { pending_pick* next = pp->next; - *pp->target = nullptr; - GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_REF(error)); - gpr_free(pp); + if (new_policy != nullptr) { + // Hand pick over to new policy. + if (pp->client_stats != nullptr) { + grpc_grpclb_client_stats_unref(pp->client_stats); + } + pp->pick->on_complete = pp->original_on_complete; + if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) { + // Synchronous return; schedule callback. + GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE); + } + gpr_free(pp); + } else { + pp->pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); + } pp = next; } - + // Clear pending pings. + pending_ping* pping = glb_policy->pending_pings; + glb_policy->pending_pings = nullptr; while (pping != nullptr) { pending_ping* next = pping->next; - if (pping->on_initiate != nullptr) { - GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure, - GRPC_ERROR_REF(error)); - gpr_free(pping->on_initiate); - } - if (pping->on_ack != nullptr) { - GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure, - GRPC_ERROR_REF(error)); - gpr_free(pping->on_ack); - } + GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error)); gpr_free(pping); pping = next; } @@ -1086,16 +968,16 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { // level (grpclb), inside the glb_policy->pending_picks list. To cancel these, // we invoke the completion closure and set *target to nullptr right here. static void glb_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_lb_policy_pick_state* pick, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; pending_pick* pp = glb_policy->pending_picks; glb_policy->pending_picks = nullptr; while (pp != nullptr) { pending_pick* next = pp->next; - if (pp->target == target) { - *target = nullptr; - GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, + if (pp->pick == pick) { + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1105,7 +987,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol, pp = next; } if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target, + grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); @@ -1130,9 +1012,9 @@ static void glb_cancel_picks_locked(grpc_lb_policy* pol, glb_policy->pending_picks = nullptr; while (pp != nullptr) { pending_pick* next = pp->next; - if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == + if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1154,20 +1036,21 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy); static void start_picking_locked(glb_lb_policy* glb_policy) { /* start a timer to fall back */ if (glb_policy->lb_fallback_timeout_ms > 0 && - glb_policy->serverlist == nullptr && !glb_policy->fallback_timer_active) { + glb_policy->serverlist == nullptr && + !glb_policy->fallback_timer_callback_pending) { grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms; - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); + GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - glb_policy->fallback_timer_active = true; + glb_policy->fallback_timer_callback_pending = true; grpc_timer_init(&glb_policy->lb_fallback_timer, deadline, &glb_policy->lb_on_fallback); } glb_policy->started_picking = true; - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); query_for_backends_locked(glb_policy); } @@ -1179,19 +1062,9 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) { } static int glb_pick_locked(grpc_lb_policy* pol, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, void** user_data, - grpc_closure* on_complete) { - if (pick_args->lb_token_mdelem_storage == nullptr) { - *target = nullptr; - GRPC_CLOSURE_SCHED(on_complete, - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "No mdelem storage for the LB token. Load reporting " - "won't work without it. Failing")); - return 0; - } + grpc_lb_policy_pick_state* pick) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; + pending_pick* pp = pending_pick_create(glb_policy, pick); bool pick_done = false; if (glb_policy->rr_policy != nullptr) { const grpc_connectivity_state rr_connectivity_state = @@ -1199,7 +1072,7 @@ static int glb_pick_locked(grpc_lb_policy* pol, nullptr); // The glb_policy->rr_policy may have transitioned to SHUTDOWN but the // callback registered to capture this event - // (glb_rr_connectivity_changed_locked) may not have been invoked yet. We + // (on_rr_connectivity_changed_locked) may not have been invoked yet. We // need to make sure we aren't trying to pick from a RR policy instance // that's in shutdown. if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { @@ -1209,32 +1082,16 @@ static int glb_pick_locked(grpc_lb_policy* pol, glb_policy, glb_policy->rr_policy, grpc_connectivity_state_name(rr_connectivity_state)); } - add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, - on_complete); + pending_pick_add(&glb_policy->pending_picks, pp); pick_done = false; } else { // RR not in shutdown if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy, glb_policy->rr_policy); } - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); - wrapped_rr_closure_arg* wc_arg = - (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(wrapped_rr_closure_arg)); - GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, - grpc_schedule_on_exec_ctx); - wc_arg->rr_policy = glb_policy->rr_policy; - wc_arg->target = target; - wc_arg->context = context; GPR_ASSERT(glb_policy->client_stats != nullptr); - wc_arg->client_stats = - grpc_grpclb_client_stats_ref(glb_policy->client_stats); - wc_arg->wrapped_closure = on_complete; - wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; - wc_arg->initial_metadata = pick_args->initial_metadata; - wc_arg->free_when_done = wc_arg; - wc_arg->glb_policy = pol; - pick_done = pick_from_internal_rr_locked( - glb_policy, pick_args, false /* force_async */, target, wc_arg); + pick_done = + pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp); } } else { // glb_policy->rr_policy == NULL if (grpc_lb_glb_trace.enabled()) { @@ -1242,8 +1099,7 @@ static int glb_pick_locked(grpc_lb_policy* pol, "[grpclb %p] No RR policy. Adding to grpclb's pending picks", glb_policy); } - add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, - on_complete); + pending_pick_add(&glb_policy->pending_picks, pp); if (!glb_policy->started_picking) { start_picking_locked(glb_policy); } @@ -1265,7 +1121,7 @@ static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, if (glb_policy->rr_policy) { grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); } else { - add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack); + pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack); if (!glb_policy->started_picking) { start_picking_locked(glb_policy); } @@ -1282,7 +1138,7 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol, static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - glb_policy->retry_timer_active = false; + glb_policy->retry_timer_callback_pending = false; if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr && error == GRPC_ERROR_NONE) { if (grpc_lb_glb_trace.enabled()) { @@ -1290,43 +1146,42 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { } query_for_backends_locked(glb_policy); } - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer"); } static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { if (glb_policy->started_picking && glb_policy->updating_lb_call) { - if (glb_policy->retry_timer_active) { + if (glb_policy->retry_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_call_retry_timer); } if (!glb_policy->shutting_down) start_picking_locked(glb_policy); glb_policy->updating_lb_call = false; } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ - grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state) - .next_attempt_start_time; + grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime(); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); if (timeout > 0) { gpr_log(GPR_DEBUG, - "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", + "[grpclb %p] ... retry LB call after %" PRIuPTR "ms.", glb_policy, timeout); } else { - gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.", + gpr_log(GPR_DEBUG, "[grpclb %p] ... retry LB call immediately.", glb_policy); } } - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); + GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - glb_policy->retry_timer_active = true; + glb_policy->retry_timer_callback_pending = true; grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry); } - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, - "lb_on_server_status_received_locked"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "lb_on_server_status_received_locked"); } static void send_client_load_report_locked(void* arg, grpc_error* error); @@ -1348,8 +1203,8 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) { grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); glb_policy->client_load_report_payload = nullptr; if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) { - glb_policy->client_load_report_timer_pending = false; - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); + glb_policy->client_load_report_timer_callback_pending = false; + GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report"); if (glb_policy->lb_call == nullptr) { maybe_restart_lb_call(glb_policy); } @@ -1358,6 +1213,22 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) { schedule_next_client_load_report(glb_policy); } +static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) { + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = glb_policy->client_load_report_payload; + GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, + client_load_report_done_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); + grpc_call_error call_error = grpc_call_start_batch_and_execute( + glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); + if (call_error != GRPC_CALL_OK) { + gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } +} + static bool load_report_counters_are_zero(grpc_grpclb_request* request) { grpc_grpclb_dropped_call_counts* drop_entries = (grpc_grpclb_dropped_call_counts*) @@ -1373,8 +1244,8 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) { static void send_client_load_report_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) { - glb_policy->client_load_report_timer_pending = false; - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); + glb_policy->client_load_report_timer_callback_pending = false; + GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report"); if (glb_policy->lb_call == nullptr) { maybe_restart_lb_call(glb_policy); } @@ -1401,22 +1272,15 @@ static void send_client_load_report_locked(void* arg, grpc_error* error) { grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); - // Send load report message. - grpc_op op; - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_SEND_MESSAGE; - op.data.send_message.send_message = glb_policy->client_load_report_payload; - GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, - client_load_report_done_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_call_error call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); - if (call_error != GRPC_CALL_OK) { - gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error); - GPR_ASSERT(GRPC_CALL_OK == call_error); + // If we've already sent the initial request, then we can go ahead and send + // the load report. Otherwise, we need to wait until the initial request has + // been sent to send this (see lb_on_sent_initial_request_locked() below). + if (glb_policy->initial_request_sent) { + do_send_client_load_report_locked(glb_policy); } } +static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error); static void lb_on_server_status_received_locked(void* arg, grpc_error* error); static void lb_on_response_received_locked(void* arg, grpc_error* error); static void lb_call_init_locked(glb_lb_policy* glb_policy) { @@ -1456,6 +1320,9 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) { grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); + GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request, + lb_on_sent_initial_request_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); @@ -1463,13 +1330,16 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) { lb_on_response_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_backoff_init(&glb_policy->lb_call_backoff_state, - GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_GRPCLB_RECONNECT_JITTER, - GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) + .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + + glb_policy->lb_call_backoff.Init(backoff_options); + glb_policy->initial_request_sent = false; glb_policy->seen_initial_response = false; glb_policy->last_client_load_report_counters_were_zero = false; } @@ -1485,7 +1355,7 @@ static void lb_call_destroy_locked(glb_lb_policy* glb_policy) { grpc_byte_buffer_destroy(glb_policy->lb_request_payload); grpc_slice_unref_internal(glb_policy->lb_call_status_details); - if (glb_policy->client_load_report_timer_pending) { + if (glb_policy->client_load_report_timer_callback_pending) { grpc_timer_cancel(&glb_policy->client_load_report_timer); } } @@ -1528,8 +1398,11 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - call_error = grpc_call_start_batch_and_execute(glb_policy->lb_call, ops, - (size_t)(op - ops), nullptr); + /* take a ref to be released in lb_on_sent_initial_request_locked() */ + GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked"); + call_error = grpc_call_start_batch_and_execute( + glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_sent_initial_request); GPR_ASSERT(GRPC_CALL_OK == call_error); op = ops; @@ -1542,10 +1415,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref - * count goes to zero) to be unref'd in lb_on_server_status_received_locked */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, - "lb_on_server_status_received_locked"); + /* take a ref to be released in lb_on_server_status_received_locked() */ + GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked"); call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_server_status_received); @@ -1557,22 +1428,32 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - /* take another weak ref to be unref'd/reused in - * lb_on_response_received_locked */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked"); + /* take a ref to be unref'd/reused in lb_on_response_received_locked() */ + GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked"); call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); GPR_ASSERT(GRPC_CALL_OK == call_error); } +static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) { + glb_lb_policy* glb_policy = (glb_lb_policy*)arg; + glb_policy->initial_request_sent = true; + // If we attempted to send a client load report before the initial request was + // sent, send the load report now. + if (glb_policy->client_load_report_payload != nullptr) { + do_send_client_load_report_locked(glb_policy); + } + GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked"); +} + static void lb_on_response_received_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op* op = ops; if (glb_policy->lb_response_payload != nullptr) { - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); /* Received data from the LB server. Look inside * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; @@ -1595,11 +1476,9 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { "client load reporting interval = %" PRIdPTR " milliseconds", glb_policy, glb_policy->client_stats_report_interval); } - /* take a weak ref (won't prevent calling of \a glb_shutdown() if the - * strong ref count goes to zero) to be unref'd in - * send_client_load_report_locked() */ - glb_policy->client_load_report_timer_pending = true; - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); + /* take a ref to be unref'd in send_client_load_report_locked() */ + glb_policy->client_load_report_timer_callback_pending = true; + GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report"); schedule_next_client_load_report(glb_policy); } else if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, @@ -1647,9 +1526,8 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { /* or dispose of the fallback */ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); glb_policy->fallback_backend_addresses = nullptr; - if (glb_policy->fallback_timer_active) { + if (glb_policy->fallback_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_fallback_timer); - glb_policy->fallback_timer_active = false; } } /* and update the copy in the glb_lb_policy instance. This @@ -1682,27 +1560,27 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { op->flags = 0; op->reserved = nullptr; op++; - /* reuse the "lb_on_response_received_locked" weak ref taken in + /* reuse the "lb_on_response_received_locked" ref taken in * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } else { - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, - "lb_on_response_received_locked_shutdown"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "lb_on_response_received_locked_shutdown"); } } else { /* empty payload: call cancelled. */ - /* dispose of the "lb_on_response_received_locked" weak ref taken in + /* dispose of the "lb_on_response_received_locked" ref taken in * query_for_backends_locked() and reused in every reception loop */ - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, - "lb_on_response_received_locked_empty_payload"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "lb_on_response_received_locked_empty_payload"); } } static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - glb_policy->fallback_timer_active = false; + glb_policy->fallback_timer_callback_pending = false; /* If we receive a serverlist after the timer fires but before this callback * actually runs, don't fall back. */ if (glb_policy->serverlist == nullptr) { @@ -1716,7 +1594,7 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { rr_handover_locked(glb_policy); } } - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer"); } static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { @@ -1737,7 +1615,7 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { // If the load report timer is still pending, we wait for it to be // called before restarting the call. Otherwise, we restart the call // here. - if (!glb_policy->client_load_report_timer_pending) { + if (!glb_policy->client_load_report_timer_callback_pending) { maybe_restart_lb_call(glb_policy); } } @@ -1800,7 +1678,7 @@ static void glb_update_locked(grpc_lb_policy* policy, grpc_channel_get_channel_stack(glb_policy->lb_channel)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); glb_policy->watching_lb_channel = true; - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity"); + GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity"); grpc_client_channel_watch_connectivity_state( client_channel_elem, grpc_polling_entity_create_from_pollset_set( @@ -1847,9 +1725,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, // lb_on_server_status_received() will pick up the cancel and reinit // lb_call. } else if (glb_policy->started_picking) { - if (glb_policy->retry_timer_active) { + if (glb_policy->retry_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_call_retry_timer); - glb_policy->retry_timer_active = false; } start_picking_locked(glb_policy); } @@ -1857,8 +1734,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, case GRPC_CHANNEL_SHUTDOWN: done: glb_policy->watching_lb_channel = false; - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, - "watch_lb_channel_connectivity_cb_shutdown"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "watch_lb_channel_connectivity_cb_shutdown"); break; } } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc index a8ecea4212..1e7f34bdc7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc @@ -22,8 +22,8 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/support/string.h" grpc_channel* grpc_lb_policy_grpclb_create_lb_channel( const char* lb_service_target_addresses, diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc index 76bcddf945..15233d371c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc @@ -22,11 +22,11 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/transport/lb_targets_info.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/string.h" grpc_channel* grpc_lb_policy_grpclb_create_lb_channel( const char* lb_service_target_addresses, diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 0861261359..725b78d478 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -31,13 +31,6 @@ grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); -typedef struct pending_pick { - struct pending_pick* next; - uint32_t initial_metadata_flags; - grpc_connected_subchannel** target; - grpc_closure* on_complete; -} pending_pick; - typedef struct { /** base policy: must be first */ grpc_lb_policy base; @@ -52,7 +45,7 @@ typedef struct { /** are we shut down? */ bool shutdown; /** list of picks that are waiting on connectivity */ - pending_pick* pending_picks; + grpc_lb_policy_pick_state* pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; @@ -70,19 +63,27 @@ static void pf_destroy(grpc_lb_policy* pol) { } } -static void pf_shutdown_locked(grpc_lb_policy* pol) { +static void pf_shutdown_locked(grpc_lb_policy* pol, + grpc_lb_policy* new_policy) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); } p->shutdown = true; - pending_pick* pp; - while ((pp = p->pending_picks) != nullptr) { - p->pending_picks = pp->next; - *pp->target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); - gpr_free(pp); + grpc_lb_policy_pick_state* pick; + while ((pick = p->pending_picks) != nullptr) { + p->pending_picks = pick->next; + if (new_policy != nullptr) { + // Hand off to new LB policy. + if (grpc_lb_policy_pick_locked(new_policy, pick)) { + // Synchronous return, schedule closure. + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } + } else { + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); + } } grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "shutdown"); @@ -102,19 +103,18 @@ static void pf_shutdown_locked(grpc_lb_policy* pol) { } static void pf_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_lb_policy_pick_state* pick, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; - pending_pick* pp = p->pending_picks; + grpc_lb_policy_pick_state* pp = p->pending_picks; p->pending_picks = nullptr; while (pp != nullptr) { - pending_pick* next = pp->next; - if (pp->target == target) { - *target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, + grpc_lb_policy_pick_state* next = pp->next; + if (pp == pick) { + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); - gpr_free(pp); } else { pp->next = p->pending_picks; p->pending_picks = pp; @@ -129,21 +129,20 @@ static void pf_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_eq, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; - pending_pick* pp = p->pending_picks; + grpc_lb_policy_pick_state* pick = p->pending_picks; p->pending_picks = nullptr; - while (pp != nullptr) { - pending_pick* next = pp->next; - if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == + while (pick != nullptr) { + grpc_lb_policy_pick_state* next = pick->next; + if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(pp->on_complete, + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); - gpr_free(pp); } else { - pp->next = p->pending_picks; - p->pending_picks = pp; + pick->next = p->pending_picks; + p->pending_picks = pick; } - pp = next; + pick = next; } GRPC_ERROR_UNREF(error); } @@ -173,27 +172,19 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) { } static int pf_pick_locked(grpc_lb_policy* pol, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, void** user_data, - grpc_closure* on_complete) { + grpc_lb_policy_pick_state* pick) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; // If we have a selected subchannel already, return synchronously. if (p->selected != nullptr) { - *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel, - "picked"); + pick->connected_subchannel = p->selected->connected_subchannel; return 1; } // No subchannel selected yet, so handle asynchronously. if (!p->started_picking) { start_picking_locked(p); } - pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->on_complete = on_complete; - p->pending_picks = pp; + pick->next = p->pending_picks; + p->pending_picks = pick; return 0; } @@ -225,8 +216,7 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, grpc_closure* on_ack) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (p->selected) { - grpc_connected_subchannel_ping(p->selected->connected_subchannel, - on_initiate, on_ack); + p->selected->connected_subchannel->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); @@ -305,8 +295,7 @@ static void pf_update_locked(grpc_lb_policy* policy, subchannel_list->num_subchannels); } if (p->selected->connected_subchannel != nullptr) { - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "pf_update_includes_selected"); + sd->connected_subchannel = p->selected->connected_subchannel; } p->selected = sd; if (p->subchannel_list != nullptr) { @@ -418,8 +407,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || - sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -427,20 +416,19 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { p->started_picking = false; grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + // in transient failure. Rely on re-resolution to recover. + p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "pf_selected_shutdown"); + grpc_lb_subchannel_data_unref_subchannel( + sd, "pf_selected_shutdown"); // Unrefs connected subchannel } else { grpc_connectivity_state_set(&p->state_tracker, sd->curr_connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); - } - if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); - } else { - p->selected = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown"); } } return; @@ -458,6 +446,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to // p->subchannel_list. + sd->connected_subchannel = + grpc_subchannel_get_connected_subchannel(sd->subchannel); if (sd->subchannel_list == p->latest_pending_subchannel_list) { GPR_ASSERT(p->subchannel_list != nullptr); grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, @@ -468,9 +458,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); p->selected = sd; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, @@ -479,18 +466,16 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Drop all other subchannels, since we are now connected. destroy_unselected_subchannels_locked(p); // Update any calls that were waiting for a pick. - pending_pick* pp; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); + grpc_lb_policy_pick_state* pick; + while ((pick = p->pending_picks)) { + p->pending_picks = pick->next; + pick->connected_subchannel = p->selected->connected_subchannel; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", (void*)p->selected); } - GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); @@ -529,39 +514,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data_start_connectivity_watch(sd); break; } - case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown"); - // Advance to next subchannel and check its state. - grpc_lb_subchannel_data* original_sd = sd; - do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == nullptr && sd != original_sd); - if (sd == original_sd) { - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_exhausted_subchannels"); - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, - GRPC_ERROR_NONE, - "exhausted_subchannels+reresolve"); - p->started_picking = false; - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, - GRPC_ERROR_NONE); - } - } else { - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } - } + case GRPC_CHANNEL_SHUTDOWN: + GPR_UNREACHABLE_CODE(break); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index b0c84017df..e217a0b0c0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -34,6 +34,7 @@ #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/gpr++/ref_counted_ptr.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -41,29 +42,6 @@ grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); -/** List of entities waiting for a pick. - * - * Once a pick is available, \a target is updated and \a on_complete called. */ -typedef struct pending_pick { - struct pending_pick* next; - - /* output argument where to store the pick()ed user_data. It'll be NULL if no - * such data is present or there's an error (the definite test for errors is - * \a target being NULL). */ - void** user_data; - - /* bitmask passed to pick() and used for selective cancelling. See - * grpc_lb_policy_cancel_picks() */ - uint32_t initial_metadata_flags; - - /* output argument where to store the pick()ed connected subchannel, or NULL - * upon error. */ - grpc_connected_subchannel** target; - - /* to be invoked once the pick() has completed (regardless of success) */ - grpc_closure* on_complete; -} pending_pick; - typedef struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; @@ -75,7 +53,7 @@ typedef struct round_robin_lb_policy { /** are we shutting down? */ bool shutdown; /** List of picks that are waiting on connectivity */ - pending_pick* pending_picks; + grpc_lb_policy_pick_state* pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; @@ -150,7 +128,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, (void*)p, (unsigned long)last_ready_index, (void*)p->subchannel_list->subchannels[last_ready_index].subchannel, (void*)p->subchannel_list->subchannels[last_ready_index] - .connected_subchannel); + .connected_subchannel.get()); } } @@ -167,19 +145,27 @@ static void rr_destroy(grpc_lb_policy* pol) { gpr_free(p); } -static void rr_shutdown_locked(grpc_lb_policy* pol) { +static void rr_shutdown_locked(grpc_lb_policy* pol, + grpc_lb_policy* new_policy) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); } p->shutdown = true; - pending_pick* pp; - while ((pp = p->pending_picks) != nullptr) { - p->pending_picks = pp->next; - *pp->target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); - gpr_free(pp); + grpc_lb_policy_pick_state* pick; + while ((pick = p->pending_picks) != nullptr) { + p->pending_picks = pick->next; + if (new_policy != nullptr) { + // Hand off to new LB policy. + if (grpc_lb_policy_pick_locked(new_policy, pick)) { + // Synchronous return; schedule callback. + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } + } else { + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); + } } grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); @@ -199,19 +185,18 @@ static void rr_shutdown_locked(grpc_lb_policy* pol) { } static void rr_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_lb_policy_pick_state* pick, grpc_error* error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - pending_pick* pp = p->pending_picks; + grpc_lb_policy_pick_state* pp = p->pending_picks; p->pending_picks = nullptr; while (pp != nullptr) { - pending_pick* next = pp->next; - if (pp->target == target) { - *target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, + grpc_lb_policy_pick_state* next = pp->next; + if (pp == pick) { + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); - gpr_free(pp); } else { pp->next = p->pending_picks; p->pending_picks = pp; @@ -226,22 +211,21 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_eq, grpc_error* error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - pending_pick* pp = p->pending_picks; + grpc_lb_policy_pick_state* pick = p->pending_picks; p->pending_picks = nullptr; - while (pp != nullptr) { - pending_pick* next = pp->next; - if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == + while (pick != nullptr) { + grpc_lb_policy_pick_state* next = pick->next; + if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - *pp->target = nullptr; - GRPC_CLOSURE_SCHED(pp->on_complete, + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); - gpr_free(pp); } else { - pp->next = p->pending_picks; - p->pending_picks = pp; + pick->next = p->pending_picks; + p->pending_picks = pick; } - pp = next; + pick = next; } GRPC_ERROR_UNREF(error); } @@ -266,13 +250,10 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) { } static int rr_pick_locked(grpc_lb_policy* pol, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, void** user_data, - grpc_closure* on_complete) { + grpc_lb_policy_pick_state* pick) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol, + gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol, p->shutdown); } GPR_ASSERT(!p->shutdown); @@ -282,18 +263,17 @@ static int rr_pick_locked(grpc_lb_policy* pol, /* readily available, report right away */ grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[next_ready_index]; - *target = - GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); - if (user_data != nullptr) { - *user_data = sd->user_data; + pick->connected_subchannel = sd->connected_subchannel; + if (pick->user_data != nullptr) { + *pick->user_data = sd->user_data; } if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " - "index %lu)", - (void*)p, (void*)sd->subchannel, (void*)*target, - (void*)sd->subchannel_list, (unsigned long)next_ready_index); + "index %" PRIuPTR ")", + p, sd->subchannel, pick->connected_subchannel.get(), + sd->subchannel_list, next_ready_index); } /* only advance the last picked pointer if the selection was used */ update_last_ready_subchannel_index_locked(p, next_ready_index); @@ -304,27 +284,21 @@ static int rr_pick_locked(grpc_lb_policy* pol, if (!p->started_picking) { start_picking_locked(p); } - pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->on_complete = on_complete; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->user_data = user_data; - p->pending_picks = pp; + pick->next = p->pending_picks; + p->pending_picks = pick; return 0; } static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; + GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { GPR_ASSERT(subchannel_list->num_ready > 0); --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(subchannel_list->num_transient_failures > 0); --subchannel_list->num_transient_failures; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - GPR_ASSERT(subchannel_list->num_shutdown > 0); - --subchannel_list->num_shutdown; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(subchannel_list->num_idle > 0); --subchannel_list->num_idle; @@ -334,8 +308,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++subchannel_list->num_transient_failures; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - ++subchannel_list->num_shutdown; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { ++subchannel_list->num_idle; } @@ -435,6 +407,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // either the current or latest pending subchannel lists. GPR_ASSERT(sd->subchannel_list == p->subchannel_list || sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. @@ -442,18 +415,17 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // Update state counters and new overall state. update_state_counters_locked(sd); update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); - // If the sd's new state is SHUTDOWN, unref the subchannel. - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "rr_connectivity_shutdown"); - } else { // sd not in SHUTDOWN - if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* + // subchannel, if any. + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + sd->connected_subchannel.reset(); + break; + } + case GRPC_CHANNEL_READY: { if (sd->connected_subchannel == nullptr) { - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); + sd->connected_subchannel = + grpc_subchannel_get_connected_subchannel(sd->subchannel); } if (sd->subchannel_list != p->subchannel_list) { // promote sd->subchannel_list to p->subchannel_list. @@ -493,13 +465,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // picks, update the last picked pointer update_last_ready_subchannel_index_locked(p, next_ready_index); } - pending_pick* pp; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - selected->connected_subchannel, "rr_picked"); - if (pp->user_data != nullptr) { - *pp->user_data = selected->user_data; + grpc_lb_policy_pick_state* pick; + while ((pick = p->pending_picks)) { + p->pending_picks = pick->next; + pick->connected_subchannel = selected->connected_subchannel; + if (pick->user_data != nullptr) { + *pick->user_data = selected->user_data; } if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, @@ -508,13 +479,17 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { (void*)p, (void*)selected->subchannel, (void*)p->subchannel_list, (unsigned long)next_ready_index); } - GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } + break; } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + case GRPC_CHANNEL_SHUTDOWN: + GPR_UNREACHABLE_CODE(return ); + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE:; // fallthrough } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(sd); } static grpc_connectivity_state rr_check_connectivity_locked( @@ -538,10 +513,9 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, if (next_ready_index < p->subchannel_list->num_subchannels) { grpc_lb_subchannel_data* selected = &p->subchannel_list->subchannels[next_ready_index]; - grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( - selected->connected_subchannel, "rr_ping"); - grpc_connected_subchannel_ping(target, on_initiate, on_ack); - GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping"); + grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target = + selected->connected_subchannel; + target->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index a3b4c8e524..fa2ffcc796 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -42,10 +42,7 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, } GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason); sd->subchannel = nullptr; - if (sd->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason); - sd->connected_subchannel = nullptr; - } + sd->connected_subchannel.reset(); if (sd->user_data != nullptr) { GPR_ASSERT(sd->user_data_vtable != nullptr); sd->user_data_vtable->destroy(sd->user_data); @@ -213,13 +210,13 @@ void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, void grpc_lb_subchannel_list_ref_for_connectivity_watch( grpc_lb_subchannel_list* subchannel_list, const char* reason) { - GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason); + GRPC_LB_POLICY_REF(subchannel_list->policy, reason); grpc_lb_subchannel_list_ref(subchannel_list, reason); } void grpc_lb_subchannel_list_unref_for_connectivity_watch( grpc_lb_subchannel_list* subchannel_list, const char* reason) { - GRPC_LB_POLICY_WEAK_UNREF(subchannel_list->policy, reason); + GRPC_LB_POLICY_UNREF(subchannel_list->policy, reason); grpc_lb_subchannel_list_unref(subchannel_list, reason); } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 0f8cea9347..f4e345def6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -22,6 +22,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/gpr++/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" // TODO(roth): This code is intended to be shared between pick_first and @@ -43,7 +44,7 @@ typedef struct { grpc_lb_subchannel_list* subchannel_list; /** subchannel itself */ grpc_subchannel* subchannel; - grpc_connected_subchannel* connected_subchannel; + grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; /** Is a connectivity notification pending? */ bool connectivity_notification_pending; /** notification that connectivity has changed on subchannel */ diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.cc b/src/core/ext/filters/client_channel/lb_policy_registry.cc index edd0330c6a..8414504e8f 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.cc +++ b/src/core/ext/filters/client_channel/lb_policy_registry.cc @@ -20,7 +20,7 @@ #include <string.h> -#include "src/core/lib/support/string.h" +#include "src/core/lib/gpr/string.h" #define MAX_POLICIES 10 diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc index 39b1237c77..c3309e36a3 100644 --- a/src/core/ext/filters/client_channel/parse_address.cc +++ b/src/core/ext/filters/client_channel/parse_address.cc @@ -29,7 +29,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include "src/core/lib/support/string.h" +#include "src/core/lib/gpr/string.h" #ifdef GRPC_HAVE_UNIX_SOCKET diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 4ec4477c82..1efdc26d56 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -34,16 +34,16 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr++/manual_constructor.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/gethostname.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" -#include "src/core/lib/support/env.h" -#include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" -#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -89,7 +89,7 @@ typedef struct { bool have_retry_timer; grpc_timer retry_timer; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::ManualConstructor<grpc_core::BackOff> backoff; /** currently resolving addresses */ grpc_lb_addresses* lb_addresses; @@ -131,7 +131,7 @@ static void dns_ares_shutdown_locked(grpc_resolver* resolver) { static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(r); } } @@ -264,8 +264,7 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) { } else { const char* msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); - grpc_millis next_try = - grpc_backoff_step(&r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->NextAttemptTime(); grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -298,7 +297,7 @@ static void dns_ares_next_locked(grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(r); } else { dns_ares_maybe_finish_next_locked(r); @@ -368,11 +367,13 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args, if (args->pollset_set != nullptr) { grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff.Init(grpc_core::BackOff(backoff_options)); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index 40e264504c..2eb2a9b59d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -30,10 +30,10 @@ #include <grpc/support/time.h> #include <grpc/support/useful.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/support/string.h" typedef struct fd_node { /** the owner of this fd node */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 3a870b2d06..2b35bdb605 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -36,12 +36,12 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/nameser.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/support/string.h" static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 77698e97aa..66a03c5a85 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -29,13 +29,13 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr++/manual_constructor.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/support/env.h" -#include "src/core/lib/support/string.h" -#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -70,7 +70,7 @@ typedef struct { grpc_timer retry_timer; grpc_closure on_retry; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::ManualConstructor<grpc_core::BackOff> backoff; /** currently resolving addresses */ grpc_resolved_addresses* addresses; @@ -106,7 +106,7 @@ static void dns_shutdown_locked(grpc_resolver* resolver) { static void dns_channel_saw_error_locked(grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(r); } } @@ -119,7 +119,7 @@ static void dns_next_locked(grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(r); } else { dns_maybe_finish_next_locked(r); @@ -161,8 +161,7 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) { grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(addresses); } else { - grpc_millis next_try = - grpc_backoff_step(&r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->NextAttemptTime(); grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -244,11 +243,13 @@ static grpc_resolver* dns_create(grpc_resolver_args* args, if (args->pollset_set != nullptr) { grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff.Init(grpc_core::BackOff(backoff_options)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index fe3ad1403c..eaa5e6ac49 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -32,13 +32,13 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/string.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc index 7d1e283fa3..99ad78e23c 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc @@ -30,12 +30,12 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/string.h" typedef struct { /** base class: must be first */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 84a5ace31d..bb43651d0c 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -20,7 +20,9 @@ #include <inttypes.h> #include <limits.h> -#include <string.h> + +#include <algorithm> +#include <cstring> #include <grpc/support/alloc.h> #include <grpc/support/avl.h> @@ -35,6 +37,8 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/debug/stats.h" +#include "src/core/lib/gpr++/debug_location.h" +#include "src/core/lib/gpr++/manual_constructor.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -48,19 +52,17 @@ #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 -#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20 +#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \ - &(subchannel)->connected_subchannel))) - -typedef struct { +namespace { +struct state_watcher { grpc_closure closure; grpc_subchannel* subchannel; grpc_connectivity_state connectivity_state; -} state_watcher; +}; +} // namespace typedef struct external_state_watcher { grpc_subchannel* subchannel; @@ -93,7 +95,7 @@ struct grpc_subchannel { grpc_connect_out_args connecting_result; /** callback for connection finishing */ - grpc_closure connected; + grpc_closure on_connected; /** callback for our alarm */ grpc_closure on_alarm; @@ -102,12 +104,13 @@ struct grpc_subchannel { being setup */ grpc_pollset_set* pollset_set; - /** active connection, or null; of type grpc_connected_subchannel */ - gpr_atm connected_subchannel; - /** mutex protecting remaining elements */ gpr_mu mu; + /** active connection, or null; of type grpc_core::ConnectedSubchannel + */ + grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; + /** have we seen a disconnection? */ bool disconnected; /** are we connecting */ @@ -118,8 +121,9 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** backoff state */ - grpc_backoff backoff_state; - grpc_backoff_result backoff_result; + grpc_core::ManualConstructor<grpc_core::BackOff> backoff; + grpc_millis next_attempt_deadline; + grpc_millis min_connect_timeout_ms; /** do we have an active alarm? */ bool have_alarm; @@ -130,16 +134,15 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { - grpc_connected_subchannel* connection; + grpc_core::ConnectedSubchannel* connection; grpc_closure* schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1)) -#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con)) #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call*)(callstack)) - 1) -static void subchannel_connected(void* subchannel, grpc_error* error); +static void on_subchannel_connected(void* subchannel, grpc_error* error); #ifndef NDEBUG #define REF_REASON reason @@ -157,20 +160,9 @@ static void subchannel_connected(void* subchannel, grpc_error* error); */ static void connection_destroy(void* arg, grpc_error* error) { - grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg; - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); - gpr_free(c); -} - -grpc_connected_subchannel* grpc_connected_subchannel_ref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); - return c; -} - -void grpc_connected_subchannel_unref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); + grpc_channel_stack* stk = (grpc_channel_stack*)arg; + grpc_channel_stack_destroy(stk); + gpr_free(stk); } /* @@ -237,18 +229,13 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref( } static void disconnect(grpc_subchannel* c) { - grpc_connected_subchannel* con; grpc_subchannel_index_unregister(c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = true; grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); - con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); - if (con != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection"); - gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); - } + c->connected_subchannel.reset(); gpr_mu_unlock(&c->mu); } @@ -274,6 +261,54 @@ void grpc_subchannel_weak_unref( } } +static void parse_args_for_backoff_values( + const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options, + grpc_millis* min_connect_timeout_ms) { + grpc_millis initial_backoff_ms = + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; + *min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000; + grpc_millis max_backoff_ms = + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + bool fixed_reconnect_backoff = false; + if (args != nullptr) { + for (size_t i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, + "grpc.testing.fixed_reconnect_backoff_ms")) { + fixed_reconnect_backoff = true; + initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms = + grpc_channel_arg_get_integer( + &args->args[i], + {static_cast<int>(initial_backoff_ms), 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + *min_connect_timeout_ms = grpc_channel_arg_get_integer( + &args->args[i], + {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + max_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX}); + } else if (0 == strcmp(args->args[i].key, + GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + initial_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], + {static_cast<int>(initial_backoff_ms), 100, INT_MAX}); + } + } + } + backoff_options->set_initial_backoff(initial_backoff_ms) + .set_multiplier(fixed_reconnect_backoff + ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(fixed_reconnect_backoff ? 0.0 + : GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_max_backoff(max_backoff_ms); +} + grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, const grpc_subchannel_args* args) { grpc_subchannel_key* key = grpc_subchannel_key_create(args); @@ -320,47 +355,14 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, if (new_args != nullptr) grpc_channel_args_destroy(new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; - GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c, + GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - int initial_backoff_ms = - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; - int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; - int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; - bool fixed_reconnect_backoff = false; - if (c->args) { - for (size_t i = 0; i < c->args->num_args; i++) { - if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff_ms")) { - fixed_reconnect_backoff = true; - initial_backoff_ms = min_backoff_ms = max_backoff_ms = - grpc_channel_arg_get_integer(&c->args->args[i], - {initial_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - min_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {min_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - max_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {max_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - initial_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {initial_backoff_ms, 100, INT_MAX}); - } - } - } - grpc_backoff_init( - &c->backoff_state, initial_backoff_ms, - fixed_reconnect_backoff ? 1.0 - : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, - min_backoff_ms, max_backoff_ms); + grpc_core::BackOff::Options backoff_options; + parse_args_for_backoff_values(args->args, &backoff_options, + &c->min_connect_timeout_ms); + c->backoff.Init(backoff_options); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(key, c); @@ -368,15 +370,16 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, static void continue_connect_locked(grpc_subchannel* c) { grpc_connect_in_args args; - args.interested_parties = c->pollset_set; - args.deadline = c->backoff_result.current_deadline; + const grpc_millis min_deadline = + c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now(); + c->next_attempt_deadline = c->backoff->NextAttemptTime(); + args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; - grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "state_change"); grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->connected); + &c->on_connected); } grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c, @@ -416,7 +419,6 @@ static void on_alarm(void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->backoff_result = grpc_backoff_step(&c->backoff_state); continue_connect_locked(c); gpr_mu_unlock(&c->mu); } else { @@ -437,7 +439,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { return; } - if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != nullptr) { + if (c->connected_subchannel != nullptr) { /* Already connected: don't restart */ return; } @@ -452,22 +454,20 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { if (!c->backoff_begun) { c->backoff_begun = true; - c->backoff_result = grpc_backoff_begin(&c->backoff_state); continue_connect_locked(c); } else { GPR_ASSERT(!c->have_alarm); c->have_alarm = true; const grpc_millis time_til_next = - c->backoff_result.next_attempt_start_time - - grpc_core::ExecCtx::Get()->Now(); + c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); if (time_til_next <= 0) { - gpr_log(GPR_INFO, "Retry immediately"); + gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c); } else { - gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); + gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRIdPTR " milliseconds", c, + time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); - grpc_timer_init(&c->alarm, c->backoff_result.next_attempt_start_time, - &c->on_alarm); + grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); } } @@ -508,75 +508,56 @@ void grpc_subchannel_notify_on_state_change( } } -void grpc_connected_subchannel_process_transport_op( - grpc_connected_subchannel* con, grpc_transport_op* op) { - grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(top_elem, op); -} - -static void subchannel_on_child_state_changed(void* p, grpc_error* error) { - state_watcher* sw = (state_watcher*)p; - grpc_subchannel* c = sw->subchannel; +static void on_connected_subchannel_connectivity_changed(void* p, + grpc_error* error) { + state_watcher* connected_subchannel_watcher = (state_watcher*)p; + grpc_subchannel* c = connected_subchannel_watcher->subchannel; gpr_mu* mu = &c->mu; gpr_mu_lock(mu); - /* if we failed just leave this closure */ - if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* any errors on a subchannel ==> we're done, create a new one */ - sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN; - } - grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state, - GRPC_ERROR_REF(error), "reflect_child"); - if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_connected_subchannel_notify_on_state_change( - GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr, - &sw->connectivity_state, &sw->closure); - GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - sw = nullptr; + switch (connected_subchannel_watcher->connectivity_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: { + if (!c->disconnected && c->connected_subchannel != nullptr) { + if (grpc_trace_stream_refcount.enabled()) { + gpr_log(GPR_INFO, + "Connected subchannel %p of subchannel %p has gone into %s. " + "Attempting to reconnect.", + c->connected_subchannel.get(), c, + grpc_connectivity_state_name( + connected_subchannel_watcher->connectivity_state)); + } + c->connected_subchannel.reset(); + grpc_connectivity_state_set(&c->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "reflect_child"); + c->backoff_begun = false; + c->backoff->Reset(); + maybe_start_connecting_locked(c); + } else { + connected_subchannel_watcher->connectivity_state = + GRPC_CHANNEL_SHUTDOWN; + } + break; + } + default: { + grpc_connectivity_state_set( + &c->state_tracker, connected_subchannel_watcher->connectivity_state, + GRPC_ERROR_REF(error), "reflect_child"); + GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); + c->connected_subchannel->NotifyOnStateChange( + nullptr, &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); + connected_subchannel_watcher = nullptr; + } } - gpr_mu_unlock(mu); GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); - gpr_free(sw); -} - -static void connected_subchannel_state_op(grpc_connected_subchannel* con, - grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, - grpc_closure* closure) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->connectivity_state = state; - op->on_connectivity_state_change = closure; - op->bind_pollset_set = interested_parties; - elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(elem, op); -} - -void grpc_connected_subchannel_notify_on_state_change( - grpc_connected_subchannel* con, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* closure) { - connected_subchannel_state_op(con, interested_parties, state, closure); -} - -void grpc_connected_subchannel_ping(grpc_connected_subchannel* con, - grpc_closure* on_initiate, - grpc_closure* on_ack) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->send_ping.on_initiate = on_initiate; - op->send_ping.on_ack = on_ack; - elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(elem, op); + gpr_free(connected_subchannel_watcher); } static bool publish_transport_locked(grpc_subchannel* c) { - grpc_connected_subchannel* con; - grpc_channel_stack* stk; - state_watcher* sw_subchannel; - /* construct channel stack */ grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( @@ -588,8 +569,9 @@ static bool publish_transport_locked(grpc_subchannel* c) { grpc_channel_stack_builder_destroy(builder); return false; } + grpc_channel_stack* stk; grpc_error* error = grpc_channel_stack_builder_finish( - builder, 0, 1, connection_destroy, nullptr, (void**)&con); + builder, 0, 1, connection_destroy, nullptr, (void**)&stk); if (error != GRPC_ERROR_NONE) { grpc_transport_destroy(c->connecting_result.transport); gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", @@ -597,38 +579,37 @@ static bool publish_transport_locked(grpc_subchannel* c) { GRPC_ERROR_UNREF(error); return false; } - stk = CHANNEL_STACK_FROM_CONNECTION(con); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); /* initialize state watcher */ - sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel)); - sw_subchannel->subchannel = c; - sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; - GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed, - sw_subchannel, grpc_schedule_on_exec_ctx); + state_watcher* connected_subchannel_watcher = + (state_watcher*)gpr_zalloc(sizeof(*connected_subchannel_watcher)); + connected_subchannel_watcher->subchannel = c; + connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY; + GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure, + on_connected_subchannel_connectivity_changed, + connected_subchannel_watcher, grpc_schedule_on_exec_ctx); if (c->disconnected) { - gpr_free(sw_subchannel); + gpr_free(connected_subchannel_watcher); grpc_channel_stack_destroy(stk); - gpr_free(con); + gpr_free(stk); return false; } /* publish */ - /* TODO(ctiller): this full barrier seems to clear up a TSAN failure. - I'd have expected the rel_cas below to be enough, but - seemingly it's not. - Re-evaluate if we really need this. */ - gpr_atm_full_barrier(); - GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); + c->connected_subchannel.reset( + grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); + gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", + c->connected_subchannel.get(), c); /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - grpc_connected_subchannel_notify_on_state_change( - con, c->pollset_set, &sw_subchannel->connectivity_state, - &sw_subchannel->closure); + c->connected_subchannel->NotifyOnStateChange( + c->pollset_set, &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); /* signal completion */ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY, @@ -636,11 +617,11 @@ static bool publish_transport_locked(grpc_subchannel* c) { return true; } -static void subchannel_connected(void* arg, grpc_error* error) { +static void on_subchannel_connected(void* arg, grpc_error* error) { grpc_subchannel* c = (grpc_subchannel*)arg; grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; - GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); + GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected"); gpr_mu_lock(&c->mu); c->connecting = false; if (c->connecting_result.transport != nullptr && @@ -675,10 +656,10 @@ static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_subchannel_call* c = (grpc_subchannel_call*)call; GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_connected_subchannel* connection = c->connection; + grpc_core::ConnectedSubchannel* connection = c->connection; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); - GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call"); + connection->Unref(DEBUG_LOCATION, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } @@ -709,9 +690,12 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call, GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } -grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( - grpc_subchannel* c) { - return GET_CONNECTED_SUBCHANNEL(c, acq); +grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> +grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) { + gpr_mu_lock(&c->mu); + auto copy = c->connected_subchannel; + gpr_mu_unlock(&c->mu); + return copy; } const grpc_subchannel_key* grpc_subchannel_get_key( @@ -719,36 +703,6 @@ const grpc_subchannel_key* grpc_subchannel_get_key( return subchannel->key; } -grpc_error* grpc_connected_subchannel_create_call( - grpc_connected_subchannel* con, - const grpc_connected_subchannel_call_args* args, - grpc_subchannel_call** call) { - grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = (grpc_subchannel_call*)gpr_arena_alloc( - args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); - grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); - (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args->context, /* context */ - args->path, /* path */ - args->start_time, /* start_time */ - args->deadline, /* deadline */ - args->arena, /* arena */ - args->call_combiner /* call_combiner */ - }; - grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy, - *call, &call_args); - if (error != GRPC_ERROR_NONE) { - const char* error_string = grpc_error_string(error); - gpr_log(GPR_ERROR, "error: %s", error_string); - return error; - } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent); - return GRPC_ERROR_NONE; -} - grpc_call_stack* grpc_subchannel_call_get_call_stack( grpc_subchannel_call* subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); @@ -784,3 +738,64 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } + +namespace grpc_core { +ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) + : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount), + channel_stack_(channel_stack) {} + +ConnectedSubchannel::~ConnectedSubchannel() { + GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); +} + +void ConnectedSubchannel::NotifyOnStateChange( + grpc_pollset_set* interested_parties, grpc_connectivity_state* state, + grpc_closure* closure) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +void ConnectedSubchannel::Ping(grpc_closure* on_initiate, + grpc_closure* on_ack) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, + grpc_subchannel_call** call) { + *call = (grpc_subchannel_call*)gpr_arena_alloc( + args.arena, + sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size); + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); + Ref(DEBUG_LOCATION, "subchannel_call"); + (*call)->connection = this; + const grpc_call_element_args call_args = { + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args.context, /* context */ + args.path, /* path */ + args.start_time, /* start_time */ + args.deadline, /* deadline */ + args.arena, /* arena */ + args.call_combiner /* call_combiner */ + }; + grpc_error* error = grpc_call_stack_init( + channel_stack_, 1, subchannel_call_destroy, *call, &call_args); + if (error != GRPC_ERROR_NONE) { + const char* error_string = grpc_error_string(error); + gpr_log(GPR_ERROR, "error: %s", error_string); + return error; + } + grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); + return GRPC_ERROR_NONE; +} +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 9d34fff07a..f2a5c1e273 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -21,8 +21,10 @@ #include "src/core/ext/filters/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/gpr++/ref_counted.h" +#include "src/core/lib/gpr++/ref_counted_ptr.h" +#include "src/core/lib/gpr/arena.h" #include "src/core/lib/iomgr/polling_entity.h" -#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -32,7 +34,6 @@ /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ typedef struct grpc_subchannel grpc_subchannel; -typedef struct grpc_connected_subchannel grpc_connected_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; typedef struct grpc_subchannel_key grpc_subchannel_key; @@ -48,10 +49,6 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \ grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r)) -#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \ - grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ - grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) \ grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ @@ -65,14 +62,39 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; #define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ - grpc_connected_subchannel_unref((p)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif +namespace grpc_core { +class ConnectedSubchannel : public grpc_core::RefCountedWithTracing { + public: + struct CallArgs { + grpc_polling_entity* pollent; + grpc_slice path; + gpr_timespec start_time; + grpc_millis deadline; + gpr_arena* arena; + grpc_call_context_element* context; + grpc_call_combiner* call_combiner; + }; + + explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); + ~ConnectedSubchannel(); + + grpc_channel_stack* channel_stack() { return channel_stack_; } + void NotifyOnStateChange(grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, + grpc_closure* closure); + void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); + grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); + + private: + grpc_channel_stack* channel_stack_; +}; +} // namespace grpc_core + grpc_subchannel* grpc_subchannel_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel* grpc_subchannel_ref_from_weak_ref( @@ -83,35 +105,11 @@ grpc_subchannel* grpc_subchannel_weak_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -grpc_connected_subchannel* grpc_connected_subchannel_ref( - grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_connected_subchannel_unref( - grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_ref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -/** construct a subchannel call */ -typedef struct { - grpc_polling_entity* pollent; - grpc_slice path; - gpr_timespec start_time; - grpc_millis deadline; - gpr_arena* arena; - grpc_call_context_element* context; - grpc_call_combiner* call_combiner; -} grpc_connected_subchannel_call_args; - -grpc_error* grpc_connected_subchannel_create_call( - grpc_connected_subchannel* connected_subchannel, - const grpc_connected_subchannel_call_args* args, - grpc_subchannel_call** subchannel_call); - -/** process a transport level op */ -void grpc_connected_subchannel_process_transport_op( - grpc_connected_subchannel* subchannel, grpc_transport_op* op); - /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_subchannel* channel, grpc_error** error); @@ -121,17 +119,12 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( void grpc_subchannel_notify_on_state_change( grpc_subchannel* channel, grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* notify); -void grpc_connected_subchannel_notify_on_state_change( - grpc_connected_subchannel* channel, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* notify); -void grpc_connected_subchannel_ping(grpc_connected_subchannel* channel, - grpc_closure* on_initiate, - grpc_closure* on_ack); - -/** retrieve the grpc_connected_subchannel - or NULL if called before - the subchannel becomes connected */ -grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( - grpc_subchannel* subchannel); + +/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected + * (which may happen before it initially connects or during transient failures) + * */ +grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> +grpc_subchannel_get_connected_subchannel(grpc_subchannel* c); /** return the subchannel index key for \a subchannel */ const grpc_subchannel_key* grpc_subchannel_get_key( diff --git a/src/core/ext/filters/client_channel/uri_parser.cc b/src/core/ext/filters/client_channel/uri_parser.cc index 3428f4b54c..c5f2d6822c 100644 --- a/src/core/ext/filters/client_channel/uri_parser.cc +++ b/src/core/ext/filters/client_channel/uri_parser.cc @@ -26,10 +26,10 @@ #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/lib/gpr/string.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/string.h" /** a size_t default value... maps to all 1's */ #define NOT_SET (~(size_t)0) diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index a1fb10f5b8..5584d50018 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -20,12 +20,12 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <string.h> +#include "src/core/lib/gpr/string.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport_impl.h" @@ -35,7 +35,8 @@ /* default maximum size of payload eligable for GET request */ static const size_t kMaxPayloadSizeForGet = 2048; -typedef struct call_data { +namespace { +struct call_data { grpc_call_combiner* call_combiner; // State for handling send_initial_metadata ops. grpc_linked_mdelem method; @@ -60,13 +61,14 @@ typedef struct call_data { grpc_closure on_send_message_next_done; grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; -} call_data; +}; -typedef struct channel_data { +struct channel_data { grpc_mdelem static_scheme; grpc_mdelem user_agent; size_t max_payload_size_for_get; -} channel_data; +}; +} // namespace static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem, grpc_metadata_batch* b) { diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index c17830fa7b..0218ec6e40 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -29,23 +29,24 @@ #include "src/core/lib/compression/algorithm_metadata.h" #include "src/core/lib/compression/compression_internal.h" #include "src/core/lib/compression/message_compress.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" -typedef enum { +namespace { +enum initial_metadata_state { // Initial metadata not yet seen. INITIAL_METADATA_UNSEEN = 0, // Initial metadata seen; compression algorithm set. HAS_COMPRESSION_ALGORITHM, // Initial metadata seen; no compression algorithm set. NO_COMPRESSION_ALGORITHM, -} initial_metadata_state; +}; -typedef struct call_data { +struct call_data { grpc_call_combiner* call_combiner; grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem stream_compression_algorithm_storage; @@ -63,9 +64,9 @@ typedef struct call_data { grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; grpc_closure on_send_message_next_done; -} call_data; +}; -typedef struct channel_data { +struct channel_data { /** The default, channel-level, compression algorithm */ grpc_compression_algorithm default_compression_algorithm; /** Bitset of enabled compression algorithms */ @@ -74,7 +75,8 @@ typedef struct channel_data { uint32_t supported_message_compression_algorithms; /** Supported stream compression algorithms */ uint32_t supported_stream_compression_algorithms; -} channel_data; +}; +} // namespace static bool skip_compression(grpc_call_element* elem, uint32_t flags, bool has_compression_algorithm) { diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index b872dc98f5..508a3bf9fc 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -31,7 +31,8 @@ #define EXPECTED_CONTENT_TYPE "application/grpc" #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 -typedef struct call_data { +namespace { +struct call_data { grpc_call_combiner* call_combiner; grpc_linked_mdelem status; @@ -60,11 +61,12 @@ typedef struct call_data { grpc_closure hs_on_recv; grpc_closure hs_on_complete; grpc_closure hs_recv_message_ready; -} call_data; +}; -typedef struct channel_data { +struct channel_data { uint8_t unused; -} channel_data; +}; +} // namespace static grpc_error* server_filter_outgoing_metadata(grpc_call_element* elem, grpc_metadata_batch* b) { diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc index f50a928fcd..a414229768 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc @@ -31,7 +31,8 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/static_metadata.h" -typedef struct call_data { +namespace { +struct call_data { intptr_t id; /**< an id unique to the call */ bool have_trailing_md_string; grpc_slice trailing_md_string; @@ -48,11 +49,12 @@ typedef struct call_data { /* to get notified of the availability of the incoming initial metadata. */ grpc_closure on_initial_md_ready; grpc_metadata_batch* recv_initial_metadata; -} call_data; +}; -typedef struct channel_data { +struct channel_data { intptr_t id; /**< an id unique to the channel */ -} channel_data; +}; +} // namespace static void on_initial_md_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = (grpc_call_element*)user_data; diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index 0499c6ecfc..7b86e4cd6c 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -37,7 +37,8 @@ #define MAX_CONNECTION_IDLE_INTEGER_OPTIONS \ { DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX } -typedef struct channel_data { +namespace { +struct channel_data { /* We take a reference to the channel stack for the timer callback */ grpc_channel_stack* channel_stack; /* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer @@ -84,7 +85,8 @@ typedef struct channel_data { grpc_connectivity_state connectivity_state; /* Number of active calls */ gpr_atm call_count; -} channel_data; +}; +} // namespace /* Increase the nubmer of active calls. Before the increasement, if there are no calls, the max_idle_timer should be cancelled. */ diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index f8487f9a9e..8d76c4a837 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -26,7 +26,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack_builder.h" -#include "src/core/lib/support/string.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/service_config.h" @@ -86,7 +86,8 @@ static void* refcounted_message_size_limits_create_from_json( return value; } -typedef struct call_data { +namespace { +struct call_data { grpc_call_combiner* call_combiner; message_size_limits limits; // Receive closures are chained: we inject this closure as the @@ -97,13 +98,14 @@ typedef struct call_data { grpc_byte_stream** recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; -} call_data; +}; -typedef struct channel_data { +struct channel_data { message_size_limits limits; // Maps path names to refcounted_message_size_limits structs. grpc_slice_hash_table* method_limit_table; -} channel_data; +}; +} // namespace // Callback invoked when we receive a message. Here we check the max // receive message size. diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc index 555a9134a2..88bb8c71cc 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc @@ -25,7 +25,8 @@ #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/metadata.h" -typedef struct call_data { +namespace { +struct call_data { // Receive closures are chained: we inject this closure as the // recv_initial_metadata_ready up-call on transport_stream_op, and remember to // call our next_recv_initial_metadata_ready member after handling it. @@ -37,7 +38,8 @@ typedef struct call_data { // Marks whether the workaround is active bool workaround_active; -} call_data; +}; +} // namespace // Find the user agent metadata element in the batch static bool get_user_agent_mdelem(const grpc_metadata_batch* batch, |