aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2')
-rw-r--r--src/core/transport/chttp2/frame.h6
-rw-r--r--src/core/transport/chttp2/frame_data.c10
-rw-r--r--src/core/transport/chttp2/frame_data.h2
-rw-r--r--src/core/transport/chttp2/frame_goaway.c12
-rw-r--r--src/core/transport/chttp2/frame_goaway.h2
-rw-r--r--src/core/transport/chttp2/frame_ping.h2
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.h2
-rw-r--r--src/core/transport/chttp2/frame_settings.h2
-rw-r--r--src/core/transport/chttp2/frame_window_update.h2
-rw-r--r--src/core/transport/chttp2/hpack_parser.h2
-rw-r--r--src/core/transport/chttp2/internal.h293
-rw-r--r--src/core/transport/chttp2/parsing.c607
-rw-r--r--src/core/transport/chttp2/writing.c174
13 files changed, 941 insertions, 175 deletions
diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h
index c9e3e13042..9012bfa1e1 100644
--- a/src/core/transport/chttp2/frame.h
+++ b/src/core/transport/chttp2/frame.h
@@ -45,6 +45,7 @@ typedef enum {
GRPC_CHTTP2_CONNECTION_ERROR
} grpc_chttp2_parse_error;
+#if 0
typedef struct {
gpr_uint8 end_of_stream;
gpr_uint8 need_flush_reads;
@@ -62,6 +63,11 @@ typedef struct {
gpr_slice goaway_text;
gpr_uint32 rst_stream_reason;
} grpc_chttp2_parse_state;
+#endif
+
+/* defined in internal.h */
+typedef struct grpc_chttp2_stream_parsing grpc_chttp2_stream_parsing;
+typedef struct grpc_chttp2_transport_parsing grpc_chttp2_transport_parsing;
#define GRPC_CHTTP2_FRAME_DATA 0
#define GRPC_CHTTP2_FRAME_HEADER 1
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index a1ae9ed2e6..129d211043 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -35,6 +35,7 @@
#include <string.h>
+#include "src/core/transport/chttp2/internal.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -69,7 +70,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
@@ -77,8 +78,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_data_parser *p = parser;
if (is_last && p->is_last_frame) {
- state->end_of_stream = 1;
- state->need_flush_reads = 1;
+ stream_parsing->received_close = 1;
}
if (cur == end) {
@@ -129,27 +129,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_size |= ((gpr_uint32) * cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
- state->need_flush_reads = 1;
grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size, 0);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) == p->frame_size) {
- state->need_flush_reads = 1;
grpc_sopb_add_slice(&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, end - beg));
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
- state->need_flush_reads = 1;
grpc_sopb_add_slice(
&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, cur + p->frame_size - beg));
cur += p->frame_size;
goto fh_0; /* loop */
} else {
- state->need_flush_reads = 1;
grpc_sopb_add_slice(&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, end - beg));
p->frame_size -= (end - cur);
diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h
index 24e557accd..dbbb87fc01 100644
--- a/src/core/transport/chttp2/frame_data.h
+++ b/src/core/transport/chttp2/frame_data.h
@@ -72,7 +72,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
/* handle a slice of a data frame - is_last indicates the last slice of a
frame */
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
/* create a slice with an empty data frame and is_last set */
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id);
diff --git a/src/core/transport/chttp2/frame_goaway.c b/src/core/transport/chttp2/frame_goaway.c
index 95b75d4fde..d7d6c587e6 100644
--- a/src/core/transport/chttp2/frame_goaway.c
+++ b/src/core/transport/chttp2/frame_goaway.c
@@ -32,6 +32,7 @@
*/
#include "src/core/transport/chttp2/frame_goaway.h"
+#include "src/core/transport/chttp2/internal.h"
#include <string.h>
@@ -62,7 +63,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
@@ -139,10 +140,11 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
p->debug_pos += end - cur;
p->state = GRPC_CHTTP2_GOAWAY_DEBUG;
if (is_last) {
- state->goaway = 1;
- state->goaway_last_stream_index = p->last_stream_id;
- state->goaway_error = p->error_code;
- state->goaway_text =
+ transport_parsing->goaway_received = 1;
+ transport_parsing->goaway_last_stream_index = p->last_stream_id;
+ gpr_slice_unref(transport_parsing->goaway_text);
+ transport_parsing->goaway_error = p->error_code;
+ transport_parsing->goaway_text =
gpr_slice_new(p->debug_data, p->debug_length, gpr_free);
p->debug_data = NULL;
}
diff --git a/src/core/transport/chttp2/frame_goaway.h b/src/core/transport/chttp2/frame_goaway.h
index 7638891514..8148fa90f2 100644
--- a/src/core/transport/chttp2/frame_goaway.h
+++ b/src/core/transport/chttp2/frame_goaway.h
@@ -65,7 +65,7 @@ void grpc_chttp2_goaway_parser_destroy(grpc_chttp2_goaway_parser *p);
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
grpc_chttp2_goaway_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code,
gpr_slice debug_data,
diff --git a/src/core/transport/chttp2/frame_ping.h b/src/core/transport/chttp2/frame_ping.h
index 11d38b80ea..71f8351223 100644
--- a/src/core/transport/chttp2/frame_ping.h
+++ b/src/core/transport/chttp2/frame_ping.h
@@ -48,6 +48,6 @@ gpr_slice grpc_chttp2_ping_create(gpr_uint8 ack, gpr_uint8 *opaque_8bytes);
grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame(
grpc_chttp2_ping_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H */
diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h
index 07a3c98d03..b83d1261d0 100644
--- a/src/core/transport/chttp2/frame_rst_stream.h
+++ b/src/core/transport/chttp2/frame_rst_stream.h
@@ -47,6 +47,6 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */
diff --git a/src/core/transport/chttp2/frame_settings.h b/src/core/transport/chttp2/frame_settings.h
index 18765631a6..701f2b94d2 100644
--- a/src/core/transport/chttp2/frame_settings.h
+++ b/src/core/transport/chttp2/frame_settings.h
@@ -94,6 +94,6 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
grpc_chttp2_settings_parser *parser, gpr_uint32 length, gpr_uint8 flags,
gpr_uint32 *settings);
grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_SETTINGS_H */
diff --git a/src/core/transport/chttp2/frame_window_update.h b/src/core/transport/chttp2/frame_window_update.h
index 85475a8f9e..7217325beb 100644
--- a/src/core/transport/chttp2/frame_window_update.h
+++ b/src/core/transport/chttp2/frame_window_update.h
@@ -50,6 +50,6 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame(
grpc_chttp2_window_update_parser *parser, gpr_uint32 length,
gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H */
diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h
index bfc06b3980..507d7cfea0 100644
--- a/src/core/transport/chttp2/hpack_parser.h
+++ b/src/core/transport/chttp2/hpack_parser.h
@@ -107,7 +107,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
/* wraps grpc_chttp2_hpack_parser_parse to provide a frame level parser for
the transport */
grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
- void *hpack_parser, grpc_chttp2_parse_state *state, gpr_slice slice,
+ void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_PARSER_H */
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index a21a7a4d75..5eba01a3e1 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -36,6 +36,7 @@
#include "src/core/transport/transport_impl.h"
#include "src/core/iomgr/endpoint.h"
+#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h"
#include "src/core/transport/chttp2/frame_ping.h"
@@ -172,16 +173,110 @@ typedef struct {
gpr_slice debug;
} grpc_chttp2_pending_goaway;
+typedef struct {
+ /** data to write next write */
+ gpr_slice_buffer qbuf;
+ /** queued callbacks */
+ grpc_iomgr_closure *pending_closures;
+
+ /** window available for us to send to peer */
+ gpr_uint32 outgoing_window;
+ /** how much window would we like to have for incoming_window */
+ gpr_uint32 connection_window_target;
+
+
+ /** are the local settings dirty and need to be sent? */
+ gpr_uint8 dirtied_local_settings;
+ /** have local settings been sent? */
+ gpr_uint8 sent_local_settings;
+ /** bitmask of setting indexes to send out */
+ gpr_uint32 force_send_settings;
+ /** settings values */
+ gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
+
+ /** last received stream id */
+ gpr_uint32 last_incoming_stream_id;
+} grpc_chttp2_transport_global;
+
+typedef struct {
+ /** data to write now */
+ gpr_slice_buffer outbuf;
+ /** hpack encoding */
+ grpc_chttp2_hpack_compressor hpack_compressor;
+} grpc_chttp2_transport_writing;
+
+struct grpc_chttp2_transport_parsing {
+ /** is this transport a client? (boolean) */
+ gpr_uint8 is_client;
+
+ /** were settings updated? */
+ gpr_uint8 settings_updated;
+ /** was a settings ack received? */
+ gpr_uint8 settings_ack_received;
+ /** was a goaway frame received? */
+ gpr_uint8 goaway_received;
+
+ /** data to write later - after parsing */
+ gpr_slice_buffer qbuf;
+ /* metadata object cache */
+ grpc_mdstr *str_grpc_timeout;
+ /** parser for headers */
+ grpc_chttp2_hpack_parser hpack_parser;
+ /** simple one shot parsers */
+ union {
+ grpc_chttp2_window_update_parser window_update;
+ grpc_chttp2_settings_parser settings;
+ grpc_chttp2_ping_parser ping;
+ grpc_chttp2_rst_stream_parser rst_stream;
+ } simple;
+ /** parser for goaway frames */
+ grpc_chttp2_goaway_parser goaway_parser;
+
+ /** window available for peer to send to us */
+ gpr_uint32 incoming_window;
+
+ /** next stream id available at the time of beginning parsing */
+ gpr_uint32 next_stream_id;
+ gpr_uint32 last_incoming_stream_id;
+
+ /* deframing */
+ grpc_chttp2_deframe_transport_state deframe_state;
+ gpr_uint8 incoming_frame_type;
+ gpr_uint8 incoming_frame_flags;
+ gpr_uint8 header_eof;
+ gpr_uint32 expect_continuation_stream_id;
+ gpr_uint32 incoming_frame_size;
+ gpr_uint32 incoming_stream_id;
+
+ /* active parser */
+ void *parser_data;
+ grpc_chttp2_stream_parsing *incoming_stream;
+ grpc_chttp2_parse_error (*parser)(void *parser_user_data,
+ grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing,
+ gpr_slice slice, int is_last);
+
+ /* received settings */
+ gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS];
+
+ /* goaway data */
+ grpc_status_code goaway_error;
+ gpr_uint32 goaway_last_stream_index;
+ gpr_slice goaway_text;
+};
+
+
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
grpc_endpoint *ep;
grpc_mdctx *metadata_context;
gpr_refcount refs;
- gpr_uint8 is_client;
gpr_mu mu;
gpr_cv cv;
+ /** is a thread currently writing */
+ gpr_uint8 writing_active;
+
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
/** are we calling back any grpc_transport_op completion events */
@@ -192,28 +287,9 @@ struct grpc_chttp2_transport {
/* stream indexing */
gpr_uint32 next_stream_id;
- gpr_uint32 last_incoming_stream_id;
-
- /* settings */
- gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
- gpr_uint32 force_send_settings; /* bitmask of setting indexes to send out */
- gpr_uint8 sent_local_settings; /* have local settings been sent? */
- gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */
/* window management */
- gpr_uint32 outgoing_window;
gpr_uint32 outgoing_window_update;
- gpr_uint32 incoming_window;
- gpr_uint32 connection_window_target;
-
- /* deframing */
- grpc_chttp2_deframe_transport_state deframe_state;
- gpr_uint8 incoming_frame_type;
- gpr_uint8 incoming_frame_flags;
- gpr_uint8 header_eof;
- gpr_uint32 expect_continuation_stream_id;
- gpr_uint32 incoming_frame_size;
- gpr_uint32 incoming_stream_id;
/* goaway */
grpc_chttp2_pending_goaway *pending_goaways;
@@ -226,13 +302,6 @@ struct grpc_chttp2_transport {
/* stream ops that need to be destroyed, but outside of the lock */
grpc_stream_op_buffer nuke_later_sopb;
- /* active parser */
- void *parser_data;
- grpc_chttp2_stream *incoming_stream;
- grpc_chttp2_parse_error (*parser)(void *parser_user_data,
- grpc_chttp2_parse_state *state,
- gpr_slice slice, int is_last);
-
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
grpc_chttp2_stream_map stream_map;
@@ -242,46 +311,12 @@ struct grpc_chttp2_transport {
size_t ping_capacity;
gpr_int64 ping_counter;
- struct {
- /* metadata object cache */
- grpc_mdstr *str_grpc_timeout;
- } constants;
-
- struct {
- /** data to write next write */
- gpr_slice_buffer qbuf;
- /* queued callbacks */
- grpc_iomgr_closure *pending_closures;
- } global;
-
- struct {
- /** is a thread currently writing */
- gpr_uint8 executing;
- /** closure to execute this action */
- grpc_iomgr_closure action;
- /** data to write now */
- gpr_slice_buffer outbuf;
- /* hpack encoding */
- grpc_chttp2_hpack_compressor hpack_compressor;
- } writing;
+ grpc_chttp2_transport_global global;
+ grpc_chttp2_transport_writing writing;
+ grpc_chttp2_transport_parsing parsing;
- struct {
- /** is a thread currently parsing */
- gpr_uint8 executing;
- /** data to write later - after parsing */
- gpr_slice_buffer qbuf;
- /** parser for headers */
- grpc_chttp2_hpack_parser hpack_parser;
- /** simple one shot parsers */
- union {
- grpc_chttp2_window_update_parser window_update;
- grpc_chttp2_settings_parser settings;
- grpc_chttp2_ping_parser ping;
- grpc_chttp2_rst_stream_parser rst_stream;
- } simple;
- /** parser for goaway frames */
- grpc_chttp2_goaway_parser goaway_parser;
- } parsing;
+ /** closure to execute writing */
+ grpc_iomgr_closure writing_action;
struct {
/** is a thread currently performing channel callbacks */
@@ -295,37 +330,47 @@ struct grpc_chttp2_transport {
} channel_callback;
};
-struct grpc_chttp2_stream {
- struct {
- grpc_iomgr_closure *send_done_closure;
- grpc_iomgr_closure *recv_done_closure;
- } global;
-
- struct {
- /* sops that have passed flow control to be written */
- grpc_stream_op_buffer sopb;
- /* how strongly should we indicate closure with the next write */
- grpc_chttp2_send_closed send_closed;
- } writing;
-
- struct {
- int unused;
- } parsing;
-
+typedef struct {
+ /** HTTP2 stream id for this stream, or zero if one has not been assigned */
gpr_uint32 id;
- gpr_uint32 incoming_window;
+ grpc_iomgr_closure *send_done_closure;
+ grpc_iomgr_closure *recv_done_closure;
+
+ /** window available for us to send to peer */
gpr_int64 outgoing_window;
- gpr_uint32 outgoing_window_update;
- /* when the application requests writes be closed, the write_closed is
- 'queued'; when the close is flow controlled into the send path, we are
- 'sending' it; when the write has been performed it is 'sent' */
+ /** stream ops the transport user would like to send */
+ grpc_stream_op_buffer *outgoing_sopb;
+ /** when the application requests writes be closed, the write_closed is
+ 'queued'; when the close is flow controlled into the send path, we are
+ 'sending' it; when the write has been performed it is 'sent' */
grpc_chttp2_write_state write_state;
+ /** is this stream closed (boolean) */
gpr_uint8 read_closed;
- gpr_uint8 cancelled;
+} grpc_chttp2_stream_global;
- grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
- gpr_uint8 included[STREAM_LIST_COUNT];
+typedef struct {
+ /** HTTP2 stream id for this stream, or zero if one has not been assigned */
+ gpr_uint32 id;
+ /** sops that have passed flow control to be written */
+ grpc_stream_op_buffer sopb;
+ /** how strongly should we indicate closure with the next write */
+ grpc_chttp2_send_closed send_closed;
+} grpc_chttp2_stream_writing;
+
+struct grpc_chttp2_stream_parsing {
+ /** HTTP2 stream id for this stream, or zero if one has not been assigned */
+ gpr_uint32 id;
+ /** has this stream received a close */
+ gpr_uint8 received_close;
+ /** incoming_window has been reduced during parsing */
+ gpr_uint8 incoming_window_changed;
+ /** saw an error on this stream during parsing (it should be cancelled) */
+ gpr_uint8 saw_error;
+ /** window available for peer to send to us */
+ gpr_uint32 incoming_window;
+ /** parsing state for data frames */
+ grpc_chttp2_data_parser data_parser;
/* incoming metadata */
grpc_linked_mdelem *incoming_metadata;
@@ -333,21 +378,79 @@ struct grpc_chttp2_stream {
size_t incoming_metadata_capacity;
grpc_linked_mdelem *old_incoming_metadata;
gpr_timespec incoming_deadline;
+};
+
+struct grpc_chttp2_stream {
+ grpc_chttp2_stream_global global;
+ grpc_chttp2_stream_writing writing;
+
+ gpr_uint32 outgoing_window_update;
+ gpr_uint8 cancelled;
+
+ grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
+ gpr_uint8 included[STREAM_LIST_COUNT];
/* sops from application */
- grpc_stream_op_buffer *outgoing_sopb;
grpc_stream_op_buffer *incoming_sopb;
grpc_stream_state *publish_state;
grpc_stream_state published_state;
- grpc_chttp2_data_parser parser;
-
grpc_stream_state callback_state;
grpc_stream_op_buffer callback_sopb;
};
+/** Transport writing call flow:
+ chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes are required;
+ if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the writes.
+ Once writes have been completed (meaning another write could potentially be started),
+ grpc_chttp2_terminate_writing is called. This will call grpc_chttp2_cleanup_writing, at which
+ point the write phase is complete. */
+
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
-void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
+int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
+void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
+void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success);
+void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
+
+/** Process one slice of incoming data */
+int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice);
+void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing);
+
+/** Get a writable stream
+ \return non-zero if there was a stream available */
+void grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
+int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing);
+
+void grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing);
+int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport_writing *transport_writing);
+int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing **stream_writing);
+
+void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing);
+int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing);
+
+int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global);
+
+void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing);
+
+void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success);
+void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
+
+grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
+grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
+
+#define GRPC_CHTTP2_FLOW_CTL_TRACE(a,b,c,d,e) do {} while (0)
+
+#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING)-1)
+
+extern int grpc_http_trace;
+
+#define IF_TRACING(stmt) \
+ if (!(grpc_http_trace)) \
+ ; \
+ else \
+ stmt
#endif
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 9a547ad319..2dd46b3116 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -32,4 +32,611 @@
*/
#include "src/core/transport/chttp2/internal.h"
+#include "src/core/transport/chttp2/timeout_encoding.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation);
+static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_header);
+
+static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last);
+
+void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) {
+ /* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when
+ sending GOAWAY frame.
+ https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
+ says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream ID. So,
+ since we don't have server pushed streams, client should send
+ GOAWAY last-grpc_chttp2_stream-id=0 in this case. */
+ if (!transport_parsing->is_client) {
+ transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id;
+ }
+}
+
+int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) {
+ gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
+ gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
+ gpr_uint8 *cur = beg;
+
+ if (cur == end) return 1;
+
+ switch (transport_parsing->deframe_state) {
+ case DTS_CLIENT_PREFIX_0:
+ case DTS_CLIENT_PREFIX_1:
+ case DTS_CLIENT_PREFIX_2:
+ case DTS_CLIENT_PREFIX_3:
+ case DTS_CLIENT_PREFIX_4:
+ case DTS_CLIENT_PREFIX_5:
+ case DTS_CLIENT_PREFIX_6:
+ case DTS_CLIENT_PREFIX_7:
+ case DTS_CLIENT_PREFIX_8:
+ case DTS_CLIENT_PREFIX_9:
+ case DTS_CLIENT_PREFIX_10:
+ case DTS_CLIENT_PREFIX_11:
+ case DTS_CLIENT_PREFIX_12:
+ case DTS_CLIENT_PREFIX_13:
+ case DTS_CLIENT_PREFIX_14:
+ case DTS_CLIENT_PREFIX_15:
+ case DTS_CLIENT_PREFIX_16:
+ case DTS_CLIENT_PREFIX_17:
+ case DTS_CLIENT_PREFIX_18:
+ case DTS_CLIENT_PREFIX_19:
+ case DTS_CLIENT_PREFIX_20:
+ case DTS_CLIENT_PREFIX_21:
+ case DTS_CLIENT_PREFIX_22:
+ case DTS_CLIENT_PREFIX_23:
+ while (cur != end && transport_parsing->deframe_state != DTS_FH_0) {
+ if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state]) {
+ gpr_log(GPR_ERROR,
+ "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
+ "at byte %d",
+ GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state],
+ (int)(gpr_uint8)GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state], *cur,
+ (int)*cur, transport_parsing->deframe_state);
+ return 0;
+ }
+ ++cur;
+ ++transport_parsing->deframe_state;
+ }
+ if (cur == end) {
+ return 1;
+ }
+ /* fallthrough */
+ dts_fh_0:
+ case DTS_FH_0:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_size = ((gpr_uint32)*cur) << 16;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_1;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_1:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_2;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_2:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_size |= *cur;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_3;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_3:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_type = *cur;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_4;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_4:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_flags = *cur;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_5;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_5:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_6;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_6:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_7;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_7:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_8;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_8:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur);
+ transport_parsing->deframe_state = DTS_FRAME;
+ if (!init_frame_parser(transport_parsing)) {
+ return 0;
+ }
+ if (transport_parsing->incoming_stream_id) {
+ transport_parsing->last_incoming_stream_id = transport_parsing->incoming_stream_id;
+ }
+ if (transport_parsing->incoming_frame_size == 0) {
+ if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1)) {
+ return 0;
+ }
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_0;
+ return 1;
+ }
+ goto dts_fh_0; /* loop */
+ }
+ if (++cur == end) {
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FRAME:
+ GPR_ASSERT(cur < end);
+ if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) {
+ if (!parse_frame_slice(
+ transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
+ return 0;
+ }
+ transport_parsing->deframe_state = DTS_FH_0;
+ return 1;
+ } else if ((gpr_uint32)(end - cur) > transport_parsing->incoming_frame_size) {
+ if (!parse_frame_slice(
+ transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg,
+ cur + transport_parsing->incoming_frame_size - beg),
+ 1)) {
+ return 0;
+ }
+ cur += transport_parsing->incoming_frame_size;
+ goto dts_fh_0; /* loop */
+ } else {
+ if (!parse_frame_slice(
+ transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
+ return 0;
+ }
+ transport_parsing->incoming_frame_size -= (end - cur);
+ return 1;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ }
+
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+
+ return 0;
+}
+
+static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ if (transport_parsing->expect_continuation_stream_id != 0) {
+ if (transport_parsing->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
+ gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
+ transport_parsing->incoming_frame_type);
+ return 0;
+ }
+ if (transport_parsing->expect_continuation_stream_id != transport_parsing->incoming_stream_id) {
+ gpr_log(GPR_ERROR,
+ "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got grpc_chttp2_stream %08x",
+ transport_parsing->expect_continuation_stream_id, transport_parsing->incoming_stream_id);
+ return 0;
+ }
+ return init_header_frame_parser(transport_parsing, 1);
+ }
+ switch (transport_parsing->incoming_frame_type) {
+ case GRPC_CHTTP2_FRAME_DATA:
+ return init_data_frame_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_HEADER:
+ return init_header_frame_parser(transport_parsing, 0);
+ case GRPC_CHTTP2_FRAME_CONTINUATION:
+ gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
+ return 0;
+ case GRPC_CHTTP2_FRAME_RST_STREAM:
+ return init_rst_stream_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_SETTINGS:
+ return init_settings_frame_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
+ return init_window_update_frame_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_PING:
+ return init_ping_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_GOAWAY:
+ return init_goaway_parser(transport_parsing);
+ default:
+ gpr_log(GPR_ERROR, "Unknown frame type %02x", transport_parsing->incoming_frame_type);
+ return init_skip_frame_parser(transport_parsing, 0);
+ }
+}
+
+static grpc_chttp2_parse_error skip_parser(void *parser,
+ grpc_chttp2_parse_state *st,
+ gpr_slice slice, int is_last) {
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
+static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
+
+static int init_skip_frame(grpc_chttp2_transport_parsing *transport_parsing, int is_header) {
+ if (is_header) {
+ int is_eoh = transport_parsing->expect_continuation_stream_id != 0;
+ transport_parsing->parser = grpc_chttp2_header_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->hpack_parser;
+ transport_parsing->hpack_parser.on_header = skip_header;
+ transport_parsing->hpack_parser.on_header_user_data = NULL;
+ transport_parsing->hpack_parser.is_boundary = is_eoh;
+ transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0;
+ } else {
+ transport_parsing->parser = skip_parser;
+ }
+ return 1;
+}
+
+static void become_skip_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ init_skip_frame(transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse);
+}
+
+static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) {
+ if (transport_parsing->incoming_frame_size > transport_parsing->incoming_window) {
+ gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
+ transport_parsing->incoming_frame_size, transport_parsing->incoming_window);
+ return GRPC_CHTTP2_CONNECTION_ERROR;
+ }
+
+ if (transport_parsing->incoming_frame_size > stream_parsing->incoming_window) {
+ gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
+ transport_parsing->incoming_frame_size, stream_parsing->incoming_window);
+ return GRPC_CHTTP2_CONNECTION_ERROR;
+ }
+
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, -(gpr_int64)transport_parsing->incoming_frame_size);
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->global.id, -(gpr_int64)transport_parsing->incoming_frame_size);
+ transport_parsing->incoming_window -= transport_parsing->incoming_frame_size;
+ stream_parsing->incoming_window -= transport_parsing->incoming_frame_size;
+
+ /* if the grpc_chttp2_stream incoming window is getting low, schedule an update */
+ stream_parsing->incoming_window_changed = 1;
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
+static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_chttp2_stream_parsing *stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id);
+ grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
+ if (!stream_parsing || stream_parsing->received_close) return init_skip_frame(transport_parsing, 0);
+ if (err == GRPC_CHTTP2_PARSE_OK) {
+ err = update_incoming_window(transport_parsing, stream_parsing);
+ }
+ if (err == GRPC_CHTTP2_PARSE_OK) {
+ err = grpc_chttp2_data_parser_begin_frame(&stream_parsing->data_parser,
+ transport_parsing->incoming_frame_flags);
+ }
+ switch (err) {
+ case GRPC_CHTTP2_PARSE_OK:
+ transport_parsing->incoming_stream = stream_parsing;
+ transport_parsing->parser = grpc_chttp2_data_parser_parse;
+ transport_parsing->parser_data = &stream_parsing->data_parser;
+ return 1;
+ case GRPC_CHTTP2_STREAM_ERROR:
+ stream_parsing->received_close = 1;
+ stream_parsing->saw_error = 1;
+ return init_skip_frame(transport_parsing, 0);
+ case GRPC_CHTTP2_CONNECTION_ERROR:
+ return 0;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ return 0;
+}
+
+static void free_timeout(void *p) { gpr_free(p); }
+
+static void add_incoming_metadata(grpc_chttp2_stream_parsing *stream_parsing, grpc_mdelem *elem) {
+ if (stream_parsing->incoming_metadata_capacity == stream_parsing->incoming_metadata_count) {
+ stream_parsing->incoming_metadata_capacity =
+ GPR_MAX(8, 2 * stream_parsing->incoming_metadata_capacity);
+ stream_parsing->incoming_metadata =
+ gpr_realloc(stream_parsing->incoming_metadata, sizeof(*stream_parsing->incoming_metadata) *
+ stream_parsing->incoming_metadata_capacity);
+ }
+ stream_parsing->incoming_metadata[stream_parsing->incoming_metadata_count++].md = elem;
+}
+
+static void on_header(void *tp, grpc_mdelem *md) {
+ grpc_chttp2_transport_parsing *transport_parsing = tp;
+ grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream;
+
+ GPR_ASSERT(stream_parsing);
+
+ IF_TRACING(gpr_log(
+ GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, transport_parsing->is_client ? "CLI" : "SVR",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
+
+ if (md->key == transport_parsing->str_grpc_timeout) {
+ gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
+ if (!cached_timeout) {
+ /* not already parsed: parse it now, and store the result away */
+ cached_timeout = gpr_malloc(sizeof(gpr_timespec));
+ if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
+ cached_timeout)) {
+ gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
+ grpc_mdstr_as_c_string(md->value));
+ *cached_timeout = gpr_inf_future;
+ }
+ grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
+ }
+ stream_parsing->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
+ grpc_mdelem_unref(md);
+ } else {
+ add_incoming_metadata(stream_parsing, md);
+ }
+
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+}
+
+static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) {
+ int is_eoh =
+ (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
+ grpc_chttp2_stream_parsing *stream_parsing;
+
+ if (is_eoh) {
+ transport_parsing->expect_continuation_stream_id = 0;
+ } else {
+ transport_parsing->expect_continuation_stream_id = transport_parsing->incoming_stream_id;
+ }
+
+ if (!is_continuation) {
+ transport_parsing->header_eof =
+ (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
+ }
+
+ /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */
+ stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id);
+ if (!stream_parsing) {
+ if (is_continuation) {
+ gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received");
+ return init_skip_frame(transport_parsing, 1);
+ }
+ if (transport_parsing->is_client) {
+ if ((transport_parsing->incoming_stream_id & 1) &&
+ transport_parsing->incoming_stream_id < transport_parsing->next_stream_id) {
+ /* this is an old (probably cancelled) grpc_chttp2_stream */
+ } else {
+ gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client");
+ }
+ return init_skip_frame(transport_parsing, 1);
+ } else if (transport_parsing->last_incoming_stream_id > transport_parsing->incoming_stream_id) {
+ gpr_log(GPR_ERROR,
+ "ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream "
+ "id=%d, new grpc_chttp2_stream id=%d",
+ transport_parsing->last_incoming_stream_id, transport_parsing->incoming_stream_id);
+ return init_skip_frame(transport_parsing, 1);
+ } else if ((transport_parsing->incoming_stream_id & 1) == 0) {
+ gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d",
+ transport_parsing->incoming_stream_id);
+ return init_skip_frame(transport_parsing, 1);
+ }
+ stream_parsing = transport_parsing->incoming_stream = grpc_chttp2_parsing_accept_stream(transport_parsing, transport_parsing->incoming_stream_id);
+ if (!stream_parsing) {
+ gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted");
+ return init_skip_frame(transport_parsing, 1);
+ }
+ } else {
+ transport_parsing->incoming_stream = stream_parsing;
+ }
+ if (stream_parsing->received_close) {
+ gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header");
+ transport_parsing->incoming_stream = NULL;
+ return init_skip_frame(transport_parsing, 1);
+ }
+ transport_parsing->parser = grpc_chttp2_header_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->hpack_parser;
+ transport_parsing->hpack_parser.on_header = on_header;
+ transport_parsing->hpack_parser.on_header_user_data = transport_parsing;
+ transport_parsing->hpack_parser.is_boundary = is_eoh;
+ transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0;
+ if (!is_continuation &&
+ (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
+ grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser);
+ }
+ return 1;
+}
+
+static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
+ &transport_parsing->simple.window_update,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ transport_parsing->parser = grpc_chttp2_window_update_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->simple.window_update;
+ return ok;
+}
+
+static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok = GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_ping_parser_begin_frame(&transport_parsing->simple.ping,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ transport_parsing->parser = grpc_chttp2_ping_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->simple.ping;
+ return ok;
+}
+
+static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame(
+ &transport_parsing->simple.rst_stream,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->simple.rst_stream;
+ return ok;
+}
+
+static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok =
+ GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_goaway_parser_begin_frame(
+ &transport_parsing->goaway_parser, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags);
+ transport_parsing->parser = grpc_chttp2_goaway_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->goaway_parser;
+ return ok;
+}
+
+static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok;
+
+ if (transport_parsing->incoming_stream_id != 0) {
+ gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d", transport_parsing->incoming_stream_id);
+ return 0;
+ }
+
+ ok = GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_settings_parser_begin_frame(
+ &transport_parsing->simple.settings, transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags, transport_parsing->settings);
+ if (!ok) {
+ return 0;
+ }
+ if (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
+ transport_parsing->settings_ack_received = 1;
+ } else {
+ transport_parsing->settings_updated = 1;
+ }
+ transport_parsing->parser = grpc_chttp2_settings_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->simple.settings;
+ return ok;
+}
+
+/*
+static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
+ return window + window_update < MAX_WINDOW;
+}
+*/
+
+static void add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) {
+ grpc_metadata_batch b;
+
+ b.list.head = NULL;
+ /* Store away the last element of the list, so that in patch_metadata_ops
+ we can reconstitute the list.
+ We can't do list building here as later incoming metadata may reallocate
+ the underlying array. */
+ b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count;
+ b.garbage.head = b.garbage.tail = NULL;
+ b.deadline = stream_parsing->incoming_deadline;
+ stream_parsing->incoming_deadline = gpr_inf_future;
+
+ grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
+}
+
+static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice, int is_last) {
+ grpc_chttp2_parse_state st;
+ size_t i;
+ memset(&st, 0, sizeof(st));
+ switch (transport_parsing->parser(transport_parsing->parser_data, &st, slice, is_last)) {
+ case GRPC_CHTTP2_PARSE_OK:
+ if (stream_parsing) {
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+ }
+ if (st.end_of_stream) {
+ transport_parsing->incoming_stream->read_closed = 1;
+ maybe_finish_read(t, transport_parsing->incoming_stream, 1);
+ }
+ if (st.need_flush_reads) {
+ maybe_finish_read(t, transport_parsing->incoming_stream, 1);
+ }
+ if (st.metadata_boundary) {
+ add_metadata_batch(t, transport_parsing->incoming_stream);
+ maybe_finish_read(t, transport_parsing->incoming_stream, 1);
+ }
+ if (st.ack_settings) {
+ gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_settings_ack_create());
+ }
+ if (st.send_ping_ack) {
+ gpr_slice_buffer_add(
+ &transport_parsing->qbuf,
+ grpc_chttp2_ping_create(1, transport_parsing->simple.ping.opaque_8bytes));
+ }
+ if (st.goaway) {
+ add_goaway(t, st.goaway_error, st.goaway_text);
+ }
+ if (st.rst_stream) {
+ cancel_stream_id(
+ t, transport_parsing->incoming_stream_id,
+ grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason),
+ st.rst_stream_reason, 0);
+ }
+ if (st.process_ping_reply) {
+ for (i = 0; i < transport_parsing->ping_count; i++) {
+ if (0 ==
+ memcmp(transport_parsing->pings[i].id, transport_parsing->simple.ping.opaque_8bytes, 8)) {
+ transport_parsing->pings[i].cb(transport_parsing->pings[i].user_data);
+ memmove(&transport_parsing->pings[i], &transport_parsing->pings[i + 1],
+ (transport_parsing->ping_count - i - 1) * sizeof(grpc_chttp2_outstanding_ping));
+ transport_parsing->ping_count--;
+ break;
+ }
+ }
+ }
+ if (st.initial_window_update) {
+ for (i = 0; i < transport_parsing->stream_map.count; i++) {
+ grpc_chttp2_stream *s = (grpc_chttp2_stream *)(transport_parsing->stream_map.values[i]);
+ s->global.outgoing_window_update += st.initial_window_update;
+ stream_list_join(t, s, NEW_OUTGOING_WINDOW);
+ }
+ }
+ if (st.window_update) {
+ if (transport_parsing->incoming_stream_id) {
+ /* if there was a grpc_chttp2_stream id, this is for some grpc_chttp2_stream */
+ grpc_chttp2_stream *s = lookup_stream(t, transport_parsing->incoming_stream_id);
+ if (s) {
+ s->global.outgoing_window_update += st.window_update;
+ stream_list_join(t, s, NEW_OUTGOING_WINDOW);
+ }
+ } else {
+ /* grpc_chttp2_transport level window update */
+ transport_parsing->global.outgoing_window_update += st.window_update;
+ }
+ }
+ return 1;
+ case GRPC_CHTTP2_STREAM_ERROR:
+ become_skip_parser(transport_parsing);
+ cancel_stream_id(
+ t, transport_parsing->incoming_stream_id,
+ grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
+ GRPC_CHTTP2_INTERNAL_ERROR, 1);
+ return 1;
+ case GRPC_CHTTP2_CONNECTION_ERROR:
+ drop_connection(transport_parsing);
+ return 0;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ return 0;
+}
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index a1830a8c25..0265f173f3 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -32,93 +32,145 @@
*/
#include "src/core/transport/chttp2/internal.h"
+#include "src/core/transport/chttp2/http2_errors.h"
#include <grpc/support/log.h>
-static void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport *t) {
- grpc_chttp2_stream *s;
- gpr_uint32 window_delta;
+static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
+static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
- /* don't do anything if we are already writing */
- if (t->writing.executing) {
- return;
- }
+int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
+ grpc_chttp2_stream_global *stream_global;
+ grpc_chttp2_stream_writing *stream_writing;
+ gpr_uint32 window_delta;
/* simple writes are queued to qbuf, and flushed here */
- gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf);
- GPR_ASSERT(t->global.qbuf.count == 0);
+ gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
+ GPR_ASSERT(transport_global->qbuf.count == 0);
- if (t->dirtied_local_settings && !t->sent_local_settings) {
+ if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings) {
gpr_slice_buffer_add(
- &t->writing.outbuf, grpc_chttp2_settings_create(
- t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
- t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
- t->force_send_settings = 0;
- t->dirtied_local_settings = 0;
- t->sent_local_settings = 1;
+ &transport_writing->outbuf, grpc_chttp2_settings_create(
+ transport_global->settings[SENT_SETTINGS], transport_global->settings[LOCAL_SETTINGS],
+ transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
+ transport_global->force_send_settings = 0;
+ transport_global->dirtied_local_settings = 0;
+ transport_global->sent_local_settings = 1;
}
/* for each grpc_chttp2_stream that's become writable, frame it's data (according to
available window sizes) and add to the output buffer */
- while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
- s->outgoing_window > 0) {
+ while (transport_global->outgoing_window &&
+ grpc_chttp2_list_pop_writable_stream(transport_global, transport_writing, &stream_global, &stream_writing) &&
+ stream_global->outgoing_window > 0) {
+ stream_writing->id = stream_global->id;
window_delta = grpc_chttp2_preencode(
- s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
- GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb);
- FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
- FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
- t->outgoing_window -= window_delta;
- s->outgoing_window -= window_delta;
-
- if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
- s->outgoing_sopb->nops == 0) {
- if (!t->is_client && !s->read_closed) {
- s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM;
+ stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops,
+ GPR_MIN(transport_global->outgoing_window, stream_global->outgoing_window),
+ &stream_writing->sopb);
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
+ transport_global->outgoing_window -= window_delta;
+ stream_global->outgoing_window -= window_delta;
+
+ if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE &&
+ stream_global->outgoing_sopb->nops == 0) {
+ if (!transport_constants->is_client && !stream_global->read_closed) {
+ stream_writing->send_closed = SEND_CLOSED_WITH_RST_STREAM;
} else {
- s->writing.send_closed = SEND_CLOSED;
+ stream_writing->send_closed = SEND_CLOSED;
}
}
- if (s->writing.sopb.nops > 0 || s->writing.send_closed) {
- stream_list_join(t, s, WRITING);
+ if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != DONT_SEND_CLOSED) {
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
/* we should either exhaust window or have no ops left, but not both */
- if (s->outgoing_sopb->nops == 0) {
- s->outgoing_sopb = NULL;
- schedule_cb(t, s->global.send_done_closure, 1);
- } else if (s->outgoing_window) {
- stream_list_add_tail(t, s, WRITABLE);
+ if (stream_global->outgoing_sopb->nops == 0) {
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 1);
+ } else if (stream_global->outgoing_window) {
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
- if (!t->parsing.executing) {
- /* for each grpc_chttp2_stream that wants to update its window, add that window here */
- while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
- window_delta =
- t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
- s->incoming_window;
- if (!s->read_closed && window_delta) {
- gpr_slice_buffer_add(
- &t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
- FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
- s->incoming_window += window_delta;
- }
+ /* for each grpc_chttp2_stream that wants to update its window, add that window here */
+ while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, &stream_global)) {
+ window_delta =
+ transport_global->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
+ stream_global->incoming_window;
+ if (!stream_global->read_closed && window_delta > 0) {
+ gpr_slice_buffer_add(
+ &transport_writing->outbuf, grpc_chttp2_window_update_create(stream_global->id, window_delta));
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->id, window_delta);
+ stream_global->incoming_window += window_delta;
}
+ }
+
+ /* if the grpc_chttp2_transport is ready to send a window update, do so here also */
+ if (transport_global->incoming_window < transport_global->connection_window_target * 3 / 4) {
+ window_delta = transport_global->connection_window_target - transport_global->incoming_window;
+ gpr_slice_buffer_add(&transport_writing->outbuf,
+ grpc_chttp2_window_update_create(0, window_delta));
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, window_delta);
+ transport_global->incoming_window += window_delta;
+ }
+
+ return transport_writing->outbuf.length > 0 || grpc_chttp2_list_have_writing_streams(transport_writing);
+}
+
+void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) {
+ finalize_outbuf(transport_writing);
- /* if the grpc_chttp2_transport is ready to send a window update, do so here also */
- if (t->incoming_window < t->connection_window_target * 3 / 4) {
- window_delta = t->connection_window_target - t->incoming_window;
- gpr_slice_buffer_add(&t->writing.outbuf,
- grpc_chttp2_window_update_create(0, window_delta));
- FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
- t->incoming_window += window_delta;
+ GPR_ASSERT(transport_writing->outbuf.count > 0);
+
+ switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, transport_writing->outbuf.count,
+ finish_write_cb, transport_writing)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
+ grpc_chttp2_terminate_writing(transport_writing, 1);
+ break;
+ case GRPC_ENDPOINT_WRITE_ERROR:
+ grpc_chttp2_terminate_writing(transport_writing, 0);
+ break;
+ case GRPC_ENDPOINT_WRITE_PENDING:
+ break;
+ }
+}
+
+static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
+ grpc_chttp2_stream_writing *stream_writing;
+
+ while (grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
+ grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
+ stream_writing->send_closed != DONT_SEND_CLOSED, stream_writing->id,
+ &transport_writing->hpack_compressor, &transport_writing->outbuf);
+ stream_writing->sopb.nops = 0;
+ if (stream_writing->send_closed == SEND_CLOSED_WITH_RST_STREAM) {
+ gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_rst_stream_create(
+ stream_writing->id, GRPC_CHTTP2_NO_ERROR));
}
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
+}
- if (t->writing.outbuf.length > 0 || !stream_list_empty(t, WRITING)) {
- t->writing.executing = 1;
- ref_transport(t);
- gpr_log(GPR_DEBUG, "schedule write");
- schedule_cb(t, &t->writing.action, 1);
+static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
+ grpc_chttp2_transport_writing *transport_writing = tw;
+ grpc_chttp2_terminate_writing(transport_writing, write_status == GRPC_ENDPOINT_CB_OK);
+}
+
+void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
+ grpc_chttp2_stream_writing *stream_writing;
+ grpc_chttp2_stream_global *stream_global;
+
+ while (grpc_chttp2_list_pop_written_stream(transport_global, transport_writing, &stream_global, &stream_writing)) {
+ if (stream_writing->send_closed != DONT_SEND_CLOSED) {
+ stream_global->write_state = WRITE_STATE_SENT_CLOSE;
+ if (!transport_constants->is_client) {
+ stream_global->read_closed = 1;
+ }
+ grpc_chttp2_read_write_state_changed(transport_global, stream_global);
+ }
}
+ transport_writing->outbuf.count = 0;
+ transport_writing->outbuf.length = 0;
}