diff options
author | Yuchen Zeng <zyc@google.com> | 2017-08-24 15:34:52 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2017-08-24 15:34:52 -0700 |
commit | ddaef3e32581ef9c3078f402d2a08d7dcc781272 (patch) | |
tree | 2452f6117183bf07dcb4ebf9e67e39bf7359e042 /src/cpp | |
parent | f1d50983ae61fe0be7a284b4cbd0beb287b0f6a8 (diff) |
Remove non-POD global variables
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/client/channel_cc.cc | 94 |
1 files changed, 60 insertions, 34 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 865fecfe22..418b4fa2fd 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -41,12 +41,14 @@ #include <grpc/support/useful.h> #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/env.h" +#include "src/core/lib/support/string.h" namespace grpc { namespace { int kConnectivityCheckIntervalMsec = 500; void WatchStateChange(void* arg); +void InitConnectivityWatcherOnce(); class TagSaver final : public CompletionQueueTag { public: @@ -71,10 +73,9 @@ class ChannelConnectivityWatcher { char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); bool disabled = false; if (env != nullptr) { - static const char* truthy[] = {"yes", "Yes", "YES", "true", - "True", "TRUE", "1"}; + static const char* truthy[] = {"yes", "true", "1"}; for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { - if (0 == strcmp(env, truthy[i])) { + if (0 == gpr_stricmp(env, truthy[i])) { disabled = true; break; } @@ -82,54 +83,69 @@ class ChannelConnectivityWatcher { } gpr_free(env); if (!disabled) { + gpr_ref_init(&ref_, 0); gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); + gpr_thd_options_set_detached(&options); gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); } } - ~ChannelConnectivityWatcher() { - cq_.Shutdown(); - if (thd_id_ != 0) { - gpr_thd_join(thd_id_); - } - } - void WatchStateChangeImpl() { bool ok = false; void* tag = NULL; CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; - while (status != CompletionQueue::SHUTDOWN) { + while (true) { status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); // Make sure we've seen 2 TIMEOUTs before going to sleep if (status == CompletionQueue::TIMEOUT) { status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + if (status == CompletionQueue::TIMEOUT) { + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(kConnectivityCheckIntervalMsec, + GPR_TIMESPAN))); + continue; + } } - if (status == CompletionQueue::TIMEOUT) { - gpr_sleep_until( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kConnectivityCheckIntervalMsec, - GPR_TIMESPAN))); - } else if (status == CompletionQueue::GOT_EVENT) { - ChannelState* channel_state = static_cast<ChannelState*>(tag); - channel_state->state = grpc_channel_check_connectivity_state( - channel_state->channel, false); - if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { - void* shutdown_tag = NULL; - channel_state->shutdown_cq.Next(&shutdown_tag, &ok); - delete channel_state; - } else { - TagSaver* tag_saver = new TagSaver(channel_state); - grpc_channel_watch_connectivity_state( - channel_state->channel, channel_state->state, - gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); + ChannelState* channel_state = static_cast<ChannelState*>(tag); + channel_state->state = + grpc_channel_check_connectivity_state(channel_state->channel, false); + if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { + void* shutdown_tag = NULL; + channel_state->shutdown_cq.Next(&shutdown_tag, &ok); + delete channel_state; + if (gpr_unref(&ref_)) { + gpr_mu_lock(&g_watcher_mu_); + delete g_watcher_; + g_watcher_ = nullptr; + gpr_mu_unlock(&g_watcher_mu_); + break; } + } else { + TagSaver* tag_saver = new TagSaver(channel_state); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); } } } - void StartWatching(grpc_channel* channel) { + static void StartWatching(grpc_channel* channel) { + gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce); + gpr_mu_lock(&g_watcher_mu_); + if (g_watcher_ == nullptr) { + g_watcher_ = new ChannelConnectivityWatcher(); + } + g_watcher_->StartWatchingLocked(channel); + gpr_mu_unlock(&g_watcher_mu_); + } + + static void InitOnce() { gpr_mu_init(&g_watcher_mu_); } + + private: + void StartWatchingLocked(grpc_channel* channel) { if (thd_id_ != 0) { + gpr_ref(&ref_); ChannelState* channel_state = new ChannelState(channel); // The first grpc_channel_watch_connectivity_state() is not used to // monitor the channel state change, but to hold a reference of the @@ -146,7 +162,6 @@ class ChannelConnectivityWatcher { } } - private: struct ChannelState { explicit ChannelState(grpc_channel* channel) : channel(channel), state(GRPC_CHANNEL_IDLE){}; @@ -156,15 +171,26 @@ class ChannelConnectivityWatcher { }; gpr_thd_id thd_id_; CompletionQueue cq_; - CompletionQueue shutdown_cq_; + gpr_refcount ref_; + + static gpr_once g_connectivity_watcher_once_; + static gpr_mu g_watcher_mu_; + static ChannelConnectivityWatcher* g_watcher_; }; +gpr_once ChannelConnectivityWatcher::g_connectivity_watcher_once_ = + GPR_ONCE_INIT; +gpr_mu ChannelConnectivityWatcher::g_watcher_mu_; +ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr; + void WatchStateChange(void* arg) { ChannelConnectivityWatcher* watcher = static_cast<ChannelConnectivityWatcher*>(arg); watcher->WatchStateChangeImpl(); } +void InitConnectivityWatcherOnce() { ChannelConnectivityWatcher::InitOnce(); }; + ChannelConnectivityWatcher channel_connectivity_watcher; } // namespace @@ -173,7 +199,7 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) : host_(host), c_channel_(channel) { g_gli_initializer.summon(); if (grpc_channel_support_connectivity_watcher(channel)) { - channel_connectivity_watcher.StartWatching(channel); + ChannelConnectivityWatcher::StartWatching(channel); } } |