diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-07-19 15:35:17 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-07-19 15:35:17 -0700 |
commit | b4e70366c6b25d1127e66fd28c6256b19467dd9b (patch) | |
tree | 7f51ee747a9ed1200899b1c3e19752d85ce878ed /src/core/surface/call.c | |
parent | d82d0b295b51e1385481be381eef325423441a65 (diff) | |
parent | 0c2f1626c0082ab91aed27f77bbe01008d878db2 (diff) |
Merge branch 'decompression' of https://github.com/dgquintas/grpc into dgquintas-decompression
Conflicts:
Makefile
vsprojects/Grpc.mak
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 96 |
1 files changed, 63 insertions, 33 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 1146d83982..6e643b591c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -30,24 +30,25 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <grpc/compression.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/census/grpc_context.h" -#include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" #include "src/core/iomgr/alarm.h" #include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" +#include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <assert.h> - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> /** The maximum number of completions possible. Based upon the maximum number of individually queueable ops in the batch @@ -235,8 +236,8 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; - /* Compression level for the call */ - grpc_compression_level compression_level; + /* Compression algorithm for the call */ + grpc_compression_algorithm compression_algorithm; /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -469,9 +470,14 @@ static void set_status_code(grpc_call *call, status_source source, } } -static void set_decode_compression_level(grpc_call *call, - grpc_compression_level clevel) { - call->compression_level = clevel; +static void set_compression_algorithm(grpc_call *call, + grpc_compression_algorithm algo) { + call->compression_algorithm = algo; +} + +grpc_compression_algorithm grpc_call_get_compression_algorithm( + const grpc_call *call) { + return call->compression_algorithm; } static void set_status_details(grpc_call *call, status_source source, @@ -762,8 +768,18 @@ static void call_on_done_send(void *pc, int success) { static void finish_message(grpc_call *call) { if (call->error_status_set == 0) { /* TODO(ctiller): this could be a lot faster if coded directly */ - grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create( - call->incoming_message.slices, call->incoming_message.count); + grpc_byte_buffer *byte_buffer; + /* some aliases for readability */ + gpr_slice *slices = call->incoming_message.slices; + const size_t nslices = call->incoming_message.count; + + if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm > GRPC_COMPRESS_NONE)) { + byte_buffer = grpc_raw_compressed_byte_buffer_create( + slices, nslices, call->compression_algorithm); + } else { + byte_buffer = grpc_raw_byte_buffer_create(slices, nslices); + } grpc_bbq_push(&call->incoming_queue, byte_buffer); } gpr_slice_buffer_reset_and_unref(&call->incoming_message); @@ -782,6 +798,25 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { gpr_free(message); return 0; } + /* sanity check: if message flags indicate a compressed message, the + * compression level should already be present in the call, as parsed off its + * corresponding metadata. */ + if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm == GRPC_COMPRESS_NONE)) { + char *message = NULL; + char *alg_name; + if (!grpc_compression_algorithm_name(call->compression_algorithm, + &alg_name)) { + /* This shouldn't happen, other than due to data corruption */ + alg_name = "<unknown>"; + } + gpr_asprintf(&message, + "Invalid compression algorithm (%s) for compressed message.", + alg_name); + cancel_with_status(call, GRPC_STATUS_INTERNAL, message); + gpr_free(message); + return 0; + } /* stash away parameters, and prepare for incoming slices */ if (msg.length > grpc_channel_get_max_message_length(call->channel)) { char *message = NULL; @@ -1276,25 +1311,20 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { static void destroy_compression(void *ignored) {} static gpr_uint32 decode_compression(grpc_mdelem *md) { - grpc_compression_level clevel; - void *user_data = grpc_mdelem_get_user_data(md, destroy_status); + grpc_compression_algorithm algorithm; + void *user_data = grpc_mdelem_get_user_data(md, destroy_compression); if (user_data) { - clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; + algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; } else { - gpr_uint32 parsed_clevel_bytes; - if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), - &parsed_clevel_bytes)) { - /* the following cast is safe, as a gpr_uint32 should be able to hold all - * possible values of the grpc_compression_level enum */ - clevel = (grpc_compression_level)parsed_clevel_bytes; - } else { - clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ + const char *md_c_str = grpc_mdstr_as_c_string(md->value); + if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) { + gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); + assert(0); } grpc_mdelem_set_user_data(md, destroy_compression, - (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); + (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET)); } - return clevel; + return algorithm; } static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { @@ -1313,8 +1343,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value)); } else if (key == - grpc_channel_get_compresssion_level_string(call->channel)) { - set_decode_compression_level(call, decode_compression(md)); + grpc_channel_get_compression_algorithm_string(call->channel)) { + set_compression_algorithm(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { @@ -1429,7 +1459,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; - req->flags = ops->flags; + req->flags = op->flags; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ |