diff options
Diffstat (limited to 'src/core/transport/chttp2')
-rw-r--r-- | src/core/transport/chttp2/frame.h | 6 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.c | 10 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_goaway.c | 12 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_goaway.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_ping.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_rst_stream.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_settings.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_window_update.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/hpack_parser.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 293 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 607 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 174 |
13 files changed, 941 insertions, 175 deletions
diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h index c9e3e13042..9012bfa1e1 100644 --- a/src/core/transport/chttp2/frame.h +++ b/src/core/transport/chttp2/frame.h @@ -45,6 +45,7 @@ typedef enum { GRPC_CHTTP2_CONNECTION_ERROR } grpc_chttp2_parse_error; +#if 0 typedef struct { gpr_uint8 end_of_stream; gpr_uint8 need_flush_reads; @@ -62,6 +63,11 @@ typedef struct { gpr_slice goaway_text; gpr_uint32 rst_stream_reason; } grpc_chttp2_parse_state; +#endif + +/* defined in internal.h */ +typedef struct grpc_chttp2_stream_parsing grpc_chttp2_stream_parsing; +typedef struct grpc_chttp2_transport_parsing grpc_chttp2_transport_parsing; #define GRPC_CHTTP2_FRAME_DATA 0 #define GRPC_CHTTP2_FRAME_HEADER 1 diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index a1ae9ed2e6..129d211043 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -35,6 +35,7 @@ #include <string.h> +#include "src/core/transport/chttp2/internal.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -69,7 +70,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( } grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); @@ -77,8 +78,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_chttp2_data_parser *p = parser; if (is_last && p->is_last_frame) { - state->end_of_stream = 1; - state->need_flush_reads = 1; + stream_parsing->received_close = 1; } if (cur == end) { @@ -129,27 +129,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( p->frame_size |= ((gpr_uint32) * cur); p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; - state->need_flush_reads = 1; grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size, 0); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) == p->frame_size) { - state->need_flush_reads = 1; grpc_sopb_add_slice(&p->incoming_sopb, gpr_slice_sub(slice, cur - beg, end - beg)); p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) > p->frame_size) { - state->need_flush_reads = 1; grpc_sopb_add_slice( &p->incoming_sopb, gpr_slice_sub(slice, cur - beg, cur + p->frame_size - beg)); cur += p->frame_size; goto fh_0; /* loop */ } else { - state->need_flush_reads = 1; grpc_sopb_add_slice(&p->incoming_sopb, gpr_slice_sub(slice, cur - beg, end - beg)); p->frame_size -= (end - cur); diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index 24e557accd..dbbb87fc01 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -72,7 +72,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( /* handle a slice of a data frame - is_last indicates the last slice of a frame */ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); /* create a slice with an empty data frame and is_last set */ gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id); diff --git a/src/core/transport/chttp2/frame_goaway.c b/src/core/transport/chttp2/frame_goaway.c index 95b75d4fde..d7d6c587e6 100644 --- a/src/core/transport/chttp2/frame_goaway.c +++ b/src/core/transport/chttp2/frame_goaway.c @@ -32,6 +32,7 @@ */ #include "src/core/transport/chttp2/frame_goaway.h" +#include "src/core/transport/chttp2/internal.h" #include <string.h> @@ -62,7 +63,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame( } grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); @@ -139,10 +140,11 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( p->debug_pos += end - cur; p->state = GRPC_CHTTP2_GOAWAY_DEBUG; if (is_last) { - state->goaway = 1; - state->goaway_last_stream_index = p->last_stream_id; - state->goaway_error = p->error_code; - state->goaway_text = + transport_parsing->goaway_received = 1; + transport_parsing->goaway_last_stream_index = p->last_stream_id; + gpr_slice_unref(transport_parsing->goaway_text); + transport_parsing->goaway_error = p->error_code; + transport_parsing->goaway_text = gpr_slice_new(p->debug_data, p->debug_length, gpr_free); p->debug_data = NULL; } diff --git a/src/core/transport/chttp2/frame_goaway.h b/src/core/transport/chttp2/frame_goaway.h index 7638891514..8148fa90f2 100644 --- a/src/core/transport/chttp2/frame_goaway.h +++ b/src/core/transport/chttp2/frame_goaway.h @@ -65,7 +65,7 @@ void grpc_chttp2_goaway_parser_destroy(grpc_chttp2_goaway_parser *p); grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame( grpc_chttp2_goaway_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code, gpr_slice debug_data, diff --git a/src/core/transport/chttp2/frame_ping.h b/src/core/transport/chttp2/frame_ping.h index 11d38b80ea..71f8351223 100644 --- a/src/core/transport/chttp2/frame_ping.h +++ b/src/core/transport/chttp2/frame_ping.h @@ -48,6 +48,6 @@ gpr_slice grpc_chttp2_ping_create(gpr_uint8 ack, gpr_uint8 *opaque_8bytes); grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame( grpc_chttp2_ping_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H */ diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h index 07a3c98d03..b83d1261d0 100644 --- a/src/core/transport/chttp2/frame_rst_stream.h +++ b/src/core/transport/chttp2/frame_rst_stream.h @@ -47,6 +47,6 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code); grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */ diff --git a/src/core/transport/chttp2/frame_settings.h b/src/core/transport/chttp2/frame_settings.h index 18765631a6..701f2b94d2 100644 --- a/src/core/transport/chttp2/frame_settings.h +++ b/src/core/transport/chttp2/frame_settings.h @@ -94,6 +94,6 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame( grpc_chttp2_settings_parser *parser, gpr_uint32 length, gpr_uint8 flags, gpr_uint32 *settings); grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_SETTINGS_H */ diff --git a/src/core/transport/chttp2/frame_window_update.h b/src/core/transport/chttp2/frame_window_update.h index 85475a8f9e..7217325beb 100644 --- a/src/core/transport/chttp2/frame_window_update.h +++ b/src/core/transport/chttp2/frame_window_update.h @@ -50,6 +50,6 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_window_update_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H */ diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h index bfc06b3980..507d7cfea0 100644 --- a/src/core/transport/chttp2/hpack_parser.h +++ b/src/core/transport/chttp2/hpack_parser.h @@ -107,7 +107,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p, /* wraps grpc_chttp2_hpack_parser_parse to provide a frame level parser for the transport */ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( - void *hpack_parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_PARSER_H */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index a21a7a4d75..5eba01a3e1 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -36,6 +36,7 @@ #include "src/core/transport/transport_impl.h" #include "src/core/iomgr/endpoint.h" +#include "src/core/transport/chttp2/frame.h" #include "src/core/transport/chttp2/frame_data.h" #include "src/core/transport/chttp2/frame_goaway.h" #include "src/core/transport/chttp2/frame_ping.h" @@ -172,16 +173,110 @@ typedef struct { gpr_slice debug; } grpc_chttp2_pending_goaway; +typedef struct { + /** data to write next write */ + gpr_slice_buffer qbuf; + /** queued callbacks */ + grpc_iomgr_closure *pending_closures; + + /** window available for us to send to peer */ + gpr_uint32 outgoing_window; + /** how much window would we like to have for incoming_window */ + gpr_uint32 connection_window_target; + + + /** are the local settings dirty and need to be sent? */ + gpr_uint8 dirtied_local_settings; + /** have local settings been sent? */ + gpr_uint8 sent_local_settings; + /** bitmask of setting indexes to send out */ + gpr_uint32 force_send_settings; + /** settings values */ + gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; + + /** last received stream id */ + gpr_uint32 last_incoming_stream_id; +} grpc_chttp2_transport_global; + +typedef struct { + /** data to write now */ + gpr_slice_buffer outbuf; + /** hpack encoding */ + grpc_chttp2_hpack_compressor hpack_compressor; +} grpc_chttp2_transport_writing; + +struct grpc_chttp2_transport_parsing { + /** is this transport a client? (boolean) */ + gpr_uint8 is_client; + + /** were settings updated? */ + gpr_uint8 settings_updated; + /** was a settings ack received? */ + gpr_uint8 settings_ack_received; + /** was a goaway frame received? */ + gpr_uint8 goaway_received; + + /** data to write later - after parsing */ + gpr_slice_buffer qbuf; + /* metadata object cache */ + grpc_mdstr *str_grpc_timeout; + /** parser for headers */ + grpc_chttp2_hpack_parser hpack_parser; + /** simple one shot parsers */ + union { + grpc_chttp2_window_update_parser window_update; + grpc_chttp2_settings_parser settings; + grpc_chttp2_ping_parser ping; + grpc_chttp2_rst_stream_parser rst_stream; + } simple; + /** parser for goaway frames */ + grpc_chttp2_goaway_parser goaway_parser; + + /** window available for peer to send to us */ + gpr_uint32 incoming_window; + + /** next stream id available at the time of beginning parsing */ + gpr_uint32 next_stream_id; + gpr_uint32 last_incoming_stream_id; + + /* deframing */ + grpc_chttp2_deframe_transport_state deframe_state; + gpr_uint8 incoming_frame_type; + gpr_uint8 incoming_frame_flags; + gpr_uint8 header_eof; + gpr_uint32 expect_continuation_stream_id; + gpr_uint32 incoming_frame_size; + gpr_uint32 incoming_stream_id; + + /* active parser */ + void *parser_data; + grpc_chttp2_stream_parsing *incoming_stream; + grpc_chttp2_parse_error (*parser)(void *parser_user_data, + grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, + gpr_slice slice, int is_last); + + /* received settings */ + gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS]; + + /* goaway data */ + grpc_status_code goaway_error; + gpr_uint32 goaway_last_stream_index; + gpr_slice goaway_text; +}; + + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ grpc_endpoint *ep; grpc_mdctx *metadata_context; gpr_refcount refs; - gpr_uint8 is_client; gpr_mu mu; gpr_cv cv; + /** is a thread currently writing */ + gpr_uint8 writing_active; + /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; /** are we calling back any grpc_transport_op completion events */ @@ -192,28 +287,9 @@ struct grpc_chttp2_transport { /* stream indexing */ gpr_uint32 next_stream_id; - gpr_uint32 last_incoming_stream_id; - - /* settings */ - gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; - gpr_uint32 force_send_settings; /* bitmask of setting indexes to send out */ - gpr_uint8 sent_local_settings; /* have local settings been sent? */ - gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */ /* window management */ - gpr_uint32 outgoing_window; gpr_uint32 outgoing_window_update; - gpr_uint32 incoming_window; - gpr_uint32 connection_window_target; - - /* deframing */ - grpc_chttp2_deframe_transport_state deframe_state; - gpr_uint8 incoming_frame_type; - gpr_uint8 incoming_frame_flags; - gpr_uint8 header_eof; - gpr_uint32 expect_continuation_stream_id; - gpr_uint32 incoming_frame_size; - gpr_uint32 incoming_stream_id; /* goaway */ grpc_chttp2_pending_goaway *pending_goaways; @@ -226,13 +302,6 @@ struct grpc_chttp2_transport { /* stream ops that need to be destroyed, but outside of the lock */ grpc_stream_op_buffer nuke_later_sopb; - /* active parser */ - void *parser_data; - grpc_chttp2_stream *incoming_stream; - grpc_chttp2_parse_error (*parser)(void *parser_user_data, - grpc_chttp2_parse_state *state, - gpr_slice slice, int is_last); - grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; grpc_chttp2_stream_map stream_map; @@ -242,46 +311,12 @@ struct grpc_chttp2_transport { size_t ping_capacity; gpr_int64 ping_counter; - struct { - /* metadata object cache */ - grpc_mdstr *str_grpc_timeout; - } constants; - - struct { - /** data to write next write */ - gpr_slice_buffer qbuf; - /* queued callbacks */ - grpc_iomgr_closure *pending_closures; - } global; - - struct { - /** is a thread currently writing */ - gpr_uint8 executing; - /** closure to execute this action */ - grpc_iomgr_closure action; - /** data to write now */ - gpr_slice_buffer outbuf; - /* hpack encoding */ - grpc_chttp2_hpack_compressor hpack_compressor; - } writing; + grpc_chttp2_transport_global global; + grpc_chttp2_transport_writing writing; + grpc_chttp2_transport_parsing parsing; - struct { - /** is a thread currently parsing */ - gpr_uint8 executing; - /** data to write later - after parsing */ - gpr_slice_buffer qbuf; - /** parser for headers */ - grpc_chttp2_hpack_parser hpack_parser; - /** simple one shot parsers */ - union { - grpc_chttp2_window_update_parser window_update; - grpc_chttp2_settings_parser settings; - grpc_chttp2_ping_parser ping; - grpc_chttp2_rst_stream_parser rst_stream; - } simple; - /** parser for goaway frames */ - grpc_chttp2_goaway_parser goaway_parser; - } parsing; + /** closure to execute writing */ + grpc_iomgr_closure writing_action; struct { /** is a thread currently performing channel callbacks */ @@ -295,37 +330,47 @@ struct grpc_chttp2_transport { } channel_callback; }; -struct grpc_chttp2_stream { - struct { - grpc_iomgr_closure *send_done_closure; - grpc_iomgr_closure *recv_done_closure; - } global; - - struct { - /* sops that have passed flow control to be written */ - grpc_stream_op_buffer sopb; - /* how strongly should we indicate closure with the next write */ - grpc_chttp2_send_closed send_closed; - } writing; - - struct { - int unused; - } parsing; - +typedef struct { + /** HTTP2 stream id for this stream, or zero if one has not been assigned */ gpr_uint32 id; - gpr_uint32 incoming_window; + grpc_iomgr_closure *send_done_closure; + grpc_iomgr_closure *recv_done_closure; + + /** window available for us to send to peer */ gpr_int64 outgoing_window; - gpr_uint32 outgoing_window_update; - /* when the application requests writes be closed, the write_closed is - 'queued'; when the close is flow controlled into the send path, we are - 'sending' it; when the write has been performed it is 'sent' */ + /** stream ops the transport user would like to send */ + grpc_stream_op_buffer *outgoing_sopb; + /** when the application requests writes be closed, the write_closed is + 'queued'; when the close is flow controlled into the send path, we are + 'sending' it; when the write has been performed it is 'sent' */ grpc_chttp2_write_state write_state; + /** is this stream closed (boolean) */ gpr_uint8 read_closed; - gpr_uint8 cancelled; +} grpc_chttp2_stream_global; - grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; - gpr_uint8 included[STREAM_LIST_COUNT]; +typedef struct { + /** HTTP2 stream id for this stream, or zero if one has not been assigned */ + gpr_uint32 id; + /** sops that have passed flow control to be written */ + grpc_stream_op_buffer sopb; + /** how strongly should we indicate closure with the next write */ + grpc_chttp2_send_closed send_closed; +} grpc_chttp2_stream_writing; + +struct grpc_chttp2_stream_parsing { + /** HTTP2 stream id for this stream, or zero if one has not been assigned */ + gpr_uint32 id; + /** has this stream received a close */ + gpr_uint8 received_close; + /** incoming_window has been reduced during parsing */ + gpr_uint8 incoming_window_changed; + /** saw an error on this stream during parsing (it should be cancelled) */ + gpr_uint8 saw_error; + /** window available for peer to send to us */ + gpr_uint32 incoming_window; + /** parsing state for data frames */ + grpc_chttp2_data_parser data_parser; /* incoming metadata */ grpc_linked_mdelem *incoming_metadata; @@ -333,21 +378,79 @@ struct grpc_chttp2_stream { size_t incoming_metadata_capacity; grpc_linked_mdelem *old_incoming_metadata; gpr_timespec incoming_deadline; +}; + +struct grpc_chttp2_stream { + grpc_chttp2_stream_global global; + grpc_chttp2_stream_writing writing; + + gpr_uint32 outgoing_window_update; + gpr_uint8 cancelled; + + grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; + gpr_uint8 included[STREAM_LIST_COUNT]; /* sops from application */ - grpc_stream_op_buffer *outgoing_sopb; grpc_stream_op_buffer *incoming_sopb; grpc_stream_state *publish_state; grpc_stream_state published_state; - grpc_chttp2_data_parser parser; - grpc_stream_state callback_state; grpc_stream_op_buffer callback_sopb; }; +/** Transport writing call flow: + chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes are required; + if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the writes. + Once writes have been completed (meaning another write could potentially be started), + grpc_chttp2_terminate_writing is called. This will call grpc_chttp2_cleanup_writing, at which + point the write phase is complete. */ + /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ -void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); +int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); +void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); +void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success); +void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); + +/** Process one slice of incoming data */ +int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice); +void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); + +/** Get a writable stream + \return non-zero if there was a stream available */ +void grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); + +void grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); +int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport_writing *transport_writing); +int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing **stream_writing); + +void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); +int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); + +int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); + +void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing); + +void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success); +void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); + +grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); +grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); + +#define GRPC_CHTTP2_FLOW_CTL_TRACE(a,b,c,d,e) do {} while (0) + +#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" +#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING)-1) + +extern int grpc_http_trace; + +#define IF_TRACING(stmt) \ + if (!(grpc_http_trace)) \ + ; \ + else \ + stmt #endif diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 9a547ad319..2dd46b3116 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -32,4 +32,611 @@ */ #include "src/core/transport/chttp2/internal.h" +#include "src/core/transport/chttp2/timeout_encoding.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation); +static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_header); + +static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last); + +void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) { + /* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when + sending GOAWAY frame. + https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8 + says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream ID. So, + since we don't have server pushed streams, client should send + GOAWAY last-grpc_chttp2_stream-id=0 in this case. */ + if (!transport_parsing->is_client) { + transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id; + } +} + +int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) { + gpr_uint8 *beg = GPR_SLICE_START_PTR(slice); + gpr_uint8 *end = GPR_SLICE_END_PTR(slice); + gpr_uint8 *cur = beg; + + if (cur == end) return 1; + + switch (transport_parsing->deframe_state) { + case DTS_CLIENT_PREFIX_0: + case DTS_CLIENT_PREFIX_1: + case DTS_CLIENT_PREFIX_2: + case DTS_CLIENT_PREFIX_3: + case DTS_CLIENT_PREFIX_4: + case DTS_CLIENT_PREFIX_5: + case DTS_CLIENT_PREFIX_6: + case DTS_CLIENT_PREFIX_7: + case DTS_CLIENT_PREFIX_8: + case DTS_CLIENT_PREFIX_9: + case DTS_CLIENT_PREFIX_10: + case DTS_CLIENT_PREFIX_11: + case DTS_CLIENT_PREFIX_12: + case DTS_CLIENT_PREFIX_13: + case DTS_CLIENT_PREFIX_14: + case DTS_CLIENT_PREFIX_15: + case DTS_CLIENT_PREFIX_16: + case DTS_CLIENT_PREFIX_17: + case DTS_CLIENT_PREFIX_18: + case DTS_CLIENT_PREFIX_19: + case DTS_CLIENT_PREFIX_20: + case DTS_CLIENT_PREFIX_21: + case DTS_CLIENT_PREFIX_22: + case DTS_CLIENT_PREFIX_23: + while (cur != end && transport_parsing->deframe_state != DTS_FH_0) { + if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state]) { + gpr_log(GPR_ERROR, + "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " + "at byte %d", + GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state], + (int)(gpr_uint8)GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state], *cur, + (int)*cur, transport_parsing->deframe_state); + return 0; + } + ++cur; + ++transport_parsing->deframe_state; + } + if (cur == end) { + return 1; + } + /* fallthrough */ + dts_fh_0: + case DTS_FH_0: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_size = ((gpr_uint32)*cur) << 16; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_1; + return 1; + } + /* fallthrough */ + case DTS_FH_1: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_size |= ((gpr_uint32)*cur) << 8; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_2; + return 1; + } + /* fallthrough */ + case DTS_FH_2: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_size |= *cur; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_3; + return 1; + } + /* fallthrough */ + case DTS_FH_3: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_type = *cur; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_4; + return 1; + } + /* fallthrough */ + case DTS_FH_4: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_flags = *cur; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_5; + return 1; + } + /* fallthrough */ + case DTS_FH_5: + GPR_ASSERT(cur < end); + transport_parsing->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_6; + return 1; + } + /* fallthrough */ + case DTS_FH_6: + GPR_ASSERT(cur < end); + transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 16; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_7; + return 1; + } + /* fallthrough */ + case DTS_FH_7: + GPR_ASSERT(cur < end); + transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 8; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_8; + return 1; + } + /* fallthrough */ + case DTS_FH_8: + GPR_ASSERT(cur < end); + transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur); + transport_parsing->deframe_state = DTS_FRAME; + if (!init_frame_parser(transport_parsing)) { + return 0; + } + if (transport_parsing->incoming_stream_id) { + transport_parsing->last_incoming_stream_id = transport_parsing->incoming_stream_id; + } + if (transport_parsing->incoming_frame_size == 0) { + if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1)) { + return 0; + } + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_0; + return 1; + } + goto dts_fh_0; /* loop */ + } + if (++cur == end) { + return 1; + } + /* fallthrough */ + case DTS_FRAME: + GPR_ASSERT(cur < end); + if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) { + if (!parse_frame_slice( + transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) { + return 0; + } + transport_parsing->deframe_state = DTS_FH_0; + return 1; + } else if ((gpr_uint32)(end - cur) > transport_parsing->incoming_frame_size) { + if (!parse_frame_slice( + transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, + cur + transport_parsing->incoming_frame_size - beg), + 1)) { + return 0; + } + cur += transport_parsing->incoming_frame_size; + goto dts_fh_0; /* loop */ + } else { + if (!parse_frame_slice( + transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) { + return 0; + } + transport_parsing->incoming_frame_size -= (end - cur); + return 1; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + } + + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + + return 0; +} + +static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { + if (transport_parsing->expect_continuation_stream_id != 0) { + if (transport_parsing->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) { + gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x", + transport_parsing->incoming_frame_type); + return 0; + } + if (transport_parsing->expect_continuation_stream_id != transport_parsing->incoming_stream_id) { + gpr_log(GPR_ERROR, + "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got grpc_chttp2_stream %08x", + transport_parsing->expect_continuation_stream_id, transport_parsing->incoming_stream_id); + return 0; + } + return init_header_frame_parser(transport_parsing, 1); + } + switch (transport_parsing->incoming_frame_type) { + case GRPC_CHTTP2_FRAME_DATA: + return init_data_frame_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_HEADER: + return init_header_frame_parser(transport_parsing, 0); + case GRPC_CHTTP2_FRAME_CONTINUATION: + gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame"); + return 0; + case GRPC_CHTTP2_FRAME_RST_STREAM: + return init_rst_stream_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_SETTINGS: + return init_settings_frame_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: + return init_window_update_frame_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_PING: + return init_ping_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_GOAWAY: + return init_goaway_parser(transport_parsing); + default: + gpr_log(GPR_ERROR, "Unknown frame type %02x", transport_parsing->incoming_frame_type); + return init_skip_frame_parser(transport_parsing, 0); + } +} + +static grpc_chttp2_parse_error skip_parser(void *parser, + grpc_chttp2_parse_state *st, + gpr_slice slice, int is_last) { + return GRPC_CHTTP2_PARSE_OK; +} + +static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); } + +static int init_skip_frame(grpc_chttp2_transport_parsing *transport_parsing, int is_header) { + if (is_header) { + int is_eoh = transport_parsing->expect_continuation_stream_id != 0; + transport_parsing->parser = grpc_chttp2_header_parser_parse; + transport_parsing->parser_data = &transport_parsing->hpack_parser; + transport_parsing->hpack_parser.on_header = skip_header; + transport_parsing->hpack_parser.on_header_user_data = NULL; + transport_parsing->hpack_parser.is_boundary = is_eoh; + transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0; + } else { + transport_parsing->parser = skip_parser; + } + return 1; +} + +static void become_skip_parser(grpc_chttp2_transport_parsing *transport_parsing) { + init_skip_frame(transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse); +} + +static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { + if (transport_parsing->incoming_frame_size > transport_parsing->incoming_window) { + gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", + transport_parsing->incoming_frame_size, transport_parsing->incoming_window); + return GRPC_CHTTP2_CONNECTION_ERROR; + } + + if (transport_parsing->incoming_frame_size > stream_parsing->incoming_window) { + gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", + transport_parsing->incoming_frame_size, stream_parsing->incoming_window); + return GRPC_CHTTP2_CONNECTION_ERROR; + } + + GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, -(gpr_int64)transport_parsing->incoming_frame_size); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->global.id, -(gpr_int64)transport_parsing->incoming_frame_size); + transport_parsing->incoming_window -= transport_parsing->incoming_frame_size; + stream_parsing->incoming_window -= transport_parsing->incoming_frame_size; + + /* if the grpc_chttp2_stream incoming window is getting low, schedule an update */ + stream_parsing->incoming_window_changed = 1; + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); + + return GRPC_CHTTP2_PARSE_OK; +} + +static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { + grpc_chttp2_stream_parsing *stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id); + grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; + if (!stream_parsing || stream_parsing->received_close) return init_skip_frame(transport_parsing, 0); + if (err == GRPC_CHTTP2_PARSE_OK) { + err = update_incoming_window(transport_parsing, stream_parsing); + } + if (err == GRPC_CHTTP2_PARSE_OK) { + err = grpc_chttp2_data_parser_begin_frame(&stream_parsing->data_parser, + transport_parsing->incoming_frame_flags); + } + switch (err) { + case GRPC_CHTTP2_PARSE_OK: + transport_parsing->incoming_stream = stream_parsing; + transport_parsing->parser = grpc_chttp2_data_parser_parse; + transport_parsing->parser_data = &stream_parsing->data_parser; + return 1; + case GRPC_CHTTP2_STREAM_ERROR: + stream_parsing->received_close = 1; + stream_parsing->saw_error = 1; + return init_skip_frame(transport_parsing, 0); + case GRPC_CHTTP2_CONNECTION_ERROR: + return 0; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + return 0; +} + +static void free_timeout(void *p) { gpr_free(p); } + +static void add_incoming_metadata(grpc_chttp2_stream_parsing *stream_parsing, grpc_mdelem *elem) { + if (stream_parsing->incoming_metadata_capacity == stream_parsing->incoming_metadata_count) { + stream_parsing->incoming_metadata_capacity = + GPR_MAX(8, 2 * stream_parsing->incoming_metadata_capacity); + stream_parsing->incoming_metadata = + gpr_realloc(stream_parsing->incoming_metadata, sizeof(*stream_parsing->incoming_metadata) * + stream_parsing->incoming_metadata_capacity); + } + stream_parsing->incoming_metadata[stream_parsing->incoming_metadata_count++].md = elem; +} + +static void on_header(void *tp, grpc_mdelem *md) { + grpc_chttp2_transport_parsing *transport_parsing = tp; + grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; + + GPR_ASSERT(stream_parsing); + + IF_TRACING(gpr_log( + GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, transport_parsing->is_client ? "CLI" : "SVR", + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); + + if (md->key == transport_parsing->str_grpc_timeout) { + gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); + if (!cached_timeout) { + /* not already parsed: parse it now, and store the result away */ + cached_timeout = gpr_malloc(sizeof(gpr_timespec)); + if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value), + cached_timeout)) { + gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", + grpc_mdstr_as_c_string(md->value)); + *cached_timeout = gpr_inf_future; + } + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); + } + stream_parsing->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout); + grpc_mdelem_unref(md); + } else { + add_incoming_metadata(stream_parsing, md); + } + + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); +} + +static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) { + int is_eoh = + (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; + grpc_chttp2_stream_parsing *stream_parsing; + + if (is_eoh) { + transport_parsing->expect_continuation_stream_id = 0; + } else { + transport_parsing->expect_continuation_stream_id = transport_parsing->incoming_stream_id; + } + + if (!is_continuation) { + transport_parsing->header_eof = + (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; + } + + /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ + stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id); + if (!stream_parsing) { + if (is_continuation) { + gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received"); + return init_skip_frame(transport_parsing, 1); + } + if (transport_parsing->is_client) { + if ((transport_parsing->incoming_stream_id & 1) && + transport_parsing->incoming_stream_id < transport_parsing->next_stream_id) { + /* this is an old (probably cancelled) grpc_chttp2_stream */ + } else { + gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client"); + } + return init_skip_frame(transport_parsing, 1); + } else if (transport_parsing->last_incoming_stream_id > transport_parsing->incoming_stream_id) { + gpr_log(GPR_ERROR, + "ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream " + "id=%d, new grpc_chttp2_stream id=%d", + transport_parsing->last_incoming_stream_id, transport_parsing->incoming_stream_id); + return init_skip_frame(transport_parsing, 1); + } else if ((transport_parsing->incoming_stream_id & 1) == 0) { + gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d", + transport_parsing->incoming_stream_id); + return init_skip_frame(transport_parsing, 1); + } + stream_parsing = transport_parsing->incoming_stream = grpc_chttp2_parsing_accept_stream(transport_parsing, transport_parsing->incoming_stream_id); + if (!stream_parsing) { + gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"); + return init_skip_frame(transport_parsing, 1); + } + } else { + transport_parsing->incoming_stream = stream_parsing; + } + if (stream_parsing->received_close) { + gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); + transport_parsing->incoming_stream = NULL; + return init_skip_frame(transport_parsing, 1); + } + transport_parsing->parser = grpc_chttp2_header_parser_parse; + transport_parsing->parser_data = &transport_parsing->hpack_parser; + transport_parsing->hpack_parser.on_header = on_header; + transport_parsing->hpack_parser.on_header_user_data = transport_parsing; + transport_parsing->hpack_parser.is_boundary = is_eoh; + transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0; + if (!is_continuation && + (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) { + grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser); + } + return 1; +} + +static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame( + &transport_parsing->simple.window_update, + transport_parsing->incoming_frame_size, + transport_parsing->incoming_frame_flags); + transport_parsing->parser = grpc_chttp2_window_update_parser_parse; + transport_parsing->parser_data = &transport_parsing->simple.window_update; + return ok; +} + +static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok = GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_ping_parser_begin_frame(&transport_parsing->simple.ping, + transport_parsing->incoming_frame_size, + transport_parsing->incoming_frame_flags); + transport_parsing->parser = grpc_chttp2_ping_parser_parse; + transport_parsing->parser_data = &transport_parsing->simple.ping; + return ok; +} + +static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame( + &transport_parsing->simple.rst_stream, + transport_parsing->incoming_frame_size, + transport_parsing->incoming_frame_flags); + transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse; + transport_parsing->parser_data = &transport_parsing->simple.rst_stream; + return ok; +} + +static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok = + GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_goaway_parser_begin_frame( + &transport_parsing->goaway_parser, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags); + transport_parsing->parser = grpc_chttp2_goaway_parser_parse; + transport_parsing->parser_data = &transport_parsing->goaway_parser; + return ok; +} + +static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok; + + if (transport_parsing->incoming_stream_id != 0) { + gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d", transport_parsing->incoming_stream_id); + return 0; + } + + ok = GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_settings_parser_begin_frame( + &transport_parsing->simple.settings, transport_parsing->incoming_frame_size, + transport_parsing->incoming_frame_flags, transport_parsing->settings); + if (!ok) { + return 0; + } + if (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { + transport_parsing->settings_ack_received = 1; + } else { + transport_parsing->settings_updated = 1; + } + transport_parsing->parser = grpc_chttp2_settings_parser_parse; + transport_parsing->parser_data = &transport_parsing->simple.settings; + return ok; +} + +/* +static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { + return window + window_update < MAX_WINDOW; +} +*/ + +static void add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { + grpc_metadata_batch b; + + b.list.head = NULL; + /* Store away the last element of the list, so that in patch_metadata_ops + we can reconstitute the list. + We can't do list building here as later incoming metadata may reallocate + the underlying array. */ + b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count; + b.garbage.head = b.garbage.tail = NULL; + b.deadline = stream_parsing->incoming_deadline; + stream_parsing->incoming_deadline = gpr_inf_future; + + grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b); +} + +static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice, int is_last) { + grpc_chttp2_parse_state st; + size_t i; + memset(&st, 0, sizeof(st)); + switch (transport_parsing->parser(transport_parsing->parser_data, &st, slice, is_last)) { + case GRPC_CHTTP2_PARSE_OK: + if (stream_parsing) { + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); + } + if (st.end_of_stream) { + transport_parsing->incoming_stream->read_closed = 1; + maybe_finish_read(t, transport_parsing->incoming_stream, 1); + } + if (st.need_flush_reads) { + maybe_finish_read(t, transport_parsing->incoming_stream, 1); + } + if (st.metadata_boundary) { + add_metadata_batch(t, transport_parsing->incoming_stream); + maybe_finish_read(t, transport_parsing->incoming_stream, 1); + } + if (st.ack_settings) { + gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_settings_ack_create()); + } + if (st.send_ping_ack) { + gpr_slice_buffer_add( + &transport_parsing->qbuf, + grpc_chttp2_ping_create(1, transport_parsing->simple.ping.opaque_8bytes)); + } + if (st.goaway) { + add_goaway(t, st.goaway_error, st.goaway_text); + } + if (st.rst_stream) { + cancel_stream_id( + t, transport_parsing->incoming_stream_id, + grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason), + st.rst_stream_reason, 0); + } + if (st.process_ping_reply) { + for (i = 0; i < transport_parsing->ping_count; i++) { + if (0 == + memcmp(transport_parsing->pings[i].id, transport_parsing->simple.ping.opaque_8bytes, 8)) { + transport_parsing->pings[i].cb(transport_parsing->pings[i].user_data); + memmove(&transport_parsing->pings[i], &transport_parsing->pings[i + 1], + (transport_parsing->ping_count - i - 1) * sizeof(grpc_chttp2_outstanding_ping)); + transport_parsing->ping_count--; + break; + } + } + } + if (st.initial_window_update) { + for (i = 0; i < transport_parsing->stream_map.count; i++) { + grpc_chttp2_stream *s = (grpc_chttp2_stream *)(transport_parsing->stream_map.values[i]); + s->global.outgoing_window_update += st.initial_window_update; + stream_list_join(t, s, NEW_OUTGOING_WINDOW); + } + } + if (st.window_update) { + if (transport_parsing->incoming_stream_id) { + /* if there was a grpc_chttp2_stream id, this is for some grpc_chttp2_stream */ + grpc_chttp2_stream *s = lookup_stream(t, transport_parsing->incoming_stream_id); + if (s) { + s->global.outgoing_window_update += st.window_update; + stream_list_join(t, s, NEW_OUTGOING_WINDOW); + } + } else { + /* grpc_chttp2_transport level window update */ + transport_parsing->global.outgoing_window_update += st.window_update; + } + } + return 1; + case GRPC_CHTTP2_STREAM_ERROR: + become_skip_parser(transport_parsing); + cancel_stream_id( + t, transport_parsing->incoming_stream_id, + grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR), + GRPC_CHTTP2_INTERNAL_ERROR, 1); + return 1; + case GRPC_CHTTP2_CONNECTION_ERROR: + drop_connection(transport_parsing); + return 0; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + return 0; +} diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index a1830a8c25..0265f173f3 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -32,93 +32,145 @@ */ #include "src/core/transport/chttp2/internal.h" +#include "src/core/transport/chttp2/http2_errors.h" #include <grpc/support/log.h> -static void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport *t) { - grpc_chttp2_stream *s; - gpr_uint32 window_delta; +static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); +static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status); - /* don't do anything if we are already writing */ - if (t->writing.executing) { - return; - } +int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { + grpc_chttp2_stream_global *stream_global; + grpc_chttp2_stream_writing *stream_writing; + gpr_uint32 window_delta; /* simple writes are queued to qbuf, and flushed here */ - gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf); - GPR_ASSERT(t->global.qbuf.count == 0); + gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf); + GPR_ASSERT(transport_global->qbuf.count == 0); - if (t->dirtied_local_settings && !t->sent_local_settings) { + if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings) { gpr_slice_buffer_add( - &t->writing.outbuf, grpc_chttp2_settings_create( - t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS], - t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); - t->force_send_settings = 0; - t->dirtied_local_settings = 0; - t->sent_local_settings = 1; + &transport_writing->outbuf, grpc_chttp2_settings_create( + transport_global->settings[SENT_SETTINGS], transport_global->settings[LOCAL_SETTINGS], + transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); + transport_global->force_send_settings = 0; + transport_global->dirtied_local_settings = 0; + transport_global->sent_local_settings = 1; } /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && - s->outgoing_window > 0) { + while (transport_global->outgoing_window && + grpc_chttp2_list_pop_writable_stream(transport_global, transport_writing, &stream_global, &stream_writing) && + stream_global->outgoing_window > 0) { + stream_writing->id = stream_global->id; window_delta = grpc_chttp2_preencode( - s->outgoing_sopb->ops, &s->outgoing_sopb->nops, - GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb); - FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta); - FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta); - t->outgoing_window -= window_delta; - s->outgoing_window -= window_delta; - - if (s->write_state == WRITE_STATE_QUEUED_CLOSE && - s->outgoing_sopb->nops == 0) { - if (!t->is_client && !s->read_closed) { - s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM; + stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, + GPR_MIN(transport_global->outgoing_window, stream_global->outgoing_window), + &stream_writing->sopb); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta); + transport_global->outgoing_window -= window_delta; + stream_global->outgoing_window -= window_delta; + + if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE && + stream_global->outgoing_sopb->nops == 0) { + if (!transport_constants->is_client && !stream_global->read_closed) { + stream_writing->send_closed = SEND_CLOSED_WITH_RST_STREAM; } else { - s->writing.send_closed = SEND_CLOSED; + stream_writing->send_closed = SEND_CLOSED; } } - if (s->writing.sopb.nops > 0 || s->writing.send_closed) { - stream_list_join(t, s, WRITING); + if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != DONT_SEND_CLOSED) { + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } /* we should either exhaust window or have no ops left, but not both */ - if (s->outgoing_sopb->nops == 0) { - s->outgoing_sopb = NULL; - schedule_cb(t, s->global.send_done_closure, 1); - } else if (s->outgoing_window) { - stream_list_add_tail(t, s, WRITABLE); + if (stream_global->outgoing_sopb->nops == 0) { + stream_global->outgoing_sopb = NULL; + grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 1); + } else if (stream_global->outgoing_window) { + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } - if (!t->parsing.executing) { - /* for each grpc_chttp2_stream that wants to update its window, add that window here */ - while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { - window_delta = - t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - s->incoming_window; - if (!s->read_closed && window_delta) { - gpr_slice_buffer_add( - &t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); - FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); - s->incoming_window += window_delta; - } + /* for each grpc_chttp2_stream that wants to update its window, add that window here */ + while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, &stream_global)) { + window_delta = + transport_global->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - + stream_global->incoming_window; + if (!stream_global->read_closed && window_delta > 0) { + gpr_slice_buffer_add( + &transport_writing->outbuf, grpc_chttp2_window_update_create(stream_global->id, window_delta)); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->id, window_delta); + stream_global->incoming_window += window_delta; } + } + + /* if the grpc_chttp2_transport is ready to send a window update, do so here also */ + if (transport_global->incoming_window < transport_global->connection_window_target * 3 / 4) { + window_delta = transport_global->connection_window_target - transport_global->incoming_window; + gpr_slice_buffer_add(&transport_writing->outbuf, + grpc_chttp2_window_update_create(0, window_delta)); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, window_delta); + transport_global->incoming_window += window_delta; + } + + return transport_writing->outbuf.length > 0 || grpc_chttp2_list_have_writing_streams(transport_writing); +} + +void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) { + finalize_outbuf(transport_writing); - /* if the grpc_chttp2_transport is ready to send a window update, do so here also */ - if (t->incoming_window < t->connection_window_target * 3 / 4) { - window_delta = t->connection_window_target - t->incoming_window; - gpr_slice_buffer_add(&t->writing.outbuf, - grpc_chttp2_window_update_create(0, window_delta)); - FLOWCTL_TRACE(t, t, incoming, 0, window_delta); - t->incoming_window += window_delta; + GPR_ASSERT(transport_writing->outbuf.count > 0); + + switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, transport_writing->outbuf.count, + finish_write_cb, transport_writing)) { + case GRPC_ENDPOINT_WRITE_DONE: + grpc_chttp2_terminate_writing(transport_writing, 1); + break; + case GRPC_ENDPOINT_WRITE_ERROR: + grpc_chttp2_terminate_writing(transport_writing, 0); + break; + case GRPC_ENDPOINT_WRITE_PENDING: + break; + } +} + +static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { + grpc_chttp2_stream_writing *stream_writing; + + while (grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { + grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, + stream_writing->send_closed != DONT_SEND_CLOSED, stream_writing->id, + &transport_writing->hpack_compressor, &transport_writing->outbuf); + stream_writing->sopb.nops = 0; + if (stream_writing->send_closed == SEND_CLOSED_WITH_RST_STREAM) { + gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_rst_stream_create( + stream_writing->id, GRPC_CHTTP2_NO_ERROR)); } + grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } +} - if (t->writing.outbuf.length > 0 || !stream_list_empty(t, WRITING)) { - t->writing.executing = 1; - ref_transport(t); - gpr_log(GPR_DEBUG, "schedule write"); - schedule_cb(t, &t->writing.action, 1); +static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) { + grpc_chttp2_transport_writing *transport_writing = tw; + grpc_chttp2_terminate_writing(transport_writing, write_status == GRPC_ENDPOINT_CB_OK); +} + +void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { + grpc_chttp2_stream_writing *stream_writing; + grpc_chttp2_stream_global *stream_global; + + while (grpc_chttp2_list_pop_written_stream(transport_global, transport_writing, &stream_global, &stream_writing)) { + if (stream_writing->send_closed != DONT_SEND_CLOSED) { + stream_global->write_state = WRITE_STATE_SENT_CLOSE; + if (!transport_constants->is_client) { + stream_global->read_closed = 1; + } + grpc_chttp2_read_write_state_changed(transport_global, stream_global); + } } + transport_writing->outbuf.count = 0; + transport_writing->outbuf.length = 0; } |