aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/transport/chttp2/frame.h6
-rw-r--r--src/core/transport/chttp2/frame_data.c10
-rw-r--r--src/core/transport/chttp2/frame_data.h2
-rw-r--r--src/core/transport/chttp2/frame_goaway.c12
-rw-r--r--src/core/transport/chttp2/frame_goaway.h2
-rw-r--r--src/core/transport/chttp2/frame_ping.h2
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.h2
-rw-r--r--src/core/transport/chttp2/frame_settings.h2
-rw-r--r--src/core/transport/chttp2/frame_window_update.h2
-rw-r--r--src/core/transport/chttp2/hpack_parser.h2
-rw-r--r--src/core/transport/chttp2/internal.h293
-rw-r--r--src/core/transport/chttp2/parsing.c607
-rw-r--r--src/core/transport/chttp2/writing.c174
-rw-r--r--src/core/transport/chttp2_transport.c815
14 files changed, 1027 insertions, 904 deletions
diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h
index c9e3e13042..9012bfa1e1 100644
--- a/src/core/transport/chttp2/frame.h
+++ b/src/core/transport/chttp2/frame.h
@@ -45,6 +45,7 @@ typedef enum {
GRPC_CHTTP2_CONNECTION_ERROR
} grpc_chttp2_parse_error;
+#if 0
typedef struct {
gpr_uint8 end_of_stream;
gpr_uint8 need_flush_reads;
@@ -62,6 +63,11 @@ typedef struct {
gpr_slice goaway_text;
gpr_uint32 rst_stream_reason;
} grpc_chttp2_parse_state;
+#endif
+
+/* defined in internal.h */
+typedef struct grpc_chttp2_stream_parsing grpc_chttp2_stream_parsing;
+typedef struct grpc_chttp2_transport_parsing grpc_chttp2_transport_parsing;
#define GRPC_CHTTP2_FRAME_DATA 0
#define GRPC_CHTTP2_FRAME_HEADER 1
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index a1ae9ed2e6..129d211043 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -35,6 +35,7 @@
#include <string.h>
+#include "src/core/transport/chttp2/internal.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -69,7 +70,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
@@ -77,8 +78,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_data_parser *p = parser;
if (is_last && p->is_last_frame) {
- state->end_of_stream = 1;
- state->need_flush_reads = 1;
+ stream_parsing->received_close = 1;
}
if (cur == end) {
@@ -129,27 +129,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_size |= ((gpr_uint32) * cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
- state->need_flush_reads = 1;
grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size, 0);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) == p->frame_size) {
- state->need_flush_reads = 1;
grpc_sopb_add_slice(&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, end - beg));
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
- state->need_flush_reads = 1;
grpc_sopb_add_slice(
&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, cur + p->frame_size - beg));
cur += p->frame_size;
goto fh_0; /* loop */
} else {
- state->need_flush_reads = 1;
grpc_sopb_add_slice(&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, end - beg));
p->frame_size -= (end - cur);
diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h
index 24e557accd..dbbb87fc01 100644
--- a/src/core/transport/chttp2/frame_data.h
+++ b/src/core/transport/chttp2/frame_data.h
@@ -72,7 +72,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
/* handle a slice of a data frame - is_last indicates the last slice of a
frame */
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
/* create a slice with an empty data frame and is_last set */
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id);
diff --git a/src/core/transport/chttp2/frame_goaway.c b/src/core/transport/chttp2/frame_goaway.c
index 95b75d4fde..d7d6c587e6 100644
--- a/src/core/transport/chttp2/frame_goaway.c
+++ b/src/core/transport/chttp2/frame_goaway.c
@@ -32,6 +32,7 @@
*/
#include "src/core/transport/chttp2/frame_goaway.h"
+#include "src/core/transport/chttp2/internal.h"
#include <string.h>
@@ -62,7 +63,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
@@ -139,10 +140,11 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
p->debug_pos += end - cur;
p->state = GRPC_CHTTP2_GOAWAY_DEBUG;
if (is_last) {
- state->goaway = 1;
- state->goaway_last_stream_index = p->last_stream_id;
- state->goaway_error = p->error_code;
- state->goaway_text =
+ transport_parsing->goaway_received = 1;
+ transport_parsing->goaway_last_stream_index = p->last_stream_id;
+ gpr_slice_unref(transport_parsing->goaway_text);
+ transport_parsing->goaway_error = p->error_code;
+ transport_parsing->goaway_text =
gpr_slice_new(p->debug_data, p->debug_length, gpr_free);
p->debug_data = NULL;
}
diff --git a/src/core/transport/chttp2/frame_goaway.h b/src/core/transport/chttp2/frame_goaway.h
index 7638891514..8148fa90f2 100644
--- a/src/core/transport/chttp2/frame_goaway.h
+++ b/src/core/transport/chttp2/frame_goaway.h
@@ -65,7 +65,7 @@ void grpc_chttp2_goaway_parser_destroy(grpc_chttp2_goaway_parser *p);
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
grpc_chttp2_goaway_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code,
gpr_slice debug_data,
diff --git a/src/core/transport/chttp2/frame_ping.h b/src/core/transport/chttp2/frame_ping.h
index 11d38b80ea..71f8351223 100644
--- a/src/core/transport/chttp2/frame_ping.h
+++ b/src/core/transport/chttp2/frame_ping.h
@@ -48,6 +48,6 @@ gpr_slice grpc_chttp2_ping_create(gpr_uint8 ack, gpr_uint8 *opaque_8bytes);
grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame(
grpc_chttp2_ping_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H */
diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h
index 07a3c98d03..b83d1261d0 100644
--- a/src/core/transport/chttp2/frame_rst_stream.h
+++ b/src/core/transport/chttp2/frame_rst_stream.h
@@ -47,6 +47,6 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */
diff --git a/src/core/transport/chttp2/frame_settings.h b/src/core/transport/chttp2/frame_settings.h
index 18765631a6..701f2b94d2 100644
--- a/src/core/transport/chttp2/frame_settings.h
+++ b/src/core/transport/chttp2/frame_settings.h
@@ -94,6 +94,6 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
grpc_chttp2_settings_parser *parser, gpr_uint32 length, gpr_uint8 flags,
gpr_uint32 *settings);
grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_SETTINGS_H */
diff --git a/src/core/transport/chttp2/frame_window_update.h b/src/core/transport/chttp2/frame_window_update.h
index 85475a8f9e..7217325beb 100644
--- a/src/core/transport/chttp2/frame_window_update.h
+++ b/src/core/transport/chttp2/frame_window_update.h
@@ -50,6 +50,6 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame(
grpc_chttp2_window_update_parser *parser, gpr_uint32 length,
gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
- void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+ void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H */
diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h
index bfc06b3980..507d7cfea0 100644
--- a/src/core/transport/chttp2/hpack_parser.h
+++ b/src/core/transport/chttp2/hpack_parser.h
@@ -107,7 +107,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
/* wraps grpc_chttp2_hpack_parser_parse to provide a frame level parser for
the transport */
grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
- void *hpack_parser, grpc_chttp2_parse_state *state, gpr_slice slice,
+ void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_PARSER_H */
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index a21a7a4d75..5eba01a3e1 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -36,6 +36,7 @@
#include "src/core/transport/transport_impl.h"
#include "src/core/iomgr/endpoint.h"
+#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h"
#include "src/core/transport/chttp2/frame_ping.h"
@@ -172,16 +173,110 @@ typedef struct {
gpr_slice debug;
} grpc_chttp2_pending_goaway;
+typedef struct {
+ /** data to write next write */
+ gpr_slice_buffer qbuf;
+ /** queued callbacks */
+ grpc_iomgr_closure *pending_closures;
+
+ /** window available for us to send to peer */
+ gpr_uint32 outgoing_window;
+ /** how much window would we like to have for incoming_window */
+ gpr_uint32 connection_window_target;
+
+
+ /** are the local settings dirty and need to be sent? */
+ gpr_uint8 dirtied_local_settings;
+ /** have local settings been sent? */
+ gpr_uint8 sent_local_settings;
+ /** bitmask of setting indexes to send out */
+ gpr_uint32 force_send_settings;
+ /** settings values */
+ gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
+
+ /** last received stream id */
+ gpr_uint32 last_incoming_stream_id;
+} grpc_chttp2_transport_global;
+
+typedef struct {
+ /** data to write now */
+ gpr_slice_buffer outbuf;
+ /** hpack encoding */
+ grpc_chttp2_hpack_compressor hpack_compressor;
+} grpc_chttp2_transport_writing;
+
+struct grpc_chttp2_transport_parsing {
+ /** is this transport a client? (boolean) */
+ gpr_uint8 is_client;
+
+ /** were settings updated? */
+ gpr_uint8 settings_updated;
+ /** was a settings ack received? */
+ gpr_uint8 settings_ack_received;
+ /** was a goaway frame received? */
+ gpr_uint8 goaway_received;
+
+ /** data to write later - after parsing */
+ gpr_slice_buffer qbuf;
+ /* metadata object cache */
+ grpc_mdstr *str_grpc_timeout;
+ /** parser for headers */
+ grpc_chttp2_hpack_parser hpack_parser;
+ /** simple one shot parsers */
+ union {
+ grpc_chttp2_window_update_parser window_update;
+ grpc_chttp2_settings_parser settings;
+ grpc_chttp2_ping_parser ping;
+ grpc_chttp2_rst_stream_parser rst_stream;
+ } simple;
+ /** parser for goaway frames */
+ grpc_chttp2_goaway_parser goaway_parser;
+
+ /** window available for peer to send to us */
+ gpr_uint32 incoming_window;
+
+ /** next stream id available at the time of beginning parsing */
+ gpr_uint32 next_stream_id;
+ gpr_uint32 last_incoming_stream_id;
+
+ /* deframing */
+ grpc_chttp2_deframe_transport_state deframe_state;
+ gpr_uint8 incoming_frame_type;
+ gpr_uint8 incoming_frame_flags;
+ gpr_uint8 header_eof;
+ gpr_uint32 expect_continuation_stream_id;
+ gpr_uint32 incoming_frame_size;
+ gpr_uint32 incoming_stream_id;
+
+ /* active parser */
+ void *parser_data;
+ grpc_chttp2_stream_parsing *incoming_stream;
+ grpc_chttp2_parse_error (*parser)(void *parser_user_data,
+ grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing,
+ gpr_slice slice, int is_last);
+
+ /* received settings */
+ gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS];
+
+ /* goaway data */
+ grpc_status_code goaway_error;
+ gpr_uint32 goaway_last_stream_index;
+ gpr_slice goaway_text;
+};
+
+
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
grpc_endpoint *ep;
grpc_mdctx *metadata_context;
gpr_refcount refs;
- gpr_uint8 is_client;
gpr_mu mu;
gpr_cv cv;
+ /** is a thread currently writing */
+ gpr_uint8 writing_active;
+
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
/** are we calling back any grpc_transport_op completion events */
@@ -192,28 +287,9 @@ struct grpc_chttp2_transport {
/* stream indexing */
gpr_uint32 next_stream_id;
- gpr_uint32 last_incoming_stream_id;
-
- /* settings */
- gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
- gpr_uint32 force_send_settings; /* bitmask of setting indexes to send out */
- gpr_uint8 sent_local_settings; /* have local settings been sent? */
- gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */
/* window management */
- gpr_uint32 outgoing_window;
gpr_uint32 outgoing_window_update;
- gpr_uint32 incoming_window;
- gpr_uint32 connection_window_target;
-
- /* deframing */
- grpc_chttp2_deframe_transport_state deframe_state;
- gpr_uint8 incoming_frame_type;
- gpr_uint8 incoming_frame_flags;
- gpr_uint8 header_eof;
- gpr_uint32 expect_continuation_stream_id;
- gpr_uint32 incoming_frame_size;
- gpr_uint32 incoming_stream_id;
/* goaway */
grpc_chttp2_pending_goaway *pending_goaways;
@@ -226,13 +302,6 @@ struct grpc_chttp2_transport {
/* stream ops that need to be destroyed, but outside of the lock */
grpc_stream_op_buffer nuke_later_sopb;
- /* active parser */
- void *parser_data;
- grpc_chttp2_stream *incoming_stream;
- grpc_chttp2_parse_error (*parser)(void *parser_user_data,
- grpc_chttp2_parse_state *state,
- gpr_slice slice, int is_last);
-
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
grpc_chttp2_stream_map stream_map;
@@ -242,46 +311,12 @@ struct grpc_chttp2_transport {
size_t ping_capacity;
gpr_int64 ping_counter;
- struct {
- /* metadata object cache */
- grpc_mdstr *str_grpc_timeout;
- } constants;
-
- struct {
- /** data to write next write */
- gpr_slice_buffer qbuf;
- /* queued callbacks */
- grpc_iomgr_closure *pending_closures;
- } global;
-
- struct {
- /** is a thread currently writing */
- gpr_uint8 executing;
- /** closure to execute this action */
- grpc_iomgr_closure action;
- /** data to write now */
- gpr_slice_buffer outbuf;
- /* hpack encoding */
- grpc_chttp2_hpack_compressor hpack_compressor;
- } writing;
+ grpc_chttp2_transport_global global;
+ grpc_chttp2_transport_writing writing;
+ grpc_chttp2_transport_parsing parsing;
- struct {
- /** is a thread currently parsing */
- gpr_uint8 executing;
- /** data to write later - after parsing */
- gpr_slice_buffer qbuf;
- /** parser for headers */
- grpc_chttp2_hpack_parser hpack_parser;
- /** simple one shot parsers */
- union {
- grpc_chttp2_window_update_parser window_update;
- grpc_chttp2_settings_parser settings;
- grpc_chttp2_ping_parser ping;
- grpc_chttp2_rst_stream_parser rst_stream;
- } simple;
- /** parser for goaway frames */
- grpc_chttp2_goaway_parser goaway_parser;
- } parsing;
+ /** closure to execute writing */
+ grpc_iomgr_closure writing_action;
struct {
/** is a thread currently performing channel callbacks */
@@ -295,37 +330,47 @@ struct grpc_chttp2_transport {
} channel_callback;
};
-struct grpc_chttp2_stream {
- struct {
- grpc_iomgr_closure *send_done_closure;
- grpc_iomgr_closure *recv_done_closure;
- } global;
-
- struct {
- /* sops that have passed flow control to be written */
- grpc_stream_op_buffer sopb;
- /* how strongly should we indicate closure with the next write */
- grpc_chttp2_send_closed send_closed;
- } writing;
-
- struct {
- int unused;
- } parsing;
-
+typedef struct {
+ /** HTTP2 stream id for this stream, or zero if one has not been assigned */
gpr_uint32 id;
- gpr_uint32 incoming_window;
+ grpc_iomgr_closure *send_done_closure;
+ grpc_iomgr_closure *recv_done_closure;
+
+ /** window available for us to send to peer */
gpr_int64 outgoing_window;
- gpr_uint32 outgoing_window_update;
- /* when the application requests writes be closed, the write_closed is
- 'queued'; when the close is flow controlled into the send path, we are
- 'sending' it; when the write has been performed it is 'sent' */
+ /** stream ops the transport user would like to send */
+ grpc_stream_op_buffer *outgoing_sopb;
+ /** when the application requests writes be closed, the write_closed is
+ 'queued'; when the close is flow controlled into the send path, we are
+ 'sending' it; when the write has been performed it is 'sent' */
grpc_chttp2_write_state write_state;
+ /** is this stream closed (boolean) */
gpr_uint8 read_closed;
- gpr_uint8 cancelled;
+} grpc_chttp2_stream_global;
- grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
- gpr_uint8 included[STREAM_LIST_COUNT];
+typedef struct {
+ /** HTTP2 stream id for this stream, or zero if one has not been assigned */
+ gpr_uint32 id;
+ /** sops that have passed flow control to be written */
+ grpc_stream_op_buffer sopb;
+ /** how strongly should we indicate closure with the next write */
+ grpc_chttp2_send_closed send_closed;
+} grpc_chttp2_stream_writing;
+
+struct grpc_chttp2_stream_parsing {
+ /** HTTP2 stream id for this stream, or zero if one has not been assigned */
+ gpr_uint32 id;
+ /** has this stream received a close */
+ gpr_uint8 received_close;
+ /** incoming_window has been reduced during parsing */
+ gpr_uint8 incoming_window_changed;
+ /** saw an error on this stream during parsing (it should be cancelled) */
+ gpr_uint8 saw_error;
+ /** window available for peer to send to us */
+ gpr_uint32 incoming_window;
+ /** parsing state for data frames */
+ grpc_chttp2_data_parser data_parser;
/* incoming metadata */
grpc_linked_mdelem *incoming_metadata;
@@ -333,21 +378,79 @@ struct grpc_chttp2_stream {
size_t incoming_metadata_capacity;
grpc_linked_mdelem *old_incoming_metadata;
gpr_timespec incoming_deadline;
+};
+
+struct grpc_chttp2_stream {
+ grpc_chttp2_stream_global global;
+ grpc_chttp2_stream_writing writing;
+
+ gpr_uint32 outgoing_window_update;
+ gpr_uint8 cancelled;
+
+ grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
+ gpr_uint8 included[STREAM_LIST_COUNT];
/* sops from application */
- grpc_stream_op_buffer *outgoing_sopb;
grpc_stream_op_buffer *incoming_sopb;
grpc_stream_state *publish_state;
grpc_stream_state published_state;
- grpc_chttp2_data_parser parser;
-
grpc_stream_state callback_state;
grpc_stream_op_buffer callback_sopb;
};
+/** Transport writing call flow:
+ chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes are required;
+ if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the writes.
+ Once writes have been completed (meaning another write could potentially be started),
+ grpc_chttp2_terminate_writing is called. This will call grpc_chttp2_cleanup_writing, at which
+ point the write phase is complete. */
+
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
-void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
+int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
+void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
+void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success);
+void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
+
+/** Process one slice of incoming data */
+int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice);
+void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing);
+
+/** Get a writable stream
+ \return non-zero if there was a stream available */
+void grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
+int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing);
+
+void grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing);
+int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport_writing *transport_writing);
+int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing **stream_writing);
+
+void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing);
+int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing);
+
+int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global);
+
+void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing);
+
+void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success);
+void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
+
+grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
+grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
+
+#define GRPC_CHTTP2_FLOW_CTL_TRACE(a,b,c,d,e) do {} while (0)
+
+#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING)-1)
+
+extern int grpc_http_trace;
+
+#define IF_TRACING(stmt) \
+ if (!(grpc_http_trace)) \
+ ; \
+ else \
+ stmt
#endif
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 9a547ad319..2dd46b3116 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -32,4 +32,611 @@
*/
#include "src/core/transport/chttp2/internal.h"
+#include "src/core/transport/chttp2/timeout_encoding.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation);
+static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_header);
+
+static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last);
+
+void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) {
+ /* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when
+ sending GOAWAY frame.
+ https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
+ says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream ID. So,
+ since we don't have server pushed streams, client should send
+ GOAWAY last-grpc_chttp2_stream-id=0 in this case. */
+ if (!transport_parsing->is_client) {
+ transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id;
+ }
+}
+
+int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) {
+ gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
+ gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
+ gpr_uint8 *cur = beg;
+
+ if (cur == end) return 1;
+
+ switch (transport_parsing->deframe_state) {
+ case DTS_CLIENT_PREFIX_0:
+ case DTS_CLIENT_PREFIX_1:
+ case DTS_CLIENT_PREFIX_2:
+ case DTS_CLIENT_PREFIX_3:
+ case DTS_CLIENT_PREFIX_4:
+ case DTS_CLIENT_PREFIX_5:
+ case DTS_CLIENT_PREFIX_6:
+ case DTS_CLIENT_PREFIX_7:
+ case DTS_CLIENT_PREFIX_8:
+ case DTS_CLIENT_PREFIX_9:
+ case DTS_CLIENT_PREFIX_10:
+ case DTS_CLIENT_PREFIX_11:
+ case DTS_CLIENT_PREFIX_12:
+ case DTS_CLIENT_PREFIX_13:
+ case DTS_CLIENT_PREFIX_14:
+ case DTS_CLIENT_PREFIX_15:
+ case DTS_CLIENT_PREFIX_16:
+ case DTS_CLIENT_PREFIX_17:
+ case DTS_CLIENT_PREFIX_18:
+ case DTS_CLIENT_PREFIX_19:
+ case DTS_CLIENT_PREFIX_20:
+ case DTS_CLIENT_PREFIX_21:
+ case DTS_CLIENT_PREFIX_22:
+ case DTS_CLIENT_PREFIX_23:
+ while (cur != end && transport_parsing->deframe_state != DTS_FH_0) {
+ if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state]) {
+ gpr_log(GPR_ERROR,
+ "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
+ "at byte %d",
+ GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state],
+ (int)(gpr_uint8)GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state], *cur,
+ (int)*cur, transport_parsing->deframe_state);
+ return 0;
+ }
+ ++cur;
+ ++transport_parsing->deframe_state;
+ }
+ if (cur == end) {
+ return 1;
+ }
+ /* fallthrough */
+ dts_fh_0:
+ case DTS_FH_0:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_size = ((gpr_uint32)*cur) << 16;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_1;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_1:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_2;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_2:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_size |= *cur;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_3;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_3:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_type = *cur;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_4;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_4:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_frame_flags = *cur;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_5;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_5:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_6;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_6:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_7;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_7:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_8;
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FH_8:
+ GPR_ASSERT(cur < end);
+ transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur);
+ transport_parsing->deframe_state = DTS_FRAME;
+ if (!init_frame_parser(transport_parsing)) {
+ return 0;
+ }
+ if (transport_parsing->incoming_stream_id) {
+ transport_parsing->last_incoming_stream_id = transport_parsing->incoming_stream_id;
+ }
+ if (transport_parsing->incoming_frame_size == 0) {
+ if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1)) {
+ return 0;
+ }
+ if (++cur == end) {
+ transport_parsing->deframe_state = DTS_FH_0;
+ return 1;
+ }
+ goto dts_fh_0; /* loop */
+ }
+ if (++cur == end) {
+ return 1;
+ }
+ /* fallthrough */
+ case DTS_FRAME:
+ GPR_ASSERT(cur < end);
+ if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) {
+ if (!parse_frame_slice(
+ transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
+ return 0;
+ }
+ transport_parsing->deframe_state = DTS_FH_0;
+ return 1;
+ } else if ((gpr_uint32)(end - cur) > transport_parsing->incoming_frame_size) {
+ if (!parse_frame_slice(
+ transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg,
+ cur + transport_parsing->incoming_frame_size - beg),
+ 1)) {
+ return 0;
+ }
+ cur += transport_parsing->incoming_frame_size;
+ goto dts_fh_0; /* loop */
+ } else {
+ if (!parse_frame_slice(
+ transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
+ return 0;
+ }
+ transport_parsing->incoming_frame_size -= (end - cur);
+ return 1;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ }
+
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+
+ return 0;
+}
+
+static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ if (transport_parsing->expect_continuation_stream_id != 0) {
+ if (transport_parsing->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
+ gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
+ transport_parsing->incoming_frame_type);
+ return 0;
+ }
+ if (transport_parsing->expect_continuation_stream_id != transport_parsing->incoming_stream_id) {
+ gpr_log(GPR_ERROR,
+ "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got grpc_chttp2_stream %08x",
+ transport_parsing->expect_continuation_stream_id, transport_parsing->incoming_stream_id);
+ return 0;
+ }
+ return init_header_frame_parser(transport_parsing, 1);
+ }
+ switch (transport_parsing->incoming_frame_type) {
+ case GRPC_CHTTP2_FRAME_DATA:
+ return init_data_frame_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_HEADER:
+ return init_header_frame_parser(transport_parsing, 0);
+ case GRPC_CHTTP2_FRAME_CONTINUATION:
+ gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
+ return 0;
+ case GRPC_CHTTP2_FRAME_RST_STREAM:
+ return init_rst_stream_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_SETTINGS:
+ return init_settings_frame_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
+ return init_window_update_frame_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_PING:
+ return init_ping_parser(transport_parsing);
+ case GRPC_CHTTP2_FRAME_GOAWAY:
+ return init_goaway_parser(transport_parsing);
+ default:
+ gpr_log(GPR_ERROR, "Unknown frame type %02x", transport_parsing->incoming_frame_type);
+ return init_skip_frame_parser(transport_parsing, 0);
+ }
+}
+
+static grpc_chttp2_parse_error skip_parser(void *parser,
+ grpc_chttp2_parse_state *st,
+ gpr_slice slice, int is_last) {
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
+static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
+
+static int init_skip_frame(grpc_chttp2_transport_parsing *transport_parsing, int is_header) {
+ if (is_header) {
+ int is_eoh = transport_parsing->expect_continuation_stream_id != 0;
+ transport_parsing->parser = grpc_chttp2_header_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->hpack_parser;
+ transport_parsing->hpack_parser.on_header = skip_header;
+ transport_parsing->hpack_parser.on_header_user_data = NULL;
+ transport_parsing->hpack_parser.is_boundary = is_eoh;
+ transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0;
+ } else {
+ transport_parsing->parser = skip_parser;
+ }
+ return 1;
+}
+
+static void become_skip_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ init_skip_frame(transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse);
+}
+
+static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) {
+ if (transport_parsing->incoming_frame_size > transport_parsing->incoming_window) {
+ gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
+ transport_parsing->incoming_frame_size, transport_parsing->incoming_window);
+ return GRPC_CHTTP2_CONNECTION_ERROR;
+ }
+
+ if (transport_parsing->incoming_frame_size > stream_parsing->incoming_window) {
+ gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
+ transport_parsing->incoming_frame_size, stream_parsing->incoming_window);
+ return GRPC_CHTTP2_CONNECTION_ERROR;
+ }
+
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, -(gpr_int64)transport_parsing->incoming_frame_size);
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->global.id, -(gpr_int64)transport_parsing->incoming_frame_size);
+ transport_parsing->incoming_window -= transport_parsing->incoming_frame_size;
+ stream_parsing->incoming_window -= transport_parsing->incoming_frame_size;
+
+ /* if the grpc_chttp2_stream incoming window is getting low, schedule an update */
+ stream_parsing->incoming_window_changed = 1;
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
+static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_chttp2_stream_parsing *stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id);
+ grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
+ if (!stream_parsing || stream_parsing->received_close) return init_skip_frame(transport_parsing, 0);
+ if (err == GRPC_CHTTP2_PARSE_OK) {
+ err = update_incoming_window(transport_parsing, stream_parsing);
+ }
+ if (err == GRPC_CHTTP2_PARSE_OK) {
+ err = grpc_chttp2_data_parser_begin_frame(&stream_parsing->data_parser,
+ transport_parsing->incoming_frame_flags);
+ }
+ switch (err) {
+ case GRPC_CHTTP2_PARSE_OK:
+ transport_parsing->incoming_stream = stream_parsing;
+ transport_parsing->parser = grpc_chttp2_data_parser_parse;
+ transport_parsing->parser_data = &stream_parsing->data_parser;
+ return 1;
+ case GRPC_CHTTP2_STREAM_ERROR:
+ stream_parsing->received_close = 1;
+ stream_parsing->saw_error = 1;
+ return init_skip_frame(transport_parsing, 0);
+ case GRPC_CHTTP2_CONNECTION_ERROR:
+ return 0;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ return 0;
+}
+
+static void free_timeout(void *p) { gpr_free(p); }
+
+static void add_incoming_metadata(grpc_chttp2_stream_parsing *stream_parsing, grpc_mdelem *elem) {
+ if (stream_parsing->incoming_metadata_capacity == stream_parsing->incoming_metadata_count) {
+ stream_parsing->incoming_metadata_capacity =
+ GPR_MAX(8, 2 * stream_parsing->incoming_metadata_capacity);
+ stream_parsing->incoming_metadata =
+ gpr_realloc(stream_parsing->incoming_metadata, sizeof(*stream_parsing->incoming_metadata) *
+ stream_parsing->incoming_metadata_capacity);
+ }
+ stream_parsing->incoming_metadata[stream_parsing->incoming_metadata_count++].md = elem;
+}
+
+static void on_header(void *tp, grpc_mdelem *md) {
+ grpc_chttp2_transport_parsing *transport_parsing = tp;
+ grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream;
+
+ GPR_ASSERT(stream_parsing);
+
+ IF_TRACING(gpr_log(
+ GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, transport_parsing->is_client ? "CLI" : "SVR",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
+
+ if (md->key == transport_parsing->str_grpc_timeout) {
+ gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
+ if (!cached_timeout) {
+ /* not already parsed: parse it now, and store the result away */
+ cached_timeout = gpr_malloc(sizeof(gpr_timespec));
+ if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
+ cached_timeout)) {
+ gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
+ grpc_mdstr_as_c_string(md->value));
+ *cached_timeout = gpr_inf_future;
+ }
+ grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
+ }
+ stream_parsing->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
+ grpc_mdelem_unref(md);
+ } else {
+ add_incoming_metadata(stream_parsing, md);
+ }
+
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+}
+
+static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) {
+ int is_eoh =
+ (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
+ grpc_chttp2_stream_parsing *stream_parsing;
+
+ if (is_eoh) {
+ transport_parsing->expect_continuation_stream_id = 0;
+ } else {
+ transport_parsing->expect_continuation_stream_id = transport_parsing->incoming_stream_id;
+ }
+
+ if (!is_continuation) {
+ transport_parsing->header_eof =
+ (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
+ }
+
+ /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */
+ stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id);
+ if (!stream_parsing) {
+ if (is_continuation) {
+ gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received");
+ return init_skip_frame(transport_parsing, 1);
+ }
+ if (transport_parsing->is_client) {
+ if ((transport_parsing->incoming_stream_id & 1) &&
+ transport_parsing->incoming_stream_id < transport_parsing->next_stream_id) {
+ /* this is an old (probably cancelled) grpc_chttp2_stream */
+ } else {
+ gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client");
+ }
+ return init_skip_frame(transport_parsing, 1);
+ } else if (transport_parsing->last_incoming_stream_id > transport_parsing->incoming_stream_id) {
+ gpr_log(GPR_ERROR,
+ "ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream "
+ "id=%d, new grpc_chttp2_stream id=%d",
+ transport_parsing->last_incoming_stream_id, transport_parsing->incoming_stream_id);
+ return init_skip_frame(transport_parsing, 1);
+ } else if ((transport_parsing->incoming_stream_id & 1) == 0) {
+ gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d",
+ transport_parsing->incoming_stream_id);
+ return init_skip_frame(transport_parsing, 1);
+ }
+ stream_parsing = transport_parsing->incoming_stream = grpc_chttp2_parsing_accept_stream(transport_parsing, transport_parsing->incoming_stream_id);
+ if (!stream_parsing) {
+ gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted");
+ return init_skip_frame(transport_parsing, 1);
+ }
+ } else {
+ transport_parsing->incoming_stream = stream_parsing;
+ }
+ if (stream_parsing->received_close) {
+ gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header");
+ transport_parsing->incoming_stream = NULL;
+ return init_skip_frame(transport_parsing, 1);
+ }
+ transport_parsing->parser = grpc_chttp2_header_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->hpack_parser;
+ transport_parsing->hpack_parser.on_header = on_header;
+ transport_parsing->hpack_parser.on_header_user_data = transport_parsing;
+ transport_parsing->hpack_parser.is_boundary = is_eoh;
+ transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0;
+ if (!is_continuation &&
+ (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
+ grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser);
+ }
+ return 1;
+}
+
+static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
+ &transport_parsing->simple.window_update,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ transport_parsing->parser = grpc_chttp2_window_update_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->simple.window_update;
+ return ok;
+}
+
+static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok = GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_ping_parser_begin_frame(&transport_parsing->simple.ping,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ transport_parsing->parser = grpc_chttp2_ping_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->simple.ping;
+ return ok;
+}
+
+static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame(
+ &transport_parsing->simple.rst_stream,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->simple.rst_stream;
+ return ok;
+}
+
+static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok =
+ GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_goaway_parser_begin_frame(
+ &transport_parsing->goaway_parser, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags);
+ transport_parsing->parser = grpc_chttp2_goaway_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->goaway_parser;
+ return ok;
+}
+
+static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+ int ok;
+
+ if (transport_parsing->incoming_stream_id != 0) {
+ gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d", transport_parsing->incoming_stream_id);
+ return 0;
+ }
+
+ ok = GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_settings_parser_begin_frame(
+ &transport_parsing->simple.settings, transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags, transport_parsing->settings);
+ if (!ok) {
+ return 0;
+ }
+ if (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
+ transport_parsing->settings_ack_received = 1;
+ } else {
+ transport_parsing->settings_updated = 1;
+ }
+ transport_parsing->parser = grpc_chttp2_settings_parser_parse;
+ transport_parsing->parser_data = &transport_parsing->simple.settings;
+ return ok;
+}
+
+/*
+static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
+ return window + window_update < MAX_WINDOW;
+}
+*/
+
+static void add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) {
+ grpc_metadata_batch b;
+
+ b.list.head = NULL;
+ /* Store away the last element of the list, so that in patch_metadata_ops
+ we can reconstitute the list.
+ We can't do list building here as later incoming metadata may reallocate
+ the underlying array. */
+ b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count;
+ b.garbage.head = b.garbage.tail = NULL;
+ b.deadline = stream_parsing->incoming_deadline;
+ stream_parsing->incoming_deadline = gpr_inf_future;
+
+ grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
+}
+
+static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice, int is_last) {
+ grpc_chttp2_parse_state st;
+ size_t i;
+ memset(&st, 0, sizeof(st));
+ switch (transport_parsing->parser(transport_parsing->parser_data, &st, slice, is_last)) {
+ case GRPC_CHTTP2_PARSE_OK:
+ if (stream_parsing) {
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+ }
+ if (st.end_of_stream) {
+ transport_parsing->incoming_stream->read_closed = 1;
+ maybe_finish_read(t, transport_parsing->incoming_stream, 1);
+ }
+ if (st.need_flush_reads) {
+ maybe_finish_read(t, transport_parsing->incoming_stream, 1);
+ }
+ if (st.metadata_boundary) {
+ add_metadata_batch(t, transport_parsing->incoming_stream);
+ maybe_finish_read(t, transport_parsing->incoming_stream, 1);
+ }
+ if (st.ack_settings) {
+ gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_settings_ack_create());
+ }
+ if (st.send_ping_ack) {
+ gpr_slice_buffer_add(
+ &transport_parsing->qbuf,
+ grpc_chttp2_ping_create(1, transport_parsing->simple.ping.opaque_8bytes));
+ }
+ if (st.goaway) {
+ add_goaway(t, st.goaway_error, st.goaway_text);
+ }
+ if (st.rst_stream) {
+ cancel_stream_id(
+ t, transport_parsing->incoming_stream_id,
+ grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason),
+ st.rst_stream_reason, 0);
+ }
+ if (st.process_ping_reply) {
+ for (i = 0; i < transport_parsing->ping_count; i++) {
+ if (0 ==
+ memcmp(transport_parsing->pings[i].id, transport_parsing->simple.ping.opaque_8bytes, 8)) {
+ transport_parsing->pings[i].cb(transport_parsing->pings[i].user_data);
+ memmove(&transport_parsing->pings[i], &transport_parsing->pings[i + 1],
+ (transport_parsing->ping_count - i - 1) * sizeof(grpc_chttp2_outstanding_ping));
+ transport_parsing->ping_count--;
+ break;
+ }
+ }
+ }
+ if (st.initial_window_update) {
+ for (i = 0; i < transport_parsing->stream_map.count; i++) {
+ grpc_chttp2_stream *s = (grpc_chttp2_stream *)(transport_parsing->stream_map.values[i]);
+ s->global.outgoing_window_update += st.initial_window_update;
+ stream_list_join(t, s, NEW_OUTGOING_WINDOW);
+ }
+ }
+ if (st.window_update) {
+ if (transport_parsing->incoming_stream_id) {
+ /* if there was a grpc_chttp2_stream id, this is for some grpc_chttp2_stream */
+ grpc_chttp2_stream *s = lookup_stream(t, transport_parsing->incoming_stream_id);
+ if (s) {
+ s->global.outgoing_window_update += st.window_update;
+ stream_list_join(t, s, NEW_OUTGOING_WINDOW);
+ }
+ } else {
+ /* grpc_chttp2_transport level window update */
+ transport_parsing->global.outgoing_window_update += st.window_update;
+ }
+ }
+ return 1;
+ case GRPC_CHTTP2_STREAM_ERROR:
+ become_skip_parser(transport_parsing);
+ cancel_stream_id(
+ t, transport_parsing->incoming_stream_id,
+ grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
+ GRPC_CHTTP2_INTERNAL_ERROR, 1);
+ return 1;
+ case GRPC_CHTTP2_CONNECTION_ERROR:
+ drop_connection(transport_parsing);
+ return 0;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ return 0;
+}
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index a1830a8c25..0265f173f3 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -32,93 +32,145 @@
*/
#include "src/core/transport/chttp2/internal.h"
+#include "src/core/transport/chttp2/http2_errors.h"
#include <grpc/support/log.h>
-static void grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport *t) {
- grpc_chttp2_stream *s;
- gpr_uint32 window_delta;
+static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
+static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
- /* don't do anything if we are already writing */
- if (t->writing.executing) {
- return;
- }
+int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
+ grpc_chttp2_stream_global *stream_global;
+ grpc_chttp2_stream_writing *stream_writing;
+ gpr_uint32 window_delta;
/* simple writes are queued to qbuf, and flushed here */
- gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf);
- GPR_ASSERT(t->global.qbuf.count == 0);
+ gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
+ GPR_ASSERT(transport_global->qbuf.count == 0);
- if (t->dirtied_local_settings && !t->sent_local_settings) {
+ if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings) {
gpr_slice_buffer_add(
- &t->writing.outbuf, grpc_chttp2_settings_create(
- t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
- t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
- t->force_send_settings = 0;
- t->dirtied_local_settings = 0;
- t->sent_local_settings = 1;
+ &transport_writing->outbuf, grpc_chttp2_settings_create(
+ transport_global->settings[SENT_SETTINGS], transport_global->settings[LOCAL_SETTINGS],
+ transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
+ transport_global->force_send_settings = 0;
+ transport_global->dirtied_local_settings = 0;
+ transport_global->sent_local_settings = 1;
}
/* for each grpc_chttp2_stream that's become writable, frame it's data (according to
available window sizes) and add to the output buffer */
- while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
- s->outgoing_window > 0) {
+ while (transport_global->outgoing_window &&
+ grpc_chttp2_list_pop_writable_stream(transport_global, transport_writing, &stream_global, &stream_writing) &&
+ stream_global->outgoing_window > 0) {
+ stream_writing->id = stream_global->id;
window_delta = grpc_chttp2_preencode(
- s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
- GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb);
- FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
- FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
- t->outgoing_window -= window_delta;
- s->outgoing_window -= window_delta;
-
- if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
- s->outgoing_sopb->nops == 0) {
- if (!t->is_client && !s->read_closed) {
- s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM;
+ stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops,
+ GPR_MIN(transport_global->outgoing_window, stream_global->outgoing_window),
+ &stream_writing->sopb);
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
+ transport_global->outgoing_window -= window_delta;
+ stream_global->outgoing_window -= window_delta;
+
+ if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE &&
+ stream_global->outgoing_sopb->nops == 0) {
+ if (!transport_constants->is_client && !stream_global->read_closed) {
+ stream_writing->send_closed = SEND_CLOSED_WITH_RST_STREAM;
} else {
- s->writing.send_closed = SEND_CLOSED;
+ stream_writing->send_closed = SEND_CLOSED;
}
}
- if (s->writing.sopb.nops > 0 || s->writing.send_closed) {
- stream_list_join(t, s, WRITING);
+ if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != DONT_SEND_CLOSED) {
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
/* we should either exhaust window or have no ops left, but not both */
- if (s->outgoing_sopb->nops == 0) {
- s->outgoing_sopb = NULL;
- schedule_cb(t, s->global.send_done_closure, 1);
- } else if (s->outgoing_window) {
- stream_list_add_tail(t, s, WRITABLE);
+ if (stream_global->outgoing_sopb->nops == 0) {
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 1);
+ } else if (stream_global->outgoing_window) {
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
- if (!t->parsing.executing) {
- /* for each grpc_chttp2_stream that wants to update its window, add that window here */
- while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
- window_delta =
- t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
- s->incoming_window;
- if (!s->read_closed && window_delta) {
- gpr_slice_buffer_add(
- &t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
- FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
- s->incoming_window += window_delta;
- }
+ /* for each grpc_chttp2_stream that wants to update its window, add that window here */
+ while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, &stream_global)) {
+ window_delta =
+ transport_global->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
+ stream_global->incoming_window;
+ if (!stream_global->read_closed && window_delta > 0) {
+ gpr_slice_buffer_add(
+ &transport_writing->outbuf, grpc_chttp2_window_update_create(stream_global->id, window_delta));
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->id, window_delta);
+ stream_global->incoming_window += window_delta;
}
+ }
+
+ /* if the grpc_chttp2_transport is ready to send a window update, do so here also */
+ if (transport_global->incoming_window < transport_global->connection_window_target * 3 / 4) {
+ window_delta = transport_global->connection_window_target - transport_global->incoming_window;
+ gpr_slice_buffer_add(&transport_writing->outbuf,
+ grpc_chttp2_window_update_create(0, window_delta));
+ GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, window_delta);
+ transport_global->incoming_window += window_delta;
+ }
+
+ return transport_writing->outbuf.length > 0 || grpc_chttp2_list_have_writing_streams(transport_writing);
+}
+
+void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) {
+ finalize_outbuf(transport_writing);
- /* if the grpc_chttp2_transport is ready to send a window update, do so here also */
- if (t->incoming_window < t->connection_window_target * 3 / 4) {
- window_delta = t->connection_window_target - t->incoming_window;
- gpr_slice_buffer_add(&t->writing.outbuf,
- grpc_chttp2_window_update_create(0, window_delta));
- FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
- t->incoming_window += window_delta;
+ GPR_ASSERT(transport_writing->outbuf.count > 0);
+
+ switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, transport_writing->outbuf.count,
+ finish_write_cb, transport_writing)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
+ grpc_chttp2_terminate_writing(transport_writing, 1);
+ break;
+ case GRPC_ENDPOINT_WRITE_ERROR:
+ grpc_chttp2_terminate_writing(transport_writing, 0);
+ break;
+ case GRPC_ENDPOINT_WRITE_PENDING:
+ break;
+ }
+}
+
+static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
+ grpc_chttp2_stream_writing *stream_writing;
+
+ while (grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
+ grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
+ stream_writing->send_closed != DONT_SEND_CLOSED, stream_writing->id,
+ &transport_writing->hpack_compressor, &transport_writing->outbuf);
+ stream_writing->sopb.nops = 0;
+ if (stream_writing->send_closed == SEND_CLOSED_WITH_RST_STREAM) {
+ gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_rst_stream_create(
+ stream_writing->id, GRPC_CHTTP2_NO_ERROR));
}
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
+}
- if (t->writing.outbuf.length > 0 || !stream_list_empty(t, WRITING)) {
- t->writing.executing = 1;
- ref_transport(t);
- gpr_log(GPR_DEBUG, "schedule write");
- schedule_cb(t, &t->writing.action, 1);
+static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
+ grpc_chttp2_transport_writing *transport_writing = tw;
+ grpc_chttp2_terminate_writing(transport_writing, write_status == GRPC_ENDPOINT_CB_OK);
+}
+
+void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
+ grpc_chttp2_stream_writing *stream_writing;
+ grpc_chttp2_stream_global *stream_global;
+
+ while (grpc_chttp2_list_pop_written_stream(transport_global, transport_writing, &stream_global, &stream_writing)) {
+ if (stream_writing->send_closed != DONT_SEND_CLOSED) {
+ stream_global->write_state = WRITE_STATE_SENT_CLOSE;
+ if (!transport_constants->is_client) {
+ stream_global->read_closed = 1;
+ }
+ grpc_chttp2_read_write_state_changed(transport_global, stream_global);
+ }
}
+ transport_writing->outbuf.count = 0;
+ transport_writing->outbuf.length = 0;
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1cfbc07d97..13ddeacc02 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -55,24 +55,17 @@
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
-#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
-#define CLIENT_CONNECT_STRLEN 24
-
int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;
-#define IF_TRACING(stmt) \
- if (!(grpc_http_trace)) \
- ; \
- else \
- stmt
-
#define FLOWCTL_TRACE(t, obj, dir, id, delta) \
if (!grpc_flowctl_trace) \
; \
else \
flowctl_trace(t, #dir, obj->dir##_window, id, delta)
+#define TRANSPORT_FROM_WRITING(tw) ((grpc_chttp2_transport*)((char*)(tw) - offsetof(grpc_chttp2_transport, writing)))
+
static const grpc_transport_vtable vtable;
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
@@ -211,10 +204,10 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
t->reading = 1;
t->error_state = ERROR_STATE_NONE;
t->next_stream_id = is_client ? 1 : 2;
- t->is_client = is_client;
- t->outgoing_window = DEFAULT_WINDOW;
- t->incoming_window = DEFAULT_WINDOW;
- t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
+ t->constants.is_client = is_client;
+ t->global.outgoing_window = DEFAULT_WINDOW;
+ t->global.incoming_window = DEFAULT_WINDOW;
+ t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
t->ping_counter = gpr_now().tv_nsec;
@@ -222,7 +215,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
gpr_slice_buffer_init(&t->writing.outbuf);
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
- grpc_iomgr_closure_init(&t->writing.action, writing_action, t);
+ grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
@@ -244,17 +237,17 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
- t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
+ t->global.settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
}
}
- t->dirtied_local_settings = 1;
+ t->global.dirtied_local_settings = 1;
/* Hack: it's common for implementations to assume 65536 bytes initial send
window -- this should by rights be 0 */
- t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
- t->sent_local_settings = 0;
+ t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+ t->global.sent_local_settings = 0;
/* configure http2 the way we like it */
- if (t->is_client) {
+ if (t->constants.is_client) {
push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
@@ -264,7 +257,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
for (i = 0; i < channel_args->num_args; i++) {
if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
- if (t->is_client) {
+ if (t->constants.is_client) {
gpr_log(GPR_ERROR, "%s: is ignored on the client",
GRPC_ARG_MAX_CONCURRENT_STREAMS);
} else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
@@ -283,7 +276,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
(channel_args->args[i].value.integer & 1)) {
gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
- t->is_client ? "client" : "server");
+ t->constants.is_client ? "client" : "server");
} else {
t->next_stream_id = channel_args->args[i].value.integer;
}
@@ -322,7 +315,7 @@ static void destroy_transport(grpc_transport *gt) {
We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */
- while (t->channel_callback.executing || t->writing.executing) {
+ while (t->channel_callback.executing || t->writing_active) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
}
drop_connection(t);
@@ -378,18 +371,18 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
lock(t);
if (!server_data) {
- s->id = 0;
- s->outgoing_window = 0;
- s->incoming_window = 0;
+ s->global.id = 0;
+ s->global.outgoing_window = 0;
+ s->global.incoming_window = 0;
} else {
/* already locked */
- s->id = (gpr_uint32)(gpr_uintptr)server_data;
- s->outgoing_window =
- t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window =
- t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
+ s->global.outgoing_window =
+ t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ s->global.incoming_window =
+ t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
t->incoming_stream = s;
- grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s);
}
s->incoming_deadline = gpr_inf_future;
@@ -416,7 +409,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_lock(&t->mu);
- GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->id == 0);
+ GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->global.id == 0);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
stream_list_remove(t, s, i);
@@ -424,7 +417,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock(&t->mu);
- GPR_ASSERT(s->outgoing_sopb == NULL);
+ GPR_ASSERT(s->global.outgoing_sopb == NULL);
GPR_ASSERT(s->incoming_sopb == NULL);
grpc_sopb_destroy(&s->writing.sopb);
grpc_sopb_destroy(&s->callback_sopb);
@@ -503,10 +496,10 @@ static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gr
}
static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
- if (s->id == 0) return;
+ if (s->global.id == 0) return;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing grpc_chttp2_stream %d",
- t->is_client ? "CLI" : "SVR", s->id));
- if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
+ t->constants.is_client ? "CLI" : "SVR", s->global.id));
+ if (grpc_chttp2_stream_map_delete(&t->stream_map, s->global.id)) {
maybe_start_some_streams(t);
}
}
@@ -525,7 +518,11 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
- grpc_chttp2_unlocking_check_writes(t);
+ if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->constants, &t->global, &t->writing)) {
+ t->writing_active = 1;
+ ref_transport(t);
+ schedule_cb(t, &t->writing_action, 1);
+ }
unlock_check_cancellations(t);
unlock_check_parser(t);
unlock_check_channel_callbacks(t);
@@ -555,49 +552,27 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
value, use_value);
}
- if (use_value != t->settings[LOCAL_SETTINGS][id]) {
- t->settings[LOCAL_SETTINGS][id] = use_value;
- t->dirtied_local_settings = 1;
- }
-}
-
-static void writing_finalize_outbuf(grpc_chttp2_transport *t) {
- grpc_chttp2_stream *s;
-
- while ((s = stream_list_remove_head(t, WRITING))) {
- grpc_chttp2_encode(s->writing.sopb.ops, s->writing.sopb.nops,
- s->writing.send_closed != DONT_SEND_CLOSED, s->id,
- &t->writing.hpack_compressor, &t->writing.outbuf);
- s->writing.sopb.nops = 0;
- if (s->writing.send_closed == SEND_CLOSED_WITH_RST_STREAM) {
- gpr_slice_buffer_add(&t->writing.outbuf, grpc_chttp2_rst_stream_create(
- s->id, GRPC_CHTTP2_NO_ERROR));
- }
- if (s->writing.send_closed != DONT_SEND_CLOSED) {
- stream_list_join(t, s, WRITTEN_CLOSED);
- }
+ if (use_value != t->global.settings[LOCAL_SETTINGS][id]) {
+ t->global.settings[LOCAL_SETTINGS][id] = use_value;
+ t->global.dirtied_local_settings = 1;
}
}
-static void writing_finish(grpc_chttp2_transport *t, int success) {
- grpc_chttp2_stream *s;
+void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success) {
+ grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
lock(t);
+
if (!success) {
drop_connection(t);
}
- while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
- s->write_state = WRITE_STATE_SENT_CLOSE;
- if (!t->is_client) {
- s->read_closed = 1;
- }
- maybe_finish_read(t, s, 0);
- }
- t->writing.outbuf.count = 0;
- t->writing.outbuf.length = 0;
+
+ /* cleanup writing related jazz */
+ grpc_chttp2_cleanup_writing(&t->constants, &t->global, &t->writing);
+
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
- t->writing.executing = 0;
+ t->writing_active = 0;
if (t->destroying) {
gpr_cv_signal(&t->cv);
}
@@ -606,36 +581,16 @@ static void writing_finish(grpc_chttp2_transport *t, int success) {
t->ep = NULL;
unref_transport(t); /* safe because we'll still have the ref for write */
}
+
unlock(t);
unref_transport(t);
}
-static void writing_finish_write_cb(void *tp, grpc_endpoint_cb_status error) {
- grpc_chttp2_transport *t = tp;
- writing_finish(t, error == GRPC_ENDPOINT_CB_OK);
-}
static void writing_action(void *gt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
-
- gpr_log(GPR_DEBUG, "writing_action");
-
- writing_finalize_outbuf(t);
-
- GPR_ASSERT(t->writing.outbuf.count > 0);
-
- switch (grpc_endpoint_write(t->ep, t->writing.outbuf.slices, t->writing.outbuf.count,
- writing_finish_write_cb, t)) {
- case GRPC_ENDPOINT_WRITE_DONE:
- writing_finish(t, 1);
- break;
- case GRPC_ENDPOINT_WRITE_ERROR:
- writing_finish(t, 0);
- break;
- case GRPC_ENDPOINT_WRITE_PENDING:
- break;
- }
+ grpc_chttp2_perform_writes(&t->writing, t->ep);
}
static void add_goaway(grpc_chttp2_transport *t, gpr_uint32 goaway_error,
@@ -655,13 +610,13 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) {
/* start streams where we have free grpc_chttp2_stream ids and free concurrency */
while (!t->parsing.executing && t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_stream_map_size(&t->stream_map) <
- t->settings[PEER_SETTINGS]
+ t->global.settings[PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) return;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
- t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
+ t->constants.is_client ? "CLI" : "SVR", s, t->next_stream_id));
if (t->next_stream_id == MAX_CLIENT_STREAM_ID) {
add_goaway(
@@ -669,14 +624,14 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) {
gpr_slice_from_copied_string("Exceeded sequence number limit"));
}
- GPR_ASSERT(s->id == 0);
- s->id = t->next_stream_id;
+ GPR_ASSERT(s->global.id == 0);
+ s->global.id = t->next_stream_id;
t->next_stream_id += 2;
- s->outgoing_window =
- t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window =
- t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ s->global.outgoing_window =
+ t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ s->global.incoming_window =
+ t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s);
stream_list_join(t, s, WRITABLE);
}
/* cancel out streams that will never be started */
@@ -700,20 +655,20 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g
}
if (op->send_ops) {
- GPR_ASSERT(s->outgoing_sopb == NULL);
+ GPR_ASSERT(s->global.outgoing_sopb == NULL);
s->global.send_done_closure = op->on_done_send;
if (!s->cancelled) {
- s->outgoing_sopb = op->send_ops;
- if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
- s->write_state = WRITE_STATE_QUEUED_CLOSE;
+ s->global.outgoing_sopb = op->send_ops;
+ if (op->is_last_send && s->global.write_state == WRITE_STATE_OPEN) {
+ s->global.write_state = WRITE_STATE_QUEUED_CLOSE;
}
- if (s->id == 0) {
+ if (s->global.id == 0) {
IF_TRACING(gpr_log(GPR_DEBUG,
"HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
- t->is_client ? "CLI" : "SVR", s));
+ t->constants.is_client ? "CLI" : "SVR", s));
stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
maybe_start_some_streams(t);
- } else if (s->outgoing_window > 0) {
+ } else if (s->global.outgoing_window > 0) {
stream_list_join(t, s, WRITABLE);
}
} else {
@@ -787,26 +742,15 @@ static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
static void unlock_check_cancellations(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
- if (t->writing.executing) {
+ if (t->writing_active) {
return;
}
while ((s = stream_list_remove_head(t, CANCELLED))) {
- s->read_closed = 1;
- s->write_state = WRITE_STATE_SENT_CLOSE;
- maybe_finish_read(t, s, 0);
- }
-}
-
-static void add_incoming_metadata(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_mdelem *elem) {
- if (s->incoming_metadata_capacity == s->incoming_metadata_count) {
- s->incoming_metadata_capacity =
- GPR_MAX(8, 2 * s->incoming_metadata_capacity);
- s->incoming_metadata =
- gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) *
- s->incoming_metadata_capacity);
+ s->global.read_closed = 1;
+ s->global.write_state = WRITE_STATE_SENT_CLOSE;
+ grpc_chttp2_read_write_state_changed(&t->global, &s->global);
}
- s->incoming_metadata[s->incoming_metadata_count++].md = elem;
}
static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gpr_uint32 id,
@@ -819,12 +763,12 @@ static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
- had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
+ had_outgoing = s->global.outgoing_sopb && s->global.outgoing_sopb->nops != 0;
if (error_code != GRPC_CHTTP2_NO_ERROR) {
schedule_nuke_sopb(t, &s->parser.incoming_sopb);
- if (s->outgoing_sopb) {
- schedule_nuke_sopb(t, s->outgoing_sopb);
- s->outgoing_sopb = NULL;
+ if (s->global.outgoing_sopb) {
+ schedule_nuke_sopb(t, s->global.outgoing_sopb);
+ s->global.outgoing_sopb = NULL;
stream_list_remove(t, s, WRITABLE);
schedule_cb(t, s->global.send_done_closure, 0);
}
@@ -888,7 +832,7 @@ static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst) {
- cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
+ cancel_stream_inner(t, s, s->global.id, local_status, error_code, optional_message,
send_rst, 0);
}
@@ -923,608 +867,18 @@ static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stre
return;
}
if (s->incoming_sopb != NULL &&
- s->incoming_window <
- t->settings[LOCAL_SETTINGS]
+ s->global.incoming_window <
+ t->global.settings[LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
3 / 4) {
stream_list_join(t, s, WINDOW_UPDATE);
}
}
-static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
- if (t->incoming_frame_size > t->incoming_window) {
- gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
- t->incoming_frame_size, t->incoming_window);
- return GRPC_CHTTP2_CONNECTION_ERROR;
- }
-
- if (t->incoming_frame_size > s->incoming_window) {
- gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
- t->incoming_frame_size, s->incoming_window);
- return GRPC_CHTTP2_CONNECTION_ERROR;
- }
-
- FLOWCTL_TRACE(t, t, incoming, 0, -(gpr_int64)t->incoming_frame_size);
- FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size);
- t->incoming_window -= t->incoming_frame_size;
- s->incoming_window -= t->incoming_frame_size;
-
- /* if the grpc_chttp2_stream incoming window is getting low, schedule an update */
- stream_list_join(t, s, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
-
- return GRPC_CHTTP2_PARSE_OK;
-}
-
static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
return grpc_chttp2_stream_map_find(&t->stream_map, id);
}
-static grpc_chttp2_parse_error skip_parser(void *parser,
- grpc_chttp2_parse_state *st,
- gpr_slice slice, int is_last) {
- return GRPC_CHTTP2_PARSE_OK;
-}
-
-static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
-
-static int parsing_init_skip_frame(grpc_chttp2_transport *t, int is_header) {
- if (is_header) {
- int is_eoh = t->expect_continuation_stream_id != 0;
- t->parser = grpc_chttp2_header_parser_parse;
- t->parser_data = &t->parsing.hpack_parser;
- t->parsing.hpack_parser.on_header = skip_header;
- t->parsing.hpack_parser.on_header_user_data = NULL;
- t->parsing.hpack_parser.is_boundary = is_eoh;
- t->parsing.hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
- } else {
- t->parser = skip_parser;
- }
- return 1;
-}
-
-static void parsing_become_skip_parser(grpc_chttp2_transport *t) {
- parsing_init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse);
-}
-
-static int parsing_init_data_frame_parser(grpc_chttp2_transport *t) {
- grpc_chttp2_stream *s = lookup_stream(t, t->incoming_stream_id);
- grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
- if (!s || s->read_closed) return parsing_init_skip_frame(t, 0);
- if (err == GRPC_CHTTP2_PARSE_OK) {
- err = update_incoming_window(t, s);
- }
- if (err == GRPC_CHTTP2_PARSE_OK) {
- err = grpc_chttp2_data_parser_begin_frame(&s->parser,
- t->incoming_frame_flags);
- }
- switch (err) {
- case GRPC_CHTTP2_PARSE_OK:
- t->incoming_stream = s;
- t->parser = grpc_chttp2_data_parser_parse;
- t->parser_data = &s->parser;
- return 1;
- case GRPC_CHTTP2_STREAM_ERROR:
- cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
- GRPC_CHTTP2_INTERNAL_ERROR),
- GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
- return parsing_init_skip_frame(t, 0);
- case GRPC_CHTTP2_CONNECTION_ERROR:
- drop_connection(t);
- return 0;
- }
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- return 0;
-}
-
-static void free_timeout(void *p) { gpr_free(p); }
-
-static void parsing_on_header(void *tp, grpc_mdelem *md) {
- grpc_chttp2_transport *t = tp;
- grpc_chttp2_stream *s = t->incoming_stream;
-
- GPR_ASSERT(s);
-
- IF_TRACING(gpr_log(
- GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
- grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
-
- if (md->key == t->constants.str_grpc_timeout) {
- gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
- if (!cached_timeout) {
- /* not already parsed: parse it now, and store the result away */
- cached_timeout = gpr_malloc(sizeof(gpr_timespec));
- if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
- cached_timeout)) {
- gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
- grpc_mdstr_as_c_string(md->value));
- *cached_timeout = gpr_inf_future;
- }
- grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
- }
- s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
- grpc_mdelem_unref(md);
- } else {
- add_incoming_metadata(t, s, md);
- }
- maybe_finish_read(t, s, 1);
-}
-
-static int parsing_init_header_frame_parser(grpc_chttp2_transport *t, int is_continuation) {
- int is_eoh =
- (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
- grpc_chttp2_stream *s;
-
- if (is_eoh) {
- t->expect_continuation_stream_id = 0;
- } else {
- t->expect_continuation_stream_id = t->incoming_stream_id;
- }
-
- if (!is_continuation) {
- t->header_eof =
- (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
- }
-
- /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */
- s = lookup_stream(t, t->incoming_stream_id);
- if (!s) {
- if (is_continuation) {
- gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received");
- return parsing_init_skip_frame(t, 1);
- }
- if (t->is_client) {
- if ((t->incoming_stream_id & 1) &&
- t->incoming_stream_id < t->next_stream_id) {
- /* this is an old (probably cancelled) grpc_chttp2_stream */
- } else {
- gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client");
- }
- return parsing_init_skip_frame(t, 1);
- } else if (t->last_incoming_stream_id > t->incoming_stream_id) {
- gpr_log(GPR_ERROR,
- "ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream "
- "id=%d, new grpc_chttp2_stream id=%d",
- t->last_incoming_stream_id, t->incoming_stream_id);
- return parsing_init_skip_frame(t, 1);
- } else if ((t->incoming_stream_id & 1) == 0) {
- gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d",
- t->incoming_stream_id);
- return parsing_init_skip_frame(t, 1);
- }
- t->incoming_stream = NULL;
- /* if grpc_chttp2_stream is accepted, we set incoming_stream in init_stream */
- t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, &t->base,
- (void *)(gpr_uintptr)t->incoming_stream_id);
- s = t->incoming_stream;
- if (!s) {
- gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted");
- return parsing_init_skip_frame(t, 1);
- }
- } else {
- t->incoming_stream = s;
- }
- if (t->incoming_stream->read_closed) {
- gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header");
- t->incoming_stream = NULL;
- return parsing_init_skip_frame(t, 1);
- }
- t->parser = grpc_chttp2_header_parser_parse;
- t->parser_data = &t->parsing.hpack_parser;
- t->parsing.hpack_parser.on_header = parsing_on_header;
- t->parsing.hpack_parser.on_header_user_data = t;
- t->parsing.hpack_parser.is_boundary = is_eoh;
- t->parsing.hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
- if (!is_continuation &&
- (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
- grpc_chttp2_hpack_parser_set_has_priority(&t->parsing.hpack_parser);
- }
- return 1;
-}
-
-static int parsing_init_window_update_frame_parser(grpc_chttp2_transport *t) {
- int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
- &t->parsing.simple.window_update,
- t->incoming_frame_size,
- t->incoming_frame_flags);
- if (!ok) {
- drop_connection(t);
- }
- t->parser = grpc_chttp2_window_update_parser_parse;
- t->parser_data = &t->parsing.simple.window_update;
- return ok;
-}
-
-static int parsing_init_ping_parser(grpc_chttp2_transport *t) {
- int ok = GRPC_CHTTP2_PARSE_OK ==
- grpc_chttp2_ping_parser_begin_frame(&t->parsing.simple.ping,
- t->incoming_frame_size,
- t->incoming_frame_flags);
- if (!ok) {
- drop_connection(t);
- }
- t->parser = grpc_chttp2_ping_parser_parse;
- t->parser_data = &t->parsing.simple.ping;
- return ok;
-}
-
-static int parsing_init_rst_stream_parser(grpc_chttp2_transport *t) {
- int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame(
- &t->parsing.simple.rst_stream,
- t->incoming_frame_size,
- t->incoming_frame_flags);
- if (!ok) {
- drop_connection(t);
- }
- t->parser = grpc_chttp2_rst_stream_parser_parse;
- t->parser_data = &t->parsing.simple.rst_stream;
- return ok;
-}
-
-static int parsing_init_goaway_parser(grpc_chttp2_transport *t) {
- int ok =
- GRPC_CHTTP2_PARSE_OK ==
- grpc_chttp2_goaway_parser_begin_frame(
- &t->parsing.goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
- if (!ok) {
- drop_connection(t);
- }
- t->parser = grpc_chttp2_goaway_parser_parse;
- t->parser_data = &t->parsing.goaway_parser;
- return ok;
-}
-
-static int parsing_init_settings_frame_parser(grpc_chttp2_transport *t) {
- int ok;
-
- if (t->incoming_stream_id != 0) {
- gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d", t->incoming_stream_id);
- drop_connection(t);
- return 0;
- }
-
- ok = GRPC_CHTTP2_PARSE_OK ==
- grpc_chttp2_settings_parser_begin_frame(
- &t->parsing.simple.settings, t->incoming_frame_size,
- t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
- if (!ok) {
- drop_connection(t);
- return 0;
- }
- if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
- memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS],
- GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
- }
- t->parser = grpc_chttp2_settings_parser_parse;
- t->parser_data = &t->parsing.simple.settings;
- return ok;
-}
-
-static int init_frame_parser(grpc_chttp2_transport *t) {
- if (t->expect_continuation_stream_id != 0) {
- if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
- gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
- t->incoming_frame_type);
- return 0;
- }
- if (t->expect_continuation_stream_id != t->incoming_stream_id) {
- gpr_log(GPR_ERROR,
- "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got grpc_chttp2_stream %08x",
- t->expect_continuation_stream_id, t->incoming_stream_id);
- return 0;
- }
- return parsing_init_header_frame_parser(t, 1);
- }
- switch (t->incoming_frame_type) {
- case GRPC_CHTTP2_FRAME_DATA:
- return parsing_init_data_frame_parser(t);
- case GRPC_CHTTP2_FRAME_HEADER:
- return parsing_init_header_frame_parser(t, 0);
- case GRPC_CHTTP2_FRAME_CONTINUATION:
- gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
- return 0;
- case GRPC_CHTTP2_FRAME_RST_STREAM:
- return parsing_init_rst_stream_parser(t);
- case GRPC_CHTTP2_FRAME_SETTINGS:
- return parsing_init_settings_frame_parser(t);
- case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
- return parsing_init_window_update_frame_parser(t);
- case GRPC_CHTTP2_FRAME_PING:
- return parsing_init_ping_parser(t);
- case GRPC_CHTTP2_FRAME_GOAWAY:
- return parsing_init_goaway_parser(t);
- default:
- gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
- return parsing_init_skip_frame(t, 0);
- }
-}
-
-/*
-static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
- return window + window_update < MAX_WINDOW;
-}
-*/
-
-static void add_metadata_batch(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
- grpc_metadata_batch b;
-
- b.list.head = NULL;
- /* Store away the last element of the list, so that in patch_metadata_ops
- we can reconstitute the list.
- We can't do list building here as later incoming metadata may reallocate
- the underlying array. */
- b.list.tail = (void *)(gpr_intptr)s->incoming_metadata_count;
- b.garbage.head = b.garbage.tail = NULL;
- b.deadline = s->incoming_deadline;
- s->incoming_deadline = gpr_inf_future;
-
- grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
-}
-
-static int parse_frame_slice(grpc_chttp2_transport *t, gpr_slice slice, int is_last) {
- grpc_chttp2_parse_state st;
- size_t i;
- memset(&st, 0, sizeof(st));
- switch (t->parser(t->parser_data, &st, slice, is_last)) {
- case GRPC_CHTTP2_PARSE_OK:
- if (st.end_of_stream) {
- t->incoming_stream->read_closed = 1;
- maybe_finish_read(t, t->incoming_stream, 1);
- }
- if (st.need_flush_reads) {
- maybe_finish_read(t, t->incoming_stream, 1);
- }
- if (st.metadata_boundary) {
- add_metadata_batch(t, t->incoming_stream);
- maybe_finish_read(t, t->incoming_stream, 1);
- }
- if (st.ack_settings) {
- gpr_slice_buffer_add(&t->parsing.qbuf, grpc_chttp2_settings_ack_create());
- }
- if (st.send_ping_ack) {
- gpr_slice_buffer_add(
- &t->parsing.qbuf,
- grpc_chttp2_ping_create(1, t->parsing.simple.ping.opaque_8bytes));
- }
- if (st.goaway) {
- add_goaway(t, st.goaway_error, st.goaway_text);
- }
- if (st.rst_stream) {
- cancel_stream_id(
- t, t->incoming_stream_id,
- grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason),
- st.rst_stream_reason, 0);
- }
- if (st.process_ping_reply) {
- for (i = 0; i < t->ping_count; i++) {
- if (0 ==
- memcmp(t->pings[i].id, t->parsing.simple.ping.opaque_8bytes, 8)) {
- t->pings[i].cb(t->pings[i].user_data);
- memmove(&t->pings[i], &t->pings[i + 1],
- (t->ping_count - i - 1) * sizeof(grpc_chttp2_outstanding_ping));
- t->ping_count--;
- break;
- }
- }
- }
- if (st.initial_window_update) {
- for (i = 0; i < t->stream_map.count; i++) {
- grpc_chttp2_stream *s = (grpc_chttp2_stream *)(t->stream_map.values[i]);
- s->outgoing_window_update += st.initial_window_update;
- stream_list_join(t, s, NEW_OUTGOING_WINDOW);
- }
- }
- if (st.window_update) {
- if (t->incoming_stream_id) {
- /* if there was a grpc_chttp2_stream id, this is for some grpc_chttp2_stream */
- grpc_chttp2_stream *s = lookup_stream(t, t->incoming_stream_id);
- if (s) {
- s->outgoing_window_update += st.window_update;
- stream_list_join(t, s, NEW_OUTGOING_WINDOW);
- }
- } else {
- /* grpc_chttp2_transport level window update */
- t->outgoing_window_update += st.window_update;
- }
- }
- return 1;
- case GRPC_CHTTP2_STREAM_ERROR:
- parsing_become_skip_parser(t);
- cancel_stream_id(
- t, t->incoming_stream_id,
- grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
- GRPC_CHTTP2_INTERNAL_ERROR, 1);
- return 1;
- case GRPC_CHTTP2_CONNECTION_ERROR:
- drop_connection(t);
- return 0;
- }
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- return 0;
-}
-
-static int process_read(grpc_chttp2_transport *t, gpr_slice slice) {
- gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
- gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
- gpr_uint8 *cur = beg;
-
- if (cur == end) return 1;
-
- switch (t->deframe_state) {
- case DTS_CLIENT_PREFIX_0:
- case DTS_CLIENT_PREFIX_1:
- case DTS_CLIENT_PREFIX_2:
- case DTS_CLIENT_PREFIX_3:
- case DTS_CLIENT_PREFIX_4:
- case DTS_CLIENT_PREFIX_5:
- case DTS_CLIENT_PREFIX_6:
- case DTS_CLIENT_PREFIX_7:
- case DTS_CLIENT_PREFIX_8:
- case DTS_CLIENT_PREFIX_9:
- case DTS_CLIENT_PREFIX_10:
- case DTS_CLIENT_PREFIX_11:
- case DTS_CLIENT_PREFIX_12:
- case DTS_CLIENT_PREFIX_13:
- case DTS_CLIENT_PREFIX_14:
- case DTS_CLIENT_PREFIX_15:
- case DTS_CLIENT_PREFIX_16:
- case DTS_CLIENT_PREFIX_17:
- case DTS_CLIENT_PREFIX_18:
- case DTS_CLIENT_PREFIX_19:
- case DTS_CLIENT_PREFIX_20:
- case DTS_CLIENT_PREFIX_21:
- case DTS_CLIENT_PREFIX_22:
- case DTS_CLIENT_PREFIX_23:
- while (cur != end && t->deframe_state != DTS_FH_0) {
- if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) {
- gpr_log(GPR_ERROR,
- "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
- "at byte %d",
- CLIENT_CONNECT_STRING[t->deframe_state],
- (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur,
- (int)*cur, t->deframe_state);
- drop_connection(t);
- return 0;
- }
- ++cur;
- ++t->deframe_state;
- }
- if (cur == end) {
- return 1;
- }
- /* fallthrough */
- dts_fh_0:
- case DTS_FH_0:
- GPR_ASSERT(cur < end);
- t->incoming_frame_size = ((gpr_uint32)*cur) << 16;
- if (++cur == end) {
- t->deframe_state = DTS_FH_1;
- return 1;
- }
- /* fallthrough */
- case DTS_FH_1:
- GPR_ASSERT(cur < end);
- t->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
- if (++cur == end) {
- t->deframe_state = DTS_FH_2;
- return 1;
- }
- /* fallthrough */
- case DTS_FH_2:
- GPR_ASSERT(cur < end);
- t->incoming_frame_size |= *cur;
- if (++cur == end) {
- t->deframe_state = DTS_FH_3;
- return 1;
- }
- /* fallthrough */
- case DTS_FH_3:
- GPR_ASSERT(cur < end);
- t->incoming_frame_type = *cur;
- if (++cur == end) {
- t->deframe_state = DTS_FH_4;
- return 1;
- }
- /* fallthrough */
- case DTS_FH_4:
- GPR_ASSERT(cur < end);
- t->incoming_frame_flags = *cur;
- if (++cur == end) {
- t->deframe_state = DTS_FH_5;
- return 1;
- }
- /* fallthrough */
- case DTS_FH_5:
- GPR_ASSERT(cur < end);
- t->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24;
- if (++cur == end) {
- t->deframe_state = DTS_FH_6;
- return 1;
- }
- /* fallthrough */
- case DTS_FH_6:
- GPR_ASSERT(cur < end);
- t->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
- if (++cur == end) {
- t->deframe_state = DTS_FH_7;
- return 1;
- }
- /* fallthrough */
- case DTS_FH_7:
- GPR_ASSERT(cur < end);
- t->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
- if (++cur == end) {
- t->deframe_state = DTS_FH_8;
- return 1;
- }
- /* fallthrough */
- case DTS_FH_8:
- GPR_ASSERT(cur < end);
- t->incoming_stream_id |= ((gpr_uint32)*cur);
- t->deframe_state = DTS_FRAME;
- if (!init_frame_parser(t)) {
- return 0;
- }
- /* t->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when
- sending GOAWAY frame.
- https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
- says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream ID. So,
- since we don't have server pushed streams, client should send
- GOAWAY last-grpc_chttp2_stream-id=0 in this case. */
- if (!t->is_client) {
- t->last_incoming_stream_id = t->incoming_stream_id;
- }
- if (t->incoming_frame_size == 0) {
- if (!parse_frame_slice(t, gpr_empty_slice(), 1)) {
- return 0;
- }
- if (++cur == end) {
- t->deframe_state = DTS_FH_0;
- return 1;
- }
- goto dts_fh_0; /* loop */
- }
- if (++cur == end) {
- return 1;
- }
- /* fallthrough */
- case DTS_FRAME:
- GPR_ASSERT(cur < end);
- if ((gpr_uint32)(end - cur) == t->incoming_frame_size) {
- if (!parse_frame_slice(
- t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
- return 0;
- }
- t->deframe_state = DTS_FH_0;
- return 1;
- } else if ((gpr_uint32)(end - cur) > t->incoming_frame_size) {
- if (!parse_frame_slice(
- t, gpr_slice_sub_no_ref(slice, cur - beg,
- cur + t->incoming_frame_size - beg),
- 1)) {
- return 0;
- }
- cur += t->incoming_frame_size;
- goto dts_fh_0; /* loop */
- } else {
- if (!parse_frame_slice(
- t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
- return 0;
- }
- t->incoming_frame_size -= (end - cur);
- return 1;
- }
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- }
-
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
-
- return 0;
-}
-
/* tcp read callback */
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
@@ -1554,9 +908,12 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
if (t->error_state == ERROR_STATE_NONE) {
t->parsing.executing = 1;
gpr_mu_unlock(&t->mu);
- for (i = 0; i < nslices && process_read(t, slices[i]); i++)
+ for (i = 0; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); i++)
;
gpr_mu_lock(&t->mu);
+ if (i != nslices) {
+ drop_connection(t);
+ }
t->parsing.executing = 0;
}
while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) {
@@ -1569,19 +926,19 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
maybe_join_window_updates(t, s);
}
while ((s = stream_list_remove_head(t, NEW_OUTGOING_WINDOW))) {
- int was_window_empty = s->outgoing_window <= 0;
- FLOWCTL_TRACE(t, s, outgoing, s->id, s->outgoing_window_update);
- s->outgoing_window += s->outgoing_window_update;
- s->outgoing_window_update = 0;
+ int was_window_empty = s->global.outgoing_window <= 0;
+ FLOWCTL_TRACE(t, s, outgoing, s->global.id, s->global.outgoing_window_update);
+ s->global.outgoing_window += s->global.outgoing_window_update;
+ s->global.outgoing_window_update = 0;
/* if this window update makes outgoing ops writable again,
flag that */
- if (was_window_empty && s->outgoing_sopb &&
- s->outgoing_sopb->nops > 0) {
+ if (was_window_empty && s->global.outgoing_sopb &&
+ s->global.outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
- t->outgoing_window += t->outgoing_window_update;
- t->outgoing_window_update = 0;
+ t->global.outgoing_window += t->global.outgoing_window_update;
+ t->global.outgoing_window_update = 0;
maybe_start_some_streams(t);
unlock(t);
keep_reading = 1;