diff options
Diffstat (limited to 'src/core/transport/chttp2/internal.h')
-rw-r--r-- | src/core/transport/chttp2/internal.h | 346 |
1 files changed, 232 insertions, 114 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index b35f8b5d88..2d0cb4abdb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H #define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H +#include <assert.h> + #include "src/core/iomgr/endpoint.h" #include "src/core/transport/chttp2/frame.h" #include "src/core/transport/chttp2/frame_data.h" @@ -42,9 +44,9 @@ #include "src/core/transport/chttp2/frame_rst_stream.h" #include "src/core/transport/chttp2/frame_settings.h" #include "src/core/transport/chttp2/frame_window_update.h" +#include "src/core/transport/chttp2/hpack_encoder.h" #include "src/core/transport/chttp2/hpack_parser.h" #include "src/core/transport/chttp2/incoming_metadata.h" -#include "src/core/transport/chttp2/stream_encoder.h" #include "src/core/transport/chttp2/stream_map.h" #include "src/core/transport/connectivity_state.h" #include "src/core/transport/transport_impl.h" @@ -56,14 +58,14 @@ typedef struct grpc_chttp2_stream grpc_chttp2_stream; happen to them... this enum labels each list */ typedef enum { GRPC_CHTTP2_LIST_ALL_STREAMS, - GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED, + GRPC_CHTTP2_LIST_CHECK_READ_OPS, + GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE, GRPC_CHTTP2_LIST_WRITABLE, GRPC_CHTTP2_LIST_WRITING, GRPC_CHTTP2_LIST_WRITTEN, GRPC_CHTTP2_LIST_PARSING_SEEN, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, - GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING, - GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED, + GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, /** streams that are waiting to start because there are too many concurrent streams on the connection */ GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY, @@ -113,22 +115,6 @@ typedef enum { GRPC_DTS_FRAME } grpc_chttp2_deframe_transport_state; -typedef enum { - GRPC_WRITE_STATE_OPEN, - GRPC_WRITE_STATE_QUEUED_CLOSE, - GRPC_WRITE_STATE_SENT_CLOSE -} grpc_chttp2_write_state; - -/* flags that can be or'd into stream_global::writing_now */ -#define GRPC_CHTTP2_WRITING_DATA 1 -#define GRPC_CHTTP2_WRITING_WINDOW 2 - -typedef enum { - GRPC_DONT_SEND_CLOSED = 0, - GRPC_SEND_CLOSED, - GRPC_SEND_CLOSED_WITH_RST_STREAM -} grpc_chttp2_send_closed; - typedef struct { grpc_chttp2_stream *head; grpc_chttp2_stream *tail; @@ -160,14 +146,28 @@ typedef struct grpc_chttp2_outstanding_ping { struct grpc_chttp2_outstanding_ping *prev; } grpc_chttp2_outstanding_ping; +/* forward declared in frame_data.h */ +struct grpc_chttp2_incoming_byte_stream { + grpc_byte_stream base; + gpr_refcount refs; + struct grpc_chttp2_incoming_byte_stream *next_message; + + grpc_chttp2_transport *transport; + grpc_chttp2_stream *stream; + int is_tail; + gpr_slice_buffer slices; + grpc_closure *on_next; + gpr_slice *next; +}; + typedef struct { /** data to write next write */ gpr_slice_buffer qbuf; /** window available for us to send to peer */ gpr_int64 outgoing_window; - /** window available for peer to send to us - updated after parse */ - gpr_uint32 incoming_window; + /** window available to announce to peer */ + gpr_int64 announce_incoming_window; /** how much window would we like to have for incoming_window */ gpr_uint32 connection_window_target; @@ -209,6 +209,7 @@ typedef struct { gpr_slice_buffer outbuf; /** hpack encoding */ grpc_chttp2_hpack_compressor hpack_compressor; + gpr_int64 outgoing_window; /** is this a client? */ gpr_uint8 is_client; /** callback for when writing is done */ @@ -233,6 +234,7 @@ struct grpc_chttp2_transport_parsing { gpr_slice_buffer qbuf; /* metadata object cache */ grpc_mdstr *str_grpc_timeout; + grpc_mdelem *elem_grpc_status_ok; /** parser for headers */ grpc_chttp2_hpack_parser hpack_parser; /** simple one shot parsers */ @@ -246,8 +248,7 @@ struct grpc_chttp2_transport_parsing { grpc_chttp2_goaway_parser goaway_parser; /** window available for peer to send to us */ - gpr_uint32 incoming_window; - gpr_uint32 incoming_window_delta; + gpr_int64 incoming_window; /** next stream id available at the time of beginning parsing */ gpr_uint32 next_stream_id; @@ -278,7 +279,7 @@ struct grpc_chttp2_transport_parsing { gpr_uint32 goaway_last_stream_index; gpr_slice goaway_text; - gpr_int64 outgoing_window_update; + gpr_int64 outgoing_window; /** pings awaiting responses */ grpc_chttp2_outstanding_ping pings; @@ -345,8 +346,8 @@ struct grpc_chttp2_transport { struct { /* accept stream callback */ - void (*accept_stream)(void *user_data, grpc_transport *transport, - const void *server_data); + void (*accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_transport *transport, const void *server_data); void *accept_stream_user_data; /** connectivity tracking */ @@ -358,9 +359,6 @@ typedef struct { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ gpr_uint32 id; - grpc_closure *send_done_closure; - grpc_closure *recv_done_closure; - /** window available for us to send to peer */ gpr_int64 outgoing_window; /** The number of bytes the upper layers have offered to receive. @@ -371,54 +369,66 @@ typedef struct { not yet announced to HTTP2 flow control. As the upper layers offer to read more bytes, this value increases. As we advertise incoming flow control window, this value decreases. */ - gpr_uint32 unannounced_incoming_window; - /** The number of bytes of HTTP2 flow control we have advertised. - As we advertise incoming flow control window, this value increases. - As bytes are read, this value decreases. - Updated after parse. */ - gpr_uint32 incoming_window; - /** stream ops the transport user would like to send */ - grpc_stream_op_buffer *outgoing_sopb; + gpr_uint32 unannounced_incoming_window_for_parse; + gpr_uint32 unannounced_incoming_window_for_writing; + /** things the upper layers would like to send */ + grpc_metadata_batch *send_initial_metadata; + grpc_closure *send_initial_metadata_finished; + grpc_byte_stream *send_message; + grpc_closure *send_message_finished; + grpc_metadata_batch *send_trailing_metadata; + grpc_closure *send_trailing_metadata_finished; + + grpc_metadata_batch *recv_initial_metadata; + grpc_closure *recv_initial_metadata_finished; + grpc_byte_stream **recv_message; + grpc_closure *recv_message_ready; + grpc_metadata_batch *recv_trailing_metadata; + grpc_closure *recv_trailing_metadata_finished; + /** when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ - grpc_chttp2_write_state write_state; - /** is this stream closed (boolean) */ + gpr_uint8 write_closed; + /** is this stream reading half-closed (boolean) */ gpr_uint8 read_closed; - /** has this stream been cancelled? (boolean) */ - gpr_uint8 cancelled; - grpc_status_code cancelled_status; - /** have we told the upper layer that this stream is cancelled? */ - gpr_uint8 published_cancelled; + /** is this stream finished closing (and reportably closed) */ + gpr_uint8 finished_close; /** is this stream in the stream map? (boolean) */ gpr_uint8 in_stream_map; - /** bitmask of GRPC_CHTTP2_WRITING_xxx above */ - gpr_uint8 writing_now; - /** has anything been written to this stream? */ - gpr_uint8 written_anything; - - /** stream state already published to the upper layer */ - grpc_stream_state published_state; - /** address to publish next stream state to */ - grpc_stream_state *publish_state; - /** pointer to sop buffer to fill in with new stream ops */ - grpc_stream_op_buffer *publish_sopb; - grpc_stream_op_buffer incoming_sopb; + /** has this stream seen an error? if 1, then pending incoming frames + can be thrown away */ + gpr_uint8 seen_error; - /** incoming metadata */ - grpc_chttp2_incoming_metadata_buffer incoming_metadata; - grpc_chttp2_incoming_metadata_live_op_buffer outstanding_metadata; + gpr_uint8 published_initial_metadata; + gpr_uint8 published_trailing_metadata; + gpr_uint8 faked_trailing_metadata; + + grpc_chttp2_incoming_metadata_buffer received_initial_metadata; + grpc_chttp2_incoming_metadata_buffer received_trailing_metadata; + + grpc_chttp2_incoming_frame_queue incoming_frames; } grpc_chttp2_stream_global; typedef struct { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ gpr_uint32 id; - /** sops that have passed flow control to be written */ - grpc_stream_op_buffer sopb; - /** how strongly should we indicate closure with the next write */ - grpc_chttp2_send_closed send_closed; + gpr_uint8 fetching; + gpr_uint8 sent_initial_metadata; + gpr_uint8 sent_message; + gpr_uint8 sent_trailing_metadata; + gpr_uint8 read_closed; + /** send this initial metadata */ + grpc_metadata_batch *send_initial_metadata; + grpc_byte_stream *send_message; + grpc_metadata_batch *send_trailing_metadata; + gpr_int64 outgoing_window; /** how much window should we announce? */ gpr_uint32 announce_window; + gpr_slice_buffer flow_controlled_buffer; + gpr_slice fetching_slice; + size_t stream_fetched; + grpc_closure finished_fetch; } grpc_chttp2_stream_writing; struct grpc_chttp2_stream_parsing { @@ -428,22 +438,29 @@ struct grpc_chttp2_stream_parsing { gpr_uint8 received_close; /** saw a rst_stream */ gpr_uint8 saw_rst_stream; - /** incoming_window has been reduced by this much during parsing */ - gpr_uint32 incoming_window_delta; + /** how many header frames have we received? */ + gpr_uint8 header_frames_received; + /** which metadata did we get (on this parse) */ + gpr_uint8 got_metadata_on_parse[2]; + /** should we raise the seen_error flag in transport_global */ + gpr_uint8 seen_error; /** window available for peer to send to us */ - gpr_uint32 incoming_window; + gpr_int64 incoming_window; /** parsing state for data frames */ grpc_chttp2_data_parser data_parser; /** reason give to rst_stream */ gpr_uint32 rst_stream_reason; - /* amount of window given */ - gpr_uint64 outgoing_window_update; + /** amount of window given */ + gpr_int64 outgoing_window; + /** number of bytes received - reset at end of parse thread execution */ + gpr_int64 received_bytes; /** incoming metadata */ - grpc_chttp2_incoming_metadata_buffer incoming_metadata; + grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; }; struct grpc_chttp2_stream { + grpc_stream_refcount *refcount; grpc_chttp2_stream_global global; grpc_chttp2_stream_writing writing; grpc_chttp2_stream_parsing parsing; @@ -504,21 +521,10 @@ void grpc_chttp2_list_remove_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -void grpc_chttp2_list_add_incoming_window_updated( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_incoming_window_updated( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_parsing **stream_parsing); -void grpc_chttp2_list_remove_incoming_window_updated( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); - -void grpc_chttp2_list_add_writing_stream( +/* returns 1 if stream added, 0 if it was already present */ +int grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, - grpc_chttp2_stream_writing *stream_writing); + grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT; int grpc_chttp2_list_have_writing_streams( grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_list_pop_writing_stream( @@ -550,31 +556,44 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); -void grpc_chttp2_list_add_closed_waiting_for_parsing( +void grpc_chttp2_list_add_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_closed_waiting_for_parsing( +int grpc_chttp2_list_pop_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); -void grpc_chttp2_list_add_cancelled_waiting_for_writing( +void grpc_chttp2_list_add_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_writing *stream_writing); +int grpc_chttp2_list_pop_stalled_by_transport( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global); + +void grpc_chttp2_list_add_unannounced_incoming_window_available( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_cancelled_waiting_for_writing( +void grpc_chttp2_list_remove_unannounced_incoming_window_available( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global); + grpc_chttp2_stream_global *stream_global); +int grpc_chttp2_list_pop_unannounced_incoming_window_available( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_transport_parsing *transport_parsing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_parsing **stream_parsing); -void grpc_chttp2_list_add_read_write_state_changed( +void grpc_chttp2_list_add_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_read_write_state_changed( +int grpc_chttp2_list_pop_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( - grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + gpr_uint32 id); void grpc_chttp2_add_incoming_goaway( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -592,7 +611,10 @@ void grpc_chttp2_for_all_streams( grpc_chttp2_stream_global *stream_global)); void grpc_chttp2_parsing_become_skip_parser( - grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + +void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, + grpc_closure **pclosure, int success); #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ @@ -607,26 +629,122 @@ extern int grpc_flowctl_trace; else \ stmt -#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \ - delta) \ - if (!(grpc_flowctl_trace)) { \ - } else { \ - grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ - transport->is_client, context->id, \ - (gpr_int64)(context->var), (gpr_int64)(delta)); \ - } - -#define GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(reason, context, var, delta) \ - if (!(grpc_flowctl_trace)) { \ - } else { \ - grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ - context->is_client, 0, \ - (gpr_int64)(context->var), (gpr_int64)(delta)); \ - } - -void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, - const char *context, const char *var, - int is_client, gpr_uint32 stream_id, - gpr_int64 current_value, gpr_int64 delta); +typedef enum { + GRPC_CHTTP2_FLOWCTL_MOVE, + GRPC_CHTTP2_FLOWCTL_CREDIT, + GRPC_CHTTP2_FLOWCTL_DEBIT +} grpc_chttp2_flowctl_op; + +#define GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, id1, id2, dst_context, \ + dst_var, src_context, src_var) \ + do { \ + assert(id1 == id2); \ + if (grpc_flowctl_trace) { \ + grpc_chttp2_flowctl_trace( \ + __FILE__, __LINE__, phase, GRPC_CHTTP2_FLOWCTL_MOVE, #dst_context, \ + #dst_var, #src_context, #src_var, transport->is_client, id1, \ + dst_context->dst_var, src_context->src_var); \ + } \ + dst_context->dst_var += src_context->src_var; \ + src_context->src_var = 0; \ + } while (0) + +#define GRPC_CHTTP2_FLOW_MOVE_STREAM(phase, transport, dst_context, dst_var, \ + src_context, src_var) \ + GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, dst_context->id, \ + src_context->id, dst_context, dst_var, \ + src_context, src_var) +#define GRPC_CHTTP2_FLOW_MOVE_TRANSPORT(phase, dst_context, dst_var, \ + src_context, src_var) \ + GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, dst_context, 0, 0, dst_context, dst_var, \ + src_context, src_var) + +#define GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, id, dst_context, \ + dst_var, amount) \ + do { \ + if (grpc_flowctl_trace) { \ + grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \ + GRPC_CHTTP2_FLOWCTL_CREDIT, #dst_context, \ + #dst_var, NULL, #amount, transport->is_client, \ + id, dst_context->dst_var, amount); \ + } \ + dst_context->dst_var += amount; \ + } while (0) + +#define GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, dst_var, \ + amount) \ + GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, dst_context->id, \ + dst_context, dst_var, amount) +#define GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT(phase, dst_context, dst_var, amount) \ + GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ + amount) + +#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \ + dst_var, amount) \ + do { \ + if (grpc_flowctl_trace) { \ + grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \ + GRPC_CHTTP2_FLOWCTL_DEBIT, #dst_context, \ + #dst_var, NULL, #amount, transport->is_client, \ + id, dst_context->dst_var, amount); \ + } \ + dst_context->dst_var -= amount; \ + } while (0) + +#define GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, dst_var, \ + amount) \ + GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, dst_context->id, \ + dst_context, dst_var, amount) +#define GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT(phase, dst_context, dst_var, amount) \ + GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ + amount) + +void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, + grpc_chttp2_flowctl_op op, const char *context1, + const char *var1, const char *context2, + const char *var2, int is_client, + gpr_uint32 stream_id, gpr_int64 val1, + gpr_int64 val2); + +void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream, + grpc_status_code status, gpr_slice *details); +void grpc_chttp2_mark_stream_closed( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, int close_reads, + int close_writes); +void grpc_chttp2_start_writing(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global); + +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#define GRPC_CHTTP2_STREAM_REF(stream_global, reason) \ + grpc_chttp2_stream_ref(stream_global, reason) +#define GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, reason) \ + grpc_chttp2_stream_unref(exec_ctx, stream_global, reason) +void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global, + const char *reason); +void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global, + const char *reason); +#else +#define GRPC_CHTTP2_STREAM_REF(stream_global, reason) \ + grpc_chttp2_stream_ref(stream_global) +#define GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, reason) \ + grpc_chttp2_stream_unref(exec_ctx, stream_global) +void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global); +void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global); +#endif + +grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, + gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue); +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + gpr_slice slice); +void grpc_chttp2_incoming_byte_stream_finished( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs); #endif |