aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-07-06 23:01:39 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-07-06 23:01:39 -0700
commit20afd46db876d5b2e68a6ef0ad007565c7d16217 (patch)
tree78401943a03d28921528f8533a1927e6f41d0277
parent4e4033650911582617049643fb02942a114be220 (diff)
PR comments
-rw-r--r--src/core/channel/channel_args.c15
-rw-r--r--src/core/channel/compress_filter.c124
-rw-r--r--src/core/transport/chttp2/stream_encoder.c6
3 files changed, 92 insertions, 53 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 35ad93cc2f..5b331ded86 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -106,7 +106,7 @@ void grpc_channel_args_destroy(grpc_channel_args *a) {
}
int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
- unsigned i;
+ size_t i;
if (a == NULL) return 0;
for (i = 0; i < a->num_args; i++) {
if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
@@ -119,13 +119,12 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
grpc_compression_level grpc_channel_args_get_compression_level(
const grpc_channel_args *a) {
size_t i;
- if (a) {
- for (i = 0; a && i < a->num_args; ++i) {
- if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) {
- return a->args[i].value.integer;
- break;
- }
+ if (a == NULL) return 0;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) {
+ return a->args[i].value.integer;
+ break;
}
}
return GRPC_COMPRESS_LEVEL_NONE;
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 56e0a8141e..655c452ea9 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -34,13 +34,16 @@
#include <assert.h>
#include <string.h>
-#include "src/core/channel/compress_filter.h"
-#include "src/core/channel/channel_args.h"
-#include "src/core/compression/message_compress.h"
#include <grpc/compression.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
+
+#include "src/core/channel/compress_filter.h"
+#include "src/core/channel/channel_args.h"
+#include "src/core/compression/message_compress.h"
+
+
typedef struct call_data {
gpr_slice_buffer slices;
grpc_linked_mdelem compression_algorithm_storage;
@@ -108,11 +111,81 @@ static int skip_compression(channel_data *channeld, call_data *calld) {
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
}
+static void compressed_sopb(grpc_stream_op_buffer *send_ops,
+ grpc_call_element *elem) {
+ size_t i, j;
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ /* The following loop is akin to a selective reset + update */
+ for (i = 0, j = 0; i < send_ops->nops; ++i) {
+ grpc_stream_op *sop = &send_ops->ops[i];
+ switch (sop->type) {
+ case GRPC_OP_BEGIN_MESSAGE:
+ sop->data.begin_message.length = calld->slices.length;
+ sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ break;
+ case GRPC_OP_METADATA:
+ grpc_metadata_batch_add_head(
+ &(sop->data.metadata), &calld->compression_algorithm_storage,
+ grpc_mdelem_ref(channeld->mdelem_compression_algorithms
+ [calld->compression_algorithm]));
+ break;
+ case GRPC_OP_SLICE:
+ gpr_slice_unref(sop->data.slice);
+ /* replace only up to the number of available compressed slices */
+ if (j < calld->slices.count) {
+ sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
+ }
+ case GRPC_NO_OP:
+ ; /* fallthrough */
+ }
+ }
+
+ /* in case compressed slices remain to be added to the output */
+ while (j < calld->slices.count) {
+ grpc_sopb_add_slice(send_ops, gpr_slice_ref(calld->slices.slices[j++]));
+ }
+}
+
+/* even if the filter isn't producing compressed output, it may need to update
+ * the input. For example, compression may have een requested but somehow it was
+ * decided not to honor the request: the compression flags need to be reset and
+ * the fact that no compression was performed in the end signaled */
+static void not_compressed_sopb(grpc_stream_op_buffer *send_ops,
+ grpc_call_element *elem) {
+ size_t i;
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ for (i = 0; i < send_ops->nops; ++i) {
+ grpc_stream_op *sop = &send_ops->ops[i];
+ switch (sop->type) {
+ case GRPC_OP_BEGIN_MESSAGE:
+ /* either because the user requested the exception or because
+ * compressing would have resulted in a larger output */
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ /* reset the flag compression bit */
+ sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS;
+ break;
+ case GRPC_OP_METADATA:
+ grpc_metadata_batch_add_head(
+ &(sop->data.metadata), &calld->compression_algorithm_storage,
+ grpc_mdelem_ref(
+ channeld->mdelem_compression_algorithms[GRPC_COMPRESS_NONE]));
+ break;
+ case GRPC_OP_SLICE:
+ case GRPC_NO_OP:
+ ; /* fallthrough */
+ }
+ }
+}
+
static void process_send_ops(grpc_call_element *elem,
grpc_stream_op_buffer *send_ops) {
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- size_t i, j;
+ size_t i;
int did_compress = 0;
/* buffer up slices until we've processed all the expected ones (as given by
@@ -159,46 +232,9 @@ static void process_send_ops(grpc_call_element *elem,
}
}
- /* We need to:
- * - (OP_SLICE) If compression happened, replace the input slices with the
- * compressed ones.
- * - (BEGIN_MESSAGE) Update the message info (size, flags).
- * - (OP_METADATA) Convey the compression configuration */
- for (i = 0, j = 0; i < send_ops->nops; ++i) {
- grpc_stream_op *sop = &send_ops->ops[i];
- switch (sop->type) {
- case GRPC_OP_BEGIN_MESSAGE:
- if (did_compress) {
- sop->data.begin_message.length = calld->slices.length;
- sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
- } else {
- /* either because the user requested the exception or because
- * compressing would have resulted in a larger output */
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- /* reset the flag compression bit */
- sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS;
- }
- break;
- case GRPC_OP_METADATA:
- grpc_metadata_batch_add_head(
- &(sop->data.metadata), &calld->compression_algorithm_storage,
- grpc_mdelem_ref(channeld->mdelem_compression_algorithms
- [did_compress ? calld->compression_algorithm
- : GRPC_COMPRESS_NONE]));
- break;
- case GRPC_OP_SLICE:
- if (did_compress) {
- if (j < calld->slices.count) {
- /* swap the input slices for their compressed counterparts */
- gpr_slice_unref(sop->data.slice);
- sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
- }
- }
- break;
- case GRPC_NO_OP:
- ; /* fallthrough */
- }
- }
+ /* Modify the send_ops stream_op_buffer depending on whether compression was
+ * carried out */
+ (did_compress ? compressed_sopb : not_compressed_sopb)(send_ops, elem);
}
/* Called either:
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 5788236ffb..d0a17af5da 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -476,6 +476,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
gpr_uint32 flow_controlled_bytes_taken = 0;
gpr_uint32 curop = 0;
gpr_uint8 *p;
+ int compressed_flag_set = 0;
while (curop < *inops_count) {
GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes);
@@ -495,9 +496,12 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
case GRPC_OP_BEGIN_MESSAGE:
/* begin op: for now we just convert the op to a slice and fall
through - this lets us reuse the slice framing code below */
+ compressed_flag_set =
+ !!(op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS);
slice = gpr_slice_malloc(5);
+
p = GPR_SLICE_START_PTR(slice);
- p[0] = !!(op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS);
+ p[0] = compressed_flag_set;
p[1] = op->data.begin_message.length >> 24;
p[2] = op->data.begin_message.length >> 16;
p[3] = op->data.begin_message.length >> 8;