diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-06-18 11:21:22 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-06-18 11:21:22 -0700 |
commit | 564872d51d10587ea16a6c0ba75d1f60c214cf5a (patch) | |
tree | 2efced59ec4690f8a52ec368061b9a36caacebcc /src/core | |
parent | c3fdaf9fd41918f09f7bab3413988cdd8f04eb4f (diff) |
Fix goaways, stream counting
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 34 |
1 files changed, 26 insertions, 8 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 215aac10a3..21ffd2abf0 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -113,6 +113,10 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset); +/** Start new streams that have been created if we can */ +static void maybe_start_some_streams( + grpc_chttp2_transport_global *transport_global); + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -489,6 +493,16 @@ static void unlock(grpc_chttp2_transport *t) { /* unlock_check_parser(t); */ unlock_check_channel_callbacks(t); + if (!t->parsing_active) { + size_t new_stream_count = + grpc_chttp2_stream_map_size(&t->parsing_stream_map) + + grpc_chttp2_stream_map_size(&t->new_stream_map); + if (new_stream_count != t->global.concurrent_stream_count) { + t->global.concurrent_stream_count = new_stream_count; + maybe_start_some_streams(&t->global); + } + } + run_closures = t->global.pending_closures; t->global.pending_closures = NULL; @@ -556,8 +570,11 @@ static void writing_action(void *gt, int iomgr_success_ignored) { void grpc_chttp2_add_incoming_goaway( grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, gpr_slice goaway_text) { + char *msg = gpr_hexdump((char*)GPR_SLICE_START_PTR(goaway_text), GPR_SLICE_LENGTH(goaway_text), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "add goaway: st=%d err=%d text=%s", transport_global->goaway_state, goaway_error, msg); + gpr_free(msg); if (transport_global->goaway_state == GRPC_CHTTP2_ERROR_STATE_NONE) { - transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; + transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_SEEN; transport_global->goaway_text = goaway_text; transport_global->goaway_error = goaway_error; } else { @@ -568,6 +585,7 @@ void grpc_chttp2_add_incoming_goaway( static void maybe_start_some_streams( grpc_chttp2_transport_global *transport_global) { grpc_chttp2_stream_global *stream_global; + gpr_log(GPR_DEBUG, "nextid=%d count=%d", transport_global->next_stream_id, transport_global->concurrent_stream_count); /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID && @@ -581,15 +599,16 @@ static void maybe_start_some_streams( transport_global->is_client ? "CLI" : "SVR", stream_global, transport_global->next_stream_id)); - if (transport_global->next_stream_id == MAX_CLIENT_STREAM_ID) { + GPR_ASSERT(stream_global->id == 0); + stream_global->id = transport_global->next_stream_id; + transport_global->next_stream_id += 2; + + if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { grpc_chttp2_add_incoming_goaway( transport_global, GRPC_CHTTP2_NO_ERROR, gpr_slice_from_copied_string("Exceeded sequence number limit")); } - GPR_ASSERT(stream_global->id == 0); - stream_global->id = transport_global->next_stream_id; - transport_global->next_stream_id += 2; stream_global->outgoing_window = transport_global ->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; @@ -606,7 +625,7 @@ static void maybe_start_some_streams( grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } /* cancel out streams that will never be started */ - while (transport_global->next_stream_id > MAX_CLIENT_STREAM_ID && + while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE); @@ -1000,8 +1019,6 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, &t->parsing_stream_map); /* handle higher level things */ grpc_chttp2_publish_reads(&t->global, &t->parsing); - t->global.concurrent_stream_count = - grpc_chttp2_stream_map_size(&t->parsing_stream_map); t->parsing_active = 0; } if (i == nslices) { @@ -1059,6 +1076,7 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN && t->global.error_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) { notify_goaways_args *a = gpr_malloc(sizeof(*a)); + a->t = t; a->error = t->global.goaway_error; a->text = t->global.goaway_text; t->global.goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; |