aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/compress_filter.c11
-rw-r--r--src/core/lib/channel/http_client_filter.c11
2 files changed, 18 insertions, 4 deletions
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index aa41014a21..53d3b86f45 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -220,6 +220,12 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
+ if (GRPC_ERROR_NONE != grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->send_message,
+ &calld->incoming_slice)) {
+ /* Should never reach here */
+ abort();
+ }
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
@@ -232,8 +238,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message,
- &calld->incoming_slice, ~(size_t)0,
- &calld->got_slice)) {
+ ~(size_t)0, &calld->got_slice)) {
+ grpc_byte_stream_pull(exec_ctx, calld->send_op->send_message,
+ &calld->incoming_slice);
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index c031533dd8..af2451e1eb 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -220,8 +220,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
uint8_t *wrptr = calld->payload_bytes;
while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
- &calld->incoming_slice, ~(size_t)0,
- &calld->got_slice)) {
+ ~(size_t)0, &calld->got_slice)) {
+ grpc_byte_stream_pull(exec_ctx, calld->send_op.send_message,
+ &calld->incoming_slice);
memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
GRPC_SLICE_LENGTH(calld->incoming_slice));
wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
@@ -237,6 +238,12 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
calld->send_message_blocked = false;
+ if (GRPC_ERROR_NONE != grpc_byte_stream_pull(exec_ctx,
+ calld->send_op.send_message,
+ &calld->incoming_slice)) {
+ /* Should never reach here */
+ abort();
+ }
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
/* Pass down the original send_message op that was blocked.*/