aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c338
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.c365
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.h24
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h51
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c5
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c17
-rw-r--r--src/core/lib/channel/compress_filter.c14
-rw-r--r--src/core/lib/channel/http_client_filter.c14
-rw-r--r--src/core/lib/surface/call.c44
-rw-r--r--src/core/lib/transport/byte_stream.c32
-rw-r--r--src/core/lib/transport/byte_stream.h21
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_transport.cc9
12 files changed, 350 insertions, 584 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 29ed4bf90e..a7d047d6e7 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -44,7 +44,6 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
@@ -130,11 +129,6 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
-static void incoming_byte_stream_publish_error(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error);
-static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -180,9 +174,6 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -664,6 +655,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
reading */
+ gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@@ -673,11 +665,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx);
- grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
- grpc_slice_buffer_init(&s->frame_storage);
- s->pending_byte_stream = false;
- grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
- grpc_combiner_scheduler(t->combiner, false));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -695,6 +682,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_error *error) {
+ grpc_byte_stream *bs;
grpc_chttp2_stream *s = sp;
grpc_chttp2_transport *t = s->t;
@@ -705,9 +693,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL);
}
- grpc_slice_buffer_destroy_internal(exec_ctx,
- &s->unprocessed_incoming_frames_buffer);
- grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
+ while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) {
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ }
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -734,7 +722,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
- GRPC_ERROR_UNREF(s->byte_stream_error);
if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
@@ -1188,9 +1175,8 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
s->fetching_send_message = NULL;
return; /* early out */
} else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
- UINT32_MAX, &s->complete_fetch_locked)) {
- grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
- &s->fetching_slice);
+ &s->fetching_slice, UINT32_MAX,
+ &s->complete_fetch_locked)) {
add_fetched_slice_locked(exec_ctx, t, s);
}
}
@@ -1201,15 +1187,9 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_chttp2_stream *s = gs;
grpc_chttp2_transport *t = s->t;
if (error == GRPC_ERROR_NONE) {
- error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
- &s->fetching_slice);
- if (error == GRPC_ERROR_NONE) {
- add_fetched_slice_locked(exec_ctx, t, s);
- continue_fetching_send_locked(exec_ctx, t, s);
- }
- }
-
- if (error != GRPC_ERROR_NONE) {
+ add_fetched_slice_locked(exec_ctx, t, s);
+ continue_fetching_send_locked(exec_ctx, t, s);
+ } else {
/* TODO(ctiller): what to do here */
abort();
}
@@ -1444,7 +1424,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_ASSERT(s->recv_message_ready == NULL);
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
- if (s->id != 0 && s->frame_storage.length == 0) {
+ if (s->id != 0 &&
+ (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
@@ -1633,13 +1614,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
+ grpc_byte_stream *bs;
if (s->recv_initial_metadata_ready != NULL &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) {
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
- if (!s->pending_byte_stream) {
- grpc_slice_buffer_reset_and_unref_internal(
- exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
+ NULL) {
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1652,65 +1633,39 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- grpc_error *error = GRPC_ERROR_NONE;
+ grpc_byte_stream *bs;
if (s->recv_message_ready != NULL) {
- *s->recv_message = NULL;
- if (s->final_metadata_requested && s->seen_error) {
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
- if (!s->pending_byte_stream) {
- grpc_slice_buffer_reset_and_unref_internal(
- exec_ctx, &s->unprocessed_incoming_frames_buffer);
- }
+ while (s->final_metadata_requested && s->seen_error &&
+ (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
+ NULL) {
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
- if (!s->pending_byte_stream) {
- while (s->unprocessed_incoming_frames_buffer.length > 0 ||
- s->frame_storage.length > 0) {
- if (s->unprocessed_incoming_frames_buffer.length == 0) {
- grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
- &s->frame_storage);
- }
- error = deframe_unprocessed_incoming_frames(
- exec_ctx, &s->data_parser, s,
- &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
- if (error != GRPC_ERROR_NONE) {
- s->seen_error = true;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
- &s->frame_storage);
- grpc_slice_buffer_reset_and_unref_internal(
- exec_ctx, &s->unprocessed_incoming_frames_buffer);
- break;
- } else if (*s->recv_message != NULL) {
- break;
- }
- }
- }
- if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
+ if (s->incoming_frames.head != NULL) {
+ *s->recv_message =
+ grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames);
+ GPR_ASSERT(*s->recv_message != NULL);
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
*s->recv_message = NULL;
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
}
- GRPC_ERROR_UNREF(error);
}
}
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
+ grpc_byte_stream *bs;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
s->write_closed) {
if (s->seen_error) {
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
- if (!s->pending_byte_stream) {
- grpc_slice_buffer_reset_and_unref_internal(
- exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
+ NULL) {
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
}
- bool pending_data = s->pending_byte_stream ||
- s->unprocessed_incoming_frames_buffer.length > 0;
- if (s->read_closed && s->frame_storage.length == 0 &&
- (!pending_data || s->seen_error) &&
+ if (s->all_incoming_byte_streams_finished &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
@@ -1721,6 +1676,14 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
}
}
+static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) {
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
+ }
+}
+
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id, grpc_error *error) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
@@ -1729,19 +1692,10 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
}
- if (s->pending_byte_stream) {
- if (s->on_next != NULL) {
- grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
- if (error == GRPC_ERROR_NONE) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- }
- incoming_byte_stream_publish_error(exec_ctx, bs, error);
- incoming_byte_stream_unref(exec_ctx, bs);
- s->data_parser.parsing_frame = NULL;
- } else {
- GRPC_ERROR_UNREF(s->byte_stream_error);
- s->byte_stream_error = GRPC_ERROR_REF(error);
- }
+ if (s->data_parser.parsing_frame != NULL) {
+ grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
+ s->data_parser.parsing_frame = NULL;
}
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@@ -1927,6 +1881,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
}
}
+ decrement_active_streams_locked(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -2464,28 +2419,12 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM
*/
-static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
-
- s->pending_byte_stream = false;
- if (error == GRPC_ERROR_NONE) {
- grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
- } else {
- GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
- s->on_next = NULL;
- GRPC_ERROR_UNREF(s->byte_stream_error);
- s->byte_stream_error = GRPC_ERROR_NONE;
- grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error));
- s->byte_stream_error = error;
- }
-}
-
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {
+ GRPC_ERROR_UNREF(bs->error);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
+ gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
}
@@ -2545,90 +2484,47 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = bs->transport;
grpc_chttp2_stream *s = bs->stream;
- size_t cur_length = s->frame_storage.length;
- incoming_byte_stream_update_flow_control(
- exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
-
- GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
- if (s->frame_storage.length > 0) {
- grpc_slice_buffer_swap(&s->frame_storage,
- &s->unprocessed_incoming_frames_buffer);
- grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
- } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
- grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
- GRPC_ERROR_REF(s->byte_stream_error));
- if (s->data_parser.parsing_frame != NULL) {
- incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
- s->data_parser.parsing_frame = NULL;
- }
- } else if (s->read_closed) {
- if (bs->remaining_bytes != 0) {
- s->byte_stream_error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
- GRPC_ERROR_REF(s->byte_stream_error));
- if (s->data_parser.parsing_frame != NULL) {
- incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
- s->data_parser.parsing_frame = NULL;
- }
- } else {
- /* Should never reach here. */
- GPR_ASSERT(false);
- }
+ if (bs->is_tail) {
+ gpr_mu_lock(&bs->slice_mu);
+ size_t cur_length = bs->slices.length;
+ gpr_mu_unlock(&bs->slice_mu);
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
+ }
+ gpr_mu_lock(&bs->slice_mu);
+ if (bs->slices.count > 0) {
+ *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices);
+ grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
+ } else if (bs->error != GRPC_ERROR_NONE) {
+ grpc_closure_run(exec_ctx, bs->next_action.on_complete,
+ GRPC_ERROR_REF(bs->error));
} else {
- s->on_next = bs->next_action.on_complete;
+ bs->on_next = bs->next_action.on_complete;
+ bs->next = bs->next_action.slice;
}
+ gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
-static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- size_t max_size_hint,
- grpc_closure *on_complete) {
+static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice, size_t max_size_hint,
+ grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_chttp2_stream *s = bs->stream;
- if (s->unprocessed_incoming_frames_buffer.length > 0) {
- return true;
- } else {
- gpr_ref(&bs->refs);
- bs->next_action.max_size_hint = max_size_hint;
- bs->next_action.on_complete = on_complete;
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(
- &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
- GRPC_ERROR_NONE);
- GPR_TIMER_END("incoming_byte_stream_next", 0);
- return false;
- }
-}
-
-static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- grpc_slice *slice) {
- GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
- grpc_chttp2_incoming_byte_stream *bs =
- (grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_chttp2_stream *s = bs->stream;
-
- if (s->unprocessed_incoming_frames_buffer.length > 0) {
- grpc_error *error = deframe_unprocessed_incoming_frames(
- exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
- slice, NULL);
- if (error != GRPC_ERROR_NONE) {
- return error;
- }
- } else {
- grpc_error *error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
- return error;
- }
- GPR_TIMER_END("incoming_byte_stream_pull", 0);
- return GRPC_ERROR_NONE;
+ gpr_ref(&bs->refs);
+ bs->next_action.slice = slice;
+ bs->next_action.max_size_hint = max_size_hint;
+ bs->next_action.on_complete = on_complete;
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
+ GPR_TIMER_END("incoming_byte_stream_next", 0);
+ return 0;
}
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -2638,14 +2534,9 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
- grpc_chttp2_stream *s = bs->stream;
- grpc_chttp2_transport *t = s->t;
-
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
+ decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
incoming_byte_stream_unref(exec_ctx, bs);
- s->pending_byte_stream = false;
- grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -2665,53 +2556,50 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
- grpc_chttp2_stream *s = bs->stream;
-
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
- s->on_next = NULL;
- GRPC_ERROR_UNREF(s->byte_stream_error);
- s->byte_stream_error = GRPC_ERROR_REF(error);
+ grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
+ bs->on_next = NULL;
+ GRPC_ERROR_UNREF(bs->error);
grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
GRPC_ERROR_REF(error));
+ bs->error = error;
}
-grpc_error *grpc_chttp2_incoming_byte_stream_push(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
- grpc_slice slice, grpc_slice *slice_out) {
- grpc_chttp2_stream *s = bs->stream;
-
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ grpc_slice slice) {
+ gpr_mu_lock(&bs->slice_mu);
if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
- grpc_error *error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
-
- grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
- grpc_slice_unref_internal(exec_ctx, slice);
- return error;
+ incoming_byte_stream_publish_error(
+ exec_ctx, bs,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
} else {
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
- if (slice_out != NULL) {
- *slice_out = slice;
+ if (bs->on_next != NULL) {
+ *bs->next = slice;
+ grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
+ bs->on_next = NULL;
+ } else {
+ grpc_slice_buffer_add(&bs->slices, slice);
}
- return GRPC_ERROR_NONE;
}
+ gpr_mu_unlock(&bs->slice_mu);
}
-grpc_error *grpc_chttp2_incoming_byte_stream_finished(
+void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error, bool reset_on_error) {
- grpc_chttp2_stream *s = bs->stream;
-
+ grpc_error *error) {
if (error == GRPC_ERROR_NONE) {
+ gpr_mu_lock(&bs->slice_mu);
if (bs->remaining_bytes != 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
+ gpr_mu_unlock(&bs->slice_mu);
}
- if (error != GRPC_ERROR_NONE && reset_on_error) {
- grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ if (error != GRPC_ERROR_NONE) {
+ incoming_byte_stream_publish_error(exec_ctx, bs, error);
}
incoming_byte_stream_unref(exec_ctx, bs);
- return error;
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -2723,12 +2611,26 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next;
- incoming_byte_stream->base.pull = incoming_byte_stream_pull;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
+ gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2);
+ incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
- s->byte_stream_error = GRPC_ERROR_NONE;
+ gpr_ref(&incoming_byte_stream->stream->active_streams);
+ grpc_slice_buffer_init(&incoming_byte_stream->slices);
+ incoming_byte_stream->on_next = NULL;
+ incoming_byte_stream->is_tail = 1;
+ incoming_byte_stream->error = GRPC_ERROR_NONE;
+ grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames;
+ if (q->head == NULL) {
+ q->head = incoming_byte_stream;
+ } else {
+ q->tail->is_tail = 0;
+ q->tail->next_message = incoming_byte_stream;
+ }
+ q->tail = incoming_byte_stream;
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
return incoming_byte_stream;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index 5d382d80a8..6e9258ee7e 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -40,7 +40,6 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/internal.h"
-#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport.h"
@@ -54,17 +53,16 @@ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser) {
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) {
if (parser->parsing_frame != NULL) {
- GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
+ grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, parser->parsing_frame,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"));
}
GRPC_ERROR_UNREF(parser->error);
}
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
- uint32_t stream_id,
- grpc_chttp2_stream *s) {
+ uint32_t stream_id) {
if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
char *msg;
gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
@@ -76,14 +74,47 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
}
if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
- s->received_last_frame = true;
+ parser->is_last_frame = 1;
} else {
- s->received_last_frame = false;
+ parser->is_last_frame = 0;
}
return GRPC_ERROR_NONE;
}
+void grpc_chttp2_incoming_frame_queue_merge(
+ grpc_chttp2_incoming_frame_queue *head_dst,
+ grpc_chttp2_incoming_frame_queue *tail_src) {
+ if (tail_src->head == NULL) {
+ return;
+ }
+
+ if (head_dst->head == NULL) {
+ *head_dst = *tail_src;
+ memset(tail_src, 0, sizeof(*tail_src));
+ return;
+ }
+
+ head_dst->tail->next_message = tail_src->head;
+ head_dst->tail = tail_src->tail;
+ memset(tail_src, 0, sizeof(*tail_src));
+}
+
+grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
+ grpc_chttp2_incoming_frame_queue *q) {
+ grpc_byte_stream *out;
+ if (q->head == NULL) {
+ return NULL;
+ }
+ out = &q->head->base;
+ if (q->head == q->tail) {
+ memset(q, 0, sizeof(*q));
+ } else {
+ q->head = q->head->next_message;
+ }
+ return out;
+}
+
void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
uint32_t write_bytes, int is_eof,
grpc_transport_one_way_stats *stats,
@@ -112,217 +143,145 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
stats->data_bytes += write_bytes;
}
-grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_data_parser *p,
- grpc_chttp2_stream *s,
- grpc_slice_buffer *slices,
- grpc_slice *slice_out,
- grpc_byte_stream **stream_out) {
- grpc_error *error = GRPC_ERROR_NONE;
- grpc_chttp2_transport *t = s->t;
-
- while (slices->count > 0) {
- uint8_t *beg = NULL;
- uint8_t *end = NULL;
- uint8_t *cur = NULL;
-
- grpc_slice slice = grpc_slice_buffer_take_first(slices);
+static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_data_parser *p,
+ grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_slice slice) {
+ uint8_t *const beg = GRPC_SLICE_START_PTR(slice);
+ uint8_t *const end = GRPC_SLICE_END_PTR(slice);
+ uint8_t *cur = beg;
+ uint32_t message_flags;
+ grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
+ char *msg;
- beg = GRPC_SLICE_START_PTR(slice);
- end = GRPC_SLICE_END_PTR(slice);
- cur = beg;
- uint32_t message_flags;
- char *msg;
-
- if (cur == end) {
- grpc_slice_unref_internal(exec_ctx, slice);
- continue;
- }
-
- switch (p->state) {
- case GRPC_CHTTP2_DATA_ERROR:
- p->state = GRPC_CHTTP2_DATA_ERROR;
- grpc_slice_unref_internal(exec_ctx, slice);
- return GRPC_ERROR_REF(p->error);
- case GRPC_CHTTP2_DATA_FH_0:
- p->frame_type = *cur;
- switch (p->frame_type) {
- case 0:
- p->is_frame_compressed = false; /* GPR_FALSE */
- break;
- case 1:
- p->is_frame_compressed = true; /* GPR_TRUE */
- break;
- default:
- gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
- p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
- (intptr_t)s->id);
- gpr_free(msg);
- msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
- grpc_slice_from_copied_string(msg));
- gpr_free(msg);
- p->error =
- grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
- p->state = GRPC_CHTTP2_DATA_ERROR;
- grpc_slice_unref_internal(exec_ctx, slice);
- return GRPC_ERROR_REF(p->error);
- }
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_1;
- grpc_slice_unref_internal(exec_ctx, slice);
- continue;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_1:
- p->frame_size = ((uint32_t)*cur) << 24;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_2;
- grpc_slice_unref_internal(exec_ctx, slice);
- continue;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_2:
- p->frame_size |= ((uint32_t)*cur) << 16;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_3;
- grpc_slice_unref_internal(exec_ctx, slice);
- continue;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_3:
- p->frame_size |= ((uint32_t)*cur) << 8;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_4;
- grpc_slice_unref_internal(exec_ctx, slice);
- continue;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_4:
- GPR_ASSERT(stream_out != NULL);
- GPR_ASSERT(p->parsing_frame == NULL);
- p->frame_size |= ((uint32_t)*cur);
- p->state = GRPC_CHTTP2_DATA_FRAME;
- ++cur;
- message_flags = 0;
- if (p->is_frame_compressed) {
- message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
- }
- p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
- exec_ctx, t, s, p->frame_size, message_flags);
- *stream_out = &p->parsing_frame->base;
- if (p->parsing_frame->remaining_bytes == 0) {
- GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
- exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true));
- p->parsing_frame = NULL;
- p->state = GRPC_CHTTP2_DATA_FH_0;
- }
- s->pending_byte_stream = true;
+ if (cur == end) {
+ return GRPC_ERROR_NONE;
+ }
- if (cur != end) {
- grpc_slice_buffer_undo_take_first(
- &s->unprocessed_incoming_frames_buffer,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- }
- grpc_slice_unref_internal(exec_ctx, slice);
+ switch (p->state) {
+ case GRPC_CHTTP2_DATA_ERROR:
+ p->state = GRPC_CHTTP2_DATA_ERROR;
+ return GRPC_ERROR_REF(p->error);
+ fh_0:
+ case GRPC_CHTTP2_DATA_FH_0:
+ s->stats.incoming.framing_bytes++;
+ p->frame_type = *cur;
+ switch (p->frame_type) {
+ case 0:
+ p->is_frame_compressed = 0; /* GPR_FALSE */
+ break;
+ case 1:
+ p->is_frame_compressed = 1; /* GPR_TRUE */
+ break;
+ default:
+ gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
+ p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
+ (intptr_t)s->id);
+ gpr_free(msg);
+ msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
+ grpc_slice_from_copied_string(msg));
+ gpr_free(msg);
+ p->error =
+ grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
+ p->state = GRPC_CHTTP2_DATA_ERROR;
+ return GRPC_ERROR_REF(p->error);
+ }
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_1;
+ return GRPC_ERROR_NONE;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_1:
+ s->stats.incoming.framing_bytes++;
+ p->frame_size = ((uint32_t)*cur) << 24;
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_2;
+ return GRPC_ERROR_NONE;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_2:
+ s->stats.incoming.framing_bytes++;
+ p->frame_size |= ((uint32_t)*cur) << 16;
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_3;
+ return GRPC_ERROR_NONE;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_3:
+ s->stats.incoming.framing_bytes++;
+ p->frame_size |= ((uint32_t)*cur) << 8;
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_4;
+ return GRPC_ERROR_NONE;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_4:
+ s->stats.incoming.framing_bytes++;
+ p->frame_size |= ((uint32_t)*cur);
+ p->state = GRPC_CHTTP2_DATA_FRAME;
+ ++cur;
+ message_flags = 0;
+ if (p->is_frame_compressed) {
+ message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ }
+ p->parsing_frame = incoming_byte_stream =
+ grpc_chttp2_incoming_byte_stream_create(exec_ctx, t, s, p->frame_size,
+ message_flags);
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FRAME:
+ if (cur == end) {
+ return GRPC_ERROR_NONE;
+ }
+ uint32_t remaining = (uint32_t)(end - cur);
+ if (remaining == p->frame_size) {
+ s->stats.incoming.data_bytes += p->frame_size;
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
+ GRPC_ERROR_NONE);
+ p->parsing_frame = NULL;
+ p->state = GRPC_CHTTP2_DATA_FH_0;
+ return GRPC_ERROR_NONE;
+ } else if (remaining > p->frame_size) {
+ s->stats.incoming.data_bytes += p->frame_size;
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ grpc_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(cur + p->frame_size - beg)));
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
+ GRPC_ERROR_NONE);
+ p->parsing_frame = NULL;
+ cur += p->frame_size;
+ goto fh_0; /* loop */
+ } else {
+ GPR_ASSERT(remaining <= p->frame_size);
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ p->frame_size -= remaining;
+ s->stats.incoming.data_bytes += remaining;
return GRPC_ERROR_NONE;
- case GRPC_CHTTP2_DATA_FRAME: {
- GPR_ASSERT(p->parsing_frame != NULL);
- GPR_ASSERT(slice_out != NULL);
- if (cur == end) {
- grpc_slice_unref_internal(exec_ctx, slice);
- continue;
- }
- uint32_t remaining = (uint32_t)(end - cur);
- if (remaining == p->frame_size) {
- if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(end - beg)),
- slice_out))) {
- grpc_slice_unref_internal(exec_ctx, slice);
- return error;
- }
- if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_finished(
- exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
- grpc_slice_unref_internal(exec_ctx, slice);
- return error;
- }
- p->parsing_frame = NULL;
- p->state = GRPC_CHTTP2_DATA_FH_0;
- grpc_slice_unref_internal(exec_ctx, slice);
- return GRPC_ERROR_NONE;
- } else if (remaining < p->frame_size) {
- if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(end - beg)),
- slice_out))) {
- return error;
- }
- p->frame_size -= remaining;
- grpc_slice_unref_internal(exec_ctx, slice);
- return GRPC_ERROR_NONE;
- } else {
- GPR_ASSERT(remaining > p->frame_size);
- if (GRPC_ERROR_NONE !=
- (grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(cur + p->frame_size - beg)),
- slice_out))) {
- grpc_slice_unref_internal(exec_ctx, slice);
- return error;
- }
- if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_finished(
- exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
- grpc_slice_unref_internal(exec_ctx, slice);
- return error;
- }
- p->parsing_frame = NULL;
- p->state = GRPC_CHTTP2_DATA_FH_0;
- cur += p->frame_size;
- grpc_slice_buffer_undo_take_first(
- &s->unprocessed_incoming_frames_buffer,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- grpc_slice_unref_internal(exec_ctx, slice);
- return GRPC_ERROR_NONE;
- }
}
- }
}
- return GRPC_ERROR_NONE;
+ GPR_UNREACHABLE_CODE(
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here"));
}
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_slice slice, int is_last) {
- /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
- s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice);
- if (!s->pending_byte_stream) {
- grpc_slice_ref_internal(slice);
- grpc_slice_buffer_add(&s->frame_storage, slice);
- grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
- } else if (s->on_next) {
- GPR_ASSERT(s->frame_storage.length == 0);
- grpc_slice_ref_internal(slice);
- grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
- grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE);
- s->on_next = NULL;
- } else {
- grpc_slice_ref_internal(slice);
- grpc_slice_buffer_add(&s->frame_storage, slice);
- }
+ grpc_chttp2_data_parser *p = parser;
+ grpc_error *error = parse_inner(exec_ctx, p, t, s, slice);
- if (is_last && s->received_last_frame) {
+ if (is_last && p->is_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
}
- return GRPC_ERROR_NONE;
+ return error;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h
index 2fb8983c38..264ad14608 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.h
+++ b/src/core/ext/transport/chttp2/transport/frame_data.h
@@ -56,16 +56,28 @@ typedef enum {
typedef struct grpc_chttp2_incoming_byte_stream
grpc_chttp2_incoming_byte_stream;
+typedef struct grpc_chttp2_incoming_frame_queue {
+ grpc_chttp2_incoming_byte_stream *head;
+ grpc_chttp2_incoming_byte_stream *tail;
+} grpc_chttp2_incoming_frame_queue;
+
typedef struct {
grpc_chttp2_stream_state state;
+ uint8_t is_last_frame;
uint8_t frame_type;
uint32_t frame_size;
grpc_error *error;
- bool is_frame_compressed;
+ int is_frame_compressed;
grpc_chttp2_incoming_byte_stream *parsing_frame;
} grpc_chttp2_data_parser;
+void grpc_chttp2_incoming_frame_queue_merge(
+ grpc_chttp2_incoming_frame_queue *head_dst,
+ grpc_chttp2_incoming_frame_queue *tail_src);
+grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
+ grpc_chttp2_incoming_frame_queue *q);
+
/* initialize per-stream state for data frame parsing */
grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser);
@@ -75,8 +87,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
/* start processing a new data frame */
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
- uint32_t stream_id,
- grpc_chttp2_stream *s);
+ uint32_t stream_id);
/* handle a slice of a data frame - is_last indicates the last slice of a
frame */
@@ -90,11 +101,4 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
grpc_transport_one_way_stats *stats,
grpc_slice_buffer *outbuf);
-grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_data_parser *p,
- grpc_chttp2_stream *s,
- grpc_slice_buffer *slices,
- grpc_slice *slice_out,
- grpc_byte_stream **stream_out);
-
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index a10e3886ea..6eb848b8d7 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -195,20 +195,22 @@ typedef struct grpc_chttp2_write_cb {
struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base;
gpr_refcount refs;
+ struct grpc_chttp2_incoming_byte_stream *next_message;
+ grpc_error *error;
- grpc_chttp2_transport *transport; /* immutable */
- grpc_chttp2_stream *stream; /* immutable */
+ grpc_chttp2_transport *transport;
+ grpc_chttp2_stream *stream;
+ bool is_tail;
- /* Accessed only by transport thread when stream->pending_byte_stream == false
- * Accessed only by application thread when stream->pending_byte_stream ==
- * true */
+ gpr_mu slice_mu; // protects slices, on_next
+ grpc_slice_buffer slices;
+ grpc_closure *on_next;
+ grpc_slice *next;
uint32_t remaining_bytes;
- /* Accessed only by transport thread when stream->pending_byte_stream == false
- * Accessed only by application thread when stream->pending_byte_stream ==
- * true */
struct {
grpc_closure closure;
+ grpc_slice *slice;
size_t max_size_hint;
grpc_closure *on_complete;
} next_action;
@@ -443,8 +445,8 @@ struct grpc_chttp2_stream {
uint32_t id;
/** window available for us to send to peer, over or under the initial window
- * size of the transport... ie:
- * outgoing_window = outgoing_window_delta + transport.initial_window_size */
+ * size of the transport... ie:
+ * outgoing_window = outgoing_window_delta + transport.initial_window_size */
int64_t outgoing_window_delta;
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
@@ -471,6 +473,9 @@ struct grpc_chttp2_stream {
grpc_transport_stream_stats *collecting_stats;
grpc_transport_stream_stats stats;
+ /** number of streams that are currently being read */
+ gpr_refcount active_streams;
+
/** Is this stream closed for writing. */
bool write_closed;
/** Is this stream reading half-closed. */
@@ -494,17 +499,7 @@ struct grpc_chttp2_stream {
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
- grpc_slice_buffer frame_storage; /* protected by t combiner */
-
- /* Accessed only by transport thread when stream->pending_byte_stream == false
- * Accessed only by application thread when stream->pending_byte_stream ==
- * true */
- grpc_slice_buffer unprocessed_incoming_frames_buffer;
- grpc_closure *on_next; /* protected by t combiner */
- bool pending_byte_stream; /* protected by t combiner */
- grpc_closure reset_byte_stream;
- grpc_error *byte_stream_error; /* protected by t combiner */
- bool received_last_frame; /* protected by t combiner */
+ grpc_chttp2_incoming_frame_queue incoming_frames;
gpr_timespec deadline;
@@ -517,9 +512,6 @@ struct grpc_chttp2_stream {
* incoming_window = incoming_window_delta + transport.initial_window_size */
int64_t incoming_window_delta;
/** parsing state for data frames */
- /* Accessed only by transport thread when stream->pending_byte_stream == false
- * Accessed only by application thread when stream->pending_byte_stream ==
- * true */
grpc_chttp2_data_parser data_parser;
/** number of bytes received - reset at end of parse thread execution */
int64_t received_bytes;
@@ -798,13 +790,10 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t);
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags);
-grpc_error *grpc_chttp2_incoming_byte_stream_push(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
- grpc_slice slice, grpc_slice *slice_out);
-grpc_error *grpc_chttp2_incoming_byte_stream_finished(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error, bool reset_on_error);
-void grpc_chttp2_incoming_byte_stream_notify(
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ grpc_slice slice);
+void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error);
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 638b137316..7e457ced27 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -458,13 +458,12 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
return init_skip_frame_parser(exec_ctx, t, 0);
}
if (err == GRPC_ERROR_NONE) {
- err = grpc_chttp2_data_parser_begin_frame(
- &s->data_parser, t->incoming_frame_flags, s->id, s);
+ err = grpc_chttp2_data_parser_begin_frame(&s->data_parser,
+ t->incoming_frame_flags, s->id);
}
error_handler:
if (err == GRPC_ERROR_NONE) {
t->incoming_stream = s;
- /* t->parser = grpc_chttp2_data_parser_parse;*/
t->parser = grpc_chttp2_data_parser_parse;
t->parser_data = &s->data_parser;
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 7896c70f9e..88335ecd0b 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -973,20 +973,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer write_slice_buffer;
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
- if (1 != grpc_byte_stream_next(
- exec_ctx, stream_op->payload->send_message.send_message,
- stream_op->payload->send_message.send_message->length,
- NULL)) {
- /* Should never reach here */
- GPR_ASSERT(false);
- }
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- stream_op->payload->send_message.send_message,
- &slice)) {
- /* Should never reach here */
- GPR_ASSERT(false);
- }
+ grpc_byte_stream_next(
+ NULL, stream_op->payload->send_message.send_message, &slice,
+ stream_op->payload->send_message.send_message->length, NULL);
grpc_slice_buffer_add(&write_slice_buffer, slice);
if (write_slice_buffer.count != 1) {
/* Empty request not handled yet */
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 764524b24d..4625cba0d2 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -221,13 +221,6 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice)) {
- /* Should never reach here */
- abort();
- }
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
@@ -240,11 +233,8 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
- &calld->got_slice)) {
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice);
+ exec_ctx, calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 151fb9885d..4e47c5c658 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -220,11 +220,8 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
uint8_t *wrptr = calld->payload_bytes;
while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
- &calld->got_slice)) {
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice);
+ exec_ctx, calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
GRPC_SLICE_LENGTH(calld->incoming_slice));
wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
@@ -240,13 +237,6 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
calld->send_message_blocked = false;
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice)) {
- /* Should never reach here */
- abort();
- }
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
/* Pass down the original send_message op that was blocked.*/
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 3e96d09798..97d50a91be 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1187,7 +1187,6 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
- grpc_error *error;
grpc_call *call = bctl->call;
for (;;) {
size_t remaining = call->receiving_stream->length -
@@ -1199,22 +1198,11 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
finish_batch_step(exec_ctx, bctl);
return;
}
- if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
+ if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
+ &call->receiving_slice, remaining,
&call->receiving_slice_ready)) {
- error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
- &call->receiving_slice);
- if (error == GRPC_ERROR_NONE) {
- grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
- call->receiving_slice);
- } else {
- grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
- call->receiving_stream = NULL;
- grpc_byte_buffer_destroy(*call->receiving_buffer);
- *call->receiving_buffer = NULL;
- call->receiving_message = 0;
- finish_batch_step(exec_ctx, bctl);
- return;
- }
+ grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+ call->receiving_slice);
} else {
return;
}
@@ -1225,24 +1213,12 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- grpc_byte_stream *bs = call->receiving_stream;
- bool release_error = false;
if (error == GRPC_ERROR_NONE) {
- grpc_slice slice;
- error = grpc_byte_stream_pull(exec_ctx, bs, &slice);
- if (error == GRPC_ERROR_NONE) {
- grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
- slice);
- continue_receiving_slices(exec_ctx, bctl);
- } else {
- /* Error returned by grpc_byte_stream_pull needs to be released manually
- */
- release_error = true;
- }
- }
-
- if (error != GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+ call->receiving_slice);
+ continue_receiving_slices(exec_ctx, bctl);
+ } else {
if (grpc_trace_operation_failures) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
}
@@ -1250,11 +1226,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = NULL;
- call->receiving_message = 0;
finish_batch_step(exec_ctx, bctl);
- if (release_error) {
- GRPC_ERROR_UNREF(error);
- }
}
}
diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c
index 5800c70ef4..4d4206189e 100644
--- a/src/core/lib/transport/byte_stream.c
+++ b/src/core/lib/transport/byte_stream.c
@@ -40,15 +40,10 @@
#include "src/core/lib/slice/slice_internal.h"
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, size_t max_size_hint,
- grpc_closure *on_complete) {
- return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete);
-}
-
-grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- grpc_slice *slice) {
- return byte_stream->pull(exec_ctx, byte_stream, slice);
+ grpc_byte_stream *byte_stream, grpc_slice *slice,
+ size_t max_size_hint, grpc_closure *on_complete) {
+ return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint,
+ on_complete);
}
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -58,24 +53,16 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
/* slice_buffer_stream */
-static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- size_t max_size_hint,
- grpc_closure *on_complete) {
- grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
- GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
- return true;
-}
-
-static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- grpc_slice *slice) {
+static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice, size_t max_size_hint,
+ grpc_closure *on_complete) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
*slice =
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
stream->cursor++;
- return GRPC_ERROR_NONE;
+ return 1;
}
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -88,7 +75,6 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
stream->base.length = (uint32_t)slice_buffer->length;
stream->base.flags = flags;
stream->base.next = slice_buffer_stream_next;
- stream->base.pull = slice_buffer_stream_pull;
stream->base.destroy = slice_buffer_stream_destroy;
stream->backing_buffer = slice_buffer;
stream->cursor = 0;
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index 381c65fb04..1fdd5b4d77 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -49,10 +49,9 @@ typedef struct grpc_byte_stream grpc_byte_stream;
struct grpc_byte_stream {
uint32_t length;
uint32_t flags;
- bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
- size_t max_size_hint, grpc_closure *on_complete);
- grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
- grpc_slice *slice);
+ int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+ grpc_slice *slice, size_t max_size_hint,
+ grpc_closure *on_complete);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
};
@@ -62,20 +61,12 @@ struct grpc_byte_stream {
*
* max_size_hint can be set as a hint as to the maximum number
* of bytes that would be acceptable to read.
- */
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, size_t max_size_hint,
- grpc_closure *on_complete);
-
-/* returns the next slice in the byte stream when it is ready (indicated by
- * either grpc_byte_stream_next returning 1 or on_complete passed to
- * grpc_byte_stream_next is called).
*
* once a slice is returned into *slice, it is owned by the caller.
*/
-grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- grpc_slice *slice);
+int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream, grpc_slice *slice,
+ size_t max_size_hint, grpc_closure *on_complete);
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 8c5413b5fd..c89f349ca7 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -569,17 +569,12 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE);
return;
}
- } while (grpc_byte_stream_next(exec_ctx, recv_stream,
+ } while (grpc_byte_stream_next(exec_ctx, recv_stream, &recv_slice,
recv_stream->length - received,
- drain_continue.get()) &&
- GRPC_ERROR_NONE ==
- grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice) &&
- (received += GRPC_SLICE_LENGTH(recv_slice),
- grpc_slice_unref_internal(exec_ctx, recv_slice), true));
+ drain_continue.get()));
});
drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
- grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice);
received += GRPC_SLICE_LENGTH(recv_slice);
grpc_slice_unref_internal(exec_ctx, recv_slice);
grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE);