From a9d8c5564e37e2d42fa3a6f4546ac3507b1b03c8 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 2 Jun 2016 16:11:28 -0700 Subject: Provide reasons in error messages when streams are canceled with a connection drop --- .../transport/chttp2/transport/chttp2_transport.c | 74 ++++++++++++++++++---- src/core/ext/transport/chttp2/transport/internal.h | 3 + 2 files changed, 65 insertions(+), 12 deletions(-) (limited to 'src/core/ext/transport') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index b6886a2201..52ac895271 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -47,6 +47,7 @@ #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/status_conversion.h" #include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" +#include "src/core/lib/http/parser.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -107,7 +108,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status); + grpc_status_code status, + gpr_slice *optional_message); static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -161,6 +163,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, GPR_ASSERT(t->ep == NULL); + gpr_slice_unref(t->optional_drop_message); + gpr_slice_buffer_destroy(&t->global.qbuf); gpr_slice_buffer_destroy(&t->writing.outbuf); @@ -260,6 +264,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->parsing.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; + t->optional_drop_message = gpr_empty_slice(); grpc_connectivity_state_init( &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, is_client ? "client_transport" : "server_transport"); @@ -859,7 +864,7 @@ static void maybe_start_some_streams( grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE); + GRPC_STATUS_UNAVAILABLE, NULL); } } @@ -936,7 +941,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(exec_ctx, transport_global, stream_global, - op->cancel_with_status); + op->cancel_with_status, op->optional_close_message); } if (op->close_with_status != GRPC_STATUS_OK) { @@ -960,7 +965,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, "(%lu vs. %lu)", metadata_size, metadata_peer_limit); cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); } else { if (contains_non_ok_status(transport_global, op->send_initial_metadata)) { stream_global->seen_error = true; @@ -1015,7 +1020,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, "(%lu vs. %lu)", metadata_size, metadata_peer_limit); cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); } else { if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) { @@ -1201,7 +1206,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } if (stream_global->exceeded_metadata_size) { cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); } } grpc_chttp2_incoming_metadata_buffer_publish( @@ -1240,7 +1245,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } if (stream_global->exceeded_metadata_size) { cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); } } if (stream_global->all_incoming_byte_streams_finished) { @@ -1303,7 +1308,8 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status) { + grpc_status_code status, + gpr_slice *optional_message) { if (!stream_global->read_closed || !stream_global->write_closed) { if (stream_global->id != 0) { gpr_slice_buffer_add( @@ -1313,8 +1319,12 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status), &stream_global->stats.outgoing)); } + + if (optional_message) { + gpr_slice_ref(*optional_message); + } grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, - NULL); + optional_message); } if (status != GRPC_STATUS_OK && !stream_global->seen_error) { stream_global->seen_error = true; @@ -1519,16 +1529,23 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, 1); } +typedef struct { + grpc_exec_ctx *exec_ctx; + gpr_slice optional_drop_message; +} cancel_stream_cb_arg; + static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) { - cancel_from_api(user_data, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE); + cancel_stream_cb_arg *arg = user_data; + cancel_from_api(arg->exec_ctx, transport_global, stream_global, + GRPC_STATUS_UNAVAILABLE, &arg->optional_drop_message); } static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb); + cancel_stream_cb_arg arg = {exec_ctx, t->optional_drop_message}; + grpc_chttp2_for_all_streams(&t->global, &arg, cancel_stream_cb); } static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { @@ -1599,6 +1616,31 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, } } +static bool try_http_parsing(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + return false; + grpc_http_parser parser; + size_t i = 0; + bool success = false; + + grpc_http_parser_init(&parser); + grpc_http1_trace = 1; + + for (; i < t->read_buffer.count && + grpc_http_parser_parse(&parser, t->read_buffer.slices[i]); + i++) + ; + if (grpc_http_parser_eof(&parser) && parser.type == GRPC_HTTP_RESPONSE) { + success = true; + GRPC_CHTTP2_IF_TRACING(gpr_log( + GPR_DEBUG, "Trying to connect an http1.x server, received status:%d", + parser.http.response.status)); + } + + grpc_http_parser_destroy(&parser); + return success; +} + static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_chttp2_transport *t = arg; GPR_TIMER_BEGIN("reading_action.parse", 0); @@ -1610,6 +1652,14 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) { ; if (i != t->read_buffer.count) { success = false; + gpr_slice_unref(t->optional_drop_message); + if (try_http_parsing(exec_ctx, t)) { + t->optional_drop_message = gpr_slice_from_copied_string( + "Connection dropped: received http1.x response"); + } else { + t->optional_drop_message = gpr_slice_from_copied_string( + "Connection dropped: received unparseable response"); + } } GPR_TIMER_END("reading_action.parse", 0); grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 5872fd8e0a..7f3339a620 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -383,6 +383,9 @@ struct grpc_chttp2_transport { /** Transport op to be applied post-parsing */ grpc_transport_op *post_parsing_op; + + /** Message explaining the reason of dropping connection */ + gpr_slice optional_drop_message; }; typedef struct { -- cgit v1.2.3 From 82ec451c427c0275964ee444d11238b07efeb94d Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 2 Jun 2016 18:34:34 -0700 Subject: Check if optional_drop_message is an empty slice --- src/core/ext/transport/chttp2/transport/chttp2_transport.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'src/core/ext/transport') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 52ac895271..d0c94e6c46 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1531,7 +1531,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, typedef struct { grpc_exec_ctx *exec_ctx; - gpr_slice optional_drop_message; + gpr_slice *optional_drop_message; } cancel_stream_cb_arg; static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, @@ -1539,12 +1539,15 @@ static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { cancel_stream_cb_arg *arg = user_data; cancel_from_api(arg->exec_ctx, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE, &arg->optional_drop_message); + GRPC_STATUS_UNAVAILABLE, arg->optional_drop_message); } static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - cancel_stream_cb_arg arg = {exec_ctx, t->optional_drop_message}; + cancel_stream_cb_arg arg = {exec_ctx, + GPR_SLICE_IS_EMPTY(t->optional_drop_message) + ? NULL + : &t->optional_drop_message}; grpc_chttp2_for_all_streams(&t->global, &arg, cancel_stream_cb); } @@ -1618,13 +1621,11 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, static bool try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - return false; grpc_http_parser parser; size_t i = 0; bool success = false; grpc_http_parser_init(&parser); - grpc_http1_trace = 1; for (; i < t->read_buffer.count && grpc_http_parser_parse(&parser, t->read_buffer.slices[i]); -- cgit v1.2.3 From a2bc0ccb500f0fffcdd263420133b4a0d0cbad2d Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 6 Jun 2016 16:21:59 -0700 Subject: Get transport from transport global Avoid introducing cancel_stream_cb_arg --- .../transport/chttp2/transport/chttp2_transport.c | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) (limited to 'src/core/ext/transport') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index d0c94e6c46..43e9ca1ff5 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1529,26 +1529,20 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, 1); } -typedef struct { - grpc_exec_ctx *exec_ctx; - gpr_slice *optional_drop_message; -} cancel_stream_cb_arg; - static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) { - cancel_stream_cb_arg *arg = user_data; - cancel_from_api(arg->exec_ctx, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE, arg->optional_drop_message); + grpc_chttp2_transport *transport = TRANSPORT_FROM_GLOBAL(transport_global); + cancel_from_api(user_data, transport_global, stream_global, + GRPC_STATUS_UNAVAILABLE, + GPR_SLICE_IS_EMPTY(transport->optional_drop_message) + ? NULL + : &transport->optional_drop_message); } static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - cancel_stream_cb_arg arg = {exec_ctx, - GPR_SLICE_IS_EMPTY(t->optional_drop_message) - ? NULL - : &t->optional_drop_message}; - grpc_chttp2_for_all_streams(&t->global, &arg, cancel_stream_cb); + grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb); } static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { -- cgit v1.2.3