diff options
Diffstat (limited to 'src')
5 files changed, 82 insertions, 110 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 13fc8ab374..e7d1a84420 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -79,8 +79,6 @@ static void read_action_begin(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void read_action_flush_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_error *error); @@ -106,8 +104,6 @@ static void connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state state, grpc_error *error, const char *reason); -static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); - static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, @@ -242,7 +238,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t); grpc_closure_init(&t->read_action_begin, read_action_begin, t); grpc_closure_init(&t->read_action_locked, read_action_locked, t); - grpc_closure_init(&t->read_action_flush_locked, read_action_flush_locked, t); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); @@ -479,7 +474,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, } grpc_chttp2_list_remove_stalled_by_transport(t, s); - grpc_chttp2_list_remove_check_read_ops(t, s); for (int i = 0; i < STREAM_LIST_COUNT; i++) { if (s->included[i]) { @@ -917,7 +911,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else { if (contains_non_ok_status(op->send_initial_metadata)) { s->seen_error = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); } if (!s->write_closed) { if (t->is_client) { @@ -994,7 +987,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else { if (contains_non_ok_status(op->send_trailing_metadata)) { s->seen_error = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); } if (s->write_closed) { s->send_trailing_metadata = NULL; @@ -1017,7 +1009,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_ASSERT(s->recv_initial_metadata_ready == NULL); s->recv_initial_metadata_ready = op->recv_initial_metadata_ready; s->recv_initial_metadata = op->recv_initial_metadata; - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); } if (op->recv_message != NULL) { @@ -1029,7 +1021,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, incoming_byte_stream_update_flow_control(exec_ctx, t, s, t->stream_lookahead, 0); } - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } if (op->recv_trailing_metadata != NULL) { @@ -1037,7 +1029,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->recv_trailing_metadata_finished = add_closure_barrier(on_complete); s->recv_trailing_metadata = op->recv_trailing_metadata; s->final_metadata_requested = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } grpc_chttp2_complete_closure_step(exec_ctx, t, s, &on_complete, @@ -1177,75 +1169,73 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * INPUT PROCESSING - GENERAL */ -static void read_action_flush_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - grpc_chttp2_transport *t = tp; - t->check_read_ops_scheduled = false; - check_read_ops(exec_ctx, t); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "initiate_read_flush_locked"); +void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + grpc_byte_stream *bs; + if (s->recv_initial_metadata_ready != NULL && s->published_metadata[0]) { + if (s->seen_error) { + while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != + NULL) { + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); + } + } + grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0], + s->recv_initial_metadata); + grpc_closure_run(exec_ctx, s->recv_initial_metadata_ready, GRPC_ERROR_NONE); + s->recv_initial_metadata_ready = NULL; + } } -static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - GPR_TIMER_BEGIN("check_read_ops", 0); - grpc_chttp2_stream *s; +void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { grpc_byte_stream *bs; - while (grpc_chttp2_list_pop_check_read_ops(t, &s)) { - if (s->recv_initial_metadata_ready != NULL && s->published_metadata[0]) { - if (s->seen_error) { - while ((bs = grpc_chttp2_incoming_frame_queue_pop( - &s->incoming_frames)) != NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); - } - } - grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0], - s->recv_initial_metadata); - grpc_closure_run(exec_ctx, s->recv_initial_metadata_ready, GRPC_ERROR_NONE); - s->recv_initial_metadata_ready = NULL; + if (s->recv_message_ready != NULL) { + while (s->final_metadata_requested && s->seen_error && + (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != + NULL) { + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } - if (s->recv_message_ready != NULL) { - while (s->final_metadata_requested && s->seen_error && - (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != - NULL) { + if (s->incoming_frames.head != NULL) { + *s->recv_message = + grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames); + GPR_ASSERT(*s->recv_message != NULL); + grpc_closure_run(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE); + s->recv_message_ready = NULL; + } else if (s->published_metadata[1]) { + *s->recv_message = NULL; + grpc_closure_run(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE); + s->recv_message_ready = NULL; + } + } +} + +void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + grpc_byte_stream *bs; + if (s->recv_trailing_metadata_finished != NULL && s->read_closed && + s->write_closed) { + if (s->seen_error) { + while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != + NULL) { incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } - if (s->incoming_frames.head != NULL) { - *s->recv_message = - grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames); - GPR_ASSERT(*s->recv_message != NULL); - grpc_closure_run(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE); - s->recv_message_ready = NULL; - } else if (s->published_metadata[1]) { - *s->recv_message = NULL; - grpc_closure_run(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE); - s->recv_message_ready = NULL; - } } - if (s->recv_trailing_metadata_finished != NULL && s->read_closed && - s->write_closed) { - if (s->seen_error) { - while ((bs = grpc_chttp2_incoming_frame_queue_pop( - &s->incoming_frames)) != NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); - } - } - if (s->all_incoming_byte_streams_finished) { - grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1], - s->recv_trailing_metadata); - grpc_chttp2_complete_closure_step(exec_ctx, t, s, - &s->recv_trailing_metadata_finished, - GRPC_ERROR_NONE); - } + if (s->all_incoming_byte_streams_finished) { + grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1], + s->recv_trailing_metadata); + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE); } } - GPR_TIMER_END("check_read_ops", 0); } static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) { - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); - } + s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams); } static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1333,7 +1323,6 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, } if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) { s->seen_error = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); } grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, due_to_error); } @@ -1343,7 +1332,6 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_slice *slice) { if (status != GRPC_STATUS_OK) { s->seen_error = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); } /* stream_global->recv_trailing_metadata_finished gives us a last chance replacement: we've received trailing metadata, @@ -1366,7 +1354,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_mdstr_from_slice(gpr_slice_ref(*slice)))); } s->published_metadata[1] = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (slice) { gpr_slice_unref(*slice); @@ -1434,18 +1422,21 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); return; } - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); if (close_reads && !s->read_closed) { s->read_closed_error = GRPC_ERROR_REF(error); s->read_closed = true; s->published_metadata[0] = true; s->published_metadata[1] = true; decrement_active_streams_locked(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (close_writes && !s->write_closed) { s->write_closed_error = GRPC_ERROR_REF(error); s->write_closed = true; fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (s->read_closed && s->write_closed) { if (s->id != 0) { @@ -1852,7 +1843,8 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices); grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); } else if (bs->error != GRPC_ERROR_NONE) { - grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_REF(bs->error)); + grpc_closure_run(exec_ctx, bs->next_action.on_complete, + GRPC_ERROR_REF(bs->error)); } else { bs->on_next = bs->next_action.on_complete; bs->next = bs->next_action.slice; diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 0ff5499919..bd26b81622 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1571,6 +1571,13 @@ grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx, return p->state(exec_ctx, p, beg, end); } +typedef void (*maybe_complete_func_type)(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); +static const maybe_complete_func_type maybe_complete_funcs[] = { + grpc_chttp2_maybe_complete_recv_initial_metadata, + grpc_chttp2_maybe_complete_recv_trailing_metadata}; + grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, void *hpack_parser, grpc_chttp2_transport *t, @@ -1601,8 +1608,8 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_CREATE("Too many trailer frames"); } s->published_metadata[s->header_frames_received] = true; + maybe_complete_funcs[s->header_frames_received](exec_ctx, t, s); s->header_frames_received++; - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); } if (parser->is_eof) { grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index bffd58a9d2..43639139d5 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -56,7 +56,6 @@ /* streams are kept in various linked lists depending on what things need to happen to them... this enum labels each list */ typedef enum { - GRPC_CHTTP2_LIST_CHECK_READ_OPS, GRPC_CHTTP2_LIST_WRITABLE, GRPC_CHTTP2_LIST_WRITING, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, @@ -161,7 +160,7 @@ struct grpc_chttp2_incoming_byte_stream { grpc_chttp2_transport *transport; grpc_chttp2_stream *stream; - int is_tail; + bool is_tail; gpr_mu slice_mu; // protects slices, on_next gpr_slice_buffer slices; @@ -189,8 +188,6 @@ struct grpc_chttp2_transport { /** write execution state of the transport */ grpc_chttp2_write_state write_state; - /** has a check_read_ops been scheduled */ - bool check_read_ops_scheduled; /** is the transport destroying itself? */ uint8_t destroying; @@ -213,7 +210,6 @@ struct grpc_chttp2_transport { grpc_closure read_action_begin; grpc_closure read_action_locked; - grpc_closure read_action_flush_locked; /** incoming read bytes */ gpr_slice_buffer read_buffer; @@ -468,14 +464,6 @@ void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t, int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t, grpc_chttp2_stream **s); -void grpc_chttp2_list_add_check_read_ops(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s); -bool grpc_chttp2_list_remove_check_read_ops(grpc_chttp2_transport *t, - grpc_chttp2_stream *s); -int grpc_chttp2_list_pop_check_read_ops(grpc_chttp2_transport *t, - grpc_chttp2_stream **s); - void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream *s); int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t, @@ -661,4 +649,14 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *due_to_error); +void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); +void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); +void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 2ff1e4c620..aa36f90cae 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -717,9 +717,6 @@ static grpc_error *parse_frame_slice(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s = t->incoming_stream; grpc_error *err = t->parser(exec_ctx, t->parser_data, t, s, slice, is_last); if (err == GRPC_ERROR_NONE) { - if (s != NULL) { - grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s); - } return err; } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) { if (grpc_http_trace) { diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 9d09e0c7c2..6d25b3ae57 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -158,28 +158,6 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t, return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); } -void grpc_chttp2_list_add_check_read_ops(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - if (!t->check_read_ops_scheduled) { - GRPC_CHTTP2_REF_TRANSPORT(t, "initiate_read_flush_locked"); - grpc_combiner_execute_finally( - exec_ctx, t->combiner, &t->read_action_flush_locked, GRPC_ERROR_NONE); - t->check_read_ops_scheduled = true; - } - stream_list_add(t, s, GRPC_CHTTP2_LIST_CHECK_READ_OPS); -} - -bool grpc_chttp2_list_remove_check_read_ops(grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_CHECK_READ_OPS); -} - -int grpc_chttp2_list_pop_check_read_ops(grpc_chttp2_transport *t, - grpc_chttp2_stream **s) { - return stream_list_pop(t, s, GRPC_CHTTP2_LIST_CHECK_READ_OPS); -} - void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); |