diff options
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 107 |
1 files changed, 96 insertions, 11 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 8a6b427559..e6629ac74c 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -37,23 +37,24 @@ #include <stdio.h> #include <string.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> -#include <grpc/support/string.h> -#include <grpc/support/useful.h> -#include "src/core/transport/transport_impl.h" -#include "src/core/transport/chttp2/http2_errors.h" -#include "src/core/transport/chttp2/hpack_parser.h" #include "src/core/transport/chttp2/frame_data.h" +#include "src/core/transport/chttp2/frame_goaway.h" #include "src/core/transport/chttp2/frame_ping.h" #include "src/core/transport/chttp2/frame_rst_stream.h" #include "src/core/transport/chttp2/frame_settings.h" #include "src/core/transport/chttp2/frame_window_update.h" +#include "src/core/transport/chttp2/hpack_parser.h" +#include "src/core/transport/chttp2/http2_errors.h" #include "src/core/transport/chttp2/status_conversion.h" #include "src/core/transport/chttp2/stream_encoder.h" #include "src/core/transport/chttp2/stream_map.h" #include "src/core/transport/chttp2/timeout_encoding.h" +#include "src/core/transport/transport_impl.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string.h> +#include <grpc/support/useful.h> #define DEFAULT_WINDOW 65536 #define MAX_WINDOW 0x7fffffffu @@ -160,6 +161,11 @@ typedef struct { void *user_data; } outstanding_ping; +typedef struct { + grpc_status_code status; + gpr_slice debug; +} pending_goaway; + struct transport { grpc_transport base; /* must be first */ const grpc_transport_callbacks *cb; @@ -180,6 +186,7 @@ struct transport { /* stream indexing */ gpr_uint32 next_stream_id; + gpr_uint32 last_incoming_stream_id; /* settings */ gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; @@ -211,6 +218,12 @@ struct transport { grpc_chttp2_ping_parser ping; } simple_parsers; + /* goaway */ + grpc_chttp2_goaway_parser goaway_parser; + pending_goaway *pending_goaways; + size_t num_pending_goaways; + size_t cap_pending_goaways; + /* state for a stream that's not yet been created */ grpc_stream_op_buffer new_stream_sopb; @@ -310,6 +323,7 @@ static void unref_transport(transport *t) { gpr_slice_buffer_destroy(&t->qbuf); grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); + grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); grpc_mdstr_unref(t->str_grpc_timeout); @@ -332,6 +346,11 @@ static void unref_transport(transport *t) { } gpr_free(t->pings); + for (i = 0; i < t->num_pending_goaways; i++) { + gpr_slice_unref(t->pending_goaways[i].debug); + } + gpr_free(t->pending_goaways); + gpr_free(t); } @@ -360,6 +379,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->writing = 0; t->error_state = ERROR_STATE_NONE; t->next_stream_id = is_client ? 1 : 2; + t->last_incoming_stream_id = 0; t->is_client = is_client; t->outgoing_window = DEFAULT_WINDOW; t->incoming_window = DEFAULT_WINDOW; @@ -370,6 +390,10 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->ping_capacity = 0; t->ping_counter = gpr_now().tv_nsec; grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); + grpc_chttp2_goaway_parser_init(&t->goaway_parser); + t->pending_goaways = NULL; + t->num_pending_goaways = 0; + t->cap_pending_goaways = 0; gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->qbuf); if (is_client) { @@ -456,6 +480,16 @@ static void close_transport(grpc_transport *gt) { gpr_mu_unlock(&t->mu); } +static void goaway(grpc_transport *gt, grpc_status_code status, + gpr_slice debug_data) { + transport *t = (transport *)gt; + lock(t); + grpc_chttp2_goaway_append(t->last_incoming_stream_id, + grpc_chttp2_grpc_status_to_http2_error(status), + debug_data, &t->qbuf); + unlock(t); +} + static int init_stream(grpc_transport *gt, grpc_stream *gs, const void *server_data) { transport *t = (transport *)gt; @@ -609,6 +643,9 @@ static void unlock(transport *t) { int start_write = 0; int perform_callbacks = 0; int call_closed = 0; + int num_goaways = 0; + int i; + pending_goaway *goaways = NULL; grpc_endpoint *ep = t->ep; /* see if we need to trigger a write - and if so, get the data ready */ @@ -630,9 +667,16 @@ static void unlock(transport *t) { t->calling_back = 1; t->error_state = ERROR_STATE_NOTIFIED; } + if (t->num_pending_goaways) { + goaways = t->pending_goaways; + num_goaways = t->num_pending_goaways; + t->pending_goaways = NULL; + t->num_pending_goaways = 0; + t->calling_back = 1; + } } - if (perform_callbacks || call_closed) { + if (perform_callbacks || call_closed || num_goaways) { ref_transport(t); } @@ -640,6 +684,11 @@ static void unlock(transport *t) { gpr_mu_unlock(&t->mu); /* perform some callbacks if necessary */ + for (i = 0; i < num_goaways; i++) { + t->cb->goaway(t->cb_user_data, &t->base, goaways[i].status, + goaways[i].debug); + } + if (perform_callbacks) { run_callbacks(t); } @@ -698,13 +747,15 @@ static void unlock(transport *t) { } } - if (perform_callbacks || call_closed) { + if (perform_callbacks || call_closed || num_goaways) { lock(t); t->calling_back = 0; gpr_cv_broadcast(&t->cv); unlock(t); unref_transport(t); } + + gpr_free(goaways); } /* @@ -1130,6 +1181,12 @@ static int init_header_frame_parser(transport *t, int is_continuation) { gpr_log(GPR_ERROR, "ignoring new stream creation on client"); } return init_skip_frame(t, 1); + } else if (t->last_incoming_stream_id > t->incoming_stream_id) { + gpr_log(GPR_ERROR, + "ignoring out of order new stream request on server; last stream " + "id=%d, new stream id=%d", + t->last_incoming_stream_id, t->incoming_stream); + return init_skip_frame(t, 1); } t->incoming_stream = NULL; /* if stream is accepted, we set incoming_stream in init_stream */ @@ -1187,6 +1244,19 @@ static int init_ping_parser(transport *t) { return ok; } +static int init_goaway_parser(transport *t) { + int ok = + GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_goaway_parser_begin_frame( + &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags); + if (!ok) { + drop_connection(t); + } + t->parser = grpc_chttp2_goaway_parser_parse; + t->parser_data = &t->goaway_parser; + return ok; +} + static int init_settings_frame_parser(transport *t) { int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_settings_parser_begin_frame( @@ -1240,6 +1310,8 @@ static int init_frame_parser(transport *t) { return init_window_update_frame_parser(t); case GRPC_CHTTP2_FRAME_PING: return init_ping_parser(t); + case GRPC_CHTTP2_FRAME_GOAWAY: + return init_goaway_parser(t); default: gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type); return init_skip_frame(t, 0); @@ -1277,6 +1349,18 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { &t->qbuf, grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes)); } + if (st.goaway) { + if (t->num_pending_goaways == t->cap_pending_goaways) { + t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2); + t->pending_goaways = + gpr_realloc(t->pending_goaways, + sizeof(pending_goaway) * t->cap_pending_goaways); + } + t->pending_goaways[t->num_pending_goaways].status = + grpc_chttp2_http2_error_to_grpc_status(st.goaway_error); + t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text; + t->num_pending_goaways++; + } if (st.process_ping_reply) { for (i = 0; i < t->ping_count; i++) { if (0 == @@ -1455,6 +1539,7 @@ static int process_read(transport *t, gpr_slice slice) { if (!init_frame_parser(t)) { return 0; } + t->last_incoming_stream_id = t->incoming_stream_id; if (t->incoming_frame_size == 0) { if (!parse_frame_slice(t, gpr_empty_slice(), 1)) { return 0; @@ -1599,7 +1684,7 @@ static void run_callbacks(transport *t) { static const grpc_transport_vtable vtable = { sizeof(stream), init_stream, send_batch, set_allow_window_updates, - destroy_stream, abort_stream, close_transport, send_ping, + destroy_stream, abort_stream, goaway, close_transport, send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, |