diff options
author | 2017-08-04 10:42:03 -0700 | |
---|---|---|
committer | 2017-08-22 18:56:25 -0700 | |
commit | be9b81424075fbd9bc77956c3eb882929b7bb5a8 (patch) | |
tree | baf85fe9201d457650992623666443303aec424f /src/cpp/client/channel_cc.cc | |
parent | ff0996d02b20404f97483d08cd4e7003534fc990 (diff) |
Add ChannelConnectivityWatcher
Diffstat (limited to 'src/cpp/client/channel_cc.cc')
-rw-r--r-- | src/cpp/client/channel_cc.cc | 64 |
1 files changed, 62 insertions, 2 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f2d9bb07c9..27bd75f3cd 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -35,17 +35,77 @@ #include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/thd.h> #include "src/core/lib/profiling/timers.h" namespace grpc { +namespace { +void WatchStateChange(void* arg); +} // namespace + +// Constantly watches channel connectivity status to reconnect a transiently +// disconnected channel. This is a temporary work-around before we have retry +// support. +class ChannelConnectivityWatcher { + public: + ChannelConnectivityWatcher(Channel* channel) + : channel_(channel), thd_id_(0), being_destroyed_(0) {} + + void WatchStateChangeImpl() { + grpc_connectivity_state state = GRPC_CHANNEL_IDLE; + while (state != GRPC_CHANNEL_SHUTDOWN) { + if (gpr_atm_no_barrier_load(&being_destroyed_) == 1) { + break; + } + channel_->WaitForStateChange( + state, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(1, GPR_TIMESPAN))); + state = channel_->GetState(false); + } + } + void StartWatching() { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + + void Destroy() { + if (thd_id_ != 0) { + gpr_atm_no_barrier_store(&being_destroyed_, 1); + gpr_thd_join(thd_id_); + } + } + + private: + Channel* channel_; + gpr_thd_id thd_id_; + gpr_atm being_destroyed_; +}; + +namespace { +void WatchStateChange(void* arg) { + ChannelConnectivityWatcher* watcher = + static_cast<ChannelConnectivityWatcher*>(arg); + watcher->WatchStateChangeImpl(); +} +} // namespace + static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) - : host_(host), c_channel_(channel) { + : connectivity_watcher_(new ChannelConnectivityWatcher(this)), + host_(host), + c_channel_(channel) { g_gli_initializer.summon(); + if (host != "inproc") { + connectivity_watcher_->StartWatching(); + } } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + connectivity_watcher_->Destroy(); + grpc_channel_destroy(c_channel_); +} namespace { |