diff options
Diffstat (limited to 'src/core/transport')
-rw-r--r-- | src/core/transport/chttp2/frame.h | 6 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.c | 10 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_goaway.c | 12 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_goaway.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_ping.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_rst_stream.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_settings.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_window_update.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/hpack_parser.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 293 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 607 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 174 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 815 |
14 files changed, 1027 insertions, 904 deletions
diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h index c9e3e13042..9012bfa1e1 100644 --- a/src/core/transport/chttp2/frame.h +++ b/src/core/transport/chttp2/frame.h @@ -45,6 +45,7 @@ typedef enum { GRPC_CHTTP2_CONNECTION_ERROR } grpc_chttp2_parse_error; +#if 0 typedef struct { gpr_uint8 end_of_stream; gpr_uint8 need_flush_reads; @@ -62,6 +63,11 @@ typedef struct { gpr_slice goaway_text; gpr_uint32 rst_stream_reason; } grpc_chttp2_parse_state; +#endif + +/* defined in internal.h */ +typedef struct grpc_chttp2_stream_parsing grpc_chttp2_stream_parsing; +typedef struct grpc_chttp2_transport_parsing grpc_chttp2_transport_parsing; #define GRPC_CHTTP2_FRAME_DATA 0 #define GRPC_CHTTP2_FRAME_HEADER 1 diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index a1ae9ed2e6..129d211043 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -35,6 +35,7 @@ #include <string.h> +#include "src/core/transport/chttp2/internal.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -69,7 +70,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( } grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); @@ -77,8 +78,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_chttp2_data_parser *p = parser; if (is_last && p->is_last_frame) { - state->end_of_stream = 1; - state->need_flush_reads = 1; + stream_parsing->received_close = 1; } if (cur == end) { @@ -129,27 +129,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( p->frame_size |= ((gpr_uint32) * cur); p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; - state->need_flush_reads = 1; grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size, 0); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) == p->frame_size) { - state->need_flush_reads = 1; grpc_sopb_add_slice(&p->incoming_sopb, gpr_slice_sub(slice, cur - beg, end - beg)); p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) > p->frame_size) { - state->need_flush_reads = 1; grpc_sopb_add_slice( &p->incoming_sopb, gpr_slice_sub(slice, cur - beg, cur + p->frame_size - beg)); cur += p->frame_size; goto fh_0; /* loop */ } else { - state->need_flush_reads = 1; grpc_sopb_add_slice(&p->incoming_sopb, gpr_slice_sub(slice, cur - beg, end - beg)); p->frame_size -= (end - cur); diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index 24e557accd..dbbb87fc01 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -72,7 +72,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( /* handle a slice of a data frame - is_last indicates the last slice of a frame */ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); /* create a slice with an empty data frame and is_last set */ gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id); diff --git a/src/core/transport/chttp2/frame_goaway.c b/src/core/transport/chttp2/frame_goaway.c index 95b75d4fde..d7d6c587e6 100644 --- a/src/core/transport/chttp2/frame_goaway.c +++ b/src/core/transport/chttp2/frame_goaway.c @@ -32,6 +32,7 @@ */ #include "src/core/transport/chttp2/frame_goaway.h" +#include "src/core/transport/chttp2/internal.h" #include <string.h> @@ -62,7 +63,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame( } grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); @@ -139,10 +140,11 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( p->debug_pos += end - cur; p->state = GRPC_CHTTP2_GOAWAY_DEBUG; if (is_last) { - state->goaway = 1; - state->goaway_last_stream_index = p->last_stream_id; - state->goaway_error = p->error_code; - state->goaway_text = + transport_parsing->goaway_received = 1; + transport_parsing->goaway_last_stream_index = p->last_stream_id; + gpr_slice_unref(transport_parsing->goaway_text); + transport_parsing->goaway_error = p->error_code; + transport_parsing->goaway_text = gpr_slice_new(p->debug_data, p->debug_length, gpr_free); p->debug_data = NULL; } diff --git a/src/core/transport/chttp2/frame_goaway.h b/src/core/transport/chttp2/frame_goaway.h index 7638891514..8148fa90f2 100644 --- a/src/core/transport/chttp2/frame_goaway.h +++ b/src/core/transport/chttp2/frame_goaway.h @@ -65,7 +65,7 @@ void grpc_chttp2_goaway_parser_destroy(grpc_chttp2_goaway_parser *p); grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame( grpc_chttp2_goaway_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code, gpr_slice debug_data, diff --git a/src/core/transport/chttp2/frame_ping.h b/src/core/transport/chttp2/frame_ping.h index 11d38b80ea..71f8351223 100644 --- a/src/core/transport/chttp2/frame_ping.h +++ b/src/core/transport/chttp2/frame_ping.h @@ -48,6 +48,6 @@ gpr_slice grpc_chttp2_ping_create(gpr_uint8 ack, gpr_uint8 *opaque_8bytes); grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame( grpc_chttp2_ping_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H */ diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h index 07a3c98d03..b83d1261d0 100644 --- a/src/core/transport/chttp2/frame_rst_stream.h +++ b/src/core/transport/chttp2/frame_rst_stream.h @@ -47,6 +47,6 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code); grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */ diff --git a/src/core/transport/chttp2/frame_settings.h b/src/core/transport/chttp2/frame_settings.h index 18765631a6..701f2b94d2 100644 --- a/src/core/transport/chttp2/frame_settings.h +++ b/src/core/transport/chttp2/frame_settings.h @@ -94,6 +94,6 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame( grpc_chttp2_settings_parser *parser, gpr_uint32 length, gpr_uint8 flags, gpr_uint32 *settings); grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_SETTINGS_H */ diff --git a/src/core/transport/chttp2/frame_window_update.h b/src/core/transport/chttp2/frame_window_update.h index 85475a8f9e..7217325beb 100644 --- a/src/core/transport/chttp2/frame_window_update.h +++ b/src/core/transport/chttp2/frame_window_update.h @@ -50,6 +50,6 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_window_update_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H */ diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h index bfc06b3980..507d7cfea0 100644 --- a/src/core/transport/chttp2/hpack_parser.h +++ b/src/core/transport/chttp2/hpack_parser.h @@ -107,7 +107,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p, /* wraps grpc_chttp2_hpack_parser_parse to provide a frame level parser for the transport */ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( - void *hpack_parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_PARSER_H */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index a21a7a4d75..5eba01a3e1 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -36,6 +36,7 @@ #include "src/core/transport/transport_impl.h" #include "src/core/iomgr/endpoint.h" +#include "src/core/transport/chttp2/frame.h" #include "src/core/transport/chttp2/frame_data.h" #include "src/core/transport/chttp2/frame_goaway.h" #include "src/core/transport/chttp2/frame_ping.h" @@ -172,16 +173,110 @@ typedef struct { gpr_slice debug; } grpc_chttp2_pending_goaway; +typedef struct { + /** data to write next write */ + gpr_slice_buffer qbuf; + /** queued callbacks */ + grpc_iomgr_closure *pending_closures; + + /** window available for us to send to peer */ + gpr_uint32 outgoing_window; + /** how much window would we like to have for incoming_window */ + gpr_uint32 connection_window_target; + + + /** are the local settings dirty and need to be sent? */ + gpr_uint8 dirtied_local_settings; + /** have local settings been sent? */ + gpr_uint8 sent_local_settings; + /** bitmask of setting indexes to send out */ + gpr_uint32 force_send_settings; + /** settings values */ + gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; + + /** last received stream id */ + gpr_uint32 last_incoming_stream_id; +} grpc_chttp2_transport_global; + +typedef struct { + /** data to write now */ + gpr_slice_buffer outbuf; + /** hpack encoding */ + grpc_chttp2_hpack_compressor hpack_compressor; +} grpc_chttp2_transport_writing; + +struct grpc_chttp2_transport_parsing { + /** is this transport a client? (boolean) */ + gpr_uint8 is_client; + + /** were settings updated? */ + gpr_uint8 settings_updated; + /** was a settings ack received? */ + gpr_uint8 settings_ack_received; + /** was a goaway frame received? */ + gpr_uint8 goaway_received; + + /** data to write later - after parsing */ + gpr_slice_buffer qbuf; + /* metadata object cache */ + grpc_mdstr *str_grpc_timeout; + /** parser for headers */ + grpc_chttp2_hpack_parser hpack_parser; + /** simple one shot parsers */ + union { + grpc_chttp2_window_update_parser window_update; + grpc_chttp2_settings_parser settings; + grpc_chttp2_ping_parser ping; + grpc_chttp2_rst_stream_parser rst_stream; + } simple; + /** parser for goaway frames */ + grpc_chttp2_goaway_parser goaway_parser; + + /** window available for peer to send to us */ + gpr_uint32 incoming_window; + + /** next stream id available at the time of beginning parsing */ + gpr_uint32 next_stream_id; + gpr_uint32 last_incoming_stream_id; + + /* deframing */ + grpc_chttp2_deframe_transport_state deframe_state; + gpr_uint8 incoming_frame_type; + gpr_uint8 incoming_frame_flags; + gpr_uint8 header_eof; + gpr_uint32 expect_continuation_stream_id; + gpr_uint32 incoming_frame_size; + gpr_uint32 incoming_stream_id; + + /* active parser */ + void *parser_data; + grpc_chttp2_stream_parsing *incoming_stream; + grpc_chttp2_parse_error (*parser)(void *parser_user_data, + grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, + gpr_slice slice, int is_last); + + /* received settings */ + gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS]; + + /* goaway data */ + grpc_status_code goaway_error; + gpr_uint32 goaway_last_stream_index; + gpr_slice goaway_text; +}; + + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ grpc_endpoint *ep; grpc_mdctx *metadata_context; gpr_refcount refs; - gpr_uint8 is_client; gpr_mu mu; gpr_cv cv; + /** is a thread currently writing */ + gpr_uint8 writing_active; + /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; /** are we calling back any grpc_transport_op completion events */ @@ -192,28 +287,9 @@ struct grpc_chttp2_transport { /* stream indexing */ gpr_uint32 next_stream_id; - gpr_uint32 last_incoming_stream_id; - - /* settings */ - gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; - gpr_uint32 force_send_settings; /* bitmask of setting indexes to send out */ - gpr_uint8 sent_local_settings; /* have local settings been sent? */ - gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */ /* window management */ - gpr_uint32 outgoing_window; gpr_uint32 outgoing_window_update; - gpr_uint32 incoming_window; - gpr_uint32 connection_window_target; - - /* deframing */ - grpc_chttp2_deframe_transport_state deframe_state; - gpr_uint8 incoming_frame_type; - gpr_uint8 incoming_frame_flags; - gpr_uint8 header_eof; - gpr_uint32 expect_continuation_stream_id; - gpr_uint32 incoming_frame_size; - gpr_uint32 incoming_stream_id; /* goaway */ grpc_chttp2_pending_goaway *pending_goaways; @@ -226,13 +302,6 @@ struct grpc_chttp2_transport { /* stream ops that need to be destroyed, but outside of the lock */ grpc_stream_op_buffer nuke_later_sopb; - /* active parser */ - void *parser_data; - grpc_chttp2_stream *incoming_stream; - grpc_chttp2_parse_error (*parser)(void *parser_user_data, - grpc_chttp2_parse_state *state, - gpr_slice slice, int is_last); - grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; grpc_chttp2_stream_map stream_map; @@ -242,46 +311,12 @@ struct grpc_chttp2_transport { size_t ping_capacity; gpr_int64 ping_counter; - struct { - /* metadata object cache */ - grpc_mdstr *str_grpc_timeout; - } constants; - - struct { - /** data to write next write */ - gpr_slice_buffer qbuf; - /* queued callbacks */ - grpc_iomgr_closure *pending_closures; - } global; - - struct { - /** is a thread currently writing */ - gpr_uint8 executing; - /** closure to execute this action */ - grpc_iomgr_closure action; - /** data to write now */ - gpr_slice_buffer outbuf; - /* hpack encoding */ - grpc_chttp2_hpack_compressor hpack_compressor; - } writing; + grpc_chttp2_transport_global global; + grpc_chttp2_transport_writing writing; + grpc_chttp2_transport_parsing parsing; - struct { - /** is a thread currently parsing */ - gpr_uint8 executing; - /** data to write later - after parsing */ - gpr_slice_buffer qbuf; - /** parser for headers */ - grpc_chttp2_hpack_parser hpack_parser; - /** simple one shot parsers */ - union { - grpc_chttp2_window_update_parser window_update; - grpc_chttp2_settings_parser settings; - grpc_chttp2_ping_parser ping; - grpc_chttp2_rst_stream_parser rst_stream; - } simple; - /** parser for goaway frames */ - grpc_chttp2_goaway_parser goaway_parser; - } parsing; + /** closure to execute writing */ + grpc_iomgr_closure writing_action; struct { /** is a thread currently performing channel callbacks */ @@ -295,37 +330,47 @@ struct grpc_chttp2_transport { } channel_callback; }; -struct grpc_chttp2_stream { - struct { - grpc_iomgr_closure *send_done_closure; - grpc_iomgr_closure *recv_done_closure; - } global; - - struct { - /* sops that have passed flow control to be written */ - grpc_stream_op_buffer sopb; - /* how strongly should we indicate closure with the next write */ - grpc_chttp2_send_closed send_closed; - } writing; - - struct { - int unused; - } parsing; - +typedef struct { + /** HTTP2 stream id for this stream, or zero if one has not been assigned */ gpr_uint32 id; - gpr_uint32 incoming_window; + grpc_iomgr_closure *send_done_closure; + grpc_iomgr_closure *recv_done_closure; + + /** window available for us to send to peer */ gpr_int64 outgoing_window; - gpr_uint32 outgoing_window_update; - /* when the application requests writes be closed, the write_closed is - 'queued'; when the close is flow controlled into the send path, we are - 'sending' it; when the write has been performed it is 'sent' */ + /** stream ops the transport user would like to send */ + grpc_stream_op_buffer *outgoing_sopb; + /** when the application requests writes be closed, the write_closed is + 'queued'; when the close is flow controlled into the send path, we are + 'sending' it; when the write has been performed it is 'sent' */ grpc_chttp2_write_state write_state; + /** is this stream closed (boolean) */ gpr_uint8 read_closed; - gpr_uint8 cancelled; +} grpc_chttp2_stream_global; - grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; - gpr_uint8 included[STREAM_LIST_COUNT]; +typedef struct { + /** HTTP2 stream id for this stream, or zero if one has not been assigned */ + gpr_uint32 id; + /** sops that have passed flow control to be written */ + grpc_stream_op_buffer sopb; + /** how strongly should we indicate closure with the next write */ + grpc_chttp2_send_closed send_closed; +} grpc_chttp2_stream_writing; + +struct grpc_chttp2_stream_parsing { + /** HTTP2 stream id for this stream, or zero if one has not been assigned */ + gpr_uint32 id; + /** has this stream received a close */ + gpr_uint8 received_close; + /** incoming_window has been reduced during parsing */ + gpr_uint8 incoming_window_changed; + /** saw an error on this stream during parsing (it should be cancelled) */ + gpr_uint8 saw_error; + /** window available for peer to send to us */ + gpr_uint32 incoming_window; + /** parsing state for data frames */ + grpc_chttp2_data_parser data_parser; /* incoming metadata */ grpc_linked_mdelem *incoming_metadata; @@ -333,21 +378,79 @@ struct grpc_chttp2_stream { size_t incoming_metadata_capacity; grpc_linked_mdelem *old_incoming_metadata; gpr_timespec incoming_deadline; +}; + +struct grpc_chttp2_stream { + grpc_chttp2_stream_global global; + grpc_chttp2_stream_writing writing; + + gpr_uint32 outgoing_window_update; + gpr_uint8 cancelled; + + grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; + gpr_uint8 included[STREAM_LIST_COUNT]; /* sops from application */ - grpc_stream_op_buffer *outgoing_sopb; grpc_stream_op_buffer *incoming_sopb; grpc_stream_state *publish_state; grpc_stream_state published_state; - grpc_chttp2_data_parser parser; - grpc_stream_state callback_state; grpc_stream_op_buffer callback_sopb; }; +/** Transport writing call flow: + chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes are required; + if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the writes. + Once writes have been completed (meaning another write could potentially be started), + grpc_chttp2_terminate_writing is called. This will call grpc_chttp2_cleanup_writing, at which + point the write phase is complete. */ + /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ -void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); +int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); +void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); +void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success); +void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); + +/** Process one slice of incoming data */ +int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice); +void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); + +/** Get a writable stream + \return non-zero if there was a stream available */ +void grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); + +void grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); +int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport_writing *transport_writing); +int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing **stream_writing); + +void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); +int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); + +int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); + +void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing); + +void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success); +void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); + +grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); +grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); + +#define GRPC_CHTTP2_FLOW_CTL_TRACE(a,b,c,d,e) do {} while (0) + +#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" +#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING)-1) + +extern int grpc_http_trace; + +#define IF_TRACING(stmt) \ + if (!(grpc_http_trace)) \ + ; \ + else \ + stmt #endif diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 9a547ad319..2dd46b3116 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -32,4 +32,611 @@ */ #include "src/core/transport/chttp2/internal.h" +#include "src/core/transport/chttp2/timeout_encoding.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation); +static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_header); + +static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last); + +void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) { + /* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when + sending GOAWAY frame. + https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8 + says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream ID. So, + since we don't have server pushed streams, client should send + GOAWAY last-grpc_chttp2_stream-id=0 in this case. */ + if (!transport_parsing->is_client) { + transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id; + } +} + +int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) { + gpr_uint8 *beg = GPR_SLICE_START_PTR(slice); + gpr_uint8 *end = GPR_SLICE_END_PTR(slice); + gpr_uint8 *cur = beg; + + if (cur == end) return 1; + + switch (transport_parsing->deframe_state) { + case DTS_CLIENT_PREFIX_0: + case DTS_CLIENT_PREFIX_1: + case DTS_CLIENT_PREFIX_2: + case DTS_CLIENT_PREFIX_3: + case DTS_CLIENT_PREFIX_4: + case DTS_CLIENT_PREFIX_5: + case DTS_CLIENT_PREFIX_6: + case DTS_CLIENT_PREFIX_7: + case DTS_CLIENT_PREFIX_8: + case DTS_CLIENT_PREFIX_9: + case DTS_CLIENT_PREFIX_10: + case DTS_CLIENT_PREFIX_11: + case DTS_CLIENT_PREFIX_12: + case DTS_CLIENT_PREFIX_13: + case DTS_CLIENT_PREFIX_14: + case DTS_CLIENT_PREFIX_15: + case DTS_CLIENT_PREFIX_16: + case DTS_CLIENT_PREFIX_17: + case DTS_CLIENT_PREFIX_18: + case DTS_CLIENT_PREFIX_19: + case DTS_CLIENT_PREFIX_20: + case DTS_CLIENT_PREFIX_21: + case DTS_CLIENT_PREFIX_22: + case DTS_CLIENT_PREFIX_23: + while (cur != end && transport_parsing->deframe_state != DTS_FH_0) { + if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state]) { + gpr_log(GPR_ERROR, + "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " + "at byte %d", + GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state], + (int)(gpr_uint8)GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state], *cur, + (int)*cur, transport_parsing->deframe_state); + return 0; + } + ++cur; + ++transport_parsing->deframe_state; + } + if (cur == end) { + return 1; + } + /* fallthrough */ + dts_fh_0: + case DTS_FH_0: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_size = ((gpr_uint32)*cur) << 16; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_1; + return 1; + } + /* fallthrough */ + case DTS_FH_1: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_size |= ((gpr_uint32)*cur) << 8; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_2; + return 1; + } + /* fallthrough */ + case DTS_FH_2: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_size |= *cur; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_3; + return 1; + } + /* fallthrough */ + case DTS_FH_3: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_type = *cur; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_4; + return 1; + } + /* fallthrough */ + case DTS_FH_4: + GPR_ASSERT(cur < end); + transport_parsing->incoming_frame_flags = *cur; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_5; + return 1; + } + /* fallthrough */ + case DTS_FH_5: + GPR_ASSERT(cur < end); + transport_parsing->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_6; + return 1; + } + /* fallthrough */ + case DTS_FH_6: + GPR_ASSERT(cur < end); + transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 16; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_7; + return 1; + } + /* fallthrough */ + case DTS_FH_7: + GPR_ASSERT(cur < end); + transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 8; + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_8; + return 1; + } + /* fallthrough */ + case DTS_FH_8: + GPR_ASSERT(cur < end); + transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur); + transport_parsing->deframe_state = DTS_FRAME; + if (!init_frame_parser(transport_parsing)) { + return 0; + } + if (transport_parsing->incoming_stream_id) { + transport_parsing->last_incoming_stream_id = transport_parsing->incoming_stream_id; + } + if (transport_parsing->incoming_frame_size == 0) { + if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1)) { + return 0; + } + if (++cur == end) { + transport_parsing->deframe_state = DTS_FH_0; + return 1; + } + goto dts_fh_0; /* loop */ + } + if (++cur == end) { + return 1; + } + /* fallthrough */ + case DTS_FRAME: + GPR_ASSERT(cur < end); + if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) { + if (!parse_frame_slice( + transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) { + return 0; + } + transport_parsing->deframe_state = DTS_FH_0; + return 1; + } else if ((gpr_uint32)(end - cur) > transport_parsing->incoming_frame_size) { + if (!parse_frame_slice( + transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, + cur + transport_parsing->incoming_frame_size - beg), + 1)) { + return 0; + } + cur += transport_parsing->incoming_frame_size; + goto dts_fh_0; /* loop */ + } else { + if (!parse_frame_slice( + transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) { + return 0; + } + transport_parsing->incoming_frame_size -= (end - cur); + return 1; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + } + + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + + return 0; +} + +static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { + if (transport_parsing->expect_continuation_stream_id != 0) { + if (transport_parsing->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) { + gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x", + transport_parsing->incoming_frame_type); + return 0; + } + if (transport_parsing->expect_continuation_stream_id != transport_parsing->incoming_stream_id) { + gpr_log(GPR_ERROR, + "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got grpc_chttp2_stream %08x", + transport_parsing->expect_continuation_stream_id, transport_parsing->incoming_stream_id); + return 0; + } + return init_header_frame_parser(transport_parsing, 1); + } + switch (transport_parsing->incoming_frame_type) { + case GRPC_CHTTP2_FRAME_DATA: + return init_data_frame_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_HEADER: + return init_header_frame_parser(transport_parsing, 0); + case GRPC_CHTTP2_FRAME_CONTINUATION: + gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame"); + return 0; + case GRPC_CHTTP2_FRAME_RST_STREAM: + return init_rst_stream_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_SETTINGS: + return init_settings_frame_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: + return init_window_update_frame_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_PING: + return init_ping_parser(transport_parsing); + case GRPC_CHTTP2_FRAME_GOAWAY: + return init_goaway_parser(transport_parsing); + default: + gpr_log(GPR_ERROR, "Unknown frame type %02x", transport_parsing->incoming_frame_type); + return init_skip_frame_parser(transport_parsing, 0); + } +} + +static grpc_chttp2_parse_error skip_parser(void *parser, + grpc_chttp2_parse_state *st, + gpr_slice slice, int is_last) { + return GRPC_CHTTP2_PARSE_OK; +} + +static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); } + +static int init_skip_frame(grpc_chttp2_transport_parsing *transport_parsing, int is_header) { + if (is_header) { + int is_eoh = transport_parsing->expect_continuation_stream_id != 0; + transport_parsing->parser = grpc_chttp2_header_parser_parse; + transport_parsing->parser_data = &transport_parsing->hpack_parser; + transport_parsing->hpack_parser.on_header = skip_header; + transport_parsing->hpack_parser.on_header_user_data = NULL; + transport_parsing->hpack_parser.is_boundary = is_eoh; + transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0; + } else { + transport_parsing->parser = skip_parser; + } + return 1; +} + +static void become_skip_parser(grpc_chttp2_transport_parsing *transport_parsing) { + init_skip_frame(transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse); +} + +static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { + if (transport_parsing->incoming_frame_size > transport_parsing->incoming_window) { + gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", + transport_parsing->incoming_frame_size, transport_parsing->incoming_window); + return GRPC_CHTTP2_CONNECTION_ERROR; + } + + if (transport_parsing->incoming_frame_size > stream_parsing->incoming_window) { + gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", + transport_parsing->incoming_frame_size, stream_parsing->incoming_window); + return GRPC_CHTTP2_CONNECTION_ERROR; + } + + GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, -(gpr_int64)transport_parsing->incoming_frame_size); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->global.id, -(gpr_int64)transport_parsing->incoming_frame_size); + transport_parsing->incoming_window -= transport_parsing->incoming_frame_size; + stream_parsing->incoming_window -= transport_parsing->incoming_frame_size; + + /* if the grpc_chttp2_stream incoming window is getting low, schedule an update */ + stream_parsing->incoming_window_changed = 1; + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); + + return GRPC_CHTTP2_PARSE_OK; +} + +static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { + grpc_chttp2_stream_parsing *stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id); + grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; + if (!stream_parsing || stream_parsing->received_close) return init_skip_frame(transport_parsing, 0); + if (err == GRPC_CHTTP2_PARSE_OK) { + err = update_incoming_window(transport_parsing, stream_parsing); + } + if (err == GRPC_CHTTP2_PARSE_OK) { + err = grpc_chttp2_data_parser_begin_frame(&stream_parsing->data_parser, + transport_parsing->incoming_frame_flags); + } + switch (err) { + case GRPC_CHTTP2_PARSE_OK: + transport_parsing->incoming_stream = stream_parsing; + transport_parsing->parser = grpc_chttp2_data_parser_parse; + transport_parsing->parser_data = &stream_parsing->data_parser; + return 1; + case GRPC_CHTTP2_STREAM_ERROR: + stream_parsing->received_close = 1; + stream_parsing->saw_error = 1; + return init_skip_frame(transport_parsing, 0); + case GRPC_CHTTP2_CONNECTION_ERROR: + return 0; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + return 0; +} + +static void free_timeout(void *p) { gpr_free(p); } + +static void add_incoming_metadata(grpc_chttp2_stream_parsing *stream_parsing, grpc_mdelem *elem) { + if (stream_parsing->incoming_metadata_capacity == stream_parsing->incoming_metadata_count) { + stream_parsing->incoming_metadata_capacity = + GPR_MAX(8, 2 * stream_parsing->incoming_metadata_capacity); + stream_parsing->incoming_metadata = + gpr_realloc(stream_parsing->incoming_metadata, sizeof(*stream_parsing->incoming_metadata) * + stream_parsing->incoming_metadata_capacity); + } + stream_parsing->incoming_metadata[stream_parsing->incoming_metadata_count++].md = elem; +} + +static void on_header(void *tp, grpc_mdelem *md) { + grpc_chttp2_transport_parsing *transport_parsing = tp; + grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; + + GPR_ASSERT(stream_parsing); + + IF_TRACING(gpr_log( + GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, transport_parsing->is_client ? "CLI" : "SVR", + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); + + if (md->key == transport_parsing->str_grpc_timeout) { + gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); + if (!cached_timeout) { + /* not already parsed: parse it now, and store the result away */ + cached_timeout = gpr_malloc(sizeof(gpr_timespec)); + if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value), + cached_timeout)) { + gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", + grpc_mdstr_as_c_string(md->value)); + *cached_timeout = gpr_inf_future; + } + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); + } + stream_parsing->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout); + grpc_mdelem_unref(md); + } else { + add_incoming_metadata(stream_parsing, md); + } + + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); +} + +static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) { + int is_eoh = + (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; + grpc_chttp2_stream_parsing *stream_parsing; + + if (is_eoh) { + transport_parsing->expect_continuation_stream_id = 0; + } else { + transport_parsing->expect_continuation_stream_id = transport_parsing->incoming_stream_id; + } + + if (!is_continuation) { + transport_parsing->header_eof = + (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; + } + + /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ + stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id); + if (!stream_parsing) { + if (is_continuation) { + gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received"); + return init_skip_frame(transport_parsing, 1); + } + if (transport_parsing->is_client) { + if ((transport_parsing->incoming_stream_id & 1) && + transport_parsing->incoming_stream_id < transport_parsing->next_stream_id) { + /* this is an old (probably cancelled) grpc_chttp2_stream */ + } else { + gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client"); + } + return init_skip_frame(transport_parsing, 1); + } else if (transport_parsing->last_incoming_stream_id > transport_parsing->incoming_stream_id) { + gpr_log(GPR_ERROR, + "ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream " + "id=%d, new grpc_chttp2_stream id=%d", + transport_parsing->last_incoming_stream_id, transport_parsing->incoming_stream_id); + return init_skip_frame(transport_parsing, 1); + } else if ((transport_parsing->incoming_stream_id & 1) == 0) { + gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d", + transport_parsing->incoming_stream_id); + return init_skip_frame(transport_parsing, 1); + } + stream_parsing = transport_parsing->incoming_stream = grpc_chttp2_parsing_accept_stream(transport_parsing, transport_parsing->incoming_stream_id); + if (!stream_parsing) { + gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"); + return init_skip_frame(transport_parsing, 1); + } + } else { + transport_parsing->incoming_stream = stream_parsing; + } + if (stream_parsing->received_close) { + gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); + transport_parsing->incoming_stream = NULL; + return init_skip_frame(transport_parsing, 1); + } + transport_parsing->parser = grpc_chttp2_header_parser_parse; + transport_parsing->parser_data = &transport_parsing->hpack_parser; + transport_parsing->hpack_parser.on_header = on_header; + transport_parsing->hpack_parser.on_header_user_data = transport_parsing; + transport_parsing->hpack_parser.is_boundary = is_eoh; + transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0; + if (!is_continuation && + (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) { + grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser); + } + return 1; +} + +static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame( + &transport_parsing->simple.window_update, + transport_parsing->incoming_frame_size, + transport_parsing->incoming_frame_flags); + transport_parsing->parser = grpc_chttp2_window_update_parser_parse; + transport_parsing->parser_data = &transport_parsing->simple.window_update; + return ok; +} + +static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok = GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_ping_parser_begin_frame(&transport_parsing->simple.ping, + transport_parsing->incoming_frame_size, + transport_parsing->incoming_frame_flags); + transport_parsing->parser = grpc_chttp2_ping_parser_parse; + transport_parsing->parser_data = &transport_parsing->simple.ping; + return ok; +} + +static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame( + &transport_parsing->simple.rst_stream, + transport_parsing->incoming_frame_size, + transport_parsing->incoming_frame_flags); + transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse; + transport_parsing->parser_data = &transport_parsing->simple.rst_stream; + return ok; +} + +static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok = + GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_goaway_parser_begin_frame( + &transport_parsing->goaway_parser, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags); + transport_parsing->parser = grpc_chttp2_goaway_parser_parse; + transport_parsing->parser_data = &transport_parsing->goaway_parser; + return ok; +} + +static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { + int ok; + + if (transport_parsing->incoming_stream_id != 0) { + gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d", transport_parsing->incoming_stream_id); + return 0; + } + + ok = GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_settings_parser_begin_frame( + &transport_parsing->simple.settings, transport_parsing->incoming_frame_size, + transport_parsing->incoming_frame_flags, transport_parsing->settings); + if (!ok) { + return 0; + } + if (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { + transport_parsing->settings_ack_received = 1; + } else { + transport_parsing->settings_updated = 1; + } + transport_parsing->parser = grpc_chttp2_settings_parser_parse; + transport_parsing->parser_data = &transport_parsing->simple.settings; + return ok; +} + +/* +static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { + return window + window_update < MAX_WINDOW; +} +*/ + +static void add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { + grpc_metadata_batch b; + + b.list.head = NULL; + /* Store away the last element of the list, so that in patch_metadata_ops + we can reconstitute the list. + We can't do list building here as later incoming metadata may reallocate + the underlying array. */ + b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count; + b.garbage.head = b.garbage.tail = NULL; + b.deadline = stream_parsing->incoming_deadline; + stream_parsing->incoming_deadline = gpr_inf_future; + + grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b); +} + +static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice, int is_last) { + grpc_chttp2_parse_state st; + size_t i; + memset(&st, 0, sizeof(st)); + switch (transport_parsing->parser(transport_parsing->parser_data, &st, slice, is_last)) { + case GRPC_CHTTP2_PARSE_OK: + if (stream_parsing) { + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); + } + if (st.end_of_stream) { + transport_parsing->incoming_stream->read_closed = 1; + maybe_finish_read(t, transport_parsing->incoming_stream, 1); + } + if (st.need_flush_reads) { + maybe_finish_read(t, transport_parsing->incoming_stream, 1); + } + if (st.metadata_boundary) { + add_metadata_batch(t, transport_parsing->incoming_stream); + maybe_finish_read(t, transport_parsing->incoming_stream, 1); + } + if (st.ack_settings) { + gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_settings_ack_create()); + } + if (st.send_ping_ack) { + gpr_slice_buffer_add( + &transport_parsing->qbuf, + grpc_chttp2_ping_create(1, transport_parsing->simple.ping.opaque_8bytes)); + } + if (st.goaway) { + add_goaway(t, st.goaway_error, st.goaway_text); + } + if (st.rst_stream) { + cancel_stream_id( + t, transport_parsing->incoming_stream_id, + grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason), + st.rst_stream_reason, 0); + } + if (st.process_ping_reply) { + for (i = 0; i < transport_parsing->ping_count; i++) { + if (0 == + memcmp(transport_parsing->pings[i].id, transport_parsing->simple.ping.opaque_8bytes, 8)) { + transport_parsing->pings[i].cb(transport_parsing->pings[i].user_data); + memmove(&transport_parsing->pings[i], &transport_parsing->pings[i + 1], + (transport_parsing->ping_count - i - 1) * sizeof(grpc_chttp2_outstanding_ping)); + transport_parsing->ping_count--; + break; + } + } + } + if (st.initial_window_update) { + for (i = 0; i < transport_parsing->stream_map.count; i++) { + grpc_chttp2_stream *s = (grpc_chttp2_stream *)(transport_parsing->stream_map.values[i]); + s->global.outgoing_window_update += st.initial_window_update; + stream_list_join(t, s, NEW_OUTGOING_WINDOW); + } + } + if (st.window_update) { + if (transport_parsing->incoming_stream_id) { + /* if there was a grpc_chttp2_stream id, this is for some grpc_chttp2_stream */ + grpc_chttp2_stream *s = lookup_stream(t, transport_parsing->incoming_stream_id); + if (s) { + s->global.outgoing_window_update += st.window_update; + stream_list_join(t, s, NEW_OUTGOING_WINDOW); + } + } else { + /* grpc_chttp2_transport level window update */ + transport_parsing->global.outgoing_window_update += st.window_update; + } + } + return 1; + case GRPC_CHTTP2_STREAM_ERROR: + become_skip_parser(transport_parsing); + cancel_stream_id( + t, transport_parsing->incoming_stream_id, + grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR), + GRPC_CHTTP2_INTERNAL_ERROR, 1); + return 1; + case GRPC_CHTTP2_CONNECTION_ERROR: + drop_connection(transport_parsing); + return 0; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + return 0; +} diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index a1830a8c25..0265f173f3 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -32,93 +32,145 @@ */ #include "src/core/transport/chttp2/internal.h" +#include "src/core/transport/chttp2/http2_errors.h" #include <grpc/support/log.h> -static void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport *t) { - grpc_chttp2_stream *s; - gpr_uint32 window_delta; +static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); +static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status); - /* don't do anything if we are already writing */ - if (t->writing.executing) { - return; - } +int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { + grpc_chttp2_stream_global *stream_global; + grpc_chttp2_stream_writing *stream_writing; + gpr_uint32 window_delta; /* simple writes are queued to qbuf, and flushed here */ - gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf); - GPR_ASSERT(t->global.qbuf.count == 0); + gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf); + GPR_ASSERT(transport_global->qbuf.count == 0); - if (t->dirtied_local_settings && !t->sent_local_settings) { + if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings) { gpr_slice_buffer_add( - &t->writing.outbuf, grpc_chttp2_settings_create( - t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS], - t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); - t->force_send_settings = 0; - t->dirtied_local_settings = 0; - t->sent_local_settings = 1; + &transport_writing->outbuf, grpc_chttp2_settings_create( + transport_global->settings[SENT_SETTINGS], transport_global->settings[LOCAL_SETTINGS], + transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); + transport_global->force_send_settings = 0; + transport_global->dirtied_local_settings = 0; + transport_global->sent_local_settings = 1; } /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && - s->outgoing_window > 0) { + while (transport_global->outgoing_window && + grpc_chttp2_list_pop_writable_stream(transport_global, transport_writing, &stream_global, &stream_writing) && + stream_global->outgoing_window > 0) { + stream_writing->id = stream_global->id; window_delta = grpc_chttp2_preencode( - s->outgoing_sopb->ops, &s->outgoing_sopb->nops, - GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb); - FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta); - FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta); - t->outgoing_window -= window_delta; - s->outgoing_window -= window_delta; - - if (s->write_state == WRITE_STATE_QUEUED_CLOSE && - s->outgoing_sopb->nops == 0) { - if (!t->is_client && !s->read_closed) { - s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM; + stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, + GPR_MIN(transport_global->outgoing_window, stream_global->outgoing_window), + &stream_writing->sopb); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta); + transport_global->outgoing_window -= window_delta; + stream_global->outgoing_window -= window_delta; + + if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE && + stream_global->outgoing_sopb->nops == 0) { + if (!transport_constants->is_client && !stream_global->read_closed) { + stream_writing->send_closed = SEND_CLOSED_WITH_RST_STREAM; } else { - s->writing.send_closed = SEND_CLOSED; + stream_writing->send_closed = SEND_CLOSED; } } - if (s->writing.sopb.nops > 0 || s->writing.send_closed) { - stream_list_join(t, s, WRITING); + if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != DONT_SEND_CLOSED) { + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } /* we should either exhaust window or have no ops left, but not both */ - if (s->outgoing_sopb->nops == 0) { - s->outgoing_sopb = NULL; - schedule_cb(t, s->global.send_done_closure, 1); - } else if (s->outgoing_window) { - stream_list_add_tail(t, s, WRITABLE); + if (stream_global->outgoing_sopb->nops == 0) { + stream_global->outgoing_sopb = NULL; + grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 1); + } else if (stream_global->outgoing_window) { + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } - if (!t->parsing.executing) { - /* for each grpc_chttp2_stream that wants to update its window, add that window here */ - while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { - window_delta = - t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - s->incoming_window; - if (!s->read_closed && window_delta) { - gpr_slice_buffer_add( - &t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); - FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); - s->incoming_window += window_delta; - } + /* for each grpc_chttp2_stream that wants to update its window, add that window here */ + while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, &stream_global)) { + window_delta = + transport_global->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - + stream_global->incoming_window; + if (!stream_global->read_closed && window_delta > 0) { + gpr_slice_buffer_add( + &transport_writing->outbuf, grpc_chttp2_window_update_create(stream_global->id, window_delta)); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->id, window_delta); + stream_global->incoming_window += window_delta; } + } + + /* if the grpc_chttp2_transport is ready to send a window update, do so here also */ + if (transport_global->incoming_window < transport_global->connection_window_target * 3 / 4) { + window_delta = transport_global->connection_window_target - transport_global->incoming_window; + gpr_slice_buffer_add(&transport_writing->outbuf, + grpc_chttp2_window_update_create(0, window_delta)); + GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, window_delta); + transport_global->incoming_window += window_delta; + } + + return transport_writing->outbuf.length > 0 || grpc_chttp2_list_have_writing_streams(transport_writing); +} + +void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) { + finalize_outbuf(transport_writing); - /* if the grpc_chttp2_transport is ready to send a window update, do so here also */ - if (t->incoming_window < t->connection_window_target * 3 / 4) { - window_delta = t->connection_window_target - t->incoming_window; - gpr_slice_buffer_add(&t->writing.outbuf, - grpc_chttp2_window_update_create(0, window_delta)); - FLOWCTL_TRACE(t, t, incoming, 0, window_delta); - t->incoming_window += window_delta; + GPR_ASSERT(transport_writing->outbuf.count > 0); + + switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, transport_writing->outbuf.count, + finish_write_cb, transport_writing)) { + case GRPC_ENDPOINT_WRITE_DONE: + grpc_chttp2_terminate_writing(transport_writing, 1); + break; + case GRPC_ENDPOINT_WRITE_ERROR: + grpc_chttp2_terminate_writing(transport_writing, 0); + break; + case GRPC_ENDPOINT_WRITE_PENDING: + break; + } +} + +static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { + grpc_chttp2_stream_writing *stream_writing; + + while (grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { + grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, + stream_writing->send_closed != DONT_SEND_CLOSED, stream_writing->id, + &transport_writing->hpack_compressor, &transport_writing->outbuf); + stream_writing->sopb.nops = 0; + if (stream_writing->send_closed == SEND_CLOSED_WITH_RST_STREAM) { + gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_rst_stream_create( + stream_writing->id, GRPC_CHTTP2_NO_ERROR)); } + grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } +} - if (t->writing.outbuf.length > 0 || !stream_list_empty(t, WRITING)) { - t->writing.executing = 1; - ref_transport(t); - gpr_log(GPR_DEBUG, "schedule write"); - schedule_cb(t, &t->writing.action, 1); +static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) { + grpc_chttp2_transport_writing *transport_writing = tw; + grpc_chttp2_terminate_writing(transport_writing, write_status == GRPC_ENDPOINT_CB_OK); +} + +void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { + grpc_chttp2_stream_writing *stream_writing; + grpc_chttp2_stream_global *stream_global; + + while (grpc_chttp2_list_pop_written_stream(transport_global, transport_writing, &stream_global, &stream_writing)) { + if (stream_writing->send_closed != DONT_SEND_CLOSED) { + stream_global->write_state = WRITE_STATE_SENT_CLOSE; + if (!transport_constants->is_client) { + stream_global->read_closed = 1; + } + grpc_chttp2_read_write_state_changed(transport_global, stream_global); + } } + transport_writing->outbuf.count = 0; + transport_writing->outbuf.length = 0; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 1cfbc07d97..13ddeacc02 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -55,24 +55,17 @@ #define MAX_CLIENT_STREAM_ID 0x7fffffffu -#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" -#define CLIENT_CONNECT_STRLEN 24 - int grpc_http_trace = 0; int grpc_flowctl_trace = 0; -#define IF_TRACING(stmt) \ - if (!(grpc_http_trace)) \ - ; \ - else \ - stmt - #define FLOWCTL_TRACE(t, obj, dir, id, delta) \ if (!grpc_flowctl_trace) \ ; \ else \ flowctl_trace(t, #dir, obj->dir##_window, id, delta) +#define TRANSPORT_FROM_WRITING(tw) ((grpc_chttp2_transport*)((char*)(tw) - offsetof(grpc_chttp2_transport, writing))) + static const grpc_transport_vtable vtable; static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, @@ -211,10 +204,10 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba t->reading = 1; t->error_state = ERROR_STATE_NONE; t->next_stream_id = is_client ? 1 : 2; - t->is_client = is_client; - t->outgoing_window = DEFAULT_WINDOW; - t->incoming_window = DEFAULT_WINDOW; - t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; + t->constants.is_client = is_client; + t->global.outgoing_window = DEFAULT_WINDOW; + t->global.incoming_window = DEFAULT_WINDOW; + t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; t->ping_counter = gpr_now().tv_nsec; @@ -222,7 +215,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba gpr_slice_buffer_init(&t->writing.outbuf); grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx); - grpc_iomgr_closure_init(&t->writing.action, writing_action, t); + grpc_iomgr_closure_init(&t->writing_action, writing_action, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); @@ -244,17 +237,17 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba /* copy in initial settings to all setting sets */ for (i = 0; i < NUM_SETTING_SETS; i++) { for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) { - t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value; + t->global.settings[i][j] = grpc_chttp2_settings_parameters[j].default_value; } } - t->dirtied_local_settings = 1; + t->global.dirtied_local_settings = 1; /* Hack: it's common for implementations to assume 65536 bytes initial send window -- this should by rights be 0 */ - t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - t->sent_local_settings = 0; + t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; + t->global.sent_local_settings = 0; /* configure http2 the way we like it */ - if (t->is_client) { + if (t->constants.is_client) { push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } @@ -264,7 +257,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) { - if (t->is_client) { + if (t->constants.is_client) { gpr_log(GPR_ERROR, "%s: is ignored on the client", GRPC_ARG_MAX_CONCURRENT_STREAMS); } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) { @@ -283,7 +276,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba (channel_args->args[i].value.integer & 1)) { gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1, - t->is_client ? "client" : "server"); + t->constants.is_client ? "client" : "server"); } else { t->next_stream_id = channel_args->args[i].value.integer; } @@ -322,7 +315,7 @@ static void destroy_transport(grpc_transport *gt) { We need to be not writing as cancellation finalization may produce some callbacks that NEED to be made to close out some streams when t->writing becomes 0. */ - while (t->channel_callback.executing || t->writing.executing) { + while (t->channel_callback.executing || t->writing_active) { gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); } drop_connection(t); @@ -378,18 +371,18 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, lock(t); if (!server_data) { - s->id = 0; - s->outgoing_window = 0; - s->incoming_window = 0; + s->global.id = 0; + s->global.outgoing_window = 0; + s->global.incoming_window = 0; } else { /* already locked */ - s->id = (gpr_uint32)(gpr_uintptr)server_data; - s->outgoing_window = - t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->incoming_window = - t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + s->global.id = (gpr_uint32)(gpr_uintptr)server_data; + s->global.outgoing_window = + t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + s->global.incoming_window = + t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; t->incoming_stream = s; - grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s); } s->incoming_deadline = gpr_inf_future; @@ -416,7 +409,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_lock(&t->mu); - GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->id == 0); + GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->global.id == 0); for (i = 0; i < STREAM_LIST_COUNT; i++) { stream_list_remove(t, s, i); @@ -424,7 +417,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_unlock(&t->mu); - GPR_ASSERT(s->outgoing_sopb == NULL); + GPR_ASSERT(s->global.outgoing_sopb == NULL); GPR_ASSERT(s->incoming_sopb == NULL); grpc_sopb_destroy(&s->writing.sopb); grpc_sopb_destroy(&s->callback_sopb); @@ -503,10 +496,10 @@ static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gr } static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if (s->id == 0) return; + if (s->global.id == 0) return; IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing grpc_chttp2_stream %d", - t->is_client ? "CLI" : "SVR", s->id)); - if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) { + t->constants.is_client ? "CLI" : "SVR", s->global.id)); + if (grpc_chttp2_stream_map_delete(&t->stream_map, s->global.id)) { maybe_start_some_streams(t); } } @@ -525,7 +518,11 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_chttp2_transport *t) { grpc_iomgr_closure *run_closures; - grpc_chttp2_unlocking_check_writes(t); + if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->constants, &t->global, &t->writing)) { + t->writing_active = 1; + ref_transport(t); + schedule_cb(t, &t->writing_action, 1); + } unlock_check_cancellations(t); unlock_check_parser(t); unlock_check_channel_callbacks(t); @@ -555,49 +552,27 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name, value, use_value); } - if (use_value != t->settings[LOCAL_SETTINGS][id]) { - t->settings[LOCAL_SETTINGS][id] = use_value; - t->dirtied_local_settings = 1; - } -} - -static void writing_finalize_outbuf(grpc_chttp2_transport *t) { - grpc_chttp2_stream *s; - - while ((s = stream_list_remove_head(t, WRITING))) { - grpc_chttp2_encode(s->writing.sopb.ops, s->writing.sopb.nops, - s->writing.send_closed != DONT_SEND_CLOSED, s->id, - &t->writing.hpack_compressor, &t->writing.outbuf); - s->writing.sopb.nops = 0; - if (s->writing.send_closed == SEND_CLOSED_WITH_RST_STREAM) { - gpr_slice_buffer_add(&t->writing.outbuf, grpc_chttp2_rst_stream_create( - s->id, GRPC_CHTTP2_NO_ERROR)); - } - if (s->writing.send_closed != DONT_SEND_CLOSED) { - stream_list_join(t, s, WRITTEN_CLOSED); - } + if (use_value != t->global.settings[LOCAL_SETTINGS][id]) { + t->global.settings[LOCAL_SETTINGS][id] = use_value; + t->global.dirtied_local_settings = 1; } } -static void writing_finish(grpc_chttp2_transport *t, int success) { - grpc_chttp2_stream *s; +void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success) { + grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); lock(t); + if (!success) { drop_connection(t); } - while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) { - s->write_state = WRITE_STATE_SENT_CLOSE; - if (!t->is_client) { - s->read_closed = 1; - } - maybe_finish_read(t, s, 0); - } - t->writing.outbuf.count = 0; - t->writing.outbuf.length = 0; + + /* cleanup writing related jazz */ + grpc_chttp2_cleanup_writing(&t->constants, &t->global, &t->writing); + /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ - t->writing.executing = 0; + t->writing_active = 0; if (t->destroying) { gpr_cv_signal(&t->cv); } @@ -606,36 +581,16 @@ static void writing_finish(grpc_chttp2_transport *t, int success) { t->ep = NULL; unref_transport(t); /* safe because we'll still have the ref for write */ } + unlock(t); unref_transport(t); } -static void writing_finish_write_cb(void *tp, grpc_endpoint_cb_status error) { - grpc_chttp2_transport *t = tp; - writing_finish(t, error == GRPC_ENDPOINT_CB_OK); -} static void writing_action(void *gt, int iomgr_success_ignored) { grpc_chttp2_transport *t = gt; - - gpr_log(GPR_DEBUG, "writing_action"); - - writing_finalize_outbuf(t); - - GPR_ASSERT(t->writing.outbuf.count > 0); - - switch (grpc_endpoint_write(t->ep, t->writing.outbuf.slices, t->writing.outbuf.count, - writing_finish_write_cb, t)) { - case GRPC_ENDPOINT_WRITE_DONE: - writing_finish(t, 1); - break; - case GRPC_ENDPOINT_WRITE_ERROR: - writing_finish(t, 0); - break; - case GRPC_ENDPOINT_WRITE_PENDING: - break; - } + grpc_chttp2_perform_writes(&t->writing, t->ep); } static void add_goaway(grpc_chttp2_transport *t, gpr_uint32 goaway_error, @@ -655,13 +610,13 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) { /* start streams where we have free grpc_chttp2_stream ids and free concurrency */ while (!t->parsing.executing && t->next_stream_id <= MAX_CLIENT_STREAM_ID && grpc_chttp2_stream_map_size(&t->stream_map) < - t->settings[PEER_SETTINGS] + t->global.settings[PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); if (!s) return; IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", - t->is_client ? "CLI" : "SVR", s, t->next_stream_id)); + t->constants.is_client ? "CLI" : "SVR", s, t->next_stream_id)); if (t->next_stream_id == MAX_CLIENT_STREAM_ID) { add_goaway( @@ -669,14 +624,14 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) { gpr_slice_from_copied_string("Exceeded sequence number limit")); } - GPR_ASSERT(s->id == 0); - s->id = t->next_stream_id; + GPR_ASSERT(s->global.id == 0); + s->global.id = t->next_stream_id; t->next_stream_id += 2; - s->outgoing_window = - t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->incoming_window = - t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + s->global.outgoing_window = + t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + s->global.incoming_window = + t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s); stream_list_join(t, s, WRITABLE); } /* cancel out streams that will never be started */ @@ -700,20 +655,20 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g } if (op->send_ops) { - GPR_ASSERT(s->outgoing_sopb == NULL); + GPR_ASSERT(s->global.outgoing_sopb == NULL); s->global.send_done_closure = op->on_done_send; if (!s->cancelled) { - s->outgoing_sopb = op->send_ops; - if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) { - s->write_state = WRITE_STATE_QUEUED_CLOSE; + s->global.outgoing_sopb = op->send_ops; + if (op->is_last_send && s->global.write_state == WRITE_STATE_OPEN) { + s->global.write_state = WRITE_STATE_QUEUED_CLOSE; } - if (s->id == 0) { + if (s->global.id == 0) { IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", - t->is_client ? "CLI" : "SVR", s)); + t->constants.is_client ? "CLI" : "SVR", s)); stream_list_join(t, s, WAITING_FOR_CONCURRENCY); maybe_start_some_streams(t); - } else if (s->outgoing_window > 0) { + } else if (s->global.outgoing_window > 0) { stream_list_join(t, s, WRITABLE); } } else { @@ -787,26 +742,15 @@ static void send_ping(grpc_transport *gt, void (*cb)(void *user_data), static void unlock_check_cancellations(grpc_chttp2_transport *t) { grpc_chttp2_stream *s; - if (t->writing.executing) { + if (t->writing_active) { return; } while ((s = stream_list_remove_head(t, CANCELLED))) { - s->read_closed = 1; - s->write_state = WRITE_STATE_SENT_CLOSE; - maybe_finish_read(t, s, 0); - } -} - -static void add_incoming_metadata(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_mdelem *elem) { - if (s->incoming_metadata_capacity == s->incoming_metadata_count) { - s->incoming_metadata_capacity = - GPR_MAX(8, 2 * s->incoming_metadata_capacity); - s->incoming_metadata = - gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) * - s->incoming_metadata_capacity); + s->global.read_closed = 1; + s->global.write_state = WRITE_STATE_SENT_CLOSE; + grpc_chttp2_read_write_state_changed(&t->global, &s->global); } - s->incoming_metadata[s->incoming_metadata_count++].md = elem; } static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gpr_uint32 id, @@ -819,12 +763,12 @@ static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s, if (s) { /* clear out any unreported input & output: nobody cares anymore */ - had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0; + had_outgoing = s->global.outgoing_sopb && s->global.outgoing_sopb->nops != 0; if (error_code != GRPC_CHTTP2_NO_ERROR) { schedule_nuke_sopb(t, &s->parser.incoming_sopb); - if (s->outgoing_sopb) { - schedule_nuke_sopb(t, s->outgoing_sopb); - s->outgoing_sopb = NULL; + if (s->global.outgoing_sopb) { + schedule_nuke_sopb(t, s->global.outgoing_sopb); + s->global.outgoing_sopb = NULL; stream_list_remove(t, s, WRITABLE); schedule_cb(t, s->global.send_done_closure, 0); } @@ -888,7 +832,7 @@ static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_status_code local_status, grpc_chttp2_error_code error_code, grpc_mdstr *optional_message, int send_rst) { - cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, + cancel_stream_inner(t, s, s->global.id, local_status, error_code, optional_message, send_rst, 0); } @@ -923,608 +867,18 @@ static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stre return; } if (s->incoming_sopb != NULL && - s->incoming_window < - t->settings[LOCAL_SETTINGS] + s->global.incoming_window < + t->global.settings[LOCAL_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] * 3 / 4) { stream_list_join(t, s, WINDOW_UPDATE); } } -static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if (t->incoming_frame_size > t->incoming_window) { - gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", - t->incoming_frame_size, t->incoming_window); - return GRPC_CHTTP2_CONNECTION_ERROR; - } - - if (t->incoming_frame_size > s->incoming_window) { - gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", - t->incoming_frame_size, s->incoming_window); - return GRPC_CHTTP2_CONNECTION_ERROR; - } - - FLOWCTL_TRACE(t, t, incoming, 0, -(gpr_int64)t->incoming_frame_size); - FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size); - t->incoming_window -= t->incoming_frame_size; - s->incoming_window -= t->incoming_frame_size; - - /* if the grpc_chttp2_stream incoming window is getting low, schedule an update */ - stream_list_join(t, s, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE); - - return GRPC_CHTTP2_PARSE_OK; -} - static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id) { return grpc_chttp2_stream_map_find(&t->stream_map, id); } -static grpc_chttp2_parse_error skip_parser(void *parser, - grpc_chttp2_parse_state *st, - gpr_slice slice, int is_last) { - return GRPC_CHTTP2_PARSE_OK; -} - -static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); } - -static int parsing_init_skip_frame(grpc_chttp2_transport *t, int is_header) { - if (is_header) { - int is_eoh = t->expect_continuation_stream_id != 0; - t->parser = grpc_chttp2_header_parser_parse; - t->parser_data = &t->parsing.hpack_parser; - t->parsing.hpack_parser.on_header = skip_header; - t->parsing.hpack_parser.on_header_user_data = NULL; - t->parsing.hpack_parser.is_boundary = is_eoh; - t->parsing.hpack_parser.is_eof = is_eoh ? t->header_eof : 0; - } else { - t->parser = skip_parser; - } - return 1; -} - -static void parsing_become_skip_parser(grpc_chttp2_transport *t) { - parsing_init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse); -} - -static int parsing_init_data_frame_parser(grpc_chttp2_transport *t) { - grpc_chttp2_stream *s = lookup_stream(t, t->incoming_stream_id); - grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; - if (!s || s->read_closed) return parsing_init_skip_frame(t, 0); - if (err == GRPC_CHTTP2_PARSE_OK) { - err = update_incoming_window(t, s); - } - if (err == GRPC_CHTTP2_PARSE_OK) { - err = grpc_chttp2_data_parser_begin_frame(&s->parser, - t->incoming_frame_flags); - } - switch (err) { - case GRPC_CHTTP2_PARSE_OK: - t->incoming_stream = s; - t->parser = grpc_chttp2_data_parser_parse; - t->parser_data = &s->parser; - return 1; - case GRPC_CHTTP2_STREAM_ERROR: - cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( - GRPC_CHTTP2_INTERNAL_ERROR), - GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1); - return parsing_init_skip_frame(t, 0); - case GRPC_CHTTP2_CONNECTION_ERROR: - drop_connection(t); - return 0; - } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - return 0; -} - -static void free_timeout(void *p) { gpr_free(p); } - -static void parsing_on_header(void *tp, grpc_mdelem *md) { - grpc_chttp2_transport *t = tp; - grpc_chttp2_stream *s = t->incoming_stream; - - GPR_ASSERT(s); - - IF_TRACING(gpr_log( - GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR", - grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); - - if (md->key == t->constants.str_grpc_timeout) { - gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); - if (!cached_timeout) { - /* not already parsed: parse it now, and store the result away */ - cached_timeout = gpr_malloc(sizeof(gpr_timespec)); - if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value), - cached_timeout)) { - gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", - grpc_mdstr_as_c_string(md->value)); - *cached_timeout = gpr_inf_future; - } - grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); - } - s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout); - grpc_mdelem_unref(md); - } else { - add_incoming_metadata(t, s, md); - } - maybe_finish_read(t, s, 1); -} - -static int parsing_init_header_frame_parser(grpc_chttp2_transport *t, int is_continuation) { - int is_eoh = - (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; - grpc_chttp2_stream *s; - - if (is_eoh) { - t->expect_continuation_stream_id = 0; - } else { - t->expect_continuation_stream_id = t->incoming_stream_id; - } - - if (!is_continuation) { - t->header_eof = - (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; - } - - /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ - s = lookup_stream(t, t->incoming_stream_id); - if (!s) { - if (is_continuation) { - gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received"); - return parsing_init_skip_frame(t, 1); - } - if (t->is_client) { - if ((t->incoming_stream_id & 1) && - t->incoming_stream_id < t->next_stream_id) { - /* this is an old (probably cancelled) grpc_chttp2_stream */ - } else { - gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client"); - } - return parsing_init_skip_frame(t, 1); - } else if (t->last_incoming_stream_id > t->incoming_stream_id) { - gpr_log(GPR_ERROR, - "ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream " - "id=%d, new grpc_chttp2_stream id=%d", - t->last_incoming_stream_id, t->incoming_stream_id); - return parsing_init_skip_frame(t, 1); - } else if ((t->incoming_stream_id & 1) == 0) { - gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d", - t->incoming_stream_id); - return parsing_init_skip_frame(t, 1); - } - t->incoming_stream = NULL; - /* if grpc_chttp2_stream is accepted, we set incoming_stream in init_stream */ - t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, &t->base, - (void *)(gpr_uintptr)t->incoming_stream_id); - s = t->incoming_stream; - if (!s) { - gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"); - return parsing_init_skip_frame(t, 1); - } - } else { - t->incoming_stream = s; - } - if (t->incoming_stream->read_closed) { - gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); - t->incoming_stream = NULL; - return parsing_init_skip_frame(t, 1); - } - t->parser = grpc_chttp2_header_parser_parse; - t->parser_data = &t->parsing.hpack_parser; - t->parsing.hpack_parser.on_header = parsing_on_header; - t->parsing.hpack_parser.on_header_user_data = t; - t->parsing.hpack_parser.is_boundary = is_eoh; - t->parsing.hpack_parser.is_eof = is_eoh ? t->header_eof : 0; - if (!is_continuation && - (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) { - grpc_chttp2_hpack_parser_set_has_priority(&t->parsing.hpack_parser); - } - return 1; -} - -static int parsing_init_window_update_frame_parser(grpc_chttp2_transport *t) { - int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame( - &t->parsing.simple.window_update, - t->incoming_frame_size, - t->incoming_frame_flags); - if (!ok) { - drop_connection(t); - } - t->parser = grpc_chttp2_window_update_parser_parse; - t->parser_data = &t->parsing.simple.window_update; - return ok; -} - -static int parsing_init_ping_parser(grpc_chttp2_transport *t) { - int ok = GRPC_CHTTP2_PARSE_OK == - grpc_chttp2_ping_parser_begin_frame(&t->parsing.simple.ping, - t->incoming_frame_size, - t->incoming_frame_flags); - if (!ok) { - drop_connection(t); - } - t->parser = grpc_chttp2_ping_parser_parse; - t->parser_data = &t->parsing.simple.ping; - return ok; -} - -static int parsing_init_rst_stream_parser(grpc_chttp2_transport *t) { - int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame( - &t->parsing.simple.rst_stream, - t->incoming_frame_size, - t->incoming_frame_flags); - if (!ok) { - drop_connection(t); - } - t->parser = grpc_chttp2_rst_stream_parser_parse; - t->parser_data = &t->parsing.simple.rst_stream; - return ok; -} - -static int parsing_init_goaway_parser(grpc_chttp2_transport *t) { - int ok = - GRPC_CHTTP2_PARSE_OK == - grpc_chttp2_goaway_parser_begin_frame( - &t->parsing.goaway_parser, t->incoming_frame_size, t->incoming_frame_flags); - if (!ok) { - drop_connection(t); - } - t->parser = grpc_chttp2_goaway_parser_parse; - t->parser_data = &t->parsing.goaway_parser; - return ok; -} - -static int parsing_init_settings_frame_parser(grpc_chttp2_transport *t) { - int ok; - - if (t->incoming_stream_id != 0) { - gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d", t->incoming_stream_id); - drop_connection(t); - return 0; - } - - ok = GRPC_CHTTP2_PARSE_OK == - grpc_chttp2_settings_parser_begin_frame( - &t->parsing.simple.settings, t->incoming_frame_size, - t->incoming_frame_flags, t->settings[PEER_SETTINGS]); - if (!ok) { - drop_connection(t); - return 0; - } - if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { - memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS], - GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); - } - t->parser = grpc_chttp2_settings_parser_parse; - t->parser_data = &t->parsing.simple.settings; - return ok; -} - -static int init_frame_parser(grpc_chttp2_transport *t) { - if (t->expect_continuation_stream_id != 0) { - if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) { - gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x", - t->incoming_frame_type); - return 0; - } - if (t->expect_continuation_stream_id != t->incoming_stream_id) { - gpr_log(GPR_ERROR, - "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got grpc_chttp2_stream %08x", - t->expect_continuation_stream_id, t->incoming_stream_id); - return 0; - } - return parsing_init_header_frame_parser(t, 1); - } - switch (t->incoming_frame_type) { - case GRPC_CHTTP2_FRAME_DATA: - return parsing_init_data_frame_parser(t); - case GRPC_CHTTP2_FRAME_HEADER: - return parsing_init_header_frame_parser(t, 0); - case GRPC_CHTTP2_FRAME_CONTINUATION: - gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame"); - return 0; - case GRPC_CHTTP2_FRAME_RST_STREAM: - return parsing_init_rst_stream_parser(t); - case GRPC_CHTTP2_FRAME_SETTINGS: - return parsing_init_settings_frame_parser(t); - case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: - return parsing_init_window_update_frame_parser(t); - case GRPC_CHTTP2_FRAME_PING: - return parsing_init_ping_parser(t); - case GRPC_CHTTP2_FRAME_GOAWAY: - return parsing_init_goaway_parser(t); - default: - gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type); - return parsing_init_skip_frame(t, 0); - } -} - -/* -static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { - return window + window_update < MAX_WINDOW; -} -*/ - -static void add_metadata_batch(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - grpc_metadata_batch b; - - b.list.head = NULL; - /* Store away the last element of the list, so that in patch_metadata_ops - we can reconstitute the list. - We can't do list building here as later incoming metadata may reallocate - the underlying array. */ - b.list.tail = (void *)(gpr_intptr)s->incoming_metadata_count; - b.garbage.head = b.garbage.tail = NULL; - b.deadline = s->incoming_deadline; - s->incoming_deadline = gpr_inf_future; - - grpc_sopb_add_metadata(&s->parser.incoming_sopb, b); -} - -static int parse_frame_slice(grpc_chttp2_transport *t, gpr_slice slice, int is_last) { - grpc_chttp2_parse_state st; - size_t i; - memset(&st, 0, sizeof(st)); - switch (t->parser(t->parser_data, &st, slice, is_last)) { - case GRPC_CHTTP2_PARSE_OK: - if (st.end_of_stream) { - t->incoming_stream->read_closed = 1; - maybe_finish_read(t, t->incoming_stream, 1); - } - if (st.need_flush_reads) { - maybe_finish_read(t, t->incoming_stream, 1); - } - if (st.metadata_boundary) { - add_metadata_batch(t, t->incoming_stream); - maybe_finish_read(t, t->incoming_stream, 1); - } - if (st.ack_settings) { - gpr_slice_buffer_add(&t->parsing.qbuf, grpc_chttp2_settings_ack_create()); - } - if (st.send_ping_ack) { - gpr_slice_buffer_add( - &t->parsing.qbuf, - grpc_chttp2_ping_create(1, t->parsing.simple.ping.opaque_8bytes)); - } - if (st.goaway) { - add_goaway(t, st.goaway_error, st.goaway_text); - } - if (st.rst_stream) { - cancel_stream_id( - t, t->incoming_stream_id, - grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason), - st.rst_stream_reason, 0); - } - if (st.process_ping_reply) { - for (i = 0; i < t->ping_count; i++) { - if (0 == - memcmp(t->pings[i].id, t->parsing.simple.ping.opaque_8bytes, 8)) { - t->pings[i].cb(t->pings[i].user_data); - memmove(&t->pings[i], &t->pings[i + 1], - (t->ping_count - i - 1) * sizeof(grpc_chttp2_outstanding_ping)); - t->ping_count--; - break; - } - } - } - if (st.initial_window_update) { - for (i = 0; i < t->stream_map.count; i++) { - grpc_chttp2_stream *s = (grpc_chttp2_stream *)(t->stream_map.values[i]); - s->outgoing_window_update += st.initial_window_update; - stream_list_join(t, s, NEW_OUTGOING_WINDOW); - } - } - if (st.window_update) { - if (t->incoming_stream_id) { - /* if there was a grpc_chttp2_stream id, this is for some grpc_chttp2_stream */ - grpc_chttp2_stream *s = lookup_stream(t, t->incoming_stream_id); - if (s) { - s->outgoing_window_update += st.window_update; - stream_list_join(t, s, NEW_OUTGOING_WINDOW); - } - } else { - /* grpc_chttp2_transport level window update */ - t->outgoing_window_update += st.window_update; - } - } - return 1; - case GRPC_CHTTP2_STREAM_ERROR: - parsing_become_skip_parser(t); - cancel_stream_id( - t, t->incoming_stream_id, - grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR), - GRPC_CHTTP2_INTERNAL_ERROR, 1); - return 1; - case GRPC_CHTTP2_CONNECTION_ERROR: - drop_connection(t); - return 0; - } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - return 0; -} - -static int process_read(grpc_chttp2_transport *t, gpr_slice slice) { - gpr_uint8 *beg = GPR_SLICE_START_PTR(slice); - gpr_uint8 *end = GPR_SLICE_END_PTR(slice); - gpr_uint8 *cur = beg; - - if (cur == end) return 1; - - switch (t->deframe_state) { - case DTS_CLIENT_PREFIX_0: - case DTS_CLIENT_PREFIX_1: - case DTS_CLIENT_PREFIX_2: - case DTS_CLIENT_PREFIX_3: - case DTS_CLIENT_PREFIX_4: - case DTS_CLIENT_PREFIX_5: - case DTS_CLIENT_PREFIX_6: - case DTS_CLIENT_PREFIX_7: - case DTS_CLIENT_PREFIX_8: - case DTS_CLIENT_PREFIX_9: - case DTS_CLIENT_PREFIX_10: - case DTS_CLIENT_PREFIX_11: - case DTS_CLIENT_PREFIX_12: - case DTS_CLIENT_PREFIX_13: - case DTS_CLIENT_PREFIX_14: - case DTS_CLIENT_PREFIX_15: - case DTS_CLIENT_PREFIX_16: - case DTS_CLIENT_PREFIX_17: - case DTS_CLIENT_PREFIX_18: - case DTS_CLIENT_PREFIX_19: - case DTS_CLIENT_PREFIX_20: - case DTS_CLIENT_PREFIX_21: - case DTS_CLIENT_PREFIX_22: - case DTS_CLIENT_PREFIX_23: - while (cur != end && t->deframe_state != DTS_FH_0) { - if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) { - gpr_log(GPR_ERROR, - "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " - "at byte %d", - CLIENT_CONNECT_STRING[t->deframe_state], - (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur, - (int)*cur, t->deframe_state); - drop_connection(t); - return 0; - } - ++cur; - ++t->deframe_state; - } - if (cur == end) { - return 1; - } - /* fallthrough */ - dts_fh_0: - case DTS_FH_0: - GPR_ASSERT(cur < end); - t->incoming_frame_size = ((gpr_uint32)*cur) << 16; - if (++cur == end) { - t->deframe_state = DTS_FH_1; - return 1; - } - /* fallthrough */ - case DTS_FH_1: - GPR_ASSERT(cur < end); - t->incoming_frame_size |= ((gpr_uint32)*cur) << 8; - if (++cur == end) { - t->deframe_state = DTS_FH_2; - return 1; - } - /* fallthrough */ - case DTS_FH_2: - GPR_ASSERT(cur < end); - t->incoming_frame_size |= *cur; - if (++cur == end) { - t->deframe_state = DTS_FH_3; - return 1; - } - /* fallthrough */ - case DTS_FH_3: - GPR_ASSERT(cur < end); - t->incoming_frame_type = *cur; - if (++cur == end) { - t->deframe_state = DTS_FH_4; - return 1; - } - /* fallthrough */ - case DTS_FH_4: - GPR_ASSERT(cur < end); - t->incoming_frame_flags = *cur; - if (++cur == end) { - t->deframe_state = DTS_FH_5; - return 1; - } - /* fallthrough */ - case DTS_FH_5: - GPR_ASSERT(cur < end); - t->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24; - if (++cur == end) { - t->deframe_state = DTS_FH_6; - return 1; - } - /* fallthrough */ - case DTS_FH_6: - GPR_ASSERT(cur < end); - t->incoming_stream_id |= ((gpr_uint32)*cur) << 16; - if (++cur == end) { - t->deframe_state = DTS_FH_7; - return 1; - } - /* fallthrough */ - case DTS_FH_7: - GPR_ASSERT(cur < end); - t->incoming_stream_id |= ((gpr_uint32)*cur) << 8; - if (++cur == end) { - t->deframe_state = DTS_FH_8; - return 1; - } - /* fallthrough */ - case DTS_FH_8: - GPR_ASSERT(cur < end); - t->incoming_stream_id |= ((gpr_uint32)*cur); - t->deframe_state = DTS_FRAME; - if (!init_frame_parser(t)) { - return 0; - } - /* t->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when - sending GOAWAY frame. - https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8 - says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream ID. So, - since we don't have server pushed streams, client should send - GOAWAY last-grpc_chttp2_stream-id=0 in this case. */ - if (!t->is_client) { - t->last_incoming_stream_id = t->incoming_stream_id; - } - if (t->incoming_frame_size == 0) { - if (!parse_frame_slice(t, gpr_empty_slice(), 1)) { - return 0; - } - if (++cur == end) { - t->deframe_state = DTS_FH_0; - return 1; - } - goto dts_fh_0; /* loop */ - } - if (++cur == end) { - return 1; - } - /* fallthrough */ - case DTS_FRAME: - GPR_ASSERT(cur < end); - if ((gpr_uint32)(end - cur) == t->incoming_frame_size) { - if (!parse_frame_slice( - t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) { - return 0; - } - t->deframe_state = DTS_FH_0; - return 1; - } else if ((gpr_uint32)(end - cur) > t->incoming_frame_size) { - if (!parse_frame_slice( - t, gpr_slice_sub_no_ref(slice, cur - beg, - cur + t->incoming_frame_size - beg), - 1)) { - return 0; - } - cur += t->incoming_frame_size; - goto dts_fh_0; /* loop */ - } else { - if (!parse_frame_slice( - t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) { - return 0; - } - t->incoming_frame_size -= (end - cur); - return 1; - } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - } - - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - - return 0; -} - /* tcp read callback */ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { @@ -1554,9 +908,12 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, if (t->error_state == ERROR_STATE_NONE) { t->parsing.executing = 1; gpr_mu_unlock(&t->mu); - for (i = 0; i < nslices && process_read(t, slices[i]); i++) + for (i = 0; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); i++) ; gpr_mu_lock(&t->mu); + if (i != nslices) { + drop_connection(t); + } t->parsing.executing = 0; } while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) { @@ -1569,19 +926,19 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, maybe_join_window_updates(t, s); } while ((s = stream_list_remove_head(t, NEW_OUTGOING_WINDOW))) { - int was_window_empty = s->outgoing_window <= 0; - FLOWCTL_TRACE(t, s, outgoing, s->id, s->outgoing_window_update); - s->outgoing_window += s->outgoing_window_update; - s->outgoing_window_update = 0; + int was_window_empty = s->global.outgoing_window <= 0; + FLOWCTL_TRACE(t, s, outgoing, s->global.id, s->global.outgoing_window_update); + s->global.outgoing_window += s->global.outgoing_window_update; + s->global.outgoing_window_update = 0; /* if this window update makes outgoing ops writable again, flag that */ - if (was_window_empty && s->outgoing_sopb && - s->outgoing_sopb->nops > 0) { + if (was_window_empty && s->global.outgoing_sopb && + s->global.outgoing_sopb->nops > 0) { stream_list_join(t, s, WRITABLE); } } - t->outgoing_window += t->outgoing_window_update; - t->outgoing_window_update = 0; + t->global.outgoing_window += t->global.outgoing_window_update; + t->global.outgoing_window_update = 0; maybe_start_some_streams(t); unlock(t); keep_reading = 1; |