diff options
author | David Garcia Quintas <dgq@google.com> | 2015-07-10 17:48:22 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-07-10 17:48:22 -0700 |
commit | 20a3538ddcdc194c14ad7c04b237a2ec7f022df3 (patch) | |
tree | 9d99a72e2e9d96e6ec4d82100b29d3da2dfb4d4f | |
parent | a21e2c8f912f5701c5bd9905b41548677d4b1413 (diff) |
More bugfixes to compress_filter.
Introduced grpc_metadata_batch_move and added grpc_compress_filter to grpc_server_create
-rw-r--r-- | src/core/channel/compress_filter.c | 36 | ||||
-rw-r--r-- | src/core/surface/server_create.c | 5 | ||||
-rw-r--r-- | src/core/transport/stream_op.c | 6 | ||||
-rw-r--r-- | src/core/transport/stream_op.h | 5 |
4 files changed, 37 insertions, 15 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index d62d31e78b..3e76030dfc 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -112,43 +112,51 @@ static int skip_compression(channel_data *channeld, call_data *calld) { static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, grpc_call_element *elem) { - size_t i, j; + size_t i; + grpc_stream_op_buffer new_send_ops; call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + int new_slices_added = 0; /* GPR_FALSE */ + grpc_metadata_batch metadata; + + grpc_sopb_init(&new_send_ops); /* The following loop is akin to a selective reset + update */ - for (i = 0, j = 0; i < send_ops->nops; ++i) { + for (i = 0; i < send_ops->nops; i++) { grpc_stream_op *sop = &send_ops->ops[i]; switch (sop->type) { case GRPC_OP_BEGIN_MESSAGE: - sop->data.begin_message.length = calld->slices.length; - sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + grpc_sopb_add_begin_message( + &new_send_ops, calld->slices.length, + sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); break; case GRPC_OP_METADATA: + grpc_metadata_batch_move(&metadata, &sop->data.metadata); if (!calld->seen_initial_metadata) { grpc_metadata_batch_add_head( - &(sop->data.metadata), &calld->compression_algorithm_storage, + &metadata, &calld->compression_algorithm_storage, grpc_mdelem_ref(channeld->mdelem_compression_algorithms [calld->compression_algorithm])); calld->seen_initial_metadata = 1; /* GPR_TRUE */ } + grpc_sopb_add_metadata(&new_send_ops, metadata); break; case GRPC_OP_SLICE: - gpr_slice_unref(sop->data.slice); - /* replace only up to the number of available compressed slices */ - if (j < calld->slices.count) { - sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); + if (!new_slices_added) { + size_t j; + for (j = 0; j < calld->slices.count; ++j) { + grpc_sopb_add_slice(&new_send_ops, + gpr_slice_ref(calld->slices.slices[j])); + } + new_slices_added = 1; /* GPR_TRUE */ } break; case GRPC_NO_OP: break; } } - - /* in case compressed slices remain to be added to the output */ - while (j < calld->slices.count) { - grpc_sopb_add_slice(send_ops, gpr_slice_ref(calld->slices.slices[j++])); - } + grpc_sopb_swap(send_ops, &new_send_ops); + grpc_sopb_destroy(&new_send_ops); } /* even if the filter isn't producing compressed output, it may need to update diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index b7390675ad..1e26c67693 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -34,7 +34,10 @@ #include <grpc/grpc.h> #include "src/core/surface/completion_queue.h" #include "src/core/surface/server.h" +#include "src/core/channel/compress_filter.h" grpc_server *grpc_server_create(const grpc_channel_args *args) { - return grpc_server_create_from_filters(NULL, 0, args); + const grpc_channel_filter *filters[] = {&grpc_compress_filter}; + return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters), + args); } diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index fdb50c6b71..1cce38aae2 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -286,6 +286,12 @@ void grpc_metadata_batch_merge(grpc_metadata_batch *target, } } +void grpc_metadata_batch_move(grpc_metadata_batch *dst, + grpc_metadata_batch *src) { + *dst = *src; + memset(src, 0, sizeof(grpc_metadata_batch)); +} + void grpc_metadata_batch_filter(grpc_metadata_batch *batch, grpc_mdelem *(*filter)(void *user_data, grpc_mdelem *elem), diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 964d39d14f..f27ef1b66b 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -102,6 +102,11 @@ void grpc_metadata_batch_destroy(grpc_metadata_batch *batch); void grpc_metadata_batch_merge(grpc_metadata_batch *target, grpc_metadata_batch *add); +/** Moves the metadata information from \a src to \a dst. Upon return, \a src is + * zeroed. */ +void grpc_metadata_batch_move(grpc_metadata_batch *dst, + grpc_metadata_batch *src); + /** Add \a storage to the beginning of \a batch. storage->md is assumed to be valid. \a storage is owned by the caller and must survive for the |