aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-15 13:00:55 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-15 13:00:55 -0700
commit4aa71a17745c61a79cfe57c40f48a8860728bebf (patch)
tree1caa211567531061b0c81b32943d649b6b8d4211 /src/core/transport/chttp2_transport.c
parent606d874d162a9a254035839701bf6926f681c77b (diff)
clang-format, and process on lock splitting
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c286
1 files changed, 168 insertions, 118 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 5db9b92727..3718eed4dd 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -64,30 +64,53 @@ int grpc_flowctl_trace = 0;
else \
flowctl_trace(t, #dir, obj->dir##_window, id, delta)
-#define TRANSPORT_FROM_WRITING(tw) ((grpc_chttp2_transport*)((char*)(tw) - offsetof(grpc_chttp2_transport, writing)))
+#define TRANSPORT_FROM_WRITING(tw) \
+ ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
+ writing)))
static const grpc_transport_vtable vtable;
-static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
- gpr_uint32 value);
-
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t);
- static void unlock_check_cancellations(grpc_chttp2_transport* t);
- static void unlock_check_parser(grpc_chttp2_transport* t);
- static void unlock_check_channel_callbacks(grpc_chttp2_transport* t);
-
+/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(void *t, int iomgr_success_ignored);
static void notify_closed(void *t, int iomgr_success_ignored);
+/** Set a transport level setting, and push it to our peer */
+static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
+ gpr_uint32 value);
+
+/** Endpoint callback to process incoming data */
+static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error);
+
+/** Start disconnection chain */
static void drop_connection(grpc_chttp2_transport *t);
-static void end_all_the_calls(grpc_chttp2_transport *t);
-static grpc_chttp2_stream *stream_list_remove_head(grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id);
-static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id);
-static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id);
-static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id);
+/* basic stream list management */
+static grpc_chttp2_stream *stream_list_remove_head(
+ grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id);
+static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_list_id id);
+static void stream_list_add_tail(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ grpc_chttp2_stream_list_id id);
+static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_list_id id);
+
+/** schedule a closure to be called outside of the transport lock after the next
+ unlock() operation */
+static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure,
+ int success);
+
+#if 0
+
+static void unlock_check_cancellations(grpc_chttp2_transport *t);
+static void unlock_check_parser(grpc_chttp2_transport *t);
+static void unlock_check_channel_callbacks(grpc_chttp2_transport *t);
+
+static void end_all_the_calls(grpc_chttp2_transport *t);
static void cancel_stream_id(grpc_chttp2_transport *t, gpr_uint32 id,
grpc_status_code local_status,
@@ -96,24 +119,27 @@ static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst);
-static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id);
-static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
+static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t,
+ gpr_uint32 id);
+static void remove_from_stream_map(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
static void maybe_start_some_streams(grpc_chttp2_transport *t);
static void parsing_become_skip_parser(grpc_chttp2_transport *t);
-static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error);
-
-static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure, int success);
-static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, int is_parser);
-static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
-static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset);
-static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_transport_op *op);
+static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ int is_parser);
+static void maybe_join_window_updates(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
+static void add_to_pollset_locked(grpc_chttp2_transport *t,
+ grpc_pollset *pollset);
+static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_transport_op *op);
static void add_metadata_batch(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
+#endif
-static void flowctl_trace(grpc_chttp2_transport *t, const char *flow, gpr_int32 window,
- gpr_uint32 id, gpr_int32 delta) {
+static void flowctl_trace(grpc_chttp2_transport *t, const char *flow,
+ gpr_int32 window, gpr_uint32 id, gpr_int32 delta) {
gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window,
delta, window + delta);
}
@@ -176,15 +202,17 @@ static void unref_transport(grpc_chttp2_transport *t) {
static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
-static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callback setup,
- void *arg, const grpc_channel_args *channel_args,
+static void init_transport(grpc_chttp2_transport *t,
+ grpc_transport_setup_callback setup, void *arg,
+ const grpc_channel_args *channel_args,
grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
grpc_mdctx *mdctx, int is_client) {
size_t i;
int j;
grpc_transport_setup_result sr;
- GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
+ GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
+ GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
memset(t, 0, sizeof(*t));
@@ -220,8 +248,9 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t);
if (is_client) {
- gpr_slice_buffer_add(&t->global.qbuf,
- gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
+ gpr_slice_buffer_add(
+ &t->global.qbuf,
+ gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
@@ -234,7 +263,8 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
- t->global.settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
+ t->global.settings[i][j] =
+ grpc_chttp2_settings_parameters[j].default_value;
}
}
t->global.dirtied_local_settings = 1;
@@ -272,7 +302,8 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
} else if ((t->global.next_stream_id & 1) !=
(channel_args->args[i].value.integer & 1)) {
gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
- GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->global.next_stream_id & 1,
+ GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER,
+ t->global.next_stream_id & 1,
is_client ? "client" : "server");
} else {
t->global.next_stream_id = channel_args->args[i].value.integer;
@@ -355,9 +386,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
GPR_ASSERT(t->parsing_active);
s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
s->global.outgoing_window =
- t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ t->global
+ .settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
- t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ t->global
+ .settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
}
@@ -375,7 +408,8 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_lock(&t->mu);
- GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0);
+ GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
+ s->global.id == 0);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
stream_list_remove(t, s, i);
@@ -400,11 +434,13 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
* LIST MANAGEMENT
*/
-static int stream_list_empty(grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
+static int stream_list_empty(grpc_chttp2_transport *t,
+ grpc_chttp2_stream_list_id id) {
return t->lists[id].head == NULL;
}
-static grpc_chttp2_stream *stream_list_remove_head(grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
+static grpc_chttp2_stream *stream_list_remove_head(
+ grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
grpc_chttp2_stream *new_head = s->links[id].next;
@@ -421,7 +457,8 @@ static grpc_chttp2_stream *stream_list_remove_head(grpc_chttp2_transport *t, grp
return s;
}
-static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) {
+static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_list_id id) {
if (!s->included[id]) return;
s->included[id] = 0;
if (s->links[id].prev) {
@@ -437,7 +474,9 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
}
}
-static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) {
+static void stream_list_add_tail(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *old_tail;
GPR_ASSERT(!s->included[id]);
old_tail = t->lists[id].tail;
@@ -453,7 +492,8 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s
s->included[id] = 1;
}
-static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) {
+static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
return;
}
@@ -475,7 +515,8 @@ static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream
* LOCK MANAGEMENT
*/
-/* We take a grpc_chttp2_transport-global lock in response to calls coming in from above,
+/* We take a grpc_chttp2_transport-global lock in response to calls coming in
+ from above,
and in response to data being received from below. New data to be written
is always queued, as are callbacks to process data. During unlock() we
check our todo lists and initiate callbacks and flush writes. */
@@ -485,14 +526,15 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
- if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
+ if (!t->writing_active &&
+ grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
ref_transport(t);
schedule_cb(t, &t->writing_action, 1);
}
- unlock_check_cancellations(t);
- unlock_check_parser(t);
- unlock_check_channel_callbacks(t);
+ /* unlock_check_cancellations(t); */
+ /* unlock_check_parser(t); */
+ /* unlock_check_channel_callbacks(t); */
run_closures = t->global.pending_closures;
t->global.pending_closures = NULL;
@@ -525,7 +567,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
}
-void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success) {
+void grpc_chttp2_terminate_writing(
+ grpc_chttp2_transport_writing *transport_writing, int success) {
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
lock(t);
@@ -551,34 +594,38 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ
unref_transport(t);
}
-
static void writing_action(void *gt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
grpc_chttp2_perform_writes(&t->writing, t->ep);
}
-static void add_goaway(grpc_chttp2_transport *t, gpr_uint32 goaway_error,
+void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text) {
- gpr_slice_unref(t->channel_callback.goaway_text);
- t->channel_callback.have_goaway = 1;
- t->channel_callback.goaway_text = goaway_text;
- t->channel_callback.goaway_error = goaway_error;
+ if (transport_global->goaway_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
+ transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
+ transport_global->goaway_text = goaway_text;
+ transport_global->goaway_error = goaway_error;
+ } else {
+ gpr_slice_unref(goaway_text);
+ }
}
static void maybe_start_some_streams(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
- /* start streams where we have free grpc_chttp2_stream ids and free concurrency */
+ /* start streams where we have free grpc_chttp2_stream ids and free
+ * concurrency */
while (t->global.next_stream_id <= MAX_CLIENT_STREAM_ID &&
t->global.concurrent_stream_count <
t->global.settings[PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
- (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
- IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
- t->global.is_client ? "CLI" : "SVR", s, t->global.next_stream_id));
+ [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
+ (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
+ IF_TRACING(gpr_log(
+ GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
+ t->global.is_client ? "CLI" : "SVR", s, t->global.next_stream_id));
if (t->global.next_stream_id == MAX_CLIENT_STREAM_ID) {
- add_goaway(
- t, GRPC_CHTTP2_NO_ERROR,
+ grpc_chttp2_add_incoming_goaway(
+ &t->global, GRPC_CHTTP2_NO_ERROR,
gpr_slice_from_copied_string("Exceeded sequence number limit"));
}
@@ -586,15 +633,18 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) {
s->global.id = t->global.next_stream_id;
t->global.next_stream_id += 2;
s->global.outgoing_window =
- t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ t->global
+ .settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
- t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ t->global
+ .settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
t->global.concurrent_stream_count++;
stream_list_join(t, s, WRITABLE);
}
/* cancel out streams that will never be started */
- while (t->global.next_stream_id > MAX_CLIENT_STREAM_ID && (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
+ while (t->global.next_stream_id > MAX_CLIENT_STREAM_ID &&
+ (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
cancel_stream(
t, s, GRPC_STATUS_UNAVAILABLE,
grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL,
@@ -816,9 +866,9 @@ static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, i
stream_list_join(t, s, FINISHED_READ_OP);
}
}
-#endif
-static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
+static void maybe_join_window_updates(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
if (t->parsing.executing) {
stream_list_join(t, s, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
return;
@@ -826,13 +876,12 @@ static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stre
if (s->incoming_sopb != NULL &&
s->global.incoming_window <
t->global.settings[LOCAL_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
3 / 4) {
stream_list_join(t, s, WINDOW_UPDATE);
}
}
-#if 0
static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
return grpc_chttp2_stream_map_find(&t->stream_map, id);
}
@@ -867,15 +916,19 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
t->parsing_active = 1;
grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
gpr_mu_unlock(&t->mu);
- for (i = 0; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); i++)
+ for (i = 0;
+ i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
+ i++)
;
gpr_mu_lock(&t->mu);
if (i != nslices) {
drop_connection(t);
}
/* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
- t->global.concurrent_stream_count = grpc_stream_map_size(&t->parsing_stream_map);
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+ &t->parsing_stream_map);
+ t->global.concurrent_stream_count =
+ grpc_chttp2_stream_map_size(&t->parsing_stream_map);
/* handle higher level things */
grpc_chttp2_publish_reads(&t->global, &t->parsing);
t->parsing_active = 0;
@@ -905,7 +958,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
t->global.outgoing_window += t->global.outgoing_window_update;
t->global.outgoing_window_update = 0;
maybe_start_some_streams(t);
-#endif
+#endif
unlock(t);
keep_reading = 1;
break;
@@ -932,25 +985,18 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
typedef struct {
grpc_chttp2_transport *t;
- grpc_chttp2_pending_goaway *goaways;
- size_t num_goaways;
+ gpr_uint32 error;
+ gpr_slice text;
grpc_iomgr_closure closure;
} notify_goaways_args;
static void notify_goaways(void *p, int iomgr_success_ignored) {
- size_t i;
notify_goaways_args *a = p;
grpc_chttp2_transport *t = a->t;
- for (i = 0; i < a->num_goaways; i++) {
- t->channel_callback.cb->goaway(
- t->channel_callback.cb_user_data,
- &t->base,
- a->goaways[i].status,
- a->goaways[i].debug);
- }
+ t->channel_callback.cb->goaway(t->channel_callback.cb_user_data, &t->base,
+ a->error, a->text);
- gpr_free(a->goaways);
gpr_free(a);
lock(t);
@@ -960,37 +1006,6 @@ static void notify_goaways(void *p, int iomgr_success_ignored) {
unref_transport(t);
}
-static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
- if (t->channel_callback.executing) {
- return;
- }
- if (t->parsing.executing) {
- return;
- }
- if (t->num_pending_goaways) {
- notify_goaways_args *a = gpr_malloc(sizeof(*a));
- a->goaways = t->pending_goaways;
- a->num_goaways = t->num_pending_goaways;
- t->pending_goaways = NULL;
- t->num_pending_goaways = 0;
- t->cap_pending_goaways = 0;
- t->channel_callback.executing = 1;
- grpc_iomgr_closure_init(&a->closure, notify_goaways, a);
- ref_transport(t);
- schedule_cb(t, &a->closure, 1);
- return;
- }
- if (t->writing.executing) {
- return;
- }
- if (t->error_state == ERROR_STATE_SEEN) {
- t->error_state = ERROR_STATE_NOTIFIED;
- t->channel_callback.executing = 1;
- ref_transport(t);
- schedule_cb(t, &t->channel_callback.notify_closed, 1);
- }
-}
-
static void notify_closed(void *gt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base);
@@ -1002,7 +1017,36 @@ static void notify_closed(void *gt, int iomgr_success_ignored) {
unref_transport(t);
}
-static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure, int success) {
+static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
+ if (t->channel_callback.executing) {
+ return;
+ }
+ if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) {
+ if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN &&
+ t->global.error_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) {
+ notify_goaways_args *a = gpr_malloc(sizeof(*a));
+ a->error = t->global.goaway_error;
+ a->text = t->global.goaway_text;
+ t->global.goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
+ t->channel_callback.executing = 1;
+ grpc_iomgr_closure_init(&a->closure, notify_goaways, a);
+ ref_transport(t);
+ schedule_cb(t, &a->closure, 1);
+ return;
+ } else if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) {
+ return;
+ }
+ }
+ if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_SEEN) {
+ t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
+ t->channel_callback.executing = 1;
+ ref_transport(t);
+ schedule_cb(t, &t->channel_callback.notify_closed, 1);
+ }
+}
+
+static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure,
+ int success) {
closure->success = success;
closure->next = t->global.pending_closures;
t->global.pending_closures = closure;
@@ -1012,7 +1056,8 @@ static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure, i
* POLLSET STUFF
*/
-static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset) {
+static void add_to_pollset_locked(grpc_chttp2_transport *t,
+ grpc_pollset *pollset) {
if (t->ep) {
grpc_endpoint_add_to_pollset(t->ep, pollset);
}
@@ -1029,10 +1074,15 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
* INTEGRATION GLUE
*/
-static const grpc_transport_vtable vtable = {
- sizeof(grpc_chttp2_stream), init_stream, perform_op,
- add_to_pollset, destroy_stream, goaway,
- close_transport, send_ping, destroy_transport};
+static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
+ init_stream,
+ perform_op,
+ add_to_pollset,
+ destroy_stream,
+ goaway,
+ close_transport,
+ send_ping,
+ destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,