aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-11 09:37:17 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-11 09:37:17 -0700
commite0617624bafca93f795f12483451583764fd8c80 (patch)
treeed8e0660a5dd4a399f3dba21382a7ac70bca3da0 /src
parentf3fba749aa5a74b64f0a2c9423d470708e648b9a (diff)
Exploratory work towards splitting parsing from the transport lock
Diffstat (limited to 'src')
-rw-r--r--src/core/transport/chttp2_transport.c179
1 files changed, 101 insertions, 78 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index caaced75c4..f3af5b1320 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -104,6 +104,10 @@ typedef enum {
/* streams that have finished reading: we wait until unlock to coalesce
all changes into one callback */
FINISHED_READ_OP,
+ MAYBE_FINISH_READ_AFTER_PARSE,
+ PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE,
+ OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE,
+ NEW_OUTGOING_WINDOW,
STREAM_LIST_COUNT /* must be last */
} stream_list_id;
@@ -229,6 +233,7 @@ struct transport {
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
+ gpr_uint8 parsing;
gpr_uint8 writing;
/** are we calling back (via cb) with a channel-level event */
gpr_uint8 calling_back_channel;
@@ -254,6 +259,7 @@ struct transport {
/* window management */
gpr_uint32 outgoing_window;
+ gpr_uint32 outgoing_window_update;
gpr_uint32 incoming_window;
gpr_uint32 connection_window_target;
@@ -319,6 +325,7 @@ struct stream {
gpr_uint32 incoming_window;
gpr_int64 outgoing_window;
+ gpr_uint32 outgoing_window_update;
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
@@ -395,7 +402,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
static void schedule_cb(transport *t, op_closure closure, int success);
-static void maybe_finish_read(transport *t, stream *s);
+static void maybe_finish_read(transport *t, stream *s, int is_parser);
static void maybe_join_window_updates(transport *t, stream *s);
static void finish_reads(transport *t);
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
@@ -652,8 +659,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
ref_transport(t);
+ lock(t);
if (!server_data) {
- lock(t);
s->id = 0;
s->outgoing_window = 0;
s->incoming_window = 0;
@@ -675,9 +682,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
if (initial_op) perform_op_locked(t, s, initial_op);
- if (!server_data) {
- unlock(t);
- }
+ unlock(t);
return 0;
}
@@ -694,16 +699,11 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_lock(&t->mu);
- /* stop parsing if we're currently parsing this stream */
- if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id &&
- s->id != 0) {
- become_skip_parser(t);
- }
+ GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->id == 0);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
stream_list_remove(t, s, i);
}
- remove_from_stream_map(t, s);
gpr_mu_unlock(&t->mu);
@@ -835,7 +835,9 @@ static void unlock(transport *t) {
finalize_cancellations(t);
}
- finish_reads(t);
+ if (!t->parsing) {
+ finish_reads(t);
+ }
/* gather any callbacks that need to be made */
if (!t->calling_back_ops) {
@@ -850,7 +852,7 @@ static void unlock(transport *t) {
t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED;
}
- if (t->num_pending_goaways) {
+ if (!t->parsing && t->num_pending_goaways) {
goaways = t->pending_goaways;
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
@@ -930,8 +932,10 @@ static int prepare_write(transport *t) {
gpr_uint32 window_delta;
/* simple writes are queued to qbuf, and flushed here */
- gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
- GPR_ASSERT(t->qbuf.count == 0);
+ if (!t->parsing) {
+ gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
+ GPR_ASSERT(t->qbuf.count == 0);
+ }
if (t->dirtied_local_settings && !t->sent_local_settings) {
gpr_slice_buffer_add(
@@ -976,26 +980,28 @@ static int prepare_write(transport *t) {
}
}
- /* for each stream that wants to update its window, add that window here */
- while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
- window_delta =
- t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
- s->incoming_window;
- if (!s->read_closed && window_delta) {
- gpr_slice_buffer_add(
- &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
- FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
- s->incoming_window += window_delta;
+ if (!t->parsing) {
+ /* for each stream that wants to update its window, add that window here */
+ while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
+ window_delta =
+ t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
+ s->incoming_window;
+ if (!s->read_closed && window_delta) {
+ gpr_slice_buffer_add(
+ &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
+ FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
+ s->incoming_window += window_delta;
+ }
}
- }
- /* if the transport is ready to send a window update, do so here also */
- if (t->incoming_window < t->connection_window_target * 3 / 4) {
- window_delta = t->connection_window_target - t->incoming_window;
- gpr_slice_buffer_add(&t->outbuf,
- grpc_chttp2_window_update_create(0, window_delta));
- FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
- t->incoming_window += window_delta;
+ /* if the transport is ready to send a window update, do so here also */
+ if (t->incoming_window < t->connection_window_target * 3 / 4) {
+ window_delta = t->connection_window_target - t->incoming_window;
+ gpr_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_window_update_create(0, window_delta));
+ FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
+ t->incoming_window += window_delta;
+ }
}
return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
@@ -1031,7 +1037,7 @@ static void finish_write_common(transport *t, int success) {
if (!t->is_client) {
s->read_closed = 1;
}
- maybe_finish_read(t, s);
+ maybe_finish_read(t, s, 0);
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@@ -1089,7 +1095,7 @@ static void add_goaway(transport *t, gpr_uint32 goaway_error,
static void maybe_start_some_streams(transport *t) {
/* start streams where we have free stream ids and free concurrency */
- while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
+ while (!t->parsing && t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_stream_map_size(&t->stream_map) <
t->settings[PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
@@ -1169,7 +1175,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
s->publish_state = op->recv_state;
gpr_free(s->old_incoming_metadata);
s->old_incoming_metadata = NULL;
- maybe_finish_read(t, s);
+ maybe_finish_read(t, s, 0);
maybe_join_window_updates(t, s);
}
@@ -1231,7 +1237,7 @@ static void finalize_cancellations(transport *t) {
while ((s = stream_list_remove_head(t, CANCELLED))) {
s->read_closed = 1;
s->write_state = WRITE_STATE_SENT_CLOSE;
- maybe_finish_read(t, s);
+ maybe_finish_read(t, s, 0);
}
}
@@ -1249,7 +1255,8 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
- grpc_mdstr *optional_message, int send_rst) {
+ grpc_mdstr *optional_message, int send_rst,
+ int is_parser) {
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
@@ -1299,7 +1306,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
add_metadata_batch(t, s);
}
}
- maybe_finish_read(t, s);
+ maybe_finish_read(t, s, is_parser);
}
if (!id) send_rst = 0;
if (send_rst) {
@@ -1314,8 +1321,10 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst) {
+ lock(t);
cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
- NULL, send_rst);
+ NULL, send_rst, 1);
+ unlock(t);
}
static void cancel_stream(transport *t, stream *s,
@@ -1323,7 +1332,7 @@ static void cancel_stream(transport *t, stream *s,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst) {
cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
- send_rst);
+ send_rst, 0);
}
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
@@ -1343,13 +1352,19 @@ static void drop_connection(transport *t) {
end_all_the_calls(t);
}
-static void maybe_finish_read(transport *t, stream *s) {
- if (s->incoming_sopb) {
+static void maybe_finish_read(transport *t, stream *s, int is_parser) {
+ if (is_parser) {
+ stream_list_join(t, s, MAYBE_FINISH_READ_AFTER_PARSE);
+ } else if (s->incoming_sopb) {
stream_list_join(t, s, FINISHED_READ_OP);
}
}
static void maybe_join_window_updates(transport *t, stream *s) {
+ if (t->parsing) {
+ stream_list_join(t, s, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
+ return;
+ }
if (s->incoming_sopb != NULL &&
s->incoming_window <
t->settings[LOCAL_SETTINGS]
@@ -1378,7 +1393,7 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
s->incoming_window -= t->incoming_frame_size;
/* if the stream incoming window is getting low, schedule an update */
- maybe_join_window_updates(t, s);
+ stream_list_join(t, s, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
return GRPC_CHTTP2_PARSE_OK;
}
@@ -1475,7 +1490,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
} else {
add_incoming_metadata(t, s, md);
}
- maybe_finish_read(t, s);
+ maybe_finish_read(t, s, 1);
}
static int init_header_frame_parser(transport *t, int is_continuation) {
@@ -1667,9 +1682,11 @@ static int init_frame_parser(transport *t) {
}
}
+/*
static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW;
}
+*/
static void add_metadata_batch(transport *t, stream *s) {
grpc_metadata_batch b;
@@ -1695,18 +1712,17 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
case GRPC_CHTTP2_PARSE_OK:
if (st.end_of_stream) {
t->incoming_stream->read_closed = 1;
- maybe_finish_read(t, t->incoming_stream);
+ maybe_finish_read(t, t->incoming_stream, 1);
}
if (st.need_flush_reads) {
- maybe_finish_read(t, t->incoming_stream);
+ maybe_finish_read(t, t->incoming_stream, 1);
}
if (st.metadata_boundary) {
add_metadata_batch(t, t->incoming_stream);
- maybe_finish_read(t, t->incoming_stream);
+ maybe_finish_read(t, t->incoming_stream, 1);
}
if (st.ack_settings) {
gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
- maybe_start_some_streams(t);
}
if (st.send_ping_ack) {
gpr_slice_buffer_add(
@@ -1737,13 +1753,8 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (st.initial_window_update) {
for (i = 0; i < t->stream_map.count; i++) {
stream *s = (stream *)(t->stream_map.values[i]);
- int was_window_empty = s->outgoing_window <= 0;
- FLOWCTL_TRACE(t, s, outgoing, s->id, st.initial_window_update);
- s->outgoing_window += st.initial_window_update;
- if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
- s->outgoing_sopb->nops > 0) {
- stream_list_join(t, s, WRITABLE);
- }
+ s->outgoing_window_update += st.initial_window_update;
+ stream_list_join(t, s, NEW_OUTGOING_WINDOW);
}
}
if (st.window_update) {
@@ -1751,30 +1762,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
/* if there was a stream id, this is for some stream */
stream *s = lookup_stream(t, t->incoming_stream_id);
if (s) {
- int was_window_empty = s->outgoing_window <= 0;
- if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
- cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
- GRPC_CHTTP2_FLOW_CONTROL_ERROR),
- GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
- } else {
- FLOWCTL_TRACE(t, s, outgoing, s->id, st.window_update);
- s->outgoing_window += st.window_update;
- /* if this window update makes outgoing ops writable again,
- flag that */
- if (was_window_empty && s->outgoing_sopb &&
- s->outgoing_sopb->nops > 0) {
- stream_list_join(t, s, WRITABLE);
- }
- }
+ s->outgoing_window_update += st.window_update;
+ stream_list_join(t, s, NEW_OUTGOING_WINDOW);
}
} else {
/* transport level window update */
- if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
- drop_connection(t);
- } else {
- FLOWCTL_TRACE(t, t, outgoing, 0, st.window_update);
- t->outgoing_window += st.window_update;
- }
+ t->outgoing_window_update += st.window_update;
}
}
return 1;
@@ -1979,6 +1972,7 @@ static int process_read(transport *t, gpr_slice slice) {
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
transport *t = tp;
+ stream *s;
size_t i;
int keep_reading = 0;
@@ -1998,11 +1992,40 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
unref_transport(t);
break;
case GRPC_ENDPOINT_CB_OK:
- lock(t);
+ gpr_mu_lock(&t->mu);
+ GPR_ASSERT(!t->parsing);
+ t->parsing = 1;
+ gpr_mu_unlock(&t->mu);
if (t->cb) {
for (i = 0; i < nslices && process_read(t, slices[i]); i++)
;
}
+ lock(t);
+ t->parsing = 0;
+ while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) {
+ maybe_finish_read(t, s, 0);
+ }
+ while ((s = stream_list_remove_head(t, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) {
+ maybe_join_window_updates(t, s);
+ }
+ while ((s = stream_list_remove_head(t, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) {
+ maybe_join_window_updates(t, s);
+ }
+ while ((s = stream_list_remove_head(t, NEW_OUTGOING_WINDOW))) {
+ int was_window_empty = s->outgoing_window <= 0;
+ FLOWCTL_TRACE(t, s, outgoing, s->id, s->outgoing_window_update);
+ s->outgoing_window += s->outgoing_window_update;
+ s->outgoing_window_update = 0;
+ /* if this window update makes outgoing ops writable again,
+ flag that */
+ if (was_window_empty && s->outgoing_sopb &&
+ s->outgoing_sopb->nops > 0) {
+ stream_list_join(t, s, WRITABLE);
+ }
+ }
+ t->outgoing_window += t->outgoing_window_update;
+ t->outgoing_window_update = 0;
+ maybe_start_some_streams(t);
unlock(t);
keep_reading = 1;
break;