aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/connectivity_state.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-17 15:27:13 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-17 15:27:13 -0700
commit5795da7c96642f3e19c8cfe105de7d1cc7cf6fbc (patch)
treeb27597449f26049270b754ba8f5b010d47e0f998 /src/core/transport/connectivity_state.c
parent2d2711f9e6fd3ac783f400fcd4e72798636f194e (diff)
Update connectivity state code to be completely synchronous
Diffstat (limited to 'src/core/transport/connectivity_state.c')
-rw-r--r--src/core/transport/connectivity_state.c73
1 files changed, 43 insertions, 30 deletions
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index cf23adfbb2..034f82474f 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -32,6 +32,9 @@
*/
#include "src/core/transport/connectivity_state.h"
+
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
@@ -56,14 +59,12 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
}
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
- grpc_workqueue *workqueue,
grpc_connectivity_state init_state,
const char *name) {
tracker->current_state = init_state;
tracker->watchers = NULL;
- tracker->workqueue = workqueue;
- GRPC_WORKQUEUE_REF(workqueue, name);
tracker->name = gpr_strdup(name);
+ tracker->changed = 0;
}
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
@@ -78,10 +79,9 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
} else {
success = 0;
}
- grpc_workqueue_push(tracker->workqueue, w->notify, success);
+ w->notify->cb(w->notify->cb_arg, success);
gpr_free(w);
}
- GRPC_WORKQUEUE_UNREF(tracker->workqueue, tracker->name);
gpr_free(tracker->name);
}
@@ -90,9 +90,12 @@ grpc_connectivity_state grpc_connectivity_state_check(
return tracker->current_state;
}
-int grpc_connectivity_state_notify_on_state_change(
+grpc_connectivity_state_notify_on_state_change_result
+grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
grpc_iomgr_closure *notify) {
+ grpc_connectivity_state_notify_on_state_change_result result;
+ memset(&result, 0, sizeof(result));
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name,
grpc_connectivity_state_name(*current),
@@ -100,7 +103,7 @@ int grpc_connectivity_state_notify_on_state_change(
}
if (tracker->current_state != *current) {
*current = tracker->current_state;
- grpc_workqueue_push(tracker->workqueue, notify, 1);
+ result.state_already_changed = 1;
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@@ -108,15 +111,13 @@ int grpc_connectivity_state_notify_on_state_change(
w->next = tracker->watchers;
tracker->watchers = w;
}
- return tracker->current_state == GRPC_CHANNEL_IDLE;
+ result.current_state_is_idle = tracker->current_state == GRPC_CHANNEL_IDLE;
+ return result;
}
-void grpc_connectivity_state_set_with_scheduler(
- grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
- void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg,
- const char *reason) {
- grpc_connectivity_state_watcher *new = NULL;
- grpc_connectivity_state_watcher *w;
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state state,
+ const char *reason) {
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name,
grpc_connectivity_state_name(tracker->current_state),
@@ -127,28 +128,40 @@ void grpc_connectivity_state_set_with_scheduler(
}
GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE);
tracker->current_state = state;
- while ((w = tracker->watchers)) {
- tracker->watchers = w->next;
+ tracker->changed = 1;
+}
- if (state != *w->current) {
- *w->current = state;
- scheduler(arg, w->notify);
+void grpc_connectivity_state_begin_flush(
+ grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state_flusher *flusher) {
+ grpc_connectivity_state_watcher *w;
+ flusher->cbs = NULL;
+ if (!tracker->changed) return;
+ w = tracker->watchers;
+ tracker->watchers = NULL;
+ while (w != NULL) {
+ grpc_connectivity_state_watcher *next = w->next;
+ if (tracker->current_state != *w->current) {
+ *w->current = tracker->current_state;
+ w->notify->next = flusher->cbs;
+ flusher->cbs = w->notify;
gpr_free(w);
} else {
- w->next = new;
- new = w;
+ w->next = tracker->watchers;
+ tracker->watchers = w;
}
+ w = next;
}
- tracker->watchers = new;
+ tracker->changed = 0;
}
-static void default_scheduler(void *workqueue, grpc_iomgr_closure *closure) {
- grpc_workqueue_push(workqueue, closure, 1);
+void grpc_connectivity_state_end_flush(
+ grpc_connectivity_state_flusher *flusher) {
+ grpc_iomgr_closure *c = flusher->cbs;
+ while (c != NULL) {
+ grpc_iomgr_closure *next = c;
+ c->cb(c->cb_arg, 1);
+ c = next;
+ }
}
-void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
- grpc_connectivity_state state,
- const char *reason) {
- grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler,
- tracker->workqueue, reason);
-}