diff options
author | David Garcia Quintas <dgq@google.com> | 2015-06-16 14:27:32 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-06-16 14:27:32 -0700 |
commit | 55b4ea1f07708320ed78fc8770472a8e28b399c9 (patch) | |
tree | 5d69a9bbd9485312bec11ceb15b8694af9a2b300 /src/core/transport | |
parent | 9c941ed983ea5b70a0d5bd4cee5f298b236337c6 (diff) |
WIP. Compiles and tests pass. Need to write specific tests.
Diffstat (limited to 'src/core/transport')
-rw-r--r-- | src/core/transport/chttp2/frame_data.c | 40 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.h | 1 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 2 | ||||
-rw-r--r-- | src/core/transport/transport.h | 1 |
4 files changed, 38 insertions, 6 deletions
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index a1ae9ed2e6..851420e900 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -40,6 +40,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> #include "src/core/transport/transport.h" +#include "src/core/compression/message_compress.h" grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser) { @@ -68,6 +69,35 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( return GRPC_CHTTP2_PARSE_OK; } +/** Performs any extra work needed after a frame has been assembled */ +grpc_chttp2_parse_error parse_postprocessing(grpc_chttp2_data_parser *p) { + if (p->is_frame_compressed) { /* Decompress */ + /* Reorganize the slices within p->incoming_sopb into a gpr_slice_buffer to + * be fed to the decompression function */ + gpr_slice_buffer sb_in, sb_out; + grpc_stream_op_buffer *sopb = &p->incoming_sopb; + size_t i; + gpr_slice_buffer_init(&sb_in); + gpr_slice_buffer_init(&sb_out); + for (i = 0; i < sopb->nops; ++i) { + if (sopb->ops->type == GRPC_OP_SLICE) { + gpr_slice_buffer_add(&sb_in, sopb->ops->data.slice); + } + } + grpc_msg_decompress(GRPC_COMPRESS_GZIP /* XXX */, &sb_in, &sb_out); + /* copy uncompressed output back to p->incoming_sopb */ + grpc_sopb_reset(sopb); + grpc_sopb_add_begin_message(sopb, sb_out.length, 0); + for (i = 0; i < sb_out.count; ++i) { + grpc_sopb_add_slice(sopb, sb_out.slices[i]); + } + gpr_slice_buffer_destroy(&sb_in); + gpr_slice_buffer_destroy(&sb_out); + } + + return GRPC_CHTTP2_PARSE_OK; +} + grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last) { @@ -97,8 +127,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( case 0: break; case 1: - gpr_log(GPR_ERROR, "Compressed GRPC frames not yet supported"); - return GRPC_CHTTP2_STREAM_ERROR; + p->is_frame_compressed = 1; /* GPR_TRUE */ + break; default: gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type); return GRPC_CHTTP2_STREAM_ERROR; @@ -134,13 +164,13 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { - return GRPC_CHTTP2_PARSE_OK; + return parse_postprocessing(p); } else if ((gpr_uint32)(end - cur) == p->frame_size) { state->need_flush_reads = 1; grpc_sopb_add_slice(&p->incoming_sopb, gpr_slice_sub(slice, cur - beg, end - beg)); p->state = GRPC_CHTTP2_DATA_FH_0; - return GRPC_CHTTP2_PARSE_OK; + return parse_postprocessing(p); } else if ((gpr_uint32)(end - cur) > p->frame_size) { state->need_flush_reads = 1; grpc_sopb_add_slice( @@ -153,7 +183,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_sopb_add_slice(&p->incoming_sopb, gpr_slice_sub(slice, cur - beg, end - beg)); p->frame_size -= (end - cur); - return GRPC_CHTTP2_PARSE_OK; + return parse_postprocessing(p); } } diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index 24e557accd..03c8db97b9 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -56,6 +56,7 @@ typedef struct { gpr_uint8 frame_type; gpr_uint32 frame_size; + int is_frame_compressed; grpc_stream_op_buffer incoming_sopb; } grpc_chttp2_data_parser; diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index cf78ac50cc..5788236ffb 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -497,7 +497,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, through - this lets us reuse the slice framing code below */ slice = gpr_slice_malloc(5); p = GPR_SLICE_START_PTR(slice); - p[0] = 0; + p[0] = !!(op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS); p[1] = op->data.begin_message.length >> 24; p[2] = op->data.begin_message.length >> 16; p[3] = op->data.begin_message.length >> 8; diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 6f8d39e352..a3e5f3d3c4 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -65,6 +65,7 @@ typedef enum grpc_stream_state { typedef struct grpc_transport_op { grpc_stream_op_buffer *send_ops; int is_last_send; + int dont_compress; void (*on_done_send)(void *user_data, int success); void *send_user_data; |