diff options
author | Yuchen Zeng <zyc@google.com> | 2017-08-29 00:51:49 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2017-09-01 23:05:15 -0700 |
commit | 5150cbd02d5a5d7cec64fa46225f2bb38611ba3b (patch) | |
tree | d1ef5eaaa2a37aa3ab5c23136b3ce06664bee181 /src/cpp | |
parent | eeea43fa242659edabb52d34c0f17f59426ac277 (diff) |
Fix timer shutdown process
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/client/channel_cc.cc | 113 |
1 files changed, 68 insertions, 45 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 0d2e834a7f..bad36a0b8c 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -18,7 +18,10 @@ #include <grpc++/channel.h> +#include <chrono> +#include <condition_variable> #include <memory> +#include <mutex> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> @@ -48,7 +51,6 @@ namespace grpc { namespace { int kConnectivityCheckIntervalMsec = 500; void WatchStateChange(void* arg); -void InitConnectivityWatcherOnce(); class TagSaver final : public CompletionQueueTag { public: @@ -67,46 +69,64 @@ class TagSaver final : public CompletionQueueTag { // Constantly watches channel connectivity status to reconnect a transiently // disconnected channel. This is a temporary work-around before we have retry // support. -class ChannelConnectivityWatcher { +class ChannelConnectivityWatcher : private GrpcLibraryCodegen { public: static void StartWatching(grpc_channel* channel) { - char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); - bool disabled = gpr_is_true(env); - gpr_free(env); - if (!disabled) { - gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce); - gpr_mu_lock(&g_watcher_mu_); + if (!IsDisabled()) { + std::unique_lock<std::mutex> lock(g_watcher_mu_); if (g_watcher_ == nullptr) { g_watcher_ = new ChannelConnectivityWatcher(); } g_watcher_->StartWatchingLocked(channel); - gpr_mu_unlock(&g_watcher_mu_); + } + } + + static void StopWatching() { + if (!IsDisabled()) { + std::unique_lock<std::mutex> lock(g_watcher_mu_); + if (g_watcher_->StopWatchingLocked()) { + delete g_watcher_; + g_watcher_ = nullptr; + } } } private: - ChannelConnectivityWatcher() { + ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) { gpr_ref_init(&ref_, 0); gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_detached(&options); + gpr_thd_options_set_joinable(&options); gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); } + static bool IsDisabled() { + char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + bool disabled = gpr_is_true(env); + gpr_free(env); + return disabled; + } + void WatchStateChangeImpl() { bool ok = false; void* tag = NULL; CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; while (true) { - status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); - // Make sure we've seen 2 TIMEOUTs before going to sleep - if (status == CompletionQueue::TIMEOUT) { - status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); - if (status == CompletionQueue::TIMEOUT) { - gpr_sleep_until( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kConnectivityCheckIntervalMsec, - GPR_TIMESPAN))); - continue; + { + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (shutdown_) { + // Drain cq_ if the watcher is shutting down + status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)); + } else { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + // Make sure we've seen 2 TIMEOUTs before going to sleep + if (status == CompletionQueue::TIMEOUT) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + if (status == CompletionQueue::TIMEOUT) { + shutdown_cv_.wait_for(lock, std::chrono::milliseconds( + kConnectivityCheckIntervalMsec)); + continue; + } + } } } ChannelState* channel_state = static_cast<ChannelState*>(tag); @@ -116,7 +136,7 @@ class ChannelConnectivityWatcher { void* shutdown_tag = NULL; channel_state->shutdown_cq.Next(&shutdown_tag, &ok); delete channel_state; - if (Unref()) { + if (gpr_unref(&ref_)) { break; } } else { @@ -130,7 +150,8 @@ class ChannelConnectivityWatcher { void StartWatchingLocked(grpc_channel* channel) { if (thd_id_ != 0) { - Ref(); + gpr_ref(&ref_); + ++channel_count_; ChannelState* channel_state = new ChannelState(channel); // The first grpc_channel_watch_connectivity_state() is not used to // monitor the channel state change, but to hold a reference of the @@ -147,24 +168,20 @@ class ChannelConnectivityWatcher { } } - void Ref() { gpr_ref(&ref_); } - - bool Unref() { - if (gpr_unref(&ref_)) { - gpr_mu_lock(&g_watcher_mu_); - delete g_watcher_; - g_watcher_ = nullptr; - gpr_mu_unlock(&g_watcher_mu_); + bool StopWatchingLocked() { + if (--channel_count_ == 0) { + { + std::unique_lock<std::mutex> lock(shutdown_mu_); + shutdown_ = true; + shutdown_cv_.notify_one(); + } + gpr_thd_join(thd_id_); return true; } return false; } - static void InitOnce() { gpr_mu_init(&g_watcher_mu_); } - friend void WatchStateChange(void* arg); - friend void InitConnectivityWatcherOnce(); - struct ChannelState { explicit ChannelState(grpc_channel* channel) : channel(channel), state(GRPC_CHANNEL_IDLE){}; @@ -175,16 +192,17 @@ class ChannelConnectivityWatcher { gpr_thd_id thd_id_; CompletionQueue cq_; gpr_refcount ref_; + int channel_count_; - static gpr_once g_connectivity_watcher_once_; - static gpr_mu g_watcher_mu_; - // protected under g_watcher_mu_ - static ChannelConnectivityWatcher* g_watcher_; + std::mutex shutdown_mu_; + std::condition_variable shutdown_cv_; // protected by shutdown_cv_ + bool shutdown_; // protected by shutdown_cv_ + + static std::mutex g_watcher_mu_; + static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_ }; -gpr_once ChannelConnectivityWatcher::g_connectivity_watcher_once_ = - GPR_ONCE_INIT; -gpr_mu ChannelConnectivityWatcher::g_watcher_mu_; +std::mutex ChannelConnectivityWatcher::g_watcher_mu_; ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr; void WatchStateChange(void* arg) { @@ -192,8 +210,6 @@ void WatchStateChange(void* arg) { static_cast<ChannelConnectivityWatcher*>(arg); watcher->WatchStateChangeImpl(); } - -void InitConnectivityWatcherOnce() { ChannelConnectivityWatcher::InitOnce(); }; } // namespace static internal::GrpcLibraryInitializer g_gli_initializer; @@ -205,7 +221,14 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) } } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + if (grpc_channel_support_connectivity_watcher(c_channel_)) { + grpc_channel_destroy(c_channel_); + ChannelConnectivityWatcher::StopWatching(); + } else { + grpc_channel_destroy(c_channel_); + } +} namespace { |