diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 102 |
1 files changed, 66 insertions, 36 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 71f4235571..6e643b591c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -30,24 +30,25 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <grpc/compression.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/census/grpc_context.h" -#include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" #include "src/core/iomgr/alarm.h" #include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" +#include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <assert.h> - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> /** The maximum number of completions possible. Based upon the maximum number of individually queueable ops in the batch @@ -235,8 +236,8 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; - /* Compression level for the call */ - grpc_compression_level compression_level; + /* Compression algorithm for the call */ + grpc_compression_algorithm compression_algorithm; /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -347,7 +348,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, } grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, CALL_STACK_FROM_CALL(call)); - if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(send_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { set_deadline_alarm(call, send_deadline); } return call; @@ -469,9 +470,14 @@ static void set_status_code(grpc_call *call, status_source source, } } -static void set_decode_compression_level(grpc_call *call, - grpc_compression_level clevel) { - call->compression_level = clevel; +static void set_compression_algorithm(grpc_call *call, + grpc_compression_algorithm algo) { + call->compression_algorithm = algo; +} + +grpc_compression_algorithm grpc_call_get_compression_algorithm( + const grpc_call *call) { + return call->compression_algorithm; } static void set_status_details(grpc_call *call, status_source source, @@ -762,8 +768,18 @@ static void call_on_done_send(void *pc, int success) { static void finish_message(grpc_call *call) { if (call->error_status_set == 0) { /* TODO(ctiller): this could be a lot faster if coded directly */ - grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create( - call->incoming_message.slices, call->incoming_message.count); + grpc_byte_buffer *byte_buffer; + /* some aliases for readability */ + gpr_slice *slices = call->incoming_message.slices; + const size_t nslices = call->incoming_message.count; + + if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm > GRPC_COMPRESS_NONE)) { + byte_buffer = grpc_raw_compressed_byte_buffer_create( + slices, nslices, call->compression_algorithm); + } else { + byte_buffer = grpc_raw_byte_buffer_create(slices, nslices); + } grpc_bbq_push(&call->incoming_queue, byte_buffer); } gpr_slice_buffer_reset_and_unref(&call->incoming_message); @@ -782,6 +798,25 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { gpr_free(message); return 0; } + /* sanity check: if message flags indicate a compressed message, the + * compression level should already be present in the call, as parsed off its + * corresponding metadata. */ + if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm == GRPC_COMPRESS_NONE)) { + char *message = NULL; + char *alg_name; + if (!grpc_compression_algorithm_name(call->compression_algorithm, + &alg_name)) { + /* This shouldn't happen, other than due to data corruption */ + alg_name = "<unknown>"; + } + gpr_asprintf(&message, + "Invalid compression algorithm (%s) for compressed message.", + alg_name); + cancel_with_status(call, GRPC_STATUS_INTERNAL, message); + gpr_free(message); + return 0; + } /* stash away parameters, and prepare for incoming slices */ if (msg.length > grpc_channel_get_max_message_length(call->channel)) { char *message = NULL; @@ -985,7 +1020,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { mdb.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata); mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; @@ -1276,25 +1311,20 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { static void destroy_compression(void *ignored) {} static gpr_uint32 decode_compression(grpc_mdelem *md) { - grpc_compression_level clevel; - void *user_data = grpc_mdelem_get_user_data(md, destroy_status); + grpc_compression_algorithm algorithm; + void *user_data = grpc_mdelem_get_user_data(md, destroy_compression); if (user_data) { - clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; + algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; } else { - gpr_uint32 parsed_clevel_bytes; - if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), - &parsed_clevel_bytes)) { - /* the following cast is safe, as a gpr_uint32 should be able to hold all - * possible values of the grpc_compression_level enum */ - clevel = (grpc_compression_level)parsed_clevel_bytes; - } else { - clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ + const char *md_c_str = grpc_mdstr_as_c_string(md->value); + if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) { + gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); + assert(0); } grpc_mdelem_set_user_data(md, destroy_compression, - (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); + (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET)); } - return clevel; + return algorithm; } static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { @@ -1313,8 +1343,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value)); } else if (key == - grpc_channel_get_compresssion_level_string(call->channel)) { - set_decode_compression_level(call, decode_compression(md)); + grpc_channel_get_compression_algorithm_string(call->channel)) { + set_compression_algorithm(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { @@ -1338,7 +1368,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { l->md = 0; } } - if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(md->deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { set_deadline_alarm(call, md->deadline); } if (!is_trailing) { @@ -1429,7 +1459,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; - req->flags = ops->flags; + req->flags = op->flags; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ |