aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-02-17 07:03:54 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-02-17 07:03:54 -0800
commit65d57b8c3d83c3245dbe864c46b1056a1c99ddcc (patch)
tree3f3029cfe6c1b1097f4b24b04af26a1cf537c87e /src/core/lib/transport
parent6f4178878ccc45f365ce72eef6247315e048cf2a (diff)
parent46ca4f7d4eca6cb69a6d80ce35f58f2325cf945d (diff)
Merge github.com:grpc/grpc into zalloc
Diffstat (limited to 'src/core/lib/transport')
-rw-r--r--src/core/lib/transport/connectivity_state.c45
-rw-r--r--src/core/lib/transport/connectivity_state.h20
-rw-r--r--src/core/lib/transport/transport.h4
3 files changed, 50 insertions, 19 deletions
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 8fc5bf3e9a..afe1f6164d 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -62,7 +62,7 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state init_state,
const char *name) {
- tracker->current_state = init_state;
+ gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state);
tracker->current_error = GRPC_ERROR_NONE;
tracker->watchers = NULL;
tracker->name = gpr_strdup(name);
@@ -89,15 +89,30 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
}
grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker *tracker) {
+ grpc_connectivity_state cur =
+ (grpc_connectivity_state)gpr_atm_no_barrier_load(
+ &tracker->current_state_atm);
+ if (grpc_connectivity_state_trace) {
+ gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name,
+ grpc_connectivity_state_name(cur));
+ }
+ return cur;
+}
+
+grpc_connectivity_state grpc_connectivity_state_get(
grpc_connectivity_state_tracker *tracker, grpc_error **error) {
+ grpc_connectivity_state cur =
+ (grpc_connectivity_state)gpr_atm_no_barrier_load(
+ &tracker->current_state_atm);
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name,
- grpc_connectivity_state_name(tracker->current_state));
+ grpc_connectivity_state_name(cur));
}
if (error != NULL) {
*error = GRPC_ERROR_REF(tracker->current_error);
}
- return tracker->current_state;
+ return cur;
}
bool grpc_connectivity_state_has_watchers(
@@ -108,6 +123,9 @@ bool grpc_connectivity_state_has_watchers(
bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify) {
+ grpc_connectivity_state cur =
+ (grpc_connectivity_state)gpr_atm_no_barrier_load(
+ &tracker->current_state_atm);
if (grpc_connectivity_state_trace) {
if (current == NULL) {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
@@ -115,7 +133,7 @@ bool grpc_connectivity_state_notify_on_state_change(
} else {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
tracker->name, grpc_connectivity_state_name(*current),
- grpc_connectivity_state_name(tracker->current_state), notify);
+ grpc_connectivity_state_name(cur), notify);
}
}
if (current == NULL) {
@@ -138,8 +156,8 @@ bool grpc_connectivity_state_notify_on_state_change(
}
return false;
} else {
- if (tracker->current_state != *current) {
- *current = tracker->current_state;
+ if (cur != *current) {
+ *current = cur;
grpc_closure_sched(exec_ctx, notify,
GRPC_ERROR_REF(tracker->current_error));
} else {
@@ -149,7 +167,7 @@ bool grpc_connectivity_state_notify_on_state_change(
w->next = tracker->watchers;
tracker->watchers = w;
}
- return tracker->current_state == GRPC_CHANNEL_IDLE;
+ return cur == GRPC_CHANNEL_IDLE;
}
}
@@ -157,11 +175,14 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
grpc_error *error, const char *reason) {
+ grpc_connectivity_state cur =
+ (grpc_connectivity_state)gpr_atm_no_barrier_load(
+ &tracker->current_state_atm);
grpc_connectivity_state_watcher *w;
if (grpc_connectivity_state_trace) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker,
- tracker->name, grpc_connectivity_state_name(tracker->current_state),
+ tracker->name, grpc_connectivity_state_name(cur),
grpc_connectivity_state_name(state), reason, error, error_string);
}
switch (state) {
@@ -178,13 +199,13 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
}
GRPC_ERROR_UNREF(tracker->current_error);
tracker->current_error = error;
- if (tracker->current_state == state) {
+ if (cur == state) {
return;
}
- GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_SHUTDOWN);
- tracker->current_state = state;
+ GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN);
+ gpr_atm_no_barrier_store(&tracker->current_state_atm, state);
while ((w = tracker->watchers) != NULL) {
- *w->current = tracker->current_state;
+ *w->current = state;
tracker->watchers = w->next;
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name,
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index 769c675b79..c9604c34dd 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -47,8 +47,8 @@ typedef struct grpc_connectivity_state_watcher {
} grpc_connectivity_state_watcher;
typedef struct {
- /** current connectivity state */
- grpc_connectivity_state current_state;
+ /** current grpc_connectivity_state */
+ gpr_atm current_state_atm;
/** error associated with state */
grpc_error *current_error;
/** all our watchers */
@@ -59,6 +59,7 @@ typedef struct {
extern int grpc_connectivity_state_trace;
+/** enum --> string conversion */
const char *grpc_connectivity_state_name(grpc_connectivity_state state);
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
@@ -68,22 +69,31 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_tracker *tracker);
/** Set connectivity state; not thread safe; access must be serialized with an
- * external lock */
+ * external lock */
void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
grpc_error *associated_error,
const char *reason);
+/** Return true if this connectivity state has watchers.
+ Access must be serialized with an external lock. */
bool grpc_connectivity_state_has_watchers(
grpc_connectivity_state_tracker *tracker);
+/** Return the last seen connectivity state. No need to synchronize access. */
grpc_connectivity_state grpc_connectivity_state_check(
- grpc_connectivity_state_tracker *tracker, grpc_error **current_error);
+ grpc_connectivity_state_tracker *tracker);
+
+/** Return the last seen connectivity state, and the associated error.
+ Access must be serialized with an external lock. */
+grpc_connectivity_state grpc_connectivity_state_get(
+ grpc_connectivity_state_tracker *tracker, grpc_error **error);
/** Return 1 if the channel should start connecting, 0 otherwise.
If current==NULL cancel notify if it is already queued (success==0 in that
- case) */
+ case).
+ Access must be serialized with an external lock. */
bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify);
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index f2dbb891b2..bb23c0225a 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -167,9 +167,9 @@ typedef struct grpc_transport_stream_op {
/***************************************************************************
* remaining fields are initialized and used at the discretion of the
- * transport implementation */
+ * current handler of the op */
- grpc_transport_private_op_data transport_private;
+ grpc_transport_private_op_data handler_private;
} grpc_transport_stream_op;
/** Transport op: a set of operations to perform on a transport as a whole */