aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-11-02 14:16:12 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-11-02 14:16:12 -0800
commit9d35a1f9ce98e446d9036d64227b3c9e9317177a (patch)
tree77f2d34117faffcf5649fbd6bd66a850eea4e4d3 /src/core
parente8b5f627ccae2db13e366059c3738e96ac9a5d29 (diff)
stream_op cleanup: transport changes
Diffstat (limited to 'src/core')
-rw-r--r--src/core/transport/byte_stream.c76
-rw-r--r--src/core/transport/byte_stream.h88
-rw-r--r--src/core/transport/chttp2/frame_data.c90
-rw-r--r--src/core/transport/chttp2/frame_data.h23
-rw-r--r--src/core/transport/chttp2/frame_window_update.c15
-rw-r--r--src/core/transport/chttp2/hpack_encoder.c (renamed from src/core/transport/chttp2/stream_encoder.c)289
-rw-r--r--src/core/transport/chttp2/hpack_encoder.h (renamed from src/core/transport/chttp2/stream_encoder.h)22
-rw-r--r--src/core/transport/chttp2/hpack_parser.c16
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c136
-rw-r--r--src/core/transport/chttp2/incoming_metadata.h24
-rw-r--r--src/core/transport/chttp2/internal.h346
-rw-r--r--src/core/transport/chttp2/parsing.c368
-rw-r--r--src/core/transport/chttp2/stream_lists.c112
-rw-r--r--src/core/transport/chttp2/writing.c330
-rw-r--r--src/core/transport/chttp2_transport.c826
-rw-r--r--src/core/transport/metadata.c45
-rw-r--r--src/core/transport/metadata_batch.c (renamed from src/core/transport/stream_op.c)169
-rw-r--r--src/core/transport/metadata_batch.h (renamed from src/core/transport/stream_op.h)89
-rw-r--r--src/core/transport/transport.c50
-rw-r--r--src/core/transport/transport.h60
-rw-r--r--src/core/transport/transport_impl.h8
-rw-r--r--src/core/transport/transport_op_string.c80
22 files changed, 1841 insertions, 1421 deletions
diff --git a/src/core/transport/byte_stream.c b/src/core/transport/byte_stream.c
new file mode 100644
index 0000000000..81e8e77ccb
--- /dev/null
+++ b/src/core/transport/byte_stream.c
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/transport/byte_stream.h"
+
+#include <stdlib.h>
+
+#include <grpc/support/log.h>
+
+int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream, gpr_slice *slice,
+ size_t max_size_hint, grpc_closure *on_complete) {
+ return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint,
+ on_complete);
+}
+
+void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream) {
+ byte_stream->destroy(byte_stream);
+}
+
+/* slice_buffer_stream */
+
+static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ gpr_slice *slice, size_t max_size_hint,
+ grpc_closure *on_complete) {
+ grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
+ *slice = gpr_slice_ref(stream->backing_buffer->slices[stream->cursor]);
+ stream->cursor++;
+ return 1;
+}
+
+static void slice_buffer_stream_destroy(grpc_byte_stream *byte_stream) {}
+
+void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
+ gpr_slice_buffer *slice_buffer,
+ gpr_uint32 flags) {
+ GPR_ASSERT(slice_buffer->length <= GPR_UINT32_MAX);
+ stream->base.length = (gpr_uint32)slice_buffer->length;
+ stream->base.flags = flags;
+ stream->base.next = slice_buffer_stream_next;
+ stream->base.destroy = slice_buffer_stream_destroy;
+ stream->backing_buffer = slice_buffer;
+ stream->cursor = 0;
+}
diff --git a/src/core/transport/byte_stream.h b/src/core/transport/byte_stream.h
new file mode 100644
index 0000000000..c94d8ff275
--- /dev/null
+++ b/src/core/transport/byte_stream.h
@@ -0,0 +1,88 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_TRANSPORT_BYTE_STREAM_H
+#define GRPC_INTERNAL_CORE_TRANSPORT_BYTE_STREAM_H
+
+#include "src/core/iomgr/exec_ctx.h"
+#include <grpc/support/slice_buffer.h>
+
+/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
+ * compression for the message */
+#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
+/** Mask of all valid internal flags. */
+#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
+
+struct grpc_byte_stream;
+typedef struct grpc_byte_stream grpc_byte_stream;
+
+struct grpc_byte_stream {
+ gpr_uint32 length;
+ gpr_uint32 flags;
+ int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+ gpr_slice *slice, size_t max_size_hint,
+ grpc_closure *on_complete);
+ void (*destroy)(grpc_byte_stream *byte_stream);
+};
+
+/* returns 1 if the bytes are available immediately (in which case
+ * on_complete will not be called), 0 if the bytes will be available
+ * asynchronously.
+ *
+ * on entry, *remaining can be set as a hint as to the maximum number
+ * of bytes that would be acceptable to read.
+ *
+ * fills *buffer, *length, *remaining with the bytes, length of bytes
+ * and length of data remaining to be read before either returning 1
+ * or calling on_complete.
+ *
+ * once a slice is returned into *slice, it is owned by the caller.
+ */
+int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream, gpr_slice *slice,
+ size_t max_size_hint, grpc_closure *on_complete);
+
+void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream);
+
+/* grpc_byte_stream that wraps a slice buffer */
+typedef struct grpc_slice_buffer_stream {
+ grpc_byte_stream base;
+ gpr_slice_buffer *backing_buffer;
+ size_t cursor;
+} grpc_slice_buffer_stream;
+
+void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
+ gpr_slice_buffer *slice_buffer,
+ gpr_uint32 flags);
+
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_BYTE_STREAM_H */
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 07179a4571..7287f97aaa 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -45,12 +45,15 @@
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser) {
parser->state = GRPC_CHTTP2_DATA_FH_0;
- grpc_sopb_init(&parser->incoming_sopb);
return GRPC_CHTTP2_PARSE_OK;
}
void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) {
- grpc_sopb_destroy(&parser->incoming_sopb);
+ grpc_byte_stream *bs;
+ while (
+ (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
+ grpc_byte_stream_destroy(bs);
+ }
}
grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
@@ -69,6 +72,62 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
return GRPC_CHTTP2_PARSE_OK;
}
+void grpc_chttp2_incoming_frame_queue_merge(
+ grpc_chttp2_incoming_frame_queue *head_dst,
+ grpc_chttp2_incoming_frame_queue *tail_src) {
+ if (tail_src->head == NULL) {
+ return;
+ }
+
+ if (head_dst->head == NULL) {
+ *head_dst = *tail_src;
+ memset(tail_src, 0, sizeof(*tail_src));
+ return;
+ }
+
+ head_dst->tail->next_message = tail_src->head;
+ head_dst->tail = tail_src->tail;
+ memset(tail_src, 0, sizeof(*tail_src));
+}
+
+grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
+ grpc_chttp2_incoming_frame_queue *q) {
+ grpc_byte_stream *out;
+ if (q->head == NULL) {
+ return NULL;
+ }
+ out = &q->head->base;
+ if (q->head == q->tail) {
+ memset(q, 0, sizeof(*q));
+ } else {
+ q->head = q->head->next_message;
+ }
+ return out;
+}
+
+void grpc_chttp2_encode_data(gpr_uint32 id, gpr_slice_buffer *inbuf,
+ gpr_uint32 write_bytes, int is_eof,
+ gpr_slice_buffer *outbuf) {
+ gpr_slice hdr;
+ gpr_uint8 *p;
+
+ hdr = gpr_slice_malloc(9);
+ p = GPR_SLICE_START_PTR(hdr);
+ GPR_ASSERT(write_bytes < 16777316);
+ *p++ = (gpr_uint8)(write_bytes >> 16);
+ *p++ = (gpr_uint8)(write_bytes >> 8);
+ *p++ = (gpr_uint8)(write_bytes);
+ *p++ = GRPC_CHTTP2_FRAME_DATA;
+ *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
+ *p++ = (gpr_uint8)(id >> 24);
+ *p++ = (gpr_uint8)(id >> 16);
+ *p++ = (gpr_uint8)(id >> 8);
+ *p++ = (gpr_uint8)(id);
+ gpr_slice_buffer_add(outbuf, hdr);
+
+ gpr_slice_buffer_move_first(inbuf, write_bytes, outbuf);
+}
+
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
@@ -77,7 +136,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
grpc_chttp2_data_parser *p = parser;
- gpr_uint32 message_flags = 0;
+ gpr_uint32 message_flags;
+ grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
if (is_last && p->is_last_frame) {
stream_parsing->received_close = 1;
@@ -132,11 +192,14 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_size |= ((gpr_uint32)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
+ message_flags = 0;
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size,
- message_flags);
+ p->parsing_frame = incoming_byte_stream =
+ grpc_chttp2_incoming_byte_stream_create(
+ transport_parsing, stream_parsing, p->frame_size, message_flags,
+ &p->incoming_frames);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
@@ -147,20 +210,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
if ((gpr_uint32)(end - cur) == p->frame_size) {
- grpc_sopb_add_slice(
- &p->incoming_sopb,
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
- grpc_sopb_add_slice(&p->incoming_sopb,
- gpr_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(cur + p->frame_size - beg)));
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ gpr_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(cur + p->frame_size - beg)));
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
cur += p->frame_size;
goto fh_0; /* loop */
} else {
- grpc_sopb_add_slice(
- &p->incoming_sopb,
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
GPR_ASSERT((size_t)(end - cur) <= p->frame_size);
p->frame_size -= (gpr_uint32)(end - cur);
diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h
index 6762484e5b..bc32f29d97 100644
--- a/src/core/transport/chttp2/frame_data.h
+++ b/src/core/transport/chttp2/frame_data.h
@@ -39,7 +39,7 @@
#include "src/core/iomgr/exec_ctx.h"
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
-#include "src/core/transport/stream_op.h"
+#include "src/core/transport/byte_stream.h"
#include "src/core/transport/chttp2/frame.h"
typedef enum {
@@ -51,6 +51,14 @@ typedef enum {
GRPC_CHTTP2_DATA_FRAME
} grpc_chttp2_stream_state;
+typedef struct grpc_chttp2_incoming_byte_stream
+ grpc_chttp2_incoming_byte_stream;
+
+typedef struct grpc_chttp2_incoming_frame_queue {
+ grpc_chttp2_incoming_byte_stream *head;
+ grpc_chttp2_incoming_byte_stream *tail;
+} grpc_chttp2_incoming_frame_queue;
+
typedef struct {
grpc_chttp2_stream_state state;
gpr_uint8 is_last_frame;
@@ -58,9 +66,16 @@ typedef struct {
gpr_uint32 frame_size;
int is_frame_compressed;
- grpc_stream_op_buffer incoming_sopb;
+ grpc_chttp2_incoming_frame_queue incoming_frames;
+ grpc_chttp2_incoming_byte_stream *parsing_frame;
} grpc_chttp2_data_parser;
+void grpc_chttp2_incoming_frame_queue_merge(
+ grpc_chttp2_incoming_frame_queue *head_dst,
+ grpc_chttp2_incoming_frame_queue *tail_src);
+grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
+ grpc_chttp2_incoming_frame_queue *q);
+
/* initialize per-stream state for data frame parsing */
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser);
@@ -81,4 +96,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
/* create a slice with an empty data frame and is_last set */
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id);
+void grpc_chttp2_encode_data(gpr_uint32 id, gpr_slice_buffer *inbuf,
+ gpr_uint32 write_bytes, int is_eof,
+ gpr_slice_buffer *outbuf);
+
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_DATA_H */
diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c
index 91bbcfe2c1..74ca29baf9 100644
--- a/src/core/transport/chttp2/frame_window_update.c
+++ b/src/core/transport/chttp2/frame_window_update.c
@@ -89,7 +89,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
}
if (p->byte == 4) {
- if (p->amount == 0 || (p->amount & 0x80000000u)) {
+ gpr_uint32 received_update = p->amount;
+ if (received_update == 0 || (received_update & 0x80000000u)) {
gpr_log(GPR_ERROR, "invalid window update bytes: %d", p->amount);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
@@ -97,17 +98,15 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
if (transport_parsing->incoming_stream_id != 0) {
if (stream_parsing != NULL) {
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing,
- stream_parsing, outgoing_window_update,
- p->amount);
- stream_parsing->outgoing_window_update += p->amount;
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", transport_parsing,
+ stream_parsing, outgoing_window,
+ received_update);
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
}
} else {
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("update", transport_parsing,
- outgoing_window_update, p->amount);
- transport_parsing->outgoing_window_update += p->amount;
+ GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", transport_parsing,
+ outgoing_window, received_update);
}
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/hpack_encoder.c
index 24a5d958b8..6c7c00b7c3 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/hpack_encoder.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/transport/chttp2/stream_encoder.h"
+#include "src/core/transport/chttp2/hpack_encoder.h"
#include <assert.h>
#include <string.h>
@@ -54,18 +54,13 @@
/* don't consider adding anything bigger than this to the hpack table */
#define MAX_DECODER_SPACE_USAGE 512
-/* what kind of frame our we encoding? */
-typedef enum { HEADER, DATA, NONE } frame_type;
-
typedef struct {
- frame_type cur_frame_type;
+ int is_first_frame;
/* number of bytes in 'output' when we started the frame - used to calculate
frame length */
size_t output_length_at_start_of_frame;
/* index (in output) of the header for the current frame */
size_t header_idx;
- /* was the last frame emitted a header? (if yes, we'll need a CONTINUATION */
- gpr_uint8 last_was_header;
/* have we seen a regular (non-colon-prefixed) header yet? */
gpr_uint8 seen_regular_header;
/* output stream id */
@@ -92,58 +87,35 @@ static void fill_header(gpr_uint8 *p, gpr_uint8 type, gpr_uint32 id, size_t len,
static void finish_frame(framer_state *st, int is_header_boundary,
int is_last_in_stream) {
gpr_uint8 type = 0xff;
- switch (st->cur_frame_type) {
- case HEADER:
- type = st->last_was_header ? GRPC_CHTTP2_FRAME_CONTINUATION
- : GRPC_CHTTP2_FRAME_HEADER;
- st->last_was_header = 1;
- break;
- case DATA:
- type = GRPC_CHTTP2_FRAME_DATA;
- st->last_was_header = 0;
- is_header_boundary = 0;
- break;
- case NONE:
- return;
- }
+ type = st->is_first_frame ? GRPC_CHTTP2_FRAME_HEADER
+ : GRPC_CHTTP2_FRAME_CONTINUATION;
fill_header(
GPR_SLICE_START_PTR(st->output->slices[st->header_idx]), type,
st->stream_id, st->output->length - st->output_length_at_start_of_frame,
(gpr_uint8)(
(is_last_in_stream ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0) |
(is_header_boundary ? GRPC_CHTTP2_DATA_FLAG_END_HEADERS : 0)));
- st->cur_frame_type = NONE;
+ st->is_first_frame = 0;
}
/* begin a new frame: reserve off header space, remember how many bytes we'd
output before beginning */
-static void begin_frame(framer_state *st, frame_type type) {
- GPR_ASSERT(type != NONE);
- GPR_ASSERT(st->cur_frame_type == NONE);
- st->cur_frame_type = type;
+static void begin_frame(framer_state *st) {
st->header_idx =
gpr_slice_buffer_add_indexed(st->output, gpr_slice_malloc(9));
st->output_length_at_start_of_frame = st->output->length;
}
-static void begin_new_frame(framer_state *st, frame_type type) {
- finish_frame(st, 1, 0);
- st->last_was_header = 0;
- begin_frame(st, type);
-}
-
/* make sure that the current frame is of the type desired, and has sufficient
space to add at least about_to_add bytes -- finishes the current frame if
needed */
-static void ensure_frame_type(framer_state *st, frame_type type,
- size_t need_bytes) {
- if (st->cur_frame_type == type &&
- st->output->length - st->output_length_at_start_of_frame + need_bytes <=
- GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) {
+static void ensure_space(framer_state *st, size_t need_bytes) {
+ if (st->output->length - st->output_length_at_start_of_frame + need_bytes <=
+ GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) {
return;
}
- finish_frame(st, type != HEADER, 0);
- begin_frame(st, type);
+ finish_frame(st, 0, 0);
+ begin_frame(st);
}
/* increment a filter count, halve all counts if one element reaches max */
@@ -165,31 +137,30 @@ static void add_header_data(framer_state *st, gpr_slice slice) {
size_t len = GPR_SLICE_LENGTH(slice);
size_t remaining;
if (len == 0) return;
- ensure_frame_type(st, HEADER, 1);
remaining = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH +
st->output_length_at_start_of_frame - st->output->length;
if (len <= remaining) {
gpr_slice_buffer_add(st->output, slice);
} else {
gpr_slice_buffer_add(st->output, gpr_slice_split_head(&slice, remaining));
+ finish_frame(st, 0, 0);
+ begin_frame(st);
add_header_data(st, slice);
}
}
static gpr_uint8 *add_tiny_header_data(framer_state *st, size_t len) {
- ensure_frame_type(st, HEADER, len);
+ ensure_space(st, len);
return gpr_slice_buffer_tiny_add(st->output, len);
}
-/* add an element to the decoder table: returns metadata element to unref */
-static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c,
- grpc_mdelem *elem) {
+/* add an element to the decoder table */
+static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
gpr_uint32 key_hash = elem->key->hash;
gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
gpr_uint32 new_index = c->tail_remote_index + c->table_elems + 1;
size_t elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) +
GPR_SLICE_LENGTH(elem->value->slice);
- grpc_mdelem *elem_to_unref;
GPR_ASSERT(elem_size < 65536);
@@ -220,31 +191,27 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c,
if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem) {
/* already there: update with new index */
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- elem_to_unref = elem;
} else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem) {
/* already there (cuckoo): update with new index */
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- elem_to_unref = elem;
} else if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == NULL) {
/* not there, but a free element: add */
- c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem;
+ c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- elem_to_unref = NULL;
} else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == NULL) {
/* not there (cuckoo), but a free element: add */
- c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem;
+ c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- elem_to_unref = NULL;
} else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] <
c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) {
/* not there: replace oldest */
- elem_to_unref = c->entries_elems[HASH_FRAGMENT_2(elem_hash)];
- c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem;
+ GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
+ c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
} else {
/* not there: replace oldest */
- elem_to_unref = c->entries_elems[HASH_FRAGMENT_3(elem_hash)];
- c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem;
+ GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
+ c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
}
@@ -270,8 +237,6 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c,
c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key);
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
}
-
- return elem_to_unref;
}
static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 elem_index,
@@ -370,9 +335,9 @@ static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c,
c->table_elems - elem_index;
}
-/* encode an mdelem; returns metadata element to unref */
-static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
- grpc_mdelem *elem, framer_state *st) {
+/* encode an mdelem */
+static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
+ framer_state *st) {
gpr_uint32 key_hash = elem->key->hash;
gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
size_t decoder_space_usage;
@@ -397,7 +362,7 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
/* HIT: complete element (first cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]),
st);
- return elem;
+ return;
}
if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem &&
@@ -405,7 +370,7 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
/* HIT: complete element (second cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]),
st);
- return elem;
+ return;
}
/* should this elem be in the table? */
@@ -423,12 +388,13 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- return add_elem(c, elem);
+ add_elem(c, elem);
+ return;
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
- return elem;
+ return;
}
- GPR_UNREACHABLE_CODE(return NULL);
+ GPR_UNREACHABLE_CODE(return );
}
indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)];
@@ -437,24 +403,26 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- return add_elem(c, elem);
+ add_elem(c, elem);
+ return;
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
- return elem;
+ return;
}
- GPR_UNREACHABLE_CODE(return NULL);
+ GPR_UNREACHABLE_CODE(return );
}
/* no elem, key in the table... fall back to literal emission */
if (should_add_elem) {
emit_lithdr_incidx_v(c, elem, st);
- return add_elem(c, elem);
+ add_elem(c, elem);
+ return;
} else {
emit_lithdr_noidx_v(c, elem, st);
- return elem;
+ return;
}
- GPR_UNREACHABLE_CODE(return NULL);
+ GPR_UNREACHABLE_CODE(return );
}
#define STRLEN_LIT(x) (sizeof(x) - 1)
@@ -469,8 +437,8 @@ static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
mdelem = grpc_mdelem_from_metadata_strings(
c->mdctx, GRPC_MDSTR_REF(c->timeout_key_str),
grpc_mdstr_from_string(c->mdctx, timeout_str));
- mdelem = hpack_enc(c, mdelem, st);
- if (mdelem) GRPC_MDELEM_UNREF(mdelem);
+ hpack_enc(c, mdelem, st);
+ GRPC_MDELEM_UNREF(mdelem);
}
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) {
@@ -495,169 +463,34 @@ void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) {
GRPC_MDSTR_UNREF(c->timeout_key_str);
}
-gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
- gpr_uint32 max_flow_controlled_bytes,
- grpc_stream_op_buffer *outops) {
- gpr_slice slice;
- grpc_stream_op *op;
- gpr_uint32 max_take_size;
- gpr_uint32 flow_controlled_bytes_taken = 0;
- gpr_uint32 curop = 0;
- gpr_uint8 *p;
- gpr_uint8 compressed_flag_set = 0;
-
- while (curop < *inops_count) {
- GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes);
- op = &inops[curop];
- switch (op->type) {
- case GRPC_NO_OP:
- /* skip */
- curop++;
- break;
- case GRPC_OP_METADATA:
- grpc_metadata_batch_assert_ok(&op->data.metadata);
- /* these just get copied as they don't impact the number of flow
- controlled bytes */
- grpc_sopb_append(outops, op, 1);
- curop++;
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- /* begin op: for now we just convert the op to a slice and fall
- through - this lets us reuse the slice framing code below */
- compressed_flag_set =
- (op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
- slice = gpr_slice_malloc(5);
-
- p = GPR_SLICE_START_PTR(slice);
- p[0] = compressed_flag_set;
- p[1] = (gpr_uint8)(op->data.begin_message.length >> 24);
- p[2] = (gpr_uint8)(op->data.begin_message.length >> 16);
- p[3] = (gpr_uint8)(op->data.begin_message.length >> 8);
- p[4] = (gpr_uint8)(op->data.begin_message.length);
- op->type = GRPC_OP_SLICE;
- op->data.slice = slice;
- /* fallthrough */
- case GRPC_OP_SLICE:
- slice = op->data.slice;
- if (!GPR_SLICE_LENGTH(slice)) {
- /* skip zero length slices */
- gpr_slice_unref(slice);
- curop++;
- break;
- }
- max_take_size = max_flow_controlled_bytes - flow_controlled_bytes_taken;
- if (max_take_size == 0) {
- goto exit_loop;
- }
- if (GPR_SLICE_LENGTH(slice) > max_take_size) {
- slice = gpr_slice_split_head(&op->data.slice, max_take_size);
- grpc_sopb_add_slice(outops, slice);
- } else {
- /* consume this op immediately */
- grpc_sopb_append(outops, op, 1);
- curop++;
- }
- flow_controlled_bytes_taken += (gpr_uint32)GPR_SLICE_LENGTH(slice);
- break;
- }
- }
-exit_loop:
- *inops_count -= curop;
- memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op));
-
- for (curop = 0; curop < *inops_count; curop++) {
- if (inops[curop].type == GRPC_OP_METADATA) {
- grpc_metadata_batch_assert_ok(&inops[curop].data.metadata);
- }
- }
-
- return flow_controlled_bytes_taken;
-}
-
-void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
- gpr_uint32 stream_id,
- grpc_chttp2_hpack_compressor *compressor,
- gpr_slice_buffer *output) {
+void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c,
+ gpr_uint32 stream_id,
+ grpc_metadata_batch *metadata, int is_eof,
+ gpr_slice_buffer *outbuf) {
framer_state st;
- gpr_slice slice;
- grpc_stream_op *op;
- size_t max_take_size;
- gpr_uint32 curop = 0;
- gpr_uint32 unref_op;
grpc_linked_mdelem *l;
- int need_unref = 0;
gpr_timespec deadline;
GPR_ASSERT(stream_id != 0);
- st.cur_frame_type = NONE;
- st.last_was_header = 0;
st.seen_regular_header = 0;
st.stream_id = stream_id;
- st.output = output;
-
- while (curop < ops_count) {
- op = &ops[curop];
- switch (op->type) {
- case GRPC_NO_OP:
- case GRPC_OP_BEGIN_MESSAGE:
- gpr_log(
- GPR_ERROR,
- "These stream ops should be filtered out by grpc_chttp2_preencode");
- abort();
- case GRPC_OP_METADATA:
- /* Encode a metadata batch; store the returned values, representing
- a metadata element that needs to be unreffed back into the metadata
- slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got
- updated). After this loop, we'll do a batch unref of elements. */
- begin_new_frame(&st, HEADER);
- need_unref |= op->data.metadata.garbage.head != NULL;
- grpc_metadata_batch_assert_ok(&op->data.metadata);
- for (l = op->data.metadata.list.head; l; l = l->next) {
- l->md = hpack_enc(compressor, l->md, &st);
- need_unref |= l->md != NULL;
- }
- deadline = op->data.metadata.deadline;
- if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) {
- deadline_enc(compressor, deadline, &st);
- }
- curop++;
- break;
- case GRPC_OP_SLICE:
- slice = op->data.slice;
- if (st.cur_frame_type == DATA &&
- st.output->length - st.output_length_at_start_of_frame ==
- GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) {
- finish_frame(&st, 0, 0);
- }
- ensure_frame_type(&st, DATA, 1);
- max_take_size = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH +
- st.output_length_at_start_of_frame - st.output->length;
- if (GPR_SLICE_LENGTH(slice) > max_take_size) {
- slice = gpr_slice_split_head(&op->data.slice, max_take_size);
- } else {
- /* consume this op immediately */
- curop++;
- }
- gpr_slice_buffer_add(output, slice);
- break;
- }
+ st.output = outbuf;
+ st.is_first_frame = 1;
+
+ /* Encode a metadata batch; store the returned values, representing
+ a metadata element that needs to be unreffed back into the metadata
+ slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got
+ updated). After this loop, we'll do a batch unref of elements. */
+ begin_frame(&st);
+ grpc_metadata_batch_assert_ok(metadata);
+ for (l = metadata->list.head; l; l = l->next) {
+ hpack_enc(c, l->md, &st);
}
- if (eof && st.cur_frame_type == NONE) {
- begin_frame(&st, DATA);
- }
- finish_frame(&st, 1, eof);
-
- if (need_unref) {
- for (unref_op = 0; unref_op < curop; unref_op++) {
- op = &ops[unref_op];
- if (op->type != GRPC_OP_METADATA) continue;
- for (l = op->data.metadata.list.head; l; l = l->next) {
- if (l->md) GRPC_MDELEM_UNREF(l->md);
- }
- for (l = op->data.metadata.garbage.head; l; l = l->next) {
- GRPC_MDELEM_UNREF(l->md);
- }
- }
+ deadline = metadata->deadline;
+ if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) {
+ deadline_enc(c, deadline, &st);
}
+
+ finish_frame(&st, 1, is_eof);
}
diff --git a/src/core/transport/chttp2/stream_encoder.h b/src/core/transport/chttp2/hpack_encoder.h
index db52f2a0f6..59b122dfda 100644
--- a/src/core/transport/chttp2/stream_encoder.h
+++ b/src/core/transport/chttp2/hpack_encoder.h
@@ -31,12 +31,12 @@
*
*/
-#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_STREAM_ENCODER_H
-#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_STREAM_ENCODER_H
+#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_ENCODER_H
+#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_ENCODER_H
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/metadata.h"
-#include "src/core/transport/stream_op.h"
+#include "src/core/transport/metadata_batch.h"
#include <grpc/support/port_platform.h>
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
@@ -78,16 +78,8 @@ void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c,
grpc_mdctx *mdctx);
void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c);
-/* select stream ops to be encoded, moving them from inops to outops, and
- moving subsequent ops in inops forward in the queue */
-gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
- gpr_uint32 max_flow_controlled_bytes,
- grpc_stream_op_buffer *outops);
+void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, gpr_uint32 id,
+ grpc_metadata_batch *metadata, int is_eof,
+ gpr_slice_buffer *outbuf);
-/* encode stream ops to output */
-void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
- gpr_uint32 stream_id,
- grpc_chttp2_hpack_compressor *compressor,
- gpr_slice_buffer *output);
-
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_STREAM_ENCODER_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_ENCODER_H */
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c
index 20d8312d54..6eebfc3ce4 100644
--- a/src/core/transport/chttp2/hpack_parser.c
+++ b/src/core/transport/chttp2/hpack_parser.c
@@ -38,13 +38,15 @@
#include <string.h>
#include <assert.h>
-#include "src/core/transport/chttp2/bin_encoder.h"
-#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/useful.h>
+#include "src/core/profiling/timers.h"
+#include "src/core/support/string.h"
+#include "src/core/transport/chttp2/bin_encoder.h"
+
typedef enum {
NOT_BINARY,
B64_BYTE0,
@@ -1379,20 +1381,23 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_hpack_parser *parser = hpack_parser;
+ GPR_TIMER_BEGIN("grpc_chttp2_hpack_parser_parse", 0);
if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice),
GPR_SLICE_END_PTR(slice))) {
+ GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
if (is_last) {
if (parser->is_boundary && parser->state != parse_begin) {
gpr_log(GPR_ERROR,
"end of header frame not aligned with a hpack record boundary");
+ GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
if (parser->is_boundary) {
- grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
- &stream_parsing->incoming_metadata,
- &stream_parsing->data_parser.incoming_sopb);
+ stream_parsing
+ ->got_metadata_on_parse[stream_parsing->header_frames_received] = 1;
+ stream_parsing->header_frames_received++;
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
}
@@ -1404,5 +1409,6 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
parser->is_boundary = 0xde;
parser->is_eof = 0xde;
}
+ GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
return GRPC_CHTTP2_PARSE_OK;
}
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index 10c64f3356..956afc8e9d 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -48,14 +48,27 @@ void grpc_chttp2_incoming_metadata_buffer_init(
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_chttp2_incoming_metadata_buffer *buffer) {
size_t i;
+ if (!buffer->published) {
+ for (i = 0; i < buffer->count; i++) {
+ GRPC_MDELEM_UNREF(buffer->elems[i].md);
+ }
+ }
+ gpr_free(buffer->elems);
+}
+
+void grpc_chttp2_incoming_metadata_buffer_reset(
+ grpc_chttp2_incoming_metadata_buffer *buffer) {
+ size_t i;
+ GPR_ASSERT(!buffer->published);
for (i = 0; i < buffer->count; i++) {
GRPC_MDELEM_UNREF(buffer->elems[i].md);
}
- gpr_free(buffer->elems);
+ buffer->count = 0;
}
void grpc_chttp2_incoming_metadata_buffer_add(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem) {
+ GPR_ASSERT(!buffer->published);
if (buffer->capacity == buffer->count) {
buffer->capacity = GPR_MAX(8, 2 * buffer->capacity);
buffer->elems =
@@ -66,117 +79,36 @@ void grpc_chttp2_incoming_metadata_buffer_add(
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
+ GPR_ASSERT(!buffer->published);
buffer->deadline = deadline;
}
-void grpc_chttp2_incoming_metadata_live_op_buffer_end(
- grpc_chttp2_incoming_metadata_live_op_buffer *buffer) {
- gpr_free(buffer->elems);
- buffer->elems = NULL;
-}
-
-void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) {
- grpc_metadata_batch b;
-
- b.list.head = NULL;
- /* Store away the last element of the list, so that in patch_metadata_ops
- we can reconstitute the list.
- We can't do list building here as later incoming metadata may reallocate
- the underlying array. */
- b.list.tail = (void *)(gpr_intptr)buffer->count;
- b.garbage.head = b.garbage.tail = NULL;
- b.deadline = buffer->deadline;
- buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
-
- grpc_sopb_add_metadata(sopb, b);
-}
-
void grpc_chttp2_incoming_metadata_buffer_swap(
grpc_chttp2_incoming_metadata_buffer *a,
grpc_chttp2_incoming_metadata_buffer *b) {
+ GPR_ASSERT(!a->published);
+ GPR_ASSERT(!b->published);
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, *a, *b);
}
-void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
- grpc_chttp2_incoming_metadata_buffer *src,
- grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb) {
- size_t delta;
- size_t i;
- dst->deadline = gpr_time_min(src->deadline, dst->deadline);
-
- if (src->count == 0) {
- return;
- }
- if (dst->count == 0) {
- grpc_chttp2_incoming_metadata_buffer_swap(src, dst);
- return;
- }
- delta = dst->count;
- if (dst->capacity < src->count + dst->count) {
- dst->capacity = GPR_MAX(dst->capacity * 2, src->count + dst->count);
- dst->elems = gpr_realloc(dst->elems, dst->capacity * sizeof(*dst->elems));
- }
- memcpy(dst->elems + dst->count, src->elems, src->count * sizeof(*src->elems));
- dst->count += src->count;
- for (i = 0; i < sopb->nops; i++) {
- if (sopb->ops[i].type != GRPC_OP_METADATA) continue;
- sopb->ops[i].data.metadata.list.tail =
- (void *)(delta + (gpr_uintptr)sopb->ops[i].data.metadata.list.tail);
- }
- src->count = 0;
-}
-
-void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb,
- grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer) {
- grpc_stream_op *ops = sopb->ops;
- size_t nops = sopb->nops;
- size_t i;
- size_t j;
- size_t mdidx = 0;
- size_t last_mdidx;
- int found_metadata = 0;
-
- /* rework the array of metadata into a linked list, making use
- of the breadcrumbs we left in metadata batches during
- add_metadata_batch */
- for (i = 0; i < nops; i++) {
- grpc_stream_op *op = &ops[i];
- if (op->type != GRPC_OP_METADATA) continue;
- found_metadata = 1;
- /* we left a breadcrumb indicating where the end of this list is,
- and since we add sequentially, we know from the end of the last
- segment where this segment begins */
- last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
- GPR_ASSERT(last_mdidx > mdidx);
- GPR_ASSERT(last_mdidx <= buffer->count);
- /* turn the array into a doubly linked list */
- op->data.metadata.list.head = &buffer->elems[mdidx];
- op->data.metadata.list.tail = &buffer->elems[last_mdidx - 1];
- for (j = mdidx + 1; j < last_mdidx; j++) {
- buffer->elems[j].prev = &buffer->elems[j - 1];
- buffer->elems[j - 1].next = &buffer->elems[j];
+void grpc_chttp2_incoming_metadata_buffer_publish(
+ grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) {
+ GPR_ASSERT(!buffer->published);
+ buffer->published = 1;
+ if (buffer->count > 0) {
+ size_t i;
+ for (i = 1; i < buffer->count; i++) {
+ buffer->elems[i].prev = &buffer->elems[i - 1];
}
- buffer->elems[mdidx].prev = NULL;
- buffer->elems[last_mdidx - 1].next = NULL;
- /* track where we're up to */
- mdidx = last_mdidx;
- }
- if (found_metadata) {
- live_op_buffer->elems = buffer->elems;
- if (mdidx != buffer->count) {
- /* we have a partially read metadata batch still in incoming_metadata */
- size_t new_count = buffer->count - mdidx;
- size_t copy_bytes = sizeof(*buffer->elems) * new_count;
- GPR_ASSERT(mdidx < buffer->count);
- buffer->elems = gpr_malloc(copy_bytes);
- memcpy(buffer->elems, live_op_buffer->elems + mdidx, copy_bytes);
- buffer->count = buffer->capacity = new_count;
- } else {
- buffer->elems = NULL;
- buffer->count = 0;
- buffer->capacity = 0;
+ for (i = 0; i < buffer->count - 1; i++) {
+ buffer->elems[i].next = &buffer->elems[i + 1];
}
+ buffer->elems[0].prev = NULL;
+ buffer->elems[buffer->count - 1].next = NULL;
+ batch->list.head = &buffer->elems[0];
+ batch->list.tail = &buffer->elems[buffer->count - 1];
+ } else {
+ batch->list.head = batch->list.tail = NULL;
}
+ batch->deadline = buffer->deadline;
}
diff --git a/src/core/transport/chttp2/incoming_metadata.h b/src/core/transport/chttp2/incoming_metadata.h
index 2f1de411ba..0e1dabe825 100644
--- a/src/core/transport/chttp2/incoming_metadata.h
+++ b/src/core/transport/chttp2/incoming_metadata.h
@@ -41,12 +41,9 @@ typedef struct {
size_t count;
size_t capacity;
gpr_timespec deadline;
+ int published;
} grpc_chttp2_incoming_metadata_buffer;
-typedef struct {
- grpc_linked_mdelem *elems;
-} grpc_chttp2_incoming_metadata_live_op_buffer;
-
/** assumes everything initially zeroed */
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer);
@@ -54,27 +51,12 @@ void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_reset(
grpc_chttp2_incoming_metadata_buffer *buffer);
+void grpc_chttp2_incoming_metadata_buffer_publish(
+ grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch);
void grpc_chttp2_incoming_metadata_buffer_add(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem);
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);
-/** extend sopb with a metadata batch; this must be post-processed by
- grpc_chttp2_incoming_metadata_buffer_postprocess_sopb before being handed
- out of the transport */
-void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb);
-
-void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
- grpc_chttp2_incoming_metadata_buffer *src,
- grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb);
-
-void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb,
- grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer);
-
-void grpc_chttp2_incoming_metadata_live_op_buffer_end(
- grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer);
-
#endif /* GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H */
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index b35f8b5d88..b53c9dee0b 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
+#include <assert.h>
+
#include "src/core/iomgr/endpoint.h"
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/frame_data.h"
@@ -42,9 +44,9 @@
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include "src/core/transport/chttp2/frame_settings.h"
#include "src/core/transport/chttp2/frame_window_update.h"
+#include "src/core/transport/chttp2/hpack_encoder.h"
#include "src/core/transport/chttp2/hpack_parser.h"
#include "src/core/transport/chttp2/incoming_metadata.h"
-#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
#include "src/core/transport/connectivity_state.h"
#include "src/core/transport/transport_impl.h"
@@ -56,14 +58,14 @@ typedef struct grpc_chttp2_stream grpc_chttp2_stream;
happen to them... this enum labels each list */
typedef enum {
GRPC_CHTTP2_LIST_ALL_STREAMS,
- GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED,
+ GRPC_CHTTP2_LIST_CHECK_READ_OPS,
+ GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE,
GRPC_CHTTP2_LIST_WRITABLE,
GRPC_CHTTP2_LIST_WRITING,
GRPC_CHTTP2_LIST_WRITTEN,
GRPC_CHTTP2_LIST_PARSING_SEEN,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
- GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING,
- GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED,
+ GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
@@ -113,22 +115,6 @@ typedef enum {
GRPC_DTS_FRAME
} grpc_chttp2_deframe_transport_state;
-typedef enum {
- GRPC_WRITE_STATE_OPEN,
- GRPC_WRITE_STATE_QUEUED_CLOSE,
- GRPC_WRITE_STATE_SENT_CLOSE
-} grpc_chttp2_write_state;
-
-/* flags that can be or'd into stream_global::writing_now */
-#define GRPC_CHTTP2_WRITING_DATA 1
-#define GRPC_CHTTP2_WRITING_WINDOW 2
-
-typedef enum {
- GRPC_DONT_SEND_CLOSED = 0,
- GRPC_SEND_CLOSED,
- GRPC_SEND_CLOSED_WITH_RST_STREAM
-} grpc_chttp2_send_closed;
-
typedef struct {
grpc_chttp2_stream *head;
grpc_chttp2_stream *tail;
@@ -160,14 +146,28 @@ typedef struct grpc_chttp2_outstanding_ping {
struct grpc_chttp2_outstanding_ping *prev;
} grpc_chttp2_outstanding_ping;
+/* forward declared in frame_data.h */
+struct grpc_chttp2_incoming_byte_stream {
+ grpc_byte_stream base;
+ gpr_refcount refs;
+ struct grpc_chttp2_incoming_byte_stream *next_message;
+
+ grpc_chttp2_transport *transport;
+ grpc_chttp2_stream *stream;
+ int is_tail;
+ gpr_slice_buffer slices;
+ grpc_closure *on_next;
+ gpr_slice *next;
+};
+
typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** window available for us to send to peer */
gpr_int64 outgoing_window;
- /** window available for peer to send to us - updated after parse */
- gpr_uint32 incoming_window;
+ /** window available to announce to peer */
+ gpr_int64 announce_incoming_window;
/** how much window would we like to have for incoming_window */
gpr_uint32 connection_window_target;
@@ -209,6 +209,7 @@ typedef struct {
gpr_slice_buffer outbuf;
/** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor;
+ gpr_int64 outgoing_window;
/** is this a client? */
gpr_uint8 is_client;
/** callback for when writing is done */
@@ -233,6 +234,7 @@ struct grpc_chttp2_transport_parsing {
gpr_slice_buffer qbuf;
/* metadata object cache */
grpc_mdstr *str_grpc_timeout;
+ grpc_mdelem *elem_grpc_status_ok;
/** parser for headers */
grpc_chttp2_hpack_parser hpack_parser;
/** simple one shot parsers */
@@ -246,8 +248,7 @@ struct grpc_chttp2_transport_parsing {
grpc_chttp2_goaway_parser goaway_parser;
/** window available for peer to send to us */
- gpr_uint32 incoming_window;
- gpr_uint32 incoming_window_delta;
+ gpr_int64 incoming_window;
/** next stream id available at the time of beginning parsing */
gpr_uint32 next_stream_id;
@@ -278,7 +279,7 @@ struct grpc_chttp2_transport_parsing {
gpr_uint32 goaway_last_stream_index;
gpr_slice goaway_text;
- gpr_int64 outgoing_window_update;
+ gpr_int64 outgoing_window;
/** pings awaiting responses */
grpc_chttp2_outstanding_ping pings;
@@ -345,8 +346,8 @@ struct grpc_chttp2_transport {
struct {
/* accept stream callback */
- void (*accept_stream)(void *user_data, grpc_transport *transport,
- const void *server_data);
+ void (*accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_transport *transport, const void *server_data);
void *accept_stream_user_data;
/** connectivity tracking */
@@ -358,9 +359,6 @@ typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
gpr_uint32 id;
- grpc_closure *send_done_closure;
- grpc_closure *recv_done_closure;
-
/** window available for us to send to peer */
gpr_int64 outgoing_window;
/** The number of bytes the upper layers have offered to receive.
@@ -371,54 +369,66 @@ typedef struct {
not yet announced to HTTP2 flow control.
As the upper layers offer to read more bytes, this value increases.
As we advertise incoming flow control window, this value decreases. */
- gpr_uint32 unannounced_incoming_window;
- /** The number of bytes of HTTP2 flow control we have advertised.
- As we advertise incoming flow control window, this value increases.
- As bytes are read, this value decreases.
- Updated after parse. */
- gpr_uint32 incoming_window;
- /** stream ops the transport user would like to send */
- grpc_stream_op_buffer *outgoing_sopb;
+ gpr_uint32 unannounced_incoming_window_for_parse;
+ gpr_uint32 unannounced_incoming_window_for_writing;
+ /** things the upper layers would like to send */
+ grpc_metadata_batch *send_initial_metadata;
+ grpc_closure *send_initial_metadata_finished;
+ grpc_byte_stream *send_message;
+ grpc_closure *send_message_finished;
+ grpc_metadata_batch *send_trailing_metadata;
+ grpc_closure *send_trailing_metadata_finished;
+
+ grpc_metadata_batch *recv_initial_metadata;
+ grpc_closure *recv_initial_metadata_finished;
+ grpc_byte_stream **recv_message;
+ grpc_closure *recv_message_ready;
+ grpc_metadata_batch *recv_trailing_metadata;
+ grpc_closure *recv_trailing_metadata_finished;
+
/** when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- grpc_chttp2_write_state write_state;
- /** is this stream closed (boolean) */
+ gpr_uint8 write_closed;
+ /** is this stream reading half-closed (boolean) */
gpr_uint8 read_closed;
- /** has this stream been cancelled? (boolean) */
- gpr_uint8 cancelled;
- grpc_status_code cancelled_status;
- /** have we told the upper layer that this stream is cancelled? */
- gpr_uint8 published_cancelled;
+ /** is this stream finished closing (and reportably closed) */
+ gpr_uint8 finished_close;
/** is this stream in the stream map? (boolean) */
gpr_uint8 in_stream_map;
- /** bitmask of GRPC_CHTTP2_WRITING_xxx above */
- gpr_uint8 writing_now;
- /** has anything been written to this stream? */
- gpr_uint8 written_anything;
-
- /** stream state already published to the upper layer */
- grpc_stream_state published_state;
- /** address to publish next stream state to */
- grpc_stream_state *publish_state;
- /** pointer to sop buffer to fill in with new stream ops */
- grpc_stream_op_buffer *publish_sopb;
- grpc_stream_op_buffer incoming_sopb;
+ /** has this stream seen an error? if 1, then pending incoming frames
+ can be thrown away */
+ gpr_uint8 seen_error;
- /** incoming metadata */
- grpc_chttp2_incoming_metadata_buffer incoming_metadata;
- grpc_chttp2_incoming_metadata_live_op_buffer outstanding_metadata;
+ gpr_uint8 published_initial_metadata;
+ gpr_uint8 published_trailing_metadata;
+ gpr_uint8 faked_trailing_metadata;
+
+ grpc_chttp2_incoming_metadata_buffer received_initial_metadata;
+ grpc_chttp2_incoming_metadata_buffer received_trailing_metadata;
+
+ grpc_chttp2_incoming_frame_queue incoming_frames;
} grpc_chttp2_stream_global;
typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
gpr_uint32 id;
- /** sops that have passed flow control to be written */
- grpc_stream_op_buffer sopb;
- /** how strongly should we indicate closure with the next write */
- grpc_chttp2_send_closed send_closed;
+ gpr_uint8 fetching;
+ gpr_uint8 sent_initial_metadata;
+ gpr_uint8 sent_message;
+ gpr_uint8 sent_trailing_metadata;
+ gpr_uint8 read_closed;
+ /** send this initial metadata */
+ grpc_metadata_batch *send_initial_metadata;
+ grpc_byte_stream *send_message;
+ grpc_metadata_batch *send_trailing_metadata;
+ gpr_int64 outgoing_window;
/** how much window should we announce? */
gpr_uint32 announce_window;
+ gpr_slice_buffer flow_controlled_buffer;
+ gpr_slice fetching_slice;
+ size_t stream_fetched;
+ grpc_closure finished_fetch;
} grpc_chttp2_stream_writing;
struct grpc_chttp2_stream_parsing {
@@ -428,22 +438,29 @@ struct grpc_chttp2_stream_parsing {
gpr_uint8 received_close;
/** saw a rst_stream */
gpr_uint8 saw_rst_stream;
- /** incoming_window has been reduced by this much during parsing */
- gpr_uint32 incoming_window_delta;
+ /** how many header frames have we received? */
+ gpr_uint8 header_frames_received;
+ /** which metadata did we get (on this parse) */
+ gpr_uint8 got_metadata_on_parse[2];
+ /** should we raise the seen_error flag in transport_global */
+ gpr_uint8 seen_error;
/** window available for peer to send to us */
- gpr_uint32 incoming_window;
+ gpr_int64 incoming_window;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser;
/** reason give to rst_stream */
gpr_uint32 rst_stream_reason;
- /* amount of window given */
- gpr_uint64 outgoing_window_update;
+ /** amount of window given */
+ gpr_int64 outgoing_window;
+ /** number of bytes received - reset at end of parse thread execution */
+ gpr_int64 received_bytes;
/** incoming metadata */
- grpc_chttp2_incoming_metadata_buffer incoming_metadata;
+ grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
};
struct grpc_chttp2_stream {
+ grpc_stream_refcount *refcount;
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
grpc_chttp2_stream_parsing parsing;
@@ -504,21 +521,10 @@ void grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
-void grpc_chttp2_list_add_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global);
-int grpc_chttp2_list_pop_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_global **stream_global,
- grpc_chttp2_stream_parsing **stream_parsing);
-void grpc_chttp2_list_remove_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global);
-
-void grpc_chttp2_list_add_writing_stream(
+/* returns 1 if stream added, 0 if it was already present */
+int grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
- grpc_chttp2_stream_writing *stream_writing);
+ grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT;
int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_list_pop_writing_stream(
@@ -550,31 +556,44 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
-void grpc_chttp2_list_add_closed_waiting_for_parsing(
+void grpc_chttp2_list_add_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
-int grpc_chttp2_list_pop_closed_waiting_for_parsing(
+int grpc_chttp2_list_pop_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
-void grpc_chttp2_list_add_cancelled_waiting_for_writing(
+void grpc_chttp2_list_add_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_writing *stream_writing);
+int grpc_chttp2_list_pop_stalled_by_transport(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global **stream_global);
+
+void grpc_chttp2_list_add_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
-int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+void grpc_chttp2_list_remove_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global **stream_global);
+ grpc_chttp2_stream_global *stream_global);
+int grpc_chttp2_list_pop_unannounced_incoming_window_available(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_parsing **stream_parsing);
-void grpc_chttp2_list_add_read_write_state_changed(
+void grpc_chttp2_list_add_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
-int grpc_chttp2_list_pop_read_write_state_changed(
+int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
- grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ gpr_uint32 id);
void grpc_chttp2_add_incoming_goaway(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
@@ -592,7 +611,10 @@ void grpc_chttp2_for_all_streams(
grpc_chttp2_stream_global *stream_global));
void grpc_chttp2_parsing_become_skip_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
+
+void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
+ grpc_closure **pclosure, int success);
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
@@ -607,26 +629,122 @@ extern int grpc_flowctl_trace;
else \
stmt
-#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \
- delta) \
- if (!(grpc_flowctl_trace)) { \
- } else { \
- grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \
- transport->is_client, context->id, \
- (gpr_int64)(context->var), (gpr_int64)(delta)); \
- }
-
-#define GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(reason, context, var, delta) \
- if (!(grpc_flowctl_trace)) { \
- } else { \
- grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \
- context->is_client, 0, \
- (gpr_int64)(context->var), (gpr_int64)(delta)); \
- }
-
-void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
- const char *context, const char *var,
- int is_client, gpr_uint32 stream_id,
- gpr_int64 current_value, gpr_int64 delta);
+typedef enum {
+ GRPC_CHTTP2_FLOWCTL_MOVE,
+ GRPC_CHTTP2_FLOWCTL_CREDIT,
+ GRPC_CHTTP2_FLOWCTL_DEBIT
+} grpc_chttp2_flowctl_op;
+
+#define GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, id1, id2, dst_context, \
+ dst_var, src_context, src_var) \
+ do { \
+ assert(id1 == id2); \
+ if (grpc_flowctl_trace) { \
+ grpc_chttp2_flowctl_trace( \
+ __FILE__, __LINE__, phase, GRPC_CHTTP2_FLOWCTL_MOVE, #dst_context, \
+ #dst_var, #src_context, #src_var, transport->is_client, id1, \
+ dst_context->dst_var, src_context->src_var); \
+ } \
+ dst_context->dst_var += src_context->src_var; \
+ src_context->src_var = 0; \
+ } while (0)
+
+#define GRPC_CHTTP2_FLOW_MOVE_STREAM(phase, transport, dst_context, dst_var, \
+ src_context, src_var) \
+ GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, dst_context->id, \
+ src_context->id, dst_context, dst_var, \
+ src_context, src_var)
+#define GRPC_CHTTP2_FLOW_MOVE_TRANSPORT(phase, dst_context, dst_var, \
+ src_context, src_var) \
+ GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, dst_context, 0, 0, dst_context, dst_var, \
+ src_context, src_var)
+
+#define GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, id, dst_context, \
+ dst_var, amount) \
+ do { \
+ if (grpc_flowctl_trace) { \
+ grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \
+ GRPC_CHTTP2_FLOWCTL_CREDIT, #dst_context, \
+ #dst_var, NULL, #amount, transport->is_client, \
+ id, dst_context->dst_var, amount); \
+ } \
+ dst_context->dst_var += amount; \
+ } while (0)
+
+#define GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, dst_var, \
+ amount) \
+ GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, dst_context->id, \
+ dst_context, dst_var, amount)
+#define GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT(phase, dst_context, dst_var, amount) \
+ GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
+ amount)
+
+#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \
+ dst_var, amount) \
+ do { \
+ if (grpc_flowctl_trace) { \
+ grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \
+ GRPC_CHTTP2_FLOWCTL_DEBIT, #dst_context, \
+ #dst_var, NULL, #amount, transport->is_client, \
+ id, dst_context->dst_var, amount); \
+ } \
+ dst_context->dst_var -= amount; \
+ } while (0)
+
+#define GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, dst_var, \
+ amount) \
+ GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, dst_context->id, \
+ dst_context, dst_var, amount)
+#define GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT(phase, dst_context, dst_var, amount) \
+ GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
+ amount)
+
+void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
+ grpc_chttp2_flowctl_op op, const char *context1,
+ const char *var1, const char *context2,
+ const char *var2, int is_client,
+ gpr_uint32 stream_id, gpr_int64 val1,
+ gpr_int64 val2);
+
+void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream,
+ grpc_status_code status, gpr_slice *details);
+void grpc_chttp2_mark_stream_closed(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, int close_reads,
+ int close_writes);
+void grpc_chttp2_start_writing(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global);
+
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#define GRPC_CHTTP2_STREAM_REF(stream_global, reason) \
+ grpc_chttp2_stream_ref(stream_global, reason)
+#define GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, reason) \
+ grpc_chttp2_stream_unref(exec_ctx, stream_global, reason)
+void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global,
+ const char *reason);
+void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_stream_global *stream_global,
+ const char *reason);
+#else
+#define GRPC_CHTTP2_STREAM_REF(stream_global, reason) \
+ grpc_chttp2_stream_ref(stream_global)
+#define GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, reason) \
+ grpc_chttp2_stream_unref(exec_ctx, stream_global)
+void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global);
+void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_stream_global *stream_global);
+#endif
+
+grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
+ gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue);
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ gpr_slice slice);
+void grpc_chttp2_incoming_byte_stream_finished(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs);
#endif
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 5d4d8e70c4..8cef8fbb77 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -42,22 +42,28 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
-static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_frame_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing);
static int init_header_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing, int is_continuation);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ int is_continuation);
static int init_data_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
static int init_rst_stream_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
static int init_settings_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
static int init_window_update_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
-static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing);
-static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
+static int init_ping_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing);
+static int init_goaway_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing);
static int init_skip_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing, int is_header);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ int is_header);
static int parse_frame_slice(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *transport_parsing,
@@ -74,23 +80,11 @@ void grpc_chttp2_prepare_to_read(
transport_parsing->next_stream_id = transport_global->next_stream_id;
/* update the parsing view of incoming window */
- if (transport_parsing->incoming_window != transport_global->incoming_window) {
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parse", transport_parsing, incoming_window,
- (gpr_int64)transport_global->incoming_window -
- (gpr_int64)transport_parsing->incoming_window);
- transport_parsing->incoming_window = transport_global->incoming_window;
- }
- while (grpc_chttp2_list_pop_incoming_window_updated(
+ while (grpc_chttp2_list_pop_unannounced_incoming_window_available(
transport_global, transport_parsing, &stream_global, &stream_parsing)) {
- stream_parsing->id = stream_global->id;
- if (stream_parsing->incoming_window != stream_global->incoming_window) {
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parse", transport_parsing, stream_parsing, incoming_window,
- (gpr_int64)stream_global->incoming_window -
- (gpr_int64)stream_parsing->incoming_window);
- stream_parsing->incoming_window = stream_global->incoming_window;
- }
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("parse", transport_parsing, stream_parsing,
+ incoming_window, stream_global,
+ unannounced_incoming_window_for_parse);
}
GPR_TIMER_END("grpc_chttp2_prepare_to_read", 0);
@@ -101,6 +95,8 @@ void grpc_chttp2_publish_reads(
grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_parsing *stream_parsing;
+ int was_zero;
+ int is_zero;
/* transport_parsing->last_incoming_stream_id is used as
last-grpc_chttp2_stream-id when
@@ -144,98 +140,102 @@ void grpc_chttp2_publish_reads(
}
/* propagate flow control tokens to global state */
- if (transport_parsing->outgoing_window_update) {
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parsed", transport_global, outgoing_window,
- transport_parsing->outgoing_window_update);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parsed", transport_parsing, outgoing_window_update,
- -(gpr_int64)transport_parsing->outgoing_window_update);
- transport_global->outgoing_window +=
- transport_parsing->outgoing_window_update;
- transport_parsing->outgoing_window_update = 0;
- }
-
- if (transport_parsing->incoming_window_delta) {
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parsed", transport_global, incoming_window,
- -(gpr_int64)transport_parsing->incoming_window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parsed", transport_parsing, incoming_window_delta,
- -(gpr_int64)transport_parsing->incoming_window_delta);
- transport_global->incoming_window -=
- transport_parsing->incoming_window_delta;
- transport_parsing->incoming_window_delta = 0;
+ was_zero = transport_global->outgoing_window <= 0;
+ GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("parsed", transport_global, outgoing_window,
+ transport_parsing, outgoing_window);
+ is_zero = transport_global->outgoing_window <= 0;
+ if (was_zero && !is_zero) {
+ while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
+ &stream_global)) {
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ }
+ }
+
+ if (transport_parsing->incoming_window <
+ transport_global->connection_window_target * 3 / 4) {
+ gpr_int64 announce_bytes = transport_global->connection_window_target -
+ transport_parsing->incoming_window;
+ GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_global,
+ announce_incoming_window, announce_bytes);
+ GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing,
+ incoming_window, announce_bytes);
}
/* for each stream that saw an update, fixup global state */
while (grpc_chttp2_list_pop_parsing_seen_stream(
transport_global, transport_parsing, &stream_global, &stream_parsing)) {
- /* update incoming flow control window */
- if (stream_parsing->incoming_window_delta) {
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parsed", transport_parsing, stream_global, incoming_window,
- -(gpr_int64)stream_parsing->incoming_window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parsed", transport_parsing, stream_parsing, incoming_window_delta,
- -(gpr_int64)stream_parsing->incoming_window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parsed", transport_parsing, stream_global, max_recv_bytes,
- -(gpr_int64)stream_parsing->incoming_window_delta);
- stream_global->incoming_window -= stream_parsing->incoming_window_delta;
- GPR_ASSERT(stream_global->max_recv_bytes >=
- stream_parsing->incoming_window_delta);
- stream_global->max_recv_bytes -= stream_parsing->incoming_window_delta;
- stream_parsing->incoming_window_delta = 0;
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ if (stream_parsing->seen_error) {
+ stream_global->seen_error = 1;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
/* update outgoing flow control window */
- if (stream_parsing->outgoing_window_update) {
- int was_zero = stream_global->outgoing_window <= 0;
- int is_zero;
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("parsed", transport_parsing,
- stream_global, outgoing_window,
- stream_parsing->outgoing_window_update);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parsed", transport_parsing, stream_parsing, outgoing_window_update,
- -(gpr_int64)stream_parsing->outgoing_window_update);
- GPR_ASSERT(stream_parsing->outgoing_window_update <= GPR_UINT32_MAX);
- stream_global->outgoing_window +=
- (gpr_uint32)stream_parsing->outgoing_window_update;
- stream_parsing->outgoing_window_update = 0;
- is_zero = stream_global->outgoing_window <= 0;
- if (was_zero && !is_zero) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
- }
+ was_zero = stream_global->outgoing_window <= 0;
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("parsed", transport_global, stream_global,
+ outgoing_window, stream_parsing,
+ outgoing_window);
+ is_zero = stream_global->outgoing_window <= 0;
+ if (was_zero && !is_zero) {
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
- /* updating closed status */
- if (stream_parsing->received_close) {
- stream_global->read_closed = 1;
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ stream_global->max_recv_bytes -= (gpr_uint32)GPR_MIN(
+ stream_global->max_recv_bytes, stream_parsing->received_bytes);
+ stream_parsing->received_bytes = 0;
+
+ /* publish incoming stream ops */
+ if (stream_global->incoming_frames.tail != NULL) {
+ stream_global->incoming_frames.tail->is_tail = 0;
+ }
+ if (stream_parsing->data_parser.incoming_frames.head != NULL) {
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ grpc_chttp2_incoming_frame_queue_merge(
+ &stream_global->incoming_frames,
+ &stream_parsing->data_parser.incoming_frames);
+ if (stream_global->incoming_frames.tail != NULL) {
+ stream_global->incoming_frames.tail->is_tail = 1;
}
+
+ if (!stream_global->published_initial_metadata &&
+ stream_parsing->got_metadata_on_parse[0]) {
+ stream_parsing->got_metadata_on_parse[0] = 0;
+ stream_global->published_initial_metadata = 1;
+ GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
+ stream_parsing->metadata_buffer[0],
+ stream_global->received_initial_metadata);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ if (!stream_global->published_trailing_metadata &&
+ stream_parsing->got_metadata_on_parse[1]) {
+ stream_parsing->got_metadata_on_parse[1] = 0;
+ stream_global->published_trailing_metadata = 1;
+ GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
+ stream_parsing->metadata_buffer[1],
+ stream_global->received_trailing_metadata);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+
if (stream_parsing->saw_rst_stream) {
- stream_global->cancelled = 1;
- stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status(
- (grpc_chttp2_error_code)stream_parsing->rst_stream_reason);
- if (stream_parsing->rst_stream_reason == GRPC_CHTTP2_NO_ERROR) {
- stream_global->published_cancelled = 1;
+ if (stream_parsing->rst_stream_reason != GRPC_CHTTP2_NO_ERROR) {
+ grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status(
+ (grpc_chttp2_error_code)stream_parsing->rst_stream_reason);
+ char *status_details;
+ gpr_slice slice_details;
+ gpr_asprintf(&status_details, "Received RST_STREAM err=%d",
+ stream_parsing->rst_stream_reason);
+ slice_details = gpr_slice_from_copied_string(status_details);
+ gpr_free(status_details);
+ grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global,
+ status_code, &slice_details);
}
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
+ 1, 1);
}
- /* publish incoming stream ops */
- if (stream_parsing->data_parser.incoming_sopb.nops > 0) {
- grpc_incoming_metadata_buffer_move_to_referencing_sopb(
- &stream_parsing->incoming_metadata, &stream_global->incoming_metadata,
- &stream_parsing->data_parser.incoming_sopb);
- grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb,
- &stream_global->incoming_sopb);
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ if (stream_parsing->received_close) {
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
+ 1, 0);
}
}
}
@@ -363,7 +363,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(cur < end);
transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur);
transport_parsing->deframe_state = GRPC_DTS_FRAME;
- if (!init_frame_parser(transport_parsing)) {
+ if (!init_frame_parser(exec_ctx, transport_parsing)) {
return 0;
}
if (transport_parsing->incoming_stream_id) {
@@ -428,7 +428,8 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
GPR_UNREACHABLE_CODE(return 0);
}
-static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+static int init_frame_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing) {
if (transport_parsing->expect_continuation_stream_id != 0) {
if (transport_parsing->incoming_frame_type !=
GRPC_CHTTP2_FRAME_CONTINUATION) {
@@ -445,30 +446,30 @@ static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
transport_parsing->incoming_stream_id);
return 0;
}
- return init_header_frame_parser(transport_parsing, 1);
+ return init_header_frame_parser(exec_ctx, transport_parsing, 1);
}
switch (transport_parsing->incoming_frame_type) {
case GRPC_CHTTP2_FRAME_DATA:
- return init_data_frame_parser(transport_parsing);
+ return init_data_frame_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_HEADER:
- return init_header_frame_parser(transport_parsing, 0);
+ return init_header_frame_parser(exec_ctx, transport_parsing, 0);
case GRPC_CHTTP2_FRAME_CONTINUATION:
gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
return 0;
case GRPC_CHTTP2_FRAME_RST_STREAM:
- return init_rst_stream_parser(transport_parsing);
+ return init_rst_stream_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_SETTINGS:
- return init_settings_frame_parser(transport_parsing);
+ return init_settings_frame_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
- return init_window_update_frame_parser(transport_parsing);
+ return init_window_update_frame_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_PING:
- return init_ping_parser(transport_parsing);
+ return init_ping_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_GOAWAY:
- return init_goaway_parser(transport_parsing);
+ return init_goaway_parser(exec_ctx, transport_parsing);
default:
gpr_log(GPR_ERROR, "Unknown frame type %02x",
transport_parsing->incoming_frame_type);
- return init_skip_frame_parser(transport_parsing, 0);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
}
}
@@ -482,7 +483,8 @@ static grpc_chttp2_parse_error skip_parser(
static void skip_header(void *tp, grpc_mdelem *md) { GRPC_MDELEM_UNREF(md); }
static int init_skip_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing, int is_header) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ int is_header) {
if (is_header) {
gpr_uint8 is_eoh = transport_parsing->expect_continuation_stream_id != 0;
transport_parsing->parser = grpc_chttp2_header_parser_parse;
@@ -499,65 +501,51 @@ static int init_skip_frame_parser(
}
void grpc_chttp2_parsing_become_skip_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
init_skip_frame_parser(
- transport_parsing,
+ exec_ctx, transport_parsing,
transport_parsing->parser == grpc_chttp2_header_parser_parse);
}
static grpc_chttp2_parse_error update_incoming_window(
- grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
- if (transport_parsing->incoming_frame_size >
- transport_parsing->incoming_window) {
+ gpr_uint32 incoming_frame_size = transport_parsing->incoming_frame_size;
+ if (incoming_frame_size > transport_parsing->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
transport_parsing->incoming_frame_size,
transport_parsing->incoming_window);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
- if (transport_parsing->incoming_frame_size >
- stream_parsing->incoming_window) {
+ if (incoming_frame_size > stream_parsing->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
transport_parsing->incoming_frame_size,
stream_parsing->incoming_window);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "data", transport_parsing, incoming_window,
- -(gpr_int64)transport_parsing->incoming_frame_size);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("data", transport_parsing,
- incoming_window_delta,
- transport_parsing->incoming_frame_size);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "data", transport_parsing, stream_parsing, incoming_window,
- -(gpr_int64)transport_parsing->incoming_frame_size);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("data", transport_parsing, stream_parsing,
- incoming_window_delta,
- transport_parsing->incoming_frame_size);
-
- transport_parsing->incoming_window -= transport_parsing->incoming_frame_size;
- transport_parsing->incoming_window_delta +=
- transport_parsing->incoming_frame_size;
- stream_parsing->incoming_window -= transport_parsing->incoming_frame_size;
- stream_parsing->incoming_window_delta +=
- transport_parsing->incoming_frame_size;
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", transport_parsing, incoming_window,
+ incoming_frame_size);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", transport_parsing, stream_parsing,
+ incoming_window, incoming_frame_size);
+ stream_parsing->received_bytes += incoming_frame_size;
+
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
return GRPC_CHTTP2_PARSE_OK;
}
static int init_data_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_parsing *stream_parsing =
grpc_chttp2_parsing_lookup_stream(transport_parsing,
transport_parsing->incoming_stream_id);
grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
if (!stream_parsing || stream_parsing->received_close)
- return init_skip_frame_parser(transport_parsing, 0);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
if (err == GRPC_CHTTP2_PARSE_OK) {
- err = update_incoming_window(transport_parsing, stream_parsing);
+ err = update_incoming_window(exec_ctx, transport_parsing, stream_parsing);
}
if (err == GRPC_CHTTP2_PARSE_OK) {
err = grpc_chttp2_data_parser_begin_frame(
@@ -577,7 +565,7 @@ static int init_data_frame_parser(
&transport_parsing->qbuf,
grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR));
- return init_skip_frame_parser(transport_parsing, 0);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
return 0;
}
@@ -586,11 +574,13 @@ static int init_data_frame_parser(
static void free_timeout(void *p) { gpr_free(p); }
-static void on_header(void *tp, grpc_mdelem *md) {
+static void on_initial_header(void *tp, grpc_mdelem *md) {
grpc_chttp2_transport_parsing *transport_parsing = tp;
grpc_chttp2_stream_parsing *stream_parsing =
transport_parsing->incoming_stream;
+ GPR_TIMER_BEGIN("on_initial_header", 0);
+
GPR_ASSERT(stream_parsing);
GRPC_CHTTP2_IF_TRACING(gpr_log(
@@ -598,6 +588,12 @@ static void on_header(void *tp, grpc_mdelem *md) {
transport_parsing->is_client ? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
+ if (md->key == transport_parsing->elem_grpc_status_ok->key &&
+ md != transport_parsing->elem_grpc_status_ok) {
+ /* TODO(ctiller): check for a status like " 0" */
+ stream_parsing->seen_error = 1;
+ }
+
if (md->key == transport_parsing->str_grpc_timeout) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
if (!cached_timeout) {
@@ -612,24 +608,57 @@ static void on_header(void *tp, grpc_mdelem *md) {
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
grpc_chttp2_incoming_metadata_buffer_set_deadline(
- &stream_parsing->incoming_metadata,
+ &stream_parsing->metadata_buffer[0],
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout));
GRPC_MDELEM_UNREF(md);
} else {
- grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata,
- md);
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &stream_parsing->metadata_buffer[0], md);
}
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+
+ GPR_TIMER_END("on_initial_header", 0);
+}
+
+static void on_trailing_header(void *tp, grpc_mdelem *md) {
+ grpc_chttp2_transport_parsing *transport_parsing = tp;
+ grpc_chttp2_stream_parsing *stream_parsing =
+ transport_parsing->incoming_stream;
+
+ GPR_TIMER_BEGIN("on_trailing_header", 0);
+
+ GPR_ASSERT(stream_parsing);
+
+ GRPC_CHTTP2_IF_TRACING(gpr_log(
+ GPR_INFO, "HTTP:%d:TRL:%s: %s: %s", stream_parsing->id,
+ transport_parsing->is_client ? "CLI" : "SVR",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
+
+ if (md->key == transport_parsing->elem_grpc_status_ok->key &&
+ md != transport_parsing->elem_grpc_status_ok) {
+ /* TODO(ctiller): check for a status like " 0" */
+ stream_parsing->seen_error = 1;
+ }
+
+ grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->metadata_buffer[1],
+ md);
+
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+
+ GPR_TIMER_END("on_trailing_header", 0);
}
static int init_header_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ int is_continuation) {
gpr_uint8 is_eoh = (transport_parsing->incoming_frame_flags &
GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
int via_accept = 0;
grpc_chttp2_stream_parsing *stream_parsing;
+ /* TODO(ctiller): when to increment header_frames_received? */
+
if (is_eoh) {
transport_parsing->expect_continuation_stream_id = 0;
} else {
@@ -649,7 +678,7 @@ static int init_header_frame_parser(
if (is_continuation) {
gpr_log(GPR_ERROR,
"grpc_chttp2_stream disbanded before CONTINUATION received");
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
}
if (transport_parsing->is_client) {
if ((transport_parsing->incoming_stream_id & 1) &&
@@ -660,7 +689,7 @@ static int init_header_frame_parser(
gpr_log(GPR_ERROR,
"ignoring new grpc_chttp2_stream creation on client");
}
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
} else if (transport_parsing->last_incoming_stream_id >
transport_parsing->incoming_stream_id) {
gpr_log(GPR_ERROR,
@@ -669,19 +698,19 @@ static int init_header_frame_parser(
"id=%d, new grpc_chttp2_stream id=%d",
transport_parsing->last_incoming_stream_id,
transport_parsing->incoming_stream_id);
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
} else if ((transport_parsing->incoming_stream_id & 1) == 0) {
gpr_log(GPR_ERROR,
"ignoring grpc_chttp2_stream with non-client generated index %d",
transport_parsing->incoming_stream_id);
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
}
stream_parsing = transport_parsing->incoming_stream =
grpc_chttp2_parsing_accept_stream(
- transport_parsing, transport_parsing->incoming_stream_id);
+ exec_ctx, transport_parsing, transport_parsing->incoming_stream_id);
if (stream_parsing == NULL) {
gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted");
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
}
via_accept = 1;
} else {
@@ -691,11 +720,21 @@ static int init_header_frame_parser(
if (stream_parsing->received_close) {
gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header");
transport_parsing->incoming_stream = NULL;
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
}
transport_parsing->parser = grpc_chttp2_header_parser_parse;
transport_parsing->parser_data = &transport_parsing->hpack_parser;
- transport_parsing->hpack_parser.on_header = on_header;
+ switch (stream_parsing->header_frames_received) {
+ case 0:
+ transport_parsing->hpack_parser.on_header = on_initial_header;
+ break;
+ case 1:
+ transport_parsing->hpack_parser.on_header = on_trailing_header;
+ break;
+ case 2:
+ gpr_log(GPR_ERROR, "too many header frames received");
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
+ }
transport_parsing->hpack_parser.on_header_user_data = transport_parsing;
transport_parsing->hpack_parser.is_boundary = is_eoh;
transport_parsing->hpack_parser.is_eof =
@@ -708,7 +747,7 @@ static int init_header_frame_parser(
}
static int init_window_update_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
&transport_parsing->simple.window_update,
transport_parsing->incoming_frame_size,
@@ -722,7 +761,8 @@ static int init_window_update_frame_parser(
return ok;
}
-static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+static int init_ping_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_ping_parser_begin_frame(
&transport_parsing->simple.ping,
transport_parsing->incoming_frame_size,
@@ -733,7 +773,7 @@ static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) {
}
static int init_rst_stream_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame(
&transport_parsing->simple.rst_stream,
transport_parsing->incoming_frame_size,
@@ -741,7 +781,7 @@ static int init_rst_stream_parser(
transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream(
transport_parsing, transport_parsing->incoming_stream_id);
if (!transport_parsing->incoming_stream) {
- return init_skip_frame_parser(transport_parsing, 0);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
}
transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse;
transport_parsing->parser_data = &transport_parsing->simple.rst_stream;
@@ -749,7 +789,7 @@ static int init_rst_stream_parser(
}
static int init_goaway_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_goaway_parser_begin_frame(
&transport_parsing->goaway_parser,
transport_parsing->incoming_frame_size,
@@ -760,7 +800,7 @@ static int init_goaway_parser(
}
static int init_settings_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
int ok;
if (transport_parsing->incoming_stream_id != 0) {
@@ -806,7 +846,7 @@ static int parse_frame_slice(grpc_exec_ctx *exec_ctx,
}
return 1;
case GRPC_CHTTP2_STREAM_ERROR:
- grpc_chttp2_parsing_become_skip_parser(transport_parsing);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_parsing);
if (stream_parsing) {
stream_parsing->saw_rst_stream = 1;
stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR;
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 781db7b0d6..a81ffcce79 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -142,12 +142,13 @@ static void stream_list_add_tail(grpc_chttp2_transport *t,
s->included[id] = 1;
}
-static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_chttp2_stream_list_id id) {
+static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
- return;
+ return 0;
}
stream_list_add_tail(t, s, id);
+ return 1;
}
/* wrappers for specializations */
@@ -192,12 +193,12 @@ void grpc_chttp2_list_remove_writable_stream(
GRPC_CHTTP2_LIST_WRITABLE);
}
-void grpc_chttp2_list_add_writing_stream(
+int grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
- stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
- STREAM_FROM_WRITING(stream_writing),
- GRPC_CHTTP2_LIST_WRITING);
+ return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
+ STREAM_FROM_WRITING(stream_writing),
+ GRPC_CHTTP2_LIST_WRITING);
}
int grpc_chttp2_list_have_writing_streams(
@@ -241,6 +242,40 @@ int grpc_chttp2_list_pop_written_stream(
return r;
}
+void grpc_chttp2_list_add_unannounced_incoming_window_available(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ GPR_ASSERT(stream_global->id != 0);
+ stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE);
+}
+
+void grpc_chttp2_list_remove_unannounced_incoming_window_available(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ stream_list_maybe_remove(
+ TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE);
+}
+
+int grpc_chttp2_list_pop_unannounced_incoming_window_available(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_parsing **stream_parsing) {
+ grpc_chttp2_stream *stream;
+ int r =
+ stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
+ GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE);
+ if (r != 0) {
+ *stream_global = &stream->global;
+ *stream_parsing = &stream->parsing;
+ }
+ return r;
+}
+
void grpc_chttp2_list_add_parsing_seen_stream(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
@@ -284,91 +319,60 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
return r;
}
-void grpc_chttp2_list_add_closed_waiting_for_parsing(
+void grpc_chttp2_list_add_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
+ GRPC_CHTTP2_LIST_CHECK_READ_OPS);
}
-int grpc_chttp2_list_pop_closed_waiting_for_parsing(
+int grpc_chttp2_list_pop_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
- GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
+ GRPC_CHTTP2_LIST_CHECK_READ_OPS);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
-void grpc_chttp2_list_add_cancelled_waiting_for_writing(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global) {
- stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
- STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+void grpc_chttp2_list_add_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_writing *stream_writing) {
+ stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
+ STREAM_FROM_WRITING(stream_writing),
+ GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
-int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
- GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+ GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
-void grpc_chttp2_list_add_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global) {
- stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
- STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED);
-}
-
-int grpc_chttp2_list_pop_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_global **stream_global,
- grpc_chttp2_stream_parsing **stream_parsing) {
- grpc_chttp2_stream *stream;
- int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
- GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED);
- if (r != 0) {
- *stream_global = &stream->global;
- *stream_parsing = &stream->parsing;
- }
- return r;
-}
-
-void grpc_chttp2_list_remove_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global) {
- stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
- STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED);
-}
-
-void grpc_chttp2_list_add_read_write_state_changed(
+void grpc_chttp2_list_add_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
+ GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
}
-int grpc_chttp2_list_pop_read_write_state_changed(
+int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
- GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
+ GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
if (r != 0) {
*stream_global = &stream->global;
}
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 69ad8854ba..353e476e40 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -40,15 +40,16 @@
#include "src/core/profiling/timers.h"
#include "src/core/transport/chttp2/http2_errors.h"
-static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
+static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
- grpc_chttp2_stream_global *first_reinserted_stream = NULL;
- gpr_uint32 window_delta;
+
+ GPR_TIMER_BEGIN("grpc_chttp2_unlocking_check_writes", 0);
/* simple writes are queued to qbuf, and flushed here */
gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
@@ -67,98 +68,103 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->sent_local_settings = 1;
}
+ GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
+ transport_global, outgoing_window);
+
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
while (grpc_chttp2_list_pop_writable_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- if (stream_global == first_reinserted_stream) {
- /* prevent infinite loop */
- grpc_chttp2_list_add_first_writable_stream(transport_global,
- stream_global);
- break;
- }
+ gpr_uint8 sent_initial_metadata;
stream_writing->id = stream_global->id;
- stream_writing->send_closed = GRPC_DONT_SEND_CLOSED;
-
- if (stream_global->outgoing_sopb) {
- window_delta = grpc_chttp2_preencode(
- stream_global->outgoing_sopb->ops,
- &stream_global->outgoing_sopb->nops,
- (gpr_uint32)GPR_MIN(GPR_MIN(transport_global->outgoing_window,
- stream_global->outgoing_window),
- GPR_UINT32_MAX),
- &stream_writing->sopb);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "write", transport_global, outgoing_window, -(gpr_int64)window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- outgoing_window,
- -(gpr_int64)window_delta);
- transport_global->outgoing_window -= window_delta;
- stream_global->outgoing_window -= window_delta;
-
- if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE &&
- stream_global->outgoing_sopb->nops == 0) {
- if (!transport_global->is_client && !stream_global->read_closed) {
- stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM;
+ stream_writing->read_closed = stream_global->read_closed;
+
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_writing, stream_writing,
+ outgoing_window, stream_global,
+ outgoing_window);
+
+ sent_initial_metadata = stream_writing->sent_initial_metadata;
+ if (!sent_initial_metadata && stream_global->send_initial_metadata) {
+ stream_writing->send_initial_metadata =
+ stream_global->send_initial_metadata;
+ stream_global->send_initial_metadata = NULL;
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ }
+ sent_initial_metadata = 1;
+ }
+ if (sent_initial_metadata) {
+ if (stream_global->send_message != NULL) {
+ gpr_slice hdr = gpr_slice_malloc(5);
+ gpr_uint8 *p = GPR_SLICE_START_PTR(hdr);
+ gpr_uint32 len = stream_global->send_message->length;
+ GPR_ASSERT(stream_writing->send_message == NULL);
+ p[0] = (stream_global->send_message->flags &
+ GRPC_WRITE_INTERNAL_COMPRESS) != 0;
+ p[1] = (gpr_uint8)(len >> 24);
+ p[2] = (gpr_uint8)(len >> 16);
+ p[3] = (gpr_uint8)(len >> 8);
+ p[4] = (gpr_uint8)(len);
+ gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, hdr);
+ if (stream_global->send_message->length > 0) {
+ stream_writing->send_message = stream_global->send_message;
} else {
- stream_writing->send_closed = GRPC_SEND_CLOSED;
+ stream_writing->send_message = NULL;
}
+ stream_writing->stream_fetched = 0;
+ stream_global->send_message = NULL;
}
-
- if (stream_global->outgoing_window > 0 &&
- stream_global->outgoing_sopb->nops != 0) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
- if (first_reinserted_stream == NULL &&
- transport_global->outgoing_window == 0) {
- first_reinserted_stream = stream_global;
+ if ((stream_writing->send_message != NULL ||
+ stream_writing->flow_controlled_buffer.length > 0) &&
+ stream_writing->outgoing_window > 0) {
+ if (transport_writing->outgoing_window > 0) {
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ }
+ } else {
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ stream_writing);
+ }
+ }
+ if (stream_global->send_trailing_metadata) {
+ stream_writing->send_trailing_metadata =
+ stream_global->send_trailing_metadata;
+ stream_global->send_trailing_metadata = NULL;
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
}
}
if (!stream_global->read_closed &&
- stream_global->unannounced_incoming_window > 0) {
- GPR_ASSERT(stream_writing->announce_window == 0);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_writing, stream_writing, announce_window,
- stream_global->unannounced_incoming_window);
- stream_writing->announce_window =
- stream_global->unannounced_incoming_window;
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_global, stream_global, incoming_window,
- stream_global->unannounced_incoming_window);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_global, stream_global, unannounced_incoming_window,
- -(gpr_int64)stream_global->unannounced_incoming_window);
- stream_global->incoming_window +=
- stream_global->unannounced_incoming_window;
- stream_global->unannounced_incoming_window = 0;
- grpc_chttp2_list_add_incoming_window_updated(transport_global,
- stream_global);
- stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW;
- }
- if (stream_writing->sopb.nops > 0 ||
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA;
- }
- if (stream_global->writing_now != 0) {
- grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
+ stream_global->unannounced_incoming_window_for_writing > 1024) {
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
+ announce_window, stream_global,
+ unannounced_incoming_window_for_writing);
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ }
}
}
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
- if (transport_global->incoming_window <
- transport_global->connection_window_target * 3 / 4) {
- window_delta = transport_global->connection_window_target -
- transport_global->incoming_window;
+ if (transport_global->announce_incoming_window > 0) {
+ gpr_uint32 announced = (gpr_uint32)GPR_MIN(
+ transport_global->announce_incoming_window, GPR_UINT32_MAX);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global,
+ announce_incoming_window, announced);
gpr_slice_buffer_add(&transport_writing->outbuf,
- grpc_chttp2_window_update_create(0, window_delta));
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("write", transport_global,
- incoming_window, window_delta);
- transport_global->incoming_window += window_delta;
+ grpc_chttp2_window_update_create(0, announced));
}
+ GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0);
+
return transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing);
}
@@ -169,50 +175,146 @@ void grpc_chttp2_perform_writes(
GPR_ASSERT(transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing));
- finalize_outbuf(transport_writing);
+ finalize_outbuf(exec_ctx, transport_writing);
- GPR_ASSERT(transport_writing->outbuf.count > 0);
GPR_ASSERT(endpoint);
- grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
- &transport_writing->done_cb);
+ if (transport_writing->outbuf.count > 0) {
+ grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
+ &transport_writing->done_cb);
+ } else {
+ grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, 1);
+ }
}
-static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
+static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
GPR_TIMER_BEGIN("finalize_outbuf", 0);
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
- if (stream_writing->sopb.nops > 0 ||
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
- stream_writing->id,
- &transport_writing->hpack_compressor,
- &transport_writing->outbuf);
- stream_writing->sopb.nops = 0;
+ gpr_uint32 max_outgoing =
+ (gpr_uint32)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
+ GPR_MIN(stream_writing->outgoing_window,
+ transport_writing->outgoing_window));
+ /* send initial metadata if it's available */
+ if (stream_writing->send_initial_metadata != NULL) {
+ grpc_chttp2_encode_header(
+ &transport_writing->hpack_compressor, stream_writing->id,
+ stream_writing->send_initial_metadata, 0, &transport_writing->outbuf);
+ stream_writing->send_initial_metadata = NULL;
+ stream_writing->sent_initial_metadata = 1;
}
- if (stream_writing->announce_window > 0) {
+ /* send any window updates */
+ if (stream_writing->announce_window > 0 &&
+ stream_writing->send_initial_metadata == NULL) {
+ gpr_uint32 announce = stream_writing->announce_window;
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(stream_writing->id,
stream_writing->announce_window));
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_writing, stream_writing, announce_window,
- -(gpr_int64)stream_writing->announce_window);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing,
+ announce_window, announce);
stream_writing->announce_window = 0;
}
- if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
- gpr_slice_buffer_add(&transport_writing->outbuf,
- grpc_chttp2_rst_stream_create(stream_writing->id,
- GRPC_CHTTP2_NO_ERROR));
+ /* fetch any body bytes */
+ while (!stream_writing->fetching && stream_writing->send_message &&
+ stream_writing->flow_controlled_buffer.length < max_outgoing &&
+ stream_writing->stream_fetched <
+ stream_writing->send_message->length) {
+ if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message,
+ &stream_writing->fetching_slice, max_outgoing,
+ &stream_writing->finished_fetch)) {
+ stream_writing->stream_fetched +=
+ GPR_SLICE_LENGTH(stream_writing->fetching_slice);
+ if (stream_writing->stream_fetched ==
+ stream_writing->send_message->length) {
+ stream_writing->send_message = NULL;
+ }
+ gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer,
+ stream_writing->fetching_slice);
+ } else {
+ stream_writing->fetching = 1;
+ }
+ }
+ /* send any body bytes */
+ if (stream_writing->flow_controlled_buffer.length > 0) {
+ if (max_outgoing > 0) {
+ gpr_uint32 send_bytes = (gpr_uint32)GPR_MIN(
+ max_outgoing, stream_writing->flow_controlled_buffer.length);
+ int is_last_data_frame =
+ stream_writing->send_message == NULL &&
+ send_bytes == stream_writing->flow_controlled_buffer.length;
+ int is_last_frame = is_last_data_frame &&
+ stream_writing->send_trailing_metadata != NULL &&
+ grpc_metadata_batch_is_empty(
+ stream_writing->send_trailing_metadata);
+ grpc_chttp2_encode_data(
+ stream_writing->id, &stream_writing->flow_controlled_buffer,
+ send_bytes, is_last_frame, &transport_writing->outbuf);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
+ stream_writing, outgoing_window,
+ send_bytes);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing,
+ outgoing_window, send_bytes);
+ if (is_last_frame) {
+ stream_writing->send_trailing_metadata = NULL;
+ stream_writing->sent_trailing_metadata = 1;
+ }
+ if (is_last_data_frame) {
+ GPR_ASSERT(stream_writing->send_message == NULL);
+ stream_writing->sent_message = 1;
+ }
+ } else if (transport_writing->outgoing_window == 0) {
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ stream_writing);
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
+ }
+ }
+ /* send trailing metadata if it's available and we're ready for it */
+ if (stream_writing->send_message == NULL &&
+ stream_writing->flow_controlled_buffer.length == 0 &&
+ stream_writing->send_trailing_metadata != NULL) {
+ if (grpc_metadata_batch_is_empty(
+ stream_writing->send_trailing_metadata)) {
+ grpc_chttp2_encode_data(stream_writing->id,
+ &stream_writing->flow_controlled_buffer, 0, 1,
+ &transport_writing->outbuf);
+ } else {
+ grpc_chttp2_encode_header(&transport_writing->hpack_compressor,
+ stream_writing->id,
+ stream_writing->send_trailing_metadata, 1,
+ &transport_writing->outbuf);
+ }
+ if (!transport_writing->is_client && !stream_writing->read_closed) {
+ gpr_slice_buffer_add(&transport_writing->outbuf,
+ grpc_chttp2_rst_stream_create(
+ stream_writing->id, GRPC_CHTTP2_NO_ERROR));
+ }
+ stream_writing->send_trailing_metadata = NULL;
+ stream_writing->sent_trailing_metadata = 1;
+ }
+ /* if there's more to write, then loop, otherwise prepare to finish the
+ * write */
+ if ((stream_writing->flow_controlled_buffer.length > 0 ||
+ (stream_writing->send_message && !stream_writing->fetching)) &&
+ stream_writing->outgoing_window > 0) {
+ if (transport_writing->outgoing_window > 0) {
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ /* do nothing - already reffed */
+ }
+ } else {
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ stream_writing);
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
+ }
+ } else {
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
- grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
-
- GPR_TIMER_END("finalize_outbuf", 0);
}
void grpc_chttp2_cleanup_writing(
@@ -223,24 +325,26 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- GPR_ASSERT(stream_global->writing_now != 0);
- if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
- if (!transport_global->is_client) {
- stream_global->read_closed = 1;
- }
+ if (stream_writing->sent_trailing_metadata) {
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
+ !transport_global->is_client, 1);
}
- if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) {
- if (stream_global->outgoing_sopb != NULL &&
- stream_global->outgoing_sopb->nops == 0) {
- GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
- stream_global->outgoing_sopb = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 1);
- }
+ if (stream_writing->sent_initial_metadata) {
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_initial_metadata_finished, 1);
+ }
+ if (stream_writing->sent_message) {
+ GPR_ASSERT(stream_writing->send_message == NULL);
+ GPR_ASSERT(stream_global->send_message_finished);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_message_finished, 1);
+ stream_writing->sent_message = 0;
+ }
+ if (stream_writing->sent_trailing_metadata) {
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_trailing_metadata_finished, 1);
}
- stream_global->writing_now = 0;
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index effc3c4b3b..10f52f4923 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -52,6 +52,7 @@
#include "src/core/transport/transport_impl.h"
#define DEFAULT_WINDOW 65535
+#define GRPC_CHTTP2_STREAM_LOOKAHEAD DEFAULT_WINDOW
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
@@ -75,14 +76,14 @@ int grpc_flowctl_trace = 0;
#define STREAM_FROM_GLOBAL(sg) \
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global)))
+#define STREAM_FROM_PARSING(sg) \
+ ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, parsing)))
+
static const grpc_transport_vtable vtable;
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
-static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t);
-
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
int iomgr_success_ignored);
@@ -103,11 +104,13 @@ static void perform_stream_op_locked(
grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
/** Cancel a stream: coming from the transport API */
-static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
+static void cancel_from_api(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status);
-static void close_from_api(grpc_chttp2_transport_global *transport_global,
+static void close_from_api(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status,
gpr_slice *optional_message);
@@ -128,6 +131,9 @@ static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state, const char *reason);
+static void check_read_ops(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global);
+
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -151,6 +157,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
GRPC_MDSTR_UNREF(t->parsing.str_grpc_timeout);
+ GRPC_MDELEM_UNREF(t->parsing.elem_grpc_status_ok);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
GPR_ASSERT(t->lists[i].head == NULL);
@@ -236,14 +243,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->endpoint_reading = 1;
t->global.next_stream_id = is_client ? 1 : 2;
t->global.is_client = is_client;
- t->global.outgoing_window = DEFAULT_WINDOW;
- t->global.incoming_window = DEFAULT_WINDOW;
+ t->writing.outgoing_window = DEFAULT_WINDOW;
+ t->parsing.incoming_window = DEFAULT_WINDOW;
t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->global.ping_counter = 1;
t->global.pings.next = t->global.pings.prev = &t->global.pings;
t->parsing.is_client = is_client;
t->parsing.str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
+ t->parsing.elem_grpc_status_ok =
+ grpc_mdelem_from_strings(t->metadata_context, "grpc-status", "0");
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
@@ -392,19 +401,46 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
}
}
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global,
+ const char *reason) {
+ grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount, reason);
+}
+void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_stream_global *stream_global,
+ const char *reason) {
+ grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount,
+ reason);
+}
+#else
+void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global) {
+ grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount);
+}
+void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_stream_global *stream_global) {
+ grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount);
+}
+#endif
+
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, const void *server_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_stream *gs, grpc_stream_refcount *refcount,
+ const void *server_data) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
memset(s, 0, sizeof(*s));
- grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.incoming_metadata);
- grpc_chttp2_incoming_metadata_buffer_init(&s->global.incoming_metadata);
- grpc_sopb_init(&s->writing.sopb);
- grpc_sopb_init(&s->global.incoming_sopb);
+ s->refcount = refcount;
+ GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2");
+
+ grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[1]);
+ grpc_chttp2_incoming_metadata_buffer_init(
+ &s->global.received_initial_metadata);
+ grpc_chttp2_incoming_metadata_buffer_init(
+ &s->global.received_trailing_metadata);
grpc_chttp2_data_parser_init(&s->parsing.data_parser);
+ gpr_slice_buffer_init(&s->writing.flow_controlled_buffer);
REF_TRANSPORT(t, "stream");
@@ -413,20 +449,17 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
if (server_data) {
GPR_ASSERT(t->parsing_active);
s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
+ s->parsing.id = s->global.id;
s->global.outgoing_window =
t->global.settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->global.max_recv_bytes = s->parsing.incoming_window =
- s->global.incoming_window =
- t->global.settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ s->parsing.incoming_window = s->global.max_recv_bytes =
+ t->global.settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
s->global.in_stream_map = 1;
}
-
- if (initial_op)
- perform_stream_op_locked(exec_ctx, &t->global, &s->global, initial_op);
unlock(exec_ctx, t);
return 0;
@@ -437,10 +470,13 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
int i;
+ grpc_byte_stream *bs;
+
+ GPR_TIMER_BEGIN("destroy_stream", 0);
gpr_mu_lock(&t->mu);
- GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
+ GPR_ASSERT((s->global.write_closed && s->global.read_closed) ||
s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map);
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
@@ -451,8 +487,9 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.id) == NULL);
}
- grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global);
grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
+ grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
+ &s->global);
gpr_mu_unlock(&t->mu);
@@ -464,17 +501,29 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
}
- GPR_ASSERT(s->global.outgoing_sopb == NULL);
- GPR_ASSERT(s->global.publish_sopb == NULL);
- grpc_sopb_destroy(&s->writing.sopb);
- grpc_sopb_destroy(&s->global.incoming_sopb);
+ while (
+ (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
+ grpc_byte_stream_destroy(bs);
+ }
+
+ GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
+ GPR_ASSERT(s->global.send_message_finished == NULL);
+ GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL);
+ GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
+ GPR_ASSERT(s->global.recv_message_ready == NULL);
+ GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
- grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.incoming_metadata);
- grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.incoming_metadata);
- grpc_chttp2_incoming_metadata_live_op_buffer_end(
- &s->global.outstanding_metadata);
+ grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]);
+ grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]);
+ grpc_chttp2_incoming_metadata_buffer_destroy(
+ &s->global.received_initial_metadata);
+ grpc_chttp2_incoming_metadata_buffer_destroy(
+ &s->global.received_trailing_metadata);
+ gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer);
UNREF_TRANSPORT(exec_ctx, t, "stream");
+
+ GPR_TIMER_END("destroy_stream", 0);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -486,12 +535,14 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
- grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ gpr_uint32 id) {
grpc_chttp2_stream *accepting;
grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
GPR_ASSERT(t->accepting_stream == NULL);
t->accepting_stream = &accepting;
- t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
+ t->channel_callback.accept_stream(exec_ctx,
+ t->channel_callback.accept_stream_user_data,
&t->base, (void *)(gpr_uintptr)id);
t->accepting_stream = NULL;
return &accepting->parsing;
@@ -511,7 +562,6 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_BEGIN("unlock", 0);
- unlock_check_read_write_state(exec_ctx, t);
if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
@@ -519,6 +569,7 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1);
prevent_endpoint_shutdown(t);
}
+ check_read_ops(exec_ctx, &t->global);
gpr_mu_unlock(&t->mu);
GPR_TIMER_END("unlock", 0);
@@ -558,7 +609,6 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
drop_connection(exec_ctx, t);
}
- /* cleanup writing related jazz */
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
/* leave the writing flag up on shutdown to prevent further writes in unlock()
@@ -598,6 +648,7 @@ void grpc_chttp2_add_incoming_goaway(
static void maybe_start_some_streams(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
grpc_chttp2_stream_global *stream_global;
+ gpr_uint32 stream_incoming_window;
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID &&
@@ -607,13 +658,16 @@ static void maybe_start_some_streams(
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
+ /* safe since we can't (legally) be parsing this stream yet */
+ grpc_chttp2_stream_parsing *stream_parsing =
+ &STREAM_FROM_GLOBAL(stream_global)->parsing;
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
transport_global->is_client ? "CLI" : "SVR", stream_global,
transport_global->next_stream_id));
GPR_ASSERT(stream_global->id == 0);
- stream_global->id = transport_global->next_stream_id;
+ stream_global->id = stream_parsing->id = transport_global->next_stream_id;
transport_global->next_stream_id += 2;
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
@@ -625,103 +679,166 @@ static void maybe_start_some_streams(
stream_global->outgoing_window =
transport_global->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- stream_global->incoming_window =
+ stream_parsing->incoming_window = stream_incoming_window =
transport_global->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
stream_global->max_recv_bytes =
- GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes);
+ GPR_MAX(stream_incoming_window, stream_global->max_recv_bytes);
grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
stream_global->in_stream_map = 1;
transport_global->concurrent_stream_count++;
- grpc_chttp2_list_add_incoming_window_updated(transport_global,
- stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
- cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
+ cancel_from_api(exec_ctx, transport_global, stream_global,
+ GRPC_STATUS_UNAVAILABLE);
+ }
+}
+
+static grpc_closure *add_closure_barrier(grpc_closure *closure) {
+ closure->final_data += 2;
+ return closure;
+}
+
+void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
+ grpc_closure **pclosure, int success) {
+ grpc_closure *closure = *pclosure;
+ if (closure == NULL) {
+ return;
+ }
+ closure->final_data -= 2;
+ if (!success) {
+ closure->final_data |= 1;
+ }
+ if (closure->final_data < 2) {
+ grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0);
}
+ *pclosure = NULL;
+}
+
+static int contains_non_ok_status(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_metadata_batch *batch) {
+ grpc_mdelem *ok_elem =
+ TRANSPORT_FROM_GLOBAL(transport_global)->parsing.elem_grpc_status_ok;
+ grpc_linked_mdelem *l;
+ for (l = batch->list.head; l; l = l->next) {
+ if (l->md->key == ok_elem->key && l->md != ok_elem) {
+ return 1;
+ }
+ }
+ return 0;
}
static void perform_stream_op_locked(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
+ grpc_closure *on_complete;
+
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
+
+ on_complete = op->on_complete;
+ /* use final_data as a barrier until enqueue time; the inital counter is
+ dropped at the end of this function */
+ on_complete->final_data = 2;
+
if (op->cancel_with_status != GRPC_STATUS_OK) {
- cancel_from_api(transport_global, stream_global, op->cancel_with_status);
+ cancel_from_api(exec_ctx, transport_global, stream_global,
+ op->cancel_with_status);
}
if (op->close_with_status != GRPC_STATUS_OK) {
- close_from_api(transport_global, stream_global, op->close_with_status,
- op->optional_close_message);
- }
-
- if (op->send_ops) {
- GPR_ASSERT(stream_global->outgoing_sopb == NULL);
- stream_global->send_done_closure = op->on_done_send;
- if (!stream_global->cancelled) {
- stream_global->written_anything = 1;
- stream_global->outgoing_sopb = op->send_ops;
- if (op->is_last_send &&
- stream_global->write_state == GRPC_WRITE_STATE_OPEN) {
- stream_global->write_state = GRPC_WRITE_STATE_QUEUED_CLOSE;
- }
- if (stream_global->id == 0) {
- GRPC_CHTTP2_IF_TRACING(gpr_log(
- GPR_DEBUG,
- "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
- transport_global->is_client ? "CLI" : "SVR", stream_global));
+ close_from_api(exec_ctx, transport_global, stream_global,
+ op->close_with_status, op->optional_close_message);
+ }
+
+ if (op->send_initial_metadata != NULL) {
+ GPR_ASSERT(stream_global->send_initial_metadata_finished == NULL);
+ stream_global->send_initial_metadata_finished =
+ add_closure_barrier(on_complete);
+ stream_global->send_initial_metadata = op->send_initial_metadata;
+ if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
+ stream_global->seen_error = 1;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ if (!stream_global->write_closed) {
+ if (transport_global->is_client) {
+ GPR_ASSERT(stream_global->id == 0);
grpc_chttp2_list_add_waiting_for_concurrency(transport_global,
stream_global);
maybe_start_some_streams(exec_ctx, transport_global);
- } else if (stream_global->outgoing_window > 0) {
+ } else {
+ GPR_ASSERT(stream_global->id != 0);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
} else {
- grpc_sopb_reset(op->send_ops);
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 0);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_initial_metadata_finished, 0);
}
}
- if (op->recv_ops) {
- GPR_ASSERT(stream_global->publish_sopb == NULL);
- GPR_ASSERT(stream_global->published_state != GRPC_STREAM_CLOSED);
- stream_global->recv_done_closure = op->on_done_recv;
- stream_global->publish_sopb = op->recv_ops;
- stream_global->publish_sopb->nops = 0;
- stream_global->publish_state = op->recv_state;
- /* clamp max recv bytes */
- op->max_recv_bytes = GPR_MIN(op->max_recv_bytes, GPR_UINT32_MAX);
- if (stream_global->max_recv_bytes < op->max_recv_bytes) {
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "op", transport_global, stream_global, max_recv_bytes,
- op->max_recv_bytes - stream_global->max_recv_bytes);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "op", transport_global, stream_global, unannounced_incoming_window,
- op->max_recv_bytes - stream_global->max_recv_bytes);
- stream_global->unannounced_incoming_window +=
- (gpr_uint32)op->max_recv_bytes - stream_global->max_recv_bytes;
- stream_global->max_recv_bytes = (gpr_uint32)op->max_recv_bytes;
+ if (op->send_message != NULL) {
+ GPR_ASSERT(stream_global->send_message_finished == NULL);
+ GPR_ASSERT(stream_global->send_message == NULL);
+ stream_global->send_message_finished = add_closure_barrier(on_complete);
+ if (stream_global->write_closed) {
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_message_finished, 0);
+ } else if (stream_global->id != 0) {
+ stream_global->send_message = op->send_message;
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
- grpc_chttp2_incoming_metadata_live_op_buffer_end(
- &stream_global->outstanding_metadata);
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
- if (stream_global->id != 0) {
+ }
+
+ if (op->send_trailing_metadata != NULL) {
+ GPR_ASSERT(stream_global->send_trailing_metadata_finished == NULL);
+ stream_global->send_trailing_metadata_finished =
+ add_closure_barrier(on_complete);
+ stream_global->send_trailing_metadata = op->send_trailing_metadata;
+ if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) {
+ stream_global->seen_error = 1;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ if (stream_global->write_closed) {
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_trailing_metadata_finished, 0);
+ } else if (stream_global->id != 0) {
+ /* TODO(ctiller): check if there's flow control for any outstanding
+ bytes before going writable */
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
- if (op->bind_pollset) {
- add_to_pollset_locked(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
- op->bind_pollset);
+ if (op->recv_initial_metadata != NULL) {
+ GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL);
+ stream_global->recv_initial_metadata_finished =
+ add_closure_barrier(on_complete);
+ stream_global->recv_initial_metadata = op->recv_initial_metadata;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
+ if (op->recv_message != NULL) {
+ GPR_ASSERT(stream_global->recv_message_ready == NULL);
+ stream_global->recv_message_ready = op->recv_message_ready;
+ stream_global->recv_message = op->recv_message;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+
+ if (op->recv_trailing_metadata != NULL) {
+ GPR_ASSERT(stream_global->recv_trailing_metadata_finished == NULL);
+ stream_global->recv_trailing_metadata_finished =
+ add_closure_barrier(on_complete);
+ stream_global->recv_trailing_metadata = op->recv_trailing_metadata;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+
+ grpc_chttp2_complete_closure_step(exec_ctx, &on_complete, 1);
+
GPR_TIMER_END("perform_stream_op_locked", 0);
}
@@ -811,12 +928,49 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* INPUT PROCESSING
*/
-static grpc_stream_state compute_state(gpr_uint8 write_closed,
- gpr_uint8 read_closed) {
- if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
- if (write_closed) return GRPC_STREAM_SEND_CLOSED;
- if (read_closed) return GRPC_STREAM_RECV_CLOSED;
- return GRPC_STREAM_OPEN;
+static void check_read_ops(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global) {
+ grpc_chttp2_stream_global *stream_global;
+ grpc_byte_stream *bs;
+ while (
+ grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) {
+ if (stream_global->recv_initial_metadata_finished != NULL &&
+ stream_global->published_initial_metadata) {
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ &stream_global->received_initial_metadata,
+ stream_global->recv_initial_metadata);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->recv_initial_metadata_finished, 1);
+ }
+ if (stream_global->recv_message_ready != NULL) {
+ if (stream_global->incoming_frames.head != NULL) {
+ *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
+ &stream_global->incoming_frames);
+ GPR_ASSERT(*stream_global->recv_message != NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
+ stream_global->recv_message_ready = NULL;
+ } else if (stream_global->published_trailing_metadata) {
+ *stream_global->recv_message = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
+ stream_global->recv_message_ready = NULL;
+ }
+ }
+ if (stream_global->recv_trailing_metadata_finished != NULL &&
+ stream_global->read_closed && stream_global->write_closed) {
+ while (stream_global->seen_error &&
+ (bs = grpc_chttp2_incoming_frame_queue_pop(
+ &stream_global->incoming_frames)) != NULL) {
+ grpc_byte_stream_destroy(bs);
+ }
+ if (stream_global->incoming_frames.head == NULL) {
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ &stream_global->received_trailing_metadata,
+ stream_global->recv_trailing_metadata);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->recv_trailing_metadata_finished, 1);
+ }
+ }
+ }
}
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -832,7 +986,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->global.in_stream_map = 0;
if (t->parsing.incoming_stream == &s->parsing) {
t->parsing.incoming_stream = NULL;
- grpc_chttp2_parsing_become_skip_parser(&t->parsing);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing);
}
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(exec_ctx, t);
@@ -847,109 +1001,10 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
-static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- grpc_chttp2_transport_global *transport_global = &t->global;
- grpc_chttp2_stream_global *stream_global;
- grpc_stream_state state;
-
- if (!t->parsing_active) {
- /* if a stream is in the stream map, and gets cancelled, we need to ensure
- we are not parsing before continuing the cancellation to keep things in
- a sane state */
- while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
- &stream_global)) {
- GPR_ASSERT(stream_global->in_stream_map);
- GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN);
- GPR_ASSERT(stream_global->read_closed);
- remove_stream(exec_ctx, t, stream_global->id);
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
- }
- }
-
- if (!t->writing_active) {
- while (grpc_chttp2_list_pop_cancelled_waiting_for_writing(transport_global,
- &stream_global)) {
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
- }
- }
-
- while (grpc_chttp2_list_pop_read_write_state_changed(transport_global,
- &stream_global)) {
- if (stream_global->cancelled) {
- if (t->writing_active &&
- stream_global->write_state != GRPC_WRITE_STATE_SENT_CLOSE) {
- grpc_chttp2_list_add_cancelled_waiting_for_writing(transport_global,
- stream_global);
- } else {
- stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
- if (stream_global->outgoing_sopb != NULL) {
- grpc_sopb_reset(stream_global->outgoing_sopb);
- stream_global->outgoing_sopb = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 1);
- }
- stream_global->read_closed = 1;
- if (!stream_global->published_cancelled) {
- char buffer[GPR_LTOA_MIN_BUFSIZE];
- gpr_ltoa(stream_global->cancelled_status, buffer);
- grpc_chttp2_incoming_metadata_buffer_add(
- &stream_global->incoming_metadata,
- grpc_mdelem_from_strings(t->metadata_context, "grpc-status",
- buffer));
- grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
- &stream_global->incoming_metadata, &stream_global->incoming_sopb);
- stream_global->published_cancelled = 1;
- }
- }
- }
- if (stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&
- stream_global->read_closed && stream_global->in_stream_map) {
- if (t->parsing_active) {
- grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
- stream_global);
- } else {
- remove_stream(exec_ctx, t, stream_global->id);
- }
- }
- if (!stream_global->publish_sopb) {
- continue;
- }
- if (stream_global->writing_now != 0) {
- continue;
- }
- /* FIXME(ctiller): we include in_stream_map in our computation of
- whether the stream is write-closed. This is completely bogus,
- but has the effect of delaying stream-closed until the stream
- is indeed evicted from the stream map, making it safe to delete.
- To fix this will require having an edge after stream-closed
- indicating that the stream is closed AND safe to delete. */
- state = compute_state(
- stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&
- !stream_global->in_stream_map,
- stream_global->read_closed);
- if (stream_global->incoming_sopb.nops == 0 &&
- state == stream_global->published_state) {
- continue;
- }
- grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
- &stream_global->incoming_metadata, &stream_global->incoming_sopb,
- &stream_global->outstanding_metadata);
- grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
- stream_global->published_state = *stream_global->publish_state = state;
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_done_closure, 1);
- stream_global->recv_done_closure = NULL;
- stream_global->publish_sopb = NULL;
- stream_global->publish_state = NULL;
- }
-}
-
-static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
+static void cancel_from_api(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status) {
- stream_global->cancelled = 1;
- stream_global->cancelled_status = status;
if (stream_global->id != 0) {
gpr_slice_buffer_add(
&transport_global->qbuf,
@@ -957,11 +1012,89 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
stream_global->id,
(gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(status)));
}
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
+ NULL);
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
+ 1);
+}
+
+void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ grpc_status_code status, gpr_slice *slice) {
+ if (status != GRPC_STATUS_OK) {
+ stream_global->seen_error = 1;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ if (stream_global->published_trailing_metadata &&
+ stream_global->recv_trailing_metadata_finished != NULL) {
+ /* last chance replacement: we've received trailing metadata,
+ but something more important has become available to signal
+ to the upper layers - drop what we've got, and then publish
+ what we want - which is safe because we haven't told anyone
+ about the metadata yet */
+ grpc_chttp2_incoming_metadata_buffer_reset(
+ &stream_global->received_trailing_metadata);
+ stream_global->published_trailing_metadata = 0;
+ }
+ if (!stream_global->published_trailing_metadata) {
+ grpc_mdctx *mdctx =
+ TRANSPORT_FROM_GLOBAL(transport_global)->metadata_context;
+ char status_string[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(status, status_string);
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &stream_global->received_trailing_metadata,
+ grpc_mdelem_from_strings(mdctx, "grpc-status", status_string));
+ if (slice) {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &stream_global->received_trailing_metadata,
+ grpc_mdelem_from_metadata_strings(
+ mdctx, grpc_mdstr_from_string(mdctx, "grpc-message"),
+ grpc_mdstr_from_slice(mdctx, gpr_slice_ref(*slice))));
+ }
+ stream_global->published_trailing_metadata = 1;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ if (slice) {
+ gpr_slice_unref(*slice);
+ }
+}
+
+void grpc_chttp2_mark_stream_closed(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, int close_reads,
+ int close_writes) {
+ if (stream_global->read_closed && stream_global->write_closed) {
+ /* already closed */
+ return;
+ }
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ if (close_reads && !stream_global->read_closed) {
+ stream_global->read_closed = 1;
+ stream_global->published_initial_metadata = 1;
+ stream_global->published_trailing_metadata = 1;
+ }
+ if (close_writes && !stream_global->write_closed) {
+ stream_global->write_closed = 1;
+ }
+ if (stream_global->read_closed && stream_global->write_closed) {
+ if (stream_global->id != 0 &&
+ TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active) {
+ grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
+ stream_global);
+ } else {
+ if (stream_global->id != 0) {
+ remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
+ stream_global->id);
+ }
+ stream_global->finished_close = 1;
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
+ }
+ }
}
-static void close_from_api(grpc_chttp2_transport_global *transport_global,
+static void close_from_api(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status,
gpr_slice *optional_message) {
@@ -973,10 +1106,7 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global,
GPR_ASSERT(status >= 0 && (int)status < 100);
- stream_global->cancelled = 1;
- stream_global->cancelled_status = status;
GPR_ASSERT(stream_global->id != 0);
- GPR_ASSERT(!stream_global->written_anything);
/* Hand roll a header block.
This is unnecessarily ugly - at some point we should find a more elegant
@@ -1059,23 +1189,30 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global,
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR));
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ if (optional_message) {
+ gpr_slice_ref(*optional_message);
+ }
+ grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
+ optional_message);
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
+ 1);
}
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
- cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
+ cancel_from_api(user_data, transport_global, stream_global,
+ GRPC_STATUS_UNAVAILABLE);
}
-static void end_all_the_calls(grpc_chttp2_transport *t) {
- grpc_chttp2_for_all_streams(&t->global, NULL, cancel_stream_cb);
+static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb);
}
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
close_transport_locked(exec_ctx, t);
- end_all_the_calls(t);
+ end_all_the_calls(exec_ctx, t);
}
/** update window from a settings change */
@@ -1086,12 +1223,11 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) {
grpc_chttp2_stream_global *stream_global = &s->global;
int was_zero;
int is_zero;
+ gpr_int64 initial_window_update = t->parsing.initial_window_update;
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global,
- outgoing_window,
- t->parsing.initial_window_update);
was_zero = stream_global->outgoing_window <= 0;
- stream_global->outgoing_window += t->parsing.initial_window_update;
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", transport_global, stream_global,
+ outgoing_window, initial_window_update);
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
@@ -1112,6 +1248,9 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
size_t i;
int keep_reading = 0;
grpc_chttp2_transport *t = tp;
+ grpc_chttp2_transport_global *transport_global = &t->global;
+ grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
+ grpc_chttp2_stream_global *stream_global;
GPR_TIMER_BEGIN("recv_data", 0);
@@ -1123,11 +1262,11 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
- grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
+ grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
gpr_mu_unlock(&t->mu);
GPR_TIMER_BEGIN("recv_data.parse", 0);
for (; i < t->read_buffer.count &&
- grpc_chttp2_perform_read(exec_ctx, &t->parsing,
+ grpc_chttp2_perform_read(exec_ctx, transport_parsing,
t->read_buffer.slices[i]);
i++)
;
@@ -1139,16 +1278,28 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
- t->global.concurrent_stream_count =
+ transport_global->concurrent_stream_count =
(gpr_uint32)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
- if (t->parsing.initial_window_update != 0) {
+ if (transport_parsing->initial_window_update != 0) {
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
update_global_window, t);
- t->parsing.initial_window_update = 0;
+ transport_parsing->initial_window_update = 0;
}
/* handle higher level things */
- grpc_chttp2_publish_reads(exec_ctx, &t->global, &t->parsing);
+ grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
t->parsing_active = 0;
+ /* if a stream is in the stream map, and gets cancelled, we need to ensure
+ * we are not parsing before continuing the cancellation to keep things in
+ * a sane state */
+ while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
+ &stream_global)) {
+ GPR_ASSERT(stream_global->in_stream_map);
+ GPR_ASSERT(stream_global->write_closed);
+ GPR_ASSERT(stream_global->read_closed);
+ remove_stream(exec_ctx, t, stream_global->id);
+ stream_global->finished_close = 1;
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
+ }
}
if (!success || i != t->read_buffer.count || t->closed) {
drop_connection(exec_ctx, t);
@@ -1206,35 +1357,216 @@ static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
}
}
+static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+ lock(t);
+ add_to_pollset_locked(exec_ctx, t, pollset);
+ unlock(exec_ctx, t);
+}
+
/*
- * TRACING
+ * BYTE STREAM
*/
-void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
- const char *context, const char *var,
- int is_client, gpr_uint32 stream_id,
- gpr_int64 current_value, gpr_int64 delta) {
- char *identifier;
- char *context_scope;
- char *context_thread;
- char *underscore_pos = strchr(context, '_');
- GPR_ASSERT(underscore_pos);
- context_thread = gpr_strdup(underscore_pos + 1);
- context_scope = gpr_strdup(context);
- context_scope[underscore_pos - context] = 0;
- if (stream_id) {
- gpr_asprintf(&identifier, "%s[%d]", context_scope, stream_id);
+static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ gpr_slice *slice, size_t max_size_hint,
+ grpc_closure *on_complete) {
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ grpc_chttp2_transport_global *transport_global = &bs->transport->global;
+ grpc_chttp2_stream_global *stream_global = &bs->stream->global;
+ gpr_uint32 max_recv_bytes;
+
+ lock(bs->transport);
+ if (bs->is_tail) {
+ /* clamp max recv hint to an allowable size */
+ if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) {
+ max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD;
+ } else {
+ max_recv_bytes = (gpr_uint32)max_size_hint;
+ }
+
+ /* account for bytes already received but unknown to higher layers */
+ if (max_recv_bytes >= bs->slices.length) {
+ max_recv_bytes -= (gpr_uint32)bs->slices.length;
+ } else {
+ max_recv_bytes = 0;
+ }
+ /* add some small lookahead to keep pipelines flowing */
+ GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD);
+ max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD;
+ if (stream_global->max_recv_bytes < max_recv_bytes) {
+ gpr_uint32 add_max_recv_bytes =
+ max_recv_bytes - stream_global->max_recv_bytes;
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
+ max_recv_bytes, add_max_recv_bytes);
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
+ unannounced_incoming_window_for_parse,
+ add_max_recv_bytes);
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
+ unannounced_incoming_window_for_writing,
+ add_max_recv_bytes);
+ grpc_chttp2_list_add_unannounced_incoming_window_available(
+ transport_global, stream_global);
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ }
+ }
+ if (bs->slices.count > 0) {
+ *slice = gpr_slice_buffer_take_first(&bs->slices);
+ unlock(exec_ctx, bs->transport);
+ return 1;
+ } else {
+ bs->on_next = on_complete;
+ bs->next = slice;
+ unlock(exec_ctx, bs->transport);
+ return 0;
+ }
+}
+
+static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) {
+ if (gpr_unref(&bs->refs)) {
+ gpr_slice_buffer_destroy(&bs->slices);
+ gpr_free(bs);
+ }
+}
+
+static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) {
+ incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream);
+}
+
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ gpr_slice slice) {
+ gpr_mu_lock(&bs->transport->mu);
+ if (bs->on_next != NULL) {
+ *bs->next = slice;
+ grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 1);
+ bs->on_next = NULL;
+ } else {
+ gpr_slice_buffer_add(&bs->slices, slice);
+ }
+ gpr_mu_unlock(&bs->transport->mu);
+}
+
+void grpc_chttp2_incoming_byte_stream_finished(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) {
+ incoming_byte_stream_unref(bs);
+}
+
+grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
+ gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) {
+ grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
+ gpr_malloc(sizeof(*incoming_byte_stream));
+ incoming_byte_stream->base.length = frame_size;
+ incoming_byte_stream->base.flags = flags;
+ incoming_byte_stream->base.next = incoming_byte_stream_next;
+ incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
+ gpr_ref_init(&incoming_byte_stream->refs, 2);
+ incoming_byte_stream->next_message = NULL;
+ incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
+ incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing);
+ gpr_slice_buffer_init(&incoming_byte_stream->slices);
+ incoming_byte_stream->on_next = NULL;
+ incoming_byte_stream->is_tail = 1;
+ if (add_to_queue->head == NULL) {
+ add_to_queue->head = incoming_byte_stream;
} else {
- identifier = gpr_strdup(context_scope);
- }
- gpr_log(GPR_INFO,
- "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]",
- is_client ? "client" : "server", identifier, context_thread, var,
- current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta,
- current_value + delta, reason, file, line);
- gpr_free(identifier);
- gpr_free(context_thread);
- gpr_free(context_scope);
+ add_to_queue->tail->is_tail = 0;
+ add_to_queue->tail->next_message = incoming_byte_stream;
+ }
+ add_to_queue->tail = incoming_byte_stream;
+ return incoming_byte_stream;
+}
+
+/*
+ * TRACING
+ */
+
+static char *format_flowctl_context_var(const char *context, const char *var,
+ gpr_int64 val, gpr_uint32 id,
+ char **scope) {
+ char *underscore_pos;
+ char *result;
+ if (context == NULL) {
+ *scope = NULL;
+ gpr_asprintf(&result, "%s(%lld)", var, val);
+ return result;
+ }
+ underscore_pos = strchr(context, '_');
+ *scope = gpr_strdup(context);
+ (*scope)[underscore_pos - context] = 0;
+ if (id != 0) {
+ char *tmp = *scope;
+ gpr_asprintf(scope, "%s[%d]", tmp, id);
+ gpr_free(tmp);
+ }
+ gpr_asprintf(&result, "%s.%s(%lld)", underscore_pos + 1, var, val);
+ return result;
+}
+
+static int samestr(char *a, char *b) {
+ if (a == NULL) {
+ return b == NULL;
+ }
+ if (b == NULL) {
+ return 0;
+ }
+ return 0 == strcmp(a, b);
+}
+
+void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
+ grpc_chttp2_flowctl_op op, const char *context1,
+ const char *var1, const char *context2,
+ const char *var2, int is_client,
+ gpr_uint32 stream_id, gpr_int64 val1,
+ gpr_int64 val2) {
+ char *scope1;
+ char *scope2;
+ char *label1 =
+ format_flowctl_context_var(context1, var1, val1, stream_id, &scope1);
+ char *label2 =
+ format_flowctl_context_var(context2, var2, val2, stream_id, &scope2);
+ char *clisvr = is_client ? "client" : "server";
+ char *prefix;
+
+ gpr_asprintf(&prefix, "FLOW % 8s: %s % 11s ", phase, clisvr, scope1);
+
+ switch (op) {
+ case GRPC_CHTTP2_FLOWCTL_MOVE:
+ GPR_ASSERT(samestr(scope1, scope2));
+ if (val2 != 0) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "%sMOVE % 40s <- % 40s giving %d", prefix, label1, label2,
+ val1 + val2);
+ }
+ break;
+ case GRPC_CHTTP2_FLOWCTL_CREDIT:
+ GPR_ASSERT(val2 >= 0);
+ if (val2 != 0) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "%sCREDIT % 40s by % 40s giving %d", prefix, label1, label2,
+ val1 + val2);
+ }
+ break;
+ case GRPC_CHTTP2_FLOWCTL_DEBIT:
+ GPR_ASSERT(val2 >= 0);
+ if (val2 != 0) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "%sDEBIT % 40s by % 40s giving %d", prefix, label1, label2,
+ val1 - val2);
+ }
+ break;
+ }
+
+ gpr_free(scope1);
+ gpr_free(scope2);
+ gpr_free(label1);
+ gpr_free(label2);
+ gpr_free(prefix);
}
/*
@@ -1246,7 +1578,7 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
}
static const grpc_transport_vtable vtable = {
- sizeof(grpc_chttp2_stream), init_stream, perform_stream_op,
+ sizeof(grpc_chttp2_stream), init_stream, set_pollset, perform_stream_op,
perform_transport_op, destroy_stream, destroy_transport, chttp2_get_peer};
grpc_transport *grpc_create_chttp2_transport(
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 68f23177eb..a72dee4399 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -41,9 +41,10 @@
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+#include "src/core/profiling/timers.h"
#include "src/core/support/murmur_hash.h"
#include "src/core/transport/chttp2/bin_encoder.h"
-#include <grpc/support/time.h>
#define INITIAL_STRTAB_CAPACITY 4
#define INITIAL_MDTAB_CAPACITY 4
@@ -232,24 +233,32 @@ static void metadata_context_destroy_locked(grpc_mdctx *ctx) {
}
void grpc_mdctx_ref(grpc_mdctx *ctx) {
+ GPR_TIMER_BEGIN("grpc_mdctx_ref", 0);
lock(ctx);
GPR_ASSERT(ctx->refs > 0);
ctx->refs++;
unlock(ctx);
+ GPR_TIMER_END("grpc_mdctx_ref", 0);
}
void grpc_mdctx_unref(grpc_mdctx *ctx) {
+ GPR_TIMER_BEGIN("grpc_mdctx_unref", 0);
lock(ctx);
GPR_ASSERT(ctx->refs > 0);
ctx->refs--;
unlock(ctx);
+ GPR_TIMER_END("grpc_mdctx_unref", 0);
}
static void grow_strtab(grpc_mdctx *ctx) {
size_t capacity = ctx->strtab_capacity * 2;
size_t i;
- internal_string **strtab = gpr_malloc(sizeof(internal_string *) * capacity);
+ internal_string **strtab;
internal_string *s, *next;
+
+ GPR_TIMER_BEGIN("grow_strtab", 0);
+
+ strtab = gpr_malloc(sizeof(internal_string *) * capacity);
memset(strtab, 0, sizeof(internal_string *) * capacity);
for (i = 0; i < ctx->strtab_capacity; i++) {
@@ -263,12 +272,15 @@ static void grow_strtab(grpc_mdctx *ctx) {
gpr_free(ctx->strtab);
ctx->strtab = strtab;
ctx->strtab_capacity = capacity;
+
+ GPR_TIMER_END("grow_strtab", 0);
}
static void internal_destroy_string(internal_string *is) {
internal_string **prev_next;
internal_string *cur;
grpc_mdctx *ctx = is->context;
+ GPR_TIMER_BEGIN("internal_destroy_string", 0);
if (is->has_base64_and_huffman_encoded) {
gpr_slice_unref(is->base64_and_huffman);
}
@@ -279,6 +291,7 @@ static void internal_destroy_string(internal_string *is) {
*prev_next = cur->bucket_next;
ctx->strtab_count--;
gpr_free(is);
+ GPR_TIMER_END("internal_destroy_string", 0);
}
static void internal_string_ref(internal_string *s DEBUG_ARGS) {
@@ -304,18 +317,22 @@ static void slice_ref(void *p) {
internal_string *is =
(internal_string *)((char *)p - offsetof(internal_string, refcount));
grpc_mdctx *ctx = is->context;
+ GPR_TIMER_BEGIN("slice_ref", 0);
lock(ctx);
INTERNAL_STRING_REF(is);
unlock(ctx);
+ GPR_TIMER_END("slice_ref", 0);
}
static void slice_unref(void *p) {
internal_string *is =
(internal_string *)((char *)p - offsetof(internal_string, refcount));
grpc_mdctx *ctx = is->context;
+ GPR_TIMER_BEGIN("slice_unref", 0);
lock(ctx);
INTERNAL_STRING_UNREF(is);
unlock(ctx);
+ GPR_TIMER_END("slice_unref", 0);
}
grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str) {
@@ -334,6 +351,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *buf,
gpr_uint32 hash = gpr_murmur_hash3(buf, length, ctx->hash_seed);
internal_string *s;
+ GPR_TIMER_BEGIN("grpc_mdstr_from_buffer", 0);
lock(ctx);
/* search for an existing string */
@@ -342,6 +360,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *buf,
0 == memcmp(buf, GPR_SLICE_START_PTR(s->slice), length)) {
INTERNAL_STRING_REF(s);
unlock(ctx);
+ GPR_TIMER_END("grpc_mdstr_from_buffer", 0);
return (grpc_mdstr *)s;
}
}
@@ -382,6 +401,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *buf,
}
unlock(ctx);
+ GPR_TIMER_END("grpc_mdstr_from_buffer", 0);
return (grpc_mdstr *)s;
}
@@ -391,6 +411,7 @@ static void gc_mdtab(grpc_mdctx *ctx) {
internal_metadata **prev_next;
internal_metadata *md, *next;
+ GPR_TIMER_BEGIN("gc_mdtab", 0);
for (i = 0; i < ctx->mdtab_capacity; i++) {
prev_next = &ctx->mdtab[i];
for (md = ctx->mdtab[i]; md; md = next) {
@@ -412,17 +433,19 @@ static void gc_mdtab(grpc_mdctx *ctx) {
}
}
}
-
- GPR_ASSERT(ctx->mdtab_free == 0);
+ GPR_TIMER_END("gc_mdtab", 0);
}
static void grow_mdtab(grpc_mdctx *ctx) {
size_t capacity = ctx->mdtab_capacity * 2;
size_t i;
- internal_metadata **mdtab =
- gpr_malloc(sizeof(internal_metadata *) * capacity);
+ internal_metadata **mdtab;
internal_metadata *md, *next;
gpr_uint32 hash;
+
+ GPR_TIMER_BEGIN("grow_mdtab", 0);
+
+ mdtab = gpr_malloc(sizeof(internal_metadata *) * capacity);
memset(mdtab, 0, sizeof(internal_metadata *) * capacity);
for (i = 0; i < ctx->mdtab_capacity; i++) {
@@ -437,6 +460,8 @@ static void grow_mdtab(grpc_mdctx *ctx) {
gpr_free(ctx->mdtab);
ctx->mdtab = mdtab;
ctx->mdtab_capacity = capacity;
+
+ GPR_TIMER_END("grow_mdtab", 0);
}
static void rehash_mdtab(grpc_mdctx *ctx) {
@@ -458,6 +483,8 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx,
GPR_ASSERT(key->context == ctx);
GPR_ASSERT(value->context == ctx);
+ GPR_TIMER_BEGIN("grpc_mdelem_from_metadata_strings", 0);
+
lock(ctx);
/* search for an existing pair */
@@ -467,6 +494,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx,
INTERNAL_STRING_UNREF(key);
INTERNAL_STRING_UNREF(value);
unlock(ctx);
+ GPR_TIMER_END("grpc_mdelem_from_metadata_strings", 0);
return (grpc_mdelem *)md;
}
}
@@ -496,6 +524,8 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx,
unlock(ctx);
+ GPR_TIMER_END("grpc_mdelem_from_metadata_strings", 0);
+
return (grpc_mdelem *)md;
}
@@ -542,6 +572,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) {
void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) {
internal_metadata *md = (internal_metadata *)gmd;
+ if (!md) return;
#ifdef GRPC_METADATA_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"ELM UNREF:%p:%d->%d: '%s' = '%s'", md,
@@ -552,12 +583,14 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) {
#endif
if (2 == gpr_atm_full_fetch_add(&md->refcnt, -1)) {
grpc_mdctx *ctx = md->context;
+ GPR_TIMER_BEGIN("grpc_mdelem_unref.to_zero", 0);
lock(ctx);
if (1 == gpr_atm_no_barrier_load(&md->refcnt)) {
ctx->mdtab_free++;
gpr_atm_no_barrier_store(&md->refcnt, 0);
}
unlock(ctx);
+ GPR_TIMER_END("grpc_mdelem_unref.to_zero", 0);
}
}
diff --git a/src/core/transport/stream_op.c b/src/core/transport/metadata_batch.c
index 6493e77bc5..c5d39e0c9f 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/metadata_batch.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/transport/stream_op.h"
+#include "src/core/transport/metadata_batch.h"
#include <string.h>
@@ -40,143 +40,6 @@
#include "src/core/profiling/timers.h"
-/* Exponential growth function: Given x, return a larger x.
- Currently we grow by 1.5 times upon reallocation. */
-#define GROW(x) (3 * (x) / 2)
-
-void grpc_sopb_init(grpc_stream_op_buffer *sopb) {
- sopb->ops = sopb->inlined_ops;
- sopb->nops = 0;
- sopb->capacity = GRPC_SOPB_INLINE_ELEMENTS;
-}
-
-void grpc_sopb_destroy(grpc_stream_op_buffer *sopb) {
- grpc_stream_ops_unref_owned_objects(sopb->ops, sopb->nops);
- if (sopb->ops != sopb->inlined_ops) gpr_free(sopb->ops);
-}
-
-void grpc_sopb_reset(grpc_stream_op_buffer *sopb) {
- grpc_stream_ops_unref_owned_objects(sopb->ops, sopb->nops);
- sopb->nops = 0;
-}
-
-void grpc_sopb_swap(grpc_stream_op_buffer *a, grpc_stream_op_buffer *b) {
- GPR_SWAP(size_t, a->nops, b->nops);
- GPR_SWAP(size_t, a->capacity, b->capacity);
-
- if (a->ops == a->inlined_ops) {
- if (b->ops == b->inlined_ops) {
- /* swap contents of inlined buffer */
- grpc_stream_op temp[GRPC_SOPB_INLINE_ELEMENTS];
- memcpy(temp, a->ops, b->nops * sizeof(grpc_stream_op));
- memcpy(a->ops, b->ops, a->nops * sizeof(grpc_stream_op));
- memcpy(b->ops, temp, b->nops * sizeof(grpc_stream_op));
- } else {
- /* a is inlined, b is not - copy a inlined into b, fix pointers */
- a->ops = b->ops;
- b->ops = b->inlined_ops;
- memcpy(b->ops, a->inlined_ops, b->nops * sizeof(grpc_stream_op));
- }
- } else if (b->ops == b->inlined_ops) {
- /* b is inlined, a is not - copy b inlined int a, fix pointers */
- b->ops = a->ops;
- a->ops = a->inlined_ops;
- memcpy(a->ops, b->inlined_ops, a->nops * sizeof(grpc_stream_op));
- } else {
- /* no inlining: easy swap */
- GPR_SWAP(grpc_stream_op *, a->ops, b->ops);
- }
-}
-
-void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
- size_t i;
- for (i = 0; i < nops; i++) {
- switch (ops[i].type) {
- case GRPC_OP_SLICE:
- gpr_slice_unref(ops[i].data.slice);
- break;
- case GRPC_OP_METADATA:
- grpc_metadata_batch_destroy(&ops[i].data.metadata);
- break;
- case GRPC_NO_OP:
- case GRPC_OP_BEGIN_MESSAGE:
- break;
- }
- }
-}
-
-static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) {
- sopb->capacity = new_capacity;
- if (sopb->ops == sopb->inlined_ops) {
- sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity);
- memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op));
- } else {
- sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity);
- }
-}
-
-static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
- grpc_stream_op *out;
-
- GPR_ASSERT(sopb->nops <= sopb->capacity);
- if (sopb->nops == sopb->capacity) {
- expandto(sopb, GROW(sopb->capacity));
- }
- out = sopb->ops + sopb->nops;
- sopb->nops++;
- return out;
-}
-
-void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) {
- add(sopb)->type = GRPC_NO_OP;
-}
-
-void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
- gpr_uint32 flags) {
- grpc_stream_op *op = add(sopb);
- op->type = GRPC_OP_BEGIN_MESSAGE;
- op->data.begin_message.length = length;
- op->data.begin_message.flags = flags;
-}
-
-void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb,
- grpc_metadata_batch b) {
- grpc_stream_op *op = add(sopb);
- op->type = GRPC_OP_METADATA;
- op->data.metadata = b;
-}
-
-void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
- grpc_stream_op *op = add(sopb);
- op->type = GRPC_OP_SLICE;
- op->data.slice = slice;
-}
-
-void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
- size_t nops) {
- size_t orig_nops = sopb->nops;
- size_t new_nops = orig_nops + nops;
-
- if (new_nops > sopb->capacity) {
- expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops));
- }
-
- memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops);
- sopb->nops = new_nops;
-}
-
-void grpc_sopb_move_to(grpc_stream_op_buffer *src, grpc_stream_op_buffer *dst) {
- if (src->nops == 0) {
- return;
- }
- if (dst->nops == 0) {
- grpc_sopb_swap(src, dst);
- return;
- }
- grpc_sopb_append(dst, src->ops, src->nops);
- src->nops = 0;
-}
-
static void assert_valid_list(grpc_mdelem_list *list) {
#ifndef NDEBUG
grpc_linked_mdelem *l;
@@ -200,13 +63,11 @@ static void assert_valid_list(grpc_mdelem_list *list) {
#ifndef NDEBUG
void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) {
assert_valid_list(&batch->list);
- assert_valid_list(&batch->garbage);
}
#endif /* NDEBUG */
void grpc_metadata_batch_init(grpc_metadata_batch *batch) {
- batch->list.head = batch->list.tail = batch->garbage.head =
- batch->garbage.tail = NULL;
+ batch->list.head = batch->list.tail = NULL;
batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
@@ -215,9 +76,6 @@ void grpc_metadata_batch_destroy(grpc_metadata_batch *batch) {
for (l = batch->list.head; l; l = l->next) {
GRPC_MDELEM_UNREF(l->md);
}
- for (l = batch->garbage.head; l; l = l->next) {
- GRPC_MDELEM_UNREF(l->md);
- }
}
void grpc_metadata_batch_add_head(grpc_metadata_batch *batch,
@@ -283,10 +141,6 @@ void grpc_metadata_batch_merge(grpc_metadata_batch *target,
next = l->next;
link_tail(&target->list, l);
}
- for (l = to_add->garbage.head; l; l = next) {
- next = l->next;
- link_tail(&target->garbage, l);
- }
}
void grpc_metadata_batch_move(grpc_metadata_batch *dst,
@@ -305,7 +159,6 @@ void grpc_metadata_batch_filter(grpc_metadata_batch *batch,
GPR_TIMER_BEGIN("grpc_metadata_batch_filter", 0);
assert_valid_list(&batch->list);
- assert_valid_list(&batch->garbage);
for (l = batch->list.head; l; l = next) {
grpc_mdelem *orig = l->md;
grpc_mdelem *filt = filter(user_data, orig);
@@ -324,14 +177,28 @@ void grpc_metadata_batch_filter(grpc_metadata_batch *batch,
batch->list.tail = l->prev;
}
assert_valid_list(&batch->list);
- link_head(&batch->garbage, l);
+ GRPC_MDELEM_UNREF(l->md);
} else if (filt != orig) {
GRPC_MDELEM_UNREF(orig);
l->md = filt;
}
}
assert_valid_list(&batch->list);
- assert_valid_list(&batch->garbage);
GPR_TIMER_END("grpc_metadata_batch_filter", 0);
}
+
+static grpc_mdelem *no_metadata_for_you(void *user_data, grpc_mdelem *elem) {
+ return NULL;
+}
+
+void grpc_metadata_batch_clear(grpc_metadata_batch *batch) {
+ batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ grpc_metadata_batch_filter(batch, no_metadata_for_you, NULL);
+}
+
+int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) {
+ return batch->list.head == NULL &&
+ gpr_time_cmp(gpr_inf_future(batch->deadline.clock_type),
+ batch->deadline) == 0;
+}
diff --git a/src/core/transport/stream_op.h b/src/core/transport/metadata_batch.h
index 37f18b02d9..fc3a46004f 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/metadata_batch.h
@@ -40,39 +40,6 @@
#include <grpc/support/time.h>
#include "src/core/transport/metadata.h"
-/* this many stream ops are inlined into a sopb before allocating */
-#define GRPC_SOPB_INLINE_ELEMENTS 4
-
-/* Operations that can be performed on a stream.
- Used by grpc_stream_op. */
-typedef enum grpc_stream_op_code {
- /* Do nothing code. Useful if rewriting a batch to exclude some operations.
- Must be ignored by receivers */
- GRPC_NO_OP,
- GRPC_OP_METADATA,
- /* Begin a message/metadata element/status - as defined by
- grpc_message_type. */
- GRPC_OP_BEGIN_MESSAGE,
- /* Add a slice of data to the current message/metadata element/status.
- Must not overflow the forward declared length. */
- GRPC_OP_SLICE
-} grpc_stream_op_code;
-
-/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
- * compression for the message */
-#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
-/** Mask of all valid internal flags. */
-#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
-
-/* Arguments for GRPC_OP_BEGIN_MESSAGE */
-typedef struct grpc_begin_message {
- /* How many bytes of data will this message contain */
- gpr_uint32 length;
- /* Write flags for the message: see grpc.h GRPC_WRITE_* for the public bits,
- * GRPC_WRITE_INTERNAL_* for the internal ones. */
- gpr_uint32 flags;
-} grpc_begin_message;
-
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
@@ -88,10 +55,6 @@ typedef struct grpc_mdelem_list {
typedef struct grpc_metadata_batch {
/** Metadata elements in this batch */
grpc_mdelem_list list;
- /** Elements that have been removed from the batch, but have
- not yet been unreffed - used to allow collecting garbage
- under a single metadata context lock */
- grpc_mdelem_list garbage;
/** Used to calculate grpc-timeout at the point of sending,
or gpr_inf_future if this batch does not need to send a
grpc-timeout */
@@ -102,6 +65,8 @@ void grpc_metadata_batch_init(grpc_metadata_batch *batch);
void grpc_metadata_batch_destroy(grpc_metadata_batch *batch);
void grpc_metadata_batch_merge(grpc_metadata_batch *target,
grpc_metadata_batch *add);
+void grpc_metadata_batch_clear(grpc_metadata_batch *batch);
+int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch);
/** Moves the metadata information from \a src to \a dst. Upon return, \a src is
* zeroed. */
@@ -159,54 +124,4 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd);
} while (0)
#endif
-/* Represents a single operation performed on a stream/transport */
-typedef struct grpc_stream_op {
- /* the operation to be applied */
- enum grpc_stream_op_code type;
- /* the arguments to this operation. union fields are named according to the
- associated op-code */
- union {
- grpc_begin_message begin_message;
- grpc_metadata_batch metadata;
- gpr_slice slice;
- } data;
-} grpc_stream_op;
-
-/** A stream op buffer is a wrapper around stream operations that is
- * dynamically extendable. */
-typedef struct grpc_stream_op_buffer {
- grpc_stream_op *ops;
- size_t nops;
- size_t capacity;
- grpc_stream_op inlined_ops[GRPC_SOPB_INLINE_ELEMENTS];
-} grpc_stream_op_buffer;
-
-/* Initialize a stream op buffer */
-void grpc_sopb_init(grpc_stream_op_buffer *sopb);
-/* Destroy a stream op buffer */
-void grpc_sopb_destroy(grpc_stream_op_buffer *sopb);
-/* Reset a sopb to no elements */
-void grpc_sopb_reset(grpc_stream_op_buffer *sopb);
-/* Swap two sopbs */
-void grpc_sopb_swap(grpc_stream_op_buffer *a, grpc_stream_op_buffer *b);
-
-void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops);
-
-/* Append a GRPC_NO_OP to a buffer */
-void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
-/* Append a GRPC_OP_BEGIN to a buffer */
-void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
- gpr_uint32 flags);
-void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb,
- grpc_metadata_batch metadata);
-/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
-void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
-/* Append a buffer to a buffer - does not ref/unref any internal objects */
-void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
- size_t nops);
-
-void grpc_sopb_move_to(grpc_stream_op_buffer *src, grpc_stream_op_buffer *dst);
-
-char *grpc_sopb_string(grpc_stream_op_buffer *sopb);
-
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 828d212cfe..f2bebc62f3 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -33,9 +33,36 @@
#include "src/core/transport/transport.h"
#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include "src/core/transport/transport_impl.h"
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
+ gpr_log(GPR_DEBUG, "STREAM %p:%p REF %d->%d %s", refcount,
+ refcount->destroy.cb_arg, val, val + 1, reason);
+#else
+void grpc_stream_ref(grpc_stream_refcount *refcount) {
+#endif
+ gpr_ref(&refcount->refs);
+}
+
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
+ const char *reason) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
+ gpr_log(GPR_DEBUG, "STREAM %p:%p UNREF %d->%d %s", refcount,
+ refcount->destroy.cb_arg, val, val - 1, reason);
+#else
+void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_stream_refcount *refcount) {
+#endif
+ if (gpr_unref(&refcount->refs)) {
+ grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, 1);
+ }
+}
+
size_t grpc_transport_stream_size(grpc_transport *transport) {
return transport->vtable->sizeof_stream;
}
@@ -47,10 +74,10 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx,
int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_stream *stream,
- const void *server_data,
- grpc_transport_stream_op *initial_op) {
- return transport->vtable->init_stream(exec_ctx, transport, stream,
- server_data, initial_op);
+ grpc_stream_refcount *refcount,
+ const void *server_data) {
+ return transport->vtable->init_stream(exec_ctx, transport, stream, refcount,
+ server_data);
}
void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
@@ -66,6 +93,12 @@ void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx,
transport->vtable->perform_op(exec_ctx, transport, op);
}
+void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport, grpc_stream *stream,
+ grpc_pollset *pollset) {
+ transport->vtable->set_pollset(exec_ctx, transport, stream, pollset);
+}
+
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
grpc_stream *stream) {
@@ -79,9 +112,8 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) {
- grpc_exec_ctx_enqueue(exec_ctx, op->on_done_recv, 0);
- grpc_exec_ctx_enqueue(exec_ctx, op->on_done_send, 0);
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0);
}
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
@@ -129,9 +161,9 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
if (optional_message) {
cmd = gpr_malloc(sizeof(*cmd));
cmd->message = *optional_message;
- cmd->then_call = op->on_consumed;
+ cmd->then_call = op->on_complete;
grpc_closure_init(&cmd->closure, free_message, cmd);
- op->on_consumed = &cmd->closure;
+ op->on_complete = &cmd->closure;
op->optional_close_message = &cmd->message;
}
op->close_with_status = status;
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index d4cee03862..f296ce8251 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -38,7 +38,8 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_set.h"
-#include "src/core/transport/stream_op.h"
+#include "src/core/transport/metadata_batch.h"
+#include "src/core/transport/byte_stream.h"
#include "src/core/channel/context.h"
/* forward declarations */
@@ -49,36 +50,35 @@ typedef struct grpc_transport grpc_transport;
for a stream. */
typedef struct grpc_stream grpc_stream;
-/* Represents the send/recv closed state of a stream. */
-typedef enum grpc_stream_state {
- /* the stream is open for sends and receives */
- GRPC_STREAM_OPEN,
- /* the stream is closed for sends, but may still receive data */
- GRPC_STREAM_SEND_CLOSED,
- /* the stream is closed for receives, but may still send data */
- GRPC_STREAM_RECV_CLOSED,
- /* the stream is closed for both sends and receives */
- GRPC_STREAM_CLOSED
-} grpc_stream_state;
+typedef struct grpc_stream_refcount {
+ gpr_refcount refs;
+ grpc_closure destroy;
+} grpc_stream_refcount;
+
+/*#define GRPC_STREAM_REFCOUNT_DEBUG*/
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason);
+void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
+ const char *reason);
+#else
+void grpc_stream_ref(grpc_stream_refcount *refcount);
+void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount);
+#endif
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
- grpc_closure *on_consumed;
+ grpc_metadata_batch *send_initial_metadata;
+ grpc_metadata_batch *send_trailing_metadata;
- grpc_stream_op_buffer *send_ops;
- int is_last_send;
- grpc_closure *on_done_send;
+ grpc_byte_stream *send_message;
- grpc_stream_op_buffer *recv_ops;
- grpc_stream_state *recv_state;
- /** The number of bytes this peer is currently prepared to receive.
- These bytes will be eventually used to replenish per-stream flow control
- windows. */
- size_t max_recv_bytes;
- grpc_closure *on_done_recv;
+ grpc_metadata_batch *recv_initial_metadata;
+ grpc_byte_stream **recv_message;
+ grpc_closure *recv_message_ready;
+ grpc_metadata_batch *recv_trailing_metadata;
- grpc_pollset *bind_pollset;
+ grpc_closure *on_complete;
/** If != GRPC_STATUS_OK, cancel this stream */
grpc_status_code cancel_with_status;
@@ -110,8 +110,8 @@ typedef struct grpc_transport_op {
gpr_slice *goaway_message;
/** set the callback for accepting new streams;
this is a permanent callback, unlike the other one-shot closures */
- void (*set_accept_stream)(void *user_data, grpc_transport *transport,
- const void *server_data);
+ void (*set_accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_transport *transport, const void *server_data);
void *set_accept_stream_user_data;
/** add this transport to a pollset */
grpc_pollset *bind_pollset;
@@ -136,8 +136,12 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
supplied from the accept_stream callback function */
int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_stream *stream,
- const void *server_data,
- grpc_transport_stream_op *initial_op);
+ grpc_stream_refcount *refcount,
+ const void *server_data);
+
+void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport, grpc_stream *stream,
+ grpc_pollset *pollset);
/* Destroy transport data for a stream.
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index 900c6340ff..40bfb4b13a 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -43,8 +43,12 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
- grpc_stream *stream, const void *server_data,
- grpc_transport_stream_op *initial_op);
+ grpc_stream *stream, grpc_stream_refcount *refcount,
+ const void *server_data);
+
+ /* implementation of grpc_transport_set_pollset */
+ void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
+ grpc_stream *stream, grpc_pollset *pollset);
/* implementation of grpc_transport_perform_stream_op */
void (*perform_stream_op)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index f62c340e97..f3b6db29d6 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -69,42 +69,6 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
}
}
-char *grpc_sopb_string(grpc_stream_op_buffer *sopb) {
- char *out;
- char *tmp;
- size_t i;
- gpr_strvec b;
- gpr_strvec_init(&b);
-
- for (i = 0; i < sopb->nops; i++) {
- grpc_stream_op *op = &sopb->ops[i];
- if (i > 0) gpr_strvec_add(&b, gpr_strdup(", "));
- switch (op->type) {
- case GRPC_NO_OP:
- gpr_strvec_add(&b, gpr_strdup("NO_OP"));
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length);
- gpr_strvec_add(&b, tmp);
- break;
- case GRPC_OP_SLICE:
- gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice));
- gpr_strvec_add(&b, tmp);
- break;
- case GRPC_OP_METADATA:
- gpr_strvec_add(&b, gpr_strdup("METADATA{"));
- put_metadata_list(&b, op->data.metadata);
- gpr_strvec_add(&b, gpr_strdup("}"));
- break;
- }
- }
-
- out = gpr_strvec_flatten(&b, NULL);
- gpr_strvec_destroy(&b);
-
- return out;
-}
-
char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
char *tmp;
char *out;
@@ -113,42 +77,52 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
gpr_strvec b;
gpr_strvec_init(&b);
- if (op->send_ops) {
+ if (op->send_initial_metadata != NULL) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_asprintf(&tmp, "SEND%s:%p", op->is_last_send ? "_LAST" : "",
- op->on_done_send);
- gpr_strvec_add(&b, tmp);
- gpr_strvec_add(&b, gpr_strdup("["));
- gpr_strvec_add(&b, grpc_sopb_string(op->send_ops));
- gpr_strvec_add(&b, gpr_strdup("]"));
+ gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{"));
+ put_metadata_list(&b, *op->send_initial_metadata);
+ gpr_strvec_add(&b, gpr_strdup("}"));
}
- if (op->recv_ops) {
+ if (op->send_message != NULL) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_asprintf(&tmp, "RECV:%p:max_recv_bytes=%d", op->on_done_recv,
- op->max_recv_bytes);
+ gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d",
+ op->send_message->flags, op->send_message->length);
gpr_strvec_add(&b, tmp);
}
- if (op->bind_pollset) {
+ if (op->send_trailing_metadata != NULL) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_strvec_add(&b, gpr_strdup("BIND"));
+ gpr_strvec_add(&b, gpr_strdup("SEND_TRAILING_METADATA{"));
+ put_metadata_list(&b, *op->send_trailing_metadata);
+ gpr_strvec_add(&b, gpr_strdup("}"));
}
- if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (op->recv_initial_metadata != NULL) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
- gpr_strvec_add(&b, tmp);
+ gpr_strvec_add(&b, gpr_strdup("RECV_INITIAL_METADATA"));
+ }
+
+ if (op->recv_message != NULL) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
}
- if (op->on_consumed != NULL) {
+ if (op->recv_trailing_metadata != NULL) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_asprintf(&tmp, "ON_CONSUMED:%p", op->on_consumed);
+ gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA"));
+ }
+
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
gpr_strvec_add(&b, tmp);
}