aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/client/channel_cc.cc202
-rw-r--r--src/cpp/common/channel_filter.cc4
-rw-r--r--src/cpp/common/channel_filter.h11
-rw-r--r--src/cpp/util/core_stats.cc90
-rw-r--r--src/cpp/util/core_stats.h35
5 files changed, 310 insertions, 32 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index f2d9bb07c9..19a25c838f 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -18,7 +18,10 @@
#include <grpc++/channel.h>
+#include <chrono>
+#include <condition_variable>
#include <memory>
+#include <mutex>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@@ -35,17 +38,197 @@
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
namespace grpc {
+namespace {
+int kConnectivityCheckIntervalMsec = 500;
+void WatchStateChange(void* arg);
+
+class TagSaver final : public CompletionQueueTag {
+ public:
+ explicit TagSaver(void* tag) : tag_(tag) {}
+ ~TagSaver() override {}
+ bool FinalizeResult(void** tag, bool* status) override {
+ *tag = tag_;
+ delete this;
+ return true;
+ }
+
+ private:
+ void* tag_;
+};
+
+// Constantly watches channel connectivity status to reconnect a transiently
+// disconnected channel. This is a temporary work-around before we have retry
+// support.
+class ChannelConnectivityWatcher : private GrpcLibraryCodegen {
+ public:
+ static void StartWatching(grpc_channel* channel) {
+ if (!IsDisabled()) {
+ std::unique_lock<std::mutex> lock(g_watcher_mu_);
+ if (g_watcher_ == nullptr) {
+ g_watcher_ = new ChannelConnectivityWatcher();
+ }
+ g_watcher_->StartWatchingLocked(channel);
+ }
+ }
+
+ static void StopWatching() {
+ if (!IsDisabled()) {
+ std::unique_lock<std::mutex> lock(g_watcher_mu_);
+ if (g_watcher_->StopWatchingLocked()) {
+ delete g_watcher_;
+ g_watcher_ = nullptr;
+ }
+ }
+ }
+
+ private:
+ ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) {
+ gpr_ref_init(&ref_, 0);
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
+ }
+
+ static bool IsDisabled() {
+ char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
+ bool disabled = gpr_is_true(env);
+ gpr_free(env);
+ return disabled;
+ }
+
+ void WatchStateChangeImpl() {
+ bool ok = false;
+ void* tag = NULL;
+ CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(shutdown_mu_);
+ if (shutdown_) {
+ // Drain cq_ if the watcher is shutting down
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME));
+ } else {
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ // Make sure we've seen 2 TIMEOUTs before going to sleep
+ if (status == CompletionQueue::TIMEOUT) {
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ if (status == CompletionQueue::TIMEOUT) {
+ shutdown_cv_.wait_for(lock, std::chrono::milliseconds(
+ kConnectivityCheckIntervalMsec));
+ continue;
+ }
+ }
+ }
+ }
+ ChannelState* channel_state = static_cast<ChannelState*>(tag);
+ channel_state->state =
+ grpc_channel_check_connectivity_state(channel_state->channel, false);
+ if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
+ void* shutdown_tag = NULL;
+ channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
+ delete channel_state;
+ if (gpr_unref(&ref_)) {
+ break;
+ }
+ } else {
+ TagSaver* tag_saver = new TagSaver(channel_state);
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
+ }
+ }
+ }
+
+ void StartWatchingLocked(grpc_channel* channel) {
+ if (thd_id_ != 0) {
+ gpr_ref(&ref_);
+ ++channel_count_;
+ ChannelState* channel_state = new ChannelState(channel);
+ // The first grpc_channel_watch_connectivity_state() is not used to
+ // monitor the channel state change, but to hold a reference of the
+ // c channel. So that WatchStateChangeImpl() can observe state ==
+ // GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed.
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(),
+ new TagSaver(nullptr));
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(),
+ new TagSaver(channel_state));
+ }
+ }
+
+ bool StopWatchingLocked() {
+ if (--channel_count_ == 0) {
+ {
+ std::unique_lock<std::mutex> lock(shutdown_mu_);
+ shutdown_ = true;
+ shutdown_cv_.notify_one();
+ }
+ gpr_thd_join(thd_id_);
+ return true;
+ }
+ return false;
+ }
+
+ friend void WatchStateChange(void* arg);
+ struct ChannelState {
+ explicit ChannelState(grpc_channel* channel)
+ : channel(channel), state(GRPC_CHANNEL_IDLE){};
+ grpc_channel* channel;
+ grpc_connectivity_state state;
+ CompletionQueue shutdown_cq;
+ };
+ gpr_thd_id thd_id_;
+ CompletionQueue cq_;
+ gpr_refcount ref_;
+ int channel_count_;
+
+ std::mutex shutdown_mu_;
+ std::condition_variable shutdown_cv_; // protected by shutdown_mu_
+ bool shutdown_; // protected by shutdown_mu_
+
+ static std::mutex g_watcher_mu_;
+ static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_
+};
+
+std::mutex ChannelConnectivityWatcher::g_watcher_mu_;
+ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr;
+
+void WatchStateChange(void* arg) {
+ ChannelConnectivityWatcher* watcher =
+ static_cast<ChannelConnectivityWatcher*>(arg);
+ watcher->WatchStateChangeImpl();
+}
+} // namespace
+
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {
g_gli_initializer.summon();
+ if (grpc_channel_support_connectivity_watcher(channel)) {
+ ChannelConnectivityWatcher::StartWatching(channel);
+ }
}
-Channel::~Channel() { grpc_channel_destroy(c_channel_); }
+Channel::~Channel() {
+ const bool stop_watching =
+ grpc_channel_support_connectivity_watcher(c_channel_);
+ grpc_channel_destroy(c_channel_);
+ if (stop_watching) {
+ ChannelConnectivityWatcher::StopWatching();
+ }
+}
namespace {
@@ -130,23 +313,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
-namespace {
-class TagSaver final : public CompletionQueueTag {
- public:
- explicit TagSaver(void* tag) : tag_(tag) {}
- ~TagSaver() override {}
- bool FinalizeResult(void** tag, bool* status) override {
- *tag = tag_;
- delete this;
- return true;
- }
-
- private:
- void* tag_;
-};
-
-} // namespace
-
void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {
diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc
index 448d9fbcf2..ea44cff832 100644
--- a/src/cpp/common/channel_filter.cc
+++ b/src/cpp/common/channel_filter.cc
@@ -68,10 +68,6 @@ void CallData::SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx,
grpc_call_stack_ignore_set_pollset_or_pollset_set(exec_ctx, elem, pollent);
}
-char *CallData::GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- return grpc_call_next_get_peer(exec_ctx, elem);
-}
-
// internal code used by RegisterChannelFilter()
namespace internal {
diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h
index c3d187d7e1..c1aeb3f724 100644
--- a/src/cpp/common/channel_filter.h
+++ b/src/cpp/common/channel_filter.h
@@ -268,9 +268,6 @@ class CallData {
virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent);
-
- /// Gets the peer name.
- virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
};
namespace internal {
@@ -349,11 +346,6 @@ class ChannelFilter final {
CallDataType *call_data = reinterpret_cast<CallDataType *>(elem->call_data);
call_data->SetPollsetOrPollsetSet(exec_ctx, elem, pollent);
}
-
- static char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- CallDataType *call_data = reinterpret_cast<CallDataType *>(elem->call_data);
- return call_data->GetPeer(exec_ctx, elem);
- }
};
struct FilterRecord {
@@ -396,8 +388,7 @@ void RegisterChannelFilter(
FilterType::call_data_size, FilterType::InitCallElement,
FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement,
FilterType::channel_data_size, FilterType::InitChannelElement,
- FilterType::DestroyChannelElement, FilterType::GetPeer,
- FilterType::GetChannelInfo, name}};
+ FilterType::DestroyChannelElement, FilterType::GetChannelInfo, name}};
internal::channel_filters->push_back(filter_record);
}
diff --git a/src/cpp/util/core_stats.cc b/src/cpp/util/core_stats.cc
new file mode 100644
index 0000000000..edf0b1bb67
--- /dev/null
+++ b/src/cpp/util/core_stats.cc
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/cpp/util/core_stats.h"
+
+#include <grpc/support/log.h>
+
+using grpc::core::Bucket;
+using grpc::core::Histogram;
+using grpc::core::Metric;
+using grpc::core::Stats;
+
+namespace grpc {
+
+void CoreStatsToProto(const grpc_stats_data& core, Stats* proto) {
+ for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) {
+ Metric* m = proto->add_metrics();
+ m->set_name(grpc_stats_counter_name[i]);
+ m->set_count(core.counters[i]);
+ }
+ for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) {
+ Metric* m = proto->add_metrics();
+ m->set_name(grpc_stats_histogram_name[i]);
+ Histogram* h = m->mutable_histogram();
+ for (int j = 0; j < grpc_stats_histo_buckets[i]; j++) {
+ Bucket* b = h->add_buckets();
+ b->set_start(grpc_stats_histo_bucket_boundaries[i][j]);
+ b->set_count(core.histograms[grpc_stats_histo_start[i] + j]);
+ }
+ }
+}
+
+void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core) {
+ memset(core, 0, sizeof(*core));
+ for (const auto& m : proto.metrics()) {
+ switch (m.value_case()) {
+ case Metric::VALUE_NOT_SET:
+ break;
+ case Metric::kCount:
+ for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) {
+ if (m.name() == grpc_stats_counter_name[i]) {
+ core->counters[i] = m.count();
+ break;
+ }
+ }
+ break;
+ case Metric::kHistogram:
+ for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) {
+ if (m.name() == grpc_stats_histogram_name[i]) {
+ const auto& h = m.histogram();
+ bool valid = true;
+ if (grpc_stats_histo_buckets[i] != h.buckets_size()) valid = false;
+ for (int j = 0; valid && j < h.buckets_size(); j++) {
+ if (grpc_stats_histo_bucket_boundaries[i][j] !=
+ h.buckets(j).start()) {
+ valid = false;
+ }
+ }
+ if (!valid) {
+ gpr_log(GPR_ERROR,
+ "Found histogram %s but shape is different from proto",
+ m.name().c_str());
+ }
+ for (int j = 0; valid && j < h.buckets_size(); j++) {
+ core->histograms[grpc_stats_histo_start[i] + j] =
+ h.buckets(j).count();
+ }
+ }
+ }
+ break;
+ }
+ }
+}
+
+} // namespace grpc
diff --git a/src/cpp/util/core_stats.h b/src/cpp/util/core_stats.h
new file mode 100644
index 0000000000..00e38bf266
--- /dev/null
+++ b/src/cpp/util/core_stats.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H
+#define GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H
+
+#include "src/proto/grpc/core/stats.pb.h"
+
+extern "C" {
+#include "src/core/lib/debug/stats.h"
+}
+
+namespace grpc {
+
+void CoreStatsToProto(const grpc_stats_data& core, grpc::core::Stats* proto);
+void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core);
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H