diff options
author | 2015-06-12 16:17:09 -0700 | |
---|---|---|
committer | 2015-06-12 16:17:09 -0700 | |
commit | d20efd26e3a8448531d56050942ef66935311ef5 (patch) | |
tree | 26e52d719468e75dfc4ceeb441543ca241253204 /src/core/transport/chttp2/internal.h | |
parent | 3208e3922a246375f68969630941267e1a8930a3 (diff) |
Progress on splitting things up
Diffstat (limited to 'src/core/transport/chttp2/internal.h')
-rw-r--r-- | src/core/transport/chttp2/internal.h | 293 |
1 files changed, 198 insertions, 95 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index a21a7a4d75..5eba01a3e1 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -36,6 +36,7 @@ #include "src/core/transport/transport_impl.h" #include "src/core/iomgr/endpoint.h" +#include "src/core/transport/chttp2/frame.h" #include "src/core/transport/chttp2/frame_data.h" #include "src/core/transport/chttp2/frame_goaway.h" #include "src/core/transport/chttp2/frame_ping.h" @@ -172,16 +173,110 @@ typedef struct { gpr_slice debug; } grpc_chttp2_pending_goaway; +typedef struct { + /** data to write next write */ + gpr_slice_buffer qbuf; + /** queued callbacks */ + grpc_iomgr_closure *pending_closures; + + /** window available for us to send to peer */ + gpr_uint32 outgoing_window; + /** how much window would we like to have for incoming_window */ + gpr_uint32 connection_window_target; + + + /** are the local settings dirty and need to be sent? */ + gpr_uint8 dirtied_local_settings; + /** have local settings been sent? */ + gpr_uint8 sent_local_settings; + /** bitmask of setting indexes to send out */ + gpr_uint32 force_send_settings; + /** settings values */ + gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; + + /** last received stream id */ + gpr_uint32 last_incoming_stream_id; +} grpc_chttp2_transport_global; + +typedef struct { + /** data to write now */ + gpr_slice_buffer outbuf; + /** hpack encoding */ + grpc_chttp2_hpack_compressor hpack_compressor; +} grpc_chttp2_transport_writing; + +struct grpc_chttp2_transport_parsing { + /** is this transport a client? (boolean) */ + gpr_uint8 is_client; + + /** were settings updated? */ + gpr_uint8 settings_updated; + /** was a settings ack received? */ + gpr_uint8 settings_ack_received; + /** was a goaway frame received? */ + gpr_uint8 goaway_received; + + /** data to write later - after parsing */ + gpr_slice_buffer qbuf; + /* metadata object cache */ + grpc_mdstr *str_grpc_timeout; + /** parser for headers */ + grpc_chttp2_hpack_parser hpack_parser; + /** simple one shot parsers */ + union { + grpc_chttp2_window_update_parser window_update; + grpc_chttp2_settings_parser settings; + grpc_chttp2_ping_parser ping; + grpc_chttp2_rst_stream_parser rst_stream; + } simple; + /** parser for goaway frames */ + grpc_chttp2_goaway_parser goaway_parser; + + /** window available for peer to send to us */ + gpr_uint32 incoming_window; + + /** next stream id available at the time of beginning parsing */ + gpr_uint32 next_stream_id; + gpr_uint32 last_incoming_stream_id; + + /* deframing */ + grpc_chttp2_deframe_transport_state deframe_state; + gpr_uint8 incoming_frame_type; + gpr_uint8 incoming_frame_flags; + gpr_uint8 header_eof; + gpr_uint32 expect_continuation_stream_id; + gpr_uint32 incoming_frame_size; + gpr_uint32 incoming_stream_id; + + /* active parser */ + void *parser_data; + grpc_chttp2_stream_parsing *incoming_stream; + grpc_chttp2_parse_error (*parser)(void *parser_user_data, + grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, + gpr_slice slice, int is_last); + + /* received settings */ + gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS]; + + /* goaway data */ + grpc_status_code goaway_error; + gpr_uint32 goaway_last_stream_index; + gpr_slice goaway_text; +}; + + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ grpc_endpoint *ep; grpc_mdctx *metadata_context; gpr_refcount refs; - gpr_uint8 is_client; gpr_mu mu; gpr_cv cv; + /** is a thread currently writing */ + gpr_uint8 writing_active; + /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; /** are we calling back any grpc_transport_op completion events */ @@ -192,28 +287,9 @@ struct grpc_chttp2_transport { /* stream indexing */ gpr_uint32 next_stream_id; - gpr_uint32 last_incoming_stream_id; - - /* settings */ - gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; - gpr_uint32 force_send_settings; /* bitmask of setting indexes to send out */ - gpr_uint8 sent_local_settings; /* have local settings been sent? */ - gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */ /* window management */ - gpr_uint32 outgoing_window; gpr_uint32 outgoing_window_update; - gpr_uint32 incoming_window; - gpr_uint32 connection_window_target; - - /* deframing */ - grpc_chttp2_deframe_transport_state deframe_state; - gpr_uint8 incoming_frame_type; - gpr_uint8 incoming_frame_flags; - gpr_uint8 header_eof; - gpr_uint32 expect_continuation_stream_id; - gpr_uint32 incoming_frame_size; - gpr_uint32 incoming_stream_id; /* goaway */ grpc_chttp2_pending_goaway *pending_goaways; @@ -226,13 +302,6 @@ struct grpc_chttp2_transport { /* stream ops that need to be destroyed, but outside of the lock */ grpc_stream_op_buffer nuke_later_sopb; - /* active parser */ - void *parser_data; - grpc_chttp2_stream *incoming_stream; - grpc_chttp2_parse_error (*parser)(void *parser_user_data, - grpc_chttp2_parse_state *state, - gpr_slice slice, int is_last); - grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; grpc_chttp2_stream_map stream_map; @@ -242,46 +311,12 @@ struct grpc_chttp2_transport { size_t ping_capacity; gpr_int64 ping_counter; - struct { - /* metadata object cache */ - grpc_mdstr *str_grpc_timeout; - } constants; - - struct { - /** data to write next write */ - gpr_slice_buffer qbuf; - /* queued callbacks */ - grpc_iomgr_closure *pending_closures; - } global; - - struct { - /** is a thread currently writing */ - gpr_uint8 executing; - /** closure to execute this action */ - grpc_iomgr_closure action; - /** data to write now */ - gpr_slice_buffer outbuf; - /* hpack encoding */ - grpc_chttp2_hpack_compressor hpack_compressor; - } writing; + grpc_chttp2_transport_global global; + grpc_chttp2_transport_writing writing; + grpc_chttp2_transport_parsing parsing; - struct { - /** is a thread currently parsing */ - gpr_uint8 executing; - /** data to write later - after parsing */ - gpr_slice_buffer qbuf; - /** parser for headers */ - grpc_chttp2_hpack_parser hpack_parser; - /** simple one shot parsers */ - union { - grpc_chttp2_window_update_parser window_update; - grpc_chttp2_settings_parser settings; - grpc_chttp2_ping_parser ping; - grpc_chttp2_rst_stream_parser rst_stream; - } simple; - /** parser for goaway frames */ - grpc_chttp2_goaway_parser goaway_parser; - } parsing; + /** closure to execute writing */ + grpc_iomgr_closure writing_action; struct { /** is a thread currently performing channel callbacks */ @@ -295,37 +330,47 @@ struct grpc_chttp2_transport { } channel_callback; }; -struct grpc_chttp2_stream { - struct { - grpc_iomgr_closure *send_done_closure; - grpc_iomgr_closure *recv_done_closure; - } global; - - struct { - /* sops that have passed flow control to be written */ - grpc_stream_op_buffer sopb; - /* how strongly should we indicate closure with the next write */ - grpc_chttp2_send_closed send_closed; - } writing; - - struct { - int unused; - } parsing; - +typedef struct { + /** HTTP2 stream id for this stream, or zero if one has not been assigned */ gpr_uint32 id; - gpr_uint32 incoming_window; + grpc_iomgr_closure *send_done_closure; + grpc_iomgr_closure *recv_done_closure; + + /** window available for us to send to peer */ gpr_int64 outgoing_window; - gpr_uint32 outgoing_window_update; - /* when the application requests writes be closed, the write_closed is - 'queued'; when the close is flow controlled into the send path, we are - 'sending' it; when the write has been performed it is 'sent' */ + /** stream ops the transport user would like to send */ + grpc_stream_op_buffer *outgoing_sopb; + /** when the application requests writes be closed, the write_closed is + 'queued'; when the close is flow controlled into the send path, we are + 'sending' it; when the write has been performed it is 'sent' */ grpc_chttp2_write_state write_state; + /** is this stream closed (boolean) */ gpr_uint8 read_closed; - gpr_uint8 cancelled; +} grpc_chttp2_stream_global; - grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; - gpr_uint8 included[STREAM_LIST_COUNT]; +typedef struct { + /** HTTP2 stream id for this stream, or zero if one has not been assigned */ + gpr_uint32 id; + /** sops that have passed flow control to be written */ + grpc_stream_op_buffer sopb; + /** how strongly should we indicate closure with the next write */ + grpc_chttp2_send_closed send_closed; +} grpc_chttp2_stream_writing; + +struct grpc_chttp2_stream_parsing { + /** HTTP2 stream id for this stream, or zero if one has not been assigned */ + gpr_uint32 id; + /** has this stream received a close */ + gpr_uint8 received_close; + /** incoming_window has been reduced during parsing */ + gpr_uint8 incoming_window_changed; + /** saw an error on this stream during parsing (it should be cancelled) */ + gpr_uint8 saw_error; + /** window available for peer to send to us */ + gpr_uint32 incoming_window; + /** parsing state for data frames */ + grpc_chttp2_data_parser data_parser; /* incoming metadata */ grpc_linked_mdelem *incoming_metadata; @@ -333,21 +378,79 @@ struct grpc_chttp2_stream { size_t incoming_metadata_capacity; grpc_linked_mdelem *old_incoming_metadata; gpr_timespec incoming_deadline; +}; + +struct grpc_chttp2_stream { + grpc_chttp2_stream_global global; + grpc_chttp2_stream_writing writing; + + gpr_uint32 outgoing_window_update; + gpr_uint8 cancelled; + + grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; + gpr_uint8 included[STREAM_LIST_COUNT]; /* sops from application */ - grpc_stream_op_buffer *outgoing_sopb; grpc_stream_op_buffer *incoming_sopb; grpc_stream_state *publish_state; grpc_stream_state published_state; - grpc_chttp2_data_parser parser; - grpc_stream_state callback_state; grpc_stream_op_buffer callback_sopb; }; +/** Transport writing call flow: + chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes are required; + if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the writes. + Once writes have been completed (meaning another write could potentially be started), + grpc_chttp2_terminate_writing is called. This will call grpc_chttp2_cleanup_writing, at which + point the write phase is complete. */ + /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ -void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); +int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); +void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); +void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success); +void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); + +/** Process one slice of incoming data */ +int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice); +void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); + +/** Get a writable stream + \return non-zero if there was a stream available */ +void grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); + +void grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); +int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport_writing *transport_writing); +int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing **stream_writing); + +void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); +int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); + +int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); + +void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing); + +void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success); +void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); + +grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); +grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); + +#define GRPC_CHTTP2_FLOW_CTL_TRACE(a,b,c,d,e) do {} while (0) + +#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" +#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING)-1) + +extern int grpc_http_trace; + +#define IF_TRACING(stmt) \ + if (!(grpc_http_trace)) \ + ; \ + else \ + stmt #endif |