diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.c')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 371 |
1 files changed, 283 insertions, 88 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index ecf3aea870..6bc054866b 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -38,9 +38,9 @@ #include <stdio.h> #include <string.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> @@ -51,6 +51,7 @@ #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/timeout_encoding.h" @@ -110,9 +111,20 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored); -static void fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_error *error); + +static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); + +static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); +static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); @@ -129,12 +141,12 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_endpoint_destroy(exec_ctx, t->ep); - gpr_slice_buffer_destroy(&t->qbuf); + grpc_slice_buffer_destroy(&t->qbuf); - gpr_slice_buffer_destroy(&t->outbuf); + grpc_slice_buffer_destroy(&t->outbuf); grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); - gpr_slice_buffer_destroy(&t->read_buffer); + grpc_slice_buffer_destroy(&t->read_buffer); grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); @@ -229,9 +241,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, is_client ? "client_transport" : "server_transport"); - gpr_slice_buffer_init(&t->qbuf); + grpc_slice_buffer_init(&t->qbuf); - gpr_slice_buffer_init(&t->outbuf); + grpc_slice_buffer_init(&t->outbuf); grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked, @@ -241,11 +253,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t); grpc_closure_init(&t->read_action_begin, read_action_begin, t); grpc_closure_init(&t->read_action_locked, read_action_locked, t); + grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t); + grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t); + grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t); + grpc_closure_init(&t->destructive_reclaimer_locked, + destructive_reclaimer_locked, t); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); - gpr_slice_buffer_init(&t->read_buffer); + grpc_slice_buffer_init(&t->read_buffer); /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet @@ -267,8 +284,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->sent_local_settings = 0; if (is_client) { - gpr_slice_buffer_add(&t->outbuf, gpr_slice_from_copied_string( - GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( + GRPC_CHTTP2_CLIENT_CONNECT_STRING)); grpc_chttp2_initiate_write(exec_ctx, t, false, "initial_write"); } @@ -362,6 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); + post_benign_reclaimer(exec_ctx, t); } static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, @@ -451,7 +469,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]); grpc_chttp2_data_parser_init(&s->data_parser); - gpr_slice_buffer_init(&s->flow_controlled_buffer); + grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); grpc_closure_init(&s->complete_fetch, complete_fetch, s); grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s); @@ -467,6 +485,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + post_destructive_reclaimer(exec_ctx, t); } GPR_TIMER_END("init_stream", 0); @@ -510,7 +529,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser); grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]); grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]); - gpr_slice_buffer_destroy(&s->flow_controlled_buffer); + grpc_slice_buffer_destroy(&s->flow_controlled_buffer); GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); @@ -580,11 +599,13 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, write_state_name(t->write_state), write_state_name(st), reason)); t->write_state = st; - if (st == GRPC_CHTTP2_WRITE_STATE_IDLE && - t->close_transport_on_writes_finished != NULL) { - grpc_error *err = t->close_transport_on_writes_finished; - t->close_transport_on_writes_finished = NULL; - close_transport_locked(exec_ctx, t, err); + if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { + grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL); + if (t->close_transport_on_writes_finished != NULL) { + grpc_error *err = t->close_transport_on_writes_finished; + t->close_transport_on_writes_finished = NULL; + close_transport_locked(exec_ctx, t, err); + } } } @@ -675,7 +696,12 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); } - grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE("goaway sent")); + } + } switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: @@ -705,6 +731,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, break; } + grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); GPR_TIMER_END("terminate_writing_with_lock", 0); } @@ -728,11 +756,11 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t goaway_error, - gpr_slice goaway_text) { - char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII); + grpc_slice goaway_text) { + char *msg = grpc_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII); GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg)); - gpr_slice_unref(goaway_text); + grpc_slice_unref(goaway_text); t->seen_goaway = 1; /* lie: use transient failure from the transport to indicate goaway has been * received */ @@ -780,6 +808,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + post_destructive_reclaimer(exec_ctx, t); grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); } /* cancel out streams that will never be started */ @@ -793,7 +822,14 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, } } +/* Flag that this closure barrier wants stats to be updated before finishing */ #define CLOSURE_BARRIER_STATS_BIT (1 << 0) +/* Flag that this closure barrier may be covering a write in a pollset, and so + we should not complete this closure until we can prove that the write got + scheduled */ +#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1) +/* First bit of the reference count, stored in the high order bits (with the low + bits being used for flags defined above) */ #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) static grpc_closure *add_closure_barrier(grpc_closure *closure) { @@ -820,6 +856,16 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, return; } closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; + if (grpc_http_trace) { + const char *errstr = grpc_error_string(error); + gpr_log(GPR_DEBUG, + "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s", + closure, + (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT), + (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), + desc, errstr); + grpc_error_free_string(errstr); + } if (error != GRPC_ERROR_NONE) { if (closure->error_data.error == GRPC_ERROR_NONE) { closure->error_data.error = @@ -836,7 +882,13 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, grpc_transport_move_stats(&s->stats, s->collecting_stats); s->collecting_stats = NULL; } - grpc_closure_run(exec_ctx, closure, closure->error_data.error); + if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) || + !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) { + grpc_closure_run(exec_ctx, closure, closure->error_data.error); + } else { + grpc_closure_list_append(&t->run_after_write, closure, + closure->error_data.error); + } } } @@ -855,8 +907,8 @@ static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { s->fetched_send_message_length += - (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); - gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); + (uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice); + grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); if (s->id != 0) { grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); } @@ -924,6 +976,16 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs, static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, + bool is_client, bool is_initial) { + for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail; + md = md->next) { + gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL", + is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->md->key), + grpc_mdstr_as_c_string(md->md->value)); + } +} + static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); @@ -937,6 +999,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str, op->on_complete); gpr_free(str); + if (op->send_initial_metadata) { + log_metadata(op->send_initial_metadata, s->id, t->is_client, true); + } + if (op->send_trailing_metadata) { + log_metadata(op->send_trailing_metadata, s->id, t->is_client, false); + } } grpc_closure *on_complete = op->on_complete; @@ -965,6 +1033,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (op->send_initial_metadata != NULL) { GPR_ASSERT(s->send_initial_metadata_finished == NULL); + on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata = op->send_initial_metadata; const size_t metadata_size = @@ -993,16 +1062,21 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (!s->write_closed) { if (t->is_client) { - GPR_ASSERT(s->id == 0); - grpc_chttp2_list_add_waiting_for_concurrency(t, s); - maybe_start_some_streams(exec_ctx, t); + if (!t->closed) { + GPR_ASSERT(s->id == 0); + grpc_chttp2_list_add_waiting_for_concurrency(t, s); + maybe_start_some_streams(exec_ctx, t); + } else { + grpc_chttp2_cancel_stream(exec_ctx, t, s, + GRPC_ERROR_CREATE("Transport closed")); + } } else { GPR_ASSERT(s->id != 0); grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_initial_metadata"); } } else { - s->send_trailing_metadata = NULL; + s->send_initial_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_CREATE( @@ -1013,6 +1087,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->send_message != NULL) { + on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { grpc_chttp2_complete_closure_step( @@ -1022,7 +1097,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else { GPR_ASSERT(s->fetching_send_message == NULL); uint8_t *frame_hdr = - gpr_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); + grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); uint32_t flags = op->send_message->flags; frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; size_t len = op->send_message->length; @@ -1050,6 +1125,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (op->send_trailing_metadata != NULL) { GPR_ASSERT(s->send_trailing_metadata_finished == NULL); + on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_trailing_metadata_finished = add_closure_barrier(on_complete); s->send_trailing_metadata = op->send_trailing_metadata; const size_t metadata_size = @@ -1162,7 +1238,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, p->id[7] = (uint8_t)(t->ping_counter & 0xff); t->ping_counter++; p->on_recv = on_recv; - gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id)); + grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id)); grpc_chttp2_initiate_write(exec_ctx, t, true, "send_ping"); } @@ -1185,6 +1261,14 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_free(msg); } +static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_error_code error, grpc_slice data) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; + grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)error, data, + &t->qbuf); + grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent"); +} + static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { @@ -1199,15 +1283,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_goaway) { - t->sent_goaway = 1; - grpc_chttp2_goaway_append( - t->last_new_stream_id, - (uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), - gpr_slice_ref(*op->goaway_message), &t->qbuf); - close_transport = grpc_chttp2_stream_map_size(&t->stream_map) == 0 - ? GRPC_ERROR_CREATE("GOAWAY sent") - : GRPC_ERROR_NONE; - grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent"); + send_goaway(exec_ctx, t, + grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), + grpc_slice_ref(*op->goaway_message)); } if (op->set_accept_stream) { @@ -1341,10 +1419,14 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->data_parser.parsing_frame = NULL; } - if (grpc_chttp2_stream_map_size(&t->stream_map) == 0 && t->sent_goaway) { - close_transport_locked( - exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING( - "Last stream closed after sending GOAWAY", &error, 1)); + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + post_benign_reclaimer(exec_ctx, t); + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) { + close_transport_locked( + exec_ctx, t, + GRPC_ERROR_CREATE_REFERENCING( + "Last stream closed after sending GOAWAY", &error, 1)); + } } if (grpc_chttp2_list_remove_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); @@ -1392,7 +1474,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, &grpc_status); if (s->id != 0) { - gpr_slice_buffer_add( + grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); grpc_chttp2_initiate_write(exec_ctx, t, false, "rst_stream"); @@ -1405,7 +1487,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, free_msg = true; msg = grpc_error_string(due_to_error); } - gpr_slice msg_slice = gpr_slice_from_copied_string(msg); + grpc_slice msg_slice = grpc_slice_from_copied_string(msg); grpc_chttp2_fake_status(exec_ctx, t, s, grpc_status, &msg_slice); if (free_msg) grpc_error_free_string(msg); } @@ -1418,7 +1500,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_status_code status, - gpr_slice *slice) { + grpc_slice *slice) { if (status != GRPC_STATUS_OK) { s->seen_error = true; } @@ -1441,13 +1523,13 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &s->metadata_buffer[1], grpc_mdelem_from_metadata_strings( GRPC_MDSTR_GRPC_MESSAGE, - grpc_mdstr_from_slice(gpr_slice_ref(*slice)))); + grpc_mdstr_from_slice(grpc_slice_ref(*slice)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (slice) { - gpr_slice_unref(*slice); + grpc_slice_unref(*slice); } } @@ -1477,18 +1559,22 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, return error; } -static void fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_error *error) { +void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, grpc_error *error) { error = removal_error(error, s, "Pending writes failed due to stream closure"); - s->fetching_send_message = NULL; + s->send_initial_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error), "send_initial_metadata_finished"); + + s->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_trailing_metadata_finished, GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); + + s->fetching_send_message = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); @@ -1529,13 +1615,16 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, if (close_writes && !s->write_closed) { s->write_closed_error = GRPC_ERROR_REF(error); s->write_closed = true; - fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); + grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (s->read_closed && s->write_closed) { if (s->id != 0) { remove_stream(exec_ctx, t, s->id, removal_error(GRPC_ERROR_REF(error), s, "Stream removed")); + } else { + /* Purge streams waiting on concurrency still waiting for id assignment */ + grpc_chttp2_list_remove_waiting_for_concurrency(t, s); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2"); } @@ -1544,9 +1633,9 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { - gpr_slice hdr; - gpr_slice status_hdr; - gpr_slice message_pfx; + grpc_slice hdr; + grpc_slice status_hdr; + grpc_slice message_pfx; uint8_t *p; uint32_t len = 0; grpc_status_code grpc_status; @@ -1565,8 +1654,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, time we got around to sending this, so instead we ignore HPACK compression and just write the uncompressed bytes onto the wire. */ - status_hdr = gpr_slice_malloc(15 + (grpc_status >= 10)); - p = GPR_SLICE_START_PTR(status_hdr); + status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10)); + p = GRPC_SLICE_START_PTR(status_hdr); *p++ = 0x40; /* literal header */ *p++ = 11; /* len(grpc-status) */ *p++ = 'g'; @@ -1588,8 +1677,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, *p++ = (uint8_t)('0' + (grpc_status / 10)); *p++ = (uint8_t)('0' + (grpc_status % 10)); } - GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr)); - len += (uint32_t)GPR_SLICE_LENGTH(status_hdr); + GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr)); + len += (uint32_t)GRPC_SLICE_LENGTH(status_hdr); const char *optional_message = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); @@ -1597,8 +1686,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (optional_message != NULL) { size_t msg_len = strlen(optional_message); GPR_ASSERT(msg_len < 127); - message_pfx = gpr_slice_malloc(15); - p = GPR_SLICE_START_PTR(message_pfx); + message_pfx = grpc_slice_malloc(15); + p = GRPC_SLICE_START_PTR(message_pfx); *p++ = 0x40; *p++ = 12; /* len(grpc-message) */ *p++ = 'g'; @@ -1614,13 +1703,13 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, *p++ = 'g'; *p++ = 'e'; *p++ = (uint8_t)msg_len; - GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx)); - len += (uint32_t)GPR_SLICE_LENGTH(message_pfx); + GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx)); + len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx); len += (uint32_t)msg_len; } - hdr = gpr_slice_malloc(9); - p = GPR_SLICE_START_PTR(hdr); + hdr = grpc_slice_malloc(9); + p = GRPC_SLICE_START_PTR(hdr); *p++ = (uint8_t)(len >> 16); *p++ = (uint8_t)(len >> 8); *p++ = (uint8_t)(len); @@ -1630,16 +1719,16 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, *p++ = (uint8_t)(s->id >> 16); *p++ = (uint8_t)(s->id >> 8); *p++ = (uint8_t)(s->id); - GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr)); + GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr)); - gpr_slice_buffer_add(&t->qbuf, hdr); - gpr_slice_buffer_add(&t->qbuf, status_hdr); + grpc_slice_buffer_add(&t->qbuf, hdr); + grpc_slice_buffer_add(&t->qbuf, status_hdr); if (optional_message) { - gpr_slice_buffer_add(&t->qbuf, message_pfx); - gpr_slice_buffer_add(&t->qbuf, - gpr_slice_from_copied_string(optional_message)); + grpc_slice_buffer_add(&t->qbuf, message_pfx); + grpc_slice_buffer_add(&t->qbuf, + grpc_slice_from_copied_string(optional_message)); } - gpr_slice_buffer_add( + grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR, &s->stats.outgoing)); } @@ -1650,7 +1739,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, free_msg = true; msg = grpc_error_string(error); } - gpr_slice msg_slice = gpr_slice_from_copied_string(msg); + grpc_slice msg_slice = grpc_slice_from_copied_string(msg); grpc_chttp2_fake_status(exec_ctx, t, s, grpc_status, &msg_slice); if (free_msg) grpc_error_free_string(msg); @@ -1821,7 +1910,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, keep_reading = true; GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); } - gpr_slice_buffer_reset_and_unref(&t->read_buffer); + grpc_slice_buffer_reset_and_unref(&t->read_buffer); if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_begin); @@ -1875,7 +1964,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { GRPC_ERROR_UNREF(bs->error); - gpr_slice_buffer_destroy(&bs->slices); + grpc_slice_buffer_destroy(&bs->slices); gpr_mu_destroy(&bs->slice_mu); gpr_free(bs); } @@ -1937,7 +2026,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, } gpr_mu_lock(&bs->slice_mu); if (bs->slices.count > 0) { - *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices); + *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices); grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); } else if (bs->error != GRPC_ERROR_NONE) { grpc_closure_run(exec_ctx, bs->next_action.on_complete, @@ -1952,7 +2041,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - gpr_slice *slice, size_t max_size_hint, + grpc_slice *slice, size_t max_size_hint, grpc_closure *on_complete) { GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); grpc_chttp2_incoming_byte_stream *bs = @@ -2005,19 +2094,19 @@ static void incoming_byte_stream_publish_error( void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - gpr_slice slice) { + grpc_slice slice) { gpr_mu_lock(&bs->slice_mu); - if (bs->remaining_bytes < GPR_SLICE_LENGTH(slice)) { + if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { incoming_byte_stream_publish_error( exec_ctx, bs, GRPC_ERROR_CREATE("Too many bytes in stream")); } else { - bs->remaining_bytes -= (uint32_t)GPR_SLICE_LENGTH(slice); + bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); if (bs->on_next != NULL) { *bs->next = slice; grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); bs->on_next = NULL; } else { - gpr_slice_buffer_add(&bs->slices, slice); + grpc_slice_buffer_add(&bs->slices, slice); } } gpr_mu_unlock(&bs->slice_mu); @@ -2055,7 +2144,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->transport = t; incoming_byte_stream->stream = s; gpr_ref(&incoming_byte_stream->stream->active_streams); - gpr_slice_buffer_init(&incoming_byte_stream->slices); + grpc_slice_buffer_init(&incoming_byte_stream->slices); incoming_byte_stream->on_next = NULL; incoming_byte_stream->is_tail = 1; incoming_byte_stream->error = GRPC_ERROR_NONE; @@ -2072,6 +2161,103 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( } /******************************************************************************* + * RESOURCE QUOTAS + */ + +static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + if (!t->benign_reclaimer_registered) { + t->benign_reclaimer_registered = true; + GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); + grpc_resource_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_resource_user(t->ep), + false, &t->benign_reclaimer); + } +} + +static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + if (!t->destructive_reclaimer_registered) { + t->destructive_reclaimer_registered = true; + GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); + grpc_resource_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_resource_user(t->ep), + true, &t->destructive_reclaimer); + } +} + +static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked, + GRPC_ERROR_REF(error), false); +} + +static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked, + GRPC_ERROR_REF(error), false); +} + +static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (error == GRPC_ERROR_NONE && + grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + /* Channel with no active streams: send a goaway to try and make it + * disconnect cleanly */ + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory", + t->peer_string); + } + send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM, + grpc_slice_from_static_string("Buffers full")); + } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, + "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR + " streams", + t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); + } + t->benign_reclaimer_registered = false; + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + exec_ctx, grpc_endpoint_get_resource_user(t->ep)); + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); +} + +static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + size_t n = grpc_chttp2_stream_map_size(&t->stream_map); + t->destructive_reclaimer_registered = false; + if (error == GRPC_ERROR_NONE && n > 0) { + grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map); + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string, + s->id); + } + grpc_chttp2_cancel_stream( + exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, + GRPC_CHTTP2_ENHANCE_YOUR_CALM)); + if (n > 1) { + /* Since we cancel one stream per destructive reclamation, if + there are more streams left, we can immediately post a new + reclaimer in case the resource quota needs to free more + memory */ + post_destructive_reclaimer(exec_ctx, t); + } + } + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + exec_ctx, grpc_endpoint_get_resource_user(t->ep)); + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer"); +} + +/******************************************************************************* * TRACING */ @@ -2156,6 +2342,14 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); } +/******************************************************************************* + * MONITORING + */ +static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx, + grpc_transport *t) { + return ((grpc_chttp2_transport *)t)->ep; +} + static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), "chttp2", init_stream, @@ -2165,7 +2359,8 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), perform_transport_op, destroy_stream, destroy_transport, - chttp2_get_peer}; + chttp2_get_peer, + chttp2_get_endpoint}; grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, @@ -2177,12 +2372,12 @@ grpc_transport *grpc_create_chttp2_transport( void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - gpr_slice_buffer *read_buffer) { + grpc_slice_buffer *read_buffer) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; GRPC_CHTTP2_REF_TRANSPORT( t, "reading_action"); /* matches unref inside reading_action */ if (read_buffer != NULL) { - gpr_slice_buffer_move_into(read_buffer, &t->read_buffer); + grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); } read_action_begin(exec_ctx, t, GRPC_ERROR_NONE); |