diff options
author | Mark D. Roth <roth@google.com> | 2017-09-08 08:01:29 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2017-09-08 08:01:29 -0700 |
commit | 0aee498525fb5344b5fabe4ac1b6df878a6454ee (patch) | |
tree | c5ffbe35a5b88f6f53172dd2276b09a834060c30 /src/cpp | |
parent | c7388e5d82124bc4c9c83532a710d8b3e1c2c640 (diff) | |
parent | b9bac8e0c60235dbab5774fe839dca09ab2f8ca2 (diff) |
Merge remote-tracking branch 'upstream/master' into plugin_credentials_api_fix
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/client/channel_cc.cc | 202 | ||||
-rw-r--r-- | src/cpp/common/channel_filter.cc | 4 | ||||
-rw-r--r-- | src/cpp/common/channel_filter.h | 11 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 9 | ||||
-rw-r--r-- | src/cpp/util/core_stats.cc | 90 | ||||
-rw-r--r-- | src/cpp/util/core_stats.h | 35 |
6 files changed, 318 insertions, 33 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f2d9bb07c9..19a25c838f 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -18,7 +18,10 @@ #include <grpc++/channel.h> +#include <chrono> +#include <condition_variable> #include <memory> +#include <mutex> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> @@ -35,17 +38,197 @@ #include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/env.h" +#include "src/core/lib/support/string.h" namespace grpc { +namespace { +int kConnectivityCheckIntervalMsec = 500; +void WatchStateChange(void* arg); + +class TagSaver final : public CompletionQueueTag { + public: + explicit TagSaver(void* tag) : tag_(tag) {} + ~TagSaver() override {} + bool FinalizeResult(void** tag, bool* status) override { + *tag = tag_; + delete this; + return true; + } + + private: + void* tag_; +}; + +// Constantly watches channel connectivity status to reconnect a transiently +// disconnected channel. This is a temporary work-around before we have retry +// support. +class ChannelConnectivityWatcher : private GrpcLibraryCodegen { + public: + static void StartWatching(grpc_channel* channel) { + if (!IsDisabled()) { + std::unique_lock<std::mutex> lock(g_watcher_mu_); + if (g_watcher_ == nullptr) { + g_watcher_ = new ChannelConnectivityWatcher(); + } + g_watcher_->StartWatchingLocked(channel); + } + } + + static void StopWatching() { + if (!IsDisabled()) { + std::unique_lock<std::mutex> lock(g_watcher_mu_); + if (g_watcher_->StopWatchingLocked()) { + delete g_watcher_; + g_watcher_ = nullptr; + } + } + } + + private: + ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) { + gpr_ref_init(&ref_, 0); + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + + static bool IsDisabled() { + char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + bool disabled = gpr_is_true(env); + gpr_free(env); + return disabled; + } + + void WatchStateChangeImpl() { + bool ok = false; + void* tag = NULL; + CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; + while (true) { + { + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (shutdown_) { + // Drain cq_ if the watcher is shutting down + status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)); + } else { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + // Make sure we've seen 2 TIMEOUTs before going to sleep + if (status == CompletionQueue::TIMEOUT) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + if (status == CompletionQueue::TIMEOUT) { + shutdown_cv_.wait_for(lock, std::chrono::milliseconds( + kConnectivityCheckIntervalMsec)); + continue; + } + } + } + } + ChannelState* channel_state = static_cast<ChannelState*>(tag); + channel_state->state = + grpc_channel_check_connectivity_state(channel_state->channel, false); + if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { + void* shutdown_tag = NULL; + channel_state->shutdown_cq.Next(&shutdown_tag, &ok); + delete channel_state; + if (gpr_unref(&ref_)) { + break; + } + } else { + TagSaver* tag_saver = new TagSaver(channel_state); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); + } + } + } + + void StartWatchingLocked(grpc_channel* channel) { + if (thd_id_ != 0) { + gpr_ref(&ref_); + ++channel_count_; + ChannelState* channel_state = new ChannelState(channel); + // The first grpc_channel_watch_connectivity_state() is not used to + // monitor the channel state change, but to hold a reference of the + // c channel. So that WatchStateChangeImpl() can observe state == + // GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed. + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(), + new TagSaver(nullptr)); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), + new TagSaver(channel_state)); + } + } + + bool StopWatchingLocked() { + if (--channel_count_ == 0) { + { + std::unique_lock<std::mutex> lock(shutdown_mu_); + shutdown_ = true; + shutdown_cv_.notify_one(); + } + gpr_thd_join(thd_id_); + return true; + } + return false; + } + + friend void WatchStateChange(void* arg); + struct ChannelState { + explicit ChannelState(grpc_channel* channel) + : channel(channel), state(GRPC_CHANNEL_IDLE){}; + grpc_channel* channel; + grpc_connectivity_state state; + CompletionQueue shutdown_cq; + }; + gpr_thd_id thd_id_; + CompletionQueue cq_; + gpr_refcount ref_; + int channel_count_; + + std::mutex shutdown_mu_; + std::condition_variable shutdown_cv_; // protected by shutdown_mu_ + bool shutdown_; // protected by shutdown_mu_ + + static std::mutex g_watcher_mu_; + static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_ +}; + +std::mutex ChannelConnectivityWatcher::g_watcher_mu_; +ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr; + +void WatchStateChange(void* arg) { + ChannelConnectivityWatcher* watcher = + static_cast<ChannelConnectivityWatcher*>(arg); + watcher->WatchStateChangeImpl(); +} +} // namespace + static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) : host_(host), c_channel_(channel) { g_gli_initializer.summon(); + if (grpc_channel_support_connectivity_watcher(channel)) { + ChannelConnectivityWatcher::StartWatching(channel); + } } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + const bool stop_watching = + grpc_channel_support_connectivity_watcher(c_channel_); + grpc_channel_destroy(c_channel_); + if (stop_watching) { + ChannelConnectivityWatcher::StopWatching(); + } +} namespace { @@ -130,23 +313,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { return grpc_channel_check_connectivity_state(c_channel_, try_to_connect); } -namespace { -class TagSaver final : public CompletionQueueTag { - public: - explicit TagSaver(void* tag) : tag_(tag) {} - ~TagSaver() override {} - bool FinalizeResult(void** tag, bool* status) override { - *tag = tag_; - delete this; - return true; - } - - private: - void* tag_; -}; - -} // namespace - void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, CompletionQueue* cq, void* tag) { diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index 448d9fbcf2..ea44cff832 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -68,10 +68,6 @@ void CallData::SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, grpc_call_stack_ignore_set_pollset_or_pollset_set(exec_ctx, elem, pollent); } -char *CallData::GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - return grpc_call_next_get_peer(exec_ctx, elem); -} - // internal code used by RegisterChannelFilter() namespace internal { diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index c3d187d7e1..c1aeb3f724 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -268,9 +268,6 @@ class CallData { virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_polling_entity *pollent); - - /// Gets the peer name. - virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); }; namespace internal { @@ -349,11 +346,6 @@ class ChannelFilter final { CallDataType *call_data = reinterpret_cast<CallDataType *>(elem->call_data); call_data->SetPollsetOrPollsetSet(exec_ctx, elem, pollent); } - - static char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - CallDataType *call_data = reinterpret_cast<CallDataType *>(elem->call_data); - return call_data->GetPeer(exec_ctx, elem); - } }; struct FilterRecord { @@ -396,8 +388,7 @@ void RegisterChannelFilter( FilterType::call_data_size, FilterType::InitCallElement, FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement, FilterType::channel_data_size, FilterType::InitChannelElement, - FilterType::DestroyChannelElement, FilterType::GetPeer, - FilterType::GetChannelInfo, name}}; + FilterType::DestroyChannelElement, FilterType::GetChannelInfo, name}}; internal::channel_filters->push_back(filter_record); } diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 2483300cb1..6bd3ecda32 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -17,6 +17,7 @@ #include <grpc++/server.h> +#include <cstdlib> #include <sstream> #include <utility> @@ -38,6 +39,7 @@ #include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/surface/call.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/server/health/default_health_check_service.h" #include "src/cpp/thread_manager/thread_manager.h" @@ -607,7 +609,12 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { grpc_op cops[MAX_OPS]; ops->FillOps(call->call(), cops, &nops); auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); - GPR_ASSERT(GRPC_CALL_OK == result); + if (result != GRPC_CALL_OK) { + gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result); + grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR, + call->call(), cops, nops, ops); + abort(); + } } ServerInterface::BaseAsyncRequest::BaseAsyncRequest( diff --git a/src/cpp/util/core_stats.cc b/src/cpp/util/core_stats.cc new file mode 100644 index 0000000000..edf0b1bb67 --- /dev/null +++ b/src/cpp/util/core_stats.cc @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/cpp/util/core_stats.h" + +#include <grpc/support/log.h> + +using grpc::core::Bucket; +using grpc::core::Histogram; +using grpc::core::Metric; +using grpc::core::Stats; + +namespace grpc { + +void CoreStatsToProto(const grpc_stats_data& core, Stats* proto) { + for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) { + Metric* m = proto->add_metrics(); + m->set_name(grpc_stats_counter_name[i]); + m->set_count(core.counters[i]); + } + for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { + Metric* m = proto->add_metrics(); + m->set_name(grpc_stats_histogram_name[i]); + Histogram* h = m->mutable_histogram(); + for (int j = 0; j < grpc_stats_histo_buckets[i]; j++) { + Bucket* b = h->add_buckets(); + b->set_start(grpc_stats_histo_bucket_boundaries[i][j]); + b->set_count(core.histograms[grpc_stats_histo_start[i] + j]); + } + } +} + +void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core) { + memset(core, 0, sizeof(*core)); + for (const auto& m : proto.metrics()) { + switch (m.value_case()) { + case Metric::VALUE_NOT_SET: + break; + case Metric::kCount: + for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) { + if (m.name() == grpc_stats_counter_name[i]) { + core->counters[i] = m.count(); + break; + } + } + break; + case Metric::kHistogram: + for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { + if (m.name() == grpc_stats_histogram_name[i]) { + const auto& h = m.histogram(); + bool valid = true; + if (grpc_stats_histo_buckets[i] != h.buckets_size()) valid = false; + for (int j = 0; valid && j < h.buckets_size(); j++) { + if (grpc_stats_histo_bucket_boundaries[i][j] != + h.buckets(j).start()) { + valid = false; + } + } + if (!valid) { + gpr_log(GPR_ERROR, + "Found histogram %s but shape is different from proto", + m.name().c_str()); + } + for (int j = 0; valid && j < h.buckets_size(); j++) { + core->histograms[grpc_stats_histo_start[i] + j] = + h.buckets(j).count(); + } + } + } + break; + } + } +} + +} // namespace grpc diff --git a/src/cpp/util/core_stats.h b/src/cpp/util/core_stats.h new file mode 100644 index 0000000000..00e38bf266 --- /dev/null +++ b/src/cpp/util/core_stats.h @@ -0,0 +1,35 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H +#define GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H + +#include "src/proto/grpc/core/stats.pb.h" + +extern "C" { +#include "src/core/lib/debug/stats.h" +} + +namespace grpc { + +void CoreStatsToProto(const grpc_stats_data& core, grpc::core::Stats* proto); +void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core); + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H |