diff options
Diffstat (limited to 'src/core/transport')
-rw-r--r-- | src/core/transport/chttp2/internal.h | 3 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 7 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 6 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_map.c | 18 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 5 |
5 files changed, 33 insertions, 6 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 7f50e711d6..3d1cd56e61 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -555,6 +555,9 @@ void grpc_chttp2_list_add_writable_window_update_stream( int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); +void grpc_chttp2_list_remove_writable_window_update_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); void grpc_chttp2_list_add_parsing_seen_stream( grpc_chttp2_transport_parsing *transport_parsing, diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index f675c038d0..07b98d0d57 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -615,6 +615,7 @@ static int init_header_frame_parser( grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) { int is_eoh = (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; + int via_accept = 0; grpc_chttp2_stream_parsing *stream_parsing; if (is_eoh) { @@ -632,7 +633,7 @@ static int init_header_frame_parser( /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ stream_parsing = grpc_chttp2_parsing_lookup_stream( transport_parsing, transport_parsing->incoming_stream_id); - if (!stream_parsing) { + if (stream_parsing == NULL) { if (is_continuation) { gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received"); @@ -666,13 +667,15 @@ static int init_header_frame_parser( stream_parsing = transport_parsing->incoming_stream = grpc_chttp2_parsing_accept_stream( transport_parsing, transport_parsing->incoming_stream_id); - if (!stream_parsing) { + if (stream_parsing == NULL) { gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"); return init_skip_frame_parser(transport_parsing, 1); } + via_accept = 1; } else { transport_parsing->incoming_stream = stream_parsing; } + GPR_ASSERT(stream_parsing != NULL && (via_accept == 0 || via_accept == 1)); if (stream_parsing->received_close) { gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); transport_parsing->incoming_stream = NULL; diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index bac9060cb8..706612e4f0 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -219,6 +219,12 @@ int grpc_chttp2_list_pop_writable_window_update_stream( return r; } +void grpc_chttp2_list_remove_writable_window_update_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); +} + void grpc_chttp2_list_add_parsing_seen_stream( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { diff --git a/src/core/transport/chttp2/stream_map.c b/src/core/transport/chttp2/stream_map.c index e137d6d6b8..0ec2f27291 100644 --- a/src/core/transport/chttp2/stream_map.c +++ b/src/core/transport/chttp2/stream_map.c @@ -107,17 +107,24 @@ void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, GPR_SWAP(grpc_chttp2_stream_map, *src, *dst); return; } + /* the first element of src must be greater than the last of dst... + * however the maps may need compacting for this property to hold */ + if (src->keys[0] <= dst->keys[dst->count - 1]) { + src->count = compact(src->keys, src->values, src->count); + src->free = 0; + dst->count = compact(dst->keys, dst->values, dst->count); + dst->free = 0; + } + GPR_ASSERT(src->keys[0] > dst->keys[dst->count - 1]); /* if dst doesn't have capacity, resize */ if (dst->count + src->count > dst->capacity) { dst->capacity = GPR_MAX(dst->capacity * 3 / 2, dst->count + src->count); dst->keys = gpr_realloc(dst->keys, dst->capacity * sizeof(gpr_uint32)); dst->values = gpr_realloc(dst->values, dst->capacity * sizeof(void *)); } - /* the first element of src must be greater than the last of dst */ - GPR_ASSERT(src->keys[0] > dst->keys[dst->count - 1]); memcpy(dst->keys + dst->count, src->keys, src->count * sizeof(gpr_uint32)); memcpy(dst->values + dst->count, src->values, - src->count * sizeof(gpr_uint32)); + src->count * sizeof(void*)); dst->count += src->count; dst->free += src->free; src->count = 0; @@ -159,6 +166,11 @@ void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, out = *pvalue; *pvalue = NULL; map->free += (out != NULL); + /* recognize complete emptyness and ensure we can skip + * defragmentation later */ + if (map->free == map->count) { + map->free = map->count = 0; + } } return out; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 47cb8f6cec..d490efc8d2 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -421,6 +421,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { } grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global); + grpc_chttp2_list_remove_writable_window_update_stream(&t->global, &s->global); gpr_mu_unlock(&t->mu); @@ -781,6 +782,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { stream_global->published_cancelled = 1; } } + gpr_log(GPR_DEBUG, "%s: id:%d ws:%d rc:%d ism:%d pa:%d ps:%p", transport_global->is_client?"CLI":"SVR", stream_global->id, stream_global->write_state, stream_global->read_closed, stream_global->in_stream_map, t->parsing_active, stream_global->publish_sopb); if (stream_global->write_state == WRITE_STATE_SENT_CLOSE && stream_global->read_closed && stream_global->in_stream_map) { if (t->parsing_active) { @@ -795,7 +797,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { } state = compute_state( stream_global->write_state == WRITE_STATE_SENT_CLOSE, - stream_global->read_closed && !stream_global->in_stream_map); + stream_global->read_closed); + gpr_log(GPR_DEBUG, "s=%d s'=%d nops=%d; rc:%d ism:%d", stream_global->published_state, state, stream_global->incoming_sopb.nops, stream_global->read_closed, stream_global->in_stream_map); if (stream_global->incoming_sopb.nops == 0 && state == stream_global->published_state) { continue; |