diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/parsing.cc')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/parsing.cc | 185 |
1 files changed, 106 insertions, 79 deletions
diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index a56f89cc75..46ec3fbaa6 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -31,22 +31,33 @@ #include "src/core/lib/transport/status_conversion.h" #include "src/core/lib/transport/timeout_encoding.h" -static grpc_error* init_frame_parser(grpc_chttp2_transport* t); -static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, +static grpc_error* init_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static grpc_error* init_header_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, int is_continuation); -static grpc_error* init_data_frame_parser(grpc_chttp2_transport* t); -static grpc_error* init_rst_stream_parser(grpc_chttp2_transport* t); -static grpc_error* init_settings_frame_parser(grpc_chttp2_transport* t); -static grpc_error* init_window_update_frame_parser(grpc_chttp2_transport* t); -static grpc_error* init_ping_parser(grpc_chttp2_transport* t); -static grpc_error* init_goaway_parser(grpc_chttp2_transport* t); -static grpc_error* init_skip_frame_parser(grpc_chttp2_transport* t, +static grpc_error* init_data_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static grpc_error* init_rst_stream_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static grpc_error* init_settings_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static grpc_error* init_window_update_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static grpc_error* init_ping_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static grpc_error* init_goaway_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static grpc_error* init_skip_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, int is_header); -static grpc_error* parse_frame_slice(grpc_chttp2_transport* t, grpc_slice slice, +static grpc_error* parse_frame_slice(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_slice slice, int is_last); -grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t, +grpc_error* grpc_chttp2_perform_read(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_slice slice) { uint8_t* beg = GRPC_SLICE_START_PTR(slice); uint8_t* end = GRPC_SLICE_END_PTR(slice); @@ -171,12 +182,12 @@ grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t, GPR_ASSERT(cur < end); t->incoming_stream_id |= ((uint32_t)*cur); t->deframe_state = GRPC_DTS_FRAME; - err = init_frame_parser(t); + err = init_frame_parser(exec_ctx, t); if (err != GRPC_ERROR_NONE) { return err; } if (t->incoming_frame_size == 0) { - err = parse_frame_slice(t, grpc_empty_slice(), 1); + err = parse_frame_slice(exec_ctx, t, grpc_empty_slice(), 1); if (err != GRPC_ERROR_NONE) { return err; } @@ -206,7 +217,7 @@ grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t, GPR_ASSERT(cur < end); if ((uint32_t)(end - cur) == t->incoming_frame_size) { err = - parse_frame_slice(t, + parse_frame_slice(exec_ctx, t, grpc_slice_sub_no_ref(slice, (size_t)(cur - beg), (size_t)(end - beg)), 1); @@ -219,7 +230,7 @@ grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t, } else if ((uint32_t)(end - cur) > t->incoming_frame_size) { size_t cur_offset = (size_t)(cur - beg); err = parse_frame_slice( - t, + exec_ctx, t, grpc_slice_sub_no_ref(slice, cur_offset, cur_offset + t->incoming_frame_size), 1); @@ -231,7 +242,7 @@ grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t, goto dts_fh_0; /* loop */ } else { err = - parse_frame_slice(t, + parse_frame_slice(exec_ctx, t, grpc_slice_sub_no_ref(slice, (size_t)(cur - beg), (size_t)(end - beg)), 0); @@ -247,7 +258,8 @@ grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t, GPR_UNREACHABLE_CODE(return nullptr); } -static grpc_error* init_frame_parser(grpc_chttp2_transport* t) { +static grpc_error* init_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { if (t->is_first_frame && t->incoming_frame_type != GRPC_CHTTP2_FRAME_SETTINGS) { char* msg; @@ -279,43 +291,46 @@ static grpc_error* init_frame_parser(grpc_chttp2_transport* t) { gpr_free(msg); return err; } - return init_header_frame_parser(t, 1); + return init_header_frame_parser(exec_ctx, t, 1); } switch (t->incoming_frame_type) { case GRPC_CHTTP2_FRAME_DATA: - return init_data_frame_parser(t); + return init_data_frame_parser(exec_ctx, t); case GRPC_CHTTP2_FRAME_HEADER: - return init_header_frame_parser(t, 0); + return init_header_frame_parser(exec_ctx, t, 0); case GRPC_CHTTP2_FRAME_CONTINUATION: return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Unexpected CONTINUATION frame"); case GRPC_CHTTP2_FRAME_RST_STREAM: - return init_rst_stream_parser(t); + return init_rst_stream_parser(exec_ctx, t); case GRPC_CHTTP2_FRAME_SETTINGS: - return init_settings_frame_parser(t); + return init_settings_frame_parser(exec_ctx, t); case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: - return init_window_update_frame_parser(t); + return init_window_update_frame_parser(exec_ctx, t); case GRPC_CHTTP2_FRAME_PING: - return init_ping_parser(t); + return init_ping_parser(exec_ctx, t); case GRPC_CHTTP2_FRAME_GOAWAY: - return init_goaway_parser(t); + return init_goaway_parser(exec_ctx, t); default: if (grpc_http_trace.enabled()) { gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type); } - return init_skip_frame_parser(t, 0); + return init_skip_frame_parser(exec_ctx, t, 0); } } -static grpc_error* skip_parser(void* parser, grpc_chttp2_transport* t, - grpc_chttp2_stream* s, grpc_slice slice, - int is_last) { +static grpc_error* skip_parser(grpc_exec_ctx* exec_ctx, void* parser, + grpc_chttp2_transport* t, grpc_chttp2_stream* s, + grpc_slice slice, int is_last) { return GRPC_ERROR_NONE; } -static void skip_header(void* tp, grpc_mdelem md) { GRPC_MDELEM_UNREF(md); } +static void skip_header(grpc_exec_ctx* exec_ctx, void* tp, grpc_mdelem md) { + GRPC_MDELEM_UNREF(exec_ctx, md); +} -static grpc_error* init_skip_frame_parser(grpc_chttp2_transport* t, +static grpc_error* init_skip_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, int is_header) { if (is_header) { uint8_t is_eoh = t->expect_continuation_stream_id != 0; @@ -331,11 +346,14 @@ static grpc_error* init_skip_frame_parser(grpc_chttp2_transport* t, return GRPC_ERROR_NONE; } -void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t) { - init_skip_frame_parser(t, t->parser == grpc_chttp2_header_parser_parse); +void grpc_chttp2_parsing_become_skip_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { + init_skip_frame_parser(exec_ctx, t, + t->parser == grpc_chttp2_header_parser_parse); } -static grpc_error* init_data_frame_parser(grpc_chttp2_transport* t) { +static grpc_error* init_data_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { grpc_chttp2_stream* s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); grpc_error* err = GRPC_ERROR_NONE; @@ -347,17 +365,17 @@ static grpc_error* init_data_frame_parser(grpc_chttp2_transport* t) { err = s->flow_control->RecvData(t->incoming_frame_size); action = s->flow_control->MakeAction(); } - grpc_chttp2_act_on_flowctl_action(action, t, s); + grpc_chttp2_act_on_flowctl_action(exec_ctx, action, t, s); if (err != GRPC_ERROR_NONE) { goto error_handler; } if (s == nullptr) { - return init_skip_frame_parser(t, 0); + return init_skip_frame_parser(exec_ctx, t, 0); } s->received_bytes += t->incoming_frame_size; s->stats.incoming.framing_bytes += 9; if (err == GRPC_ERROR_NONE && s->read_closed) { - return init_skip_frame_parser(t, 0); + return init_skip_frame_parser(exec_ctx, t, 0); } if (err == GRPC_ERROR_NONE) { err = grpc_chttp2_data_parser_begin_frame( @@ -376,13 +394,13 @@ error_handler: } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, nullptr)) { /* handle stream errors by closing the stream */ if (s != nullptr) { - grpc_chttp2_mark_stream_closed(t, s, true, false, err); + grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, err); } grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id, GRPC_HTTP2_PROTOCOL_ERROR, &s->stats.outgoing)); - return init_skip_frame_parser(t, 0); + return init_skip_frame_parser(exec_ctx, t, 0); } else { return err; } @@ -390,7 +408,8 @@ error_handler: static void free_timeout(void* p) { gpr_free(p); } -static void on_initial_header(void* tp, grpc_mdelem md) { +static void on_initial_header(grpc_exec_ctx* exec_ctx, void* tp, + grpc_mdelem md) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; grpc_chttp2_stream* s = t->incoming_stream; @@ -436,9 +455,9 @@ static void on_initial_header(void* tp, grpc_mdelem md) { } if (timeout != GRPC_MILLIS_INF_FUTURE) { grpc_chttp2_incoming_metadata_buffer_set_deadline( - &s->metadata_buffer[0], grpc_core::ExecCtx::Get()->Now() + timeout); + &s->metadata_buffer[0], grpc_exec_ctx_now(exec_ctx) + timeout); } - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } else { const size_t new_size = s->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md); const size_t metadata_size_limit = @@ -450,22 +469,22 @@ static void on_initial_header(void* tp, grpc_mdelem md) { " vs. %" PRIuPTR ")", new_size, metadata_size_limit); grpc_chttp2_cancel_stream( - t, s, + exec_ctx, t, s, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "received initial metadata size exceeds limit"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); - grpc_chttp2_parsing_become_skip_parser(t); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); s->seen_error = true; - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_error* error = - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); + grpc_error* error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[0], md); if (error != GRPC_ERROR_NONE) { - grpc_chttp2_cancel_stream(t, s, error); - grpc_chttp2_parsing_become_skip_parser(t); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); s->seen_error = true; - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } } } @@ -473,7 +492,8 @@ static void on_initial_header(void* tp, grpc_mdelem md) { GPR_TIMER_END("on_initial_header", 0); } -static void on_trailing_header(void* tp, grpc_mdelem md) { +static void on_trailing_header(grpc_exec_ctx* exec_ctx, void* tp, + grpc_mdelem md) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; grpc_chttp2_stream* s = t->incoming_stream; @@ -507,29 +527,30 @@ static void on_trailing_header(void* tp, grpc_mdelem md) { " vs. %" PRIuPTR ")", new_size, metadata_size_limit); grpc_chttp2_cancel_stream( - t, s, + exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "received trailing metadata size exceeds limit"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); - grpc_chttp2_parsing_become_skip_parser(t); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); s->seen_error = true; - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_error* error = - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md); + grpc_error* error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[1], md); if (error != GRPC_ERROR_NONE) { - grpc_chttp2_cancel_stream(t, s, error); - grpc_chttp2_parsing_become_skip_parser(t); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); s->seen_error = true; - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } } GPR_TIMER_END("on_trailing_header", 0); } -static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, +static grpc_error* init_header_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, int is_continuation) { uint8_t is_eoh = (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; @@ -559,7 +580,7 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received")); - return init_skip_frame_parser(t, 1); + return init_skip_frame_parser(exec_ctx, t, 1); } if (t->is_client) { if ((t->incoming_stream_id & 1) && @@ -569,7 +590,7 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, GRPC_CHTTP2_IF_TRACING(gpr_log( GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client")); } - grpc_error* err = init_skip_frame_parser(t, 1); + grpc_error* err = init_skip_frame_parser(exec_ctx, t, 1); if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY) { grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser); } @@ -581,13 +602,13 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, "last grpc_chttp2_stream " "id=%d, new grpc_chttp2_stream id=%d", t->last_new_stream_id, t->incoming_stream_id)); - return init_skip_frame_parser(t, 1); + return init_skip_frame_parser(exec_ctx, t, 1); } else if ((t->incoming_stream_id & 1) == 0) { GRPC_CHTTP2_IF_TRACING(gpr_log( GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d", t->incoming_stream_id)); - return init_skip_frame_parser(t, 1); + return init_skip_frame_parser(exec_ctx, t, 1); } else if (grpc_chttp2_stream_map_size(&t->stream_map) >= t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { @@ -595,11 +616,11 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, } t->last_new_stream_id = t->incoming_stream_id; s = t->incoming_stream = - grpc_chttp2_parsing_accept_stream(t, t->incoming_stream_id); + grpc_chttp2_parsing_accept_stream(exec_ctx, t, t->incoming_stream_id); if (s == nullptr) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted")); - return init_skip_frame_parser(t, 1); + return init_skip_frame_parser(exec_ctx, t, 1); } } else { t->incoming_stream = s; @@ -610,7 +631,7 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, GRPC_CHTTP2_IF_TRACING(gpr_log( GPR_ERROR, "skipping already closed grpc_chttp2_stream header")); t->incoming_stream = nullptr; - return init_skip_frame_parser(t, 1); + return init_skip_frame_parser(exec_ctx, t, 1); } t->parser = grpc_chttp2_header_parser_parse; t->parser_data = &t->hpack_parser; @@ -635,7 +656,7 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, break; case 2: gpr_log(GPR_ERROR, "too many header frames received"); - return init_skip_frame_parser(t, 1); + return init_skip_frame_parser(exec_ctx, t, 1); } t->hpack_parser.on_header_user_data = t; t->hpack_parser.is_boundary = is_eoh; @@ -647,7 +668,8 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, return GRPC_ERROR_NONE; } -static grpc_error* init_window_update_frame_parser(grpc_chttp2_transport* t) { +static grpc_error* init_window_update_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { grpc_error* err = grpc_chttp2_window_update_parser_begin_frame( &t->simple.window_update, t->incoming_frame_size, t->incoming_frame_flags); @@ -656,7 +678,7 @@ static grpc_error* init_window_update_frame_parser(grpc_chttp2_transport* t) { grpc_chttp2_stream* s = t->incoming_stream = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); if (s == nullptr) { - return init_skip_frame_parser(t, 0); + return init_skip_frame_parser(exec_ctx, t, 0); } s->stats.incoming.framing_bytes += 9; } @@ -665,7 +687,8 @@ static grpc_error* init_window_update_frame_parser(grpc_chttp2_transport* t) { return GRPC_ERROR_NONE; } -static grpc_error* init_ping_parser(grpc_chttp2_transport* t) { +static grpc_error* init_ping_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { grpc_error* err = grpc_chttp2_ping_parser_begin_frame( &t->simple.ping, t->incoming_frame_size, t->incoming_frame_flags); if (err != GRPC_ERROR_NONE) return err; @@ -674,14 +697,15 @@ static grpc_error* init_ping_parser(grpc_chttp2_transport* t) { return GRPC_ERROR_NONE; } -static grpc_error* init_rst_stream_parser(grpc_chttp2_transport* t) { +static grpc_error* init_rst_stream_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { grpc_error* err = grpc_chttp2_rst_stream_parser_begin_frame( &t->simple.rst_stream, t->incoming_frame_size, t->incoming_frame_flags); if (err != GRPC_ERROR_NONE) return err; grpc_chttp2_stream* s = t->incoming_stream = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); if (!t->incoming_stream) { - return init_skip_frame_parser(t, 0); + return init_skip_frame_parser(exec_ctx, t, 0); } s->stats.incoming.framing_bytes += 9; t->parser = grpc_chttp2_rst_stream_parser_parse; @@ -689,7 +713,8 @@ static grpc_error* init_rst_stream_parser(grpc_chttp2_transport* t) { return GRPC_ERROR_NONE; } -static grpc_error* init_goaway_parser(grpc_chttp2_transport* t) { +static grpc_error* init_goaway_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { grpc_error* err = grpc_chttp2_goaway_parser_begin_frame( &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags); if (err != GRPC_ERROR_NONE) return err; @@ -698,7 +723,8 @@ static grpc_error* init_goaway_parser(grpc_chttp2_transport* t) { return GRPC_ERROR_NONE; } -static grpc_error* init_settings_frame_parser(grpc_chttp2_transport* t) { +static grpc_error* init_settings_frame_parser(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { if (t->incoming_stream_id != 0) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Settings frame received for grpc_chttp2_stream"); @@ -714,7 +740,7 @@ static grpc_error* init_settings_frame_parser(grpc_chttp2_transport* t) { memcpy(t->settings[GRPC_ACKED_SETTINGS], t->settings[GRPC_SENT_SETTINGS], GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); grpc_chttp2_hptbl_set_max_bytes( - &t->hpack_parser.table, + exec_ctx, &t->hpack_parser.table, t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); t->sent_local_settings = 0; @@ -724,10 +750,11 @@ static grpc_error* init_settings_frame_parser(grpc_chttp2_transport* t) { return GRPC_ERROR_NONE; } -static grpc_error* parse_frame_slice(grpc_chttp2_transport* t, grpc_slice slice, +static grpc_error* parse_frame_slice(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_slice slice, int is_last) { grpc_chttp2_stream* s = t->incoming_stream; - grpc_error* err = t->parser(t->parser_data, t, s, slice, is_last); + grpc_error* err = t->parser(exec_ctx, t->parser_data, t, s, slice, is_last); if (err == GRPC_ERROR_NONE) { return err; } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, nullptr)) { @@ -735,7 +762,7 @@ static grpc_error* parse_frame_slice(grpc_chttp2_transport* t, grpc_slice slice, const char* msg = grpc_error_string(err); gpr_log(GPR_ERROR, "%s", msg); } - grpc_chttp2_parsing_become_skip_parser(t); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); if (s) { s->forced_close_error = err; grpc_slice_buffer_add( |