diff options
author | 2015-06-15 06:58:50 -0700 | |
---|---|---|
committer | 2015-06-15 06:58:50 -0700 | |
commit | 606d874d162a9a254035839701bf6926f681c77b (patch) | |
tree | 93b216007d5ba29bc88b948f1b61c4e179603638 /src/core/transport/chttp2 | |
parent | 3719f07233ed7b5b1371b811311af434c0128e03 (diff) |
Progress on splitting reading from transport lock
Diffstat (limited to 'src/core/transport/chttp2')
-rw-r--r-- | src/core/transport/chttp2/internal.h | 132 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 137 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_map.h | 3 |
3 files changed, 234 insertions, 38 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index f89d327f8c..ffadd04762 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -143,9 +143,9 @@ typedef struct { } grpc_chttp2_stream_link; typedef enum { - ERROR_STATE_NONE, - ERROR_STATE_SEEN, - ERROR_STATE_NOTIFIED + GRPC_CHTTP2_ERROR_STATE_NONE, + GRPC_CHTTP2_ERROR_STATE_SEEN, + GRPC_CHTTP2_ERROR_STATE_NOTIFIED } grpc_chttp2_error_state; /* We keep several sets of connection wide parameters */ @@ -198,11 +198,32 @@ typedef struct { /** settings values */ gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; + /** has there been a connection level error, and have we notified + anyone about it? */ + grpc_chttp2_error_state error_state; + + /** what is the next stream id to be allocated by this peer? + copied to next_stream_id in parsing when parsing commences */ + gpr_uint32 next_stream_id; + /** last received stream id */ gpr_uint32 last_incoming_stream_id; /** pings awaiting responses */ grpc_chttp2_outstanding_ping pings; + /** next payload for an outgoing ping */ + gpr_uint64 ping_counter; + + /** concurrent stream count: updated when not parsing, + so this is a strict over-estimation on the client */ + gpr_uint32 concurrent_stream_count; + + /** is there a goaway available? */ + gpr_uint8 have_goaway; + /** what is the debug text of the goaway? */ + gpr_slice goaway_text; + /** what is the status code of the goaway? */ + grpc_status_code goaway_error; } grpc_chttp2_transport_global; typedef struct { @@ -279,7 +300,6 @@ struct grpc_chttp2_transport_parsing { grpc_chttp2_outstanding_ping pings; }; - struct grpc_chttp2_transport { grpc_transport base; /* must be first */ grpc_endpoint *ep; @@ -287,18 +307,67 @@ struct grpc_chttp2_transport { gpr_refcount refs; gpr_mu mu; - gpr_cv cv; + + /** is the transport destroying itself? */ + gpr_uint8 destroying; + /** has the upper layer closed the transport? */ + gpr_uint8 closed; /** is a thread currently writing */ gpr_uint8 writing_active; + /** is a thread currently parsing */ + gpr_uint8 parsing_active; + /** is there a read request to the endpoint outstanding? */ + gpr_uint8 endpoint_reading; + + /** various lists of streams */ + grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; + + /** global state for reading/writing */ + grpc_chttp2_transport_global global; + /** state only accessible by the chain of execution that + set writing_active=1 */ + grpc_chttp2_transport_writing writing; + /** state only accessible by the chain of execution that + set parsing_active=1 */ + grpc_chttp2_transport_parsing parsing; + + /** maps stream id to grpc_chttp2_stream objects; + owned by the parsing thread when parsing */ + grpc_chttp2_stream_map parsing_stream_map; + + /** streams created by the client (possibly during parsing); + merged with parsing_stream_map during unlock when no + parsing is occurring */ + grpc_chttp2_stream_map new_stream_map; + + /** closure to execute writing */ + grpc_iomgr_closure writing_action; + + /** address to place a newly accepted stream - set and unset by + grpc_chttp2_parsing_accept_stream; used by init_stream to + publish the accepted server stream */ + grpc_chttp2_stream **accepting_stream; + + struct { + /** is a thread currently performing channel callbacks */ + gpr_uint8 executing; + /** transport channel-level callback */ + const grpc_transport_callbacks *cb; + /** user data for cb calls */ + void *cb_user_data; + /** closure for notifying transport closure */ + grpc_iomgr_closure notify_closed; + } channel_callback; + +#if 0 /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; /** are we calling back any grpc_transport_op completion events */ gpr_uint8 calling_back_ops; gpr_uint8 destroying; gpr_uint8 closed; - grpc_chttp2_error_state error_state; /* stream indexing */ gpr_uint32 next_stream_id; @@ -306,40 +375,19 @@ struct grpc_chttp2_transport { /* window management */ gpr_uint32 outgoing_window_update; - /* goaway */ - grpc_chttp2_pending_goaway *pending_goaways; - size_t num_pending_goaways; - size_t cap_pending_goaways; - /* state for a stream that's not yet been created */ grpc_stream_op_buffer new_stream_sopb; /* stream ops that need to be destroyed, but outside of the lock */ grpc_stream_op_buffer nuke_later_sopb; - grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; - grpc_chttp2_stream_map stream_map; - /* pings */ gpr_int64 ping_counter; - grpc_chttp2_transport_global global; - grpc_chttp2_transport_writing writing; - grpc_chttp2_transport_parsing parsing; - /** closure to execute writing */ - grpc_iomgr_closure writing_action; + grpc_chttp2_stream **accepting_stream; - struct { - /** is a thread currently performing channel callbacks */ - gpr_uint8 executing; - /** transport channel-level callback */ - const grpc_transport_callbacks *cb; - /** user data for cb calls */ - void *cb_user_data; - /** closure for notifying transport closure */ - grpc_iomgr_closure notify_closed; - } channel_callback; +#endif }; typedef struct { @@ -361,6 +409,13 @@ typedef struct { grpc_chttp2_write_state write_state; /** is this stream closed (boolean) */ gpr_uint8 read_closed; + + /** stream state already published to the upper layer */ + grpc_stream_state published_state; + /** address to publish next stream state to */ + grpc_stream_state *publish_state; + /** pointer to sop buffer to fill in with new stream ops */ + grpc_stream_op_buffer *incoming_sopb; } grpc_chttp2_stream_global; typedef struct { @@ -377,12 +432,12 @@ struct grpc_chttp2_stream_parsing { gpr_uint32 id; /** has this stream received a close */ gpr_uint8 received_close; - /** incoming_window has been reduced during parsing */ - gpr_uint8 incoming_window_changed; /** saw an error on this stream during parsing (it should be cancelled) */ gpr_uint8 saw_error; /** saw a rst_stream */ gpr_uint8 saw_rst_stream; + /** incoming_window has been reduced by this much during parsing */ + gpr_uint32 incoming_window_delta; /** window available for peer to send to us */ gpr_uint32 incoming_window; /** parsing state for data frames */ @@ -403,20 +458,18 @@ struct grpc_chttp2_stream_parsing { struct grpc_chttp2_stream { grpc_chttp2_stream_global global; grpc_chttp2_stream_writing writing; - - gpr_uint32 outgoing_window_update; - gpr_uint8 cancelled; + grpc_chttp2_stream_parsing parsing; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; - /* sops from application */ - grpc_stream_op_buffer *incoming_sopb; - grpc_stream_state *publish_state; - grpc_stream_state published_state; +#if 0 + gpr_uint32 outgoing_window_update; + gpr_uint8 cancelled; grpc_stream_state callback_state; grpc_stream_op_buffer callback_sopb; +#endif }; /** Transport writing call flow: @@ -434,6 +487,7 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); /** Process one slice of incoming data */ +void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice); void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); @@ -450,9 +504,11 @@ int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); +void grpc_chttp2_list_add_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing); +int grpc_chttp2_list_pop_parsing_seen_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_parsing **stream_parsing); void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success); void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index e2c39bec3c..9f50194125 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -32,6 +32,9 @@ */ #include "src/core/transport/chttp2/internal.h" + +#include <string.h> + #include "src/core/transport/chttp2/timeout_encoding.h" #include <grpc/support/alloc.h> @@ -50,6 +53,9 @@ static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsi static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last); void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) { + grpc_chttp2_stream_global *stream_global; + grpc_chttp2_stream_parsing *stream_parsing; + /* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when sending GOAWAY frame. https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8 @@ -59,6 +65,84 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, g if (!transport_parsing->is_client) { transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id; } + + /* TODO(ctiller): re-implement */ + GPR_ASSERT(transport_parsing->initial_window_update == 0); + +#if 0 + while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { + int publish = 0; + GPR_ASSERT(s->incoming_sopb); + *s->publish_state = + compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed); + if (*s->publish_state != s->published_state) { + s->published_state = *s->publish_state; + publish = 1; + if (s->published_state == GRPC_STREAM_CLOSED) { + remove_from_stream_map(t, s); + } + } + if (s->parser.incoming_sopb.nops > 0) { + grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); + publish = 1; + } + if (publish) { + if (s->incoming_metadata_count > 0) { + patch_metadata_ops(s); + } + s->incoming_sopb = NULL; + schedule_cb(t, s->global.recv_done_closure, 1); + } + } +#endif + + /* copy parsing qbuf to global qbuf */ + gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf); + + /* update global settings */ + if (transport_parsing->settings_updated) { + memcpy(transport_global->settings[PEER_SETTINGS], transport_parsing->settings, sizeof(transport_parsing->settings)); + transport_parsing->settings_updated = 0; + } + + /* update settings based on ack if received */ + if (transport_parsing->settings_ack_received) { + memcpy(transport_global->settings[ACKED_SETTINGS], transport_global->settings[SENT_SETTINGS], + GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); + transport_parsing->settings_ack_received = 0; + } + + /* move goaway to the global state if we received one (it will be + published later */ + if (transport_parsing->goaway_received) { + gpr_slice_unref(transport_global->goaway_text); + transport_global->goaway_text = gpr_slice_ref(transport_parsing->goaway_text); + transport_global->goaway_error = transport_parsing->goaway_error; + transport_global->have_goaway = 1; + transport_parsing->goaway_received = 0; + } + + /* for each stream that saw an update, fixup global state */ + while (grpc_chttp2_list_pop_parsing_seen_stream(transport_global, transport_parsing, &stream_global, &stream_parsing)) { + /* update incoming flow control window */ + if (stream_parsing->incoming_window_delta) { + stream_global->incoming_window -= stream_parsing->incoming_window_delta; + stream_parsing->incoming_window_delta = 0; + grpc_chttp2_list_add_writable_window_update_stream(transport_global, stream_global); + } + + /* update outgoing flow control window */ + if (stream_parsing->outgoing_window_update) { + int was_zero = stream_global->outgoing_window <= 0; + int is_zero; + stream_global->outgoing_window += stream_parsing->outgoing_window_update; + stream_parsing->outgoing_window_update = 0; + is_zero = stream_global->outgoing_window <= 0; + if (was_zero && !is_zero) { + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + } + } + } } int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) { @@ -555,6 +639,59 @@ void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *trans grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b); } +static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, grpc_chttp2_stream_parsing *stream_parsing) { + grpc_stream_op *ops = stream_global->incoming_sopb->ops; + size_t nops = stream_global->incoming_sopb->nops; + size_t i; + size_t j; + size_t mdidx = 0; + size_t last_mdidx; + int found_metadata = 0; + + /* rework the array of metadata into a linked list, making use + of the breadcrumbs we left in metadata batches during + add_metadata_batch */ + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + found_metadata = 1; + /* we left a breadcrumb indicating where the end of this list is, + and since we add sequentially, we know from the end of the last + segment where this segment begins */ + last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); + GPR_ASSERT(last_mdidx > mdidx); + GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count); + /* turn the array into a doubly linked list */ + op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx]; + op->data.metadata.list.tail = &stream_parsing->incoming_metadata[last_mdidx - 1]; + for (j = mdidx + 1; j < last_mdidx; j++) { + stream_parsing->incoming_metadata[j].prev = &stream_parsing->incoming_metadata[j - 1]; + stream_parsing->incoming_metadata[j - 1].next = &stream_parsing->incoming_metadata[j]; + } + stream_parsing->incoming_metadata[mdidx].prev = NULL; + stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL; + /* track where we're up to */ + mdidx = last_mdidx; + } + if (found_metadata) { + stream_parsing->old_incoming_metadata = stream_parsing->incoming_metadata; + if (mdidx != stream_parsing->incoming_metadata_count) { + /* we have a partially read metadata batch still in incoming_metadata */ + size_t new_count = stream_parsing->incoming_metadata_count - mdidx; + size_t copy_bytes = sizeof(*stream_parsing->incoming_metadata) * new_count; + GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count); + stream_parsing->incoming_metadata = gpr_malloc(copy_bytes); + memcpy(stream_parsing->old_incoming_metadata + mdidx, stream_parsing->incoming_metadata, + copy_bytes); + stream_parsing->incoming_metadata_count = stream_parsing->incoming_metadata_capacity = new_count; + } else { + stream_parsing->incoming_metadata = NULL; + stream_parsing->incoming_metadata_count = 0; + stream_parsing->incoming_metadata_capacity = 0; + } + } +} + static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last) { grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last)) { diff --git a/src/core/transport/chttp2/stream_map.h b/src/core/transport/chttp2/stream_map.h index d338d2f892..f59dece746 100644 --- a/src/core/transport/chttp2/stream_map.h +++ b/src/core/transport/chttp2/stream_map.h @@ -66,6 +66,9 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, gpr_uint32 key, void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, gpr_uint32 key); +/* Move all elements of src into dst */ +void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, grpc_chttp2_stream_map *dst); + /* Return an existing key, or NULL if it does not exist */ void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, gpr_uint32 key); |