aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2017-08-24 15:34:52 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2017-08-24 15:34:52 -0700
commitddaef3e32581ef9c3078f402d2a08d7dcc781272 (patch)
tree2452f6117183bf07dcb4ebf9e67e39bf7359e042
parentf1d50983ae61fe0be7a284b4cbd0beb287b0f6a8 (diff)
Remove non-POD global variables
-rw-r--r--doc/environment_variables.md4
-rw-r--r--src/cpp/client/channel_cc.cc94
2 files changed, 63 insertions, 35 deletions
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index bb351cbdcc..0123f3f25d 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -118,4 +118,6 @@ some configuration as environment variables that can be set.
The channel connectivity watcher uses one extra thread to check the channel
state every 500 ms on the client side. It can help reconnect disconnected
client channels (mostly due to idleness), so that the next RPC on this channel
- won't fail. Set to 1 to turn off this watcher and save a thread.
+ won't fail. Set to 1 to turn off this watcher and save a thread. Please note
+ this is a temporary work-around, it will be removed in the future once we have
+ support for automatically reestablishing failed connections.
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 865fecfe22..418b4fa2fd 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -41,12 +41,14 @@
#include <grpc/support/useful.h>
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
namespace grpc {
namespace {
int kConnectivityCheckIntervalMsec = 500;
void WatchStateChange(void* arg);
+void InitConnectivityWatcherOnce();
class TagSaver final : public CompletionQueueTag {
public:
@@ -71,10 +73,9 @@ class ChannelConnectivityWatcher {
char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
bool disabled = false;
if (env != nullptr) {
- static const char* truthy[] = {"yes", "Yes", "YES", "true",
- "True", "TRUE", "1"};
+ static const char* truthy[] = {"yes", "true", "1"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
- if (0 == strcmp(env, truthy[i])) {
+ if (0 == gpr_stricmp(env, truthy[i])) {
disabled = true;
break;
}
@@ -82,54 +83,69 @@ class ChannelConnectivityWatcher {
}
gpr_free(env);
if (!disabled) {
+ gpr_ref_init(&ref_, 0);
gpr_thd_options options = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&options);
+ gpr_thd_options_set_detached(&options);
gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
}
}
- ~ChannelConnectivityWatcher() {
- cq_.Shutdown();
- if (thd_id_ != 0) {
- gpr_thd_join(thd_id_);
- }
- }
-
void WatchStateChangeImpl() {
bool ok = false;
void* tag = NULL;
CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
- while (status != CompletionQueue::SHUTDOWN) {
+ while (true) {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
// Make sure we've seen 2 TIMEOUTs before going to sleep
if (status == CompletionQueue::TIMEOUT) {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ if (status == CompletionQueue::TIMEOUT) {
+ gpr_sleep_until(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(kConnectivityCheckIntervalMsec,
+ GPR_TIMESPAN)));
+ continue;
+ }
}
- if (status == CompletionQueue::TIMEOUT) {
- gpr_sleep_until(
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(kConnectivityCheckIntervalMsec,
- GPR_TIMESPAN)));
- } else if (status == CompletionQueue::GOT_EVENT) {
- ChannelState* channel_state = static_cast<ChannelState*>(tag);
- channel_state->state = grpc_channel_check_connectivity_state(
- channel_state->channel, false);
- if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
- void* shutdown_tag = NULL;
- channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
- delete channel_state;
- } else {
- TagSaver* tag_saver = new TagSaver(channel_state);
- grpc_channel_watch_connectivity_state(
- channel_state->channel, channel_state->state,
- gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
+ ChannelState* channel_state = static_cast<ChannelState*>(tag);
+ channel_state->state =
+ grpc_channel_check_connectivity_state(channel_state->channel, false);
+ if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
+ void* shutdown_tag = NULL;
+ channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
+ delete channel_state;
+ if (gpr_unref(&ref_)) {
+ gpr_mu_lock(&g_watcher_mu_);
+ delete g_watcher_;
+ g_watcher_ = nullptr;
+ gpr_mu_unlock(&g_watcher_mu_);
+ break;
}
+ } else {
+ TagSaver* tag_saver = new TagSaver(channel_state);
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
}
}
}
- void StartWatching(grpc_channel* channel) {
+ static void StartWatching(grpc_channel* channel) {
+ gpr_once_init(&g_connectivity_watcher_once_, InitConnectivityWatcherOnce);
+ gpr_mu_lock(&g_watcher_mu_);
+ if (g_watcher_ == nullptr) {
+ g_watcher_ = new ChannelConnectivityWatcher();
+ }
+ g_watcher_->StartWatchingLocked(channel);
+ gpr_mu_unlock(&g_watcher_mu_);
+ }
+
+ static void InitOnce() { gpr_mu_init(&g_watcher_mu_); }
+
+ private:
+ void StartWatchingLocked(grpc_channel* channel) {
if (thd_id_ != 0) {
+ gpr_ref(&ref_);
ChannelState* channel_state = new ChannelState(channel);
// The first grpc_channel_watch_connectivity_state() is not used to
// monitor the channel state change, but to hold a reference of the
@@ -146,7 +162,6 @@ class ChannelConnectivityWatcher {
}
}
- private:
struct ChannelState {
explicit ChannelState(grpc_channel* channel)
: channel(channel), state(GRPC_CHANNEL_IDLE){};
@@ -156,15 +171,26 @@ class ChannelConnectivityWatcher {
};
gpr_thd_id thd_id_;
CompletionQueue cq_;
- CompletionQueue shutdown_cq_;
+ gpr_refcount ref_;
+
+ static gpr_once g_connectivity_watcher_once_;
+ static gpr_mu g_watcher_mu_;
+ static ChannelConnectivityWatcher* g_watcher_;
};
+gpr_once ChannelConnectivityWatcher::g_connectivity_watcher_once_ =
+ GPR_ONCE_INIT;
+gpr_mu ChannelConnectivityWatcher::g_watcher_mu_;
+ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr;
+
void WatchStateChange(void* arg) {
ChannelConnectivityWatcher* watcher =
static_cast<ChannelConnectivityWatcher*>(arg);
watcher->WatchStateChangeImpl();
}
+void InitConnectivityWatcherOnce() { ChannelConnectivityWatcher::InitOnce(); };
+
ChannelConnectivityWatcher channel_connectivity_watcher;
} // namespace
@@ -173,7 +199,7 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {
g_gli_initializer.summon();
if (grpc_channel_support_connectivity_watcher(channel)) {
- channel_connectivity_watcher.StartWatching(channel);
+ ChannelConnectivityWatcher::StartWatching(channel);
}
}