From 5a6e2416ed4938afd26ed756af6ea57c2288b8ed Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 24 Feb 2017 14:52:28 -0800 Subject: Lazy deframe pass cronet end2end tests --- src/core/lib/transport/byte_stream.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'src/core/lib/transport/byte_stream.c') diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index 4d4206189e..afebf52cd5 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -46,6 +46,11 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, on_complete); } +grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, grpc_slice *slice) { + return byte_stream->pull(exec_ctx, byte_stream, slice); +} + void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { byte_stream->destroy(exec_ctx, byte_stream); -- cgit v1.2.3 From 31ed2d9effd77029b879a139f4dd9e8ab166a1f5 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Sun, 26 Feb 2017 22:10:07 -0800 Subject: clang-format --- .../transport/chttp2/transport/chttp2_transport.c | 24 ++++++++++------------ .../ext/transport/chttp2/transport/frame_data.c | 12 ++++------- src/core/ext/transport/chttp2/transport/internal.h | 6 ++++-- src/core/lib/transport/byte_stream.c | 3 ++- src/core/lib/transport/byte_stream.h | 7 ++++--- 5 files changed, 25 insertions(+), 27 deletions(-) (limited to 'src/core/lib/transport/byte_stream.c') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index b03c79315b..b5e087d8f7 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -2214,10 +2214,8 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static grpc_error *deframe_unprocessed_incoming_frames( grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_slice_buffer *slices, grpc_slice *slice_out, - bool partial_deframe) { - + grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice_buffer *slices, + grpc_slice *slice_out, bool partial_deframe) { bool slice_set = false; while (slices->count > 0) { uint8_t *beg = NULL; @@ -2247,8 +2245,9 @@ static grpc_error *deframe_unprocessed_incoming_frames( GPR_ASSERT(s->incoming_frames == NULL); if (s->incoming_frames != NULL) { s->stats.incoming.framing_bytes += (size_t)(end - cur); - grpc_slice_buffer_undo_take_first(&s->unprocessed_incoming_frames_buffer, - grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + grpc_slice_buffer_undo_take_first( + &s->unprocessed_incoming_frames_buffer, + grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); return GRPC_ERROR_NONE; } p->frame_type = *cur; @@ -2314,9 +2313,8 @@ static grpc_error *deframe_unprocessed_incoming_frames( message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } GPR_ASSERT(s->incoming_frames == NULL); - p->parsing_frame = - grpc_chttp2_incoming_byte_stream_create( - exec_ctx, t, s, p->frame_size, message_flags); + p->parsing_frame = grpc_chttp2_incoming_byte_stream_create( + exec_ctx, t, s, p->frame_size, message_flags); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: { if (slice_set) { @@ -2475,9 +2473,8 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&s->buffer_mu); if (s->unprocessed_incoming_frames_buffer.length > 0) { grpc_error *error = deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, t, s, - &s->unprocessed_incoming_frames_buffer, - slice, false); + exec_ctx, &s->data_parser, t, s, &s->unprocessed_incoming_frames_buffer, + slice, false); if (error != GRPC_ERROR_NONE) { gpr_mu_unlock(&s->buffer_mu); return error; @@ -2549,7 +2546,8 @@ static void incoming_byte_stream_publish_error( void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_slice slice, grpc_slice *slice_out) { + grpc_slice slice, + grpc_slice *slice_out) { if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { incoming_byte_stream_publish_error( exec_ctx, bs, GRPC_ERROR_CREATE("Too many bytes in stream")); diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index e889e3527b..2c12b87141 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -141,10 +141,9 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, stats->data_bytes += write_bytes; } -static void grpc_chttp2_unprocessed_frames_buffer_push(grpc_exec_ctx *exec_ctx, - grpc_chttp2_data_parser *p, - grpc_chttp2_stream *s, - grpc_slice slice) { +static void grpc_chttp2_unprocessed_frames_buffer_push( + grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, grpc_chttp2_stream *s, + grpc_slice slice) { grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); if (p->parsing_frame) { grpc_chttp2_incoming_byte_stream *bs = p->parsing_frame; @@ -179,10 +178,7 @@ grpc_error *parse_inner_buffer(grpc_exec_ctx *exec_ctx, if (s->unprocessed_incoming_frames_buffer.count > 0) { s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice); grpc_slice_ref(slice); - grpc_chttp2_unprocessed_frames_buffer_push(exec_ctx, - p, - s, - slice); + grpc_chttp2_unprocessed_frames_buffer_push(exec_ctx, p, s, slice); gpr_mu_unlock(&s->buffer_mu); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8d0f96efc6..3d6253cd6f 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -490,7 +490,8 @@ struct grpc_chttp2_stream { grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; grpc_chttp2_incoming_byte_stream *incoming_frames; - gpr_mu buffer_mu; /* protects unprocessed_incoming_frames_buffer and parse_data */ + gpr_mu buffer_mu; /* protects unprocessed_incoming_frames_buffer and + parse_data */ grpc_slice_buffer unprocessed_incoming_frames_buffer; gpr_timespec deadline; @@ -784,7 +785,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( uint32_t frame_size, uint32_t flags); void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_slice slice, grpc_slice *slice_out); + grpc_slice slice, + grpc_slice *slice_out); void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error); diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index afebf52cd5..3a50694670 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -47,7 +47,8 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, } grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, grpc_slice *slice) { + grpc_byte_stream *byte_stream, + grpc_slice *slice) { return byte_stream->pull(exec_ctx, byte_stream, slice); } diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 582480bb88..6afba92c52 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -52,8 +52,8 @@ struct grpc_byte_stream { int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice, size_t max_size_hint, grpc_closure *on_complete); - grpc_error* (*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - grpc_slice *slice); + grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + grpc_slice *slice); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); }; @@ -71,7 +71,8 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, size_t max_size_hint, grpc_closure *on_complete); grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, grpc_slice *slice); + grpc_byte_stream *byte_stream, + grpc_slice *slice); void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); -- cgit v1.2.3 From 1bfcc40fd32357018cb7bfd729e069018abb3689 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 21 Mar 2017 11:51:40 -0700 Subject: Clean up byte_stream_next interface usage --- .../transport/chttp2/transport/chttp2_transport.c | 21 +++++++++++++-------- src/core/ext/transport/chttp2/transport/internal.h | 2 -- .../transport/cronet/transport/cronet_transport.c | 12 ++++++++++-- src/core/lib/channel/compress_filter.c | 11 +++++++++-- src/core/lib/channel/http_client_filter.c | 11 +++++++++-- src/core/lib/surface/call.c | 5 +++-- src/core/lib/transport/byte_stream.c | 20 ++++++++++++++------ src/core/lib/transport/byte_stream.h | 15 +++++++++------ 8 files changed, 67 insertions(+), 30 deletions(-) (limited to 'src/core/lib/transport/byte_stream.c') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index a8962d305d..95f20725f3 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1101,8 +1101,9 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, s->fetching_send_message = NULL; return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, - &s->fetching_slice, UINT32_MAX, - &s->complete_fetch_locked)) { + UINT32_MAX, &s->complete_fetch_locked)) { + grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, + &s->fetching_slice); add_fetched_slice_locked(exec_ctx, t, s); } } @@ -1113,9 +1114,15 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_chttp2_stream *s = gs; grpc_chttp2_transport *t = s->t; if (error == GRPC_ERROR_NONE) { - add_fetched_slice_locked(exec_ctx, t, s); - continue_fetching_send_locked(exec_ctx, t, s); - } else { + error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, + &s->fetching_slice); + if (error == GRPC_ERROR_NONE) { + add_fetched_slice_locked(exec_ctx, t, s); + continue_fetching_send_locked(exec_ctx, t, s); + } + } + + if (error != GRPC_ERROR_NONE) { /* TODO(ctiller): what to do here */ abort(); } @@ -2530,7 +2537,6 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, } } else { bs->on_next = bs->next_action.on_complete; - bs->next = bs->next_action.slice; } gpr_mu_unlock(&bs->slice_mu); gpr_mu_unlock(&s->buffer_mu); @@ -2570,13 +2576,12 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - grpc_slice *slice, size_t max_size_hint, + size_t max_size_hint, grpc_closure *on_complete) { GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; gpr_ref(&bs->refs); - bs->next_action.slice = slice; bs->next_action.max_size_hint = max_size_hint; bs->next_action.on_complete = on_complete; grpc_closure_sched( diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 54bfd42357..adbd48c581 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -197,12 +197,10 @@ struct grpc_chttp2_incoming_byte_stream { gpr_mu slice_mu; // protects slices, on_next grpc_slice_buffer slices; grpc_closure *on_next; - grpc_slice *next; uint32_t remaining_bytes; struct { grpc_closure closure; - grpc_slice *slice; size_t max_size_hint; grpc_closure *on_complete; } next_action; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 01a03533da..da180e5144 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -908,8 +908,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer write_slice_buffer; grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); - grpc_byte_stream_next(NULL, stream_op->send_message, &slice, - stream_op->send_message->length, NULL); + if (1 != grpc_byte_stream_next(exec_ctx, stream_op->send_message, + stream_op->send_message->length, NULL)) { + /* Should never reach here */ + GPR_ASSERT(false); + } + if (GRPC_ERROR_NONE != + grpc_byte_stream_pull(exec_ctx, stream_op->send_message, &slice)) { + /* Should never reach here */ + GPR_ASSERT(false); + } /* Check that compression flag is OFF. We don't support compression yet. */ if (stream_op->send_message->flags != 0) { diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index aa41014a21..53d3b86f45 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -220,6 +220,12 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; + if (GRPC_ERROR_NONE != grpc_byte_stream_pull(exec_ctx, + calld->send_op->send_message, + &calld->incoming_slice)) { + /* Should never reach here */ + abort(); + } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); @@ -232,8 +238,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message, - &calld->incoming_slice, ~(size_t)0, - &calld->got_slice)) { + ~(size_t)0, &calld->got_slice)) { + grpc_byte_stream_pull(exec_ctx, calld->send_op->send_message, + &calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index c031533dd8..af2451e1eb 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -220,8 +220,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; uint8_t *wrptr = calld->payload_bytes; while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, - &calld->incoming_slice, ~(size_t)0, - &calld->got_slice)) { + ~(size_t)0, &calld->got_slice)) { + grpc_byte_stream_pull(exec_ctx, calld->send_op.send_message, + &calld->incoming_slice); memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), GRPC_SLICE_LENGTH(calld->incoming_slice)); wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); @@ -237,6 +238,12 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; calld->send_message_blocked = false; + if (GRPC_ERROR_NONE != grpc_byte_stream_pull(exec_ctx, + calld->send_op.send_message, + &calld->incoming_slice)) { + /* Should never reach here */ + abort(); + } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { /* Pass down the original send_message op that was blocked.*/ diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 4471ada106..5dac90c60c 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1162,9 +1162,10 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, finish_batch_step(exec_ctx, bctl); return; } - if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, - &call->receiving_slice, remaining, + if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining, &call->receiving_slice_ready)) { + grpc_byte_stream_pull(exec_ctx, call->receiving_stream, + &call->receiving_slice); grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, call->receiving_slice); } else { diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index 3a50694670..79801c4b46 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -40,10 +40,9 @@ #include "src/core/lib/slice/slice_internal.h" int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, grpc_slice *slice, - size_t max_size_hint, grpc_closure *on_complete) { - return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint, - on_complete); + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete) { + return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete); } grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, @@ -61,14 +60,22 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - grpc_slice *slice, size_t max_size_hint, + size_t max_size_hint, grpc_closure *on_complete) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; GPR_ASSERT(stream->cursor < stream->backing_buffer->count); + return 1; +} + +static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice) { + grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + GPR_ASSERT(stream->cursor < stream->backing_buffer->count); *slice = grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); stream->cursor++; - return 1; + return GRPC_ERROR_NONE; } static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -81,6 +88,7 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, stream->base.length = (uint32_t)slice_buffer->length; stream->base.flags = flags; stream->base.next = slice_buffer_stream_next; + stream->base.pull = slice_buffer_stream_pull; stream->base.destroy = slice_buffer_stream_destroy; stream->backing_buffer = slice_buffer; stream->cursor = 0; diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 6afba92c52..800e2341f9 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -50,8 +50,7 @@ struct grpc_byte_stream { uint32_t length; uint32_t flags; int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - grpc_slice *slice, size_t max_size_hint, - grpc_closure *on_complete); + size_t max_size_hint, grpc_closure *on_complete); grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); @@ -63,13 +62,17 @@ struct grpc_byte_stream { * * max_size_hint can be set as a hint as to the maximum number * of bytes that would be acceptable to read. - * - * once a slice is returned into *slice, it is owned by the caller. */ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, grpc_slice *slice, - size_t max_size_hint, grpc_closure *on_complete); + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete); +/* returns the next slice in the byte stream when it is ready (indicated by + * either grpc_byte_stream_next returning 1 or on_complete passed to + * grpc_byte_stream_next is called). + * + * once a slice is returned into *slice, it is owned by the caller. + */ grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice); -- cgit v1.2.3 From 02646c3f62d6e5b07b62ffbdacfb64ae7946b721 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 10 Apr 2017 18:28:23 -0700 Subject: int -> bool --- src/core/ext/transport/chttp2/transport/chttp2_transport.c | 12 ++++++------ src/core/lib/transport/byte_stream.c | 10 +++++----- src/core/lib/transport/byte_stream.h | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) (limited to 'src/core/lib/transport/byte_stream.c') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 093370d11a..a0bce1e077 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -2536,16 +2536,16 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, incoming_byte_stream_unref(exec_ctx, bs); } -static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - size_t max_size_hint, - grpc_closure *on_complete) { +static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + size_t max_size_hint, + grpc_closure *on_complete) { GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_chttp2_stream *s = bs->stream; if (s->unprocessed_incoming_frames_buffer.length > 0) { - return 1; + return true; } else { gpr_ref(&bs->refs); bs->next_action.max_size_hint = max_size_hint; @@ -2557,7 +2557,7 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_combiner_scheduler(bs->transport->combiner, false)), GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_next", 0); - return 0; + return false; } } diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index 79801c4b46..5800c70ef4 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -58,13 +58,13 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, /* slice_buffer_stream */ -static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - size_t max_size_hint, - grpc_closure *on_complete) { +static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + size_t max_size_hint, + grpc_closure *on_complete) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; GPR_ASSERT(stream->cursor < stream->backing_buffer->count); - return 1; + return true; } static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 800e2341f9..381c65fb04 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -49,8 +49,8 @@ typedef struct grpc_byte_stream grpc_byte_stream; struct grpc_byte_stream { uint32_t length; uint32_t flags; - int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - size_t max_size_hint, grpc_closure *on_complete); + bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + size_t max_size_hint, grpc_closure *on_complete); grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); -- cgit v1.2.3