diff options
Diffstat (limited to 'src/core/surface/channel_connectivity.c')
-rw-r--r-- | src/core/surface/channel_connectivity.c | 138 |
1 files changed, 95 insertions, 43 deletions
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 88a7c16598..df2774b527 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -37,7 +37,9 @@ #include <grpc/support/log.h> #include "src/core/channel/client_channel.h" -#include "src/core/iomgr/alarm.h" +#include "src/core/channel/client_uchannel.h" +#include "src/core/iomgr/timer.h" +#include "src/core/surface/api_trace.h" #include "src/core/surface/completion_queue.h" grpc_connectivity_state grpc_channel_check_connectivity_state( @@ -45,15 +47,29 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( /* forward through to the underlying client channel */ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); - if (client_channel_elem->filter != &grpc_client_channel_filter) { - gpr_log(GPR_ERROR, - "grpc_channel_check_connectivity_state called on something that is " - "not a client channel, but '%s'", - client_channel_elem->filter->name); - return GRPC_CHANNEL_FATAL_FAILURE; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_connectivity_state state; + GRPC_API_TRACE( + "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2, + (channel, try_to_connect)); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + state = grpc_client_channel_check_connectivity_state( + &exec_ctx, client_channel_elem, try_to_connect); + grpc_exec_ctx_finish(&exec_ctx); + return state; + } + if (client_channel_elem->filter == &grpc_client_uchannel_filter) { + state = grpc_client_uchannel_check_connectivity_state( + &exec_ctx, client_channel_elem, try_to_connect); + grpc_exec_ctx_finish(&exec_ctx); + return state; } - return grpc_client_channel_check_connectivity_state(client_channel_elem, - try_to_connect); + gpr_log(GPR_ERROR, + "grpc_channel_check_connectivity_state called on something that is " + "not a (u)client channel, but '%s'", + client_channel_elem->filter->name); + grpc_exec_ctx_finish(&exec_ctx); + return GRPC_CHANNEL_FATAL_FAILURE; } typedef enum { @@ -67,8 +83,9 @@ typedef struct { gpr_mu mu; callback_phase phase; int success; - grpc_iomgr_closure on_complete; - grpc_alarm alarm; + int removed; + grpc_closure on_complete; + grpc_timer alarm; grpc_connectivity_state state; grpc_completion_queue *cq; grpc_cq_completion completion_storage; @@ -76,26 +93,31 @@ typedef struct { void *tag; } state_watcher; -static void delete_state_watcher(state_watcher *w) { +static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(w->channel)); - grpc_client_channel_del_interested_party(client_channel_elem, - grpc_cq_pollset(w->cq)); - GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity"); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, + "watch_channel_connectivity"); + } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, + "watch_uchannel_connectivity"); + } else { + abort(); + } gpr_mu_destroy(&w->mu); gpr_free(w); } -static void finished_completion(void *pw, grpc_cq_completion *ignored) { +static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, + grpc_cq_completion *ignored) { int delete = 0; state_watcher *w = pw; gpr_mu_lock(&w->mu); switch (w->phase) { case WAITING: case CALLED_BACK: - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - break; + GPR_UNREACHABLE_CODE(return ); case CALLING_BACK: w->phase = CALLED_BACK; break; @@ -106,34 +128,48 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored) { gpr_mu_unlock(&w->mu); if (delete) { - delete_state_watcher(w); + delete_state_watcher(exec_ctx, w); } } -static void partly_done(state_watcher *w, int due_to_completion) { +static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, + int due_to_completion) { int delete = 0; + grpc_channel_element *client_channel_elem = NULL; + gpr_mu_lock(&w->mu); + if (w->removed == 0) { + w->removed = 1; + client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(w->channel)); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem, + grpc_cq_pollset(w->cq)); + } else { + grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem, + grpc_cq_pollset(w->cq)); + } + } + gpr_mu_unlock(&w->mu); if (due_to_completion) { gpr_mu_lock(&w->mu); w->success = 1; gpr_mu_unlock(&w->mu); - grpc_alarm_cancel(&w->alarm); + grpc_timer_cancel(exec_ctx, &w->alarm); } gpr_mu_lock(&w->mu); switch (w->phase) { case WAITING: w->phase = CALLING_BACK; - grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w, - &w->completion_storage); + grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->success, finished_completion, + w, &w->completion_storage); break; case CALLING_BACK: w->phase = CALLING_BACK_AND_FINISHED; break; case CALLING_BACK_AND_FINISHED: - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - break; + GPR_UNREACHABLE_CODE(return ); case CALLED_BACK: delete = 1; break; @@ -141,47 +177,63 @@ static void partly_done(state_watcher *w, int due_to_completion) { gpr_mu_unlock(&w->mu); if (delete) { - delete_state_watcher(w); + delete_state_watcher(exec_ctx, w); } } -static void watch_complete(void *pw, int success) { partly_done(pw, 1); } +static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) { + partly_done(exec_ctx, pw, 1); +} -static void timeout_complete(void *pw, int success) { partly_done(pw, 0); } +static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) { + partly_done(exec_ctx, pw, 0); +} void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; state_watcher *w = gpr_malloc(sizeof(*w)); + GRPC_API_TRACE( + "grpc_channel_watch_connectivity_state(" + "channel=%p, last_observed_state=%d, " + "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, " + "cq=%p, tag=%p)", + 7, (channel, (int)last_observed_state, (long)deadline.tv_sec, + deadline.tv_nsec, (int)deadline.clock_type, cq, tag)); + grpc_cq_begin_op(cq); gpr_mu_init(&w->mu); - grpc_iomgr_closure_init(&w->on_complete, watch_complete, w); + grpc_closure_init(&w->on_complete, watch_complete, w); w->phase = WAITING; w->state = last_observed_state; w->success = 0; + w->removed = 0; w->cq = cq; w->tag = tag; w->channel = channel; - grpc_alarm_init(&w->alarm, + grpc_timer_init(&exec_ctx, &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); - if (client_channel_elem->filter != &grpc_client_channel_filter) { - gpr_log(GPR_ERROR, - "grpc_channel_watch_connectivity_state called on something that is " - "not a client channel, but '%s'", - client_channel_elem->filter->name); - grpc_iomgr_add_delayed_callback(&w->on_complete, 1); - } else { - GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); - grpc_client_channel_add_interested_party(client_channel_elem, + if (client_channel_elem->filter == &grpc_client_channel_filter) { + GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); + grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem, grpc_cq_pollset(cq)); - grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state, - &w->on_complete); + grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, + &w->state, &w->on_complete); + } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { + GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity"); + grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem, + grpc_cq_pollset(cq)); + grpc_client_uchannel_watch_connectivity_state( + &exec_ctx, client_channel_elem, &w->state, &w->on_complete); } + + grpc_exec_ctx_finish(&exec_ctx); } |