aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/channel/compress_filter.c112
-rw-r--r--src/core/surface/call.c13
-rw-r--r--src/core/transport/chttp2/frame_data.c9
3 files changed, 50 insertions, 84 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 3e76030dfc..69911cbda3 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -46,7 +46,7 @@ typedef struct call_data {
gpr_slice_buffer slices;
grpc_linked_mdelem compression_algorithm_storage;
int remaining_slice_bytes;
- int seen_initial_metadata;
+ int written_initial_metadata;
grpc_compression_algorithm compression_algorithm;
gpr_uint8 has_compression_algorithm;
} call_data;
@@ -115,13 +115,10 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
size_t i;
grpc_stream_op_buffer new_send_ops;
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
int new_slices_added = 0; /* GPR_FALSE */
- grpc_metadata_batch metadata;
grpc_sopb_init(&new_send_ops);
- /* The following loop is akin to a selective reset + update */
for (i = 0; i < send_ops->nops; i++) {
grpc_stream_op *sop = &send_ops->ops[i];
switch (sop->type) {
@@ -130,17 +127,6 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
&new_send_ops, calld->slices.length,
sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
break;
- case GRPC_OP_METADATA:
- grpc_metadata_batch_move(&metadata, &sop->data.metadata);
- if (!calld->seen_initial_metadata) {
- grpc_metadata_batch_add_head(
- &metadata, &calld->compression_algorithm_storage,
- grpc_mdelem_ref(channeld->mdelem_compression_algorithms
- [calld->compression_algorithm]));
- calld->seen_initial_metadata = 1; /* GPR_TRUE */
- }
- grpc_sopb_add_metadata(&new_send_ops, metadata);
- break;
case GRPC_OP_SLICE:
if (!new_slices_added) {
size_t j;
@@ -151,49 +137,16 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
new_slices_added = 1; /* GPR_TRUE */
}
break;
- case GRPC_NO_OP:
- break;
- }
- }
- grpc_sopb_swap(send_ops, &new_send_ops);
- grpc_sopb_destroy(&new_send_ops);
-}
-
-/* even if the filter isn't producing compressed output, it may need to update
- * the input. For example, compression may have een requested but somehow it was
- * decided not to honor the request: the compression flags need to be reset and
- * the fact that no compression was performed in the end signaled */
-static void finish_not_compressed_sopb(grpc_stream_op_buffer *send_ops,
- grpc_call_element *elem) {
- size_t i;
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- for (i = 0; i < send_ops->nops; ++i) {
- grpc_stream_op *sop = &send_ops->ops[i];
- switch (sop->type) {
- case GRPC_OP_BEGIN_MESSAGE:
- /* either because the user requested the exception or because
- * compressing would have resulted in a larger output */
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- /* reset the flag compression bit */
- sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS;
- break;
case GRPC_OP_METADATA:
- if (!calld->seen_initial_metadata) {
- grpc_metadata_batch_add_head(
- &(sop->data.metadata), &calld->compression_algorithm_storage,
- grpc_mdelem_ref(
- channeld->mdelem_compression_algorithms[GRPC_COMPRESS_NONE]));
- calld->seen_initial_metadata = 1; /* GPR_TRUE */
- }
- break;
- case GRPC_OP_SLICE:
+ grpc_sopb_add_metadata(&new_send_ops, sop->data.metadata);
+ memset(&(sop->data.metadata), 0, sizeof(grpc_metadata_batch));
break;
case GRPC_NO_OP:
break;
}
}
+ grpc_sopb_swap(send_ops, &new_send_ops);
+ grpc_sopb_destroy(&new_send_ops);
}
static void process_send_ops(grpc_call_element *elem,
@@ -216,21 +169,28 @@ static void process_send_ops(grpc_call_element *elem,
}
break;
case GRPC_OP_METADATA:
- /* Parse incoming request for compression. If any, it'll be available at
- * calld->compression_algorithm */
- grpc_metadata_batch_filter(&(sop->data.metadata), compression_md_filter,
- elem);
- if (!calld->has_compression_algorithm) {
- /* If no algorithm was found in the metadata and we aren't
- * exceptionally skipping compression, fall back to the channel
- * default */
- calld->compression_algorithm =
- channeld->default_compression_algorithm;
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ if (!calld->written_initial_metadata) {
+ /* Parse incoming request for compression. If any, it'll be available
+ * at calld->compression_algorithm */
+ grpc_metadata_batch_filter(&(sop->data.metadata),
+ compression_md_filter, elem);
+ if (!calld->has_compression_algorithm) {
+ /* If no algorithm was found in the metadata and we aren't
+ * exceptionally skipping compression, fall back to the channel
+ * default */
+ calld->compression_algorithm =
+ channeld->default_compression_algorithm;
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ }
+ grpc_metadata_batch_add_head(
+ &(sop->data.metadata), &calld->compression_algorithm_storage,
+ grpc_mdelem_ref(channeld->mdelem_compression_algorithms
+ [calld->compression_algorithm]));
+ calld->written_initial_metadata = 1; /* GPR_TRUE */
}
break;
case GRPC_OP_SLICE:
- if (skip_compression(channeld, calld)) goto done;
+ if (skip_compression(channeld, calld)) continue;
GPR_ASSERT(calld->remaining_slice_bytes > 0);
/* We need to copy the input because gpr_slice_buffer_add takes
* ownership. However, we don't own sop->data.slice, the caller does. */
@@ -247,13 +207,10 @@ static void process_send_ops(grpc_call_element *elem,
}
}
-done:
/* Modify the send_ops stream_op_buffer depending on whether compression was
* carried out */
if (did_compress) {
finish_compressed_sopb(send_ops, elem);
- } else {
- finish_not_compressed_sopb(send_ops, elem);
}
}
@@ -282,7 +239,7 @@ static void init_call_elem(grpc_call_element *elem,
/* initialize members */
gpr_slice_buffer_init(&calld->slices);
calld->has_compression_algorithm = 0;
- calld->seen_initial_metadata = 0; /* GPR_FALSE */
+ calld->written_initial_metadata = 0; /* GPR_FALSE */
if (initial_op) {
if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
@@ -342,12 +299,13 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
}
-const grpc_channel_filter grpc_compress_filter = {compress_start_transport_stream_op,
- grpc_channel_next_op,
- sizeof(call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- "compress"};
+const grpc_channel_filter grpc_compress_filter = {
+ compress_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "compress"};
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 74d28fe323..717158cf16 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -433,6 +433,11 @@ static void set_compression_algorithm(grpc_call *call,
call->compression_algorithm = algo;
}
+grpc_compression_algorithm grpc_call_get_compression_algorithm(
+ const grpc_call *call) {
+ return call->compression_algorithm;
+}
+
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
@@ -712,7 +717,8 @@ static void finish_message(grpc_call *call) {
/* some aliases for readability */
gpr_slice *slices = call->incoming_message.slices;
const size_t nslices = call->incoming_message.count;
- if (call->compression_algorithm > GRPC_COMPRESS_NONE) {
+ if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
byte_buffer = grpc_raw_compressed_byte_buffer_create(
slices, nslices, call->compression_algorithm);
} else {
@@ -743,14 +749,15 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
(call->compression_algorithm == GRPC_COMPRESS_NONE)) {
char *message = NULL;
char *alg_name;
- if (!grpc_compression_algorithm_name(call->compression_algorithm, &alg_name)) {
+ if (!grpc_compression_algorithm_name(call->compression_algorithm,
+ &alg_name)) {
/* This shouldn't happen, other than due to data corruption */
alg_name = "<unknown>";
}
gpr_asprintf(&message,
"Invalid compression algorithm (%s) for compressed message.",
alg_name);
- cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, message);
}
/* stash away parameters, and prepare for incoming slices */
if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 6a36485738..7a4c355f23 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -90,12 +90,9 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
fh_0:
case GRPC_CHTTP2_DATA_FH_0:
p->frame_type = *cur;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_1;
- return GRPC_CHTTP2_PARSE_OK;
- }
switch (p->frame_type) {
case 0:
+ /* noop */
break;
case 1:
p->is_frame_compressed = 1; /* GPR_TRUE */
@@ -104,6 +101,10 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type);
return GRPC_CHTTP2_STREAM_ERROR;
}
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_1;
+ return GRPC_CHTTP2_PARSE_OK;
+ }
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_1:
p->frame_size = ((gpr_uint32)*cur) << 24;