diff options
17 files changed, 205 insertions, 54 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 0a307a73a6..0425333953 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -758,23 +758,35 @@ static void maybe_start_some_streams( } } +#define CLOSURE_BARRIER_STATS_BIT (1 << 0) +#define CLOSURE_BARRIER_FAILURE_BIT (1 << 1) +#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) + static grpc_closure *add_closure_barrier(grpc_closure *closure) { - closure->final_data += 2; + closure->final_data += CLOSURE_BARRIER_FIRST_REF_BIT; return closure; } void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure, int success) { grpc_closure *closure = *pclosure; if (closure == NULL) { return; } - closure->final_data -= 2; + closure->final_data -= CLOSURE_BARRIER_FIRST_REF_BIT; if (!success) { - closure->final_data |= 1; + closure->final_data |= CLOSURE_BARRIER_FAILURE_BIT; } - if (closure->final_data < 2) { - grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0, NULL); + if (closure->final_data < CLOSURE_BARRIER_FIRST_REF_BIT) { + if (closure->final_data & CLOSURE_BARRIER_STATS_BIT) { + grpc_transport_move_stats(&stream_global->stats, + stream_global->collecting_stats); + stream_global->collecting_stats = NULL; + } + grpc_exec_ctx_enqueue( + exec_ctx, closure, + (closure->final_data & CLOSURE_BARRIER_FAILURE_BIT) == 0, NULL); } *pclosure = NULL; } @@ -807,7 +819,13 @@ static void perform_stream_op_locked( } /* use final_data as a barrier until enqueue time; the inital counter is dropped at the end of this function */ - on_complete->final_data = 2; + on_complete->final_data = CLOSURE_BARRIER_FIRST_REF_BIT; + + if (op->collect_stats != NULL) { + GPR_ASSERT(stream_global->collecting_stats == NULL); + stream_global->collecting_stats = op->collect_stats; + on_complete->final_data |= CLOSURE_BARRIER_STATS_BIT; + } if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(exec_ctx, transport_global, stream_global, @@ -840,7 +858,8 @@ static void perform_stream_op_locked( } } else { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_initial_metadata_finished, 0); + exec_ctx, stream_global, + &stream_global->send_initial_metadata_finished, 0); } } @@ -850,8 +869,8 @@ static void perform_stream_op_locked( stream_global->send_message_finished = add_closure_barrier(on_complete); if (stream_global->write_closed) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_message_finished, 0); - } else { + exec_ctx, stream_global, &stream_global->send_message_finished, 0); + } else if (stream_global->id != 0) { stream_global->send_message = op->send_message; if (stream_global->id != 0) { grpc_chttp2_become_writable(transport_global, stream_global); @@ -870,7 +889,8 @@ static void perform_stream_op_locked( } if (stream_global->write_closed) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_trailing_metadata_finished, + exec_ctx, stream_global, + &stream_global->send_trailing_metadata_finished, grpc_metadata_batch_is_empty(op->send_trailing_metadata)); } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding @@ -909,7 +929,7 @@ static void perform_stream_op_locked( grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } - grpc_chttp2_complete_closure_step(exec_ctx, &on_complete, 1); + grpc_chttp2_complete_closure_step(exec_ctx, stream_global, &on_complete, 1); GPR_TIMER_END("perform_stream_op_locked", 0); } @@ -1080,7 +1100,8 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, &stream_global->received_trailing_metadata, stream_global->recv_trailing_metadata); grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->recv_trailing_metadata_finished, 1); + exec_ctx, stream_global, + &stream_global->recv_trailing_metadata_finished, 1); } } } @@ -1131,7 +1152,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, &transport_global->qbuf, grpc_chttp2_rst_stream_create( stream_global->id, - (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status))); + (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status), + &stream_global->stats.outgoing)); } grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, NULL); @@ -1179,10 +1201,12 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_initial_metadata_finished, 0); + exec_ctx, stream_global, &stream_global->send_initial_metadata_finished, + 0); grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_trailing_metadata_finished, 0); - grpc_chttp2_complete_closure_step(exec_ctx, + exec_ctx, stream_global, &stream_global->send_trailing_metadata_finished, + 0); + grpc_chttp2_complete_closure_step(exec_ctx, stream_global, &stream_global->send_message_finished, 0); } @@ -1319,7 +1343,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_add( &transport_global->qbuf, - grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR)); + grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR, + &stream_global->stats.outgoing)); if (optional_message) { gpr_slice_ref(*optional_message); diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index cdd624647c..baa66e0008 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -113,11 +113,13 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, uint32_t write_bytes, int is_eof, + grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf) { gpr_slice hdr; uint8_t *p; + static const size_t header_size = 9; - hdr = gpr_slice_malloc(9); + hdr = gpr_slice_malloc(header_size); p = GPR_SLICE_START_PTR(hdr); GPR_ASSERT(write_bytes < (1 << 24)); *p++ = (uint8_t)(write_bytes >> 16); @@ -132,6 +134,9 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, gpr_slice_buffer_add(outbuf, hdr); gpr_slice_buffer_move_first(inbuf, write_bytes, outbuf); + + stats->framing_bytes += header_size; + stats->data_bytes += write_bytes; } grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( @@ -156,6 +161,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( switch (p->state) { fh_0: case GRPC_CHTTP2_DATA_FH_0: + stream_parsing->stats.incoming.framing_bytes++; p->frame_type = *cur; switch (p->frame_type) { case 0: @@ -174,6 +180,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_1: + stream_parsing->stats.incoming.framing_bytes++; p->frame_size = ((uint32_t)*cur) << 24; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_2; @@ -181,6 +188,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_2: + stream_parsing->stats.incoming.framing_bytes++; p->frame_size |= ((uint32_t)*cur) << 16; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_3; @@ -188,6 +196,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_3: + stream_parsing->stats.incoming.framing_bytes++; p->frame_size |= ((uint32_t)*cur) << 8; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_4; @@ -195,6 +204,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_4: + stream_parsing->stats.incoming.framing_bytes++; p->frame_size |= ((uint32_t)*cur); p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; @@ -215,7 +225,9 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); - if ((uint32_t)(end - cur) == p->frame_size) { + uint32_t remaining = (uint32_t)(end - cur); + if (remaining == p->frame_size) { + stream_parsing->stats.incoming.data_bytes += p->frame_size; grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); @@ -224,7 +236,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; - } else if ((uint32_t)(end - cur) > p->frame_size) { + } else if (remaining > p->frame_size) { + stream_parsing->stats.incoming.data_bytes += p->frame_size; grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), @@ -235,11 +248,12 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( cur += p->frame_size; goto fh_0; /* loop */ } else { + GPR_ASSERT(remaining <= p->frame_size); grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - GPR_ASSERT((size_t)(end - cur) <= p->frame_size); - p->frame_size -= (uint32_t)(end - cur); + p->frame_size -= remaining; + stream_parsing->stats.incoming.data_bytes += remaining; return GRPC_CHTTP2_PARSE_OK; } } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index f2762ed9de..8a073a9c11 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -41,6 +41,7 @@ #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/byte_stream.h" +#include "src/core/lib/transport/transport.h" typedef enum { GRPC_CHTTP2_DATA_FH_0, @@ -96,6 +97,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, uint32_t write_bytes, int is_eof, + grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c index acfc3627e8..d9a04bbf3c 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c @@ -38,8 +38,11 @@ #include "src/core/ext/transport/chttp2/transport/frame.h" -gpr_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code) { - gpr_slice slice = gpr_slice_malloc(13); +gpr_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, + grpc_transport_one_way_stats *stats) { + static const size_t frame_size = 13; + gpr_slice slice = gpr_slice_malloc(frame_size); + stats->framing_bytes += frame_size; uint8_t *p = GPR_SLICE_START_PTR(slice); *p++ = 0; @@ -84,6 +87,7 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( cur++; p->byte++; } + stream_parsing->stats.incoming.framing_bytes += (uint64_t)(end - cur); if (p->byte == 4) { GPR_ASSERT(is_last); diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h index 134f1368a0..729274e9a2 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h @@ -37,13 +37,15 @@ #include <grpc/support/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/transport/transport.h" typedef struct { uint8_t byte; uint8_t reason_bytes[4]; } grpc_chttp2_rst_stream_parser; -gpr_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code); +gpr_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code, + grpc_transport_one_way_stats *stats); grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser *parser, uint32_t length, uint8_t flags); diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 1a561cebaf..0baf6d2fdd 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -36,9 +36,11 @@ #include <grpc/support/log.h> -gpr_slice grpc_chttp2_window_update_create(uint32_t id, - uint32_t window_update) { - gpr_slice slice = gpr_slice_malloc(13); +gpr_slice grpc_chttp2_window_update_create( + uint32_t id, uint32_t window_update, grpc_transport_one_way_stats *stats) { + static const size_t frame_size = 13; + gpr_slice slice = gpr_slice_malloc(frame_size); + stats->header_bytes += frame_size; uint8_t *p = GPR_SLICE_START_PTR(slice); GPR_ASSERT(window_update); @@ -87,6 +89,10 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( p->byte++; } + if (stream_parsing != NULL) { + stream_parsing->stats.incoming.framing_bytes += (uint32_t)(end - cur); + } + if (p->byte == 4) { uint32_t received_update = p->amount; if (received_update == 0 || (received_update & 0x80000000u)) { diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.h b/src/core/ext/transport/chttp2/transport/frame_window_update.h index f9f670b6f1..a1093a6041 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.h +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.h @@ -37,6 +37,7 @@ #include <grpc/support/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/transport/transport.h" typedef struct { uint8_t byte; @@ -44,7 +45,8 @@ typedef struct { uint32_t amount; } grpc_chttp2_window_update_parser; -gpr_slice grpc_chttp2_window_update_create(uint32_t id, uint32_t window_delta); +gpr_slice grpc_chttp2_window_update_create(uint32_t id, uint32_t window_delta, + grpc_transport_one_way_stats *stats); grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_window_update_parser *parser, uint32_t length, uint8_t flags); diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 819addd9e3..880305afdd 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -74,6 +74,7 @@ typedef struct { /* output stream id */ uint32_t stream_id; gpr_slice_buffer *output; + grpc_transport_one_way_stats *stats; } framer_state; /* fills p (which is expected to be 9 bytes long) with a data frame header */ @@ -102,6 +103,7 @@ static void finish_frame(framer_state *st, int is_header_boundary, st->stream_id, st->output->length - st->output_length_at_start_of_frame, (uint8_t)((is_last_in_stream ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0) | (is_header_boundary ? GRPC_CHTTP2_DATA_FLAG_END_HEADERS : 0))); + st->stats->framing_bytes += 9; st->is_first_frame = 0; } @@ -147,8 +149,10 @@ static void add_header_data(framer_state *st, gpr_slice slice) { remaining = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH + st->output_length_at_start_of_frame - st->output->length; if (len <= remaining) { + st->stats->header_bytes += len; gpr_slice_buffer_add(st->output, slice); } else { + st->stats->header_bytes += remaining; gpr_slice_buffer_add(st->output, gpr_slice_split_head(&slice, remaining)); finish_frame(st, 0, 0); begin_frame(st); @@ -535,6 +539,7 @@ void grpc_chttp2_hpack_compressor_set_max_table_size( void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t stream_id, grpc_metadata_batch *metadata, int is_eof, + grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf) { framer_state st; grpc_linked_mdelem *l; @@ -546,6 +551,7 @@ void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, st.stream_id = stream_id; st.output = outbuf; st.is_first_frame = 1; + st.stats = stats; /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h index d79de35979..91c368fbe2 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h @@ -40,6 +40,7 @@ #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/transport.h" #define GRPC_CHTTP2_HPACKC_NUM_FILTERS 256 #define GRPC_CHTTP2_HPACKC_NUM_VALUES 256 @@ -90,6 +91,7 @@ void grpc_chttp2_hpack_compressor_set_max_usable_size( void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t id, grpc_metadata_batch *metadata, int is_eof, + grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_ENCODER_H */ diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index ec3387efb8..259452fa42 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1426,6 +1426,9 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { grpc_chttp2_hpack_parser *parser = hpack_parser; GPR_TIMER_BEGIN("grpc_chttp2_hpack_parser_parse", 0); + if (stream_parsing != NULL) { + stream_parsing->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice); + } if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice))) { GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 2fae653623..2c3c7a3820 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -394,6 +394,9 @@ typedef struct { grpc_metadata_batch *recv_trailing_metadata; grpc_closure *recv_trailing_metadata_finished; + grpc_transport_stream_stats *collecting_stats; + grpc_transport_stream_stats stats; + /** when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ @@ -435,6 +438,8 @@ typedef struct { gpr_slice fetching_slice; size_t stream_fetched; grpc_closure finished_fetch; + /** stats gathered during the write */ + grpc_transport_one_way_stats stats; } grpc_chttp2_stream_writing; struct grpc_chttp2_stream_parsing { @@ -460,6 +465,8 @@ struct grpc_chttp2_stream_parsing { int64_t outgoing_window; /** number of bytes received - reset at end of parse thread execution */ int64_t received_bytes; + /** stats gathered during the parse */ + grpc_transport_stream_stats stats; /** incoming metadata */ grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; @@ -635,6 +642,7 @@ void grpc_chttp2_parsing_become_skip_parser( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure, int success); #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index a9f043cee3..1e815e096b 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -171,6 +171,9 @@ void grpc_chttp2_publish_reads( grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } + /* flush stats to global stream state */ + grpc_transport_move_stats(&stream_parsing->stats, &stream_global->stats); + /* update outgoing flow control window */ was_zero = stream_global->outgoing_window <= 0; GRPC_CHTTP2_FLOW_MOVE_STREAM("parsed", transport_global, stream_global, @@ -544,8 +547,13 @@ static int init_data_frame_parser( grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id); grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; - if (!stream_parsing || stream_parsing->received_close) + if (stream_parsing == NULL) { + return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + } + stream_parsing->stats.incoming.framing_bytes += 9; + if (stream_parsing->received_close) { return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + } if (err == GRPC_CHTTP2_PARSE_OK) { err = update_incoming_window(exec_ctx, transport_parsing, stream_parsing); } @@ -566,7 +574,8 @@ static int init_data_frame_parser( gpr_slice_buffer_add( &transport_parsing->qbuf, grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, - GRPC_CHTTP2_PROTOCOL_ERROR)); + GRPC_CHTTP2_PROTOCOL_ERROR, + &stream_parsing->stats.outgoing)); return init_skip_frame_parser(exec_ctx, transport_parsing, 0); case GRPC_CHTTP2_CONNECTION_ERROR: return 0; @@ -717,6 +726,7 @@ static int init_header_frame_parser( transport_parsing->incoming_stream = stream_parsing; } GPR_ASSERT(stream_parsing != NULL && (via_accept == 0 || via_accept == 1)); + stream_parsing->stats.incoming.framing_bytes += 9; if (stream_parsing->received_close) { gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); transport_parsing->incoming_stream = NULL; @@ -752,9 +762,14 @@ static int init_window_update_frame_parser( &transport_parsing->simple.window_update, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags); - if (transport_parsing->incoming_stream_id) { - transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( - transport_parsing, transport_parsing->incoming_stream_id); + if (transport_parsing->incoming_stream_id != 0) { + grpc_chttp2_stream_parsing *stream_parsing = + transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( + transport_parsing, transport_parsing->incoming_stream_id); + if (stream_parsing == NULL) { + return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + } + stream_parsing->stats.incoming.framing_bytes += 9; } transport_parsing->parser = grpc_chttp2_window_update_parser_parse; transport_parsing->parser_data = &transport_parsing->simple.window_update; @@ -778,11 +793,13 @@ static int init_rst_stream_parser( &transport_parsing->simple.rst_stream, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags); - transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( - transport_parsing, transport_parsing->incoming_stream_id); + grpc_chttp2_stream_parsing *stream_parsing = + transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( + transport_parsing, transport_parsing->incoming_stream_id); if (!transport_parsing->incoming_stream) { return init_skip_frame_parser(exec_ctx, transport_parsing, 0); } + stream_parsing->stats.incoming.framing_bytes += 9; transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse; transport_parsing->parser_data = &transport_parsing->simple.rst_stream; return ok; @@ -856,7 +873,8 @@ static int parse_frame_slice(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_add( &transport_parsing->qbuf, grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, - GRPC_CHTTP2_PROTOCOL_ERROR)); + GRPC_CHTTP2_PROTOCOL_ERROR, + &stream_parsing->stats.outgoing)); } return 1; case GRPC_CHTTP2_CONNECTION_ERROR: diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 8d5886f225..6bb188f1da 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -161,8 +161,10 @@ int grpc_chttp2_unlocking_check_writes( transport_global->announce_incoming_window, UINT32_MAX); GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global, announce_incoming_window, announced); - gpr_slice_buffer_add(&transport_writing->outbuf, - grpc_chttp2_window_update_create(0, announced)); + grpc_transport_one_way_stats throwaway_stats; + gpr_slice_buffer_add( + &transport_writing->outbuf, + grpc_chttp2_window_update_create(0, announced, &throwaway_stats)); } GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0); @@ -205,7 +207,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, if (stream_writing->send_initial_metadata != NULL) { grpc_chttp2_encode_header( &transport_writing->hpack_compressor, stream_writing->id, - stream_writing->send_initial_metadata, 0, &transport_writing->outbuf); + stream_writing->send_initial_metadata, 0, &stream_writing->stats, + &transport_writing->outbuf); stream_writing->send_initial_metadata = NULL; stream_writing->sent_initial_metadata = 1; } @@ -216,7 +219,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_add( &transport_writing->outbuf, grpc_chttp2_window_update_create(stream_writing->id, - stream_writing->announce_window)); + stream_writing->announce_window, + &stream_writing->stats)); GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing, announce_window, announce); stream_writing->announce_window = 0; @@ -255,7 +259,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, stream_writing->send_trailing_metadata); grpc_chttp2_encode_data( stream_writing->id, &stream_writing->flow_controlled_buffer, - send_bytes, is_last_frame, &transport_writing->outbuf); + send_bytes, is_last_frame, &stream_writing->stats, + &transport_writing->outbuf); GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing, outgoing_window, send_bytes); @@ -281,19 +286,20 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, stream_writing->send_trailing_metadata != NULL) { if (grpc_metadata_batch_is_empty( stream_writing->send_trailing_metadata)) { - grpc_chttp2_encode_data(stream_writing->id, - &stream_writing->flow_controlled_buffer, 0, 1, - &transport_writing->outbuf); + grpc_chttp2_encode_data( + stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1, + &stream_writing->stats, &transport_writing->outbuf); } else { - grpc_chttp2_encode_header(&transport_writing->hpack_compressor, - stream_writing->id, - stream_writing->send_trailing_metadata, 1, - &transport_writing->outbuf); + grpc_chttp2_encode_header( + &transport_writing->hpack_compressor, stream_writing->id, + stream_writing->send_trailing_metadata, 1, &stream_writing->stats, + &transport_writing->outbuf); } if (!transport_writing->is_client && !stream_writing->read_closed) { gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_rst_stream_create( - stream_writing->id, GRPC_CHTTP2_NO_ERROR)); + stream_writing->id, GRPC_CHTTP2_NO_ERROR, + &stream_writing->stats)); } stream_writing->send_trailing_metadata = NULL; stream_writing->sent_trailing_metadata = 1; @@ -328,17 +334,21 @@ void grpc_chttp2_cleanup_writing( transport_global, transport_writing, &stream_global, &stream_writing)) { if (stream_writing->sent_initial_metadata) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_initial_metadata_finished, 1); + exec_ctx, stream_global, + &stream_global->send_initial_metadata_finished, 1); } + grpc_transport_move_one_way_stats(&stream_writing->stats, + &stream_global->stats.outgoing); if (stream_writing->sent_message) { GPR_ASSERT(stream_writing->send_message == NULL); grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_message_finished, 1); + exec_ctx, stream_global, &stream_global->send_message_finished, 1); stream_writing->sent_message = 0; } if (stream_writing->sent_trailing_metadata) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_trailing_metadata_finished, 1); + exec_ctx, stream_global, + &stream_global->send_trailing_metadata_finished, 1); } if (stream_writing->sent_trailing_metadata) { grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index d63a4a7401..0219d7e6d0 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -174,6 +174,9 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; + /* Call stats: only valid after trailing metadata received */ + grpc_transport_stream_stats stats; + /* Compression algorithm for the call */ grpc_compression_algorithm compression_algorithm; /* Supported encodings (compression algorithms), a bitset */ @@ -1371,6 +1374,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + stream_op.collect_stats = &call->stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: /* Flag validation: currently allow no flags */ @@ -1392,6 +1396,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + stream_op.collect_stats = &call->stats; break; } } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 18256aae5e..411e4f140a 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -35,6 +35,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include "src/core/lib/transport/transport_impl.h" #ifdef GRPC_STREAM_REFCOUNT_DEBUG @@ -76,6 +77,24 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, grpc_closure_init(&refcount->destroy, cb, cb_arg); } +static void move64(uint64_t *from, uint64_t *to) { + *to += *from; + *from = 0; +} + +void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, + grpc_transport_one_way_stats *to) { + move64(&from->framing_bytes, &to->framing_bytes); + move64(&from->data_bytes, &to->data_bytes); + move64(&from->header_bytes, &to->header_bytes); +} + +void grpc_transport_move_stats(grpc_transport_stream_stats *from, + grpc_transport_stream_stats *to) { + grpc_transport_move_one_way_stats(&from->incoming, &to->incoming); + grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing); +} + size_t grpc_transport_stream_size(grpc_transport *transport) { return transport->vtable->sizeof_stream; } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index e98cfe9515..c253b190e7 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -78,6 +78,23 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); grpc_stream_ref_init(rc, ir, cb, cb_arg) #endif +typedef struct { + uint64_t framing_bytes; + uint64_t data_bytes; + uint64_t header_bytes; +} grpc_transport_one_way_stats; + +typedef struct grpc_transport_stream_stats { + grpc_transport_one_way_stats incoming; + grpc_transport_one_way_stats outgoing; +} grpc_transport_stream_stats; + +void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, + grpc_transport_one_way_stats *to); + +void grpc_transport_move_stats(grpc_transport_stream_stats *from, + grpc_transport_stream_stats *to); + /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { @@ -104,6 +121,9 @@ typedef struct grpc_transport_stream_op { */ grpc_metadata_batch *recv_trailing_metadata; + /** Collect any stats into provided buffer, zero internal stat counters */ + grpc_transport_stream_stats *collect_stats; + /** Should be enqueued when all requested operations (excluding recv_message and recv_initial_metadata which have their own closures) in a given batch have been completed. */ diff --git a/test/core/transport/chttp2/hpack_encoder_test.c b/test/core/transport/chttp2/hpack_encoder_test.c index 818ce09a7c..464f0d93b0 100644 --- a/test/core/transport/chttp2/hpack_encoder_test.c +++ b/test/core/transport/chttp2/hpack_encoder_test.c @@ -34,10 +34,12 @@ #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include <stdio.h> +#include <string.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> + #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/metadata.h" @@ -93,7 +95,10 @@ static void verify(size_t window_available, int eof, size_t expect_window_used, gpr_slice_buffer_init(&output); - grpc_chttp2_encode_header(&g_compressor, 0xdeadbeef, &b, eof, &output); + grpc_transport_one_way_stats stats; + memset(&stats, 0, sizeof(stats)); + grpc_chttp2_encode_header(&g_compressor, 0xdeadbeef, &b, eof, &stats, + &output); merged = grpc_slice_merge(output.slices, output.count); gpr_slice_buffer_destroy(&output); grpc_metadata_batch_destroy(&b); |