aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-29 09:37:52 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-29 09:37:52 -0700
commit08a1cf8f4fd747ab331393916bd5d9cc7f4804c1 (patch)
tree51f5d3fcd64e0552c0d6c74ee50fd8e979e9a0d3 /src/core
parent4d74056b0806f68397111dc2b16f56386339b8e0 (diff)
Use connectivity state tracking code in chttp2 transport
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c2
-rw-r--r--src/core/client_config/lb_policies/pick_first.c2
-rw-r--r--src/core/client_config/subchannel.c2
-rw-r--r--src/core/transport/chttp2/internal.h6
-rw-r--r--src/core/transport/chttp2_transport.c55
-rw-r--r--src/core/transport/connectivity_state.c (renamed from src/core/channel/connectivity_state.c)45
-rw-r--r--src/core/transport/connectivity_state.h (renamed from src/core/channel/connectivity_state.h)23
7 files changed, 66 insertions, 69 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index b33ef7842f..e2f8debdfa 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -38,10 +38,10 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
-#include "src/core/channel/connectivity_state.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
+#include "src/core/transport/connectivity_state.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 9d6c264e37..cdc7e75140 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -36,7 +36,7 @@
#include <string.h>
#include <grpc/support/alloc.h>
-#include "src/core/channel/connectivity_state.h"
+#include "src/core/transport/connectivity_state.h"
typedef struct pending_pick {
struct pending_pick *next;
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index b4da9cda3f..2f5843b2a4 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -39,7 +39,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
-#include "src/core/channel/connectivity_state.h"
+#include "src/core/transport/connectivity_state.h"
typedef struct {
/* all fields protected by subchannel->mu */
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 93235aef55..7e2e75f97d 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -34,7 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
-#include "src/core/transport/transport_impl.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/frame_data.h"
@@ -47,6 +46,8 @@
#include "src/core/transport/chttp2/incoming_metadata.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
+#include "src/core/transport/connectivity_state.h"
+#include "src/core/transport/transport_impl.h"
typedef struct grpc_chttp2_transport grpc_chttp2_transport;
typedef struct grpc_chttp2_stream grpc_chttp2_stream;
@@ -335,8 +336,7 @@ struct grpc_chttp2_transport {
void *accept_stream_user_data;
/** connectivity tracking */
- grpc_iomgr_closure *on_connectivity_changed;
- grpc_connectivity_state *connectivity;
+ grpc_connectivity_state_tracker state_tracker;
} channel_callback;
};
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 11dd60bbb9..8f909dff37 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -37,18 +37,19 @@
#include <stdio.h>
#include <string.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2/http2_errors.h"
+#include "src/core/transport/chttp2/internal.h"
#include "src/core/transport/chttp2/status_conversion.h"
#include "src/core/transport/chttp2/timeout_encoding.h"
-#include "src/core/transport/chttp2/internal.h"
#include "src/core/transport/transport_impl.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/useful.h>
/* #define REFCOUNTING_DEBUG */
@@ -81,7 +82,6 @@ static const grpc_transport_vtable vtable;
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t);
-static void unlock_check_channel_callbacks(grpc_chttp2_transport *t);
static void unlock_check_read_write_state(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
@@ -149,6 +149,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
+ grpc_connectivity_state_destroy(&t->channel_callback.state_tracker);
gpr_mu_unlock(&t->mu);
gpr_mu_destroy(&t->mu);
@@ -229,6 +230,8 @@ static void init_transport(grpc_chttp2_transport *t,
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
+ grpc_connectivity_state_init(&t->channel_callback.state_tracker,
+ GRPC_CHANNEL_READY);
gpr_slice_buffer_init(&t->global.qbuf);
@@ -325,6 +328,8 @@ static void destroy_transport(grpc_transport *gt) {
static void close_transport_locked(grpc_chttp2_transport *t) {
if (!t->closed) {
t->closed = 1;
+ grpc_connectivity_state_set(&t->channel_callback.state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE);
if (t->ep) {
grpc_endpoint_shutdown(t->ep);
}
@@ -445,8 +450,6 @@ static void unlock(grpc_chttp2_transport *t) {
REF_TRANSPORT(t, "writing");
grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
}
- /* unlock_check_parser(t); */
- unlock_check_channel_callbacks(t);
run_closures = t->global.pending_closures;
t->global.pending_closures = NULL;
@@ -520,6 +523,9 @@ void grpc_chttp2_add_incoming_goaway(
gpr_free(msg);
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
+ grpc_connectivity_state_set(
+ &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE);
}
static void maybe_start_some_streams(
@@ -544,9 +550,9 @@ static void maybe_start_some_streams(
transport_global->next_stream_id += 2;
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
- grpc_chttp2_add_incoming_goaway(
- transport_global, GRPC_CHTTP2_NO_ERROR,
- gpr_slice_from_copied_string("Exceeded sequence number limit"));
+ grpc_connectivity_state_set(&TRANSPORT_FROM_GLOBAL(transport_global)
+ ->channel_callback.state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE);
}
stream_global->outgoing_window =
@@ -669,10 +675,9 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
}
if (op->on_connectivity_state_change) {
- GPR_ASSERT(t->channel_callback.on_connectivity_changed == NULL);
- t->channel_callback.on_connectivity_changed =
- op->on_connectivity_state_change;
- t->channel_callback.connectivity = op->connectivity_state;
+ grpc_connectivity_state_notify_on_state_change(
+ &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
}
if (op->send_goaway) {
@@ -928,24 +933,6 @@ static void reading_action(void *pt, int iomgr_success_ignored) {
* CALLBACK LOOP
*/
-static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
- if (t->channel_callback.on_connectivity_changed != NULL) {
- grpc_connectivity_state current;
- if (t->closed || t->global.seen_goaway) {
- current = GRPC_CHANNEL_FATAL_FAILURE;
- } else {
- current = GRPC_CHANNEL_READY;
- }
- if (current != *t->channel_callback.connectivity) {
- *t->channel_callback.connectivity = current;
- grpc_chttp2_schedule_closure(
- &t->global, t->channel_callback.on_connectivity_changed, 1);
- t->channel_callback.on_connectivity_changed = NULL;
- t->channel_callback.connectivity = NULL;
- }
- }
-}
-
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success) {
diff --git a/src/core/channel/connectivity_state.c b/src/core/transport/connectivity_state.c
index 0ee268ee59..5cbd67ef3c 100644
--- a/src/core/channel/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -31,52 +31,57 @@
*
*/
-#include "src/core/channel/connectivity_state.h"
+#include "src/core/transport/connectivity_state.h"
#include <grpc/support/alloc.h>
-void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state) {
- tracker->current_state = init_state;
- tracker->watchers = NULL;
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state init_state) {
+ tracker->current_state = init_state;
+ tracker->watchers = NULL;
}
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
grpc_connectivity_state_watcher *w;
while ((w = tracker->watchers)) {
- tracker->watchers = w->next;
+ tracker->watchers = w->next;
- if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) {
- *w->current = GRPC_CHANNEL_FATAL_FAILURE;
- grpc_iomgr_add_callback(w->notify);
- } else {
- grpc_iomgr_add_delayed_callback(w->notify, 0);
- }
- gpr_free(w);
+ if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) {
+ *w->current = GRPC_CHANNEL_FATAL_FAILURE;
+ grpc_iomgr_add_callback(w->notify);
+ } else {
+ grpc_iomgr_add_delayed_callback(w->notify, 0);
+ }
+ gpr_free(w);
}
}
-grpc_connectivity_state grpc_connectivity_state_check(grpc_connectivity_state_tracker *tracker) {
- return tracker->current_state;
+grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker *tracker) {
+ return tracker->current_state;
}
-int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify) {
+int grpc_connectivity_state_notify_on_state_change(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify) {
if (tracker->current_state != *current) {
*current = tracker->current_state;
grpc_iomgr_add_callback(notify);
} else {
- grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
- w->current = current;
- w->notify = notify;
+ grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
+ w->current = current;
+ w->notify = notify;
w->next = tracker->watchers;
tracker->watchers = w;
}
return tracker->current_state == GRPC_CHANNEL_IDLE;
}
-void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) {
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state state) {
grpc_connectivity_state_watcher *new = NULL;
grpc_connectivity_state_watcher *w;
if (tracker->current_state == state) {
- return;
+ return;
}
tracker->current_state = state;
while ((w = tracker->watchers)) {
diff --git a/src/core/channel/connectivity_state.h b/src/core/transport/connectivity_state.h
index ebc1acc559..9a8c57525f 100644
--- a/src/core/channel/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -31,14 +31,14 @@
*
*/
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H
-#define GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H
+#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H
+#define GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H
#include <grpc/grpc.h>
#include "src/core/iomgr/iomgr.h"
typedef struct grpc_connectivity_state_watcher {
- /** we keep watchers in a linked list */
+ /** we keep watchers in a linked list */
struct grpc_connectivity_state_watcher *next;
/** closure to notify on change */
grpc_iomgr_closure *notify;
@@ -47,20 +47,25 @@ typedef struct grpc_connectivity_state_watcher {
} grpc_connectivity_state_watcher;
typedef struct {
- /** current connectivity state */
+ /** current connectivity state */
grpc_connectivity_state current_state;
/** all our watchers */
grpc_connectivity_state_watcher *watchers;
} grpc_connectivity_state_tracker;
-void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state);
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state init_state);
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
-void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state);
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state state);
-grpc_connectivity_state grpc_connectivity_state_check(grpc_connectivity_state_tracker *tracker);
+grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker *tracker);
/** Return 1 if the channel should start connecting, 0 otherwise */
-int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify);
+int grpc_connectivity_state_notify_on_state_change(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify);
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */