diff options
author | 2017-08-24 03:07:01 -0700 | |
---|---|---|
committer | 2017-08-24 03:13:42 -0700 | |
commit | b4481a9a137b26d78228926922062f835d9a1606 (patch) | |
tree | 3c77583ba9eb29cab26a4b2f2afbffa1ad8b547d /src/cpp/client/channel_cc.cc | |
parent | bfb4e06e3c209907e9e4e15f915b4a3cd318f42c (diff) |
Share one monitoring thread between channels
Diffstat (limited to 'src/cpp/client/channel_cc.cc')
-rw-r--r-- | src/cpp/client/channel_cc.cc | 130 |
1 files changed, 76 insertions, 54 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index ffe88df732..f06d25b0d9 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -43,8 +43,23 @@ namespace grpc { namespace { -int kConnectivityCheckIntervalMsec = 100; +int kConnectivityCheckIntervalMsec = 500; void WatchStateChange(void* arg); + +class TagSaver final : public CompletionQueueTag { + public: + explicit TagSaver(void* tag) : tag_(tag) {} + ~TagSaver() override {} + bool FinalizeResult(void** tag, bool* status) override { + *tag = tag_; + delete this; + return true; + } + + private: + void* tag_; +}; + } // namespace // Constantly watches channel connectivity status to reconnect a transiently @@ -52,55 +67,80 @@ void WatchStateChange(void* arg); // support. class ChannelConnectivityWatcher { public: - explicit ChannelConnectivityWatcher(Channel* channel) - : channel_(channel), thd_id_(0) {} + ChannelConnectivityWatcher() { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + + ~ChannelConnectivityWatcher() { + cq_.Shutdown(); + if (thd_id_ != 0) { + gpr_thd_join(thd_id_); + } + } void WatchStateChangeImpl() { - grpc_connectivity_state state = GRPC_CHANNEL_IDLE; bool ok = false; void* tag = NULL; - while (state != GRPC_CHANNEL_SHUTDOWN) { - channel_->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME), - &cq_, NULL); - while (cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)) == - CompletionQueue::TIMEOUT) { + CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; + while (status != CompletionQueue::SHUTDOWN) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + // Make sure we've seen 2 TIMEOUTs before going to sleep + if (status == CompletionQueue::TIMEOUT) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + } + if (status == CompletionQueue::TIMEOUT) { gpr_sleep_until( gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(kConnectivityCheckIntervalMsec, + gpr_time_from_millis(kConnectivityCheckIntervalMsec, GPR_TIMESPAN))); + } else if (status == CompletionQueue::GOT_EVENT) { + ChannelState* channel_state = static_cast<ChannelState*>(tag); + channel_state->state = grpc_channel_check_connectivity_state( + channel_state->channel, false); + if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { + void* shutdown_tag = NULL; + channel_state->shutdown_cq.Next(&shutdown_tag, &ok); + delete channel_state; + } else { + TagSaver* tag_saver = new TagSaver(channel_state); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); + } } - state = channel_->GetState(false); } } - void StartWatching() { + void StartWatching(grpc_channel* channel) { const char* disabled_str = std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); if (disabled_str == nullptr || strcmp(disabled_str, "1")) { - // This NotifyOnstateChange() is not used to monitor the channel state - // change, but to hold a reference of the c channel. So that - // WatchStateChangeImpl() can observe state == GRPC_CHANNEL_SHUTDOWN - // without holding any lock on the channel object. - channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, - gpr_inf_future(GPR_CLOCK_REALTIME), - &shutdown_cq_, NULL); - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + ChannelState* channel_state = new ChannelState(channel); + // The first grpc_channel_watch_connectivity_state() is not used to + // monitor the channel state change, but to hold a reference of the + // c channel. So that WatchStateChangeImpl() can observe state == + // GRPC_CHANNEL_SHUTDOWN without holding any lock on the channel object. + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(), + new TagSaver(nullptr)); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), + new TagSaver(channel_state)); } } - void Destroy() { - if (thd_id_ != 0) { - gpr_thd_join(thd_id_); - } - bool ok = false; - void* tag = NULL; - shutdown_cq_.Next(&tag, &ok); - } - private: - Channel* channel_; + struct ChannelState { + explicit ChannelState(grpc_channel* channel) + : channel(channel), state(GRPC_CHANNEL_IDLE){}; + grpc_channel* channel; + grpc_connectivity_state state; + CompletionQueue shutdown_cq; + }; gpr_thd_id thd_id_; CompletionQueue cq_; CompletionQueue shutdown_cq_; @@ -112,22 +152,21 @@ void WatchStateChange(void* arg) { static_cast<ChannelConnectivityWatcher*>(arg); watcher->WatchStateChangeImpl(); } + +ChannelConnectivityWatcher channel_connectivity_watcher; } // namespace static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) - : connectivity_watcher_(new ChannelConnectivityWatcher(this)), - host_(host), - c_channel_(channel) { + : host_(host), c_channel_(channel) { g_gli_initializer.summon(); if (grpc_channel_support_connectivity_watcher(channel)) { - connectivity_watcher_->StartWatching(); + channel_connectivity_watcher.StartWatching(channel); } } Channel::~Channel() { grpc_channel_destroy(c_channel_); - connectivity_watcher_->Destroy(); } namespace { @@ -213,23 +252,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { return grpc_channel_check_connectivity_state(c_channel_, try_to_connect); } -namespace { -class TagSaver final : public CompletionQueueTag { - public: - explicit TagSaver(void* tag) : tag_(tag) {} - ~TagSaver() override {} - bool FinalizeResult(void** tag, bool* status) override { - *tag = tag_; - delete this; - return true; - } - - private: - void* tag_; -}; - -} // namespace - void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, CompletionQueue* cq, void* tag) { |