aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-03-30 13:28:56 -0700
committerGravatar Mark D. Roth <roth@google.com>2018-03-30 13:28:56 -0700
commit7c1b5db3bb000a7c69d9d8151c66fecbacce64c3 (patch)
tree29f3f4b62097d3c0b7235e4b1ee89eb6b4023bc1 /src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
parentd536dba4d3234ba28066a31f7a092ce9daa89ec4 (diff)
Convert subchannel_list code to C++.
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h468
1 files changed, 370 insertions, 98 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 6889d596ac..d717ba0e37 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -21,116 +21,388 @@
#include <grpc/support/port_platform.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
-// TODO(roth): This code is intended to be shared between pick_first and
-// round_robin. However, the interface needs more work to provide clean
-// encapsulation. For example, the structs here have some fields that are
-// only used in one of the two (e.g., the state counters in
-// grpc_lb_subchannel_list and the prev_connectivity_state field in
-// grpc_lb_subchannel_data are only used in round_robin, and the
-// checking_subchannel field in grpc_lb_subchannel_list is only used by
-// pick_first). Also, there is probably some code duplication between the
-// connectivity state notification callback code in both pick_first and
-// round_robin that could be refactored and moved here. In a future PR,
-// need to clean this up.
-
-typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list;
-
-typedef struct {
- /** backpointer to owning subchannel list */
- grpc_lb_subchannel_list* subchannel_list;
- /** subchannel itself */
- grpc_subchannel* subchannel;
- grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
- /** Is a connectivity notification pending? */
- bool connectivity_notification_pending;
- /** notification that connectivity has changed on subchannel */
- grpc_closure connectivity_changed_closure;
- /** previous and current connectivity states. Updated by \a
- * \a connectivity_changed_closure based on
- * \a pending_connectivity_state_unsafe. */
- grpc_connectivity_state prev_connectivity_state;
- grpc_connectivity_state curr_connectivity_state;
- /** connectivity state to be updated by
- * grpc_subchannel_notify_on_state_change(), not guarded by
- * the combiner. To be copied to \a curr_connectivity_state by
- * \a connectivity_changed_closure. */
- grpc_connectivity_state pending_connectivity_state_unsafe;
- /** the subchannel's target user data */
- void* user_data;
- /** vtable to operate over \a user_data */
- const grpc_lb_user_data_vtable* user_data_vtable;
-} grpc_lb_subchannel_data;
-
-/// Unrefs the subchannel contained in sd.
-void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
- const char* reason);
-
-/// Starts watching the connectivity state of the subchannel.
-/// The connectivity_changed_cb callback must invoke either
-/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call
-/// grpc_lb_subchannel_data_start_connectivity_watch().
-void grpc_lb_subchannel_data_start_connectivity_watch(
- grpc_lb_subchannel_data* sd);
-
-/// Stops watching the connectivity state of the subchannel.
-void grpc_lb_subchannel_data_stop_connectivity_watch(
- grpc_lb_subchannel_data* sd);
-
-struct grpc_lb_subchannel_list {
- /** backpointer to owning policy */
- grpc_core::LoadBalancingPolicy* policy;
-
- grpc_core::TraceFlag* tracer;
-
- /** all our subchannels */
- size_t num_subchannels;
- grpc_lb_subchannel_data* subchannels;
-
- /** Index into subchannels of the one we're currently checking.
- * Used when connecting to subchannels serially instead of in parallel. */
- // TODO(roth): When we have time, we can probably make this go away
- // and compute the index dynamically by subtracting
- // subchannel_list->subchannels from the subchannel_data pointer.
- size_t checking_subchannel;
-
- /** how many subchannels are in state READY */
- size_t num_ready;
- /** how many subchannels are in state TRANSIENT_FAILURE */
- size_t num_transient_failures;
- /** how many subchannels are in state IDLE */
- size_t num_idle;
-
- /** There will be one ref for each entry in subchannels for which there is a
- * pending connectivity state watcher callback. */
- gpr_refcount refcount;
-
- /** Is this list shutting down? This may be true due to the shutdown of the
- * policy itself or because a newer update has arrived while this one hadn't
- * finished processing. */
- bool shutting_down;
+// FIXME: add comments
+
+namespace grpc_core {
+
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelData {
+ public:
+ // Returns the index into subchannel_list_ of this object.
+ size_t Index() const {
+ return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
+ subchannel_list_->subchannel(0));
+ }
+
+ SubchannelListType* subchannel_list() const { return subchannel_list_; }
+
+ grpc_subchannel* subchannel() const { return subchannel_; }
+
+ ConnectedSubchannel* connected_subchannel() const {
+ return connected_subchannel_.get();
+ }
+
+// FIXME: maybe do this automatically in OnConnectivityChangedLocked()
+// when the new state is TRANSIENT_FAILURE?
+ void clear_connected_subchannel() { connected_subchannel_.reset(); }
+
+ void GetConnectedSubchannelFromSubchannelLocked() {
+ connected_subchannel_ =
+ grpc_subchannel_get_connected_subchannel(subchannel_);
+ }
+
+ void SetConnectedSubchannelFromLocked(SubchannelData* other) {
+ connected_subchannel_ = other->connected_subchannel_; // Adds ref.
+ }
+
+ bool connectivity_notification_pending() const {
+ return connectivity_notification_pending_;
+ }
+ grpc_connectivity_state connectivity_state() const {
+ return curr_connectivity_state_;
+ }
+
+ virtual grpc_connectivity_state CheckConnectivityStateLocked() {
+ pending_connectivity_state_unsafe_ =
+ grpc_subchannel_check_connectivity(subchannel(), nullptr);
+ curr_connectivity_state_ = pending_connectivity_state_unsafe_;
+ return curr_connectivity_state_;
+ }
+
+ // Unrefs the subchannel.
+ virtual void UnrefSubchannelLocked(const char* reason);
+
+ /// Starts watching the connectivity state of the subchannel.
+ /// The connectivity_changed_cb callback must invoke either
+ /// StopConnectivityWatch() or again call StartConnectivityWatch().
+ void StartConnectivityWatchLocked();
+
+ /// Stops watching the connectivity state of the subchannel.
+ void StopConnectivityWatchLocked();
+
+ /// Cancels watching the connectivity state of the subchannel.
+ void CancelConnectivityWatchLocked(const char* reason);
+
+ void ShutdownLocked(const char* reason);
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ SubchannelData(
+ SubchannelListType* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner);
+
+ virtual ~SubchannelData();
+
+// FIXME: define API
+ virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT;
+
+ private:
+ static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
+
+ // Backpointer to owning subchannel list. Not owned.
+ SubchannelListType* subchannel_list_;
+
+ // The subchannel and connected subchannel.
+ grpc_subchannel* subchannel_;
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+
+ // Notification that connectivity has changed on subchannel.
+ grpc_closure connectivity_changed_closure_;
+ // Is a connectivity notification pending?
+ bool connectivity_notification_pending_;
+ // Connectivity state to be updated by
+ // grpc_subchannel_notify_on_state_change(), not guarded by
+ // the combiner. Will be copied to \a curr_connectivity_state_ by
+ // \a connectivity_changed_closure_.
+ grpc_connectivity_state pending_connectivity_state_unsafe_;
+ // Current connectivity state.
+ grpc_connectivity_state curr_connectivity_state_;
};
-grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelList
+ : public RefCountedWithTracing<SubchannelListType> {
+ public:
+ typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
+
+ size_t num_subchannels() const { return subchannels_.size(); }
+ SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
+
+ // Marks the subchannel_list as discarded. Unsubscribes all its subchannels.
+ void ShutdownLocked(const char* reason);
+
+ bool shutting_down() const { return shutting_down_; }
+
+ LoadBalancingPolicy* policy() const { return policy_; }
+ TraceFlag* tracer() const { return tracer_; }
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args);
+
+ virtual ~SubchannelList();
+
+ private:
+ // So New() can call our private ctor.
+ template <typename T, typename... Args>
+ friend T* New(Args&&... args);
+
+ // Backpointer to owning policy.
+ LoadBalancingPolicy* policy_;
+
+ TraceFlag* tracer_;
+
+ // The list of subchannels.
+ SubchannelVector subchannels_;
+
+ // Is this list shutting down? This may be true due to the shutdown of the
+ // policy itself or because a newer update has arrived while this one hadn't
+ // finished processing.
+ bool shutting_down_ = false;
+};
+
+//
+// implementation -- no user-servicable parts below
+//
+
+//
+// SubchannelData
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
+ SubchannelListType* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : subchannel_list_(subchannel_list),
+ subchannel_(subchannel),
+ // We assume that the current state is IDLE. If not, we'll get a
+ // callback telling us that.
+ pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE),
+ curr_connectivity_state_(GRPC_CHANNEL_IDLE) {
+ GRPC_CLOSURE_INIT(
+ &connectivity_changed_closure_,
+ (&SubchannelData<SubchannelListType,
+ SubchannelDataType>::OnConnectivityChangedLocked),
+ this, grpc_combiner_scheduler(combiner));
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
+ UnrefSubchannelLocked("subchannel_data_destroy");
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::UnrefSubchannelLocked(
+ const char* reason) {
+ if (subchannel_ != nullptr) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): unreffing subchannel",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(),
+ subchannel_list_->num_subchannels(), subchannel_);
+ }
+ GRPC_SUBCHANNEL_UNREF(subchannel_, reason);
+ subchannel_ = nullptr;
+ connected_subchannel_.reset();
+ }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::StartConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(
+ GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): requesting connectivity change "
+ "notification (from %s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(),
+ subchannel_list_->num_subchannels(), subchannel_,
+ grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
+ }
+ connectivity_notification_pending_ = true;
+ grpc_subchannel_notify_on_state_change(
+ subchannel_, subchannel_list_->policy()->interested_parties(),
+ &pending_connectivity_state_unsafe_,
+ &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::StopConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): stopping connectivity watch",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(),
+ subchannel_list_->num_subchannels(), subchannel_);
+ }
+ GPR_ASSERT(connectivity_notification_pending_);
+ connectivity_notification_pending_ = false;
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::CancelConnectivityWatchLocked(
+ const char* reason) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): canceling connectivity watch (%s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(),
+ subchannel_list_->num_subchannels(), subchannel_, reason);
+ }
+ grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr,
+ &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::OnConnectivityChangedLocked(
+ void* arg, grpc_error* error) {
+ SubchannelData* sd = static_cast<SubchannelData*>(arg);
+ // Now that we're inside the combiner, copy the pending connectivity
+ // state (which was set by the connectivity state watcher) to
+ // curr_connectivity_state_, which is what we use inside of the combiner.
+ sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_;
+ sd->ProcessConnectivityChangeLocked(error);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::ShutdownLocked(const char* reason) {
+ // If there's a pending notification for this subchannel, cancel it;
+ // the callback is responsible for unreffing the subchannel.
+ // Otherwise, unref the subchannel directly.
+ if (connectivity_notification_pending_) {
+ CancelConnectivityWatchLocked(reason);
+ } else if (subchannel_ != nullptr) {
+ UnrefSubchannelLocked(reason);
+ }
+}
+
+//
+// SubchannelList
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
+ LoadBalancingPolicy* policy, TraceFlag* tracer,
const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
- const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb);
+ const grpc_channel_args& args)
+ : RefCountedWithTracing<SubchannelListType>(tracer),
+ policy_(policy),
+ tracer_(tracer) {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
+ tracer_->name(), policy, this, addresses->num_addresses);
+ }
+ subchannels_.reserve(addresses->num_addresses);
+ // We need to remove the LB addresses in order to be able to compare the
+ // subchannel keys of subchannels from a different batch of addresses.
+ static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ // Create a subchannel for each address.
+ grpc_subchannel_args sc_args;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
+ memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
+ &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
+ gpr_free(addr_arg.value.string);
+ sc_args.args = new_args;
+ grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
+ client_channel_factory, &sc_args);
+ grpc_channel_args_destroy(new_args);
+ if (subchannel == nullptr) {
+ // Subchannel could not be created.
+ if (tracer_->enabled()) {
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_DEBUG,
+ "[%s %p] could not create subchannel for address uri %s, "
+ "ignoring",
+ tracer_->name(), policy_, address_uri);
+ gpr_free(address_uri);
+ }
+ continue;
+ }
+ if (tracer_->enabled()) {
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR
+ ": Created subchannel %p for address uri %s",
+ tracer_->name(), policy_, this, subchannels_.size(), subchannel,
+ address_uri);
+ gpr_free(address_uri);
+ }
+ subchannels_.emplace_back(static_cast<SubchannelListType*>(this),
+ addresses->user_data_vtable,
+ addresses->addresses[i], subchannel, combiner);
+ }
+}
-void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p",
+ tracer_->name(), policy_, this);
+ }
+}
-void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelList<SubchannelListType,
+ SubchannelDataType>::ShutdownLocked(const char* reason) {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
+ tracer_->name(), policy_, this, reason);
+ }
+ GPR_ASSERT(!shutting_down_);
+ shutting_down_ = true;
+ for (size_t i = 0; i < subchannels_.size(); i++) {
+ SubchannelDataType* sd = &subchannels_[i];
+ sd->ShutdownLocked(reason);
+ }
+}
-/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The
-/// connectivity state notification callback will ultimately unref it.
-void grpc_lb_subchannel_list_shutdown_and_unref(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */