diff options
author | Mark D. Roth <roth@google.com> | 2018-03-30 13:28:56 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2018-03-30 13:28:56 -0700 |
commit | 7c1b5db3bb000a7c69d9d8151c66fecbacce64c3 (patch) | |
tree | 29f3f4b62097d3c0b7235e4b1ee89eb6b4023bc1 /src/core/ext/filters/client_channel/lb_policy/subchannel_list.h | |
parent | d536dba4d3234ba28066a31f7a092ce9daa89ec4 (diff) |
Convert subchannel_list code to C++.
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/subchannel_list.h | 468 |
1 files changed, 370 insertions, 98 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 6889d596ac..d717ba0e37 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -21,116 +21,388 @@ #include <grpc/support/port_platform.h> +#include <string.h> + +#include <grpc/support/alloc.h> + #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" -// TODO(roth): This code is intended to be shared between pick_first and -// round_robin. However, the interface needs more work to provide clean -// encapsulation. For example, the structs here have some fields that are -// only used in one of the two (e.g., the state counters in -// grpc_lb_subchannel_list and the prev_connectivity_state field in -// grpc_lb_subchannel_data are only used in round_robin, and the -// checking_subchannel field in grpc_lb_subchannel_list is only used by -// pick_first). Also, there is probably some code duplication between the -// connectivity state notification callback code in both pick_first and -// round_robin that could be refactored and moved here. In a future PR, -// need to clean this up. - -typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list; - -typedef struct { - /** backpointer to owning subchannel list */ - grpc_lb_subchannel_list* subchannel_list; - /** subchannel itself */ - grpc_subchannel* subchannel; - grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; - /** Is a connectivity notification pending? */ - bool connectivity_notification_pending; - /** notification that connectivity has changed on subchannel */ - grpc_closure connectivity_changed_closure; - /** previous and current connectivity states. Updated by \a - * \a connectivity_changed_closure based on - * \a pending_connectivity_state_unsafe. */ - grpc_connectivity_state prev_connectivity_state; - grpc_connectivity_state curr_connectivity_state; - /** connectivity state to be updated by - * grpc_subchannel_notify_on_state_change(), not guarded by - * the combiner. To be copied to \a curr_connectivity_state by - * \a connectivity_changed_closure. */ - grpc_connectivity_state pending_connectivity_state_unsafe; - /** the subchannel's target user data */ - void* user_data; - /** vtable to operate over \a user_data */ - const grpc_lb_user_data_vtable* user_data_vtable; -} grpc_lb_subchannel_data; - -/// Unrefs the subchannel contained in sd. -void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, - const char* reason); - -/// Starts watching the connectivity state of the subchannel. -/// The connectivity_changed_cb callback must invoke either -/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call -/// grpc_lb_subchannel_data_start_connectivity_watch(). -void grpc_lb_subchannel_data_start_connectivity_watch( - grpc_lb_subchannel_data* sd); - -/// Stops watching the connectivity state of the subchannel. -void grpc_lb_subchannel_data_stop_connectivity_watch( - grpc_lb_subchannel_data* sd); - -struct grpc_lb_subchannel_list { - /** backpointer to owning policy */ - grpc_core::LoadBalancingPolicy* policy; - - grpc_core::TraceFlag* tracer; - - /** all our subchannels */ - size_t num_subchannels; - grpc_lb_subchannel_data* subchannels; - - /** Index into subchannels of the one we're currently checking. - * Used when connecting to subchannels serially instead of in parallel. */ - // TODO(roth): When we have time, we can probably make this go away - // and compute the index dynamically by subtracting - // subchannel_list->subchannels from the subchannel_data pointer. - size_t checking_subchannel; - - /** how many subchannels are in state READY */ - size_t num_ready; - /** how many subchannels are in state TRANSIENT_FAILURE */ - size_t num_transient_failures; - /** how many subchannels are in state IDLE */ - size_t num_idle; - - /** There will be one ref for each entry in subchannels for which there is a - * pending connectivity state watcher callback. */ - gpr_refcount refcount; - - /** Is this list shutting down? This may be true due to the shutdown of the - * policy itself or because a newer update has arrived while this one hadn't - * finished processing. */ - bool shutting_down; +// FIXME: add comments + +namespace grpc_core { + +template <typename SubchannelListType, typename SubchannelDataType> +class SubchannelData { + public: + // Returns the index into subchannel_list_ of this object. + size_t Index() const { + return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) - + subchannel_list_->subchannel(0)); + } + + SubchannelListType* subchannel_list() const { return subchannel_list_; } + + grpc_subchannel* subchannel() const { return subchannel_; } + + ConnectedSubchannel* connected_subchannel() const { + return connected_subchannel_.get(); + } + +// FIXME: maybe do this automatically in OnConnectivityChangedLocked() +// when the new state is TRANSIENT_FAILURE? + void clear_connected_subchannel() { connected_subchannel_.reset(); } + + void GetConnectedSubchannelFromSubchannelLocked() { + connected_subchannel_ = + grpc_subchannel_get_connected_subchannel(subchannel_); + } + + void SetConnectedSubchannelFromLocked(SubchannelData* other) { + connected_subchannel_ = other->connected_subchannel_; // Adds ref. + } + + bool connectivity_notification_pending() const { + return connectivity_notification_pending_; + } + grpc_connectivity_state connectivity_state() const { + return curr_connectivity_state_; + } + + virtual grpc_connectivity_state CheckConnectivityStateLocked() { + pending_connectivity_state_unsafe_ = + grpc_subchannel_check_connectivity(subchannel(), nullptr); + curr_connectivity_state_ = pending_connectivity_state_unsafe_; + return curr_connectivity_state_; + } + + // Unrefs the subchannel. + virtual void UnrefSubchannelLocked(const char* reason); + + /// Starts watching the connectivity state of the subchannel. + /// The connectivity_changed_cb callback must invoke either + /// StopConnectivityWatch() or again call StartConnectivityWatch(). + void StartConnectivityWatchLocked(); + + /// Stops watching the connectivity state of the subchannel. + void StopConnectivityWatchLocked(); + + /// Cancels watching the connectivity state of the subchannel. + void CancelConnectivityWatchLocked(const char* reason); + + void ShutdownLocked(const char* reason); + + GRPC_ABSTRACT_BASE_CLASS + + protected: + SubchannelData( + SubchannelListType* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner); + + virtual ~SubchannelData(); + +// FIXME: define API + virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT; + + private: + static void OnConnectivityChangedLocked(void* arg, grpc_error* error); + + // Backpointer to owning subchannel list. Not owned. + SubchannelListType* subchannel_list_; + + // The subchannel and connected subchannel. + grpc_subchannel* subchannel_; + RefCountedPtr<ConnectedSubchannel> connected_subchannel_; + + // Notification that connectivity has changed on subchannel. + grpc_closure connectivity_changed_closure_; + // Is a connectivity notification pending? + bool connectivity_notification_pending_; + // Connectivity state to be updated by + // grpc_subchannel_notify_on_state_change(), not guarded by + // the combiner. Will be copied to \a curr_connectivity_state_ by + // \a connectivity_changed_closure_. + grpc_connectivity_state pending_connectivity_state_unsafe_; + // Current connectivity state. + grpc_connectivity_state curr_connectivity_state_; }; -grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( - grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer, +template <typename SubchannelListType, typename SubchannelDataType> +class SubchannelList + : public RefCountedWithTracing<SubchannelListType> { + public: + typedef InlinedVector<SubchannelDataType, 10> SubchannelVector; + + size_t num_subchannels() const { return subchannels_.size(); } + SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; } + + // Marks the subchannel_list as discarded. Unsubscribes all its subchannels. + void ShutdownLocked(const char* reason); + + bool shutting_down() const { return shutting_down_; } + + LoadBalancingPolicy* policy() const { return policy_; } + TraceFlag* tracer() const { return tracer_; } + + GRPC_ABSTRACT_BASE_CLASS + + protected: + SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args); + + virtual ~SubchannelList(); + + private: + // So New() can call our private ctor. + template <typename T, typename... Args> + friend T* New(Args&&... args); + + // Backpointer to owning policy. + LoadBalancingPolicy* policy_; + + TraceFlag* tracer_; + + // The list of subchannels. + SubchannelVector subchannels_; + + // Is this list shutting down? This may be true due to the shutdown of the + // policy itself or because a newer update has arrived while this one hadn't + // finished processing. + bool shutting_down_ = false; +}; + +// +// implementation -- no user-servicable parts below +// + +// +// SubchannelData +// + +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData( + SubchannelListType* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner) + : subchannel_list_(subchannel_list), + subchannel_(subchannel), + // We assume that the current state is IDLE. If not, we'll get a + // callback telling us that. + pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE), + curr_connectivity_state_(GRPC_CHANNEL_IDLE) { + GRPC_CLOSURE_INIT( + &connectivity_changed_closure_, + (&SubchannelData<SubchannelListType, + SubchannelDataType>::OnConnectivityChangedLocked), + this, grpc_combiner_scheduler(combiner)); +} + +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() { + UnrefSubchannelLocked("subchannel_data_destroy"); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::UnrefSubchannelLocked( + const char* reason) { + if (subchannel_ != nullptr) { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): unreffing subchannel", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), + subchannel_list_->num_subchannels(), subchannel_); + } + GRPC_SUBCHANNEL_UNREF(subchannel_, reason); + subchannel_ = nullptr; + connected_subchannel_.reset(); + } +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::StartConnectivityWatchLocked() { + if (subchannel_list_->tracer()->enabled()) { + gpr_log( + GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): requesting connectivity change " + "notification (from %s)", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), + subchannel_list_->num_subchannels(), subchannel_, + grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); + } + connectivity_notification_pending_ = true; + grpc_subchannel_notify_on_state_change( + subchannel_, subchannel_list_->policy()->interested_parties(), + &pending_connectivity_state_unsafe_, + &connectivity_changed_closure_); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::StopConnectivityWatchLocked() { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): stopping connectivity watch", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), + subchannel_list_->num_subchannels(), subchannel_); + } + GPR_ASSERT(connectivity_notification_pending_); + connectivity_notification_pending_ = false; +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::CancelConnectivityWatchLocked( + const char* reason) { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): canceling connectivity watch (%s)", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), + subchannel_list_->num_subchannels(), subchannel_, reason); + } + grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr, + &connectivity_changed_closure_); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::OnConnectivityChangedLocked( + void* arg, grpc_error* error) { + SubchannelData* sd = static_cast<SubchannelData*>(arg); + // Now that we're inside the combiner, copy the pending connectivity + // state (which was set by the connectivity state watcher) to + // curr_connectivity_state_, which is what we use inside of the combiner. + sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_; + sd->ProcessConnectivityChangeLocked(error); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::ShutdownLocked(const char* reason) { + // If there's a pending notification for this subchannel, cancel it; + // the callback is responsible for unreffing the subchannel. + // Otherwise, unref the subchannel directly. + if (connectivity_notification_pending_) { + CancelConnectivityWatchLocked(reason); + } else if (subchannel_ != nullptr) { + UnrefSubchannelLocked(reason); + } +} + +// +// SubchannelList +// + +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( + LoadBalancingPolicy* policy, TraceFlag* tracer, const grpc_lb_addresses* addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, - const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb); + const grpc_channel_args& args) + : RefCountedWithTracing<SubchannelListType>(tracer), + policy_(policy), + tracer_(tracer) { + if (tracer_->enabled()) { + gpr_log(GPR_DEBUG, + "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", + tracer_->name(), policy, this, addresses->num_addresses); + } + subchannels_.reserve(addresses->num_addresses); + // We need to remove the LB addresses in order to be able to compare the + // subchannel keys of subchannels from a different batch of addresses. + static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + // Create a subchannel for each address. + grpc_subchannel_args sc_args; + for (size_t i = 0; i < addresses->num_addresses; i++) { + // If there were any balancer, we would have chosen grpclb policy instead. + GPR_ASSERT(!addresses->addresses[i].is_balancer); + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + grpc_arg addr_arg = + grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( + &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1); + gpr_free(addr_arg.value.string); + sc_args.args = new_args; + grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( + client_channel_factory, &sc_args); + grpc_channel_args_destroy(new_args); + if (subchannel == nullptr) { + // Subchannel could not be created. + if (tracer_->enabled()) { + char* address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_DEBUG, + "[%s %p] could not create subchannel for address uri %s, " + "ignoring", + tracer_->name(), policy_, address_uri); + gpr_free(address_uri); + } + continue; + } + if (tracer_->enabled()) { + char* address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR + ": Created subchannel %p for address uri %s", + tracer_->name(), policy_, this, subchannels_.size(), subchannel, + address_uri); + gpr_free(address_uri); + } + subchannels_.emplace_back(static_cast<SubchannelListType*>(this), + addresses->user_data_vtable, + addresses->addresses[i], subchannel, combiner); + } +} -void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, - const char* reason); +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() { + if (tracer_->enabled()) { + gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", + tracer_->name(), policy_, this); + } +} -void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, - const char* reason); +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelList<SubchannelListType, + SubchannelDataType>::ShutdownLocked(const char* reason) { + if (tracer_->enabled()) { + gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", + tracer_->name(), policy_, this, reason); + } + GPR_ASSERT(!shutting_down_); + shutting_down_ = true; + for (size_t i = 0; i < subchannels_.size(); i++) { + SubchannelDataType* sd = &subchannels_[i]; + sd->ShutdownLocked(reason); + } +} -/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The -/// connectivity state notification callback will ultimately unref it. -void grpc_lb_subchannel_list_shutdown_and_unref( - grpc_lb_subchannel_list* subchannel_list, const char* reason); +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */ |