diff options
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r-- | src/core/lib/channel/compress_filter.c | 11 | ||||
-rw-r--r-- | src/core/lib/channel/http_client_filter.c | 11 |
2 files changed, 18 insertions, 4 deletions
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index aa41014a21..53d3b86f45 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -220,6 +220,12 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; + if (GRPC_ERROR_NONE != grpc_byte_stream_pull(exec_ctx, + calld->send_op->send_message, + &calld->incoming_slice)) { + /* Should never reach here */ + abort(); + } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); @@ -232,8 +238,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message, - &calld->incoming_slice, ~(size_t)0, - &calld->got_slice)) { + ~(size_t)0, &calld->got_slice)) { + grpc_byte_stream_pull(exec_ctx, calld->send_op->send_message, + &calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index c031533dd8..af2451e1eb 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -220,8 +220,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; uint8_t *wrptr = calld->payload_bytes; while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, - &calld->incoming_slice, ~(size_t)0, - &calld->got_slice)) { + ~(size_t)0, &calld->got_slice)) { + grpc_byte_stream_pull(exec_ctx, calld->send_op.send_message, + &calld->incoming_slice); memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), GRPC_SLICE_LENGTH(calld->incoming_slice)); wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); @@ -237,6 +238,12 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; calld->send_message_blocked = false; + if (GRPC_ERROR_NONE != grpc_byte_stream_pull(exec_ctx, + calld->send_op.send_message, + &calld->incoming_slice)) { + /* Should never reach here */ + abort(); + } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { /* Pass down the original send_message op that was blocked.*/ |