aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2017-08-24 15:34:52 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2017-08-24 15:34:52 -0700
commitddaef3e32581ef9c3078f402d2a08d7dcc781272 (patch)
tree2452f6117183bf07dcb4ebf9e67e39bf7359e042 /src
parentf1d50983ae61fe0be7a284b4cbd0beb287b0f6a8 (diff)
Remove non-POD global variables
Diffstat (limited to 'src')
-rw-r--r--src/cpp/client/channel_cc.cc94
1 files changed, 60 insertions, 34 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 865fecfe22..418b4fa2fd 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -41,12 +41,14 @@
#include <grpc/support/useful.h>
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
namespace grpc {
namespace {
int kConnectivityCheckIntervalMsec = 500;
void WatchStateChange(void* arg);
+void InitConnectivityWatcherOnce();
class TagSaver final : public CompletionQueueTag {
public:
@@ -71,10 +73,9 @@ class ChannelConnectivityWatcher {
char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
bool disabled = false;
if (env != nullptr) {
- static const char* truthy[] = {"yes", "Yes", "YES", "true",
- "True", "TRUE", "1"};
+ static const char* truthy[] = {"yes", "true", "1"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
- if (0 == strcmp(env, truthy[i])) {
+ if (0 == gpr_stricmp(env, truthy[i])) {
disabled = true;
break;
}
@@ -82,54 +83,69 @@ class ChannelConnectivityWatcher {
}
gpr_free(env);
if (!disabled) {
+ gpr_ref_init(&ref_, 0);
gpr_thd_options options = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&options);
+ gpr_thd_options_set_detached(&options);
gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
}
}
- ~ChannelConnectivityWatcher() {
- cq_.Shutdown();
- if (thd_id_ != 0) {
- gpr_thd_join(thd_id_);
- }
- }
-
void WatchStateChangeImpl() {
bool ok = false;
void* tag = NULL;
CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
- while (status != CompletionQueue::SHUTDOWN) {
+ while (true) {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
// Make sure we've seen 2 TIMEOUTs before going to sleep
if (status == CompletionQueue::TIMEOUT) {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ if (status == CompletionQueue::TIMEOUT) {
+ gpr_sleep_until(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(kConnectivityCheckIntervalMsec,
+ GPR_TIMESPAN)));
+ continue;
+ }
}
- if (status == CompletionQueue::TIMEOUT) {
- gpr_sleep_until(
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(kConnectivityCheckIntervalMsec,
- GPR_TIMESPAN)));
- } else if (status == CompletionQueue::GOT_EVENT) {
- ChannelState* channel_state = static_cast<ChannelState*>(tag);
- channel_state->state = grpc_channel_check_connectivity_state(
- channel_state->channel, false);
- if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
- void* shutdown_tag = NULL;
- channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
- delete channel_state;
- } else {
- TagSaver* tag_saver = new TagSaver(channel_state);
- grpc_channel_watch_connectivity_state(
- channel_state->channel, channel_state->state,
- gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
+ ChannelState* channel_state = static_cast<ChannelState*>(tag);
+ channel_state->state =
+ grpc_channel_check_connectivity_state(channel_state->channel, false);
+ if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
+ void* shutdown_tag = NULL;
+ channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
+ delete channel_state;
+ if (gpr_unref(&ref_)) {
+ gpr_mu_lock(&g_watcher_mu_);
+ delete g_watcher_;
+ g_watcher_ = nullptr;
+ gpr_mu_unlock(&g_watcher_mu_);
+ break;
}
+ } else {
+ TagSaver* tag_saver = new TagSaver(channel_state);
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
}
}
}
- void StartWatching(grpc_channel* channel) {
+ static void StartWatching(grpc_channel* channel) {
+ gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce);
+ gpr_mu_lock(&g_watcher_mu_);
+ if (g_watcher_ == nullptr) {
+ g_watcher_ = new ChannelConnectivityWatcher();
+ }
+ g_watcher_->StartWatchingLocked(channel);
+ gpr_mu_unlock(&g_watcher_mu_);
+ }
+
+ static void InitOnce() { gpr_mu_init(&g_watcher_mu_); }
+
+ private:
+ void StartWatchingLocked(grpc_channel* channel) {
if (thd_id_ != 0) {
+ gpr_ref(&ref_);
ChannelState* channel_state = new ChannelState(channel);
// The first grpc_channel_watch_connectivity_state() is not used to
// monitor the channel state change, but to hold a reference of the
@@ -146,7 +162,6 @@ class ChannelConnectivityWatcher {
}
}
- private:
struct ChannelState {
explicit ChannelState(grpc_channel* channel)
: channel(channel), state(GRPC_CHANNEL_IDLE){};
@@ -156,15 +171,26 @@ class ChannelConnectivityWatcher {
};
gpr_thd_id thd_id_;
CompletionQueue cq_;
- CompletionQueue shutdown_cq_;
+ gpr_refcount ref_;
+
+ static gpr_once g_connectivity_watcher_once_;
+ static gpr_mu g_watcher_mu_;
+ static ChannelConnectivityWatcher* g_watcher_;
};
+gpr_once ChannelConnectivityWatcher::g_connectivity_watcher_once_ =
+ GPR_ONCE_INIT;
+gpr_mu ChannelConnectivityWatcher::g_watcher_mu_;
+ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr;
+
void WatchStateChange(void* arg) {
ChannelConnectivityWatcher* watcher =
static_cast<ChannelConnectivityWatcher*>(arg);
watcher->WatchStateChangeImpl();
}
+void InitConnectivityWatcherOnce() { ChannelConnectivityWatcher::InitOnce(); };
+
ChannelConnectivityWatcher channel_connectivity_watcher;
} // namespace
@@ -173,7 +199,7 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {
g_gli_initializer.summon();
if (grpc_channel_support_connectivity_watcher(channel)) {
- channel_connectivity_watcher.StartWatching(channel);
+ ChannelConnectivityWatcher::StartWatching(channel);
}
}