aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c107
1 files changed, 96 insertions, 11 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 8a6b427559..e6629ac74c 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -37,23 +37,24 @@
#include <stdio.h>
#include <string.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
-#include <grpc/support/string.h>
-#include <grpc/support/useful.h>
-#include "src/core/transport/transport_impl.h"
-#include "src/core/transport/chttp2/http2_errors.h"
-#include "src/core/transport/chttp2/hpack_parser.h"
#include "src/core/transport/chttp2/frame_data.h"
+#include "src/core/transport/chttp2/frame_goaway.h"
#include "src/core/transport/chttp2/frame_ping.h"
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include "src/core/transport/chttp2/frame_settings.h"
#include "src/core/transport/chttp2/frame_window_update.h"
+#include "src/core/transport/chttp2/hpack_parser.h"
+#include "src/core/transport/chttp2/http2_errors.h"
#include "src/core/transport/chttp2/status_conversion.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
#include "src/core/transport/chttp2/timeout_encoding.h"
+#include "src/core/transport/transport_impl.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string.h>
+#include <grpc/support/useful.h>
#define DEFAULT_WINDOW 65536
#define MAX_WINDOW 0x7fffffffu
@@ -160,6 +161,11 @@ typedef struct {
void *user_data;
} outstanding_ping;
+typedef struct {
+ grpc_status_code status;
+ gpr_slice debug;
+} pending_goaway;
+
struct transport {
grpc_transport base; /* must be first */
const grpc_transport_callbacks *cb;
@@ -180,6 +186,7 @@ struct transport {
/* stream indexing */
gpr_uint32 next_stream_id;
+ gpr_uint32 last_incoming_stream_id;
/* settings */
gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
@@ -211,6 +218,12 @@ struct transport {
grpc_chttp2_ping_parser ping;
} simple_parsers;
+ /* goaway */
+ grpc_chttp2_goaway_parser goaway_parser;
+ pending_goaway *pending_goaways;
+ size_t num_pending_goaways;
+ size_t cap_pending_goaways;
+
/* state for a stream that's not yet been created */
grpc_stream_op_buffer new_stream_sopb;
@@ -310,6 +323,7 @@ static void unref_transport(transport *t) {
gpr_slice_buffer_destroy(&t->qbuf);
grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
+ grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
grpc_mdstr_unref(t->str_grpc_timeout);
@@ -332,6 +346,11 @@ static void unref_transport(transport *t) {
}
gpr_free(t->pings);
+ for (i = 0; i < t->num_pending_goaways; i++) {
+ gpr_slice_unref(t->pending_goaways[i].debug);
+ }
+ gpr_free(t->pending_goaways);
+
gpr_free(t);
}
@@ -360,6 +379,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->writing = 0;
t->error_state = ERROR_STATE_NONE;
t->next_stream_id = is_client ? 1 : 2;
+ t->last_incoming_stream_id = 0;
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
@@ -370,6 +390,10 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->ping_capacity = 0;
t->ping_counter = gpr_now().tv_nsec;
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
+ grpc_chttp2_goaway_parser_init(&t->goaway_parser);
+ t->pending_goaways = NULL;
+ t->num_pending_goaways = 0;
+ t->cap_pending_goaways = 0;
gpr_slice_buffer_init(&t->outbuf);
gpr_slice_buffer_init(&t->qbuf);
if (is_client) {
@@ -456,6 +480,16 @@ static void close_transport(grpc_transport *gt) {
gpr_mu_unlock(&t->mu);
}
+static void goaway(grpc_transport *gt, grpc_status_code status,
+ gpr_slice debug_data) {
+ transport *t = (transport *)gt;
+ lock(t);
+ grpc_chttp2_goaway_append(t->last_incoming_stream_id,
+ grpc_chttp2_grpc_status_to_http2_error(status),
+ debug_data, &t->qbuf);
+ unlock(t);
+}
+
static int init_stream(grpc_transport *gt, grpc_stream *gs,
const void *server_data) {
transport *t = (transport *)gt;
@@ -609,6 +643,9 @@ static void unlock(transport *t) {
int start_write = 0;
int perform_callbacks = 0;
int call_closed = 0;
+ int num_goaways = 0;
+ int i;
+ pending_goaway *goaways = NULL;
grpc_endpoint *ep = t->ep;
/* see if we need to trigger a write - and if so, get the data ready */
@@ -630,9 +667,16 @@ static void unlock(transport *t) {
t->calling_back = 1;
t->error_state = ERROR_STATE_NOTIFIED;
}
+ if (t->num_pending_goaways) {
+ goaways = t->pending_goaways;
+ num_goaways = t->num_pending_goaways;
+ t->pending_goaways = NULL;
+ t->num_pending_goaways = 0;
+ t->calling_back = 1;
+ }
}
- if (perform_callbacks || call_closed) {
+ if (perform_callbacks || call_closed || num_goaways) {
ref_transport(t);
}
@@ -640,6 +684,11 @@ static void unlock(transport *t) {
gpr_mu_unlock(&t->mu);
/* perform some callbacks if necessary */
+ for (i = 0; i < num_goaways; i++) {
+ t->cb->goaway(t->cb_user_data, &t->base, goaways[i].status,
+ goaways[i].debug);
+ }
+
if (perform_callbacks) {
run_callbacks(t);
}
@@ -698,13 +747,15 @@ static void unlock(transport *t) {
}
}
- if (perform_callbacks || call_closed) {
+ if (perform_callbacks || call_closed || num_goaways) {
lock(t);
t->calling_back = 0;
gpr_cv_broadcast(&t->cv);
unlock(t);
unref_transport(t);
}
+
+ gpr_free(goaways);
}
/*
@@ -1130,6 +1181,12 @@ static int init_header_frame_parser(transport *t, int is_continuation) {
gpr_log(GPR_ERROR, "ignoring new stream creation on client");
}
return init_skip_frame(t, 1);
+ } else if (t->last_incoming_stream_id > t->incoming_stream_id) {
+ gpr_log(GPR_ERROR,
+ "ignoring out of order new stream request on server; last stream "
+ "id=%d, new stream id=%d",
+ t->last_incoming_stream_id, t->incoming_stream);
+ return init_skip_frame(t, 1);
}
t->incoming_stream = NULL;
/* if stream is accepted, we set incoming_stream in init_stream */
@@ -1187,6 +1244,19 @@ static int init_ping_parser(transport *t) {
return ok;
}
+static int init_goaway_parser(transport *t) {
+ int ok =
+ GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_goaway_parser_begin_frame(
+ &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
+ if (!ok) {
+ drop_connection(t);
+ }
+ t->parser = grpc_chttp2_goaway_parser_parse;
+ t->parser_data = &t->goaway_parser;
+ return ok;
+}
+
static int init_settings_frame_parser(transport *t) {
int ok = GRPC_CHTTP2_PARSE_OK ==
grpc_chttp2_settings_parser_begin_frame(
@@ -1240,6 +1310,8 @@ static int init_frame_parser(transport *t) {
return init_window_update_frame_parser(t);
case GRPC_CHTTP2_FRAME_PING:
return init_ping_parser(t);
+ case GRPC_CHTTP2_FRAME_GOAWAY:
+ return init_goaway_parser(t);
default:
gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
return init_skip_frame(t, 0);
@@ -1277,6 +1349,18 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
&t->qbuf,
grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
}
+ if (st.goaway) {
+ if (t->num_pending_goaways == t->cap_pending_goaways) {
+ t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
+ t->pending_goaways =
+ gpr_realloc(t->pending_goaways,
+ sizeof(pending_goaway) * t->cap_pending_goaways);
+ }
+ t->pending_goaways[t->num_pending_goaways].status =
+ grpc_chttp2_http2_error_to_grpc_status(st.goaway_error);
+ t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text;
+ t->num_pending_goaways++;
+ }
if (st.process_ping_reply) {
for (i = 0; i < t->ping_count; i++) {
if (0 ==
@@ -1455,6 +1539,7 @@ static int process_read(transport *t, gpr_slice slice) {
if (!init_frame_parser(t)) {
return 0;
}
+ t->last_incoming_stream_id = t->incoming_stream_id;
if (t->incoming_frame_size == 0) {
if (!parse_frame_slice(t, gpr_empty_slice(), 1)) {
return 0;
@@ -1599,7 +1684,7 @@ static void run_callbacks(transport *t) {
static const grpc_transport_vtable vtable = {
sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- destroy_stream, abort_stream, close_transport, send_ping,
+ destroy_stream, abort_stream, goaway, close_transport, send_ping,
destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,