diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-16 11:33:15 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-16 11:33:15 -0700 |
commit | 9850510e52713ccaf0e49bfad1b8e68efe65a383 (patch) | |
tree | 03c41e10a47724a33f6597ec6b156049db689fc9 /src | |
parent | 1937b06b785f577ad4b0fa1734eed286d2d67e6f (diff) |
qps_test links with the breakup
Diffstat (limited to 'src')
-rw-r--r-- | src/core/transport/chttp2/hpack_parser.c | 3 | ||||
-rw-r--r-- | src/core/transport/chttp2/incoming_metadata.c | 120 | ||||
-rw-r--r-- | src/core/transport/chttp2/incoming_metadata.h | 24 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 36 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 36 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 99 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_map.c | 6 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 3 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 124 |
9 files changed, 275 insertions, 176 deletions
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index b4aa7af2c1..4b11d46cfe 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -1393,7 +1393,8 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( } if (parser->is_boundary) { grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( - &stream_parsing->incoming_metadata, &stream_parsing->data_parser.incoming_sopb); + &stream_parsing->incoming_metadata, + &stream_parsing->data_parser.incoming_sopb); } if (parser->is_eof) { stream_parsing->received_close = 1; diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index b120b3326f..5f32947df6 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -40,36 +40,38 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -void grpc_chttp2_incoming_metadata_buffer_init(grpc_chttp2_incoming_metadata_buffer *buffer) { +void grpc_chttp2_incoming_metadata_buffer_init( + grpc_chttp2_incoming_metadata_buffer *buffer) { buffer->deadline = gpr_inf_future; } -void grpc_chttp2_incoming_metadata_buffer_destroy(grpc_chttp2_incoming_metadata_buffer *buffer) { +void grpc_chttp2_incoming_metadata_buffer_destroy( + grpc_chttp2_incoming_metadata_buffer *buffer) { gpr_free(buffer->elems); } -void grpc_chttp2_incoming_metadata_buffer_add(grpc_chttp2_incoming_metadata_buffer *buffer, - grpc_mdelem *elem) { +void grpc_chttp2_incoming_metadata_buffer_add( + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem) { if (buffer->capacity == buffer->count) { - buffer->capacity = - GPR_MAX(8, 2 * buffer->capacity); + buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); buffer->elems = - gpr_realloc(buffer->elems, - sizeof(*buffer->elems) * - buffer->capacity); + gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity); } - buffer->elems[buffer->count++] - .md = elem; + buffer->elems[buffer->count++].md = elem; } -void grpc_chttp2_incoming_metadata_buffer_set_deadline(grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { +void grpc_chttp2_incoming_metadata_buffer_set_deadline( + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { buffer->deadline = deadline; } -#if 0 -void grpc_chttp2_parsing_add_metadata_batch( - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing) { +void grpc_chttp2_incoming_metadata_live_op_buffer_end( + grpc_chttp2_incoming_metadata_live_op_buffer *buffer) { + gpr_free(buffer->elems); + buffer->elems = NULL; +} + +void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) { grpc_metadata_batch b; b.list.head = NULL; @@ -77,20 +79,19 @@ void grpc_chttp2_parsing_add_metadata_batch( we can reconstitute the list. We can't do list building here as later incoming metadata may reallocate the underlying array. */ - b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count; + b.list.tail = (void*)(gpr_intptr)buffer->count; b.garbage.head = b.garbage.tail = NULL; - b.deadline = stream_parsing->incoming_deadline; - stream_parsing->incoming_deadline = gpr_inf_future; + b.deadline = buffer->deadline; + buffer->deadline = gpr_inf_future; - grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b); + grpc_sopb_add_metadata(sopb, b); } -#endif -#if 0 -static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, - grpc_chttp2_stream_parsing *stream_parsing) { - grpc_stream_op *ops = stream_global->incoming_sopb->ops; - size_t nops = stream_global->incoming_sopb->nops; +void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, + grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer) { + grpc_stream_op *ops = sopb->ops; + size_t nops = sopb->nops; size_t i; size_t j; size_t mdidx = 0; @@ -109,40 +110,59 @@ static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, segment where this segment begins */ last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); GPR_ASSERT(last_mdidx > mdidx); - GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count); + GPR_ASSERT(last_mdidx <= buffer->count); /* turn the array into a doubly linked list */ - op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx]; - op->data.metadata.list.tail = - &stream_parsing->incoming_metadata[last_mdidx - 1]; + op->data.metadata.list.head = &buffer->elems[mdidx]; + op->data.metadata.list.tail = &buffer->elems[last_mdidx - 1]; for (j = mdidx + 1; j < last_mdidx; j++) { - stream_parsing->incoming_metadata[j].prev = - &stream_parsing->incoming_metadata[j - 1]; - stream_parsing->incoming_metadata[j - 1].next = - &stream_parsing->incoming_metadata[j]; + buffer->elems[j].prev = &buffer->elems[j - 1]; + buffer->elems[j - 1].next = &buffer->elems[j]; } - stream_parsing->incoming_metadata[mdidx].prev = NULL; - stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL; + buffer->elems[mdidx].prev = NULL; + buffer->elems[last_mdidx - 1].next = NULL; /* track where we're up to */ mdidx = last_mdidx; } if (found_metadata) { - stream_parsing->old_incoming_metadata = stream_parsing->incoming_metadata; - if (mdidx != stream_parsing->incoming_metadata_count) { + live_op_buffer->elems = buffer->elems; + if (mdidx != buffer->count) { /* we have a partially read metadata batch still in incoming_metadata */ - size_t new_count = stream_parsing->incoming_metadata_count - mdidx; - size_t copy_bytes = - sizeof(*stream_parsing->incoming_metadata) * new_count; - GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count); - stream_parsing->incoming_metadata = gpr_malloc(copy_bytes); - memcpy(stream_parsing->old_incoming_metadata + mdidx, - stream_parsing->incoming_metadata, copy_bytes); - stream_parsing->incoming_metadata_count = - stream_parsing->incoming_metadata_capacity = new_count; + size_t new_count = buffer->count - mdidx; + size_t copy_bytes = sizeof(*buffer->elems) * new_count; + GPR_ASSERT(mdidx < buffer->count); + buffer->elems = gpr_malloc(copy_bytes); + memcpy(live_op_buffer->elems + mdidx, buffer->elems, copy_bytes); + buffer->count = buffer->capacity = new_count; } else { - stream_parsing->incoming_metadata = NULL; - stream_parsing->incoming_metadata_count = 0; - stream_parsing->incoming_metadata_capacity = 0; + buffer->elems = NULL; + buffer->count = 0; + buffer->capacity = 0; } } } + +#if 0 +void grpc_chttp2_parsing_add_metadata_batch( + grpc_chttp2_transport_parsing *transport_parsing, + grpc_chttp2_stream_parsing *stream_parsing) { + grpc_metadata_batch b; + + b.list.head = NULL; + /* Store away the last element of the list, so that in patch_metadata_ops + we can reconstitute the list. + We can't do list building here as later incoming metadata may reallocate + the underlying array. */ + b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count; + b.garbage.head = b.garbage.tail = NULL; + b.deadline = stream_parsing->incoming_deadline; + stream_parsing->incoming_deadline = gpr_inf_future; + + grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b); +} +#endif + +#if 0 +static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, + grpc_chttp2_stream_parsing *stream_parsing) { +} #endif diff --git a/src/core/transport/chttp2/incoming_metadata.h b/src/core/transport/chttp2/incoming_metadata.h index d2b08943d4..5a7890a534 100644 --- a/src/core/transport/chttp2/incoming_metadata.h +++ b/src/core/transport/chttp2/incoming_metadata.h @@ -48,21 +48,29 @@ typedef struct { } grpc_chttp2_incoming_metadata_live_op_buffer; /** assumes everything initially zeroed */ -void grpc_chttp2_incoming_metadata_buffer_init(grpc_chttp2_incoming_metadata_buffer *buffer); -void grpc_chttp2_incoming_metadata_buffer_destroy(grpc_chttp2_incoming_metadata_buffer *buffer); -void grpc_chttp2_incoming_metadata_buffer_reset(grpc_chttp2_incoming_metadata_buffer *buffer); +void grpc_chttp2_incoming_metadata_buffer_init( + grpc_chttp2_incoming_metadata_buffer *buffer); +void grpc_chttp2_incoming_metadata_buffer_destroy( + grpc_chttp2_incoming_metadata_buffer *buffer); +void grpc_chttp2_incoming_metadata_buffer_reset( + grpc_chttp2_incoming_metadata_buffer *buffer); -void grpc_chttp2_incoming_metadata_buffer_add(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem); -void grpc_chttp2_incoming_metadata_buffer_set_deadline(grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); +void grpc_chttp2_incoming_metadata_buffer_add( + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem); +void grpc_chttp2_incoming_metadata_buffer_set_deadline( + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); /** extend sopb with a metadata batch; this must be post-processed by grpc_chttp2_incoming_metadata_buffer_postprocess_sopb before being handed out of the transport */ -void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb); +void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb); void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer); + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, + grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer); -void grpc_chttp2_incoming_metadata_live_op_buffer_end(grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer); +void grpc_chttp2_incoming_metadata_live_op_buffer_end( + grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer); #endif /* GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index d34fc7e6ff..06f114c2fb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -88,7 +88,7 @@ typedef enum { PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE, NEW_OUTGOING_WINDOW, -#endif +#endif STREAM_LIST_COUNT /* must be last */ } grpc_chttp2_stream_list_id; @@ -466,13 +466,13 @@ struct grpc_chttp2_stream_parsing { /** incoming metadata */ grpc_chttp2_incoming_metadata_buffer incoming_metadata; -/* - grpc_linked_mdelem *incoming_metadata; - size_t incoming_metadata_count; - size_t incoming_metadata_capacity; - grpc_linked_mdelem *old_incoming_metadata; - gpr_timespec incoming_deadline; -*/ + /* + grpc_linked_mdelem *incoming_metadata; + size_t incoming_metadata_count; + size_t incoming_metadata_capacity; + grpc_linked_mdelem *old_incoming_metadata; + gpr_timespec incoming_deadline; + */ }; struct grpc_chttp2_stream { @@ -599,14 +599,22 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); -void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, - gpr_slice goaway_text); +void grpc_chttp2_add_incoming_goaway( + grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, + gpr_slice goaway_text); -void grpc_chttp2_remove_from_stream_map(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +void grpc_chttp2_remove_from_stream_map( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); -void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -void grpc_chttp2_for_all_streams(grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global)); +void grpc_chttp2_register_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s); +void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s); +void grpc_chttp2_for_all_streams( + grpc_chttp2_transport_global *transport_global, void *user_data, + void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, + grpc_chttp2_stream_global *stream_global)); #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index bf66bb42cf..e45ebb7176 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -61,8 +61,7 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last); void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, - grpc_chttp2_transport_parsing *parsing) { -} + grpc_chttp2_transport_parsing *parsing) {} void grpc_chttp2_publish_reads( grpc_chttp2_transport_global *transport_global, @@ -134,7 +133,9 @@ void grpc_chttp2_publish_reads( /* move goaway to the global state if we received one (it will be published later */ if (transport_parsing->goaway_received) { - grpc_chttp2_add_incoming_goaway(transport_global, transport_parsing->goaway_error, transport_parsing->goaway_text); + grpc_chttp2_add_incoming_goaway(transport_global, + transport_parsing->goaway_error, + transport_parsing->goaway_text); transport_parsing->goaway_received = 0; } @@ -164,11 +165,13 @@ void grpc_chttp2_publish_reads( /* updating closed status */ if (stream_parsing->received_close) { stream_global->read_closed = 1; - grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); } if (stream_parsing->saw_rst_stream) { stream_global->cancelled = 1; - grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); } } } @@ -486,10 +489,10 @@ static int init_data_frame_parser( stream_parsing->received_close = 1; stream_parsing->saw_rst_stream = 1; stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR; - gpr_slice_buffer_add(&transport_parsing->qbuf, - grpc_chttp2_rst_stream_create( - transport_parsing->incoming_stream_id, - GRPC_CHTTP2_PROTOCOL_ERROR)); + gpr_slice_buffer_add( + &transport_parsing->qbuf, + grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, + GRPC_CHTTP2_PROTOCOL_ERROR)); return init_skip_frame_parser(transport_parsing, 0); case GRPC_CHTTP2_CONNECTION_ERROR: return 0; @@ -526,10 +529,13 @@ static void on_header(void *tp, grpc_mdelem *md) { } grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } - grpc_chttp2_incoming_metadata_buffer_set_deadline(&stream_parsing->incoming_metadata, gpr_time_add(gpr_now(), *cached_timeout)); + grpc_chttp2_incoming_metadata_buffer_set_deadline( + &stream_parsing->incoming_metadata, + gpr_time_add(gpr_now(), *cached_timeout)); grpc_mdelem_unref(md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata, md); + grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata, + md); } grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); @@ -711,10 +717,10 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, if (stream_parsing) { stream_parsing->saw_rst_stream = 1; stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR; - gpr_slice_buffer_add(&transport_parsing->qbuf, - grpc_chttp2_rst_stream_create( - transport_parsing->incoming_stream_id, - GRPC_CHTTP2_PROTOCOL_ERROR)); + gpr_slice_buffer_add( + &transport_parsing->qbuf, + grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, + GRPC_CHTTP2_PROTOCOL_ERROR)); } return 1; case GRPC_CHTTP2_CONNECTION_ERROR: diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 544174fd67..85f6bd3616 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -35,29 +35,26 @@ #include <grpc/support/log.h> -#define TRANSPORT_FROM_GLOBAL(tg) \ +#define TRANSPORT_FROM_GLOBAL(tg) \ ((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \ global))) #define STREAM_FROM_GLOBAL(sg) \ - ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, \ - global))) + ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global))) -#define TRANSPORT_FROM_WRITING(tw) \ +#define TRANSPORT_FROM_WRITING(tw) \ ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \ writing))) #define STREAM_FROM_WRITING(sw) \ - ((grpc_chttp2_stream *)((char *)(sw)-offsetof(grpc_chttp2_stream, \ - writing))) + ((grpc_chttp2_stream *)((char *)(sw)-offsetof(grpc_chttp2_stream, writing))) -#define TRANSPORT_FROM_PARSING(tp) \ +#define TRANSPORT_FROM_PARSING(tp) \ ((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \ parsing))) #define STREAM_FROM_PARSING(sp) \ - ((grpc_chttp2_stream *)((char *)(sp)-offsetof(grpc_chttp2_stream, \ - parsing))) + ((grpc_chttp2_stream *)((char *)(sp)-offsetof(grpc_chttp2_stream, parsing))) /* core list management */ @@ -66,8 +63,9 @@ static int stream_list_empty(grpc_chttp2_transport *t, return t->lists[id].head == NULL; } -static int stream_list_pop( - grpc_chttp2_transport *t, grpc_chttp2_stream **stream, grpc_chttp2_stream_list_id id) { +static int stream_list_pop(grpc_chttp2_transport *t, + grpc_chttp2_stream **stream, + grpc_chttp2_stream_list_id id) { grpc_chttp2_stream *s = t->lists[id].head; if (s) { grpc_chttp2_stream *new_head = s->links[id].next; @@ -121,7 +119,7 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, } static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_chttp2_stream_list_id id) { + grpc_chttp2_stream_list_id id) { if (s->included[id]) { return; } @@ -133,7 +131,8 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); } int grpc_chttp2_list_pop_writable_stream( @@ -142,7 +141,8 @@ int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing) { grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE); + int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, + GRPC_CHTTP2_LIST_WRITABLE); *stream_global = &stream->global; *stream_writing = &stream->writing; return r; @@ -151,19 +151,23 @@ int grpc_chttp2_list_pop_writable_stream( void grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { - stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_WRITING); + stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_WRITING); } int grpc_chttp2_list_have_writing_streams( grpc_chttp2_transport_writing *transport_writing) { - return stream_list_empty(TRANSPORT_FROM_WRITING(transport_writing), GRPC_CHTTP2_LIST_WRITING); + return stream_list_empty(TRANSPORT_FROM_WRITING(transport_writing), + GRPC_CHTTP2_LIST_WRITING); } int grpc_chttp2_list_pop_writing_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing **stream_writing) { grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, GRPC_CHTTP2_LIST_WRITING); + int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, + GRPC_CHTTP2_LIST_WRITING); *stream_writing = &stream->writing; return r; } @@ -171,7 +175,9 @@ int grpc_chttp2_list_pop_writing_stream( void grpc_chttp2_list_add_written_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { - stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_WRITTEN); + stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_WRITTEN); } int grpc_chttp2_list_pop_written_stream( @@ -180,7 +186,8 @@ int grpc_chttp2_list_pop_written_stream( grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing) { grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, GRPC_CHTTP2_LIST_WRITTEN); + int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, + GRPC_CHTTP2_LIST_WRITTEN); *stream_writing = &stream->writing; return r; } @@ -188,14 +195,17 @@ int grpc_chttp2_list_pop_written_stream( void grpc_chttp2_list_add_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); } int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); + int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, + GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); *stream_global = &stream->global; return r; } @@ -203,7 +213,9 @@ int grpc_chttp2_list_pop_writable_window_update_stream( void grpc_chttp2_list_add_parsing_seen_stream( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { - stream_list_add(TRANSPORT_FROM_PARSING(transport_parsing), STREAM_FROM_PARSING(stream_parsing), GRPC_CHTTP2_LIST_PARSING_SEEN); + stream_list_add(TRANSPORT_FROM_PARSING(transport_parsing), + STREAM_FROM_PARSING(stream_parsing), + GRPC_CHTTP2_LIST_PARSING_SEEN); } int grpc_chttp2_list_pop_parsing_seen_stream( @@ -212,7 +224,8 @@ int grpc_chttp2_list_pop_parsing_seen_stream( grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_parsing **stream_parsing) { grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream, GRPC_CHTTP2_LIST_PARSING_SEEN); + int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream, + GRPC_CHTTP2_LIST_PARSING_SEEN); *stream_global = &stream->global; *stream_parsing = &stream->parsing; return r; @@ -221,14 +234,17 @@ int grpc_chttp2_list_pop_parsing_seen_stream( void grpc_chttp2_list_add_waiting_for_concurrency( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); } int grpc_chttp2_list_pop_waiting_for_concurrency( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); + int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, + GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); *stream_global = &stream->global; return r; } @@ -236,14 +252,17 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( void grpc_chttp2_list_add_cancelled_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING); + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING); } int grpc_chttp2_list_pop_cancelled_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING); + int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, + GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING); *stream_global = &stream->global; return r; } @@ -251,26 +270,38 @@ int grpc_chttp2_list_pop_cancelled_waiting_for_parsing( void grpc_chttp2_list_add_read_write_state_changed( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); } void grpc_chttp2_list_add_incoming_window_state_changed( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_INCOMING_WINDOW_STATE_CHANGED); + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_INCOMING_WINDOW_STATE_CHANGED); } -void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { +void grpc_chttp2_register_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); } -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); +void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); } -void grpc_chttp2_for_all_streams(grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global)) { +void grpc_chttp2_for_all_streams( + grpc_chttp2_transport_global *transport_global, void *user_data, + void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, + grpc_chttp2_stream_global *stream_global)) { grpc_chttp2_stream *s; - for (s = TRANSPORT_FROM_GLOBAL(transport_global)->lists[GRPC_CHTTP2_LIST_ALL_STREAMS].head; s; s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) { + for (s = TRANSPORT_FROM_GLOBAL(transport_global) + ->lists[GRPC_CHTTP2_LIST_ALL_STREAMS] + .head; + s; s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) { cb(transport_global, user_data, &s->global); } } diff --git a/src/core/transport/chttp2/stream_map.c b/src/core/transport/chttp2/stream_map.c index 2d80007bcb..baec29e18d 100644 --- a/src/core/transport/chttp2/stream_map.c +++ b/src/core/transport/chttp2/stream_map.c @@ -96,7 +96,8 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, gpr_uint32 key, map->count = count + 1; } -void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, grpc_chttp2_stream_map *dst) { +void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, + grpc_chttp2_stream_map *dst) { /* if src is empty we dont need to do anything */ if (src->count == src->free) { return; @@ -115,7 +116,8 @@ void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, grpc_chttp2_s /* the first element of src must be greater than the last of dst */ GPR_ASSERT(src->keys[0] > dst->keys[dst->count - 1]); memcpy(dst->keys + dst->count, src->keys, src->count * sizeof(gpr_uint32)); - memcpy(dst->values + dst->count, src->values, src->count * sizeof(gpr_uint32)); + memcpy(dst->values + dst->count, src->values, + src->count * sizeof(gpr_uint32)); dst->count += src->count; dst->free += src->free; src->count = 0; diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 572ee0de56..6cc19aec83 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -192,7 +192,8 @@ void grpc_chttp2_cleanup_writing( if (!transport_global->is_client) { stream_global->read_closed = 1; } - grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); } } transport_writing->outbuf.count = 0; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index db9a8ef8ee..6198b786fe 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -66,13 +66,12 @@ int grpc_flowctl_trace = 0; ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \ parsing))) -#define TRANSPORT_FROM_GLOBAL(tg) \ +#define TRANSPORT_FROM_GLOBAL(tg) \ ((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \ global))) #define STREAM_FROM_GLOBAL(sg) \ - ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, \ - global))) + ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global))) static const grpc_transport_vtable vtable; @@ -98,13 +97,14 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, static void drop_connection(grpc_chttp2_transport *t); /** Perform a transport_op */ -static void perform_op_locked(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_op *op); +static void perform_op_locked(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_transport_op *op); /** Cancel a stream: coming from the transport API */ -static void cancel_from_api( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, - grpc_status_code status); +static void cancel_from_api(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_status_code status); /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_chttp2_transport *t, @@ -394,7 +394,8 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) { grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); - grpc_chttp2_stream *s = grpc_chttp2_stream_map_find(&t->parsing_stream_map, id); + grpc_chttp2_stream *s = + grpc_chttp2_stream_map_find(&t->parsing_stream_map, id); return &s->parsing; } @@ -404,7 +405,8 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); GPR_ASSERT(t->accepting_stream == NULL); t->accepting_stream = &accepting; - t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, &t->base, (void *)(gpr_uintptr)id); + t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, + &t->base, (void *)(gpr_uintptr)id); t->accepting_stream = NULL; return &accepting->parsing; } @@ -508,8 +510,9 @@ static void writing_action(void *gt, int iomgr_success_ignored) { grpc_chttp2_perform_writes(&t->writing, t->ep); } -void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, - gpr_slice goaway_text) { +void grpc_chttp2_add_incoming_goaway( + grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, + gpr_slice goaway_text) { if (transport_global->goaway_state == GRPC_CHTTP2_ERROR_STATE_NONE) { transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; transport_global->goaway_text = goaway_text; @@ -519,18 +522,21 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_glo } } -static void maybe_start_some_streams(grpc_chttp2_transport_global *transport_global) { +static void maybe_start_some_streams( + grpc_chttp2_transport_global *transport_global) { grpc_chttp2_stream_global *stream_global; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID && transport_global->concurrent_stream_count < - transport_global->settings[PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && - grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { - IF_TRACING(gpr_log( - GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", - transport_global->is_client ? "CLI" : "SVR", stream_global, transport_global->next_stream_id)); + transport_global->settings + [PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && + grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, + &stream_global)) { + IF_TRACING(gpr_log(GPR_DEBUG, + "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", + transport_global->is_client ? "CLI" : "SVR", + stream_global, transport_global->next_stream_id)); if (transport_global->next_stream_id == MAX_CLIENT_STREAM_ID) { grpc_chttp2_add_incoming_goaway( @@ -545,20 +551,25 @@ static void maybe_start_some_streams(grpc_chttp2_transport_global *transport_glo transport_global ->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; stream_global->incoming_window = - transport_global-> - settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - grpc_chttp2_stream_map_add(&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global)); + transport_global + ->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + grpc_chttp2_stream_map_add( + &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, + stream_global->id, STREAM_FROM_GLOBAL(stream_global)); transport_global->concurrent_stream_count++; grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id > MAX_CLIENT_STREAM_ID && - grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { + grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, + &stream_global)) { cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE); } } -static void perform_op_locked(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_op *op) { +static void perform_op_locked(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_transport_op *op) { if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(transport_global, stream_global, op->cancel_with_status); } @@ -572,19 +583,20 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, gr stream_global->write_state = WRITE_STATE_QUEUED_CLOSE; } if (stream_global->id == 0) { - IF_TRACING(gpr_log(GPR_DEBUG, - "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", - transport_global->is_client ? "CLI" : "SVR", stream_global)); - grpc_chttp2_list_add_waiting_for_concurrency( - transport_global, stream_global - ); + IF_TRACING(gpr_log( + GPR_DEBUG, + "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", + transport_global->is_client ? "CLI" : "SVR", stream_global)); + grpc_chttp2_list_add_waiting_for_concurrency(transport_global, + stream_global); maybe_start_some_streams(transport_global); } else if (stream_global->outgoing_window > 0) { grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } else { grpc_sopb_reset(op->send_ops); - grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 0); + grpc_chttp2_schedule_closure(transport_global, + stream_global->send_done_closure, 0); } } @@ -594,13 +606,17 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, gr stream_global->recv_done_closure = op->on_done_recv; stream_global->incoming_sopb = op->recv_ops; stream_global->incoming_sopb->nops = 0; - grpc_chttp2_incoming_metadata_live_op_buffer_end(&stream_global->outstanding_metadata); - grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); - grpc_chttp2_list_add_incoming_window_state_changed(transport_global, stream_global); + grpc_chttp2_incoming_metadata_live_op_buffer_end( + &stream_global->outstanding_metadata); + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); + grpc_chttp2_list_add_incoming_window_state_changed(transport_global, + stream_global); } if (op->bind_pollset) { - add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global), op->bind_pollset); + add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global), + op->bind_pollset); } if (op->on_consumed) { @@ -651,11 +667,13 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) { we are not parsing before continuing the cancellation to keep things in a sane state */ if (!t->parsing_active) { - while (grpc_chttp2_list_pop_cancelled_waiting_for_parsing(transport_global, &stream_global)) { + while (grpc_chttp2_list_pop_cancelled_waiting_for_parsing(transport_global, + &stream_global)) { GPR_ASSERT(stream_global->in_stream_map); grpc_chttp2_stream_map_delete(&t->parsing_stream_map, stream_global->id); stream_global->in_stream_map = 0; - grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); } } @@ -674,17 +692,18 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) { #endif } -static void cancel_from_api( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, - grpc_status_code status) { +static void cancel_from_api(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_status_code status) { stream_global->cancelled = 1; if (stream_global->in_stream_map) { gpr_slice_buffer_add(&transport_global->qbuf, - grpc_chttp2_rst_stream_create(stream_global->id, - grpc_chttp2_grpc_status_to_http2_status(status))); + grpc_chttp2_rst_stream_create( + stream_global->id, + grpc_chttp2_grpc_status_to_http2_status(status))); } else { - grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); } } @@ -773,7 +792,9 @@ static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s, } #endif -static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) { +static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, + void *user_data, + grpc_chttp2_stream_global *stream_global) { cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE); } @@ -848,8 +869,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, t->parsing_active = 1; grpc_chttp2_prepare_to_read(&t->global, &t->parsing); gpr_mu_unlock(&t->mu); - for (; - i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); + for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); i++) { gpr_slice_unref(slices[i]); } @@ -954,7 +974,7 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { return; } if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) { - if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN && + if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN && t->global.error_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) { notify_goaways_args *a = gpr_malloc(sizeof(*a)); a->error = t->global.goaway_error; @@ -973,12 +993,14 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; t->channel_callback.executing = 1; ref_transport(t); - grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed, 1); + grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed, + 1); } } -void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, - int success) { +void grpc_chttp2_schedule_closure( + grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, + int success) { closure->success = success; closure->next = transport_global->pending_closures; transport_global->pending_closures = closure; |