aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/compress_filter.c36
1 files changed, 22 insertions, 14 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index d62d31e78b..3e76030dfc 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -112,43 +112,51 @@ static int skip_compression(channel_data *channeld, call_data *calld) {
static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
grpc_call_element *elem) {
- size_t i, j;
+ size_t i;
+ grpc_stream_op_buffer new_send_ops;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
+ int new_slices_added = 0; /* GPR_FALSE */
+ grpc_metadata_batch metadata;
+
+ grpc_sopb_init(&new_send_ops);
/* The following loop is akin to a selective reset + update */
- for (i = 0, j = 0; i < send_ops->nops; ++i) {
+ for (i = 0; i < send_ops->nops; i++) {
grpc_stream_op *sop = &send_ops->ops[i];
switch (sop->type) {
case GRPC_OP_BEGIN_MESSAGE:
- sop->data.begin_message.length = calld->slices.length;
- sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ grpc_sopb_add_begin_message(
+ &new_send_ops, calld->slices.length,
+ sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
break;
case GRPC_OP_METADATA:
+ grpc_metadata_batch_move(&metadata, &sop->data.metadata);
if (!calld->seen_initial_metadata) {
grpc_metadata_batch_add_head(
- &(sop->data.metadata), &calld->compression_algorithm_storage,
+ &metadata, &calld->compression_algorithm_storage,
grpc_mdelem_ref(channeld->mdelem_compression_algorithms
[calld->compression_algorithm]));
calld->seen_initial_metadata = 1; /* GPR_TRUE */
}
+ grpc_sopb_add_metadata(&new_send_ops, metadata);
break;
case GRPC_OP_SLICE:
- gpr_slice_unref(sop->data.slice);
- /* replace only up to the number of available compressed slices */
- if (j < calld->slices.count) {
- sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
+ if (!new_slices_added) {
+ size_t j;
+ for (j = 0; j < calld->slices.count; ++j) {
+ grpc_sopb_add_slice(&new_send_ops,
+ gpr_slice_ref(calld->slices.slices[j]));
+ }
+ new_slices_added = 1; /* GPR_TRUE */
}
break;
case GRPC_NO_OP:
break;
}
}
-
- /* in case compressed slices remain to be added to the output */
- while (j < calld->slices.count) {
- grpc_sopb_add_slice(send_ops, gpr_slice_ref(calld->slices.slices[j++]));
- }
+ grpc_sopb_swap(send_ops, &new_send_ops);
+ grpc_sopb_destroy(&new_send_ops);
}
/* even if the filter isn't producing compressed output, it may need to update