aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/client/channel_cc.cc
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2017-08-04 10:42:03 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2017-08-22 18:56:25 -0700
commitbe9b81424075fbd9bc77956c3eb882929b7bb5a8 (patch)
treebaf85fe9201d457650992623666443303aec424f /src/cpp/client/channel_cc.cc
parentff0996d02b20404f97483d08cd4e7003534fc990 (diff)
Add ChannelConnectivityWatcher
Diffstat (limited to 'src/cpp/client/channel_cc.cc')
-rw-r--r--src/cpp/client/channel_cc.cc64
1 files changed, 62 insertions, 2 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index f2d9bb07c9..27bd75f3cd 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -35,17 +35,77 @@
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
#include "src/core/lib/profiling/timers.h"
namespace grpc {
+namespace {
+void WatchStateChange(void* arg);
+} // namespace
+
+// Constantly watches channel connectivity status to reconnect a transiently
+// disconnected channel. This is a temporary work-around before we have retry
+// support.
+class ChannelConnectivityWatcher {
+ public:
+ ChannelConnectivityWatcher(Channel* channel)
+ : channel_(channel), thd_id_(0), being_destroyed_(0) {}
+
+ void WatchStateChangeImpl() {
+ grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
+ while (state != GRPC_CHANNEL_SHUTDOWN) {
+ if (gpr_atm_no_barrier_load(&being_destroyed_) == 1) {
+ break;
+ }
+ channel_->WaitForStateChange(
+ state, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(1, GPR_TIMESPAN)));
+ state = channel_->GetState(false);
+ }
+ }
+ void StartWatching() {
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
+ }
+
+ void Destroy() {
+ if (thd_id_ != 0) {
+ gpr_atm_no_barrier_store(&being_destroyed_, 1);
+ gpr_thd_join(thd_id_);
+ }
+ }
+
+ private:
+ Channel* channel_;
+ gpr_thd_id thd_id_;
+ gpr_atm being_destroyed_;
+};
+
+namespace {
+void WatchStateChange(void* arg) {
+ ChannelConnectivityWatcher* watcher =
+ static_cast<ChannelConnectivityWatcher*>(arg);
+ watcher->WatchStateChangeImpl();
+}
+} // namespace
+
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
- : host_(host), c_channel_(channel) {
+ : connectivity_watcher_(new ChannelConnectivityWatcher(this)),
+ host_(host),
+ c_channel_(channel) {
g_gli_initializer.summon();
+ if (host != "inproc") {
+ connectivity_watcher_->StartWatching();
+ }
}
-Channel::~Channel() { grpc_channel_destroy(c_channel_); }
+Channel::~Channel() {
+ connectivity_watcher_->Destroy();
+ grpc_channel_destroy(c_channel_);
+}
namespace {