aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/channel_connectivity.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/channel_connectivity.c')
-rw-r--r--src/core/surface/channel_connectivity.c138
1 files changed, 95 insertions, 43 deletions
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 88a7c16598..df2774b527 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -37,7 +37,9 @@
#include <grpc/support/log.h>
#include "src/core/channel/client_channel.h"
-#include "src/core/iomgr/alarm.h"
+#include "src/core/channel/client_uchannel.h"
+#include "src/core/iomgr/timer.h"
+#include "src/core/surface/api_trace.h"
#include "src/core/surface/completion_queue.h"
grpc_connectivity_state grpc_channel_check_connectivity_state(
@@ -45,15 +47,29 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
/* forward through to the underlying client channel */
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
- if (client_channel_elem->filter != &grpc_client_channel_filter) {
- gpr_log(GPR_ERROR,
- "grpc_channel_check_connectivity_state called on something that is "
- "not a client channel, but '%s'",
- client_channel_elem->filter->name);
- return GRPC_CHANNEL_FATAL_FAILURE;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_connectivity_state state;
+ GRPC_API_TRACE(
+ "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
+ (channel, try_to_connect));
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ state = grpc_client_channel_check_connectivity_state(
+ &exec_ctx, client_channel_elem, try_to_connect);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return state;
+ }
+ if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
+ state = grpc_client_uchannel_check_connectivity_state(
+ &exec_ctx, client_channel_elem, try_to_connect);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return state;
}
- return grpc_client_channel_check_connectivity_state(client_channel_elem,
- try_to_connect);
+ gpr_log(GPR_ERROR,
+ "grpc_channel_check_connectivity_state called on something that is "
+ "not a (u)client channel, but '%s'",
+ client_channel_elem->filter->name);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return GRPC_CHANNEL_FATAL_FAILURE;
}
typedef enum {
@@ -67,8 +83,9 @@ typedef struct {
gpr_mu mu;
callback_phase phase;
int success;
- grpc_iomgr_closure on_complete;
- grpc_alarm alarm;
+ int removed;
+ grpc_closure on_complete;
+ grpc_timer alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
grpc_cq_completion completion_storage;
@@ -76,26 +93,31 @@ typedef struct {
void *tag;
} state_watcher;
-static void delete_state_watcher(state_watcher *w) {
+static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(w->channel));
- grpc_client_channel_del_interested_party(client_channel_elem,
- grpc_cq_pollset(w->cq));
- GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity");
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
+ "watch_channel_connectivity");
+ } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
+ "watch_uchannel_connectivity");
+ } else {
+ abort();
+ }
gpr_mu_destroy(&w->mu);
gpr_free(w);
}
-static void finished_completion(void *pw, grpc_cq_completion *ignored) {
+static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_cq_completion *ignored) {
int delete = 0;
state_watcher *w = pw;
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
case CALLED_BACK:
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- break;
+ GPR_UNREACHABLE_CODE(return );
case CALLING_BACK:
w->phase = CALLED_BACK;
break;
@@ -106,34 +128,48 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored) {
gpr_mu_unlock(&w->mu);
if (delete) {
- delete_state_watcher(w);
+ delete_state_watcher(exec_ctx, w);
}
}
-static void partly_done(state_watcher *w, int due_to_completion) {
+static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
+ int due_to_completion) {
int delete = 0;
+ grpc_channel_element *client_channel_elem = NULL;
+ gpr_mu_lock(&w->mu);
+ if (w->removed == 0) {
+ w->removed = 1;
+ client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem,
+ grpc_cq_pollset(w->cq));
+ } else {
+ grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem,
+ grpc_cq_pollset(w->cq));
+ }
+ }
+ gpr_mu_unlock(&w->mu);
if (due_to_completion) {
gpr_mu_lock(&w->mu);
w->success = 1;
gpr_mu_unlock(&w->mu);
- grpc_alarm_cancel(&w->alarm);
+ grpc_timer_cancel(exec_ctx, &w->alarm);
}
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
w->phase = CALLING_BACK;
- grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w,
- &w->completion_storage);
+ grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->success, finished_completion,
+ w, &w->completion_storage);
break;
case CALLING_BACK:
w->phase = CALLING_BACK_AND_FINISHED;
break;
case CALLING_BACK_AND_FINISHED:
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- break;
+ GPR_UNREACHABLE_CODE(return );
case CALLED_BACK:
delete = 1;
break;
@@ -141,47 +177,63 @@ static void partly_done(state_watcher *w, int due_to_completion) {
gpr_mu_unlock(&w->mu);
if (delete) {
- delete_state_watcher(w);
+ delete_state_watcher(exec_ctx, w);
}
}
-static void watch_complete(void *pw, int success) { partly_done(pw, 1); }
+static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
+ partly_done(exec_ctx, pw, 1);
+}
-static void timeout_complete(void *pw, int success) { partly_done(pw, 0); }
+static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
+ partly_done(exec_ctx, pw, 0);
+}
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
state_watcher *w = gpr_malloc(sizeof(*w));
+ GRPC_API_TRACE(
+ "grpc_channel_watch_connectivity_state("
+ "channel=%p, last_observed_state=%d, "
+ "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, "
+ "cq=%p, tag=%p)",
+ 7, (channel, (int)last_observed_state, (long)deadline.tv_sec,
+ deadline.tv_nsec, (int)deadline.clock_type, cq, tag));
+
grpc_cq_begin_op(cq);
gpr_mu_init(&w->mu);
- grpc_iomgr_closure_init(&w->on_complete, watch_complete, w);
+ grpc_closure_init(&w->on_complete, watch_complete, w);
w->phase = WAITING;
w->state = last_observed_state;
w->success = 0;
+ w->removed = 0;
w->cq = cq;
w->tag = tag;
w->channel = channel;
- grpc_alarm_init(&w->alarm,
+ grpc_timer_init(&exec_ctx, &w->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
- if (client_channel_elem->filter != &grpc_client_channel_filter) {
- gpr_log(GPR_ERROR,
- "grpc_channel_watch_connectivity_state called on something that is "
- "not a client channel, but '%s'",
- client_channel_elem->filter->name);
- grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
- } else {
- GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
- grpc_client_channel_add_interested_party(client_channel_elem,
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
+ grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem,
grpc_cq_pollset(cq));
- grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
- &w->on_complete);
+ grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
+ &w->state, &w->on_complete);
+ } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
+ GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity");
+ grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem,
+ grpc_cq_pollset(cq));
+ grpc_client_uchannel_watch_connectivity_state(
+ &exec_ctx, client_channel_elem, &w->state, &w->on_complete);
}
+
+ grpc_exec_ctx_finish(&exec_ctx);
}