diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.c')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 851 |
1 files changed, 566 insertions, 285 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 53633227ac..38e782b9b4 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -47,6 +47,7 @@ #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/status_conversion.h" #include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" +#include "src/core/lib/http/parser.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -56,6 +57,8 @@ #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu +#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) + #define MAX_CLIENT_STREAM_ID 0x7fffffffu int grpc_http_trace = 0; @@ -65,8 +68,8 @@ int grpc_flowctl_trace = 0; ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \ writing))) -#define TRANSPORT_FROM_PARSING(tw) \ - ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \ +#define TRANSPORT_FROM_PARSING(tp) \ + ((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \ parsing))) #define TRANSPORT_FROM_GLOBAL(tg) \ @@ -82,19 +85,17 @@ int grpc_flowctl_trace = 0; static const grpc_transport_vtable vtable; /* forward declarations of various callbacks that we'll build closures around */ -static void writing_action(grpc_exec_ctx *exec_ctx, void *t, - bool iomgr_success_ignored); -static void reading_action(grpc_exec_ctx *exec_ctx, void *t, - bool iomgr_success_ignored); -static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, - bool iomgr_success_ignored); +static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); +static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); +static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value); /** Start disconnection chain */ -static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); +static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_error *error); /** Perform a transport_op */ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, @@ -105,13 +106,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status); + grpc_error *error); static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status, - gpr_slice *optional_message); + grpc_error *error); /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, @@ -131,7 +131,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, static void connectivity_state_set( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, - grpc_connectivity_state state, const char *reason); + grpc_connectivity_state state, grpc_error *error, const char *reason); static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); @@ -145,7 +145,9 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s, void *byte_stream); static void fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_stream_global *stream_global); + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_error *error); /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING @@ -188,7 +190,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, false, NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, + GRPC_ERROR_CREATE("Transport closed"), NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -257,6 +260,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->parsing.is_client = is_client; t->parsing.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; + t->parsing.is_first_frame = true; t->writing.is_client = is_client; grpc_connectivity_state_init( &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, @@ -311,6 +315,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW); + push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + DEFAULT_MAX_HEADER_LIST_SIZE); if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { @@ -378,6 +384,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &t->writing.hpack_compressor, (uint32_t)channel_args->args[i].value.integer); } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_MAX_METADATA_SIZE)) { + if (channel_args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s: must be an integer", + GRPC_ARG_MAX_METADATA_SIZE); + } else if (channel_args->args[i].value.integer < 0) { + gpr_log(GPR_ERROR, "%s: must be non-negative", + GRPC_ARG_MAX_METADATA_SIZE); + } else { + push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + (uint32_t)channel_args->args[i].value.integer); + } } } } @@ -388,7 +406,7 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s_ignored, void *arg_ignored) { t->destroying = 1; - drop_connection(exec_ctx, t); + drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { @@ -424,12 +442,11 @@ static void destroy_endpoint(grpc_exec_ctx *exec_ctx, static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s_ignored, - void *arg_ignored) { + grpc_error *error) { if (!t->closed) { t->closed = 1; - connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE, - "close_transport"); + connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_REF(error), "close_transport"); if (t->ep) { allow_endpoint_shutdown_locked(exec_ctx, t); } @@ -442,6 +459,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); } } + GRPC_ERROR_UNREF(error); } #ifdef GRPC_STREAM_REFCOUNT_DEBUG @@ -510,7 +528,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s); - s->global.in_stream_map = 1; + s->global.in_stream_map = true; } grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked, @@ -530,7 +548,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(exec_ctx, t, NULL, NULL); + close_transport_locked( + exec_ctx, t, + GRPC_ERROR_CREATE("Last stream closed after sending goaway")); } if (!t->executor.parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, @@ -569,6 +589,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer_destroy( &s->global.received_trailing_metadata); gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); + GRPC_ERROR_UNREF(s->global.removal_error); UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -617,14 +638,15 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, grpc_chttp2_executor_action_header *hdr; grpc_chttp2_executor_action_header *next; + GPR_TIMER_BEGIN("finish_global_actions", 0); + for (;;) { if (!t->executor.writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing, - t->executor.parsing_active)) { + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { t->executor.writing_active = 1; REF_TRANSPORT(t, "writing"); prevent_endpoint_shutdown(t); - grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); } check_read_ops(exec_ctx, &t->global); @@ -635,7 +657,9 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, NULL; gpr_mu_unlock(&t->executor.mu); while (hdr != NULL) { + GPR_TIMER_BEGIN("chttp2:locked_action", 0); hdr->action(exec_ctx, t, hdr->stream, hdr->arg); + GPR_TIMER_END("chttp2:locked_action", 0); next = hdr->next; gpr_free(hdr); UNREF_TRANSPORT(exec_ctx, t, "pending_action"); @@ -648,6 +672,8 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&t->executor.mu); break; } + + GPR_TIMER_END("finish_global_actions", 0); } void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, @@ -657,6 +683,8 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, void *arg, size_t sizeof_arg) { grpc_chttp2_executor_action_header *hdr; + GPR_TIMER_BEGIN("grpc_chttp2_run_with_global_lock", 0); + REF_TRANSPORT(t, "run_global"); gpr_mu_lock(&t->executor.mu); @@ -665,7 +693,9 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, t->executor.global_active = 1; gpr_mu_unlock(&t->executor.mu); + GPR_TIMER_BEGIN("chttp2:locked_action", 0); action(exec_ctx, t, optional_stream, arg); + GPR_TIMER_END("chttp2:locked_action", 0); finish_global_actions(exec_ctx, t); } else { @@ -702,6 +732,8 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, } UNREF_TRANSPORT(exec_ctx, t, "run_global"); + + GPR_TIMER_END("grpc_chttp2_run_with_global_lock", 0); } /******************************************************************************* @@ -735,12 +767,12 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) { - bool success = (bool)(uintptr_t)a; + grpc_error *error = a; allow_endpoint_shutdown_locked(exec_ctx, t); - if (!success) { - drop_connection(exec_ctx, t); + if (error != GRPC_ERROR_NONE) { + drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); } grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); @@ -748,7 +780,8 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { - fail_pending_writes(exec_ctx, stream_global); + fail_pending_writes(exec_ctx, &t->global, stream_global, + GRPC_ERROR_REF(error)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); } @@ -761,18 +794,18 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, } UNREF_TRANSPORT(exec_ctx, t, "writing"); + GRPC_ERROR_UNREF(error); } void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, - void *transport_writing, bool success) { + void *transport_writing, grpc_error *error) { grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); - grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, - terminate_writing_with_lock, - (void *)(uintptr_t)success, 0); + grpc_chttp2_run_with_global_lock( + exec_ctx, t, NULL, terminate_writing_with_lock, GRPC_ERROR_REF(error), 0); } static void writing_action(grpc_exec_ctx *exec_ctx, void *gt, - bool iomgr_success_ignored) { + grpc_error *error) { grpc_chttp2_transport *t = gt; GPR_TIMER_BEGIN("writing_action", 0); grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep); @@ -785,11 +818,19 @@ void grpc_chttp2_add_incoming_goaway( char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII); GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg)); - gpr_free(msg); gpr_slice_unref(goaway_text); transport_global->seen_goaway = 1; - connectivity_state_set(exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE, - "got_goaway"); + /* lie: use transient failure from the transport to indicate goaway has been + * received */ + connectivity_state_set( + exec_ctx, transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE, + grpc_error_set_str( + grpc_error_set_int(GRPC_ERROR_CREATE("GOAWAY received"), + GRPC_ERROR_INT_HTTP2_ERROR, + (intptr_t)goaway_error), + GRPC_ERROR_STR_RAW_BYTES, msg), + "got_goaway"); + gpr_free(msg); } static void maybe_start_some_streams( @@ -818,9 +859,9 @@ static void maybe_start_some_streams( transport_global->next_stream_id += 2; if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { - connectivity_state_set(exec_ctx, transport_global, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "no_more_stream_ids"); + connectivity_state_set( + exec_ctx, transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE("Stream IDs exhausted"), "no_more_stream_ids"); } stream_global->outgoing_window = @@ -834,7 +875,7 @@ static void maybe_start_some_streams( grpc_chttp2_stream_map_add( &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global)); - stream_global->in_stream_map = 1; + stream_global->in_stream_map = true; transport_global->concurrent_stream_count++; grpc_chttp2_become_writable(transport_global, stream_global); } @@ -843,39 +884,47 @@ static void maybe_start_some_streams( grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE); + grpc_error_set_int( + GRPC_ERROR_CREATE("Stream IDs exhausted"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } #define CLOSURE_BARRIER_STATS_BIT (1 << 0) -#define CLOSURE_BARRIER_FAILURE_BIT (1 << 1) #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) static grpc_closure *add_closure_barrier(grpc_closure *closure) { - closure->final_data += CLOSURE_BARRIER_FIRST_REF_BIT; + closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT; return closure; } -void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, - grpc_chttp2_stream_global *stream_global, - grpc_closure **pclosure, int success) { +void grpc_chttp2_complete_closure_step( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure, + grpc_error *error) { grpc_closure *closure = *pclosure; if (closure == NULL) { + GRPC_ERROR_UNREF(error); return; } - closure->final_data -= CLOSURE_BARRIER_FIRST_REF_BIT; - if (!success) { - closure->final_data |= CLOSURE_BARRIER_FAILURE_BIT; + closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; + if (error != GRPC_ERROR_NONE) { + if (closure->error == GRPC_ERROR_NONE) { + closure->error = + GRPC_ERROR_CREATE("Error in HTTP transport completing operation"); + closure->error = grpc_error_set_str( + closure->error, GRPC_ERROR_STR_TARGET_ADDRESS, + TRANSPORT_FROM_GLOBAL(transport_global)->peer_string); + } + closure->error = grpc_error_add_child(closure->error, error); } - if (closure->final_data < CLOSURE_BARRIER_FIRST_REF_BIT) { - if (closure->final_data & CLOSURE_BARRIER_STATS_BIT) { + if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) { + if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) { grpc_transport_move_stats(&stream_global->stats, stream_global->collecting_stats); stream_global->collecting_stats = NULL; } - grpc_exec_ctx_enqueue( - exec_ctx, closure, - (closure->final_data & CLOSURE_BARRIER_FAILURE_BIT) == 0, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, closure->error, NULL); } *pclosure = NULL; } @@ -893,7 +942,7 @@ static int contains_non_ok_status( return 0; } -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {} +static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -910,22 +959,23 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } /* use final_data as a barrier until enqueue time; the inital counter is dropped at the end of this function */ - on_complete->final_data = CLOSURE_BARRIER_FIRST_REF_BIT; + on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT; + on_complete->error = GRPC_ERROR_NONE; if (op->collect_stats != NULL) { GPR_ASSERT(stream_global->collecting_stats == NULL); stream_global->collecting_stats = op->collect_stats; - on_complete->final_data |= CLOSURE_BARRIER_STATS_BIT; + on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT; } - if (op->cancel_with_status != GRPC_STATUS_OK) { + if (op->cancel_error != GRPC_ERROR_NONE) { cancel_from_api(exec_ctx, transport_global, stream_global, - op->cancel_with_status); + GRPC_ERROR_REF(op->cancel_error)); } - if (op->close_with_status != GRPC_STATUS_OK) { + if (op->close_error != GRPC_ERROR_NONE) { close_from_api(exec_ctx, transport_global, stream_global, - op->close_with_status, op->optional_close_message); + GRPC_ERROR_REF(op->close_error)); } if (op->send_initial_metadata != NULL) { @@ -933,24 +983,44 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, stream_global->send_initial_metadata_finished = add_closure_barrier(on_complete); stream_global->send_initial_metadata = op->send_initial_metadata; - if (contains_non_ok_status(transport_global, op->send_initial_metadata)) { - stream_global->seen_error = 1; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); - } - if (!stream_global->write_closed) { - if (transport_global->is_client) { - GPR_ASSERT(stream_global->id == 0); - grpc_chttp2_list_add_waiting_for_concurrency(transport_global, - stream_global); - maybe_start_some_streams(exec_ctx, transport_global); + const size_t metadata_size = + grpc_metadata_batch_size(op->send_initial_metadata); + const size_t metadata_peer_limit = + transport_global->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; + if (metadata_size > metadata_peer_limit) { + cancel_from_api( + exec_ctx, transport_global, stream_global, + grpc_error_set_int( + grpc_error_set_int( + grpc_error_set_int( + GRPC_ERROR_CREATE("to-be-sent initial metadata size " + "exceeds peer limit"), + GRPC_ERROR_INT_SIZE, (intptr_t)metadata_size), + GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); + } else { + if (contains_non_ok_status(transport_global, op->send_initial_metadata)) { + stream_global->seen_error = true; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (!stream_global->write_closed) { + if (transport_global->is_client) { + GPR_ASSERT(stream_global->id == 0); + grpc_chttp2_list_add_waiting_for_concurrency(transport_global, + stream_global); + maybe_start_some_streams(exec_ctx, transport_global); + } else { + GPR_ASSERT(stream_global->id != 0); + grpc_chttp2_become_writable(transport_global, stream_global); + } } else { - GPR_ASSERT(stream_global->id != 0); - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_complete_closure_step( + exec_ctx, transport_global, stream_global, + &stream_global->send_initial_metadata_finished, + GRPC_ERROR_CREATE( + "Attempt to send initial metadata after stream was closed")); } - } else { - grpc_chttp2_complete_closure_step( - exec_ctx, stream_global, - &stream_global->send_initial_metadata_finished, 0); } } @@ -960,7 +1030,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, stream_global->send_message_finished = add_closure_barrier(on_complete); if (stream_global->write_closed) { grpc_chttp2_complete_closure_step( - exec_ctx, stream_global, &stream_global->send_message_finished, 0); + exec_ctx, transport_global, stream_global, + &stream_global->send_message_finished, + GRPC_ERROR_CREATE("Attempt to send message after stream was closed")); } else { stream_global->send_message = op->send_message; if (stream_global->id != 0) { @@ -974,19 +1046,41 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, stream_global->send_trailing_metadata_finished = add_closure_barrier(on_complete); stream_global->send_trailing_metadata = op->send_trailing_metadata; - if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) { - stream_global->seen_error = 1; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); - } - if (stream_global->write_closed) { - grpc_chttp2_complete_closure_step( - exec_ctx, stream_global, - &stream_global->send_trailing_metadata_finished, - grpc_metadata_batch_is_empty(op->send_trailing_metadata)); - } else if (stream_global->id != 0) { - /* TODO(ctiller): check if there's flow control for any outstanding - bytes before going writable */ - grpc_chttp2_become_writable(transport_global, stream_global); + const size_t metadata_size = + grpc_metadata_batch_size(op->send_trailing_metadata); + const size_t metadata_peer_limit = + transport_global->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; + if (metadata_size > metadata_peer_limit) { + cancel_from_api( + exec_ctx, transport_global, stream_global, + grpc_error_set_int( + grpc_error_set_int( + grpc_error_set_int( + GRPC_ERROR_CREATE("to-be-sent trailing metadata size " + "exceeds peer limit"), + GRPC_ERROR_INT_SIZE, (intptr_t)metadata_size), + GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); + } else { + if (contains_non_ok_status(transport_global, + op->send_trailing_metadata)) { + stream_global->seen_error = true; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (stream_global->write_closed) { + grpc_chttp2_complete_closure_step( + exec_ctx, transport_global, stream_global, + &stream_global->send_trailing_metadata_finished, + grpc_metadata_batch_is_empty(op->send_trailing_metadata) + ? GRPC_ERROR_NONE + : GRPC_ERROR_CREATE("Attempt to send trailing metadata after " + "stream was closed")); + } else if (stream_global->id != 0) { + /* TODO(ctiller): check if there's flow control for any outstanding + bytes before going writable */ + grpc_chttp2_become_writable(transport_global, stream_global); + } } } @@ -1017,10 +1111,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, stream_global->recv_trailing_metadata_finished = add_closure_barrier(on_complete); stream_global->recv_trailing_metadata = op->recv_trailing_metadata; + stream_global->final_metadata_requested = true; grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } - grpc_chttp2_complete_closure_step(exec_ctx, stream_global, &on_complete, 1); + grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global, + &on_complete, GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op_locked", 0); } @@ -1057,7 +1153,7 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, true, NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -1079,7 +1175,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s_unused, void *stream_op) { grpc_transport_op *op = stream_op; - bool close_transport = op->disconnect; + grpc_error *close_transport = op->disconnect_with_error; /* If there's a set_accept_stream ensure that we're not parsing to avoid changing things out from underneath */ @@ -1090,7 +1186,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, return; } - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( @@ -1104,7 +1200,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, t->global.last_incoming_stream_id, (uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), gpr_slice_ref(*op->goaway_message), &t->global.qbuf); - close_transport = !grpc_chttp2_has_streams(t); + close_transport = grpc_chttp2_has_streams(t) + ? GRPC_ERROR_NONE + : GRPC_ERROR_CREATE("GOAWAY sent"); } if (op->set_accept_stream) { @@ -1125,8 +1223,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, send_ping_locked(t, op->send_ping); } - if (close_transport) { - close_transport_locked(exec_ctx, t, NULL, NULL); + if (close_transport != GRPC_ERROR_NONE) { + close_transport_locked(exec_ctx, t, close_transport); } } @@ -1149,15 +1247,30 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) { if (stream_global->recv_initial_metadata_ready != NULL && stream_global->published_initial_metadata) { + if (stream_global->seen_error) { + while ((bs = grpc_chttp2_incoming_frame_queue_pop( + &stream_global->incoming_frames)) != NULL) { + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); + } + if (stream_global->exceeded_metadata_size) { + cancel_from_api( + exec_ctx, transport_global, stream_global, + grpc_error_set_int( + GRPC_ERROR_CREATE( + "received initial metadata size exceeds limit"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); + } + } grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_initial_metadata, stream_global->recv_initial_metadata); - grpc_exec_ctx_enqueue( - exec_ctx, stream_global->recv_initial_metadata_ready, true, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_initial_metadata_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { - while (stream_global->seen_error && + while (stream_global->final_metadata_requested && + stream_global->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); @@ -1166,30 +1279,39 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); GPR_ASSERT(*stream_global->recv_message != NULL); - grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true, - NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } else if (stream_global->published_trailing_metadata) { *stream_global->recv_message = NULL; - grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true, - NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } } if (stream_global->recv_trailing_metadata_finished != NULL && stream_global->read_closed && stream_global->write_closed) { - while (stream_global->seen_error && - (bs = grpc_chttp2_incoming_frame_queue_pop( - &stream_global->incoming_frames)) != NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); + if (stream_global->seen_error) { + while ((bs = grpc_chttp2_incoming_frame_queue_pop( + &stream_global->incoming_frames)) != NULL) { + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); + } + if (stream_global->exceeded_metadata_size) { + cancel_from_api( + exec_ctx, transport_global, stream_global, + grpc_error_set_int( + GRPC_ERROR_CREATE( + "received trailing metadata size exceeds limit"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); + } } if (stream_global->all_incoming_byte_streams_finished) { grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_trailing_metadata, stream_global->recv_trailing_metadata); grpc_chttp2_complete_closure_step( - exec_ctx, stream_global, - &stream_global->recv_trailing_metadata_finished, 1); + exec_ctx, transport_global, stream_global, + &stream_global->recv_trailing_metadata_finished, GRPC_ERROR_NONE); } } } @@ -1205,7 +1327,7 @@ static void decrement_active_streams_locked( } static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - uint32_t id) { + uint32_t id, grpc_error *error) { size_t new_stream_count; grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id); @@ -1213,19 +1335,22 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id); } GPR_ASSERT(s); - s->global.in_stream_map = 0; + s->global.in_stream_map = false; if (t->parsing.incoming_stream == &s->parsing) { t->parsing.incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing); } if (s->parsing.data_parser.parsing_frame != NULL) { grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, s->parsing.data_parser.parsing_frame, 0, 0); + exec_ctx, s->parsing.data_parser.parsing_frame, GRPC_ERROR_REF(error), + 0); s->parsing.data_parser.parsing_frame = NULL; } if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(exec_ctx, t, NULL, NULL); + close_transport_locked( + exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING( + "Last stream closed after sending GOAWAY", &error, 1)); } if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing"); @@ -1238,30 +1363,70 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->global.concurrent_stream_count = (uint32_t)new_stream_count; maybe_start_some_streams(exec_ctx, &t->global); } + GRPC_ERROR_UNREF(error); +} + +static void status_codes_from_error(grpc_error *error, + grpc_chttp2_error_code *http2_error, + grpc_status_code *grpc_status) { + intptr_t ip_http; + intptr_t ip_grpc; + bool have_http = + grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &ip_http); + bool have_grpc = + grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &ip_grpc); + if (have_http) { + *http2_error = (grpc_chttp2_error_code)ip_http; + } else if (have_grpc) { + *http2_error = + grpc_chttp2_grpc_status_to_http2_error((grpc_status_code)ip_grpc); + } else { + *http2_error = GRPC_CHTTP2_INTERNAL_ERROR; + } + if (have_grpc) { + *grpc_status = (grpc_status_code)ip_grpc; + } else if (have_http) { + *grpc_status = + grpc_chttp2_http2_error_to_grpc_status((grpc_chttp2_error_code)ip_http); + } else { + *grpc_status = GRPC_STATUS_INTERNAL; + } } static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status) { + grpc_error *due_to_error) { if (!stream_global->read_closed || !stream_global->write_closed) { + grpc_status_code grpc_status; + grpc_chttp2_error_code http_error; + status_codes_from_error(due_to_error, &http_error, &grpc_status); + if (stream_global->id != 0) { gpr_slice_buffer_add( &transport_global->qbuf, - grpc_chttp2_rst_stream_create( - stream_global->id, - (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status), - &stream_global->stats.outgoing)); + grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error, + &stream_global->stats.outgoing)); + } + + const char *msg = + grpc_error_get_str(due_to_error, GRPC_ERROR_STR_GRPC_MESSAGE); + bool free_msg = false; + if (msg == NULL) { + free_msg = true; + msg = grpc_error_string(due_to_error); } - grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, - NULL); + gpr_slice msg_slice = gpr_slice_from_copied_string(msg); + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, + grpc_status, &msg_slice); + if (free_msg) grpc_error_free_string(msg); } - if (status != GRPC_STATUS_OK && !stream_global->seen_error) { - stream_global->seen_error = 1; + if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) { + stream_global->seen_error = true; grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, - 1); + 1, due_to_error); } void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, @@ -1269,7 +1434,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global, grpc_status_code status, gpr_slice *slice) { if (status != GRPC_STATUS_OK) { - stream_global->seen_error = 1; + stream_global->seen_error = true; grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } /* stream_global->recv_trailing_metadata_finished gives us a @@ -1293,7 +1458,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(gpr_slice_ref(*slice)))); } - stream_global->published_trailing_metadata = 1; + stream_global->published_trailing_metadata = true; grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (slice) { @@ -1302,43 +1467,49 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, } static void fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_stream_global *stream_global) { + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_error *error) { grpc_chttp2_complete_closure_step( - exec_ctx, stream_global, &stream_global->send_initial_metadata_finished, - 0); + exec_ctx, transport_global, stream_global, + &stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error)); grpc_chttp2_complete_closure_step( - exec_ctx, stream_global, &stream_global->send_trailing_metadata_finished, - 0); - grpc_chttp2_complete_closure_step(exec_ctx, stream_global, - &stream_global->send_message_finished, 0); + exec_ctx, transport_global, stream_global, + &stream_global->send_trailing_metadata_finished, GRPC_ERROR_REF(error)); + grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global, + &stream_global->send_message_finished, + error); } void grpc_chttp2_mark_stream_closed( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, int close_reads, - int close_writes) { + grpc_chttp2_stream_global *stream_global, int close_reads, int close_writes, + grpc_error *error) { if (stream_global->read_closed && stream_global->write_closed) { /* already closed */ + GRPC_ERROR_UNREF(error); return; } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); if (close_reads && !stream_global->read_closed) { - stream_global->read_closed = 1; - stream_global->published_initial_metadata = 1; - stream_global->published_trailing_metadata = 1; + stream_global->read_closed = true; + stream_global->published_initial_metadata = true; + stream_global->published_trailing_metadata = true; decrement_active_streams_locked(exec_ctx, transport_global, stream_global); } if (close_writes && !stream_global->write_closed) { - stream_global->write_closed = 1; + stream_global->write_closed = true; if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) { GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, stream_global); } else { - fail_pending_writes(exec_ctx, stream_global); + fail_pending_writes(exec_ctx, transport_global, stream_global, + GRPC_ERROR_REF(error)); } } if (stream_global->read_closed && stream_global->write_closed) { + stream_global->removal_error = GRPC_ERROR_REF(error); if (stream_global->id != 0 && TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, @@ -1346,134 +1517,156 @@ void grpc_chttp2_mark_stream_closed( } else { if (stream_global->id != 0) { remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), - stream_global->id); + stream_global->id, GRPC_ERROR_REF(error)); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } } + GRPC_ERROR_UNREF(error); } static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status, - gpr_slice *optional_message) { + grpc_error *error) { gpr_slice hdr; gpr_slice status_hdr; gpr_slice message_pfx; uint8_t *p; uint32_t len = 0; - - GPR_ASSERT(status >= 0 && (int)status < 100); - - GPR_ASSERT(stream_global->id != 0); - - /* Hand roll a header block. - This is unnecessarily ugly - at some point we should find a more elegant - solution. - It's complicated by the fact that our send machinery would be dead by the - time we got around to sending this, so instead we ignore HPACK compression - and just write the uncompressed bytes onto the wire. */ - status_hdr = gpr_slice_malloc(15 + (status >= 10)); - p = GPR_SLICE_START_PTR(status_hdr); - *p++ = 0x40; /* literal header */ - *p++ = 11; /* len(grpc-status) */ - *p++ = 'g'; - *p++ = 'r'; - *p++ = 'p'; - *p++ = 'c'; - *p++ = '-'; - *p++ = 's'; - *p++ = 't'; - *p++ = 'a'; - *p++ = 't'; - *p++ = 'u'; - *p++ = 's'; - if (status < 10) { - *p++ = 1; - *p++ = (uint8_t)('0' + status); - } else { - *p++ = 2; - *p++ = (uint8_t)('0' + (status / 10)); - *p++ = (uint8_t)('0' + (status % 10)); - } - GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr)); - len += (uint32_t)GPR_SLICE_LENGTH(status_hdr); - - if (optional_message) { - GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127); - message_pfx = gpr_slice_malloc(15); - p = GPR_SLICE_START_PTR(message_pfx); - *p++ = 0x40; - *p++ = 12; /* len(grpc-message) */ + grpc_status_code grpc_status; + grpc_chttp2_error_code http_error; + status_codes_from_error(error, &http_error, &grpc_status); + + GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100); + + if (stream_global->id != 0 && !transport_global->is_client) { + /* Hand roll a header block. + This is unnecessarily ugly - at some point we should find a more elegant + solution. + It's complicated by the fact that our send machinery would be dead by the + time we got around to sending this, so instead we ignore HPACK + compression + and just write the uncompressed bytes onto the wire. */ + status_hdr = gpr_slice_malloc(15 + (grpc_status >= 10)); + p = GPR_SLICE_START_PTR(status_hdr); + *p++ = 0x40; /* literal header */ + *p++ = 11; /* len(grpc-status) */ *p++ = 'g'; *p++ = 'r'; *p++ = 'p'; *p++ = 'c'; *p++ = '-'; - *p++ = 'm'; - *p++ = 'e'; - *p++ = 's'; *p++ = 's'; + *p++ = 't'; *p++ = 'a'; - *p++ = 'g'; - *p++ = 'e'; - *p++ = (uint8_t)GPR_SLICE_LENGTH(*optional_message); - GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx)); - len += (uint32_t)GPR_SLICE_LENGTH(message_pfx); - len += (uint32_t)GPR_SLICE_LENGTH(*optional_message); - } - - hdr = gpr_slice_malloc(9); - p = GPR_SLICE_START_PTR(hdr); - *p++ = (uint8_t)(len >> 16); - *p++ = (uint8_t)(len >> 8); - *p++ = (uint8_t)(len); - *p++ = GRPC_CHTTP2_FRAME_HEADER; - *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS; - *p++ = (uint8_t)(stream_global->id >> 24); - *p++ = (uint8_t)(stream_global->id >> 16); - *p++ = (uint8_t)(stream_global->id >> 8); - *p++ = (uint8_t)(stream_global->id); - GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr)); - - gpr_slice_buffer_add(&transport_global->qbuf, hdr); - gpr_slice_buffer_add(&transport_global->qbuf, status_hdr); - if (optional_message) { - gpr_slice_buffer_add(&transport_global->qbuf, message_pfx); - gpr_slice_buffer_add(&transport_global->qbuf, - gpr_slice_ref(*optional_message)); - } - - gpr_slice_buffer_add( - &transport_global->qbuf, - grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR, - &stream_global->stats.outgoing)); - - if (optional_message) { - gpr_slice_ref(*optional_message); - } - grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, - optional_message); + *p++ = 't'; + *p++ = 'u'; + *p++ = 's'; + if (grpc_status < 10) { + *p++ = 1; + *p++ = (uint8_t)('0' + grpc_status); + } else { + *p++ = 2; + *p++ = (uint8_t)('0' + (grpc_status / 10)); + *p++ = (uint8_t)('0' + (grpc_status % 10)); + } + GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr)); + len += (uint32_t)GPR_SLICE_LENGTH(status_hdr); + + const char *optional_message = + grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); + + if (optional_message != NULL) { + size_t msg_len = strlen(optional_message); + GPR_ASSERT(msg_len < 127); + message_pfx = gpr_slice_malloc(15); + p = GPR_SLICE_START_PTR(message_pfx); + *p++ = 0x40; + *p++ = 12; /* len(grpc-message) */ + *p++ = 'g'; + *p++ = 'r'; + *p++ = 'p'; + *p++ = 'c'; + *p++ = '-'; + *p++ = 'm'; + *p++ = 'e'; + *p++ = 's'; + *p++ = 's'; + *p++ = 'a'; + *p++ = 'g'; + *p++ = 'e'; + *p++ = (uint8_t)msg_len; + GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx)); + len += (uint32_t)GPR_SLICE_LENGTH(message_pfx); + len += (uint32_t)msg_len; + } + + hdr = gpr_slice_malloc(9); + p = GPR_SLICE_START_PTR(hdr); + *p++ = (uint8_t)(len >> 16); + *p++ = (uint8_t)(len >> 8); + *p++ = (uint8_t)(len); + *p++ = GRPC_CHTTP2_FRAME_HEADER; + *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS; + *p++ = (uint8_t)(stream_global->id >> 24); + *p++ = (uint8_t)(stream_global->id >> 16); + *p++ = (uint8_t)(stream_global->id >> 8); + *p++ = (uint8_t)(stream_global->id); + GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr)); + + gpr_slice_buffer_add(&transport_global->qbuf, hdr); + gpr_slice_buffer_add(&transport_global->qbuf, status_hdr); + if (optional_message) { + gpr_slice_buffer_add(&transport_global->qbuf, message_pfx); + gpr_slice_buffer_add(&transport_global->qbuf, + gpr_slice_from_copied_string(optional_message)); + } + gpr_slice_buffer_add( + &transport_global->qbuf, + grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR, + &stream_global->stats.outgoing)); + } + + const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); + bool free_msg = false; + if (msg == NULL) { + free_msg = true; + msg = grpc_error_string(error); + } + gpr_slice msg_slice = gpr_slice_from_copied_string(msg); + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, + grpc_status, &msg_slice); + if (free_msg) grpc_error_free_string(msg); + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, - 1); + 1, error); } +typedef struct { + grpc_exec_ctx *exec_ctx; + grpc_error *error; +} cancel_stream_cb_args; + static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) { - cancel_from_api(user_data, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE); + cancel_stream_cb_args *args = user_data; + cancel_from_api(args->exec_ctx, transport_global, stream_global, + GRPC_ERROR_REF(args->error)); } -static void end_all_the_calls(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb); +static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_error *error) { + cancel_stream_cb_args args = {exec_ctx, error}; + grpc_chttp2_for_all_streams(&t->global, &args, cancel_stream_cb); + GRPC_ERROR_UNREF(error); } -static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - close_transport_locked(exec_ctx, t, NULL, NULL); - end_all_the_calls(exec_ctx, t); +static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_error *error) { + close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); + end_all_the_calls(exec_ctx, t, error); } /** update window from a settings change */ @@ -1503,20 +1696,22 @@ static void update_global_window(void *args, uint32_t id, void *stream) { static void reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_unused, void *arg); -static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success); +static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_unused, void *arg); static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_unused, void *arg); -static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, bool success) { +static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { /* Control flow: reading_action_locked -> (parse_unlocked -> post_parse_locked)? -> post_reading_action_locked */ grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked, - (void *)(uintptr_t)success, 0); + GRPC_ERROR_REF(error), 0); } static void reading_action_locked(grpc_exec_ctx *exec_ctx, @@ -1524,7 +1719,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s_unused, void *arg) { grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; - bool success = (bool)(uintptr_t)arg; + grpc_error *error = arg; GPR_ASSERT(!t->executor.parsing_active); if (!t->closed) { @@ -1533,27 +1728,65 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - grpc_exec_ctx_enqueue(exec_ctx, &t->parsing_action, success, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL); } else { post_reading_action_locked(exec_ctx, t, s_unused, arg); } } -static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + grpc_http_parser parser; + size_t i = 0; + grpc_error *error = GRPC_ERROR_NONE; + grpc_http_response response; + memset(&response, 0, sizeof(response)); + + grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response); + + grpc_error *parse_error = GRPC_ERROR_NONE; + for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) { + parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i]); + } + if (parse_error == GRPC_ERROR_NONE && + (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) { + error = grpc_error_set_int( + GRPC_ERROR_CREATE("Trying to connect an http1.x server"), + GRPC_ERROR_INT_HTTP_STATUS, response.status); + } + GRPC_ERROR_UNREF(parse_error); + + grpc_http_parser_destroy(&parser); + grpc_http_response_destroy(&response); + return error; +} + +static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_chttp2_transport *t = arg; GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; - for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(exec_ctx, &t->parsing, - t->read_buffer.slices[i]); - i++) - ; + grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, + GRPC_ERROR_NONE}; + for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { + errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing, + t->read_buffer.slices[i]); + }; if (i != t->read_buffer.count) { - success = false; + errors[2] = try_http_parsing(exec_ctx, t); + } + grpc_error *err = + errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE && + errors[2] == GRPC_ERROR_NONE + ? GRPC_ERROR_NONE + : GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, + GPR_ARRAY_SIZE(errors)); + for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) { + GRPC_ERROR_UNREF(errors[i]); } GPR_TIMER_END("reading_action.parse", 0); - grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, - (void *)(uintptr_t)success, 0); + grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, err, + 0); } static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1590,7 +1823,8 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(stream_global->in_stream_map); GPR_ASSERT(stream_global->write_closed); GPR_ASSERT(stream_global->read_closed); - remove_stream(exec_ctx, t, stream_global->id); + remove_stream(exec_ctx, t, stream_global->id, + GRPC_ERROR_REF(stream_global->removal_error)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } @@ -1601,10 +1835,17 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_unused, void *arg) { - bool success = (bool)(uintptr_t)arg; + grpc_error *error = arg; bool keep_reading = false; - if (!success || t->closed) { - drop_connection(exec_ctx, t); + if (error == GRPC_ERROR_NONE && t->closed) { + error = GRPC_ERROR_CREATE("Transport closed"); + } + if (error != GRPC_ERROR_NONE) { + if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } + drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; if (!t->executor.writing_active && t->ep) { grpc_endpoint_destroy(exec_ctx, t->ep); @@ -1618,6 +1859,7 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, prevent_endpoint_shutdown(t); } gpr_slice_buffer_reset_and_unref(&t->read_buffer); + GRPC_ERROR_UNREF(error); if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action); @@ -1634,13 +1876,13 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, static void connectivity_state_set( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, - grpc_connectivity_state state, const char *reason) { + grpc_connectivity_state state, grpc_error *error, const char *reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); grpc_connectivity_state_set( exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, reason); + state, error, reason); } /******************************************************************************* @@ -1672,6 +1914,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, add_to_pollset_locked, pollset, 0); } +static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset_set *pollset_set) { + grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt, + (grpc_chttp2_stream *)gs, + add_to_pollset_set_locked, pollset_set, 0); +} + /******************************************************************************* * BYTE STREAM */ @@ -1679,6 +1928,7 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { + GRPC_ERROR_UNREF(bs->error); gpr_slice_buffer_destroy(&bs->slices); gpr_free(bs); } @@ -1747,9 +1997,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, } if (bs->slices.count > 0) { *arg->slice = gpr_slice_buffer_take_first(&bs->slices); - grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, true, NULL); - } else if (bs->failed) { - grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, false, NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); + } else if (bs->error != GRPC_ERROR_NONE) { + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), + NULL); } else { bs->on_next = arg->on_complete; bs->next = arg->slice; @@ -1806,7 +2057,7 @@ static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream; if (bs->on_next != NULL) { *bs->next = arg->slice; - grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); bs->on_next = NULL; } else { gpr_slice_buffer_add(&bs->slices, arg->slice); @@ -1824,13 +2075,30 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, sizeof(arg)); } +typedef struct { + grpc_chttp2_incoming_byte_stream *bs; + grpc_error *error; +} bs_fail_args; + +static bs_fail_args *make_bs_fail_args(grpc_chttp2_incoming_byte_stream *bs, + grpc_error *error) { + bs_fail_args *a = gpr_malloc(sizeof(*a)); + a->bs = bs; + a->error = error; + return a; +} + static void incoming_byte_stream_finished_failed_locked( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *argp) { - grpc_chttp2_incoming_byte_stream *bs = argp; - grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL); + bs_fail_args *a = argp; + grpc_chttp2_incoming_byte_stream *bs = a->bs; + grpc_error *error = a->error; + gpr_free(a); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); bs->on_next = NULL; - bs->failed = 1; + GRPC_ERROR_UNREF(bs->error); + bs->error = error; incoming_byte_stream_unref(exec_ctx, bs); } @@ -1843,25 +2111,26 @@ static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx, } void grpc_chttp2_incoming_byte_stream_finished( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success, - int from_parsing_thread) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, + grpc_error *error, int from_parsing_thread) { if (from_parsing_thread) { - if (success) { + if (error == GRPC_ERROR_NONE) { grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, incoming_byte_stream_finished_ok_locked, bs, 0); } else { - incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport, - bs->stream, bs); - } - } else { - if (success) { grpc_chttp2_run_with_global_lock( exec_ctx, bs->transport, bs->stream, - incoming_byte_stream_finished_failed_locked, bs, 0); + incoming_byte_stream_finished_failed_locked, + make_bs_fail_args(bs, error), 0); + } + } else { + if (error == GRPC_ERROR_NONE) { + incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport, + bs->stream, bs); } else { - incoming_byte_stream_finished_failed_locked(exec_ctx, bs->transport, - bs->stream, bs); + incoming_byte_stream_finished_failed_locked( + exec_ctx, bs->transport, bs->stream, make_bs_fail_args(bs, error)); } } } @@ -1884,7 +2153,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( gpr_slice_buffer_init(&incoming_byte_stream->slices); incoming_byte_stream->on_next = NULL; incoming_byte_stream->is_tail = 1; - incoming_byte_stream->failed = 0; + incoming_byte_stream->error = GRPC_ERROR_NONE; if (add_to_queue->head == NULL) { add_to_queue->head = incoming_byte_stream; } else { @@ -1903,10 +2172,13 @@ static char *format_flowctl_context_var(const char *context, const char *var, int64_t val, uint32_t id, char **scope) { char *underscore_pos; + char *buf; char *result; if (context == NULL) { *scope = NULL; - gpr_asprintf(&result, "%s(%lld)", var, val); + gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val); + result = gpr_leftpad(buf, ' ', 40); + gpr_free(buf); return result; } underscore_pos = strchr(context, '_'); @@ -1917,7 +2189,9 @@ static char *format_flowctl_context_var(const char *context, const char *var, gpr_asprintf(scope, "%s[%d]", tmp, id); gpr_free(tmp); } - gpr_asprintf(&result, "%s.%s(%lld)", underscore_pos + 1, var, val); + gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val); + result = gpr_leftpad(buf, ' ', 40); + gpr_free(buf); return result; } @@ -1938,6 +2212,8 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, uint32_t stream_id, int64_t val1, int64_t val2) { char *scope1; char *scope2; + char *tmp_phase; + char *tmp_scope1; char *label1 = format_flowctl_context_var(context1, var1, val1, stream_id, &scope1); char *label2 = @@ -1945,14 +2221,18 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, char *clisvr = is_client ? "client" : "server"; char *prefix; - gpr_asprintf(&prefix, "FLOW % 8s: %s % 11s ", phase, clisvr, scope1); + tmp_phase = gpr_leftpad(phase, ' ', 8); + tmp_scope1 = gpr_leftpad(scope1, ' ', 11); + gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1); + gpr_free(tmp_phase); + gpr_free(tmp_scope1); switch (op) { case GRPC_CHTTP2_FLOWCTL_MOVE: GPR_ASSERT(samestr(scope1, scope2)); if (val2 != 0) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "%sMOVE % 40s <- % 40s giving %d", prefix, label1, label2, + "%sMOVE %s <- %s giving %" PRId64, prefix, label1, label2, val1 + val2); } break; @@ -1960,7 +2240,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, GPR_ASSERT(val2 >= 0); if (val2 != 0) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "%sCREDIT % 40s by % 40s giving %d", prefix, label1, label2, + "%sCREDIT %s by %s giving %" PRId64, prefix, label1, label2, val1 + val2); } break; @@ -1968,7 +2248,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, GPR_ASSERT(val2 >= 0); if (val2 != 0) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "%sDEBIT % 40s by % 40s giving %d", prefix, label1, label2, + "%sDEBIT %s by %s giving %" PRId64, prefix, label1, label2, val1 - val2); } break; @@ -1993,6 +2273,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), "chttp2", init_stream, set_pollset, + set_pollset_set, perform_stream_op, perform_transport_op, destroy_stream, @@ -2013,5 +2294,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */ gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); - reading_action(exec_ctx, t, 1); + reading_action(exec_ctx, t, GRPC_ERROR_NONE); } |