aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-29 09:37:52 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-29 09:37:52 -0700
commit08a1cf8f4fd747ab331393916bd5d9cc7f4804c1 (patch)
tree51f5d3fcd64e0552c0d6c74ee50fd8e979e9a0d3 /src/core/transport
parent4d74056b0806f68397111dc2b16f56386339b8e0 (diff)
Use connectivity state tracking code in chttp2 transport
Diffstat (limited to 'src/core/transport')
-rw-r--r--src/core/transport/chttp2/internal.h6
-rw-r--r--src/core/transport/chttp2_transport.c55
-rw-r--r--src/core/transport/connectivity_state.c100
-rw-r--r--src/core/transport/connectivity_state.h71
4 files changed, 195 insertions, 37 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 93235aef55..7e2e75f97d 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -34,7 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
-#include "src/core/transport/transport_impl.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/frame_data.h"
@@ -47,6 +46,8 @@
#include "src/core/transport/chttp2/incoming_metadata.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
+#include "src/core/transport/connectivity_state.h"
+#include "src/core/transport/transport_impl.h"
typedef struct grpc_chttp2_transport grpc_chttp2_transport;
typedef struct grpc_chttp2_stream grpc_chttp2_stream;
@@ -335,8 +336,7 @@ struct grpc_chttp2_transport {
void *accept_stream_user_data;
/** connectivity tracking */
- grpc_iomgr_closure *on_connectivity_changed;
- grpc_connectivity_state *connectivity;
+ grpc_connectivity_state_tracker state_tracker;
} channel_callback;
};
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 11dd60bbb9..8f909dff37 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -37,18 +37,19 @@
#include <stdio.h>
#include <string.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2/http2_errors.h"
+#include "src/core/transport/chttp2/internal.h"
#include "src/core/transport/chttp2/status_conversion.h"
#include "src/core/transport/chttp2/timeout_encoding.h"
-#include "src/core/transport/chttp2/internal.h"
#include "src/core/transport/transport_impl.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/useful.h>
/* #define REFCOUNTING_DEBUG */
@@ -81,7 +82,6 @@ static const grpc_transport_vtable vtable;
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t);
-static void unlock_check_channel_callbacks(grpc_chttp2_transport *t);
static void unlock_check_read_write_state(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
@@ -149,6 +149,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
+ grpc_connectivity_state_destroy(&t->channel_callback.state_tracker);
gpr_mu_unlock(&t->mu);
gpr_mu_destroy(&t->mu);
@@ -229,6 +230,8 @@ static void init_transport(grpc_chttp2_transport *t,
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
+ grpc_connectivity_state_init(&t->channel_callback.state_tracker,
+ GRPC_CHANNEL_READY);
gpr_slice_buffer_init(&t->global.qbuf);
@@ -325,6 +328,8 @@ static void destroy_transport(grpc_transport *gt) {
static void close_transport_locked(grpc_chttp2_transport *t) {
if (!t->closed) {
t->closed = 1;
+ grpc_connectivity_state_set(&t->channel_callback.state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE);
if (t->ep) {
grpc_endpoint_shutdown(t->ep);
}
@@ -445,8 +450,6 @@ static void unlock(grpc_chttp2_transport *t) {
REF_TRANSPORT(t, "writing");
grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
}
- /* unlock_check_parser(t); */
- unlock_check_channel_callbacks(t);
run_closures = t->global.pending_closures;
t->global.pending_closures = NULL;
@@ -520,6 +523,9 @@ void grpc_chttp2_add_incoming_goaway(
gpr_free(msg);
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
+ grpc_connectivity_state_set(
+ &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE);
}
static void maybe_start_some_streams(
@@ -544,9 +550,9 @@ static void maybe_start_some_streams(
transport_global->next_stream_id += 2;
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
- grpc_chttp2_add_incoming_goaway(
- transport_global, GRPC_CHTTP2_NO_ERROR,
- gpr_slice_from_copied_string("Exceeded sequence number limit"));
+ grpc_connectivity_state_set(&TRANSPORT_FROM_GLOBAL(transport_global)
+ ->channel_callback.state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE);
}
stream_global->outgoing_window =
@@ -669,10 +675,9 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
}
if (op->on_connectivity_state_change) {
- GPR_ASSERT(t->channel_callback.on_connectivity_changed == NULL);
- t->channel_callback.on_connectivity_changed =
- op->on_connectivity_state_change;
- t->channel_callback.connectivity = op->connectivity_state;
+ grpc_connectivity_state_notify_on_state_change(
+ &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
}
if (op->send_goaway) {
@@ -928,24 +933,6 @@ static void reading_action(void *pt, int iomgr_success_ignored) {
* CALLBACK LOOP
*/
-static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
- if (t->channel_callback.on_connectivity_changed != NULL) {
- grpc_connectivity_state current;
- if (t->closed || t->global.seen_goaway) {
- current = GRPC_CHANNEL_FATAL_FAILURE;
- } else {
- current = GRPC_CHANNEL_READY;
- }
- if (current != *t->channel_callback.connectivity) {
- *t->channel_callback.connectivity = current;
- grpc_chttp2_schedule_closure(
- &t->global, t->channel_callback.on_connectivity_changed, 1);
- t->channel_callback.on_connectivity_changed = NULL;
- t->channel_callback.connectivity = NULL;
- }
- }
-}
-
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success) {
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
new file mode 100644
index 0000000000..5cbd67ef3c
--- /dev/null
+++ b/src/core/transport/connectivity_state.c
@@ -0,0 +1,100 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/transport/connectivity_state.h"
+#include <grpc/support/alloc.h>
+
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state init_state) {
+ tracker->current_state = init_state;
+ tracker->watchers = NULL;
+}
+
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
+ grpc_connectivity_state_watcher *w;
+ while ((w = tracker->watchers)) {
+ tracker->watchers = w->next;
+
+ if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) {
+ *w->current = GRPC_CHANNEL_FATAL_FAILURE;
+ grpc_iomgr_add_callback(w->notify);
+ } else {
+ grpc_iomgr_add_delayed_callback(w->notify, 0);
+ }
+ gpr_free(w);
+ }
+}
+
+grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker *tracker) {
+ return tracker->current_state;
+}
+
+int grpc_connectivity_state_notify_on_state_change(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify) {
+ if (tracker->current_state != *current) {
+ *current = tracker->current_state;
+ grpc_iomgr_add_callback(notify);
+ } else {
+ grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
+ w->current = current;
+ w->notify = notify;
+ w->next = tracker->watchers;
+ tracker->watchers = w;
+ }
+ return tracker->current_state == GRPC_CHANNEL_IDLE;
+}
+
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state state) {
+ grpc_connectivity_state_watcher *new = NULL;
+ grpc_connectivity_state_watcher *w;
+ if (tracker->current_state == state) {
+ return;
+ }
+ tracker->current_state = state;
+ while ((w = tracker->watchers)) {
+ tracker->watchers = w->next;
+
+ if (state != *w->current) {
+ *w->current = state;
+ grpc_iomgr_add_callback(w->notify);
+ gpr_free(w);
+ } else {
+ w->next = new;
+ new = w;
+ }
+ }
+ tracker->watchers = new;
+}
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
new file mode 100644
index 0000000000..9a8c57525f
--- /dev/null
+++ b/src/core/transport/connectivity_state.h
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H
+#define GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H
+
+#include <grpc/grpc.h>
+#include "src/core/iomgr/iomgr.h"
+
+typedef struct grpc_connectivity_state_watcher {
+ /** we keep watchers in a linked list */
+ struct grpc_connectivity_state_watcher *next;
+ /** closure to notify on change */
+ grpc_iomgr_closure *notify;
+ /** the current state as believed by the watcher */
+ grpc_connectivity_state *current;
+} grpc_connectivity_state_watcher;
+
+typedef struct {
+ /** current connectivity state */
+ grpc_connectivity_state current_state;
+ /** all our watchers */
+ grpc_connectivity_state_watcher *watchers;
+} grpc_connectivity_state_tracker;
+
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state init_state);
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
+
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state state);
+
+grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker *tracker);
+
+/** Return 1 if the channel should start connecting, 0 otherwise */
+int grpc_connectivity_state_notify_on_state_change(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify);
+
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */