aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-19 15:35:17 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-19 15:35:17 -0700
commitb4e70366c6b25d1127e66fd28c6256b19467dd9b (patch)
tree7f51ee747a9ed1200899b1c3e19752d85ce878ed /src/core
parentd82d0b295b51e1385481be381eef325423441a65 (diff)
parent0c2f1626c0082ab91aed27f77bbe01008d878db2 (diff)
Merge branch 'decompression' of https://github.com/dgquintas/grpc into dgquintas-decompression
Conflicts: Makefile vsprojects/Grpc.mak
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/channel_args.c29
-rw-r--r--src/core/channel/channel_args.h13
-rw-r--r--src/core/channel/compress_filter.c325
-rw-r--r--src/core/channel/compress_filter.h65
-rw-r--r--src/core/compression/algorithm.c46
-rw-r--r--src/core/surface/call.c96
-rw-r--r--src/core/surface/channel.c13
-rw-r--r--src/core/surface/channel.h3
-rw-r--r--src/core/surface/channel_create.c2
-rw-r--r--src/core/surface/secure_channel_create.c2
-rw-r--r--src/core/surface/server.c6
-rw-r--r--src/core/surface/server.h6
-rw-r--r--src/core/surface/server_create.c5
-rw-r--r--src/core/transport/chttp2/frame_data.c11
-rw-r--r--src/core/transport/chttp2/frame_data.h1
-rw-r--r--src/core/transport/chttp2/stream_encoder.c6
-rw-r--r--src/core/transport/stream_op.c6
-rw-r--r--src/core/transport/stream_op.h5
18 files changed, 560 insertions, 80 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 140f8bd656..c430b56fa2 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -114,7 +114,7 @@ void grpc_channel_args_destroy(grpc_channel_args *a) {
}
int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
- unsigned i;
+ size_t i;
if (a == NULL) return 0;
for (i = 0; i < a->num_args; i++) {
if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
@@ -124,26 +124,25 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
return 0;
}
-grpc_compression_level grpc_channel_args_get_compression_level(
+grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
const grpc_channel_args *a) {
size_t i;
- if (a) {
- for (i = 0; a && i < a->num_args; ++i) {
- if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) {
- return a->args[i].value.integer;
- break;
- }
+ if (a == NULL) return 0;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) {
+ return a->args[i].value.integer;
+ break;
}
}
- return GRPC_COMPRESS_LEVEL_NONE;
+ return GRPC_COMPRESS_NONE;
}
-void grpc_channel_args_set_compression_level(grpc_channel_args **a,
- grpc_compression_level level) {
+grpc_channel_args *grpc_channel_args_set_compression_algorithm(
+ grpc_channel_args *a, grpc_compression_algorithm algorithm) {
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_LEVEL_ARG;
- tmp.value.integer = level;
- *a = grpc_channel_args_copy_and_add(*a, &tmp, 1);
+ tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG;
+ tmp.value.integer = algorithm;
+ return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
index 17849b7e59..7e6ddd3997 100644
--- a/src/core/channel/channel_args.h
+++ b/src/core/channel/channel_args.h
@@ -57,13 +57,14 @@ void grpc_channel_args_destroy(grpc_channel_args *a);
* is specified in channel args, otherwise returns 0. */
int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
-/** Returns the compression level set in \a a. */
-grpc_compression_level grpc_channel_args_get_compression_level(
+/** Returns the compression algorithm set in \a a. */
+grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
const grpc_channel_args *a);
-/** Sets the compression level in \a a to \a level. Setting it to
- * GRPC_COMPRESS_LEVEL_NONE disables compression for the channel. */
-void grpc_channel_args_set_compression_level(grpc_channel_args **a,
- grpc_compression_level level);
+/** Returns a channel arg instance with compression enabled. If \a a is
+ * non-NULL, its args are copied. N.B. GRPC_COMPRESS_NONE disables compression
+ * for the channel. */
+grpc_channel_args *grpc_channel_args_set_compression_algorithm(
+ grpc_channel_args *a, grpc_compression_algorithm algorithm);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
new file mode 100644
index 0000000000..14cb3da62d
--- /dev/null
+++ b/src/core/channel/compress_filter.c
@@ -0,0 +1,325 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <assert.h>
+#include <string.h>
+
+#include <grpc/compression.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+
+#include "src/core/channel/compress_filter.h"
+#include "src/core/channel/channel_args.h"
+#include "src/core/compression/message_compress.h"
+
+typedef struct call_data {
+ gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
+ grpc_linked_mdelem compression_algorithm_storage;
+ int remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */
+ int written_initial_metadata; /**< Already processed initial md? */
+ /** Compression algorithm we'll try to use. It may be given by incoming
+ * metadata, or by the channel's default compression settings. */
+ grpc_compression_algorithm compression_algorithm;
+ /** If true, contents of \a compression_algorithm are authoritative */
+ int has_compression_algorithm;
+} call_data;
+
+typedef struct channel_data {
+ /** Metadata key for the incoming (requested) compression algorithm */
+ grpc_mdstr *mdstr_request_compression_algorithm_key;
+ /** Metadata key for the outgoing (used) compression algorithm */
+ grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
+ /** Precomputed metadata elements for all available compression algorithms */
+ grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
+ /** The default, channel-level, compression algorithm */
+ grpc_compression_algorithm default_compression_algorithm;
+} channel_data;
+
+/** Compress \a slices in place using \a algorithm. Returns 1 if compression did
+ * actually happen, 0 otherwise (for example if the compressed output size was
+ * larger than the raw input).
+ *
+ * Returns 1 if the data was actually compress and 0 otherwise. */
+static int compress_send_sb(grpc_compression_algorithm algorithm,
+ gpr_slice_buffer *slices) {
+ int did_compress;
+ gpr_slice_buffer tmp;
+ gpr_slice_buffer_init(&tmp);
+ did_compress = grpc_msg_compress(algorithm, slices, &tmp);
+ if (did_compress) {
+ gpr_slice_buffer_swap(slices, &tmp);
+ }
+ gpr_slice_buffer_destroy(&tmp);
+ return did_compress;
+}
+
+/** For each \a md element from the incoming metadata, filter out the entry for
+ * "grpc-encoding", using its value to populate the call data's
+ * compression_algorithm field. */
+static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ if (md->key == channeld->mdstr_request_compression_algorithm_key) {
+ const char *md_c_str = grpc_mdstr_as_c_string(md->value);
+ if (!grpc_compression_algorithm_parse(md_c_str,
+ &calld->compression_algorithm)) {
+ gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.",
+ md_c_str);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ calld->has_compression_algorithm = 1;
+ return NULL;
+ }
+
+ return md;
+}
+
+static int skip_compression(channel_data *channeld, call_data *calld) {
+ if (calld->has_compression_algorithm) {
+ if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
+ return 1;
+ }
+ return 0; /* we have an actual call-specific algorithm */
+ }
+ /* no per-call compression override */
+ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
+}
+
+/** Assembles a new grpc_stream_op_buffer with the compressed slices, modifying
+ * the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length,
+ * flags indicating compression is in effect) and replaces \a send_ops with it.
+ * */
+static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
+ grpc_call_element *elem) {
+ size_t i;
+ call_data *calld = elem->call_data;
+ int new_slices_added = 0; /* GPR_FALSE */
+ grpc_metadata_batch metadata;
+ grpc_stream_op_buffer new_send_ops;
+ grpc_sopb_init(&new_send_ops);
+
+ for (i = 0; i < send_ops->nops; i++) {
+ grpc_stream_op *sop = &send_ops->ops[i];
+ switch (sop->type) {
+ case GRPC_OP_BEGIN_MESSAGE:
+ grpc_sopb_add_begin_message(
+ &new_send_ops, calld->slices.length,
+ sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
+ break;
+ case GRPC_OP_SLICE:
+ /* Once we reach the slices section of the original buffer, simply add
+ * all the new (compressed) slices. We obviously want to do this only
+ * once, hence the "new_slices_added" guard. */
+ if (!new_slices_added) {
+ size_t j;
+ for (j = 0; j < calld->slices.count; ++j) {
+ grpc_sopb_add_slice(&new_send_ops,
+ gpr_slice_ref(calld->slices.slices[j]));
+ }
+ new_slices_added = 1; /* GPR_TRUE */
+ }
+ break;
+ case GRPC_OP_METADATA:
+ /* move the metadata to the new buffer. */
+ grpc_metadata_batch_move(&metadata, &sop->data.metadata);
+ grpc_sopb_add_metadata(&new_send_ops, metadata);
+ break;
+ case GRPC_NO_OP:
+ break;
+ }
+ }
+ grpc_sopb_swap(send_ops, &new_send_ops);
+ grpc_sopb_destroy(&new_send_ops);
+}
+
+/** Filter's "main" function, called for any incoming grpc_transport_stream_op
+ * instance that holds a non-zero number of send operations, accesible to this
+ * function in \a send_ops. */
+static void process_send_ops(grpc_call_element *elem,
+ grpc_stream_op_buffer *send_ops) {
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ size_t i;
+ int did_compress = 0;
+
+ for (i = 0; i < send_ops->nops; ++i) {
+ grpc_stream_op *sop = &send_ops->ops[i];
+ switch (sop->type) {
+ case GRPC_OP_BEGIN_MESSAGE:
+ /* buffer up slices until we've processed all the expected ones (as
+ * given by GRPC_OP_BEGIN_MESSAGE) */
+ calld->remaining_slice_bytes = sop->data.begin_message.length;
+ if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ break;
+ case GRPC_OP_METADATA:
+ if (!calld->written_initial_metadata) {
+ /* Parse incoming request for compression. If any, it'll be available
+ * at calld->compression_algorithm */
+ grpc_metadata_batch_filter(&(sop->data.metadata),
+ compression_md_filter, elem);
+ if (!calld->has_compression_algorithm) {
+ /* If no algorithm was found in the metadata and we aren't
+ * exceptionally skipping compression, fall back to the channel
+ * default */
+ calld->compression_algorithm =
+ channeld->default_compression_algorithm;
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ }
+ grpc_metadata_batch_add_head(
+ &(sop->data.metadata), &calld->compression_algorithm_storage,
+ grpc_mdelem_ref(channeld->mdelem_compression_algorithms
+ [calld->compression_algorithm]));
+ calld->written_initial_metadata = 1; /* GPR_TRUE */
+ }
+ break;
+ case GRPC_OP_SLICE:
+ if (skip_compression(channeld, calld)) continue;
+ GPR_ASSERT(calld->remaining_slice_bytes > 0);
+ /* Increase input ref count, gpr_slice_buffer_add takes ownership. */
+ gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice));
+ calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice);
+ if (calld->remaining_slice_bytes == 0) {
+ did_compress =
+ compress_send_sb(calld->compression_algorithm, &calld->slices);
+ }
+ break;
+ case GRPC_NO_OP:
+ break;
+ }
+ }
+
+ /* Modify the send_ops stream_op_buffer depending on whether compression was
+ * carried out */
+ if (did_compress) {
+ finish_compressed_sopb(send_ops, elem);
+ }
+}
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void compress_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ if (op->send_ops && op->send_ops->nops > 0) {
+ process_send_ops(elem, op->send_ops);
+ }
+
+ /* pass control down the stack */
+ grpc_call_next_op(elem, op);
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_stream_op *initial_op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+
+ /* initialize members */
+ gpr_slice_buffer_init(&calld->slices);
+ calld->has_compression_algorithm = 0;
+ calld->written_initial_metadata = 0; /* GPR_FALSE */
+
+ if (initial_op) {
+ if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
+ process_send_ops(elem, initial_op->send_ops);
+ }
+ }
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_call_element *elem) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ gpr_slice_buffer_destroy(&calld->slices);
+}
+
+/* Constructor for channel_data */
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ channel_data *channeld = elem->channel_data;
+ grpc_compression_algorithm algo_idx;
+
+ channeld->default_compression_algorithm =
+ grpc_channel_args_get_compression_algorithm(args);
+
+ channeld->mdstr_request_compression_algorithm_key =
+ grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
+
+ channeld->mdstr_outgoing_compression_algorithm_key =
+ grpc_mdstr_from_string(mdctx, "grpc-encoding");
+
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
+ char *algorith_name;
+ GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0);
+ channeld->mdelem_compression_algorithms[algo_idx] =
+ grpc_mdelem_from_metadata_strings(
+ mdctx,
+ grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key),
+ grpc_mdstr_from_string(mdctx, algorith_name));
+ }
+
+ GPR_ASSERT(!is_last);
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ channel_data *channeld = elem->channel_data;
+ grpc_compression_algorithm algo_idx;
+
+ grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key);
+ grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key);
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
+ ++algo_idx) {
+ grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
+ }
+}
+
+const grpc_channel_filter grpc_compress_filter = {
+ compress_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "compress"};
diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h
new file mode 100644
index 0000000000..0694e2c1dd
--- /dev/null
+++ b/src/core/channel/compress_filter.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H
+#define GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H
+
+#include "src/core/channel/channel_stack.h"
+
+#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "internal:grpc-encoding-request"
+
+/** Compression filter for outgoing data.
+ *
+ * See <grpc/compression.h> for the available compression settings.
+ *
+ * Compression settings may come from:
+ * - Channel configuration, as established at channel creation time.
+ * - The metadata accompanying the outgoing data to be compressed. This is
+ * taken as a request only. We may choose not to honor it. The metadata key
+ * is given by \a GRPC_COMPRESS_REQUEST_ALGORITHM_KEY.
+ *
+ * Compression can be disabled for concrete messages (for instance in order to
+ * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in
+ * the BEGIN_MESSAGE flags.
+ *
+ * The attempted compression mechanism is added to the resulting initial
+ * metadata under the'grpc-encoding' key.
+ *
+ * If compression is actually performed, BEGIN_MESSAGE's flag is modified to
+ * incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the
+ * aforementioned 'grpc-encoding' metadata value, data will pass through
+ * uncompressed. */
+
+extern const grpc_channel_filter grpc_compress_filter;
+
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */
diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c
index 4db48df6cb..e426241d0a 100644
--- a/src/core/compression/algorithm.c
+++ b/src/core/compression/algorithm.c
@@ -32,21 +32,39 @@
*/
#include <stdlib.h>
+#include <string.h>
#include <grpc/compression.h>
-const char *grpc_compression_algorithm_name(
- grpc_compression_algorithm algorithm) {
+int grpc_compression_algorithm_parse(const char* name,
+ grpc_compression_algorithm *algorithm) {
+ if (strcmp(name, "none") == 0) {
+ *algorithm = GRPC_COMPRESS_NONE;
+ } else if (strcmp(name, "gzip") == 0) {
+ *algorithm = GRPC_COMPRESS_GZIP;
+ } else if (strcmp(name, "deflate") == 0) {
+ *algorithm = GRPC_COMPRESS_DEFLATE;
+ } else {
+ return 0;
+ }
+ return 1;
+}
+
+int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
+ char **name) {
switch (algorithm) {
case GRPC_COMPRESS_NONE:
- return "none";
+ *name = "none";
+ break;
case GRPC_COMPRESS_DEFLATE:
- return "deflate";
+ *name = "deflate";
+ break;
case GRPC_COMPRESS_GZIP:
- return "gzip";
- case GRPC_COMPRESS_ALGORITHMS_COUNT:
- return "error";
+ *name = "gzip";
+ break;
+ default:
+ return 0;
}
- return "error";
+ return 1;
}
/* TODO(dgq): Add the ability to specify parameters to the individual
@@ -65,3 +83,15 @@ grpc_compression_algorithm grpc_compression_algorithm_for_level(
abort();
}
}
+
+grpc_compression_level grpc_compression_level_for_algorithm(
+ grpc_compression_algorithm algorithm) {
+ grpc_compression_level clevel;
+ for (clevel = GRPC_COMPRESS_LEVEL_NONE; clevel < GRPC_COMPRESS_LEVEL_COUNT;
+ ++clevel) {
+ if (grpc_compression_algorithm_for_level(clevel) == algorithm) {
+ return clevel;
+ }
+ }
+ abort();
+}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 1146d83982..6e643b591c 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -30,24 +30,25 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/compression.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/census/grpc_context.h"
-#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
+#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <assert.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
/** The maximum number of completions possible.
Based upon the maximum number of individually queueable ops in the batch
@@ -235,8 +236,8 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
- /* Compression level for the call */
- grpc_compression_level compression_level;
+ /* Compression algorithm for the call */
+ grpc_compression_algorithm compression_algorithm;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -469,9 +470,14 @@ static void set_status_code(grpc_call *call, status_source source,
}
}
-static void set_decode_compression_level(grpc_call *call,
- grpc_compression_level clevel) {
- call->compression_level = clevel;
+static void set_compression_algorithm(grpc_call *call,
+ grpc_compression_algorithm algo) {
+ call->compression_algorithm = algo;
+}
+
+grpc_compression_algorithm grpc_call_get_compression_algorithm(
+ const grpc_call *call) {
+ return call->compression_algorithm;
}
static void set_status_details(grpc_call *call, status_source source,
@@ -762,8 +768,18 @@ static void call_on_done_send(void *pc, int success) {
static void finish_message(grpc_call *call) {
if (call->error_status_set == 0) {
/* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
- call->incoming_message.slices, call->incoming_message.count);
+ grpc_byte_buffer *byte_buffer;
+ /* some aliases for readability */
+ gpr_slice *slices = call->incoming_message.slices;
+ const size_t nslices = call->incoming_message.count;
+
+ if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+ byte_buffer = grpc_raw_compressed_byte_buffer_create(
+ slices, nslices, call->compression_algorithm);
+ } else {
+ byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
+ }
grpc_bbq_push(&call->incoming_queue, byte_buffer);
}
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
@@ -782,6 +798,25 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
gpr_free(message);
return 0;
}
+ /* sanity check: if message flags indicate a compressed message, the
+ * compression level should already be present in the call, as parsed off its
+ * corresponding metadata. */
+ if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
+ char *message = NULL;
+ char *alg_name;
+ if (!grpc_compression_algorithm_name(call->compression_algorithm,
+ &alg_name)) {
+ /* This shouldn't happen, other than due to data corruption */
+ alg_name = "<unknown>";
+ }
+ gpr_asprintf(&message,
+ "Invalid compression algorithm (%s) for compressed message.",
+ alg_name);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, message);
+ gpr_free(message);
+ return 0;
+ }
/* stash away parameters, and prepare for incoming slices */
if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
char *message = NULL;
@@ -1276,25 +1311,20 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
static void destroy_compression(void *ignored) {}
static gpr_uint32 decode_compression(grpc_mdelem *md) {
- grpc_compression_level clevel;
- void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
+ grpc_compression_algorithm algorithm;
+ void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
if (user_data) {
- clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
- gpr_uint32 parsed_clevel_bytes;
- if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
- GPR_SLICE_LENGTH(md->value->slice),
- &parsed_clevel_bytes)) {
- /* the following cast is safe, as a gpr_uint32 should be able to hold all
- * possible values of the grpc_compression_level enum */
- clevel = (grpc_compression_level)parsed_clevel_bytes;
- } else {
- clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
+ const char *md_c_str = grpc_mdstr_as_c_string(md->value);
+ if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
+ gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
+ assert(0);
}
grpc_mdelem_set_user_data(md, destroy_compression,
- (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
+ (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
}
- return clevel;
+ return algorithm;
}
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
@@ -1313,8 +1343,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value));
} else if (key ==
- grpc_channel_get_compresssion_level_string(call->channel)) {
- set_decode_compression_level(call, decode_compression(md));
+ grpc_channel_get_compression_algorithm_string(call->channel)) {
+ set_compression_algorithm(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1429,7 +1459,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
- req->flags = ops->flags;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index b7826d4dfc..a6438ff512 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -63,7 +63,7 @@ struct grpc_channel {
grpc_mdctx *metadata_context;
/** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
- grpc_mdstr *grpc_compression_level_string;
+ grpc_mdstr *grpc_compression_algorithm_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
@@ -98,8 +98,8 @@ grpc_channel *grpc_channel_create_from_filters(
gpr_ref_init(&channel->refs, 1);
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
- channel->grpc_compression_level_string =
- grpc_mdstr_from_string(mdctx, "grpc-compression-level");
+ channel->grpc_compression_algorithm_string =
+ grpc_mdstr_from_string(mdctx, "grpc-encoding");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE];
@@ -209,7 +209,7 @@ static void destroy_channel(void *p, int ok) {
GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]);
}
GRPC_MDSTR_UNREF(channel->grpc_status_string);
- GRPC_MDSTR_UNREF(channel->grpc_compression_level_string);
+ GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string);
GRPC_MDSTR_UNREF(channel->grpc_message_string);
GRPC_MDSTR_UNREF(channel->path_string);
GRPC_MDSTR_UNREF(channel->authority_string);
@@ -262,8 +262,9 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string;
}
-grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
- return channel->grpc_compression_level_string;
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
+ grpc_channel *channel) {
+ return channel->grpc_compression_algorithm_string;
}
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 71f8a55731..4e03eb4411 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -54,7 +54,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
+ grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index e205f0a9f8..91c7b35550 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -40,6 +40,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
+#include "src/core/channel/compress_filter.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/tcp_client.h"
@@ -163,6 +164,7 @@ grpc_channel *grpc_channel_create(const char *target,
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
} */
+ filters[n++] = &grpc_compress_filter;
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index f3c7d8397b..d87ec97b53 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -40,6 +40,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
+#include "src/core/channel/compress_filter.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/tcp_client.h"
@@ -213,6 +214,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
} */
+ filters[n++] = &grpc_compress_filter;
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index fa120088e1..f2d6b11bc7 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -739,9 +739,9 @@ void grpc_server_register_completion_queue(grpc_server *server,
server->cqs[n] = cq;
}
-grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
- size_t filter_count,
- const grpc_channel_args *args) {
+grpc_server *grpc_server_create_from_filters(
+ const grpc_channel_filter **filters, size_t filter_count,
+ const grpc_channel_args *args) {
size_t i;
/* TODO(census): restore this once we finalize census filter etc.
int census_enabled = grpc_channel_args_is_census_enabled(args); */
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 2899c6dea3..c638d682bb 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -39,9 +39,9 @@
#include "src/core/transport/transport.h"
/* Create a server */
-grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
- size_t filter_count,
- const grpc_channel_args *args);
+grpc_server *grpc_server_create_from_filters(
+ const grpc_channel_filter **filters, size_t filter_count,
+ const grpc_channel_args *args);
/* Add a listener to the server: when the server starts, it will call start,
and when it shuts down, it will call destroy */
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index b7390675ad..1e26c67693 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -34,7 +34,10 @@
#include <grpc/grpc.h>
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/server.h"
+#include "src/core/channel/compress_filter.h"
grpc_server *grpc_server_create(const grpc_channel_args *args) {
- return grpc_server_create_from_filters(NULL, 0, args);
+ const grpc_channel_filter *filters[] = {&grpc_compress_filter};
+ return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters),
+ args);
}
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 7e3980159e..7a4c355f23 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -76,6 +76,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
grpc_chttp2_data_parser *p = parser;
+ gpr_uint32 message_flags = 0;
if (is_last && p->is_last_frame) {
stream_parsing->received_close = 1;
@@ -94,8 +95,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
/* noop */
break;
case 1:
- gpr_log(GPR_ERROR, "Compressed GRPC frames not yet supported");
- return GRPC_CHTTP2_STREAM_ERROR;
+ p->is_frame_compressed = 1; /* GPR_TRUE */
+ break;
default:
gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type);
return GRPC_CHTTP2_STREAM_ERROR;
@@ -130,7 +131,11 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_size |= ((gpr_uint32)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
- grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size, 0);
+ if (p->is_frame_compressed) {
+ message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ }
+ grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size,
+ message_flags);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h
index 8d6cfcb841..23957b05ad 100644
--- a/src/core/transport/chttp2/frame_data.h
+++ b/src/core/transport/chttp2/frame_data.h
@@ -56,6 +56,7 @@ typedef struct {
gpr_uint8 frame_type;
gpr_uint32 frame_size;
+ int is_frame_compressed;
grpc_stream_op_buffer incoming_sopb;
} grpc_chttp2_data_parser;
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index d7fc2da5d3..d30059abf8 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -477,6 +477,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
gpr_uint32 flow_controlled_bytes_taken = 0;
gpr_uint32 curop = 0;
gpr_uint8 *p;
+ int compressed_flag_set = 0;
while (curop < *inops_count) {
GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes);
@@ -496,9 +497,12 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
case GRPC_OP_BEGIN_MESSAGE:
/* begin op: for now we just convert the op to a slice and fall
through - this lets us reuse the slice framing code below */
+ compressed_flag_set =
+ (op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
slice = gpr_slice_malloc(5);
+
p = GPR_SLICE_START_PTR(slice);
- p[0] = 0;
+ p[0] = compressed_flag_set;
p[1] = op->data.begin_message.length >> 24;
p[2] = op->data.begin_message.length >> 16;
p[3] = op->data.begin_message.length >> 8;
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 71061fe0c7..a5dfec9d50 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -286,6 +286,12 @@ void grpc_metadata_batch_merge(grpc_metadata_batch *target,
}
}
+void grpc_metadata_batch_move(grpc_metadata_batch *dst,
+ grpc_metadata_batch *src) {
+ *dst = *src;
+ memset(src, 0, sizeof(grpc_metadata_batch));
+}
+
void grpc_metadata_batch_filter(grpc_metadata_batch *batch,
grpc_mdelem *(*filter)(void *user_data,
grpc_mdelem *elem),
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 964d39d14f..f27ef1b66b 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -102,6 +102,11 @@ void grpc_metadata_batch_destroy(grpc_metadata_batch *batch);
void grpc_metadata_batch_merge(grpc_metadata_batch *target,
grpc_metadata_batch *add);
+/** Moves the metadata information from \a src to \a dst. Upon return, \a src is
+ * zeroed. */
+void grpc_metadata_batch_move(grpc_metadata_batch *dst,
+ grpc_metadata_batch *src);
+
/** Add \a storage to the beginning of \a batch. storage->md is
assumed to be valid.
\a storage is owned by the caller and must survive for the