aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-15 06:58:50 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-15 06:58:50 -0700
commit606d874d162a9a254035839701bf6926f681c77b (patch)
tree93b216007d5ba29bc88b948f1b61c4e179603638 /src/core/transport/chttp2
parent3719f07233ed7b5b1371b811311af434c0128e03 (diff)
Progress on splitting reading from transport lock
Diffstat (limited to 'src/core/transport/chttp2')
-rw-r--r--src/core/transport/chttp2/internal.h132
-rw-r--r--src/core/transport/chttp2/parsing.c137
-rw-r--r--src/core/transport/chttp2/stream_map.h3
3 files changed, 234 insertions, 38 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index f89d327f8c..ffadd04762 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -143,9 +143,9 @@ typedef struct {
} grpc_chttp2_stream_link;
typedef enum {
- ERROR_STATE_NONE,
- ERROR_STATE_SEEN,
- ERROR_STATE_NOTIFIED
+ GRPC_CHTTP2_ERROR_STATE_NONE,
+ GRPC_CHTTP2_ERROR_STATE_SEEN,
+ GRPC_CHTTP2_ERROR_STATE_NOTIFIED
} grpc_chttp2_error_state;
/* We keep several sets of connection wide parameters */
@@ -198,11 +198,32 @@ typedef struct {
/** settings values */
gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
+ /** has there been a connection level error, and have we notified
+ anyone about it? */
+ grpc_chttp2_error_state error_state;
+
+ /** what is the next stream id to be allocated by this peer?
+ copied to next_stream_id in parsing when parsing commences */
+ gpr_uint32 next_stream_id;
+
/** last received stream id */
gpr_uint32 last_incoming_stream_id;
/** pings awaiting responses */
grpc_chttp2_outstanding_ping pings;
+ /** next payload for an outgoing ping */
+ gpr_uint64 ping_counter;
+
+ /** concurrent stream count: updated when not parsing,
+ so this is a strict over-estimation on the client */
+ gpr_uint32 concurrent_stream_count;
+
+ /** is there a goaway available? */
+ gpr_uint8 have_goaway;
+ /** what is the debug text of the goaway? */
+ gpr_slice goaway_text;
+ /** what is the status code of the goaway? */
+ grpc_status_code goaway_error;
} grpc_chttp2_transport_global;
typedef struct {
@@ -279,7 +300,6 @@ struct grpc_chttp2_transport_parsing {
grpc_chttp2_outstanding_ping pings;
};
-
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
grpc_endpoint *ep;
@@ -287,18 +307,67 @@ struct grpc_chttp2_transport {
gpr_refcount refs;
gpr_mu mu;
- gpr_cv cv;
+
+ /** is the transport destroying itself? */
+ gpr_uint8 destroying;
+ /** has the upper layer closed the transport? */
+ gpr_uint8 closed;
/** is a thread currently writing */
gpr_uint8 writing_active;
+ /** is a thread currently parsing */
+ gpr_uint8 parsing_active;
+ /** is there a read request to the endpoint outstanding? */
+ gpr_uint8 endpoint_reading;
+
+ /** various lists of streams */
+ grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
+
+ /** global state for reading/writing */
+ grpc_chttp2_transport_global global;
+ /** state only accessible by the chain of execution that
+ set writing_active=1 */
+ grpc_chttp2_transport_writing writing;
+ /** state only accessible by the chain of execution that
+ set parsing_active=1 */
+ grpc_chttp2_transport_parsing parsing;
+
+ /** maps stream id to grpc_chttp2_stream objects;
+ owned by the parsing thread when parsing */
+ grpc_chttp2_stream_map parsing_stream_map;
+
+ /** streams created by the client (possibly during parsing);
+ merged with parsing_stream_map during unlock when no
+ parsing is occurring */
+ grpc_chttp2_stream_map new_stream_map;
+
+ /** closure to execute writing */
+ grpc_iomgr_closure writing_action;
+
+ /** address to place a newly accepted stream - set and unset by
+ grpc_chttp2_parsing_accept_stream; used by init_stream to
+ publish the accepted server stream */
+ grpc_chttp2_stream **accepting_stream;
+
+ struct {
+ /** is a thread currently performing channel callbacks */
+ gpr_uint8 executing;
+ /** transport channel-level callback */
+ const grpc_transport_callbacks *cb;
+ /** user data for cb calls */
+ void *cb_user_data;
+ /** closure for notifying transport closure */
+ grpc_iomgr_closure notify_closed;
+ } channel_callback;
+
+#if 0
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
/** are we calling back any grpc_transport_op completion events */
gpr_uint8 calling_back_ops;
gpr_uint8 destroying;
gpr_uint8 closed;
- grpc_chttp2_error_state error_state;
/* stream indexing */
gpr_uint32 next_stream_id;
@@ -306,40 +375,19 @@ struct grpc_chttp2_transport {
/* window management */
gpr_uint32 outgoing_window_update;
- /* goaway */
- grpc_chttp2_pending_goaway *pending_goaways;
- size_t num_pending_goaways;
- size_t cap_pending_goaways;
-
/* state for a stream that's not yet been created */
grpc_stream_op_buffer new_stream_sopb;
/* stream ops that need to be destroyed, but outside of the lock */
grpc_stream_op_buffer nuke_later_sopb;
- grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
- grpc_chttp2_stream_map stream_map;
-
/* pings */
gpr_int64 ping_counter;
- grpc_chttp2_transport_global global;
- grpc_chttp2_transport_writing writing;
- grpc_chttp2_transport_parsing parsing;
- /** closure to execute writing */
- grpc_iomgr_closure writing_action;
+ grpc_chttp2_stream **accepting_stream;
- struct {
- /** is a thread currently performing channel callbacks */
- gpr_uint8 executing;
- /** transport channel-level callback */
- const grpc_transport_callbacks *cb;
- /** user data for cb calls */
- void *cb_user_data;
- /** closure for notifying transport closure */
- grpc_iomgr_closure notify_closed;
- } channel_callback;
+#endif
};
typedef struct {
@@ -361,6 +409,13 @@ typedef struct {
grpc_chttp2_write_state write_state;
/** is this stream closed (boolean) */
gpr_uint8 read_closed;
+
+ /** stream state already published to the upper layer */
+ grpc_stream_state published_state;
+ /** address to publish next stream state to */
+ grpc_stream_state *publish_state;
+ /** pointer to sop buffer to fill in with new stream ops */
+ grpc_stream_op_buffer *incoming_sopb;
} grpc_chttp2_stream_global;
typedef struct {
@@ -377,12 +432,12 @@ struct grpc_chttp2_stream_parsing {
gpr_uint32 id;
/** has this stream received a close */
gpr_uint8 received_close;
- /** incoming_window has been reduced during parsing */
- gpr_uint8 incoming_window_changed;
/** saw an error on this stream during parsing (it should be cancelled) */
gpr_uint8 saw_error;
/** saw a rst_stream */
gpr_uint8 saw_rst_stream;
+ /** incoming_window has been reduced by this much during parsing */
+ gpr_uint32 incoming_window_delta;
/** window available for peer to send to us */
gpr_uint32 incoming_window;
/** parsing state for data frames */
@@ -403,20 +458,18 @@ struct grpc_chttp2_stream_parsing {
struct grpc_chttp2_stream {
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
-
- gpr_uint32 outgoing_window_update;
- gpr_uint8 cancelled;
+ grpc_chttp2_stream_parsing parsing;
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
- /* sops from application */
- grpc_stream_op_buffer *incoming_sopb;
- grpc_stream_state *publish_state;
- grpc_stream_state published_state;
+#if 0
+ gpr_uint32 outgoing_window_update;
+ gpr_uint8 cancelled;
grpc_stream_state callback_state;
grpc_stream_op_buffer callback_sopb;
+#endif
};
/** Transport writing call flow:
@@ -434,6 +487,7 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
/** Process one slice of incoming data */
+void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing);
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice);
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing);
@@ -450,9 +504,11 @@ int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport
void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing);
+void grpc_chttp2_list_add_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing);
+int grpc_chttp2_list_pop_parsing_seen_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_parsing **stream_parsing);
void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success);
void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index e2c39bec3c..9f50194125 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -32,6 +32,9 @@
*/
#include "src/core/transport/chttp2/internal.h"
+
+#include <string.h>
+
#include "src/core/transport/chttp2/timeout_encoding.h"
#include <grpc/support/alloc.h>
@@ -50,6 +53,9 @@ static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsi
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last);
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_chttp2_stream_global *stream_global;
+ grpc_chttp2_stream_parsing *stream_parsing;
+
/* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when
sending GOAWAY frame.
https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
@@ -59,6 +65,84 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, g
if (!transport_parsing->is_client) {
transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id;
}
+
+ /* TODO(ctiller): re-implement */
+ GPR_ASSERT(transport_parsing->initial_window_update == 0);
+
+#if 0
+ while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
+ int publish = 0;
+ GPR_ASSERT(s->incoming_sopb);
+ *s->publish_state =
+ compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
+ if (*s->publish_state != s->published_state) {
+ s->published_state = *s->publish_state;
+ publish = 1;
+ if (s->published_state == GRPC_STREAM_CLOSED) {
+ remove_from_stream_map(t, s);
+ }
+ }
+ if (s->parser.incoming_sopb.nops > 0) {
+ grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
+ publish = 1;
+ }
+ if (publish) {
+ if (s->incoming_metadata_count > 0) {
+ patch_metadata_ops(s);
+ }
+ s->incoming_sopb = NULL;
+ schedule_cb(t, s->global.recv_done_closure, 1);
+ }
+ }
+#endif
+
+ /* copy parsing qbuf to global qbuf */
+ gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf);
+
+ /* update global settings */
+ if (transport_parsing->settings_updated) {
+ memcpy(transport_global->settings[PEER_SETTINGS], transport_parsing->settings, sizeof(transport_parsing->settings));
+ transport_parsing->settings_updated = 0;
+ }
+
+ /* update settings based on ack if received */
+ if (transport_parsing->settings_ack_received) {
+ memcpy(transport_global->settings[ACKED_SETTINGS], transport_global->settings[SENT_SETTINGS],
+ GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
+ transport_parsing->settings_ack_received = 0;
+ }
+
+ /* move goaway to the global state if we received one (it will be
+ published later */
+ if (transport_parsing->goaway_received) {
+ gpr_slice_unref(transport_global->goaway_text);
+ transport_global->goaway_text = gpr_slice_ref(transport_parsing->goaway_text);
+ transport_global->goaway_error = transport_parsing->goaway_error;
+ transport_global->have_goaway = 1;
+ transport_parsing->goaway_received = 0;
+ }
+
+ /* for each stream that saw an update, fixup global state */
+ while (grpc_chttp2_list_pop_parsing_seen_stream(transport_global, transport_parsing, &stream_global, &stream_parsing)) {
+ /* update incoming flow control window */
+ if (stream_parsing->incoming_window_delta) {
+ stream_global->incoming_window -= stream_parsing->incoming_window_delta;
+ stream_parsing->incoming_window_delta = 0;
+ grpc_chttp2_list_add_writable_window_update_stream(transport_global, stream_global);
+ }
+
+ /* update outgoing flow control window */
+ if (stream_parsing->outgoing_window_update) {
+ int was_zero = stream_global->outgoing_window <= 0;
+ int is_zero;
+ stream_global->outgoing_window += stream_parsing->outgoing_window_update;
+ stream_parsing->outgoing_window_update = 0;
+ is_zero = stream_global->outgoing_window <= 0;
+ if (was_zero && !is_zero) {
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ }
+ }
+ }
}
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) {
@@ -555,6 +639,59 @@ void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *trans
grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
}
+static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, grpc_chttp2_stream_parsing *stream_parsing) {
+ grpc_stream_op *ops = stream_global->incoming_sopb->ops;
+ size_t nops = stream_global->incoming_sopb->nops;
+ size_t i;
+ size_t j;
+ size_t mdidx = 0;
+ size_t last_mdidx;
+ int found_metadata = 0;
+
+ /* rework the array of metadata into a linked list, making use
+ of the breadcrumbs we left in metadata batches during
+ add_metadata_batch */
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ found_metadata = 1;
+ /* we left a breadcrumb indicating where the end of this list is,
+ and since we add sequentially, we know from the end of the last
+ segment where this segment begins */
+ last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
+ GPR_ASSERT(last_mdidx > mdidx);
+ GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count);
+ /* turn the array into a doubly linked list */
+ op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx];
+ op->data.metadata.list.tail = &stream_parsing->incoming_metadata[last_mdidx - 1];
+ for (j = mdidx + 1; j < last_mdidx; j++) {
+ stream_parsing->incoming_metadata[j].prev = &stream_parsing->incoming_metadata[j - 1];
+ stream_parsing->incoming_metadata[j - 1].next = &stream_parsing->incoming_metadata[j];
+ }
+ stream_parsing->incoming_metadata[mdidx].prev = NULL;
+ stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL;
+ /* track where we're up to */
+ mdidx = last_mdidx;
+ }
+ if (found_metadata) {
+ stream_parsing->old_incoming_metadata = stream_parsing->incoming_metadata;
+ if (mdidx != stream_parsing->incoming_metadata_count) {
+ /* we have a partially read metadata batch still in incoming_metadata */
+ size_t new_count = stream_parsing->incoming_metadata_count - mdidx;
+ size_t copy_bytes = sizeof(*stream_parsing->incoming_metadata) * new_count;
+ GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count);
+ stream_parsing->incoming_metadata = gpr_malloc(copy_bytes);
+ memcpy(stream_parsing->old_incoming_metadata + mdidx, stream_parsing->incoming_metadata,
+ copy_bytes);
+ stream_parsing->incoming_metadata_count = stream_parsing->incoming_metadata_capacity = new_count;
+ } else {
+ stream_parsing->incoming_metadata = NULL;
+ stream_parsing->incoming_metadata_count = 0;
+ stream_parsing->incoming_metadata_capacity = 0;
+ }
+ }
+}
+
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream;
switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last)) {
diff --git a/src/core/transport/chttp2/stream_map.h b/src/core/transport/chttp2/stream_map.h
index d338d2f892..f59dece746 100644
--- a/src/core/transport/chttp2/stream_map.h
+++ b/src/core/transport/chttp2/stream_map.h
@@ -66,6 +66,9 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, gpr_uint32 key,
void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map,
gpr_uint32 key);
+/* Move all elements of src into dst */
+void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, grpc_chttp2_stream_map *dst);
+
/* Return an existing key, or NULL if it does not exist */
void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, gpr_uint32 key);