diff options
author | 2015-06-29 09:37:52 -0700 | |
---|---|---|
committer | 2015-06-29 09:37:52 -0700 | |
commit | 08a1cf8f4fd747ab331393916bd5d9cc7f4804c1 (patch) | |
tree | 51f5d3fcd64e0552c0d6c74ee50fd8e979e9a0d3 /src/core/transport | |
parent | 4d74056b0806f68397111dc2b16f56386339b8e0 (diff) |
Use connectivity state tracking code in chttp2 transport
Diffstat (limited to 'src/core/transport')
-rw-r--r-- | src/core/transport/chttp2/internal.h | 6 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 55 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.c | 100 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.h | 71 |
4 files changed, 195 insertions, 37 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 93235aef55..7e2e75f97d 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -34,7 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H #define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H -#include "src/core/transport/transport_impl.h" #include "src/core/iomgr/endpoint.h" #include "src/core/transport/chttp2/frame.h" #include "src/core/transport/chttp2/frame_data.h" @@ -47,6 +46,8 @@ #include "src/core/transport/chttp2/incoming_metadata.h" #include "src/core/transport/chttp2/stream_encoder.h" #include "src/core/transport/chttp2/stream_map.h" +#include "src/core/transport/connectivity_state.h" +#include "src/core/transport/transport_impl.h" typedef struct grpc_chttp2_transport grpc_chttp2_transport; typedef struct grpc_chttp2_stream grpc_chttp2_stream; @@ -335,8 +336,7 @@ struct grpc_chttp2_transport { void *accept_stream_user_data; /** connectivity tracking */ - grpc_iomgr_closure *on_connectivity_changed; - grpc_connectivity_state *connectivity; + grpc_connectivity_state_tracker state_tracker; } channel_callback; }; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 11dd60bbb9..8f909dff37 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -37,18 +37,19 @@ #include <stdio.h> #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + #include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/transport/chttp2/http2_errors.h" +#include "src/core/transport/chttp2/internal.h" #include "src/core/transport/chttp2/status_conversion.h" #include "src/core/transport/chttp2/timeout_encoding.h" -#include "src/core/transport/chttp2/internal.h" #include "src/core/transport/transport_impl.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> -#include <grpc/support/string_util.h> -#include <grpc/support/useful.h> /* #define REFCOUNTING_DEBUG */ @@ -81,7 +82,6 @@ static const grpc_transport_vtable vtable; static void lock(grpc_chttp2_transport *t); static void unlock(grpc_chttp2_transport *t); -static void unlock_check_channel_callbacks(grpc_chttp2_transport *t); static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ @@ -149,6 +149,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_stream_map_destroy(&t->parsing_stream_map); grpc_chttp2_stream_map_destroy(&t->new_stream_map); + grpc_connectivity_state_destroy(&t->channel_callback.state_tracker); gpr_mu_unlock(&t->mu); gpr_mu_destroy(&t->mu); @@ -229,6 +230,8 @@ static void init_transport(grpc_chttp2_transport *t, t->parsing.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; + grpc_connectivity_state_init(&t->channel_callback.state_tracker, + GRPC_CHANNEL_READY); gpr_slice_buffer_init(&t->global.qbuf); @@ -325,6 +328,8 @@ static void destroy_transport(grpc_transport *gt) { static void close_transport_locked(grpc_chttp2_transport *t) { if (!t->closed) { t->closed = 1; + grpc_connectivity_state_set(&t->channel_callback.state_tracker, + GRPC_CHANNEL_FATAL_FAILURE); if (t->ep) { grpc_endpoint_shutdown(t->ep); } @@ -445,8 +450,6 @@ static void unlock(grpc_chttp2_transport *t) { REF_TRANSPORT(t, "writing"); grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); } - /* unlock_check_parser(t); */ - unlock_check_channel_callbacks(t); run_closures = t->global.pending_closures; t->global.pending_closures = NULL; @@ -520,6 +523,9 @@ void grpc_chttp2_add_incoming_goaway( gpr_free(msg); gpr_slice_unref(goaway_text); transport_global->seen_goaway = 1; + grpc_connectivity_state_set( + &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, + GRPC_CHANNEL_FATAL_FAILURE); } static void maybe_start_some_streams( @@ -544,9 +550,9 @@ static void maybe_start_some_streams( transport_global->next_stream_id += 2; if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { - grpc_chttp2_add_incoming_goaway( - transport_global, GRPC_CHTTP2_NO_ERROR, - gpr_slice_from_copied_string("Exceeded sequence number limit")); + grpc_connectivity_state_set(&TRANSPORT_FROM_GLOBAL(transport_global) + ->channel_callback.state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE); } stream_global->outgoing_window = @@ -669,10 +675,9 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } if (op->on_connectivity_state_change) { - GPR_ASSERT(t->channel_callback.on_connectivity_changed == NULL); - t->channel_callback.on_connectivity_changed = - op->on_connectivity_state_change; - t->channel_callback.connectivity = op->connectivity_state; + grpc_connectivity_state_notify_on_state_change( + &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); } if (op->send_goaway) { @@ -928,24 +933,6 @@ static void reading_action(void *pt, int iomgr_success_ignored) { * CALLBACK LOOP */ -static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { - if (t->channel_callback.on_connectivity_changed != NULL) { - grpc_connectivity_state current; - if (t->closed || t->global.seen_goaway) { - current = GRPC_CHANNEL_FATAL_FAILURE; - } else { - current = GRPC_CHANNEL_READY; - } - if (current != *t->channel_callback.connectivity) { - *t->channel_callback.connectivity = current; - grpc_chttp2_schedule_closure( - &t->global, t->channel_callback.on_connectivity_changed, 1); - t->channel_callback.on_connectivity_changed = NULL; - t->channel_callback.connectivity = NULL; - } - } -} - void grpc_chttp2_schedule_closure( grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success) { diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c new file mode 100644 index 0000000000..5cbd67ef3c --- /dev/null +++ b/src/core/transport/connectivity_state.c @@ -0,0 +1,100 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/transport/connectivity_state.h" +#include <grpc/support/alloc.h> + +void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state init_state) { + tracker->current_state = init_state; + tracker->watchers = NULL; +} + +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { + grpc_connectivity_state_watcher *w; + while ((w = tracker->watchers)) { + tracker->watchers = w->next; + + if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) { + *w->current = GRPC_CHANNEL_FATAL_FAILURE; + grpc_iomgr_add_callback(w->notify); + } else { + grpc_iomgr_add_delayed_callback(w->notify, 0); + } + gpr_free(w); + } +} + +grpc_connectivity_state grpc_connectivity_state_check( + grpc_connectivity_state_tracker *tracker) { + return tracker->current_state; +} + +int grpc_connectivity_state_notify_on_state_change( + grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, + grpc_iomgr_closure *notify) { + if (tracker->current_state != *current) { + *current = tracker->current_state; + grpc_iomgr_add_callback(notify); + } else { + grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); + w->current = current; + w->notify = notify; + w->next = tracker->watchers; + tracker->watchers = w; + } + return tracker->current_state == GRPC_CHANNEL_IDLE; +} + +void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state state) { + grpc_connectivity_state_watcher *new = NULL; + grpc_connectivity_state_watcher *w; + if (tracker->current_state == state) { + return; + } + tracker->current_state = state; + while ((w = tracker->watchers)) { + tracker->watchers = w->next; + + if (state != *w->current) { + *w->current = state; + grpc_iomgr_add_callback(w->notify); + gpr_free(w); + } else { + w->next = new; + new = w; + } + } + tracker->watchers = new; +} diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h new file mode 100644 index 0000000000..9a8c57525f --- /dev/null +++ b/src/core/transport/connectivity_state.h @@ -0,0 +1,71 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H +#define GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H + +#include <grpc/grpc.h> +#include "src/core/iomgr/iomgr.h" + +typedef struct grpc_connectivity_state_watcher { + /** we keep watchers in a linked list */ + struct grpc_connectivity_state_watcher *next; + /** closure to notify on change */ + grpc_iomgr_closure *notify; + /** the current state as believed by the watcher */ + grpc_connectivity_state *current; +} grpc_connectivity_state_watcher; + +typedef struct { + /** current connectivity state */ + grpc_connectivity_state current_state; + /** all our watchers */ + grpc_connectivity_state_watcher *watchers; +} grpc_connectivity_state_tracker; + +void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state init_state); +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); + +void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state state); + +grpc_connectivity_state grpc_connectivity_state_check( + grpc_connectivity_state_tracker *tracker); + +/** Return 1 if the channel should start connecting, 0 otherwise */ +int grpc_connectivity_state_notify_on_state_change( + grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, + grpc_iomgr_closure *notify); + +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ |