aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-06-16 14:27:32 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-06-16 14:27:32 -0700
commit55b4ea1f07708320ed78fc8770472a8e28b399c9 (patch)
tree5d69a9bbd9485312bec11ceb15b8694af9a2b300 /src/core/transport
parent9c941ed983ea5b70a0d5bd4cee5f298b236337c6 (diff)
WIP. Compiles and tests pass. Need to write specific tests.
Diffstat (limited to 'src/core/transport')
-rw-r--r--src/core/transport/chttp2/frame_data.c40
-rw-r--r--src/core/transport/chttp2/frame_data.h1
-rw-r--r--src/core/transport/chttp2/stream_encoder.c2
-rw-r--r--src/core/transport/transport.h1
4 files changed, 38 insertions, 6 deletions
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index a1ae9ed2e6..851420e900 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "src/core/transport/transport.h"
+#include "src/core/compression/message_compress.h"
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser) {
@@ -68,6 +69,35 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
return GRPC_CHTTP2_PARSE_OK;
}
+/** Performs any extra work needed after a frame has been assembled */
+grpc_chttp2_parse_error parse_postprocessing(grpc_chttp2_data_parser *p) {
+ if (p->is_frame_compressed) { /* Decompress */
+ /* Reorganize the slices within p->incoming_sopb into a gpr_slice_buffer to
+ * be fed to the decompression function */
+ gpr_slice_buffer sb_in, sb_out;
+ grpc_stream_op_buffer *sopb = &p->incoming_sopb;
+ size_t i;
+ gpr_slice_buffer_init(&sb_in);
+ gpr_slice_buffer_init(&sb_out);
+ for (i = 0; i < sopb->nops; ++i) {
+ if (sopb->ops->type == GRPC_OP_SLICE) {
+ gpr_slice_buffer_add(&sb_in, sopb->ops->data.slice);
+ }
+ }
+ grpc_msg_decompress(GRPC_COMPRESS_GZIP /* XXX */, &sb_in, &sb_out);
+ /* copy uncompressed output back to p->incoming_sopb */
+ grpc_sopb_reset(sopb);
+ grpc_sopb_add_begin_message(sopb, sb_out.length, 0);
+ for (i = 0; i < sb_out.count; ++i) {
+ grpc_sopb_add_slice(sopb, sb_out.slices[i]);
+ }
+ gpr_slice_buffer_destroy(&sb_in);
+ gpr_slice_buffer_destroy(&sb_out);
+ }
+
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
int is_last) {
@@ -97,8 +127,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
case 0:
break;
case 1:
- gpr_log(GPR_ERROR, "Compressed GRPC frames not yet supported");
- return GRPC_CHTTP2_STREAM_ERROR;
+ p->is_frame_compressed = 1; /* GPR_TRUE */
+ break;
default:
gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type);
return GRPC_CHTTP2_STREAM_ERROR;
@@ -134,13 +164,13 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
- return GRPC_CHTTP2_PARSE_OK;
+ return parse_postprocessing(p);
} else if ((gpr_uint32)(end - cur) == p->frame_size) {
state->need_flush_reads = 1;
grpc_sopb_add_slice(&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, end - beg));
p->state = GRPC_CHTTP2_DATA_FH_0;
- return GRPC_CHTTP2_PARSE_OK;
+ return parse_postprocessing(p);
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
state->need_flush_reads = 1;
grpc_sopb_add_slice(
@@ -153,7 +183,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_sopb_add_slice(&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, end - beg));
p->frame_size -= (end - cur);
- return GRPC_CHTTP2_PARSE_OK;
+ return parse_postprocessing(p);
}
}
diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h
index 24e557accd..03c8db97b9 100644
--- a/src/core/transport/chttp2/frame_data.h
+++ b/src/core/transport/chttp2/frame_data.h
@@ -56,6 +56,7 @@ typedef struct {
gpr_uint8 frame_type;
gpr_uint32 frame_size;
+ int is_frame_compressed;
grpc_stream_op_buffer incoming_sopb;
} grpc_chttp2_data_parser;
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index cf78ac50cc..5788236ffb 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -497,7 +497,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
through - this lets us reuse the slice framing code below */
slice = gpr_slice_malloc(5);
p = GPR_SLICE_START_PTR(slice);
- p[0] = 0;
+ p[0] = !!(op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS);
p[1] = op->data.begin_message.length >> 24;
p[2] = op->data.begin_message.length >> 16;
p[3] = op->data.begin_message.length >> 8;
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 6f8d39e352..a3e5f3d3c4 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -65,6 +65,7 @@ typedef enum grpc_stream_state {
typedef struct grpc_transport_op {
grpc_stream_op_buffer *send_ops;
int is_last_send;
+ int dont_compress;
void (*on_done_send)(void *user_data, int success);
void *send_user_data;