aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-03-21 11:51:40 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-03-21 12:11:06 -0700
commit1bfcc40fd32357018cb7bfd729e069018abb3689 (patch)
tree79e628ed99f6be00d92d5522cb41fffc6c756460 /src/core
parent1f9fd13a15f97d6e3d1a93d942d3a44bd9526471 (diff)
Clean up byte_stream_next interface usage
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c21
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c12
-rw-r--r--src/core/lib/channel/compress_filter.c11
-rw-r--r--src/core/lib/channel/http_client_filter.c11
-rw-r--r--src/core/lib/surface/call.c5
-rw-r--r--src/core/lib/transport/byte_stream.c20
-rw-r--r--src/core/lib/transport/byte_stream.h15
8 files changed, 67 insertions, 30 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index a8962d305d..95f20725f3 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1101,8 +1101,9 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
s->fetching_send_message = NULL;
return; /* early out */
} else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
- &s->fetching_slice, UINT32_MAX,
- &s->complete_fetch_locked)) {
+ UINT32_MAX, &s->complete_fetch_locked)) {
+ grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
+ &s->fetching_slice);
add_fetched_slice_locked(exec_ctx, t, s);
}
}
@@ -1113,9 +1114,15 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_chttp2_stream *s = gs;
grpc_chttp2_transport *t = s->t;
if (error == GRPC_ERROR_NONE) {
- add_fetched_slice_locked(exec_ctx, t, s);
- continue_fetching_send_locked(exec_ctx, t, s);
- } else {
+ error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
+ &s->fetching_slice);
+ if (error == GRPC_ERROR_NONE) {
+ add_fetched_slice_locked(exec_ctx, t, s);
+ continue_fetching_send_locked(exec_ctx, t, s);
+ }
+ }
+
+ if (error != GRPC_ERROR_NONE) {
/* TODO(ctiller): what to do here */
abort();
}
@@ -2530,7 +2537,6 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
}
} else {
bs->on_next = bs->next_action.on_complete;
- bs->next = bs->next_action.slice;
}
gpr_mu_unlock(&bs->slice_mu);
gpr_mu_unlock(&s->buffer_mu);
@@ -2570,13 +2576,12 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
- grpc_slice *slice, size_t max_size_hint,
+ size_t max_size_hint,
grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
gpr_ref(&bs->refs);
- bs->next_action.slice = slice;
bs->next_action.max_size_hint = max_size_hint;
bs->next_action.on_complete = on_complete;
grpc_closure_sched(
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 54bfd42357..adbd48c581 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -197,12 +197,10 @@ struct grpc_chttp2_incoming_byte_stream {
gpr_mu slice_mu; // protects slices, on_next
grpc_slice_buffer slices;
grpc_closure *on_next;
- grpc_slice *next;
uint32_t remaining_bytes;
struct {
grpc_closure closure;
- grpc_slice *slice;
size_t max_size_hint;
grpc_closure *on_complete;
} next_action;
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 01a03533da..da180e5144 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -908,8 +908,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer write_slice_buffer;
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
- grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
- stream_op->send_message->length, NULL);
+ if (1 != grpc_byte_stream_next(exec_ctx, stream_op->send_message,
+ stream_op->send_message->length, NULL)) {
+ /* Should never reach here */
+ GPR_ASSERT(false);
+ }
+ if (GRPC_ERROR_NONE !=
+ grpc_byte_stream_pull(exec_ctx, stream_op->send_message, &slice)) {
+ /* Should never reach here */
+ GPR_ASSERT(false);
+ }
/* Check that compression flag is OFF. We don't support compression yet.
*/
if (stream_op->send_message->flags != 0) {
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index aa41014a21..53d3b86f45 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -220,6 +220,12 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
+ if (GRPC_ERROR_NONE != grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->send_message,
+ &calld->incoming_slice)) {
+ /* Should never reach here */
+ abort();
+ }
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
@@ -232,8 +238,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message,
- &calld->incoming_slice, ~(size_t)0,
- &calld->got_slice)) {
+ ~(size_t)0, &calld->got_slice)) {
+ grpc_byte_stream_pull(exec_ctx, calld->send_op->send_message,
+ &calld->incoming_slice);
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index c031533dd8..af2451e1eb 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -220,8 +220,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
uint8_t *wrptr = calld->payload_bytes;
while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
- &calld->incoming_slice, ~(size_t)0,
- &calld->got_slice)) {
+ ~(size_t)0, &calld->got_slice)) {
+ grpc_byte_stream_pull(exec_ctx, calld->send_op.send_message,
+ &calld->incoming_slice);
memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
GRPC_SLICE_LENGTH(calld->incoming_slice));
wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
@@ -237,6 +238,12 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
calld->send_message_blocked = false;
+ if (GRPC_ERROR_NONE != grpc_byte_stream_pull(exec_ctx,
+ calld->send_op.send_message,
+ &calld->incoming_slice)) {
+ /* Should never reach here */
+ abort();
+ }
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
/* Pass down the original send_message op that was blocked.*/
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 4471ada106..5dac90c60c 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1162,9 +1162,10 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
finish_batch_step(exec_ctx, bctl);
return;
}
- if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
- &call->receiving_slice, remaining,
+ if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
&call->receiving_slice_ready)) {
+ grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
+ &call->receiving_slice);
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
} else {
diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c
index 3a50694670..79801c4b46 100644
--- a/src/core/lib/transport/byte_stream.c
+++ b/src/core/lib/transport/byte_stream.c
@@ -40,10 +40,9 @@
#include "src/core/lib/slice/slice_internal.h"
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, grpc_slice *slice,
- size_t max_size_hint, grpc_closure *on_complete) {
- return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint,
- on_complete);
+ grpc_byte_stream *byte_stream, size_t max_size_hint,
+ grpc_closure *on_complete) {
+ return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete);
}
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
@@ -61,14 +60,22 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
- grpc_slice *slice, size_t max_size_hint,
+ size_t max_size_hint,
grpc_closure *on_complete) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
+ return 1;
+}
+
+static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice) {
+ grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
*slice =
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
stream->cursor++;
- return 1;
+ return GRPC_ERROR_NONE;
}
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -81,6 +88,7 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
stream->base.length = (uint32_t)slice_buffer->length;
stream->base.flags = flags;
stream->base.next = slice_buffer_stream_next;
+ stream->base.pull = slice_buffer_stream_pull;
stream->base.destroy = slice_buffer_stream_destroy;
stream->backing_buffer = slice_buffer;
stream->cursor = 0;
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index 6afba92c52..800e2341f9 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -50,8 +50,7 @@ struct grpc_byte_stream {
uint32_t length;
uint32_t flags;
int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
- grpc_slice *slice, size_t max_size_hint,
- grpc_closure *on_complete);
+ size_t max_size_hint, grpc_closure *on_complete);
grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
grpc_slice *slice);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
@@ -63,13 +62,17 @@ struct grpc_byte_stream {
*
* max_size_hint can be set as a hint as to the maximum number
* of bytes that would be acceptable to read.
- *
- * once a slice is returned into *slice, it is owned by the caller.
*/
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, grpc_slice *slice,
- size_t max_size_hint, grpc_closure *on_complete);
+ grpc_byte_stream *byte_stream, size_t max_size_hint,
+ grpc_closure *on_complete);
+/* returns the next slice in the byte stream when it is ready (indicated by
+ * either grpc_byte_stream_next returning 1 or on_complete passed to
+ * grpc_byte_stream_next is called).
+ *
+ * once a slice is returned into *slice, it is owned by the caller.
+ */
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice);