aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/backup_poller.cc143
-rw-r--r--src/core/ext/filters/client_channel/backup_poller.h (renamed from src/core/ext/filters/client_channel/connectivity_watcher.h)14
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.cc160
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity_internal.cc195
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity_internal.h33
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc6
-rw-r--r--src/core/ext/filters/client_channel/connectivity_watcher.cc179
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py3
8 files changed, 309 insertions, 424 deletions
diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc
new file mode 100644
index 0000000000..36b55ebf9a
--- /dev/null
+++ b/src/core/ext/filters/client_channel/backup_poller.cc
@@ -0,0 +1,143 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/ext/filters/client_channel/backup_poller.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
+
+#define DEFAULT_POLLING_INTERVAL_MS 500
+
+typedef struct backup_poller {
+ grpc_timer polling_timer;
+ grpc_closure run_poller_closure;
+ grpc_closure shutdown_closure;
+ gpr_mu* pollset_mu;
+ grpc_pollset* pollset;
+ gpr_refcount refs;
+ gpr_refcount shutdown_refs;
+} backup_poller;
+
+static gpr_once g_once = GPR_ONCE_INIT;
+static gpr_mu g_poller_mu;
+static backup_poller* g_poller = NULL;
+
+static void init_g_poller_mu() { gpr_mu_init(&g_poller_mu); }
+
+static bool is_disabled() {
+ char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_backup_poller");
+ bool disabled = gpr_is_true(env);
+ gpr_free(env);
+ return disabled;
+}
+
+static bool backup_poller_shutdown_unref(grpc_exec_ctx* exec_ctx,
+ backup_poller* p) {
+ if (gpr_unref(&p->shutdown_refs)) {
+ grpc_pollset_destroy(exec_ctx, p->pollset);
+ gpr_free(p->pollset);
+ gpr_free(p);
+ }
+ return true;
+}
+
+static void done_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ backup_poller_shutdown_unref(exec_ctx, (backup_poller*)arg);
+}
+
+static void g_poller_unref(grpc_exec_ctx* exec_ctx) {
+ if (gpr_unref(&g_poller->refs)) {
+ gpr_mu_lock(&g_poller_mu);
+ backup_poller* p = g_poller;
+ g_poller = NULL;
+ gpr_mu_unlock(&g_poller_mu);
+
+ grpc_timer_cancel(exec_ctx, &p->polling_timer);
+ gpr_mu_lock(p->pollset_mu);
+ grpc_pollset_shutdown(exec_ctx, p->pollset,
+ GRPC_CLOSURE_INIT(&p->shutdown_closure, done_poller,
+ p, grpc_schedule_on_exec_ctx));
+ gpr_mu_unlock(p->pollset_mu);
+ }
+}
+
+static void run_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ backup_poller* p = (backup_poller*)arg;
+ if (error != GRPC_ERROR_NONE) {
+ if (error != GRPC_ERROR_CANCELLED) {
+ GRPC_LOG_IF_ERROR("check_connectivity_state", error);
+ }
+ backup_poller_shutdown_unref(exec_ctx, p);
+ return;
+ }
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_mu_lock(p->pollset_mu);
+ grpc_error* err = grpc_pollset_work(exec_ctx, p->pollset, NULL, now,
+ gpr_inf_past(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(p->pollset_mu);
+ GRPC_LOG_IF_ERROR("Run client channel backup poller", err);
+ grpc_timer_init(
+ exec_ctx, &p->polling_timer,
+ gpr_time_add(
+ now, gpr_time_from_millis(DEFAULT_POLLING_INTERVAL_MS, GPR_TIMESPAN)),
+ &p->run_poller_closure, now);
+}
+
+void grpc_client_channel_start_backup_polling(
+ grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) {
+ if (is_disabled()) return;
+ gpr_once_init(&g_once, init_g_poller_mu);
+ gpr_mu_lock(&g_poller_mu);
+ if (g_poller == NULL) {
+ g_poller = (backup_poller*)gpr_zalloc(sizeof(backup_poller));
+ g_poller->pollset = (grpc_pollset*)gpr_malloc(grpc_pollset_size());
+ grpc_pollset_init(g_poller->pollset, &g_poller->pollset_mu);
+ gpr_ref_init(&g_poller->refs, 0);
+ // one for timer cancellation, one for pollset shutdown
+ gpr_ref_init(&g_poller->shutdown_refs, 2);
+ GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller,
+ grpc_schedule_on_exec_ctx);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ grpc_timer_init(
+ exec_ctx, &g_poller->polling_timer,
+ gpr_time_add(now, gpr_time_from_millis(DEFAULT_POLLING_INTERVAL_MS,
+ GPR_TIMESPAN)),
+ &g_poller->run_poller_closure, now);
+ }
+ gpr_ref(&g_poller->refs);
+ gpr_mu_unlock(&g_poller_mu);
+
+ grpc_pollset_set_add_pollset(exec_ctx, interested_parties, g_poller->pollset);
+}
+
+void grpc_client_channel_stop_backup_polling(
+ grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) {
+ if (is_disabled()) return;
+ grpc_pollset_set_del_pollset(exec_ctx, interested_parties, g_poller->pollset);
+ g_poller_unref(exec_ctx);
+}
diff --git a/src/core/ext/filters/client_channel/connectivity_watcher.h b/src/core/ext/filters/client_channel/backup_poller.h
index e12d6c284a..3044f75711 100644
--- a/src/core/ext/filters/client_channel/connectivity_watcher.h
+++ b/src/core/ext/filters/client_channel/backup_poller.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H
-#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
@@ -25,8 +25,10 @@
/* Constantly watches client channel connectivity status to reconnect a
* transiently disconnected channel */
-void grpc_client_channel_start_watching_connectivity(
- grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem,
- grpc_channel_stack* channel_stack);
+void grpc_client_channel_start_backup_polling(
+ grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties);
-#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H */
+void grpc_client_channel_stop_backup_polling(
+ grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties);
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H */
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc
index 4c36b0f97a..31a8fc39ce 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.cc
+++ b/src/core/ext/filters/client_channel/channel_connectivity.cc
@@ -23,8 +23,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h"
#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/completion_queue.h"
@@ -52,6 +52,125 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
return GRPC_CHANNEL_SHUTDOWN;
}
+typedef enum {
+ WAITING,
+ READY_TO_CALL_BACK,
+ CALLING_BACK_AND_FINISHED,
+} callback_phase;
+
+typedef struct {
+ gpr_mu mu;
+ callback_phase phase;
+ grpc_closure on_complete;
+ grpc_closure on_timeout;
+ grpc_closure watcher_timer_init;
+ grpc_timer alarm;
+ grpc_connectivity_state state;
+ grpc_completion_queue *cq;
+ grpc_cq_completion completion_storage;
+ grpc_channel *channel;
+ grpc_error *error;
+ void *tag;
+} state_watcher;
+
+static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
+ "watch_channel_connectivity");
+ } else {
+ abort();
+ }
+ gpr_mu_destroy(&w->mu);
+ gpr_free(w);
+}
+
+static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_cq_completion *ignored) {
+ bool should_delete = false;
+ state_watcher *w = (state_watcher *)pw;
+ gpr_mu_lock(&w->mu);
+ switch (w->phase) {
+ case WAITING:
+ case READY_TO_CALL_BACK:
+ GPR_UNREACHABLE_CODE(return );
+ case CALLING_BACK_AND_FINISHED:
+ should_delete = true;
+ break;
+ }
+ gpr_mu_unlock(&w->mu);
+
+ if (should_delete) {
+ delete_state_watcher(exec_ctx, w);
+ }
+}
+
+static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
+ bool due_to_completion, grpc_error *error) {
+ if (due_to_completion) {
+ grpc_timer_cancel(exec_ctx, &w->alarm);
+ } else {
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ grpc_client_channel_watch_connectivity_state(
+ exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL,
+ &w->on_complete, NULL);
+ }
+
+ gpr_mu_lock(&w->mu);
+
+ if (due_to_completion) {
+ if (GRPC_TRACER_ON(grpc_trace_operation_failures)) {
+ GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
+ }
+ GRPC_ERROR_UNREF(error);
+ error = GRPC_ERROR_NONE;
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Timed out waiting for connection state change");
+ } else if (error == GRPC_ERROR_CANCELLED) {
+ error = GRPC_ERROR_NONE;
+ }
+ }
+ switch (w->phase) {
+ case WAITING:
+ GRPC_ERROR_REF(error);
+ w->error = error;
+ w->phase = READY_TO_CALL_BACK;
+ break;
+ case READY_TO_CALL_BACK:
+ if (error != GRPC_ERROR_NONE) {
+ GPR_ASSERT(!due_to_completion);
+ GRPC_ERROR_UNREF(w->error);
+ GRPC_ERROR_REF(error);
+ w->error = error;
+ }
+ w->phase = CALLING_BACK_AND_FINISHED;
+ grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w,
+ &w->completion_storage);
+ break;
+ case CALLING_BACK_AND_FINISHED:
+ GPR_UNREACHABLE_CODE(return );
+ break;
+ }
+ gpr_mu_unlock(&w->mu);
+
+ GRPC_ERROR_UNREF(error);
+}
+
+static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_error *error) {
+ partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error));
+}
+
+static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_error *error) {
+ partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error));
+}
+
int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
@@ -83,10 +202,10 @@ int grpc_channel_support_connectivity_watcher(grpc_channel *channel) {
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
- grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_channel_element *client_channel_elem =
- grpc_channel_stack_last_element(channel_stack);
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ state_watcher *w = (state_watcher *)gpr_malloc(sizeof(*w));
GRPC_API_TRACE(
"grpc_channel_watch_connectivity_state("
@@ -96,8 +215,37 @@ void grpc_channel_watch_connectivity_state(
"cq=%p, tag=%p)",
7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, cq, tag));
- grpc_channel_watch_connectivity_state_internal(
- &exec_ctx, client_channel_elem, channel_stack, last_observed_state,
- deadline, cq, tag);
+
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
+
+ gpr_mu_init(&w->mu);
+ GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w,
+ grpc_schedule_on_exec_ctx);
+ w->phase = WAITING;
+ w->state = last_observed_state;
+ w->cq = cq;
+ w->tag = tag;
+ w->channel = channel;
+ w->error = NULL;
+
+ watcher_timer_init_arg *wa =
+ (watcher_timer_init_arg *)gpr_malloc(sizeof(watcher_timer_init_arg));
+ wa->w = w;
+ wa->deadline = deadline;
+ GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa,
+ grpc_schedule_on_exec_ctx);
+
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
+ grpc_client_channel_watch_connectivity_state(
+ &exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state,
+ &w->on_complete, &w->watcher_timer_init);
+ } else {
+ abort();
+ }
+
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/ext/filters/client_channel/channel_connectivity_internal.cc b/src/core/ext/filters/client_channel/channel_connectivity_internal.cc
deleted file mode 100644
index 06bcfe2ffb..0000000000
--- a/src/core/ext/filters/client_channel/channel_connectivity_internal.cc
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h"
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "src/core/ext/filters/client_channel/client_channel.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/surface/api_trace.h"
-#include "src/core/lib/surface/completion_queue.h"
-
-typedef enum {
- WAITING,
- READY_TO_CALL_BACK,
- CALLING_BACK_AND_FINISHED,
-} callback_phase;
-
-typedef struct {
- gpr_mu mu;
- callback_phase phase;
- grpc_closure on_complete;
- grpc_closure on_timeout;
- grpc_closure watcher_timer_init;
- grpc_timer alarm;
- grpc_connectivity_state state;
- grpc_completion_queue *cq;
- grpc_cq_completion completion_storage;
- grpc_channel_element *client_channel_elem;
- grpc_channel_stack *channel_stack;
- grpc_error *error;
- void *tag;
-} state_watcher;
-
-static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->channel_stack,
- "watch_channel_connectivity");
- gpr_mu_destroy(&w->mu);
- gpr_free(w);
-}
-
-static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
- grpc_cq_completion *ignored) {
- bool should_delete = false;
- state_watcher *w = (state_watcher *)pw;
- gpr_mu_lock(&w->mu);
- switch (w->phase) {
- case WAITING:
- case READY_TO_CALL_BACK:
- GPR_UNREACHABLE_CODE(return );
- case CALLING_BACK_AND_FINISHED:
- should_delete = true;
- break;
- }
- gpr_mu_unlock(&w->mu);
-
- if (should_delete) {
- delete_state_watcher(exec_ctx, w);
- }
-}
-
-static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
- bool due_to_completion, grpc_error *error) {
- if (due_to_completion) {
- grpc_timer_cancel(exec_ctx, &w->alarm);
- } else {
- grpc_channel_element *client_channel_elem = w->client_channel_elem;
- grpc_client_channel_watch_connectivity_state(
- exec_ctx, client_channel_elem,
- grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL,
- &w->on_complete, NULL);
- }
-
- gpr_mu_lock(&w->mu);
-
- if (due_to_completion) {
- if (GRPC_TRACER_ON(grpc_trace_operation_failures)) {
- GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
- }
- GRPC_ERROR_UNREF(error);
- error = GRPC_ERROR_NONE;
- } else {
- if (error == GRPC_ERROR_NONE) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Timed out waiting for connection state change");
- } else if (error == GRPC_ERROR_CANCELLED) {
- error = GRPC_ERROR_NONE;
- }
- }
- switch (w->phase) {
- case WAITING:
- GRPC_ERROR_REF(error);
- w->error = error;
- w->phase = READY_TO_CALL_BACK;
- break;
- case READY_TO_CALL_BACK:
- if (error != GRPC_ERROR_NONE) {
- GPR_ASSERT(!due_to_completion);
- GRPC_ERROR_UNREF(w->error);
- GRPC_ERROR_REF(error);
- w->error = error;
- }
- w->phase = CALLING_BACK_AND_FINISHED;
- grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w,
- &w->completion_storage);
- break;
- case CALLING_BACK_AND_FINISHED:
- GPR_UNREACHABLE_CODE(return );
- break;
- }
- gpr_mu_unlock(&w->mu);
-
- GRPC_ERROR_UNREF(error);
-}
-
-static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
- grpc_error *error) {
- partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error));
-}
-
-static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
- grpc_error *error) {
- partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error));
-}
-
-typedef struct watcher_timer_init_arg {
- state_watcher *w;
- gpr_timespec deadline;
-} watcher_timer_init_arg;
-
-static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error_ignored) {
- watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg;
-
- grpc_timer_init(exec_ctx, &wa->w->alarm,
- gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC),
- &wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
- gpr_free(wa);
-}
-
-void grpc_channel_watch_connectivity_state_internal(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *client_channel_elem,
- grpc_channel_stack *channel_stack,
- grpc_connectivity_state last_observed_state, gpr_timespec deadline,
- grpc_completion_queue *cq, void *tag) {
- state_watcher *w = (state_watcher *)gpr_malloc(sizeof(*w));
-
- GPR_ASSERT(grpc_cq_begin_op(cq, tag));
-
- gpr_mu_init(&w->mu);
- GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w,
- grpc_schedule_on_exec_ctx);
- w->phase = WAITING;
- w->state = last_observed_state;
- w->cq = cq;
- w->tag = tag;
- w->client_channel_elem = client_channel_elem;
- w->channel_stack = channel_stack;
- w->error = NULL;
-
- watcher_timer_init_arg *wa =
- (watcher_timer_init_arg *)gpr_malloc(sizeof(watcher_timer_init_arg));
- wa->w = w;
- wa->deadline = deadline;
- GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa,
- grpc_schedule_on_exec_ctx);
-
- if (client_channel_elem->filter == &grpc_client_channel_filter) {
- GRPC_CHANNEL_STACK_REF(channel_stack, "watch_channel_connectivity");
- grpc_client_channel_watch_connectivity_state(
- exec_ctx, client_channel_elem,
- grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state,
- &w->on_complete, &w->watcher_timer_init);
- } else {
- abort();
- }
-}
diff --git a/src/core/ext/filters/client_channel/channel_connectivity_internal.h b/src/core/ext/filters/client_channel/channel_connectivity_internal.h
deleted file mode 100644
index d260a20c07..0000000000
--- a/src/core/ext/filters/client_channel/channel_connectivity_internal.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H
-#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H
-
-#include <grpc/grpc.h>
-#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-
-void grpc_channel_watch_connectivity_state_internal(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *client_channel_elem,
- grpc_channel_stack *channel_stack,
- grpc_connectivity_state last_observed_state, gpr_timespec deadline,
- grpc_completion_queue *cq, void *tag);
-
-#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H \
- */
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index be8ea81a02..8223f25a33 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -31,7 +31,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
-#include "src/core/ext/filters/client_channel/connectivity_watcher.h"
+#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
@@ -754,8 +754,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
}
chand->deadline_checking_enabled =
grpc_deadline_checking_enabled(args->channel_args);
- grpc_client_channel_start_watching_connectivity(exec_ctx, elem,
- chand->owning_stack);
+ grpc_client_channel_start_backup_polling(exec_ctx, chand->interested_parties);
return GRPC_ERROR_NONE;
}
@@ -793,6 +792,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
+ grpc_client_channel_stop_backup_polling(exec_ctx, chand->interested_parties);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
diff --git a/src/core/ext/filters/client_channel/connectivity_watcher.cc b/src/core/ext/filters/client_channel/connectivity_watcher.cc
deleted file mode 100644
index da45929f26..0000000000
--- a/src/core/ext/filters/client_channel/connectivity_watcher.cc
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/ext/filters/client_channel/connectivity_watcher.h"
-
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/sync.h>
-#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h"
-#include "src/core/ext/filters/client_channel/client_channel.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/support/env.h"
-#include "src/core/lib/support/string.h"
-#include "src/core/lib/surface/channel.h"
-#include "src/core/lib/surface/completion_queue.h"
-
-#define DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS 500
-
-typedef struct connectivity_watcher {
- grpc_timer watcher_timer;
- grpc_closure check_connectivity_closure;
- grpc_completion_queue* cq;
- gpr_refcount refs;
- size_t channel_count;
- bool shutting_down;
-} connectivity_watcher;
-
-typedef struct channel_state {
- grpc_channel_element* client_channel_elem;
- grpc_channel_stack* channel_stack;
- grpc_connectivity_state state;
-} channel_state;
-
-static gpr_once g_once = GPR_ONCE_INIT;
-static gpr_mu g_watcher_mu;
-static connectivity_watcher* g_watcher = NULL;
-
-static void init_g_watcher_mu() { gpr_mu_init(&g_watcher_mu); }
-
-static void start_watching_locked(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* client_channel_elem,
- grpc_channel_stack* channel_stack) {
- gpr_ref(&g_watcher->refs);
- ++g_watcher->channel_count;
- channel_state* s = (channel_state*)gpr_zalloc(sizeof(channel_state));
- s->client_channel_elem = client_channel_elem;
- s->channel_stack = channel_stack;
- s->state = GRPC_CHANNEL_IDLE;
- grpc_channel_watch_connectivity_state_internal(
- exec_ctx, client_channel_elem, channel_stack, s->state,
- gpr_inf_future(GPR_CLOCK_MONOTONIC), g_watcher->cq, (void*)s);
-}
-
-static bool is_disabled() {
- char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
- bool disabled = gpr_is_true(env);
- gpr_free(env);
- return disabled;
-}
-
-static bool connectivity_watcher_unref(grpc_exec_ctx* exec_ctx) {
- if (gpr_unref(&g_watcher->refs)) {
- gpr_mu_lock(&g_watcher_mu);
- grpc_completion_queue_destroy(g_watcher->cq);
- gpr_free(g_watcher);
- g_watcher = NULL;
- gpr_mu_unlock(&g_watcher_mu);
- return true;
- }
- return false;
-}
-
-static void check_connectivity_state(grpc_exec_ctx* exec_ctx, void* ignored,
- grpc_error* error) {
- grpc_event ev;
- while (true) {
- gpr_mu_lock(&g_watcher_mu);
- if (g_watcher->shutting_down) {
- // Drain cq if the watcher is shutting down
- ev = grpc_completion_queue_next(
- g_watcher->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
- } else {
- ev = grpc_completion_queue_next(g_watcher->cq,
- gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
- // Make sure we've seen 2 TIMEOUTs before going to sleep
- if (ev.type == GRPC_QUEUE_TIMEOUT) {
- ev = grpc_completion_queue_next(
- g_watcher->cq, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
- if (ev.type == GRPC_QUEUE_TIMEOUT) {
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- grpc_timer_init(
- exec_ctx, &g_watcher->watcher_timer,
- gpr_time_add(now, gpr_time_from_millis(
- DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS,
- GPR_TIMESPAN)),
- &g_watcher->check_connectivity_closure, now);
- gpr_mu_unlock(&g_watcher_mu);
- break;
- }
- }
- }
- gpr_mu_unlock(&g_watcher_mu);
- if (ev.type != GRPC_OP_COMPLETE) {
- break;
- }
- channel_state* s = (channel_state*)(ev.tag);
- s->state = grpc_client_channel_check_connectivity_state(
- exec_ctx, s->client_channel_elem, false /* try_to_connect */);
- if (s->state == GRPC_CHANNEL_SHUTDOWN) {
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, s->channel_stack,
- "connectivity_watcher_stop_watching");
- gpr_free(s);
- if (connectivity_watcher_unref(exec_ctx)) {
- break;
- }
- } else {
- grpc_channel_watch_connectivity_state_internal(
- exec_ctx, s->client_channel_elem, s->channel_stack, s->state,
- gpr_inf_future(GPR_CLOCK_MONOTONIC), g_watcher->cq, s);
- }
- }
-}
-
-void grpc_client_channel_start_watching_connectivity(
- grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem,
- grpc_channel_stack* channel_stack) {
- if (is_disabled()) return;
- GRPC_CHANNEL_STACK_REF(channel_stack, "connectivity_watcher_start_watching");
- gpr_once_init(&g_once, init_g_watcher_mu);
- gpr_mu_lock(&g_watcher_mu);
- if (g_watcher == NULL) {
- g_watcher = (connectivity_watcher*)gpr_zalloc(sizeof(connectivity_watcher));
- g_watcher->cq = grpc_completion_queue_create_internal(
- GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING);
- gpr_ref_init(&g_watcher->refs, 0);
- GRPC_CLOSURE_INIT(&g_watcher->check_connectivity_closure,
- check_connectivity_state, NULL,
- grpc_schedule_on_exec_ctx);
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- grpc_timer_init(
- exec_ctx, &g_watcher->watcher_timer,
- gpr_time_add(
- now, gpr_time_from_millis(DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS,
- GPR_TIMESPAN)),
- &g_watcher->check_connectivity_closure, now);
- }
- start_watching_locked(exec_ctx, client_channel_elem, channel_stack);
- gpr_mu_init(&g_watcher_mu);
-}
-
-void grpc_client_channel_stop_watching_connectivity(
- grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem,
- grpc_channel_stack* channel_stack) {
- if (is_disabled()) return;
- gpr_once_init(&g_once, init_g_watcher_mu);
- gpr_mu_lock(&g_watcher_mu);
- if (--g_watcher->channel_count == 0) {
- g_watcher->shutting_down = true;
- grpc_timer_cancel(exec_ctx, &g_watcher->watcher_timer);
- connectivity_watcher_unref(exec_ctx);
- }
- gpr_mu_unlock(&g_watcher_mu);
-}
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 44a4f0401a..cf2e244e1e 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -254,12 +254,11 @@ CORE_SOURCE_FILES = [
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
+ 'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
- 'src/core/ext/filters/client_channel/channel_connectivity_internal.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
- 'src/core/ext/filters/client_channel/connectivity_watcher.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',