diff options
author | Craig Tiller <ctiller@google.com> | 2017-09-07 13:39:45 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-09-07 13:39:45 -0700 |
commit | 99bdb388a7e66d3fc6d59b2116c5c9e1ecf4d112 (patch) | |
tree | 313beb8fbf6b639960524184ee5afeff455d81b3 /src | |
parent | 06a1baed61fb9b8fe905667a6f343c672396486e (diff) | |
parent | a8fc020e0f9491effaf2f6cc8770245bd2d6a796 (diff) |
Merge branch 'write_completion' into write_completion_stats
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/filters/client_channel/channel_connectivity.c | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.c | 8 | ||||
-rw-r--r-- | src/core/lib/support/string.c | 13 | ||||
-rw-r--r-- | src/core/lib/support/string.h | 3 | ||||
-rw-r--r-- | src/cpp/client/channel_cc.cc | 202 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.c | 2 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.h | 3 |
7 files changed, 212 insertions, 25 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index b83c95275f..0a9e90d12e 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -191,6 +191,12 @@ static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(wa); } +int grpc_channel_support_connectivity_watcher(grpc_channel *channel) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + return client_channel_elem->filter != &grpc_client_channel_filter ? 0 : 1; +} + void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 3d19953eeb..1feea6d628 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -164,13 +164,7 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { bool grpc_iomgr_abort_on_leaks(void) { char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS"); - if (env == NULL) return false; - static const char *truthy[] = {"yes", "Yes", "YES", "true", - "True", "TRUE", "1"}; - bool should_we = false; - for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { - if (0 == strcmp(env, truthy[i])) should_we = true; - } + bool should_we = gpr_is_true(env); gpr_free(env); return should_we; } diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c index b65009754a..ec93303024 100644 --- a/src/core/lib/support/string.c +++ b/src/core/lib/support/string.c @@ -298,3 +298,16 @@ void *gpr_memrchr(const void *s, int c, size_t n) { } return NULL; } + +bool gpr_is_true(const char *s) { + if (s == NULL) { + return false; + } + static const char *truthy[] = {"yes", "true", "1"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == gpr_stricmp(s, truthy[i])) { + return true; + } + } + return false; +} diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h index e11df8439d..5a56fa3a0a 100644 --- a/src/core/lib/support/string.h +++ b/src/core/lib/support/string.h @@ -19,6 +19,7 @@ #ifndef GRPC_CORE_LIB_SUPPORT_STRING_H #define GRPC_CORE_LIB_SUPPORT_STRING_H +#include <stdbool.h> #include <stddef.h> #include <grpc/support/port_platform.h> @@ -106,6 +107,8 @@ int gpr_stricmp(const char *a, const char *b); void *gpr_memrchr(const void *s, int c, size_t n); +/** Return true if lower(s) equals "true", "yes" or "1", otherwise false. */ +bool gpr_is_true(const char *s); #ifdef __cplusplus } #endif diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f2d9bb07c9..19a25c838f 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -18,7 +18,10 @@ #include <grpc++/channel.h> +#include <chrono> +#include <condition_variable> #include <memory> +#include <mutex> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> @@ -35,17 +38,197 @@ #include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/env.h" +#include "src/core/lib/support/string.h" namespace grpc { +namespace { +int kConnectivityCheckIntervalMsec = 500; +void WatchStateChange(void* arg); + +class TagSaver final : public CompletionQueueTag { + public: + explicit TagSaver(void* tag) : tag_(tag) {} + ~TagSaver() override {} + bool FinalizeResult(void** tag, bool* status) override { + *tag = tag_; + delete this; + return true; + } + + private: + void* tag_; +}; + +// Constantly watches channel connectivity status to reconnect a transiently +// disconnected channel. This is a temporary work-around before we have retry +// support. +class ChannelConnectivityWatcher : private GrpcLibraryCodegen { + public: + static void StartWatching(grpc_channel* channel) { + if (!IsDisabled()) { + std::unique_lock<std::mutex> lock(g_watcher_mu_); + if (g_watcher_ == nullptr) { + g_watcher_ = new ChannelConnectivityWatcher(); + } + g_watcher_->StartWatchingLocked(channel); + } + } + + static void StopWatching() { + if (!IsDisabled()) { + std::unique_lock<std::mutex> lock(g_watcher_mu_); + if (g_watcher_->StopWatchingLocked()) { + delete g_watcher_; + g_watcher_ = nullptr; + } + } + } + + private: + ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) { + gpr_ref_init(&ref_, 0); + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + + static bool IsDisabled() { + char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + bool disabled = gpr_is_true(env); + gpr_free(env); + return disabled; + } + + void WatchStateChangeImpl() { + bool ok = false; + void* tag = NULL; + CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; + while (true) { + { + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (shutdown_) { + // Drain cq_ if the watcher is shutting down + status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)); + } else { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + // Make sure we've seen 2 TIMEOUTs before going to sleep + if (status == CompletionQueue::TIMEOUT) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + if (status == CompletionQueue::TIMEOUT) { + shutdown_cv_.wait_for(lock, std::chrono::milliseconds( + kConnectivityCheckIntervalMsec)); + continue; + } + } + } + } + ChannelState* channel_state = static_cast<ChannelState*>(tag); + channel_state->state = + grpc_channel_check_connectivity_state(channel_state->channel, false); + if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { + void* shutdown_tag = NULL; + channel_state->shutdown_cq.Next(&shutdown_tag, &ok); + delete channel_state; + if (gpr_unref(&ref_)) { + break; + } + } else { + TagSaver* tag_saver = new TagSaver(channel_state); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); + } + } + } + + void StartWatchingLocked(grpc_channel* channel) { + if (thd_id_ != 0) { + gpr_ref(&ref_); + ++channel_count_; + ChannelState* channel_state = new ChannelState(channel); + // The first grpc_channel_watch_connectivity_state() is not used to + // monitor the channel state change, but to hold a reference of the + // c channel. So that WatchStateChangeImpl() can observe state == + // GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed. + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(), + new TagSaver(nullptr)); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), + new TagSaver(channel_state)); + } + } + + bool StopWatchingLocked() { + if (--channel_count_ == 0) { + { + std::unique_lock<std::mutex> lock(shutdown_mu_); + shutdown_ = true; + shutdown_cv_.notify_one(); + } + gpr_thd_join(thd_id_); + return true; + } + return false; + } + + friend void WatchStateChange(void* arg); + struct ChannelState { + explicit ChannelState(grpc_channel* channel) + : channel(channel), state(GRPC_CHANNEL_IDLE){}; + grpc_channel* channel; + grpc_connectivity_state state; + CompletionQueue shutdown_cq; + }; + gpr_thd_id thd_id_; + CompletionQueue cq_; + gpr_refcount ref_; + int channel_count_; + + std::mutex shutdown_mu_; + std::condition_variable shutdown_cv_; // protected by shutdown_mu_ + bool shutdown_; // protected by shutdown_mu_ + + static std::mutex g_watcher_mu_; + static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_ +}; + +std::mutex ChannelConnectivityWatcher::g_watcher_mu_; +ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr; + +void WatchStateChange(void* arg) { + ChannelConnectivityWatcher* watcher = + static_cast<ChannelConnectivityWatcher*>(arg); + watcher->WatchStateChangeImpl(); +} +} // namespace + static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) : host_(host), c_channel_(channel) { g_gli_initializer.summon(); + if (grpc_channel_support_connectivity_watcher(channel)) { + ChannelConnectivityWatcher::StartWatching(channel); + } } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + const bool stop_watching = + grpc_channel_support_connectivity_watcher(c_channel_); + grpc_channel_destroy(c_channel_); + if (stop_watching) { + ChannelConnectivityWatcher::StopWatching(); + } +} namespace { @@ -130,23 +313,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { return grpc_channel_check_connectivity_state(c_channel_, try_to_connect); } -namespace { -class TagSaver final : public CompletionQueueTag { - public: - explicit TagSaver(void* tag) : tag_(tag) {} - ~TagSaver() override {} - bool FinalizeResult(void** tag, bool* status) override { - *tag = tag_; - delete this; - return true; - } - - private: - void* tag_; -}; - -} // namespace - void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, CompletionQueue* cq, void* tag) { diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 9671d794c5..0402ce34fb 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -93,6 +93,7 @@ grpc_alarm_destroy_type grpc_alarm_destroy_import; grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import; grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; +grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import; grpc_channel_create_call_type grpc_channel_create_call_import; grpc_channel_ping_type grpc_channel_ping_import; grpc_channel_register_call_type grpc_channel_register_call_import; @@ -399,6 +400,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state"); grpc_channel_num_external_connectivity_watchers_import = (grpc_channel_num_external_connectivity_watchers_type) GetProcAddress(library, "grpc_channel_num_external_connectivity_watchers"); grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state"); + grpc_channel_support_connectivity_watcher_import = (grpc_channel_support_connectivity_watcher_type) GetProcAddress(library, "grpc_channel_support_connectivity_watcher"); grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call"); grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping"); grpc_channel_register_call_import = (grpc_channel_register_call_type) GetProcAddress(library, "grpc_channel_register_call"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index b64199be8e..e3704e592b 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -260,6 +260,9 @@ extern grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_ext typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag); extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; #define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import +typedef int(*grpc_channel_support_connectivity_watcher_type)(grpc_channel *channel); +extern grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import; +#define grpc_channel_support_connectivity_watcher grpc_channel_support_connectivity_watcher_import typedef grpc_call *(*grpc_channel_create_call_type)(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, grpc_slice method, const grpc_slice *host, gpr_timespec deadline, void *reserved); extern grpc_channel_create_call_type grpc_channel_create_call_import; #define grpc_channel_create_call grpc_channel_create_call_import |