aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/client/channel_cc.cc
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2017-08-24 03:07:01 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2017-08-24 03:13:42 -0700
commitb4481a9a137b26d78228926922062f835d9a1606 (patch)
tree3c77583ba9eb29cab26a4b2f2afbffa1ad8b547d /src/cpp/client/channel_cc.cc
parentbfb4e06e3c209907e9e4e15f915b4a3cd318f42c (diff)
Share one monitoring thread between channels
Diffstat (limited to 'src/cpp/client/channel_cc.cc')
-rw-r--r--src/cpp/client/channel_cc.cc130
1 files changed, 76 insertions, 54 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index ffe88df732..f06d25b0d9 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -43,8 +43,23 @@
namespace grpc {
namespace {
-int kConnectivityCheckIntervalMsec = 100;
+int kConnectivityCheckIntervalMsec = 500;
void WatchStateChange(void* arg);
+
+class TagSaver final : public CompletionQueueTag {
+ public:
+ explicit TagSaver(void* tag) : tag_(tag) {}
+ ~TagSaver() override {}
+ bool FinalizeResult(void** tag, bool* status) override {
+ *tag = tag_;
+ delete this;
+ return true;
+ }
+
+ private:
+ void* tag_;
+};
+
} // namespace
// Constantly watches channel connectivity status to reconnect a transiently
@@ -52,55 +67,80 @@ void WatchStateChange(void* arg);
// support.
class ChannelConnectivityWatcher {
public:
- explicit ChannelConnectivityWatcher(Channel* channel)
- : channel_(channel), thd_id_(0) {}
+ ChannelConnectivityWatcher() {
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
+ }
+
+ ~ChannelConnectivityWatcher() {
+ cq_.Shutdown();
+ if (thd_id_ != 0) {
+ gpr_thd_join(thd_id_);
+ }
+ }
void WatchStateChangeImpl() {
- grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
bool ok = false;
void* tag = NULL;
- while (state != GRPC_CHANNEL_SHUTDOWN) {
- channel_->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME),
- &cq_, NULL);
- while (cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)) ==
- CompletionQueue::TIMEOUT) {
+ CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
+ while (status != CompletionQueue::SHUTDOWN) {
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ // Make sure we've seen 2 TIMEOUTs before going to sleep
+ if (status == CompletionQueue::TIMEOUT) {
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ }
+ if (status == CompletionQueue::TIMEOUT) {
gpr_sleep_until(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(kConnectivityCheckIntervalMsec,
+ gpr_time_from_millis(kConnectivityCheckIntervalMsec,
GPR_TIMESPAN)));
+ } else if (status == CompletionQueue::GOT_EVENT) {
+ ChannelState* channel_state = static_cast<ChannelState*>(tag);
+ channel_state->state = grpc_channel_check_connectivity_state(
+ channel_state->channel, false);
+ if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
+ void* shutdown_tag = NULL;
+ channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
+ delete channel_state;
+ } else {
+ TagSaver* tag_saver = new TagSaver(channel_state);
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
+ }
}
- state = channel_->GetState(false);
}
}
- void StartWatching() {
+ void StartWatching(grpc_channel* channel) {
const char* disabled_str =
std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
if (disabled_str == nullptr || strcmp(disabled_str, "1")) {
- // This NotifyOnstateChange() is not used to monitor the channel state
- // change, but to hold a reference of the c channel. So that
- // WatchStateChangeImpl() can observe state == GRPC_CHANNEL_SHUTDOWN
- // without holding any lock on the channel object.
- channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE,
- gpr_inf_future(GPR_CLOCK_REALTIME),
- &shutdown_cq_, NULL);
- gpr_thd_options options = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&options);
- gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
+ ChannelState* channel_state = new ChannelState(channel);
+ // The first grpc_channel_watch_connectivity_state() is not used to
+ // monitor the channel state change, but to hold a reference of the
+ // c channel. So that WatchStateChangeImpl() can observe state ==
+ // GRPC_CHANNEL_SHUTDOWN without holding any lock on the channel object.
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(),
+ new TagSaver(nullptr));
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(),
+ new TagSaver(channel_state));
}
}
- void Destroy() {
- if (thd_id_ != 0) {
- gpr_thd_join(thd_id_);
- }
- bool ok = false;
- void* tag = NULL;
- shutdown_cq_.Next(&tag, &ok);
- }
-
private:
- Channel* channel_;
+ struct ChannelState {
+ explicit ChannelState(grpc_channel* channel)
+ : channel(channel), state(GRPC_CHANNEL_IDLE){};
+ grpc_channel* channel;
+ grpc_connectivity_state state;
+ CompletionQueue shutdown_cq;
+ };
gpr_thd_id thd_id_;
CompletionQueue cq_;
CompletionQueue shutdown_cq_;
@@ -112,22 +152,21 @@ void WatchStateChange(void* arg) {
static_cast<ChannelConnectivityWatcher*>(arg);
watcher->WatchStateChangeImpl();
}
+
+ChannelConnectivityWatcher channel_connectivity_watcher;
} // namespace
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
- : connectivity_watcher_(new ChannelConnectivityWatcher(this)),
- host_(host),
- c_channel_(channel) {
+ : host_(host), c_channel_(channel) {
g_gli_initializer.summon();
if (grpc_channel_support_connectivity_watcher(channel)) {
- connectivity_watcher_->StartWatching();
+ channel_connectivity_watcher.StartWatching(channel);
}
}
Channel::~Channel() {
grpc_channel_destroy(c_channel_);
- connectivity_watcher_->Destroy();
}
namespace {
@@ -213,23 +252,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
-namespace {
-class TagSaver final : public CompletionQueueTag {
- public:
- explicit TagSaver(void* tag) : tag_(tag) {}
- ~TagSaver() override {}
- bool FinalizeResult(void** tag, bool* status) override {
- *tag = tag_;
- delete this;
- return true;
- }
-
- private:
- void* tag_;
-};
-
-} // namespace
-
void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {