aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.c')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c371
1 files changed, 283 insertions, 88 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index ecf3aea870..6bc054866b 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -38,9 +38,9 @@
#include <stdio.h>
#include <string.h>
+#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
@@ -51,6 +51,7 @@
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/timeout_encoding.h"
@@ -110,9 +111,20 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_error *error);
+
+static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+
+static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_error *error);
@@ -129,12 +141,12 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_endpoint_destroy(exec_ctx, t->ep);
- gpr_slice_buffer_destroy(&t->qbuf);
+ grpc_slice_buffer_destroy(&t->qbuf);
- gpr_slice_buffer_destroy(&t->outbuf);
+ grpc_slice_buffer_destroy(&t->outbuf);
grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
- gpr_slice_buffer_destroy(&t->read_buffer);
+ grpc_slice_buffer_destroy(&t->read_buffer);
grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
@@ -229,9 +241,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
- gpr_slice_buffer_init(&t->qbuf);
+ grpc_slice_buffer_init(&t->qbuf);
- gpr_slice_buffer_init(&t->outbuf);
+ grpc_slice_buffer_init(&t->outbuf);
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked,
@@ -241,11 +253,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t);
grpc_closure_init(&t->read_action_begin, read_action_begin, t);
grpc_closure_init(&t->read_action_locked, read_action_locked, t);
+ grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t);
+ grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t);
+ grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t);
+ grpc_closure_init(&t->destructive_reclaimer_locked,
+ destructive_reclaimer_locked, t);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(&t->hpack_parser);
- gpr_slice_buffer_init(&t->read_buffer);
+ grpc_slice_buffer_init(&t->read_buffer);
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
@@ -267,8 +284,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->sent_local_settings = 0;
if (is_client) {
- gpr_slice_buffer_add(&t->outbuf, gpr_slice_from_copied_string(
- GRPC_CHTTP2_CLIENT_CONNECT_STRING));
+ grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
+ GRPC_CHTTP2_CLIENT_CONNECT_STRING));
grpc_chttp2_initiate_write(exec_ctx, t, false, "initial_write");
}
@@ -362,6 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
+ post_benign_reclaimer(exec_ctx, t);
}
static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
@@ -451,7 +469,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]);
grpc_chttp2_data_parser_init(&s->data_parser);
- gpr_slice_buffer_init(&s->flow_controlled_buffer);
+ grpc_slice_buffer_init(&s->flow_controlled_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
grpc_closure_init(&s->complete_fetch, complete_fetch, s);
grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s);
@@ -467,6 +485,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ post_destructive_reclaimer(exec_ctx, t);
}
GPR_TIMER_END("init_stream", 0);
@@ -510,7 +529,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]);
- gpr_slice_buffer_destroy(&s->flow_controlled_buffer);
+ grpc_slice_buffer_destroy(&s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
@@ -580,11 +599,13 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
write_state_name(t->write_state),
write_state_name(st), reason));
t->write_state = st;
- if (st == GRPC_CHTTP2_WRITE_STATE_IDLE &&
- t->close_transport_on_writes_finished != NULL) {
- grpc_error *err = t->close_transport_on_writes_finished;
- t->close_transport_on_writes_finished = NULL;
- close_transport_locked(exec_ctx, t, err);
+ if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL);
+ if (t->close_transport_on_writes_finished != NULL) {
+ grpc_error *err = t->close_transport_on_writes_finished;
+ t->close_transport_on_writes_finished = NULL;
+ close_transport_locked(exec_ctx, t, err);
+ }
}
}
@@ -675,7 +696,12 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
}
- grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
+ if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
+ t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
+ if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE("goaway sent"));
+ }
+ }
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
@@ -705,6 +731,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
break;
}
+ grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
+
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
GPR_TIMER_END("terminate_writing_with_lock", 0);
}
@@ -728,11 +756,11 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
uint32_t goaway_error,
- gpr_slice goaway_text) {
- char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ grpc_slice goaway_text) {
+ char *msg = grpc_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
- gpr_slice_unref(goaway_text);
+ grpc_slice_unref(goaway_text);
t->seen_goaway = 1;
/* lie: use transient failure from the transport to indicate goaway has been
* received */
@@ -780,6 +808,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes);
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ post_destructive_reclaimer(exec_ctx, t);
grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
}
/* cancel out streams that will never be started */
@@ -793,7 +822,14 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
}
}
+/* Flag that this closure barrier wants stats to be updated before finishing */
#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
+/* Flag that this closure barrier may be covering a write in a pollset, and so
+ we should not complete this closure until we can prove that the write got
+ scheduled */
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
+/* First bit of the reference count, stored in the high order bits (with the low
+ bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure *add_closure_barrier(grpc_closure *closure) {
@@ -820,6 +856,16 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
return;
}
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
+ if (grpc_http_trace) {
+ const char *errstr = grpc_error_string(error);
+ gpr_log(GPR_DEBUG,
+ "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s",
+ closure,
+ (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
+ (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT),
+ desc, errstr);
+ grpc_error_free_string(errstr);
+ }
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
closure->error_data.error =
@@ -836,7 +882,13 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = NULL;
}
- grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
+ !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
+ grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ } else {
+ grpc_closure_list_append(&t->run_after_write, closure,
+ closure->error_data.error);
+ }
}
}
@@ -855,8 +907,8 @@ static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
s->fetched_send_message_length +=
- (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice);
- gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
+ (uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice);
+ grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
if (s->id != 0) {
grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
}
@@ -924,6 +976,16 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id,
+ bool is_client, bool is_initial) {
+ for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail;
+ md = md->next) {
+ gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
+ is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->md->key),
+ grpc_mdstr_as_c_string(md->md->value));
+ }
+}
+
static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
@@ -937,6 +999,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
+ if (op->send_initial_metadata) {
+ log_metadata(op->send_initial_metadata, s->id, t->is_client, true);
+ }
+ if (op->send_trailing_metadata) {
+ log_metadata(op->send_trailing_metadata, s->id, t->is_client, false);
+ }
}
grpc_closure *on_complete = op->on_complete;
@@ -965,6 +1033,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_initial_metadata != NULL) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata = op->send_initial_metadata;
const size_t metadata_size =
@@ -993,16 +1062,21 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (!s->write_closed) {
if (t->is_client) {
- GPR_ASSERT(s->id == 0);
- grpc_chttp2_list_add_waiting_for_concurrency(t, s);
- maybe_start_some_streams(exec_ctx, t);
+ if (!t->closed) {
+ GPR_ASSERT(s->id == 0);
+ grpc_chttp2_list_add_waiting_for_concurrency(t, s);
+ maybe_start_some_streams(exec_ctx, t);
+ } else {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s,
+ GRPC_ERROR_CREATE("Transport closed"));
+ }
} else {
GPR_ASSERT(s->id != 0);
grpc_chttp2_become_writable(exec_ctx, t, s, true,
"op.send_initial_metadata");
}
} else {
- s->send_trailing_metadata = NULL;
+ s->send_initial_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_initial_metadata_finished,
GRPC_ERROR_CREATE(
@@ -1013,6 +1087,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (op->send_message != NULL) {
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
grpc_chttp2_complete_closure_step(
@@ -1022,7 +1097,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else {
GPR_ASSERT(s->fetching_send_message == NULL);
uint8_t *frame_hdr =
- gpr_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
+ grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
uint32_t flags = op->send_message->flags;
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
size_t len = op->send_message->length;
@@ -1050,6 +1125,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_trailing_metadata != NULL) {
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata = op->send_trailing_metadata;
const size_t metadata_size =
@@ -1162,7 +1238,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
p->id[7] = (uint8_t)(t->ping_counter & 0xff);
t->ping_counter++;
p->on_recv = on_recv;
- gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
+ grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
grpc_chttp2_initiate_write(exec_ctx, t, true, "send_ping");
}
@@ -1185,6 +1261,14 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_free(msg);
}
+static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_error_code error, grpc_slice data) {
+ t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
+ grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)error, data,
+ &t->qbuf);
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+}
+
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
@@ -1199,15 +1283,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->send_goaway) {
- t->sent_goaway = 1;
- grpc_chttp2_goaway_append(
- t->last_new_stream_id,
- (uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
- gpr_slice_ref(*op->goaway_message), &t->qbuf);
- close_transport = grpc_chttp2_stream_map_size(&t->stream_map) == 0
- ? GRPC_ERROR_CREATE("GOAWAY sent")
- : GRPC_ERROR_NONE;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+ send_goaway(exec_ctx, t,
+ grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
+ grpc_slice_ref(*op->goaway_message));
}
if (op->set_accept_stream) {
@@ -1341,10 +1419,14 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->data_parser.parsing_frame = NULL;
}
- if (grpc_chttp2_stream_map_size(&t->stream_map) == 0 && t->sent_goaway) {
- close_transport_locked(
- exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING(
- "Last stream closed after sending GOAWAY", &error, 1));
+ if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ post_benign_reclaimer(exec_ctx, t);
+ if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
+ close_transport_locked(
+ exec_ctx, t,
+ GRPC_ERROR_CREATE_REFERENCING(
+ "Last stream closed after sending GOAWAY", &error, 1));
+ }
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
@@ -1392,7 +1474,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
&grpc_status);
if (s->id != 0) {
- gpr_slice_buffer_add(
+ grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error,
&s->stats.outgoing));
grpc_chttp2_initiate_write(exec_ctx, t, false, "rst_stream");
@@ -1405,7 +1487,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
free_msg = true;
msg = grpc_error_string(due_to_error);
}
- gpr_slice msg_slice = gpr_slice_from_copied_string(msg);
+ grpc_slice msg_slice = grpc_slice_from_copied_string(msg);
grpc_chttp2_fake_status(exec_ctx, t, s, grpc_status, &msg_slice);
if (free_msg) grpc_error_free_string(msg);
}
@@ -1418,7 +1500,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_status_code status,
- gpr_slice *slice) {
+ grpc_slice *slice) {
if (status != GRPC_STATUS_OK) {
s->seen_error = true;
}
@@ -1441,13 +1523,13 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&s->metadata_buffer[1],
grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_GRPC_MESSAGE,
- grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
+ grpc_mdstr_from_slice(grpc_slice_ref(*slice))));
}
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
if (slice) {
- gpr_slice_unref(*slice);
+ grpc_slice_unref(*slice);
}
}
@@ -1477,18 +1559,22 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error;
}
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_error *error) {
+void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, grpc_error *error) {
error =
removal_error(error, s, "Pending writes failed due to stream closure");
- s->fetching_send_message = NULL;
+ s->send_initial_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error),
"send_initial_metadata_finished");
+
+ s->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_trailing_metadata_finished,
GRPC_ERROR_REF(error), "send_trailing_metadata_finished");
+
+ s->fetching_send_message = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error),
"fetching_send_message_finished");
@@ -1529,13 +1615,16 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
if (close_writes && !s->write_closed) {
s->write_closed_error = GRPC_ERROR_REF(error);
s->write_closed = true;
- fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
+ grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
if (s->read_closed && s->write_closed) {
if (s->id != 0) {
remove_stream(exec_ctx, t, s->id,
removal_error(GRPC_ERROR_REF(error), s, "Stream removed"));
+ } else {
+ /* Purge streams waiting on concurrency still waiting for id assignment */
+ grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2");
}
@@ -1544,9 +1633,9 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
- gpr_slice hdr;
- gpr_slice status_hdr;
- gpr_slice message_pfx;
+ grpc_slice hdr;
+ grpc_slice status_hdr;
+ grpc_slice message_pfx;
uint8_t *p;
uint32_t len = 0;
grpc_status_code grpc_status;
@@ -1565,8 +1654,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
time we got around to sending this, so instead we ignore HPACK
compression
and just write the uncompressed bytes onto the wire. */
- status_hdr = gpr_slice_malloc(15 + (grpc_status >= 10));
- p = GPR_SLICE_START_PTR(status_hdr);
+ status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10));
+ p = GRPC_SLICE_START_PTR(status_hdr);
*p++ = 0x40; /* literal header */
*p++ = 11; /* len(grpc-status) */
*p++ = 'g';
@@ -1588,8 +1677,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
*p++ = (uint8_t)('0' + (grpc_status / 10));
*p++ = (uint8_t)('0' + (grpc_status % 10));
}
- GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
- len += (uint32_t)GPR_SLICE_LENGTH(status_hdr);
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
+ len += (uint32_t)GRPC_SLICE_LENGTH(status_hdr);
const char *optional_message =
grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
@@ -1597,8 +1686,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (optional_message != NULL) {
size_t msg_len = strlen(optional_message);
GPR_ASSERT(msg_len < 127);
- message_pfx = gpr_slice_malloc(15);
- p = GPR_SLICE_START_PTR(message_pfx);
+ message_pfx = grpc_slice_malloc(15);
+ p = GRPC_SLICE_START_PTR(message_pfx);
*p++ = 0x40;
*p++ = 12; /* len(grpc-message) */
*p++ = 'g';
@@ -1614,13 +1703,13 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
*p++ = 'g';
*p++ = 'e';
*p++ = (uint8_t)msg_len;
- GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
- len += (uint32_t)GPR_SLICE_LENGTH(message_pfx);
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
+ len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx);
len += (uint32_t)msg_len;
}
- hdr = gpr_slice_malloc(9);
- p = GPR_SLICE_START_PTR(hdr);
+ hdr = grpc_slice_malloc(9);
+ p = GRPC_SLICE_START_PTR(hdr);
*p++ = (uint8_t)(len >> 16);
*p++ = (uint8_t)(len >> 8);
*p++ = (uint8_t)(len);
@@ -1630,16 +1719,16 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
*p++ = (uint8_t)(s->id >> 16);
*p++ = (uint8_t)(s->id >> 8);
*p++ = (uint8_t)(s->id);
- GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
- gpr_slice_buffer_add(&t->qbuf, hdr);
- gpr_slice_buffer_add(&t->qbuf, status_hdr);
+ grpc_slice_buffer_add(&t->qbuf, hdr);
+ grpc_slice_buffer_add(&t->qbuf, status_hdr);
if (optional_message) {
- gpr_slice_buffer_add(&t->qbuf, message_pfx);
- gpr_slice_buffer_add(&t->qbuf,
- gpr_slice_from_copied_string(optional_message));
+ grpc_slice_buffer_add(&t->qbuf, message_pfx);
+ grpc_slice_buffer_add(&t->qbuf,
+ grpc_slice_from_copied_string(optional_message));
}
- gpr_slice_buffer_add(
+ grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR,
&s->stats.outgoing));
}
@@ -1650,7 +1739,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
free_msg = true;
msg = grpc_error_string(error);
}
- gpr_slice msg_slice = gpr_slice_from_copied_string(msg);
+ grpc_slice msg_slice = grpc_slice_from_copied_string(msg);
grpc_chttp2_fake_status(exec_ctx, t, s, grpc_status, &msg_slice);
if (free_msg) grpc_error_free_string(msg);
@@ -1821,7 +1910,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
keep_reading = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
}
- gpr_slice_buffer_reset_and_unref(&t->read_buffer);
+ grpc_slice_buffer_reset_and_unref(&t->read_buffer);
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_begin);
@@ -1875,7 +1964,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
- gpr_slice_buffer_destroy(&bs->slices);
+ grpc_slice_buffer_destroy(&bs->slices);
gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
@@ -1937,7 +2026,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
}
gpr_mu_lock(&bs->slice_mu);
if (bs->slices.count > 0) {
- *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices);
+ *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices);
grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
} else if (bs->error != GRPC_ERROR_NONE) {
grpc_closure_run(exec_ctx, bs->next_action.on_complete,
@@ -1952,7 +2041,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
- gpr_slice *slice, size_t max_size_hint,
+ grpc_slice *slice, size_t max_size_hint,
grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
@@ -2005,19 +2094,19 @@ static void incoming_byte_stream_publish_error(
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs,
- gpr_slice slice) {
+ grpc_slice slice) {
gpr_mu_lock(&bs->slice_mu);
- if (bs->remaining_bytes < GPR_SLICE_LENGTH(slice)) {
+ if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
incoming_byte_stream_publish_error(
exec_ctx, bs, GRPC_ERROR_CREATE("Too many bytes in stream"));
} else {
- bs->remaining_bytes -= (uint32_t)GPR_SLICE_LENGTH(slice);
+ bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
if (bs->on_next != NULL) {
*bs->next = slice;
grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
bs->on_next = NULL;
} else {
- gpr_slice_buffer_add(&bs->slices, slice);
+ grpc_slice_buffer_add(&bs->slices, slice);
}
}
gpr_mu_unlock(&bs->slice_mu);
@@ -2055,7 +2144,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
gpr_ref(&incoming_byte_stream->stream->active_streams);
- gpr_slice_buffer_init(&incoming_byte_stream->slices);
+ grpc_slice_buffer_init(&incoming_byte_stream->slices);
incoming_byte_stream->on_next = NULL;
incoming_byte_stream->is_tail = 1;
incoming_byte_stream->error = GRPC_ERROR_NONE;
@@ -2072,6 +2161,103 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
}
/*******************************************************************************
+ * RESOURCE QUOTAS
+ */
+
+static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ if (!t->benign_reclaimer_registered) {
+ t->benign_reclaimer_registered = true;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
+ grpc_resource_user_post_reclaimer(exec_ctx,
+ grpc_endpoint_get_resource_user(t->ep),
+ false, &t->benign_reclaimer);
+ }
+}
+
+static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ if (!t->destructive_reclaimer_registered) {
+ t->destructive_reclaimer_registered = true;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
+ grpc_resource_user_post_reclaimer(exec_ctx,
+ grpc_endpoint_get_resource_user(t->ep),
+ true, &t->destructive_reclaimer);
+ }
+}
+
+static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ if (error == GRPC_ERROR_NONE &&
+ grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ /* Channel with no active streams: send a goaway to try and make it
+ * disconnect cleanly */
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory",
+ t->peer_string);
+ }
+ send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM,
+ grpc_slice_from_static_string("Buffers full"));
+ } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG,
+ "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
+ " streams",
+ t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
+ }
+ t->benign_reclaimer_registered = false;
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_resource_user_finish_reclamation(
+ exec_ctx, grpc_endpoint_get_resource_user(t->ep));
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer");
+}
+
+static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
+ size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
+ t->destructive_reclaimer_registered = false;
+ if (error == GRPC_ERROR_NONE && n > 0) {
+ grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map);
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string,
+ s->id);
+ }
+ grpc_chttp2_cancel_stream(
+ exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
+ GRPC_ERROR_INT_HTTP2_ERROR,
+ GRPC_CHTTP2_ENHANCE_YOUR_CALM));
+ if (n > 1) {
+ /* Since we cancel one stream per destructive reclamation, if
+ there are more streams left, we can immediately post a new
+ reclaimer in case the resource quota needs to free more
+ memory */
+ post_destructive_reclaimer(exec_ctx, t);
+ }
+ }
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_resource_user_finish_reclamation(
+ exec_ctx, grpc_endpoint_get_resource_user(t->ep));
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer");
+}
+
+/*******************************************************************************
* TRACING
*/
@@ -2156,6 +2342,14 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
}
+/*******************************************************************************
+ * MONITORING
+ */
+static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *t) {
+ return ((grpc_chttp2_transport *)t)->ep;
+}
+
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
"chttp2",
init_stream,
@@ -2165,7 +2359,8 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
perform_transport_op,
destroy_stream,
destroy_transport,
- chttp2_get_peer};
+ chttp2_get_peer,
+ chttp2_get_endpoint};
grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
@@ -2177,12 +2372,12 @@ grpc_transport *grpc_create_chttp2_transport(
void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- gpr_slice_buffer *read_buffer) {
+ grpc_slice_buffer *read_buffer) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
GRPC_CHTTP2_REF_TRANSPORT(
t, "reading_action"); /* matches unref inside reading_action */
if (read_buffer != NULL) {
- gpr_slice_buffer_move_into(read_buffer, &t->read_buffer);
+ grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
read_action_begin(exec_ctx, t, GRPC_ERROR_NONE);