diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-29 09:37:52 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-29 09:37:52 -0700 |
commit | 08a1cf8f4fd747ab331393916bd5d9cc7f4804c1 (patch) | |
tree | 51f5d3fcd64e0552c0d6c74ee50fd8e979e9a0d3 /src/core | |
parent | 4d74056b0806f68397111dc2b16f56386339b8e0 (diff) |
Use connectivity state tracking code in chttp2 transport
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 2 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 2 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 6 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 55 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.c (renamed from src/core/channel/connectivity_state.c) | 45 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.h (renamed from src/core/channel/connectivity_state.h) | 23 |
7 files changed, 66 insertions, 69 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index b33ef7842f..e2f8debdfa 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -38,10 +38,10 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" -#include "src/core/channel/connectivity_state.h" #include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/pollset_set.h" #include "src/core/support/string.h" +#include "src/core/transport/connectivity_state.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 9d6c264e37..cdc7e75140 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -36,7 +36,7 @@ #include <string.h> #include <grpc/support/alloc.h> -#include "src/core/channel/connectivity_state.h" +#include "src/core/transport/connectivity_state.h" typedef struct pending_pick { struct pending_pick *next; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index b4da9cda3f..2f5843b2a4 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -39,7 +39,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" -#include "src/core/channel/connectivity_state.h" +#include "src/core/transport/connectivity_state.h" typedef struct { /* all fields protected by subchannel->mu */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 93235aef55..7e2e75f97d 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -34,7 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H #define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H -#include "src/core/transport/transport_impl.h" #include "src/core/iomgr/endpoint.h" #include "src/core/transport/chttp2/frame.h" #include "src/core/transport/chttp2/frame_data.h" @@ -47,6 +46,8 @@ #include "src/core/transport/chttp2/incoming_metadata.h" #include "src/core/transport/chttp2/stream_encoder.h" #include "src/core/transport/chttp2/stream_map.h" +#include "src/core/transport/connectivity_state.h" +#include "src/core/transport/transport_impl.h" typedef struct grpc_chttp2_transport grpc_chttp2_transport; typedef struct grpc_chttp2_stream grpc_chttp2_stream; @@ -335,8 +336,7 @@ struct grpc_chttp2_transport { void *accept_stream_user_data; /** connectivity tracking */ - grpc_iomgr_closure *on_connectivity_changed; - grpc_connectivity_state *connectivity; + grpc_connectivity_state_tracker state_tracker; } channel_callback; }; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 11dd60bbb9..8f909dff37 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -37,18 +37,19 @@ #include <stdio.h> #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + #include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/transport/chttp2/http2_errors.h" +#include "src/core/transport/chttp2/internal.h" #include "src/core/transport/chttp2/status_conversion.h" #include "src/core/transport/chttp2/timeout_encoding.h" -#include "src/core/transport/chttp2/internal.h" #include "src/core/transport/transport_impl.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> -#include <grpc/support/string_util.h> -#include <grpc/support/useful.h> /* #define REFCOUNTING_DEBUG */ @@ -81,7 +82,6 @@ static const grpc_transport_vtable vtable; static void lock(grpc_chttp2_transport *t); static void unlock(grpc_chttp2_transport *t); -static void unlock_check_channel_callbacks(grpc_chttp2_transport *t); static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ @@ -149,6 +149,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_stream_map_destroy(&t->parsing_stream_map); grpc_chttp2_stream_map_destroy(&t->new_stream_map); + grpc_connectivity_state_destroy(&t->channel_callback.state_tracker); gpr_mu_unlock(&t->mu); gpr_mu_destroy(&t->mu); @@ -229,6 +230,8 @@ static void init_transport(grpc_chttp2_transport *t, t->parsing.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; + grpc_connectivity_state_init(&t->channel_callback.state_tracker, + GRPC_CHANNEL_READY); gpr_slice_buffer_init(&t->global.qbuf); @@ -325,6 +328,8 @@ static void destroy_transport(grpc_transport *gt) { static void close_transport_locked(grpc_chttp2_transport *t) { if (!t->closed) { t->closed = 1; + grpc_connectivity_state_set(&t->channel_callback.state_tracker, + GRPC_CHANNEL_FATAL_FAILURE); if (t->ep) { grpc_endpoint_shutdown(t->ep); } @@ -445,8 +450,6 @@ static void unlock(grpc_chttp2_transport *t) { REF_TRANSPORT(t, "writing"); grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); } - /* unlock_check_parser(t); */ - unlock_check_channel_callbacks(t); run_closures = t->global.pending_closures; t->global.pending_closures = NULL; @@ -520,6 +523,9 @@ void grpc_chttp2_add_incoming_goaway( gpr_free(msg); gpr_slice_unref(goaway_text); transport_global->seen_goaway = 1; + grpc_connectivity_state_set( + &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, + GRPC_CHANNEL_FATAL_FAILURE); } static void maybe_start_some_streams( @@ -544,9 +550,9 @@ static void maybe_start_some_streams( transport_global->next_stream_id += 2; if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { - grpc_chttp2_add_incoming_goaway( - transport_global, GRPC_CHTTP2_NO_ERROR, - gpr_slice_from_copied_string("Exceeded sequence number limit")); + grpc_connectivity_state_set(&TRANSPORT_FROM_GLOBAL(transport_global) + ->channel_callback.state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE); } stream_global->outgoing_window = @@ -669,10 +675,9 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } if (op->on_connectivity_state_change) { - GPR_ASSERT(t->channel_callback.on_connectivity_changed == NULL); - t->channel_callback.on_connectivity_changed = - op->on_connectivity_state_change; - t->channel_callback.connectivity = op->connectivity_state; + grpc_connectivity_state_notify_on_state_change( + &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); } if (op->send_goaway) { @@ -928,24 +933,6 @@ static void reading_action(void *pt, int iomgr_success_ignored) { * CALLBACK LOOP */ -static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { - if (t->channel_callback.on_connectivity_changed != NULL) { - grpc_connectivity_state current; - if (t->closed || t->global.seen_goaway) { - current = GRPC_CHANNEL_FATAL_FAILURE; - } else { - current = GRPC_CHANNEL_READY; - } - if (current != *t->channel_callback.connectivity) { - *t->channel_callback.connectivity = current; - grpc_chttp2_schedule_closure( - &t->global, t->channel_callback.on_connectivity_changed, 1); - t->channel_callback.on_connectivity_changed = NULL; - t->channel_callback.connectivity = NULL; - } - } -} - void grpc_chttp2_schedule_closure( grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success) { diff --git a/src/core/channel/connectivity_state.c b/src/core/transport/connectivity_state.c index 0ee268ee59..5cbd67ef3c 100644 --- a/src/core/channel/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -31,52 +31,57 @@ * */ -#include "src/core/channel/connectivity_state.h" +#include "src/core/transport/connectivity_state.h" #include <grpc/support/alloc.h> -void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state) { - tracker->current_state = init_state; - tracker->watchers = NULL; +void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state init_state) { + tracker->current_state = init_state; + tracker->watchers = NULL; } void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { grpc_connectivity_state_watcher *w; while ((w = tracker->watchers)) { - tracker->watchers = w->next; + tracker->watchers = w->next; - if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) { - *w->current = GRPC_CHANNEL_FATAL_FAILURE; - grpc_iomgr_add_callback(w->notify); - } else { - grpc_iomgr_add_delayed_callback(w->notify, 0); - } - gpr_free(w); + if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) { + *w->current = GRPC_CHANNEL_FATAL_FAILURE; + grpc_iomgr_add_callback(w->notify); + } else { + grpc_iomgr_add_delayed_callback(w->notify, 0); + } + gpr_free(w); } } -grpc_connectivity_state grpc_connectivity_state_check(grpc_connectivity_state_tracker *tracker) { - return tracker->current_state; +grpc_connectivity_state grpc_connectivity_state_check( + grpc_connectivity_state_tracker *tracker) { + return tracker->current_state; } -int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify) { +int grpc_connectivity_state_notify_on_state_change( + grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, + grpc_iomgr_closure *notify) { if (tracker->current_state != *current) { *current = tracker->current_state; grpc_iomgr_add_callback(notify); } else { - grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); - w->current = current; - w->notify = notify; + grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); + w->current = current; + w->notify = notify; w->next = tracker->watchers; tracker->watchers = w; } return tracker->current_state == GRPC_CHANNEL_IDLE; } -void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) { +void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state state) { grpc_connectivity_state_watcher *new = NULL; grpc_connectivity_state_watcher *w; if (tracker->current_state == state) { - return; + return; } tracker->current_state = state; while ((w = tracker->watchers)) { diff --git a/src/core/channel/connectivity_state.h b/src/core/transport/connectivity_state.h index ebc1acc559..9a8c57525f 100644 --- a/src/core/channel/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -31,14 +31,14 @@ * */ -#ifndef GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H -#define GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H +#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H +#define GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H #include <grpc/grpc.h> #include "src/core/iomgr/iomgr.h" typedef struct grpc_connectivity_state_watcher { - /** we keep watchers in a linked list */ + /** we keep watchers in a linked list */ struct grpc_connectivity_state_watcher *next; /** closure to notify on change */ grpc_iomgr_closure *notify; @@ -47,20 +47,25 @@ typedef struct grpc_connectivity_state_watcher { } grpc_connectivity_state_watcher; typedef struct { - /** current connectivity state */ + /** current connectivity state */ grpc_connectivity_state current_state; /** all our watchers */ grpc_connectivity_state_watcher *watchers; } grpc_connectivity_state_tracker; -void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state); +void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state init_state); void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); -void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state); +void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, + grpc_connectivity_state state); -grpc_connectivity_state grpc_connectivity_state_check(grpc_connectivity_state_tracker *tracker); +grpc_connectivity_state grpc_connectivity_state_check( + grpc_connectivity_state_tracker *tracker); /** Return 1 if the channel should start connecting, 0 otherwise */ -int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify); +int grpc_connectivity_state_notify_on_state_change( + grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, + grpc_iomgr_closure *notify); -#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H */ +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ |