diff options
Diffstat (limited to 'src/core/transport/chttp2')
-rw-r--r-- | src/core/transport/chttp2/hpack_parser.c | 7 | ||||
-rw-r--r-- | src/core/transport/chttp2/incoming_metadata.c | 5 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 57 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 3 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 31 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 13 |
6 files changed, 70 insertions, 46 deletions
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index b8ab664db5..c729e0f22f 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -1329,12 +1329,9 @@ static int parse_value_string_with_literal_key(grpc_chttp2_hpack_parser *p, /* PUBLIC INTERFACE */ static void on_header_not_set(void *user_data, grpc_mdelem *md) { - char *keyhex = - gpr_hexdump(grpc_mdstr_as_c_string(md->key), - GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT); + char *keyhex = gpr_dump_slice(md->key->slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); char *valuehex = - gpr_hexdump(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT); + gpr_dump_slice(md->value->slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_ERROR, "on_header callback not set; key=%s value=%s", keyhex, valuehex); gpr_free(keyhex); diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index e81927ab20..68e0912b9c 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -47,6 +47,10 @@ void grpc_chttp2_incoming_metadata_buffer_init( void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_chttp2_incoming_metadata_buffer *buffer) { + size_t i; + for (i = 0; i < buffer->count; i++) { + grpc_mdelem_unref(buffer->elems[i].md); + } gpr_free(buffer->elems); } @@ -120,6 +124,7 @@ void grpc_incoming_metadata_buffer_move_to_referencing_sopb( sopb->ops[i].data.metadata.list.tail = (void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail); } + src->count = 0; } void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 02c94744ee..bdd4b432eb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -34,7 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H #define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H -#include "src/core/transport/transport_impl.h" #include "src/core/iomgr/endpoint.h" #include "src/core/transport/chttp2/frame.h" #include "src/core/transport/chttp2/frame_data.h" @@ -47,6 +46,8 @@ #include "src/core/transport/chttp2/incoming_metadata.h" #include "src/core/transport/chttp2/stream_encoder.h" #include "src/core/transport/chttp2/stream_map.h" +#include "src/core/transport/connectivity_state.h" +#include "src/core/transport/transport_impl.h" typedef struct grpc_chttp2_transport grpc_chttp2_transport; typedef struct grpc_chttp2_stream grpc_chttp2_stream; @@ -62,6 +63,7 @@ typedef enum { GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE, GRPC_CHTTP2_LIST_PARSING_SEEN, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, + GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING, GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED, /** streams that are waiting to start because there are too many concurrent streams on the connection */ @@ -134,12 +136,6 @@ typedef struct { grpc_chttp2_stream *prev; } grpc_chttp2_stream_link; -typedef enum { - GRPC_CHTTP2_ERROR_STATE_NONE, - GRPC_CHTTP2_ERROR_STATE_SEEN, - GRPC_CHTTP2_ERROR_STATE_NOTIFIED -} grpc_chttp2_error_state; - /* We keep several sets of connection wide parameters */ typedef enum { /* The settings our peer has asked for (and we have acked) */ @@ -165,7 +161,8 @@ typedef struct { /** data to write next write */ gpr_slice_buffer qbuf; /** queued callbacks */ - grpc_iomgr_closure *pending_closures; + grpc_iomgr_closure *pending_closures_head; + grpc_iomgr_closure *pending_closures_tail; /** window available for us to send to peer */ gpr_uint32 outgoing_window; @@ -174,6 +171,11 @@ typedef struct { /** how much window would we like to have for incoming_window */ gpr_uint32 connection_window_target; + /** have we seen a goaway */ + gpr_uint8 seen_goaway; + /** have we sent a goaway */ + gpr_uint8 sent_goaway; + /** is this transport a client? */ gpr_uint8 is_client; /** are the local settings dirty and need to be sent? */ @@ -185,10 +187,6 @@ typedef struct { /** settings values */ gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; - /** has there been a connection level error, and have we notified - anyone about it? */ - grpc_chttp2_error_state error_state; - /** what is the next stream id to be allocated by this peer? copied to next_stream_id in parsing when parsing commences */ gpr_uint32 next_stream_id; @@ -204,13 +202,6 @@ typedef struct { /** concurrent stream count: updated when not parsing, so this is a strict over-estimation on the client */ gpr_uint32 concurrent_stream_count; - - /** is there a goaway available? (boolean) */ - grpc_chttp2_error_state goaway_state; - /** what is the debug text of the goaway? */ - gpr_slice goaway_text; - /** what is the status code of the goaway? */ - grpc_status_code goaway_error; } grpc_chttp2_transport_global; typedef struct { @@ -343,14 +334,13 @@ struct grpc_chttp2_transport { grpc_chttp2_stream **accepting_stream; struct { - /** is a thread currently performing channel callbacks */ - gpr_uint8 executing; - /** transport channel-level callback */ - const grpc_transport_callbacks *cb; - /** user data for cb calls */ - void *cb_user_data; - /** closure for notifying transport closure */ - grpc_iomgr_closure notify_closed; + /* accept stream callback */ + void (*accept_stream)(void *user_data, grpc_transport *transport, + const void *server_data); + void *accept_stream_user_data; + + /** connectivity tracking */ + grpc_connectivity_state_tracker state_tracker; } channel_callback; }; @@ -539,6 +529,13 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); +void grpc_chttp2_list_add_cancelled_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); +int grpc_chttp2_list_pop_cancelled_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global); + void grpc_chttp2_list_add_read_write_state_changed( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); @@ -562,8 +559,10 @@ void grpc_chttp2_add_incoming_goaway( void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream *s); +/* returns 1 if this is the last stream, 0 otherwise */ +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT; +int grpc_chttp2_has_streams(grpc_chttp2_transport *t); void grpc_chttp2_for_all_streams( grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 8f682e9017..4664a0895c 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -109,9 +109,6 @@ void grpc_chttp2_publish_reads( transport_parsing->incoming_stream_id; } - /* TODO(ctiller): re-implement */ - GPR_ASSERT(transport_parsing->initial_window_update == 0); - /* copy parsing qbuf to global qbuf */ gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index c6ba12fca8..4fea058c19 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -222,7 +222,9 @@ int grpc_chttp2_list_pop_writable_window_update_stream( void grpc_chttp2_list_remove_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); + stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); } void grpc_chttp2_list_add_parsing_seen_stream( @@ -282,6 +284,24 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing( return r; } +void grpc_chttp2_list_add_cancelled_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING); +} + +int grpc_chttp2_list_pop_cancelled_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global) { + grpc_chttp2_stream *stream; + int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, + GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING); + *stream_global = &stream->global; + return r; +} + void grpc_chttp2_list_add_incoming_window_updated( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { @@ -334,9 +354,14 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t, stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); } -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); +} + +int grpc_chttp2_has_streams(grpc_chttp2_transport *t) { + return !stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); } void grpc_chttp2_for_all_streams( diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index fdcc300099..a78654334e 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -97,12 +97,8 @@ int grpc_chttp2_unlocking_check_writes( grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } - /* we should either exhaust window or have no ops left, but not both */ - if (stream_global->outgoing_sopb->nops == 0) { - stream_global->outgoing_sopb = NULL; - grpc_chttp2_schedule_closure(transport_global, - stream_global->send_done_closure, 1); - } else if (stream_global->outgoing_window > 0) { + if (stream_global->outgoing_window > 0 && + stream_global->outgoing_sopb->nops != 0) { grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } @@ -201,6 +197,11 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { + if (stream_global->outgoing_sopb->nops == 0) { + stream_global->outgoing_sopb = NULL; + grpc_chttp2_schedule_closure(transport_global, + stream_global->send_done_closure, 1); + } if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; if (!transport_global->is_client) { |