aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-04-02 18:17:40 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-04-02 18:17:40 -0700
commitf570f96edaa895621f7c8d57737e0f79c3f24cf9 (patch)
tree3a08e27fdae5cc261f9e8213836e6a7a1e6a2aef /src/core/ext/transport
parent9e760da81a61afb6b9fe4c8ca3a30fae879002f2 (diff)
parent69b2d2d40bd06b872bdf0c484a459831ed399dbc (diff)
Merge branch 'lazy-deframe-dev2' into lazy-deframe
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c439
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.c186
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h30
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c4
5 files changed, 240 insertions, 423 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 719a17fe10..176382cb10 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -115,6 +115,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
+static void incoming_byte_stream_publish_error(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error);
+static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -156,13 +161,14 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static grpc_error *deframe_unprocessed_incoming_frames(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice_buffer *slices,
- grpc_slice *slice_out, bool partial_deframe);
-static void clean_unprocessed_frames_buffer(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s);
+static grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_data_parser *p,
+ grpc_chttp2_stream *s,
+ grpc_slice_buffer *slices,
+ grpc_slice *slice_out,
+ grpc_byte_stream **stream_out);
+static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
@@ -597,7 +603,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
reading */
- gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@@ -607,9 +612,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx);
- s->incoming_frames = NULL;
grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
- gpr_mu_init(&s->buffer_mu);
+ grpc_slice_buffer_init(&s->frame_storage);
+ s->pending_byte_stream = false;
+ grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s, grpc_combiner_scheduler(t->combiner, false));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -639,11 +645,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx,
&s->unprocessed_incoming_frames_buffer);
- if (s->incoming_frames != NULL) {
- grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames;
- s->incoming_frames = NULL;
- incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base, GRPC_ERROR_NONE);
- }
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -662,9 +664,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(s->recv_initial_metadata_ready == NULL);
GPR_ASSERT(s->recv_message_ready == NULL);
GPR_ASSERT(s->recv_trailing_metadata_finished == NULL);
- gpr_mu_lock(&s->buffer_mu);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser);
- gpr_mu_unlock(&s->buffer_mu);
grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
&s->metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
@@ -672,6 +672,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
+ GRPC_ERROR_UNREF(s->byte_stream_error);
if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
@@ -1347,22 +1348,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_ASSERT(s->recv_message_ready == NULL);
s->recv_message_ready = op->recv_message_ready;
s->recv_message = op->recv_message;
- gpr_mu_lock(&s->buffer_mu);
- if (s->id != 0 && (s->incoming_frames == NULL ||
- s->unprocessed_incoming_frames_buffer.count == 0)) {
- gpr_mu_unlock(&s->buffer_mu);
+ if (s->id != 0 && s->frame_storage.length == 0) {
incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
- } else {
- gpr_mu_unlock(&s->buffer_mu);
- }
- gpr_mu_lock(&s->buffer_mu);
- if (s->incoming_frames == NULL &&
- s->unprocessed_incoming_frames_buffer.count > 0) {
- deframe_unprocessed_incoming_frames(
- exec_ctx, &s->data_parser, t, s,
- &s->unprocessed_incoming_frames_buffer, NULL, true);
}
- gpr_mu_unlock(&s->buffer_mu);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1530,18 +1518,9 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
if (s->recv_initial_metadata_ready != NULL &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) {
- if (s->incoming_frames != NULL) {
- grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames;
- s->incoming_frames = NULL;
- incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base,
- GRPC_ERROR_NONE);
- }
- size_t length;
- gpr_mu_lock(&s->buffer_mu);
- length = s->unprocessed_incoming_frames_buffer.length;
- gpr_mu_unlock(&s->buffer_mu);
- if (length > 0) {
- clean_unprocessed_frames_buffer(exec_ctx, t, s);
+ grpc_slice_buffer_reset_and_unref(&s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1554,32 +1533,38 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
+ grpc_error *error = GRPC_ERROR_NONE;
if (s->recv_message_ready != NULL) {
+ *s->recv_message = NULL;
if (s->final_metadata_requested && s->seen_error) {
- if (s->incoming_frames != NULL) {
- grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames;
- s->incoming_frames = NULL;
- incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base,
- GRPC_ERROR_NONE);
+ grpc_slice_buffer_reset_and_unref(&s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer);
}
- size_t length;
- gpr_mu_lock(&s->buffer_mu);
- length = s->unprocessed_incoming_frames_buffer.length;
- gpr_mu_unlock(&s->buffer_mu);
- if (length > 0) {
- clean_unprocessed_frames_buffer(exec_ctx, t, s);
+ }
+ if (!s->pending_byte_stream) {
+ while (s->unprocessed_incoming_frames_buffer.length > 0 ||
+ s->frame_storage.length > 0) {
+ if (s->unprocessed_incoming_frames_buffer.length == 0) {
+ grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, &s->frame_storage);
+ }
+ /* error handling ok? */
+ error = deframe_unprocessed_incoming_frames(exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
+ if (error != GRPC_ERROR_NONE) {
+ s->seen_error = true;
+ grpc_slice_buffer_reset_and_unref(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer);
+ break;
+ } else if (*s->recv_message != NULL) {
+ break;
+ }
}
}
- if (s->incoming_frames != NULL) {
- *s->recv_message = &s->incoming_frames->base;
- s->incoming_frames = NULL;
- GPR_ASSERT(*s->recv_message != NULL);
- grpc_closure_sched(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE);
- s->recv_message_ready = NULL;
+ if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
+ null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
*s->recv_message = NULL;
- grpc_closure_sched(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE);
- s->recv_message_ready = NULL;
+ null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
}
}
}
@@ -1591,21 +1576,13 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
s->write_closed) {
if (s->seen_error) {
- if (s->incoming_frames != NULL) {
- grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames;
- s->incoming_frames = NULL;
- incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base,
- GRPC_ERROR_NONE);
- }
- size_t length;
- gpr_mu_lock(&s->buffer_mu);
- length = s->unprocessed_incoming_frames_buffer.length;
- gpr_mu_unlock(&s->buffer_mu);
- if (length > 0) {
- clean_unprocessed_frames_buffer(exec_ctx, t, s);
+ grpc_slice_buffer_reset_and_unref(&s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer);
}
}
- if (s->all_incoming_byte_streams_finished &&
+ if (s->read_closed && s->frame_storage.length == 0 &&
+ (!s->pending_byte_stream || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
@@ -1616,34 +1593,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
}
}
-static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- size_t length;
- gpr_mu_lock(&s->buffer_mu);
- length = s->unprocessed_incoming_frames_buffer.length;
- gpr_mu_unlock(&s->buffer_mu);
- if ((s->all_incoming_byte_streams_finished =
- (gpr_unref(&s->active_streams) && length == 0))) {
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
- }
-}
-
-static void clean_unprocessed_frames_buffer(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- gpr_mu_lock(&s->buffer_mu);
- grpc_slice_buffer_reset_and_unref_internal(
- exec_ctx, &s->unprocessed_incoming_frames_buffer);
- gpr_mu_unlock(&s->buffer_mu);
- // TODO (mxyan): add get ref count in sync.c?
- gpr_atm active_streams =
- gpr_atm_no_barrier_fetch_add(&s->active_streams.count, 0);
- if ((s->all_incoming_byte_streams_finished = (active_streams == 0))) {
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
- }
-}
-
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id, grpc_error *error) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
@@ -1652,24 +1601,18 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
}
- gpr_mu_lock(&s->buffer_mu);
- if (s->data_parser.parsing_frame != NULL) {
- grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
- gpr_mu_lock(&bs->slice_mu);
- bs->push_closed = true;
- if (bs->on_next != NULL) {
- gpr_mu_unlock(&bs->slice_mu);
- gpr_mu_unlock(&s->buffer_mu);
- grpc_chttp2_incoming_byte_stream_finished(
- exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
+ if (s->pending_byte_stream) {
+ if (s->on_next != NULL) {
+ grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ }
+ incoming_byte_stream_publish_error(exec_ctx, bs, error);
+ incoming_byte_stream_unref(exec_ctx, bs);
s->data_parser.parsing_frame = NULL;
} else {
- bs->error = GRPC_ERROR_REF(error);
- gpr_mu_unlock(&bs->slice_mu);
- gpr_mu_unlock(&s->buffer_mu);
+ s->byte_stream_error = GRPC_ERROR_REF(error);
}
- } else {
- gpr_mu_unlock(&s->buffer_mu);
}
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@@ -1855,7 +1798,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
}
}
- decrement_active_streams_locked(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -2308,11 +2250,32 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM
*/
+static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
+
+ s->pending_byte_stream = false;
+ if (error == GRPC_ERROR_NONE) {
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
+ } else {
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+ s->on_next = NULL;
+ GRPC_ERROR_UNREF(s->byte_stream_error);
+ grpc_chttp2_cancel_stream(exec_ctx, s->t, s,
+ GRPC_ERROR_REF(error));
+ s->byte_stream_error = error;
+ }
+}
+
static grpc_error *deframe_unprocessed_incoming_frames(
grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice_buffer *slices,
- grpc_slice *slice_out, bool partial_deframe) {
- bool slice_set = false;
+ grpc_chttp2_stream *s, grpc_slice_buffer *slices,
+ grpc_slice *slice_out, grpc_byte_stream **stream_out) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ grpc_chttp2_transport *t = s->t;
+
while (slices->count > 0) {
uint8_t *beg = NULL;
uint8_t *end = NULL;
@@ -2336,15 +2299,7 @@ static grpc_error *deframe_unprocessed_incoming_frames(
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_REF(p->error);
- fh_0:
case GRPC_CHTTP2_DATA_FH_0:
- if (s->incoming_frames != NULL) {
- grpc_slice_buffer_undo_take_first(
- &s->unprocessed_incoming_frames_buffer,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- grpc_slice_unref_internal(exec_ctx, slice);
- return GRPC_ERROR_NONE;
- }
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
@@ -2400,6 +2355,8 @@ static grpc_error *deframe_unprocessed_incoming_frames(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
+ GPR_ASSERT(stream_out != NULL);
+ GPR_ASSERT(p->parsing_frame == NULL);
p->frame_size |= ((uint32_t)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
@@ -2407,71 +2364,69 @@ static grpc_error *deframe_unprocessed_incoming_frames(
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- GPR_ASSERT(s->incoming_frames == NULL);
p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
exec_ctx, t, s, p->frame_size, message_flags);
- /* fallthrough */
+ *stream_out = &p->parsing_frame->base;
+ if (p->parsing_frame->remaining_bytes == 0) {
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, 1);
+ p->parsing_frame = NULL;
+ p->state = GRPC_CHTTP2_DATA_FH_0;
+ } else {
+ s->pending_byte_stream = true;
+ }
+
+ if (cur != end) {
+ grpc_slice_buffer_undo_take_first(&s->unprocessed_incoming_frames_buffer, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ }
+ grpc_slice_unref(slice);
+ return GRPC_ERROR_NONE;
case GRPC_CHTTP2_DATA_FRAME: {
GPR_ASSERT(p->parsing_frame != NULL);
- if (partial_deframe && p->frame_size > 0) {
- if (cur != end) {
- grpc_slice_buffer_undo_take_first(
- &s->unprocessed_incoming_frames_buffer,
- grpc_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(end - beg)));
- }
- grpc_slice_unref_internal(exec_ctx, slice);
- return GRPC_ERROR_NONE;
- }
+ GPR_ASSERT(slice_out != NULL);
if (cur == end) {
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
- if (slice_set) {
- grpc_slice_buffer_undo_take_first(
- &s->unprocessed_incoming_frames_buffer,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- grpc_slice_unref_internal(exec_ctx, slice);
- return GRPC_ERROR_NONE;
- }
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
- grpc_chttp2_incoming_byte_stream_push(
+ if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)),
- slice_out);
- slice_set = true;
+ slice_out))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
+ }
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
- GRPC_ERROR_NONE);
+ GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
+ grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_NONE);
grpc_slice_unref_internal(exec_ctx, slice);
- continue;
+ return GRPC_ERROR_NONE;
} else if (remaining < p->frame_size) {
- grpc_chttp2_incoming_byte_stream_push(
+ if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)),
- slice_out);
- slice_set = true;
+ slice_out))) {
+ return error;
+ }
p->frame_size -= remaining;
grpc_slice_unref_internal(exec_ctx, slice);
- continue;
+ return GRPC_ERROR_NONE;
} else {
GPR_ASSERT(remaining > p->frame_size);
- if (p->frame_size > 0) {
- grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(cur + p->frame_size - beg)),
- slice_out);
+ if (GRPC_ERROR_NONE != (grpc_chttp2_incoming_byte_stream_push(exec_ctx, p->parsing_frame, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(cur + p->frame_size - beg)), slice_out))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
}
- slice_set = true;
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
- GRPC_ERROR_NONE);
+ GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
cur += p->frame_size;
- goto fh_0;
+ grpc_slice_buffer_undo_take_first(&s->unprocessed_incoming_frames_buffer, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_NONE);
+ grpc_slice_unref(slice);
return GRPC_ERROR_NONE;
}
}
@@ -2484,7 +2439,6 @@ static grpc_error *deframe_unprocessed_incoming_frames(
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {
- GRPC_ERROR_UNREF(bs->error);
grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
@@ -2546,90 +2500,90 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = bs->transport;
grpc_chttp2_stream *s = bs->stream;
- if (bs->is_tail) {
- gpr_mu_lock(&bs->slice_mu);
- size_t cur_length = bs->slices.length;
- gpr_mu_unlock(&bs->slice_mu);
- incoming_byte_stream_update_flow_control(
- exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
- }
- gpr_mu_lock(&s->buffer_mu);
- gpr_mu_lock(&bs->slice_mu);
- if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ size_t cur_length = s->frame_storage.length;
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
+
+ GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
+ if (s->frame_storage.length > 0) {
+ grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer);
grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
- } else if (bs->error != GRPC_ERROR_NONE) {
+ } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
- GRPC_ERROR_REF(bs->error));
- } else if (bs->push_closed) {
+ GRPC_ERROR_REF(s->byte_stream_error));
+ if (s->data_parser.parsing_frame != NULL) {
+ incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame = NULL;
+ }
+ } else if (s->read_closed) {
if (bs->remaining_bytes != 0) {
- bs->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ s->byte_stream_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
- GRPC_ERROR_REF(bs->error));
+ GRPC_ERROR_REF(s->byte_stream_error));
+ if (s->data_parser.parsing_frame != NULL) {
+ incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame = NULL;
+ }
} else {
/* Should never reach here. */
GPR_ASSERT(false);
- grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
- GRPC_ERROR_NONE);
}
} else {
- bs->on_next = bs->next_action.on_complete;
+ s->on_next = bs->next_action.on_complete;
}
- gpr_mu_unlock(&bs->slice_mu);
- gpr_mu_unlock(&s->buffer_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
+static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ size_t max_size_hint,
+ grpc_closure *on_complete) {
+ GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ grpc_chttp2_stream *s = bs->stream;
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ return 1;
+ } else {
+ gpr_ref(&bs->refs);
+ bs->next_action.max_size_hint = max_size_hint;
+ bs->next_action.on_complete = on_complete;
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
+ GPR_TIMER_END("incoming_byte_stream_next", 0);
+ return 0;
+ }
+}
+
static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
grpc_chttp2_incoming_byte_stream *bs =
- (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_stream *s = bs->stream;
- grpc_chttp2_transport *t = bs->transport;
- if (bs->error) {
- return bs->error;
- }
- gpr_mu_lock(&s->buffer_mu);
if (s->unprocessed_incoming_frames_buffer.length > 0) {
grpc_error *error = deframe_unprocessed_incoming_frames(
- exec_ctx, &s->data_parser, t, s, &s->unprocessed_incoming_frames_buffer,
- slice, false);
+ exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
+ slice, NULL);
if (error != GRPC_ERROR_NONE) {
- gpr_mu_unlock(&s->buffer_mu);
return error;
}
} else {
- bs->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- gpr_mu_unlock(&s->buffer_mu);
- return bs->error;
+ grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ grpc_closure_sched(exec_ctx,
+ &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ return error;
}
- gpr_mu_unlock(&s->buffer_mu);
GPR_TIMER_END("incoming_byte_stream_pull", 0);
return GRPC_ERROR_NONE;
}
-static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- size_t max_size_hint,
- grpc_closure *on_complete) {
- GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
- grpc_chttp2_incoming_byte_stream *bs =
- (grpc_chttp2_incoming_byte_stream *)byte_stream;
- gpr_ref(&bs->refs);
- bs->next_action.max_size_hint = max_size_hint;
- bs->next_action.on_complete = on_complete;
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(
- &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
- GRPC_ERROR_NONE);
- GPR_TIMER_END("incoming_byte_stream_next", 0);
- return 0;
-}
-
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
@@ -2638,7 +2592,6 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
- decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
incoming_byte_stream_unref(exec_ctx, bs);
}
@@ -2659,34 +2612,44 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
+ grpc_chttp2_stream *s = bs->stream;
+
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
- bs->on_next = NULL;
- GRPC_ERROR_UNREF(bs->error);
+ grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+ s->on_next = NULL;
+ GRPC_ERROR_UNREF(s->byte_stream_error);
grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
GRPC_ERROR_REF(error));
- bs->error = error;
+ s->byte_stream_error = GRPC_ERROR_REF(error);
}
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+grpc_error *grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs,
grpc_slice slice,
grpc_slice *slice_out) {
+ grpc_chttp2_stream *s = bs->stream;
+
if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
- incoming_byte_stream_publish_error(
- exec_ctx, bs,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
+ grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
+
+ grpc_closure_sched(exec_ctx,
+ &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
} else {
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
if (slice_out != NULL) {
*slice_out = slice;
}
+ return GRPC_ERROR_NONE;
}
}
-void grpc_chttp2_incoming_byte_stream_finished(
+grpc_error *grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error) {
+ grpc_error *error, int reset_on_error) {
+ grpc_chttp2_stream *s = bs->stream;
+
if (error == GRPC_ERROR_NONE) {
gpr_mu_lock(&bs->slice_mu);
if (bs->remaining_bytes != 0) {
@@ -2694,10 +2657,12 @@ void grpc_chttp2_incoming_byte_stream_finished(
}
gpr_mu_unlock(&bs->slice_mu);
}
- if (error != GRPC_ERROR_NONE) {
- incoming_byte_stream_publish_error(exec_ctx, bs, error);
+ if (error != GRPC_ERROR_NONE && reset_on_error) {
+ grpc_closure_sched(exec_ctx,
+ &s->reset_byte_stream, GRPC_ERROR_REF(error));
}
incoming_byte_stream_unref(exec_ctx, bs);
+ return error;
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -2716,14 +2681,10 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
- gpr_ref(&incoming_byte_stream->stream->active_streams);
grpc_slice_buffer_init(&incoming_byte_stream->slices);
- incoming_byte_stream->on_next = NULL;
incoming_byte_stream->is_tail = 1;
- incoming_byte_stream->error = GRPC_ERROR_NONE;
+ s->byte_stream_error = GRPC_ERROR_NONE;
incoming_byte_stream->push_closed = false;
- s->incoming_frames = incoming_byte_stream;
- grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
return incoming_byte_stream;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index ecd53e2ce9..8b42d05c72 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -56,14 +56,15 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
if (parser->parsing_frame != NULL) {
grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, parser->parsing_frame,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"));
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), 0);
}
GRPC_ERROR_UNREF(parser->error);
}
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
- uint32_t stream_id) {
+ uint32_t stream_id,
+ grpc_chttp2_stream *s) {
if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
char *msg;
gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
@@ -75,9 +76,9 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
}
if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
- parser->is_last_frame = 1;
+ s->received_last_frame = true;
} else {
- parser->is_last_frame = 0;
+ s->received_last_frame = false;
}
return GRPC_ERROR_NONE;
@@ -144,172 +145,31 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
stats->data_bytes += write_bytes;
}
-static void grpc_chttp2_unprocessed_frames_buffer_push(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, grpc_chttp2_stream *s,
- grpc_slice slice) {
- grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
- if (p->parsing_frame) {
- grpc_chttp2_incoming_byte_stream *bs = p->parsing_frame;
- // Necessary?
- gpr_mu_lock(&bs->slice_mu);
- if (bs->on_next != NULL) {
- grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
- bs->on_next = NULL;
- }
- gpr_mu_unlock(&bs->slice_mu);
- }
-}
-
-grpc_error *parse_inner_buffer(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_data_parser *p,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_slice slice) {
- uint8_t *const beg = GRPC_SLICE_START_PTR(slice);
- uint8_t *const end = GRPC_SLICE_END_PTR(slice);
- uint8_t *cur = beg;
- uint32_t message_flags;
- char *msg;
-
- if (cur == end) {
- return GRPC_ERROR_NONE;
- }
-
- /* If there is already pending data, or if there is a pending
- * incoming_byte_stream that is finished, append the data to unprocessed frame
- * buffer. */
- gpr_mu_lock(&s->buffer_mu);
- if (s->unprocessed_incoming_frames_buffer.count > 0) {
- s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice);
- grpc_slice_ref_internal(slice);
- grpc_chttp2_unprocessed_frames_buffer_push(exec_ctx, p, s, slice);
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_NONE;
- }
-
- switch (p->state) {
- case GRPC_CHTTP2_DATA_ERROR:
- p->state = GRPC_CHTTP2_DATA_ERROR;
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_REF(p->error);
- fh_0:
- case GRPC_CHTTP2_DATA_FH_0:
- if (s->incoming_frames != NULL) {
- s->stats.incoming.framing_bytes += (size_t)(end - cur);
- grpc_chttp2_unprocessed_frames_buffer_push(
- exec_ctx, p, s,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_NONE;
- }
- s->stats.incoming.framing_bytes++;
- p->frame_type = *cur;
- switch (p->frame_type) {
- case 0:
- p->is_frame_compressed = 0; /* GPR_FALSE */
- break;
- case 1:
- p->is_frame_compressed = 1; /* GPR_TRUE */
- break;
- default:
- gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
- p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
- (intptr_t)s->id);
- gpr_free(msg);
- msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
- grpc_slice_from_copied_string(msg));
- gpr_free(msg);
- p->error =
- grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
- p->state = GRPC_CHTTP2_DATA_ERROR;
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_REF(p->error);
- }
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_1;
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_1:
- s->stats.incoming.framing_bytes++;
- p->frame_size = ((uint32_t)*cur) << 24;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_2;
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_2:
- s->stats.incoming.framing_bytes++;
- p->frame_size |= ((uint32_t)*cur) << 16;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_3;
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_3:
- s->stats.incoming.framing_bytes++;
- p->frame_size |= ((uint32_t)*cur) << 8;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_4;
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_4:
- s->stats.incoming.framing_bytes++;
- p->frame_size |= ((uint32_t)*cur);
- p->state = GRPC_CHTTP2_DATA_FRAME;
- ++cur;
- message_flags = 0;
- if (p->is_frame_compressed) {
- message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
- }
- GPR_ASSERT(s->incoming_frames == NULL);
- p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
- exec_ctx, t, s, p->frame_size, message_flags);
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FRAME:
- if (p->parsing_frame->remaining_bytes == 0) {
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
- GRPC_ERROR_NONE);
- p->parsing_frame = NULL;
- p->state = GRPC_CHTTP2_DATA_FH_0;
- if (cur != end) {
- goto fh_0;
- }
- }
- if (cur == end) {
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_NONE;
- }
- uint32_t remaining = (uint32_t)(end - cur);
- s->stats.incoming.data_bytes += remaining;
- grpc_chttp2_unprocessed_frames_buffer_push(
- exec_ctx, p, s,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- gpr_mu_unlock(&s->buffer_mu);
- return GRPC_ERROR_NONE;
- }
-
- GPR_UNREACHABLE_CODE(
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here"));
-}
-
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_slice slice, int is_last) {
- grpc_chttp2_data_parser *p = parser;
- grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice);
+ /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
+ s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice);
+ if (!s->pending_byte_stream) {
+ grpc_slice_ref_internal(slice);
+ grpc_slice_buffer_add(&s->frame_storage, slice);
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ } else if (s->on_next) {
+ GPR_ASSERT(s->frame_storage.length == 0);
+ grpc_slice_ref_internal(slice);
+ grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
+ grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE);
+ s->on_next = NULL;
+ } else {
+ grpc_slice_ref_internal(slice);
+ grpc_slice_buffer_add(&s->frame_storage, slice);
+ }
- if (is_last && p->is_last_frame) {
+ if (is_last && s->received_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
}
- return error;
+ return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h
index 264ad14608..e7e459c79f 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.h
+++ b/src/core/ext/transport/chttp2/transport/frame_data.h
@@ -63,7 +63,6 @@ typedef struct grpc_chttp2_incoming_frame_queue {
typedef struct {
grpc_chttp2_stream_state state;
- uint8_t is_last_frame;
uint8_t frame_type;
uint32_t frame_size;
grpc_error *error;
@@ -87,7 +86,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
/* start processing a new data frame */
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
- uint32_t stream_id);
+ uint32_t stream_id,
+ grpc_chttp2_stream *s);
/* handle a slice of a data frame - is_last indicates the last slice of a
frame */
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 5604ea3e31..a43c825b70 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -187,7 +187,6 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base;
gpr_refcount refs;
struct grpc_chttp2_incoming_byte_stream *next_message; /* unused; should be removed */
- grpc_error *error; /* protected by slice_mu */
bool push_closed; /* protected by slice_mu */
grpc_chttp2_transport *transport; /* immutable */
@@ -196,7 +195,6 @@ struct grpc_chttp2_incoming_byte_stream {
gpr_mu slice_mu;
grpc_slice_buffer slices; /* unused; should be removed */
- grpc_closure *on_next; /* protected by slice_mu */
uint32_t remaining_bytes; /* guaranteed one thread access */
struct {
@@ -462,9 +460,6 @@ struct grpc_chttp2_stream {
grpc_transport_stream_stats *collecting_stats;
grpc_transport_stream_stats stats;
- /** number of streams that are currently being read */
- gpr_refcount active_streams;
-
/** Is this stream closed for writing. */
bool write_closed;
/** Is this stream reading half-closed. */
@@ -488,10 +483,13 @@ struct grpc_chttp2_stream {
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
- grpc_chttp2_incoming_byte_stream *incoming_frames; /* protected by buffer_mu */
- gpr_mu buffer_mu; /* protects unprocessed_incoming_frames_buffer and
- data_parser */
- grpc_slice_buffer unprocessed_incoming_frames_buffer; /* protected by buffer_mu */
+ grpc_slice_buffer frame_storage; /* protected by t combiner */
+ grpc_slice_buffer unprocessed_incoming_frames_buffer; /* guaranteed one thread access */
+ grpc_closure *on_next; /* protected by t combiner */
+ bool pending_byte_stream; /* protected by t combiner */
+ grpc_closure reset_byte_stream;
+ grpc_error *byte_stream_error; /* protected by t combiner */
+ bool received_last_frame; /* proected by t combiner */
gpr_timespec deadline;
@@ -504,7 +502,7 @@ struct grpc_chttp2_stream {
* incoming_window = incoming_window_delta + transport.initial_window_size */
int64_t incoming_window_delta;
/** parsing state for data frames */
- grpc_chttp2_data_parser data_parser; /* protected by buffer_mu */
+ grpc_chttp2_data_parser data_parser; /* guaranteed one thread access */
/** number of bytes received - reset at end of parse thread execution */
int64_t received_bytes;
@@ -782,13 +780,13 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t);
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags);
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- grpc_slice slice,
- grpc_slice *slice_out);
-void grpc_chttp2_incoming_byte_stream_finished(
+grpc_error *grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ grpc_slice slice,
+ grpc_slice *slice_out);
+grpc_error *grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error);
+ grpc_error *error, int reset_on_error);
void grpc_chttp2_incoming_byte_stream_notify(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error);
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index f1c6f96db5..2c662b6721 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -458,10 +458,8 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
return init_skip_frame_parser(exec_ctx, t, 0);
}
if (err == GRPC_ERROR_NONE) {
- gpr_mu_lock(&s->buffer_mu);
err = grpc_chttp2_data_parser_begin_frame(&s->data_parser,
- t->incoming_frame_flags, s->id);
- gpr_mu_unlock(&s->buffer_mu);
+ t->incoming_frame_flags, s->id, s);
}
error_handler:
if (err == GRPC_ERROR_NONE) {