diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-17 15:27:13 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-17 15:27:13 -0700 |
commit | 5795da7c96642f3e19c8cfe105de7d1cc7cf6fbc (patch) | |
tree | b27597449f26049270b754ba8f5b010d47e0f998 /src/core/transport/connectivity_state.c | |
parent | 2d2711f9e6fd3ac783f400fcd4e72798636f194e (diff) |
Update connectivity state code to be completely synchronous
Diffstat (limited to 'src/core/transport/connectivity_state.c')
-rw-r--r-- | src/core/transport/connectivity_state.c | 73 |
1 files changed, 43 insertions, 30 deletions
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index cf23adfbb2..034f82474f 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -32,6 +32,9 @@ */ #include "src/core/transport/connectivity_state.h" + +#include <string.h> + #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -56,14 +59,12 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) { } void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, - grpc_workqueue *workqueue, grpc_connectivity_state init_state, const char *name) { tracker->current_state = init_state; tracker->watchers = NULL; - tracker->workqueue = workqueue; - GRPC_WORKQUEUE_REF(workqueue, name); tracker->name = gpr_strdup(name); + tracker->changed = 0; } void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { @@ -78,10 +79,9 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { } else { success = 0; } - grpc_workqueue_push(tracker->workqueue, w->notify, success); + w->notify->cb(w->notify->cb_arg, success); gpr_free(w); } - GRPC_WORKQUEUE_UNREF(tracker->workqueue, tracker->name); gpr_free(tracker->name); } @@ -90,9 +90,12 @@ grpc_connectivity_state grpc_connectivity_state_check( return tracker->current_state; } -int grpc_connectivity_state_notify_on_state_change( +grpc_connectivity_state_notify_on_state_change_result +grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify) { + grpc_connectivity_state_notify_on_state_change_result result; + memset(&result, 0, sizeof(result)); if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name, grpc_connectivity_state_name(*current), @@ -100,7 +103,7 @@ int grpc_connectivity_state_notify_on_state_change( } if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_workqueue_push(tracker->workqueue, notify, 1); + result.state_already_changed = 1; } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -108,15 +111,13 @@ int grpc_connectivity_state_notify_on_state_change( w->next = tracker->watchers; tracker->watchers = w; } - return tracker->current_state == GRPC_CHANNEL_IDLE; + result.current_state_is_idle = tracker->current_state == GRPC_CHANNEL_IDLE; + return result; } -void grpc_connectivity_state_set_with_scheduler( - grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg, - const char *reason) { - grpc_connectivity_state_watcher *new = NULL; - grpc_connectivity_state_watcher *w; +void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state state, + const char *reason) { if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name, grpc_connectivity_state_name(tracker->current_state), @@ -127,28 +128,40 @@ void grpc_connectivity_state_set_with_scheduler( } GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE); tracker->current_state = state; - while ((w = tracker->watchers)) { - tracker->watchers = w->next; + tracker->changed = 1; +} - if (state != *w->current) { - *w->current = state; - scheduler(arg, w->notify); +void grpc_connectivity_state_begin_flush( + grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state_flusher *flusher) { + grpc_connectivity_state_watcher *w; + flusher->cbs = NULL; + if (!tracker->changed) return; + w = tracker->watchers; + tracker->watchers = NULL; + while (w != NULL) { + grpc_connectivity_state_watcher *next = w->next; + if (tracker->current_state != *w->current) { + *w->current = tracker->current_state; + w->notify->next = flusher->cbs; + flusher->cbs = w->notify; gpr_free(w); } else { - w->next = new; - new = w; + w->next = tracker->watchers; + tracker->watchers = w; } + w = next; } - tracker->watchers = new; + tracker->changed = 0; } -static void default_scheduler(void *workqueue, grpc_iomgr_closure *closure) { - grpc_workqueue_push(workqueue, closure, 1); +void grpc_connectivity_state_end_flush( + grpc_connectivity_state_flusher *flusher) { + grpc_iomgr_closure *c = flusher->cbs; + while (c != NULL) { + grpc_iomgr_closure *next = c; + c->cb(c->cb_arg, 1); + c = next; + } } -void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, - grpc_connectivity_state state, - const char *reason) { - grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, - tracker->workqueue, reason); -} |