diff options
-rw-r--r-- | BUILD | 3 | ||||
-rw-r--r-- | CMakeLists.txt | 6 | ||||
-rw-r--r-- | Makefile | 6 | ||||
-rw-r--r-- | binding.gyp | 1 | ||||
-rw-r--r-- | config.m4 | 1 | ||||
-rw-r--r-- | config.w32 | 1 | ||||
-rw-r--r-- | gRPC-Core.podspec | 5 | ||||
-rw-r--r-- | grpc.gemspec | 3 | ||||
-rw-r--r-- | grpc.gyp | 4 | ||||
-rw-r--r-- | package.xml | 3 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/channel_connectivity.cc | 165 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/channel_connectivity_internal.cc | 195 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/channel_connectivity_internal.h | 33 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 3 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/connectivity_watcher.c | 179 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/connectivity_watcher.h | 30 | ||||
-rw-r--r-- | src/python/grpcio/grpc_core_dependencies.py | 1 | ||||
-rw-r--r-- | tools/doxygen/Doxyfile.core.internal | 3 | ||||
-rw-r--r-- | tools/run_tests/generated/sources_and_headers.json | 5 |
19 files changed, 491 insertions, 156 deletions
@@ -876,6 +876,7 @@ grpc_cc_library( "src/core/ext/filters/client_channel/client_channel.cc", "src/core/ext/filters/client_channel/client_channel_factory.cc", "src/core/ext/filters/client_channel/client_channel_plugin.cc", + "src/core/ext/filters/client_channel/connectivity_watcher.cc", "src/core/ext/filters/client_channel/connector.cc", "src/core/ext/filters/client_channel/http_connect_handshaker.cc", "src/core/ext/filters/client_channel/http_proxy.cc", @@ -894,8 +895,10 @@ grpc_cc_library( "src/core/ext/filters/client_channel/uri_parser.cc", ], hdrs = [ + "src/core/ext/filters/client_channel/channel_connectivity_internal.h", "src/core/ext/filters/client_channel/client_channel.h", "src/core/ext/filters/client_channel/client_channel_factory.h", + "src/core/ext/filters/client_channel/connectivity_watcher.h", "src/core/ext/filters/client_channel/connector.h", "src/core/ext/filters/client_channel/http_connect_handshaker.h", "src/core/ext/filters/client_channel/http_proxy.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 3ac6c9b63d..8122d4b025 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1157,6 +1157,7 @@ add_library(grpc src/core/ext/filters/client_channel/client_channel.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc + src/core/ext/filters/client_channel/connectivity_watcher.c src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc src/core/ext/filters/client_channel/http_proxy.cc @@ -1481,6 +1482,7 @@ add_library(grpc_cronet src/core/ext/filters/client_channel/client_channel.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc + src/core/ext/filters/client_channel/connectivity_watcher.c src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc src/core/ext/filters/client_channel/http_proxy.cc @@ -1771,6 +1773,7 @@ add_library(grpc_test_util src/core/ext/filters/client_channel/client_channel.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc + src/core/ext/filters/client_channel/connectivity_watcher.c src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc src/core/ext/filters/client_channel/http_proxy.cc @@ -2036,6 +2039,7 @@ add_library(grpc_test_util_unsecure src/core/ext/filters/client_channel/client_channel.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc + src/core/ext/filters/client_channel/connectivity_watcher.c src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc src/core/ext/filters/client_channel/http_proxy.cc @@ -2320,6 +2324,7 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/client_channel.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc + src/core/ext/filters/client_channel/connectivity_watcher.c src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc src/core/ext/filters/client_channel/http_proxy.cc @@ -3055,6 +3060,7 @@ add_library(grpc++_cronet src/core/ext/filters/client_channel/client_channel.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc + src/core/ext/filters/client_channel/connectivity_watcher.c src/core/ext/filters/client_channel/connector.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc src/core/ext/filters/client_channel/http_proxy.cc @@ -3157,6 +3157,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/client_channel.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/connectivity_watcher.c \ src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ @@ -3480,6 +3481,7 @@ LIBGRPC_CRONET_SRC = \ src/core/ext/filters/client_channel/client_channel.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/connectivity_watcher.c \ src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ @@ -3768,6 +3770,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/ext/filters/client_channel/client_channel.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/connectivity_watcher.c \ src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ @@ -4023,6 +4026,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/ext/filters/client_channel/client_channel.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/connectivity_watcher.c \ src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ @@ -4284,6 +4288,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/client_channel.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/connectivity_watcher.c \ src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ @@ -4997,6 +5002,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/ext/filters/client_channel/client_channel.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/connectivity_watcher.c \ src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ diff --git a/binding.gyp b/binding.gyp index e004b0fa32..707afb476a 100644 --- a/binding.gyp +++ b/binding.gyp @@ -857,6 +857,7 @@ 'src/core/ext/filters/client_channel/client_channel.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/connectivity_watcher.c', 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', @@ -282,6 +282,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/client_channel.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/connectivity_watcher.c \ src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ diff --git a/config.w32 b/config.w32 index 67b5e2f554..75a83e2414 100644 --- a/config.w32 +++ b/config.w32 @@ -259,6 +259,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\client_channel.cc " + "src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " + "src\\core\\ext\\filters\\client_channel\\client_channel_plugin.cc " + + "src\\core\\ext\\filters\\client_channel\\connectivity_watcher.c " + "src\\core\\ext\\filters\\client_channel\\connector.cc " + "src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " + "src\\core\\ext\\filters\\client_channel\\http_proxy.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 0e3b50c4aa..9b4f4ec675 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -299,8 +299,10 @@ Pod::Spec.new do |s| 'src/core/tsi/transport_security_adapter.h', 'src/core/tsi/transport_security_interface.h', 'src/core/ext/transport/chttp2/server/chttp2_server.h', + 'src/core/ext/filters/client_channel/channel_connectivity_internal.h', 'src/core/ext/filters/client_channel/client_channel.h', 'src/core/ext/filters/client_channel/client_channel_factory.h', + 'src/core/ext/filters/client_channel/connectivity_watcher.h', 'src/core/ext/filters/client_channel/connector.h', 'src/core/ext/filters/client_channel/http_connect_handshaker.h', 'src/core/ext/filters/client_channel/http_proxy.h', @@ -672,6 +674,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/client_channel.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/connectivity_watcher.c', 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', @@ -802,8 +805,10 @@ Pod::Spec.new do |s| 'src/core/tsi/transport_security_adapter.h', 'src/core/tsi/transport_security_interface.h', 'src/core/ext/transport/chttp2/server/chttp2_server.h', + 'src/core/ext/filters/client_channel/channel_connectivity_internal.h', 'src/core/ext/filters/client_channel/client_channel.h', 'src/core/ext/filters/client_channel/client_channel_factory.h', + 'src/core/ext/filters/client_channel/connectivity_watcher.h', 'src/core/ext/filters/client_channel/connector.h', 'src/core/ext/filters/client_channel/http_connect_handshaker.h', 'src/core/ext/filters/client_channel/http_proxy.h', diff --git a/grpc.gemspec b/grpc.gemspec index 4567058344..9f5f45bb1b 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -230,8 +230,10 @@ Gem::Specification.new do |s| s.files += %w( src/core/tsi/transport_security_adapter.h ) s.files += %w( src/core/tsi/transport_security_interface.h ) s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.h ) + s.files += %w( src/core/ext/filters/client_channel/channel_connectivity_internal.h ) s.files += %w( src/core/ext/filters/client_channel/client_channel.h ) s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.h ) + s.files += %w( src/core/ext/filters/client_channel/connectivity_watcher.h ) s.files += %w( src/core/ext/filters/client_channel/connector.h ) s.files += %w( src/core/ext/filters/client_channel/http_connect_handshaker.h ) s.files += %w( src/core/ext/filters/client_channel/http_proxy.h ) @@ -607,6 +609,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/client_channel.cc ) s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.cc ) s.files += %w( src/core/ext/filters/client_channel/client_channel_plugin.cc ) + s.files += %w( src/core/ext/filters/client_channel/connectivity_watcher.c ) s.files += %w( src/core/ext/filters/client_channel/connector.cc ) s.files += %w( src/core/ext/filters/client_channel/http_connect_handshaker.cc ) s.files += %w( src/core/ext/filters/client_channel/http_proxy.cc ) @@ -423,6 +423,7 @@ 'src/core/ext/filters/client_channel/client_channel.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/connectivity_watcher.c', 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', @@ -665,6 +666,7 @@ 'src/core/ext/filters/client_channel/client_channel.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/connectivity_watcher.c', 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', @@ -872,6 +874,7 @@ 'src/core/ext/filters/client_channel/client_channel.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/connectivity_watcher.c', 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', @@ -1097,6 +1100,7 @@ 'src/core/ext/filters/client_channel/client_channel.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/connectivity_watcher.c', 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', diff --git a/package.xml b/package.xml index d08b803355..65cb53ad1c 100644 --- a/package.xml +++ b/package.xml @@ -242,8 +242,10 @@ <file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.h" role="src" /> <file baseinstalldir="/" name="src/core/tsi/transport_security_interface.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/chttp2_server.h" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/client_channel/channel_connectivity_internal.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.h" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/client_channel/connectivity_watcher.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/connector.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_connect_handshaker.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_proxy.h" role="src" /> @@ -619,6 +621,7 @@ <file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_plugin.cc" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/client_channel/connectivity_watcher.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/connector.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_connect_handshaker.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_proxy.cc" role="src" /> diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc index 31a8fc39ce..33e9deaaf6 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.cc +++ b/src/core/ext/filters/client_channel/channel_connectivity.cc @@ -23,8 +23,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h" #include "src/core/ext/filters/client_channel/client_channel.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/completion_queue.h" @@ -52,125 +52,6 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( return GRPC_CHANNEL_SHUTDOWN; } -typedef enum { - WAITING, - READY_TO_CALL_BACK, - CALLING_BACK_AND_FINISHED, -} callback_phase; - -typedef struct { - gpr_mu mu; - callback_phase phase; - grpc_closure on_complete; - grpc_closure on_timeout; - grpc_closure watcher_timer_init; - grpc_timer alarm; - grpc_connectivity_state state; - grpc_completion_queue *cq; - grpc_cq_completion completion_storage; - grpc_channel *channel; - grpc_error *error; - void *tag; -} state_watcher; - -static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { - grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(w->channel)); - if (client_channel_elem->filter == &grpc_client_channel_filter) { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, - "watch_channel_connectivity"); - } else { - abort(); - } - gpr_mu_destroy(&w->mu); - gpr_free(w); -} - -static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, - grpc_cq_completion *ignored) { - bool should_delete = false; - state_watcher *w = (state_watcher *)pw; - gpr_mu_lock(&w->mu); - switch (w->phase) { - case WAITING: - case READY_TO_CALL_BACK: - GPR_UNREACHABLE_CODE(return ); - case CALLING_BACK_AND_FINISHED: - should_delete = true; - break; - } - gpr_mu_unlock(&w->mu); - - if (should_delete) { - delete_state_watcher(exec_ctx, w); - } -} - -static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, - bool due_to_completion, grpc_error *error) { - if (due_to_completion) { - grpc_timer_cancel(exec_ctx, &w->alarm); - } else { - grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(w->channel)); - grpc_client_channel_watch_connectivity_state( - exec_ctx, client_channel_elem, - grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL, - &w->on_complete, NULL); - } - - gpr_mu_lock(&w->mu); - - if (due_to_completion) { - if (GRPC_TRACER_ON(grpc_trace_operation_failures)) { - GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error)); - } - GRPC_ERROR_UNREF(error); - error = GRPC_ERROR_NONE; - } else { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Timed out waiting for connection state change"); - } else if (error == GRPC_ERROR_CANCELLED) { - error = GRPC_ERROR_NONE; - } - } - switch (w->phase) { - case WAITING: - GRPC_ERROR_REF(error); - w->error = error; - w->phase = READY_TO_CALL_BACK; - break; - case READY_TO_CALL_BACK: - if (error != GRPC_ERROR_NONE) { - GPR_ASSERT(!due_to_completion); - GRPC_ERROR_UNREF(w->error); - GRPC_ERROR_REF(error); - w->error = error; - } - w->phase = CALLING_BACK_AND_FINISHED; - grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w, - &w->completion_storage); - break; - case CALLING_BACK_AND_FINISHED: - GPR_UNREACHABLE_CODE(return ); - break; - } - gpr_mu_unlock(&w->mu); - - GRPC_ERROR_UNREF(error); -} - -static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, - grpc_error *error) { - partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error)); -} - -static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, - grpc_error *error) { - partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error)); -} - int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) { grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); @@ -202,10 +83,10 @@ int grpc_channel_support_connectivity_watcher(grpc_channel *channel) { void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { + grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_channel_element *client_channel_elem = - grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + grpc_channel_stack_last_element(channel_stack); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - state_watcher *w = (state_watcher *)gpr_malloc(sizeof(*w)); GRPC_API_TRACE( "grpc_channel_watch_connectivity_state(" @@ -213,39 +94,11 @@ void grpc_channel_watch_connectivity_state( "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "cq=%p, tag=%p)", - 7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec, - (int)deadline.clock_type, cq, tag)); - - GPR_ASSERT(grpc_cq_begin_op(cq, tag)); - - gpr_mu_init(&w->mu); - GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w, - grpc_schedule_on_exec_ctx); - w->phase = WAITING; - w->state = last_observed_state; - w->cq = cq; - w->tag = tag; - w->channel = channel; - w->error = NULL; - - watcher_timer_init_arg *wa = - (watcher_timer_init_arg *)gpr_malloc(sizeof(watcher_timer_init_arg)); - wa->w = w; - wa->deadline = deadline; - GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa, - grpc_schedule_on_exec_ctx); - - if (client_channel_elem->filter == &grpc_client_channel_filter) { - GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); - grpc_client_channel_watch_connectivity_state( - &exec_ctx, client_channel_elem, - grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state, - &w->on_complete, &w->watcher_timer_init); - } else { - abort(); - } - + 7, + (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec, + (int)deadline.clock_type, cq, tag)); + grpc_channel_watch_connectivity_state_internal( + &exec_ctx, client_channel_elem, channel_stack, last_observed_state, + deadline, cq, tag); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/ext/filters/client_channel/channel_connectivity_internal.cc b/src/core/ext/filters/client_channel/channel_connectivity_internal.cc new file mode 100644 index 0000000000..06bcfe2ffb --- /dev/null +++ b/src/core/ext/filters/client_channel/channel_connectivity_internal.cc @@ -0,0 +1,195 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/completion_queue.h" + +typedef enum { + WAITING, + READY_TO_CALL_BACK, + CALLING_BACK_AND_FINISHED, +} callback_phase; + +typedef struct { + gpr_mu mu; + callback_phase phase; + grpc_closure on_complete; + grpc_closure on_timeout; + grpc_closure watcher_timer_init; + grpc_timer alarm; + grpc_connectivity_state state; + grpc_completion_queue *cq; + grpc_cq_completion completion_storage; + grpc_channel_element *client_channel_elem; + grpc_channel_stack *channel_stack; + grpc_error *error; + void *tag; +} state_watcher; + +static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->channel_stack, + "watch_channel_connectivity"); + gpr_mu_destroy(&w->mu); + gpr_free(w); +} + +static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, + grpc_cq_completion *ignored) { + bool should_delete = false; + state_watcher *w = (state_watcher *)pw; + gpr_mu_lock(&w->mu); + switch (w->phase) { + case WAITING: + case READY_TO_CALL_BACK: + GPR_UNREACHABLE_CODE(return ); + case CALLING_BACK_AND_FINISHED: + should_delete = true; + break; + } + gpr_mu_unlock(&w->mu); + + if (should_delete) { + delete_state_watcher(exec_ctx, w); + } +} + +static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, + bool due_to_completion, grpc_error *error) { + if (due_to_completion) { + grpc_timer_cancel(exec_ctx, &w->alarm); + } else { + grpc_channel_element *client_channel_elem = w->client_channel_elem; + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL, + &w->on_complete, NULL); + } + + gpr_mu_lock(&w->mu); + + if (due_to_completion) { + if (GRPC_TRACER_ON(grpc_trace_operation_failures)) { + GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); + error = GRPC_ERROR_NONE; + } else { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Timed out waiting for connection state change"); + } else if (error == GRPC_ERROR_CANCELLED) { + error = GRPC_ERROR_NONE; + } + } + switch (w->phase) { + case WAITING: + GRPC_ERROR_REF(error); + w->error = error; + w->phase = READY_TO_CALL_BACK; + break; + case READY_TO_CALL_BACK: + if (error != GRPC_ERROR_NONE) { + GPR_ASSERT(!due_to_completion); + GRPC_ERROR_UNREF(w->error); + GRPC_ERROR_REF(error); + w->error = error; + } + w->phase = CALLING_BACK_AND_FINISHED; + grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w, + &w->completion_storage); + break; + case CALLING_BACK_AND_FINISHED: + GPR_UNREACHABLE_CODE(return ); + break; + } + gpr_mu_unlock(&w->mu); + + GRPC_ERROR_UNREF(error); +} + +static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, + grpc_error *error) { + partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error)); +} + +static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, + grpc_error *error) { + partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error)); +} + +typedef struct watcher_timer_init_arg { + state_watcher *w; + gpr_timespec deadline; +} watcher_timer_init_arg; + +static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg; + + grpc_timer_init(exec_ctx, &wa->w->alarm, + gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC), + &wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_free(wa); +} + +void grpc_channel_watch_connectivity_state_internal( + grpc_exec_ctx *exec_ctx, grpc_channel_element *client_channel_elem, + grpc_channel_stack *channel_stack, + grpc_connectivity_state last_observed_state, gpr_timespec deadline, + grpc_completion_queue *cq, void *tag) { + state_watcher *w = (state_watcher *)gpr_malloc(sizeof(*w)); + + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); + + gpr_mu_init(&w->mu); + GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w, + grpc_schedule_on_exec_ctx); + w->phase = WAITING; + w->state = last_observed_state; + w->cq = cq; + w->tag = tag; + w->client_channel_elem = client_channel_elem; + w->channel_stack = channel_stack; + w->error = NULL; + + watcher_timer_init_arg *wa = + (watcher_timer_init_arg *)gpr_malloc(sizeof(watcher_timer_init_arg)); + wa->w = w; + wa->deadline = deadline; + GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa, + grpc_schedule_on_exec_ctx); + + if (client_channel_elem->filter == &grpc_client_channel_filter) { + GRPC_CHANNEL_STACK_REF(channel_stack, "watch_channel_connectivity"); + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state, + &w->on_complete, &w->watcher_timer_init); + } else { + abort(); + } +} diff --git a/src/core/ext/filters/client_channel/channel_connectivity_internal.h b/src/core/ext/filters/client_channel/channel_connectivity_internal.h new file mode 100644 index 0000000000..d260a20c07 --- /dev/null +++ b/src/core/ext/filters/client_channel/channel_connectivity_internal.h @@ -0,0 +1,33 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H + +#include <grpc/grpc.h> +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +void grpc_channel_watch_connectivity_state_internal( + grpc_exec_ctx *exec_ctx, grpc_channel_element *client_channel_elem, + grpc_channel_stack *channel_stack, + grpc_connectivity_state last_observed_state, gpr_timespec deadline, + grpc_completion_queue *cq, void *tag); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H \ + */ diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index ea5e076c3b..be8ea81a02 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -31,6 +31,7 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/ext/filters/client_channel/connectivity_watcher.h" #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" @@ -753,6 +754,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, } chand->deadline_checking_enabled = grpc_deadline_checking_enabled(args->channel_args); + grpc_client_channel_start_watching_connectivity(exec_ctx, elem, + chand->owning_stack); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/filters/client_channel/connectivity_watcher.c b/src/core/ext/filters/client_channel/connectivity_watcher.c new file mode 100644 index 0000000000..da45929f26 --- /dev/null +++ b/src/core/ext/filters/client_channel/connectivity_watcher.c @@ -0,0 +1,179 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/ext/filters/client_channel/connectivity_watcher.h" + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h" +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/env.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/completion_queue.h" + +#define DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS 500 + +typedef struct connectivity_watcher { + grpc_timer watcher_timer; + grpc_closure check_connectivity_closure; + grpc_completion_queue* cq; + gpr_refcount refs; + size_t channel_count; + bool shutting_down; +} connectivity_watcher; + +typedef struct channel_state { + grpc_channel_element* client_channel_elem; + grpc_channel_stack* channel_stack; + grpc_connectivity_state state; +} channel_state; + +static gpr_once g_once = GPR_ONCE_INIT; +static gpr_mu g_watcher_mu; +static connectivity_watcher* g_watcher = NULL; + +static void init_g_watcher_mu() { gpr_mu_init(&g_watcher_mu); } + +static void start_watching_locked(grpc_exec_ctx* exec_ctx, + grpc_channel_element* client_channel_elem, + grpc_channel_stack* channel_stack) { + gpr_ref(&g_watcher->refs); + ++g_watcher->channel_count; + channel_state* s = (channel_state*)gpr_zalloc(sizeof(channel_state)); + s->client_channel_elem = client_channel_elem; + s->channel_stack = channel_stack; + s->state = GRPC_CHANNEL_IDLE; + grpc_channel_watch_connectivity_state_internal( + exec_ctx, client_channel_elem, channel_stack, s->state, + gpr_inf_future(GPR_CLOCK_MONOTONIC), g_watcher->cq, (void*)s); +} + +static bool is_disabled() { + char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + bool disabled = gpr_is_true(env); + gpr_free(env); + return disabled; +} + +static bool connectivity_watcher_unref(grpc_exec_ctx* exec_ctx) { + if (gpr_unref(&g_watcher->refs)) { + gpr_mu_lock(&g_watcher_mu); + grpc_completion_queue_destroy(g_watcher->cq); + gpr_free(g_watcher); + g_watcher = NULL; + gpr_mu_unlock(&g_watcher_mu); + return true; + } + return false; +} + +static void check_connectivity_state(grpc_exec_ctx* exec_ctx, void* ignored, + grpc_error* error) { + grpc_event ev; + while (true) { + gpr_mu_lock(&g_watcher_mu); + if (g_watcher->shutting_down) { + // Drain cq if the watcher is shutting down + ev = grpc_completion_queue_next( + g_watcher->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); + } else { + ev = grpc_completion_queue_next(g_watcher->cq, + gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL); + // Make sure we've seen 2 TIMEOUTs before going to sleep + if (ev.type == GRPC_QUEUE_TIMEOUT) { + ev = grpc_completion_queue_next( + g_watcher->cq, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL); + if (ev.type == GRPC_QUEUE_TIMEOUT) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_timer_init( + exec_ctx, &g_watcher->watcher_timer, + gpr_time_add(now, gpr_time_from_millis( + DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS, + GPR_TIMESPAN)), + &g_watcher->check_connectivity_closure, now); + gpr_mu_unlock(&g_watcher_mu); + break; + } + } + } + gpr_mu_unlock(&g_watcher_mu); + if (ev.type != GRPC_OP_COMPLETE) { + break; + } + channel_state* s = (channel_state*)(ev.tag); + s->state = grpc_client_channel_check_connectivity_state( + exec_ctx, s->client_channel_elem, false /* try_to_connect */); + if (s->state == GRPC_CHANNEL_SHUTDOWN) { + GRPC_CHANNEL_STACK_UNREF(exec_ctx, s->channel_stack, + "connectivity_watcher_stop_watching"); + gpr_free(s); + if (connectivity_watcher_unref(exec_ctx)) { + break; + } + } else { + grpc_channel_watch_connectivity_state_internal( + exec_ctx, s->client_channel_elem, s->channel_stack, s->state, + gpr_inf_future(GPR_CLOCK_MONOTONIC), g_watcher->cq, s); + } + } +} + +void grpc_client_channel_start_watching_connectivity( + grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem, + grpc_channel_stack* channel_stack) { + if (is_disabled()) return; + GRPC_CHANNEL_STACK_REF(channel_stack, "connectivity_watcher_start_watching"); + gpr_once_init(&g_once, init_g_watcher_mu); + gpr_mu_lock(&g_watcher_mu); + if (g_watcher == NULL) { + g_watcher = (connectivity_watcher*)gpr_zalloc(sizeof(connectivity_watcher)); + g_watcher->cq = grpc_completion_queue_create_internal( + GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING); + gpr_ref_init(&g_watcher->refs, 0); + GRPC_CLOSURE_INIT(&g_watcher->check_connectivity_closure, + check_connectivity_state, NULL, + grpc_schedule_on_exec_ctx); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_timer_init( + exec_ctx, &g_watcher->watcher_timer, + gpr_time_add( + now, gpr_time_from_millis(DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS, + GPR_TIMESPAN)), + &g_watcher->check_connectivity_closure, now); + } + start_watching_locked(exec_ctx, client_channel_elem, channel_stack); + gpr_mu_init(&g_watcher_mu); +} + +void grpc_client_channel_stop_watching_connectivity( + grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem, + grpc_channel_stack* channel_stack) { + if (is_disabled()) return; + gpr_once_init(&g_once, init_g_watcher_mu); + gpr_mu_lock(&g_watcher_mu); + if (--g_watcher->channel_count == 0) { + g_watcher->shutting_down = true; + grpc_timer_cancel(exec_ctx, &g_watcher->watcher_timer); + connectivity_watcher_unref(exec_ctx); + } + gpr_mu_unlock(&g_watcher_mu); +} diff --git a/src/core/ext/filters/client_channel/connectivity_watcher.h b/src/core/ext/filters/client_channel/connectivity_watcher.h new file mode 100644 index 0000000000..89586dc736 --- /dev/null +++ b/src/core/ext/filters/client_channel/connectivity_watcher.h @@ -0,0 +1,30 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H + +#include <grpc/grpc.h> +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +void grpc_client_channel_start_watching_connectivity( + grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem, + grpc_channel_stack* channel_stack); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 140f4ceee1..467b13f22b 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -258,6 +258,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/client_channel.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/connectivity_watcher.c', 'src/core/ext/filters/client_channel/connector.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index e0536423fa..bed3845db2 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -908,11 +908,14 @@ src/core/ext/census/tracing.cc \ src/core/ext/census/tracing.h \ src/core/ext/filters/client_channel/README.md \ src/core/ext/filters/client_channel/channel_connectivity.cc \ +src/core/ext/filters/client_channel/channel_connectivity_internal.h \ src/core/ext/filters/client_channel/client_channel.cc \ src/core/ext/filters/client_channel/client_channel.h \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_factory.h \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ +src/core/ext/filters/client_channel/connectivity_watcher.c \ +src/core/ext/filters/client_channel/connectivity_watcher.h \ src/core/ext/filters/client_channel/connector.cc \ src/core/ext/filters/client_channel/connector.h \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 08ab2a5d93..af7b7ee9a0 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -8467,8 +8467,10 @@ "grpc_deadline_filter" ], "headers": [ + "src/core/ext/filters/client_channel/channel_connectivity_internal.h", "src/core/ext/filters/client_channel/client_channel.h", "src/core/ext/filters/client_channel/client_channel_factory.h", + "src/core/ext/filters/client_channel/connectivity_watcher.h", "src/core/ext/filters/client_channel/connector.h", "src/core/ext/filters/client_channel/http_connect_handshaker.h", "src/core/ext/filters/client_channel/http_proxy.h", @@ -8491,11 +8493,14 @@ "name": "grpc_client_channel", "src": [ "src/core/ext/filters/client_channel/channel_connectivity.cc", + "src/core/ext/filters/client_channel/channel_connectivity_internal.h", "src/core/ext/filters/client_channel/client_channel.cc", "src/core/ext/filters/client_channel/client_channel.h", "src/core/ext/filters/client_channel/client_channel_factory.cc", "src/core/ext/filters/client_channel/client_channel_factory.h", "src/core/ext/filters/client_channel/client_channel_plugin.cc", + "src/core/ext/filters/client_channel/connectivity_watcher.c", + "src/core/ext/filters/client_channel/connectivity_watcher.h", "src/core/ext/filters/client_channel/connector.cc", "src/core/ext/filters/client_channel/connector.h", "src/core/ext/filters/client_channel/http_connect_handshaker.cc", |