diff options
author | Muxi Yan <mxyan@google.com> | 2017-09-25 12:38:17 -0700 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2017-09-25 12:38:17 -0700 |
commit | 1c285b98122ef8fe31aa3325a4c10f5b05107ca8 (patch) | |
tree | f811ca6879a76bb526a7af7a35e987780d1bcba8 /src/core/lib/compression | |
parent | d4bb9bddd6cd1bfa4829deee9c3163c6065dafcb (diff) | |
parent | 008a173a7e2ba1d5c0933aa7a77395945ba2024d (diff) |
Merge remote-tracking branch 'upstream/master' into fix-stream-compression-config-interface
Diffstat (limited to 'src/core/lib/compression')
-rw-r--r-- | src/core/lib/compression/compression.c | 2 | ||||
-rw-r--r-- | src/core/lib/compression/compression_internal.c | 2 | ||||
-rw-r--r-- | src/core/lib/compression/compression_internal.h | 2 | ||||
-rw-r--r-- | src/core/lib/compression/stream_compression.c | 178 | ||||
-rw-r--r-- | src/core/lib/compression/stream_compression.h | 32 | ||||
-rw-r--r-- | src/core/lib/compression/stream_compression_gzip.c | 228 | ||||
-rw-r--r-- | src/core/lib/compression/stream_compression_gzip.h | 26 | ||||
-rw-r--r-- | src/core/lib/compression/stream_compression_identity.c | 94 | ||||
-rw-r--r-- | src/core/lib/compression/stream_compression_identity.h | 27 |
9 files changed, 438 insertions, 153 deletions
diff --git a/src/core/lib/compression/compression.c b/src/core/lib/compression/compression.c index e7414c4152..97a63654f3 100644 --- a/src/core/lib/compression/compression.c +++ b/src/core/lib/compression/compression.c @@ -59,7 +59,7 @@ int grpc_compression_algorithm_parse(grpc_slice name, } int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, - char **name) { + const char **name) { GRPC_API_TRACE("grpc_compression_algorithm_parse(algorithm=%d, name=%p)", 2, ((int)algorithm, name)); switch (algorithm) { diff --git a/src/core/lib/compression/compression_internal.c b/src/core/lib/compression/compression_internal.c index a395ceb572..8e0a14dead 100644 --- a/src/core/lib/compression/compression_internal.c +++ b/src/core/lib/compression/compression_internal.c @@ -183,7 +183,7 @@ int grpc_compression_algorithm_from_message_stream_compression_algorithm( /* Interfaces for message compression. */ int grpc_message_compression_algorithm_name( - grpc_message_compression_algorithm algorithm, char **name) { + grpc_message_compression_algorithm algorithm, const char **name) { GRPC_API_TRACE("grpc_message_compression_algorithm_parse(algorithm=%d, name=%p)", 2, ((int)algorithm, name)); switch (algorithm) { diff --git a/src/core/lib/compression/compression_internal.h b/src/core/lib/compression/compression_internal.h index d3f95910a7..dcd49c8c11 100644 --- a/src/core/lib/compression/compression_internal.h +++ b/src/core/lib/compression/compression_internal.h @@ -91,7 +91,7 @@ int grpc_compression_algorithm_from_message_stream_compression_algorithm( /* Interfaces for message compression. */ int grpc_message_compression_algorithm_name( - grpc_message_compression_algorithm algorithm, char **name); + grpc_message_compression_algorithm algorithm, const char **name); grpc_message_compression_algorithm grpc_message_compression_algorithm_for_level( grpc_message_compression_level level, uint32_t accepted_encodings); diff --git a/src/core/lib/compression/stream_compression.c b/src/core/lib/compression/stream_compression.c index df13d53e06..411489f029 100644 --- a/src/core/lib/compression/stream_compression.c +++ b/src/core/lib/compression/stream_compression.c @@ -16,176 +16,62 @@ * */ -#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include "src/core/lib/compression/stream_compression.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/compression/stream_compression_gzip.h" -#define OUTPUT_BLOCK_SIZE (1024) - -static bool gzip_flate(grpc_stream_compression_context *ctx, - grpc_slice_buffer *in, grpc_slice_buffer *out, - size_t *output_size, size_t max_output_size, int flush, - bool *end_of_context) { - GPR_ASSERT(flush == 0 || flush == Z_SYNC_FLUSH || flush == Z_FINISH); - /* Full flush is not allowed when inflating. */ - GPR_ASSERT(!(ctx->flate == inflate && (flush == Z_FINISH))); - - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - int r; - bool eoc = false; - size_t original_max_output_size = max_output_size; - while (max_output_size > 0 && (in->length > 0 || flush) && !eoc) { - size_t slice_size = max_output_size < OUTPUT_BLOCK_SIZE ? max_output_size - : OUTPUT_BLOCK_SIZE; - grpc_slice slice_out = GRPC_SLICE_MALLOC(slice_size); - ctx->zs.avail_out = (uInt)slice_size; - ctx->zs.next_out = GRPC_SLICE_START_PTR(slice_out); - while (ctx->zs.avail_out > 0 && in->length > 0 && !eoc) { - grpc_slice slice = grpc_slice_buffer_take_first(in); - ctx->zs.avail_in = (uInt)GRPC_SLICE_LENGTH(slice); - ctx->zs.next_in = GRPC_SLICE_START_PTR(slice); - r = ctx->flate(&ctx->zs, Z_NO_FLUSH); - if (r < 0 && r != Z_BUF_ERROR) { - gpr_log(GPR_ERROR, "zlib error (%d)", r); - grpc_slice_unref_internal(&exec_ctx, slice_out); - grpc_exec_ctx_finish(&exec_ctx); - return false; - } else if (r == Z_STREAM_END && ctx->flate == inflate) { - eoc = true; - } - if (ctx->zs.avail_in > 0) { - grpc_slice_buffer_undo_take_first( - in, - grpc_slice_sub(slice, GRPC_SLICE_LENGTH(slice) - ctx->zs.avail_in, - GRPC_SLICE_LENGTH(slice))); - } - grpc_slice_unref_internal(&exec_ctx, slice); - } - if (flush != 0 && ctx->zs.avail_out > 0 && !eoc) { - GPR_ASSERT(in->length == 0); - r = ctx->flate(&ctx->zs, flush); - if (flush == Z_SYNC_FLUSH) { - switch (r) { - case Z_OK: - /* Maybe flush is not complete; just made some partial progress. */ - if (ctx->zs.avail_out > 0) { - flush = 0; - } - break; - case Z_BUF_ERROR: - case Z_STREAM_END: - flush = 0; - break; - default: - gpr_log(GPR_ERROR, "zlib error (%d)", r); - grpc_slice_unref_internal(&exec_ctx, slice_out); - grpc_exec_ctx_finish(&exec_ctx); - return false; - } - } else if (flush == Z_FINISH) { - switch (r) { - case Z_OK: - case Z_BUF_ERROR: - /* Wait for the next loop to assign additional output space. */ - GPR_ASSERT(ctx->zs.avail_out == 0); - break; - case Z_STREAM_END: - flush = 0; - break; - default: - gpr_log(GPR_ERROR, "zlib error (%d)", r); - grpc_slice_unref_internal(&exec_ctx, slice_out); - grpc_exec_ctx_finish(&exec_ctx); - return false; - } - } - } - - if (ctx->zs.avail_out == 0) { - grpc_slice_buffer_add(out, slice_out); - } else if (ctx->zs.avail_out < slice_size) { - slice_out.data.refcounted.length -= ctx->zs.avail_out; - grpc_slice_buffer_add(out, slice_out); - } else { - grpc_slice_unref_internal(&exec_ctx, slice_out); - } - max_output_size -= (slice_size - ctx->zs.avail_out); - } - grpc_exec_ctx_finish(&exec_ctx); - if (end_of_context) { - *end_of_context = eoc; - } - if (output_size) { - *output_size = original_max_output_size - max_output_size; - } - return true; -} +extern const grpc_stream_compression_vtable + grpc_stream_compression_identity_vtable; bool grpc_stream_compress(grpc_stream_compression_context *ctx, grpc_slice_buffer *in, grpc_slice_buffer *out, size_t *output_size, size_t max_output_size, grpc_stream_compression_flush flush) { - GPR_ASSERT(ctx->flate == deflate); - int gzip_flush; - switch (flush) { - case GRPC_STREAM_COMPRESSION_FLUSH_NONE: - gzip_flush = 0; - break; - case GRPC_STREAM_COMPRESSION_FLUSH_SYNC: - gzip_flush = Z_SYNC_FLUSH; - break; - case GRPC_STREAM_COMPRESSION_FLUSH_FINISH: - gzip_flush = Z_FINISH; - break; - default: - gzip_flush = 0; - } - return gzip_flate(ctx, in, out, output_size, max_output_size, gzip_flush, - NULL); + return ctx->vtable->compress(ctx, in, out, output_size, max_output_size, + flush); } bool grpc_stream_decompress(grpc_stream_compression_context *ctx, grpc_slice_buffer *in, grpc_slice_buffer *out, size_t *output_size, size_t max_output_size, bool *end_of_context) { - GPR_ASSERT(ctx->flate == inflate); - return gzip_flate(ctx, in, out, output_size, max_output_size, Z_SYNC_FLUSH, - end_of_context); + return ctx->vtable->decompress(ctx, in, out, output_size, max_output_size, + end_of_context); } grpc_stream_compression_context *grpc_stream_compression_context_create( grpc_stream_compression_method method) { - grpc_stream_compression_context *ctx = - gpr_zalloc(sizeof(grpc_stream_compression_context)); - int r; - if (ctx == NULL) { - return NULL; - } - if (method == GRPC_STREAM_COMPRESSION_DECOMPRESS) { - r = inflateInit2(&ctx->zs, 0x1F); - ctx->flate = inflate; - } else { - r = deflateInit2(&ctx->zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 0x1F, 8, - Z_DEFAULT_STRATEGY); - ctx->flate = deflate; - } - if (r != Z_OK) { - gpr_free(ctx); - return NULL; + switch (method) { + case GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS: + case GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS: + return grpc_stream_compression_identity_vtable.context_create(method); + case GRPC_STREAM_COMPRESSION_GZIP_COMPRESS: + case GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS: + return grpc_stream_compression_gzip_vtable.context_create(method); + default: + gpr_log(GPR_ERROR, "Unknown stream compression method: %d", method); + return NULL; } - - return ctx; } void grpc_stream_compression_context_destroy( grpc_stream_compression_context *ctx) { - if (ctx->flate == inflate) { - inflateEnd(&ctx->zs); + ctx->vtable->context_destroy(ctx); +} + +int grpc_stream_compression_method_parse( + grpc_slice value, bool is_compress, + grpc_stream_compression_method *method) { + if (grpc_slice_eq(value, GRPC_MDSTR_IDENTITY)) { + *method = is_compress ? GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS + : GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; + return 1; + } else if (grpc_slice_eq(value, GRPC_MDSTR_GZIP)) { + *method = is_compress ? GRPC_STREAM_COMPRESSION_GZIP_COMPRESS + : GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS; + return 1; } else { - deflateEnd(&ctx->zs); + return 0; } - gpr_free(ctx); } diff --git a/src/core/lib/compression/stream_compression.h b/src/core/lib/compression/stream_compression.h index 844dff81a3..6d073280fa 100644 --- a/src/core/lib/compression/stream_compression.h +++ b/src/core/lib/compression/stream_compression.h @@ -24,15 +24,20 @@ #include <grpc/slice_buffer.h> #include <zlib.h> +#include "src/core/lib/transport/static_metadata.h" + +typedef struct grpc_stream_compression_vtable grpc_stream_compression_vtable; + /* Stream compression/decompression context */ typedef struct grpc_stream_compression_context { - z_stream zs; - int (*flate)(z_stream *zs, int flush); + const grpc_stream_compression_vtable *vtable; } grpc_stream_compression_context; typedef enum grpc_stream_compression_method { - GRPC_STREAM_COMPRESSION_COMPRESS = 0, - GRPC_STREAM_COMPRESSION_DECOMPRESS, + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS = 0, + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS, + GRPC_STREAM_COMPRESSION_GZIP_COMPRESS, + GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS, GRPC_STREAM_COMPRESSION_METHOD_COUNT } grpc_stream_compression_method; @@ -43,6 +48,19 @@ typedef enum grpc_stream_compression_flush { GRPC_STREAM_COMPRESSION_FLUSH_COUNT } grpc_stream_compression_flush; +struct grpc_stream_compression_vtable { + bool (*compress)(grpc_stream_compression_context *ctx, grpc_slice_buffer *in, + grpc_slice_buffer *out, size_t *output_size, + size_t max_output_size, grpc_stream_compression_flush flush); + bool (*decompress)(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, grpc_slice_buffer *out, + size_t *output_size, size_t max_output_size, + bool *end_of_context); + grpc_stream_compression_context *(*context_create)( + grpc_stream_compression_method method); + void (*context_destroy)(grpc_stream_compression_context *ctx); +}; + /** * Compress bytes provided in \a in with a given context, with an optional flush * at the end of compression. Emits at most \a max_output_size compressed bytes @@ -87,4 +105,10 @@ grpc_stream_compression_context *grpc_stream_compression_context_create( void grpc_stream_compression_context_destroy( grpc_stream_compression_context *ctx); +/** + * Parse stream compression method based on algorithm name + */ +int grpc_stream_compression_method_parse( + grpc_slice value, bool is_compress, grpc_stream_compression_method *method); + #endif diff --git a/src/core/lib/compression/stream_compression_gzip.c b/src/core/lib/compression/stream_compression_gzip.c new file mode 100644 index 0000000000..abcbdb3a91 --- /dev/null +++ b/src/core/lib/compression/stream_compression_gzip.c @@ -0,0 +1,228 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/compression/stream_compression_gzip.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/slice/slice_internal.h" + +#define OUTPUT_BLOCK_SIZE (1024) + +typedef struct grpc_stream_compression_context_gzip { + grpc_stream_compression_context base; + + z_stream zs; + int (*flate)(z_stream *zs, int flush); +} grpc_stream_compression_context_gzip; + +static bool gzip_flate(grpc_stream_compression_context_gzip *ctx, + grpc_slice_buffer *in, grpc_slice_buffer *out, + size_t *output_size, size_t max_output_size, int flush, + bool *end_of_context) { + GPR_ASSERT(flush == 0 || flush == Z_SYNC_FLUSH || flush == Z_FINISH); + /* Full flush is not allowed when inflating. */ + GPR_ASSERT(!(ctx->flate == inflate && (flush == Z_FINISH))); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + int r; + bool eoc = false; + size_t original_max_output_size = max_output_size; + while (max_output_size > 0 && (in->length > 0 || flush) && !eoc) { + size_t slice_size = max_output_size < OUTPUT_BLOCK_SIZE ? max_output_size + : OUTPUT_BLOCK_SIZE; + grpc_slice slice_out = GRPC_SLICE_MALLOC(slice_size); + ctx->zs.avail_out = (uInt)slice_size; + ctx->zs.next_out = GRPC_SLICE_START_PTR(slice_out); + while (ctx->zs.avail_out > 0 && in->length > 0 && !eoc) { + grpc_slice slice = grpc_slice_buffer_take_first(in); + ctx->zs.avail_in = (uInt)GRPC_SLICE_LENGTH(slice); + ctx->zs.next_in = GRPC_SLICE_START_PTR(slice); + r = ctx->flate(&ctx->zs, Z_NO_FLUSH); + if (r < 0 && r != Z_BUF_ERROR) { + gpr_log(GPR_ERROR, "zlib error (%d)", r); + grpc_slice_unref_internal(&exec_ctx, slice_out); + grpc_exec_ctx_finish(&exec_ctx); + return false; + } else if (r == Z_STREAM_END && ctx->flate == inflate) { + eoc = true; + } + if (ctx->zs.avail_in > 0) { + grpc_slice_buffer_undo_take_first( + in, + grpc_slice_sub(slice, GRPC_SLICE_LENGTH(slice) - ctx->zs.avail_in, + GRPC_SLICE_LENGTH(slice))); + } + grpc_slice_unref_internal(&exec_ctx, slice); + } + if (flush != 0 && ctx->zs.avail_out > 0 && !eoc) { + GPR_ASSERT(in->length == 0); + r = ctx->flate(&ctx->zs, flush); + if (flush == Z_SYNC_FLUSH) { + switch (r) { + case Z_OK: + /* Maybe flush is not complete; just made some partial progress. */ + if (ctx->zs.avail_out > 0) { + flush = 0; + } + break; + case Z_BUF_ERROR: + case Z_STREAM_END: + flush = 0; + break; + default: + gpr_log(GPR_ERROR, "zlib error (%d)", r); + grpc_slice_unref_internal(&exec_ctx, slice_out); + grpc_exec_ctx_finish(&exec_ctx); + return false; + } + } else if (flush == Z_FINISH) { + switch (r) { + case Z_OK: + case Z_BUF_ERROR: + /* Wait for the next loop to assign additional output space. */ + GPR_ASSERT(ctx->zs.avail_out == 0); + break; + case Z_STREAM_END: + flush = 0; + break; + default: + gpr_log(GPR_ERROR, "zlib error (%d)", r); + grpc_slice_unref_internal(&exec_ctx, slice_out); + grpc_exec_ctx_finish(&exec_ctx); + return false; + } + } + } + + if (ctx->zs.avail_out == 0) { + grpc_slice_buffer_add(out, slice_out); + } else if (ctx->zs.avail_out < slice_size) { + slice_out.data.refcounted.length -= ctx->zs.avail_out; + grpc_slice_buffer_add(out, slice_out); + } else { + grpc_slice_unref_internal(&exec_ctx, slice_out); + } + max_output_size -= (slice_size - ctx->zs.avail_out); + } + grpc_exec_ctx_finish(&exec_ctx); + if (end_of_context) { + *end_of_context = eoc; + } + if (output_size) { + *output_size = original_max_output_size - max_output_size; + } + return true; +} + +static bool grpc_stream_compress_gzip(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, + grpc_slice_buffer *out, + size_t *output_size, + size_t max_output_size, + grpc_stream_compression_flush flush) { + if (ctx == NULL) { + return false; + } + grpc_stream_compression_context_gzip *gzip_ctx = + (grpc_stream_compression_context_gzip *)ctx; + GPR_ASSERT(gzip_ctx->flate == deflate); + int gzip_flush; + switch (flush) { + case GRPC_STREAM_COMPRESSION_FLUSH_NONE: + gzip_flush = 0; + break; + case GRPC_STREAM_COMPRESSION_FLUSH_SYNC: + gzip_flush = Z_SYNC_FLUSH; + break; + case GRPC_STREAM_COMPRESSION_FLUSH_FINISH: + gzip_flush = Z_FINISH; + break; + default: + gzip_flush = 0; + } + return gzip_flate(gzip_ctx, in, out, output_size, max_output_size, gzip_flush, + NULL); +} + +static bool grpc_stream_decompress_gzip(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, + grpc_slice_buffer *out, + size_t *output_size, + size_t max_output_size, + bool *end_of_context) { + if (ctx == NULL) { + return false; + } + grpc_stream_compression_context_gzip *gzip_ctx = + (grpc_stream_compression_context_gzip *)ctx; + GPR_ASSERT(gzip_ctx->flate == inflate); + return gzip_flate(gzip_ctx, in, out, output_size, max_output_size, + Z_SYNC_FLUSH, end_of_context); +} + +static grpc_stream_compression_context * +grpc_stream_compression_context_create_gzip( + grpc_stream_compression_method method) { + GPR_ASSERT(method == GRPC_STREAM_COMPRESSION_GZIP_COMPRESS || + method == GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS); + grpc_stream_compression_context_gzip *gzip_ctx = + (grpc_stream_compression_context_gzip *)gpr_zalloc( + sizeof(grpc_stream_compression_context_gzip)); + int r; + if (gzip_ctx == NULL) { + return NULL; + } + if (method == GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS) { + r = inflateInit2(&gzip_ctx->zs, 0x1F); + gzip_ctx->flate = inflate; + } else { + r = deflateInit2(&gzip_ctx->zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 0x1F, 8, + Z_DEFAULT_STRATEGY); + gzip_ctx->flate = deflate; + } + if (r != Z_OK) { + gpr_free(gzip_ctx); + return NULL; + } + + gzip_ctx->base.vtable = &grpc_stream_compression_gzip_vtable; + return (grpc_stream_compression_context *)gzip_ctx; +} + +static void grpc_stream_compression_context_destroy_gzip( + grpc_stream_compression_context *ctx) { + if (ctx == NULL) { + return; + } + grpc_stream_compression_context_gzip *gzip_ctx = + (grpc_stream_compression_context_gzip *)ctx; + if (gzip_ctx->flate == inflate) { + inflateEnd(&gzip_ctx->zs); + } else { + deflateEnd(&gzip_ctx->zs); + } + gpr_free(ctx); +} + +const grpc_stream_compression_vtable grpc_stream_compression_gzip_vtable = { + .compress = grpc_stream_compress_gzip, + .decompress = grpc_stream_decompress_gzip, + .context_create = grpc_stream_compression_context_create_gzip, + .context_destroy = grpc_stream_compression_context_destroy_gzip}; diff --git a/src/core/lib/compression/stream_compression_gzip.h b/src/core/lib/compression/stream_compression_gzip.h new file mode 100644 index 0000000000..7cf49a0de9 --- /dev/null +++ b/src/core/lib/compression/stream_compression_gzip.h @@ -0,0 +1,26 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_GZIP_H +#define GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_GZIP_H + +#include "src/core/lib/compression/stream_compression.h" + +extern const grpc_stream_compression_vtable grpc_stream_compression_gzip_vtable; + +#endif diff --git a/src/core/lib/compression/stream_compression_identity.c b/src/core/lib/compression/stream_compression_identity.c new file mode 100644 index 0000000000..3dfcf53b85 --- /dev/null +++ b/src/core/lib/compression/stream_compression_identity.c @@ -0,0 +1,94 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/compression/stream_compression_identity.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/slice/slice_internal.h" + +#define OUTPUT_BLOCK_SIZE (1024) + +/* Singleton context used for all identity streams. */ +static grpc_stream_compression_context identity_ctx = { + .vtable = &grpc_stream_compression_identity_vtable}; + +static void grpc_stream_compression_pass_through(grpc_slice_buffer *in, + grpc_slice_buffer *out, + size_t *output_size, + size_t max_output_size) { + if (max_output_size >= in->length) { + if (output_size) { + *output_size = in->length; + } + grpc_slice_buffer_move_into(in, out); + } else { + if (output_size) { + *output_size = max_output_size; + } + grpc_slice_buffer_move_first(in, max_output_size, out); + } +} + +static bool grpc_stream_compress_identity(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, + grpc_slice_buffer *out, + size_t *output_size, + size_t max_output_size, + grpc_stream_compression_flush flush) { + if (ctx == NULL) { + return false; + } + grpc_stream_compression_pass_through(in, out, output_size, max_output_size); + return true; +} + +static bool grpc_stream_decompress_identity( + grpc_stream_compression_context *ctx, grpc_slice_buffer *in, + grpc_slice_buffer *out, size_t *output_size, size_t max_output_size, + bool *end_of_context) { + if (ctx == NULL) { + return false; + } + grpc_stream_compression_pass_through(in, out, output_size, max_output_size); + if (end_of_context) { + *end_of_context = false; + } + return true; +} + +static grpc_stream_compression_context * +grpc_stream_compression_context_create_identity( + grpc_stream_compression_method method) { + GPR_ASSERT(method == GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS || + method == GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS); + /* No context needed in this case. Use fake context instead. */ + return (grpc_stream_compression_context *)&identity_ctx; +} + +static void grpc_stream_compression_context_destroy_identity( + grpc_stream_compression_context *ctx) { + return; +} + +const grpc_stream_compression_vtable grpc_stream_compression_identity_vtable = { + .compress = grpc_stream_compress_identity, + .decompress = grpc_stream_decompress_identity, + .context_create = grpc_stream_compression_context_create_identity, + .context_destroy = grpc_stream_compression_context_destroy_identity}; diff --git a/src/core/lib/compression/stream_compression_identity.h b/src/core/lib/compression/stream_compression_identity.h new file mode 100644 index 0000000000..41926e949e --- /dev/null +++ b/src/core/lib/compression/stream_compression_identity.h @@ -0,0 +1,27 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_IDENTITY_H +#define GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_IDENTITY_H + +#include "src/core/lib/compression/stream_compression.h" + +extern const grpc_stream_compression_vtable + grpc_stream_compression_identity_vtable; + +#endif |