diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/channel/compress_filter.c | 190 | ||||
-rw-r--r-- | src/core/channel/compress_filter.h | 42 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 11 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.c | 40 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.h | 1 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 2 | ||||
-rw-r--r-- | src/core/transport/transport.h | 1 |
7 files changed, 278 insertions, 9 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c new file mode 100644 index 0000000000..5da82edb3c --- /dev/null +++ b/src/core/channel/compress_filter.c @@ -0,0 +1,190 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include "src/core/channel/compress_filter.h" +#include "src/core/channel/channel_args.h" +#include "src/core/compression/message_compress.h" +#include <grpc/compression.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> + +typedef struct call_data { + gpr_slice_buffer slices; + int remaining_slice_bytes; + int dont_compress; /**< whether skip compression for this specific call */ +} call_data; + +typedef struct channel_data { + grpc_compression_algorithm compress_algorithm; +} channel_data; + +static void compress_send_sb(grpc_compression_algorithm algorithm, + gpr_slice_buffer *slices) { + gpr_slice_buffer tmp; + gpr_slice_buffer_init(&tmp); + if (!grpc_msg_compress(algorithm, slices, &tmp)) { + gpr_log(GPR_INFO, "Not compressed!"); + } + gpr_slice_buffer_swap(slices, &tmp); + gpr_slice_buffer_destroy(&tmp); +} + +static void process_send_ops(grpc_call_element *elem, + grpc_stream_op_buffer *send_ops) { + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + size_t i, j; + /* buffer up slices until we've processed all the expected ones (as given by + * GRPC_OP_BEGIN_MESSAGE) */ + for (i = 0; i < send_ops->nops; ++i) { + grpc_stream_op *sop = &send_ops->ops[i]; + switch (sop->type) { + case GRPC_OP_BEGIN_MESSAGE: + calld->remaining_slice_bytes = sop->data.begin_message.length; + calld->dont_compress = + !!(sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS); + break; + case GRPC_OP_SLICE: + if (calld->dont_compress) return; + GPR_ASSERT(calld->remaining_slice_bytes > 0); + /* add to calld->slices */ + gpr_slice_buffer_add(&calld->slices, sop->data.slice); + calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice); + if (calld->remaining_slice_bytes == 0) { + /* compress */ + compress_send_sb(channeld->compress_algorithm, &calld->slices); + } + break; + case GRPC_NO_OP: + case GRPC_OP_METADATA: + ; /* fallthrough, ignore */ + } + } + + /* at this point, calld->slices contains the *compressed* slices from + * send_ops->ops[*]->data.slice. We now replace these input slices with the + * compressed ones. */ + for (i = 0, j = 0; i < send_ops->nops; ++i) { + grpc_stream_op *sop = &send_ops->ops[i]; + GPR_ASSERT(j < calld->slices.count); + switch (sop->type) { + case GRPC_OP_SLICE: + gpr_slice_unref(sop->data.slice); + sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); + break; + case GRPC_OP_BEGIN_MESSAGE: + case GRPC_NO_OP: + case GRPC_OP_METADATA: + ; /* fallthrough, ignore */ + } + } +} + +/* Called either: + - in response to an API call (or similar) from above, to send something + - a network event (or similar) from below, to receive something + op contains type and call direction information, in addition to the data + that is being sent or received. */ +static void compress_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { + if (op->send_ops && op->send_ops->nops > 0) { + process_send_ops(elem, op->send_ops); + } + + /* pass control down the stack */ + grpc_call_next_op(elem, op); +} + +/* Called on special channel events, such as disconnection or new incoming + calls on the server */ +static void channel_op(grpc_channel_element *elem, + grpc_channel_element *from_elem, grpc_channel_op *op) { + switch (op->type) { + default: + grpc_channel_next_op(elem, op); + break; + } +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_op *initial_op) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + + /* initialize members */ + gpr_slice_buffer_init(&calld->slices); + + if (initial_op) { + if (initial_op->send_ops && initial_op->send_ops->nops > 0) { + process_send_ops(elem, initial_op->send_ops); + } + } +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_call_element *elem) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + gpr_slice_buffer_destroy(&calld->slices); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + channel_data *channeld = elem->channel_data; + channeld->compress_algorithm = grpc_compression_algorithm_for_level( + grpc_channel_args_get_compression_level(args)); + /*We shouldn't be in this filter if compression is disabled. */ + GPR_ASSERT(channeld->compress_algorithm != GRPC_COMPRESS_NONE); + + /* The first and the last filters tend to be implemented differently to + handle the case that there's no 'next' filter to call on the up or down + path */ + GPR_ASSERT(!is_first); + GPR_ASSERT(!is_last); +} + +/* Destructor for channel data */ +static void destroy_channel_elem(grpc_channel_element *elem) { + /* empty for now */ +} + +const grpc_channel_filter grpc_compress_filter = { + compress_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "compress"}; diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h new file mode 100644 index 0000000000..de3fadebb4 --- /dev/null +++ b/src/core/channel/compress_filter.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H +#define GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H + +#include "src/core/channel/channel_stack.h" + +/* XXX */ +extern const grpc_channel_filter grpc_compress_filter; + +#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 946ee0949d..e7804047a0 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -33,24 +33,25 @@ #include "src/core/iomgr/sockaddr.h" -#include <grpc/grpc.h> #include <stdlib.h> #include <string.h> -#include "src/core/channel/census_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/client_setup.h" +#include "src/core/channel/compress_filter.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" #include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" +#include "src/core/support/string.h" #include "src/core/surface/channel.h" #include "src/core/surface/client.h" -#include "src/core/support/string.h" #include "src/core/transport/chttp2_transport.h" + +#include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -200,6 +201,10 @@ grpc_channel *grpc_channel_create(const char *target, if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; } */ + if (grpc_channel_args_get_compression_level(args) > + GRPC_COMPRESS_LEVEL_NONE) { + filters[n++] = &grpc_compress_filter; + } filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index a1ae9ed2e6..851420e900 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -40,6 +40,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> #include "src/core/transport/transport.h" +#include "src/core/compression/message_compress.h" grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser) { @@ -68,6 +69,35 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( return GRPC_CHTTP2_PARSE_OK; } +/** Performs any extra work needed after a frame has been assembled */ +grpc_chttp2_parse_error parse_postprocessing(grpc_chttp2_data_parser *p) { + if (p->is_frame_compressed) { /* Decompress */ + /* Reorganize the slices within p->incoming_sopb into a gpr_slice_buffer to + * be fed to the decompression function */ + gpr_slice_buffer sb_in, sb_out; + grpc_stream_op_buffer *sopb = &p->incoming_sopb; + size_t i; + gpr_slice_buffer_init(&sb_in); + gpr_slice_buffer_init(&sb_out); + for (i = 0; i < sopb->nops; ++i) { + if (sopb->ops->type == GRPC_OP_SLICE) { + gpr_slice_buffer_add(&sb_in, sopb->ops->data.slice); + } + } + grpc_msg_decompress(GRPC_COMPRESS_GZIP /* XXX */, &sb_in, &sb_out); + /* copy uncompressed output back to p->incoming_sopb */ + grpc_sopb_reset(sopb); + grpc_sopb_add_begin_message(sopb, sb_out.length, 0); + for (i = 0; i < sb_out.count; ++i) { + grpc_sopb_add_slice(sopb, sb_out.slices[i]); + } + gpr_slice_buffer_destroy(&sb_in); + gpr_slice_buffer_destroy(&sb_out); + } + + return GRPC_CHTTP2_PARSE_OK; +} + grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last) { @@ -97,8 +127,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( case 0: break; case 1: - gpr_log(GPR_ERROR, "Compressed GRPC frames not yet supported"); - return GRPC_CHTTP2_STREAM_ERROR; + p->is_frame_compressed = 1; /* GPR_TRUE */ + break; default: gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type); return GRPC_CHTTP2_STREAM_ERROR; @@ -134,13 +164,13 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { - return GRPC_CHTTP2_PARSE_OK; + return parse_postprocessing(p); } else if ((gpr_uint32)(end - cur) == p->frame_size) { state->need_flush_reads = 1; grpc_sopb_add_slice(&p->incoming_sopb, gpr_slice_sub(slice, cur - beg, end - beg)); p->state = GRPC_CHTTP2_DATA_FH_0; - return GRPC_CHTTP2_PARSE_OK; + return parse_postprocessing(p); } else if ((gpr_uint32)(end - cur) > p->frame_size) { state->need_flush_reads = 1; grpc_sopb_add_slice( @@ -153,7 +183,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_sopb_add_slice(&p->incoming_sopb, gpr_slice_sub(slice, cur - beg, end - beg)); p->frame_size -= (end - cur); - return GRPC_CHTTP2_PARSE_OK; + return parse_postprocessing(p); } } diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index 24e557accd..03c8db97b9 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -56,6 +56,7 @@ typedef struct { gpr_uint8 frame_type; gpr_uint32 frame_size; + int is_frame_compressed; grpc_stream_op_buffer incoming_sopb; } grpc_chttp2_data_parser; diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index cf78ac50cc..5788236ffb 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -497,7 +497,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, through - this lets us reuse the slice framing code below */ slice = gpr_slice_malloc(5); p = GPR_SLICE_START_PTR(slice); - p[0] = 0; + p[0] = !!(op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS); p[1] = op->data.begin_message.length >> 24; p[2] = op->data.begin_message.length >> 16; p[3] = op->data.begin_message.length >> 8; diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 6f8d39e352..a3e5f3d3c4 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -65,6 +65,7 @@ typedef enum grpc_stream_state { typedef struct grpc_transport_op { grpc_stream_op_buffer *send_ops; int is_last_send; + int dont_compress; void (*on_done_send)(void *user_data, int success); void *send_user_data; |