diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 86 | ||||
-rw-r--r-- | src/core/surface/channel.c | 13 | ||||
-rw-r--r-- | src/core/surface/channel.h | 3 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 2 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 2 | ||||
-rw-r--r-- | src/core/surface/server.c | 6 | ||||
-rw-r--r-- | src/core/surface/server.h | 6 | ||||
-rw-r--r-- | src/core/surface/server_create.c | 5 |
8 files changed, 76 insertions, 47 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index a28a542c8d..5f982f9f6e 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -30,24 +30,25 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <grpc/compression.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/census/grpc_context.h" -#include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" #include "src/core/iomgr/alarm.h" #include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" +#include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <assert.h> - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; @@ -221,8 +222,8 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; - /* Compression level for the call */ - grpc_compression_level compression_level; + /* Compression algorithm for the call */ + grpc_compression_algorithm compression_algorithm; /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -427,9 +428,9 @@ static void set_status_code(grpc_call *call, status_source source, } } -static void set_decode_compression_level(grpc_call *call, - grpc_compression_level clevel) { - call->compression_level = clevel; +static void set_compression_algorithm(grpc_call *call, + grpc_compression_algorithm algo) { + call->compression_algorithm = algo; } static void set_status_details(grpc_call *call, status_source source, @@ -707,8 +708,16 @@ static void call_on_done_send(void *pc, int success) { static void finish_message(grpc_call *call) { if (call->error_status_set == 0) { /* TODO(ctiller): this could be a lot faster if coded directly */ - grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create( - call->incoming_message.slices, call->incoming_message.count); + grpc_byte_buffer *byte_buffer; + /* some aliases for readability */ + gpr_slice *slices = call->incoming_message.slices; + const size_t nslices = call->incoming_message.count; + if (call->compression_algorithm > GRPC_COMPRESS_NONE) { + byte_buffer = grpc_raw_compressed_byte_buffer_create( + slices, nslices, call->compression_algorithm); + } else { + byte_buffer = grpc_raw_byte_buffer_create(slices, nslices); + } grpc_bbq_push(&call->incoming_queue, byte_buffer); } gpr_slice_buffer_reset_and_unref(&call->incoming_message); @@ -727,6 +736,22 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { gpr_free(message); return 0; } + /* sanity check: if message flags indicate a compressed message, the + * compression level should already be present in the call, as parsed off its + * corresponding metadata. */ + if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm == GRPC_COMPRESS_NONE)) { + char *message = NULL; + char *alg_name; + if (!grpc_compression_algorithm_name(call->compression_algorithm, &alg_name)) { + /* This shouldn't happen, other than due to data corruption */ + alg_name = "<unknown>"; + } + gpr_asprintf(&message, + "Invalid compression algorithm (%s) for compressed message.", + alg_name); + cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message); + } /* stash away parameters, and prepare for incoming slices */ if (msg.length > grpc_channel_get_max_message_length(call->channel)) { char *message = NULL; @@ -1220,25 +1245,20 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { static void destroy_compression(void *ignored) {} static gpr_uint32 decode_compression(grpc_mdelem *md) { - grpc_compression_level clevel; - void *user_data = grpc_mdelem_get_user_data(md, destroy_status); + grpc_compression_algorithm algorithm; + void *user_data = grpc_mdelem_get_user_data(md, destroy_compression); if (user_data) { - clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; + algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; } else { - gpr_uint32 parsed_clevel_bytes; - if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), - &parsed_clevel_bytes)) { - /* the following cast is safe, as a gpr_uint32 should be able to hold all - * possible values of the grpc_compression_level enum */ - clevel = (grpc_compression_level)parsed_clevel_bytes; - } else { - clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ + const char *md_c_str = grpc_mdstr_as_c_string(md->value); + if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) { + gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); + assert(0); } grpc_mdelem_set_user_data(md, destroy_compression, - (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); + (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET)); } - return clevel; + return algorithm; } static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { @@ -1257,8 +1277,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); } else if (key == - grpc_channel_get_compresssion_level_string(call->channel)) { - set_decode_compression_level(call, decode_compression(md)); + grpc_channel_get_compression_algorithm_string(call->channel)) { + set_compression_algorithm(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { @@ -1369,7 +1389,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; - req->flags = ops->flags; + req->flags = op->flags; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index f8151c121c..5f6187d2bf 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -63,7 +63,7 @@ struct grpc_channel { grpc_mdctx *metadata_context; /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; - grpc_mdstr *grpc_compression_level_string; + grpc_mdstr *grpc_compression_algorithm_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; @@ -97,8 +97,8 @@ grpc_channel *grpc_channel_create_from_filters( gpr_ref_init(&channel->refs, 1); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); - channel->grpc_compression_level_string = - grpc_mdstr_from_string(mdctx, "grpc-compression-level"); + channel->grpc_compression_algorithm_string = + grpc_mdstr_from_string(mdctx, "grpc-encoding"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; @@ -208,7 +208,7 @@ static void destroy_channel(void *p, int ok) { grpc_mdelem_unref(channel->grpc_status_elem[i]); } grpc_mdstr_unref(channel->grpc_status_string); - grpc_mdstr_unref(channel->grpc_compression_level_string); + grpc_mdstr_unref(channel->grpc_compression_algorithm_string); grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->authority_string); @@ -261,8 +261,9 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { return channel->grpc_status_string; } -grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { - return channel->grpc_compression_level_string; +grpc_mdstr *grpc_channel_get_compression_algorithm_string( + grpc_channel *channel) { + return channel->grpc_compression_algorithm_string; } grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 71f8a55731..4e03eb4411 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -54,7 +54,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); -grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel); +grpc_mdstr *grpc_channel_get_compression_algorithm_string( + grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index e205f0a9f8..91c7b35550 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -40,6 +40,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" +#include "src/core/channel/compress_filter.h" #include "src/core/channel/http_client_filter.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/iomgr/tcp_client.h" @@ -163,6 +164,7 @@ grpc_channel *grpc_channel_create(const char *target, if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; } */ + filters[n++] = &grpc_compress_filter; filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 34ee3f8400..5f77e4c06e 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -40,6 +40,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" +#include "src/core/channel/compress_filter.h" #include "src/core/channel/http_client_filter.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/iomgr/tcp_client.h" @@ -212,6 +213,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; } */ + filters[n++] = &grpc_compress_filter; filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 341ca2942c..b26637a6ac 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -723,9 +723,9 @@ void grpc_server_register_completion_queue(grpc_server *server, server->cqs[n] = cq; } -grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, - size_t filter_count, - const grpc_channel_args *args) { +grpc_server *grpc_server_create_from_filters( + const grpc_channel_filter **filters, size_t filter_count, + const grpc_channel_args *args) { size_t i; /* TODO(census): restore this once we finalize census filter etc. int census_enabled = grpc_channel_args_is_census_enabled(args); */ diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 2899c6dea3..c638d682bb 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -39,9 +39,9 @@ #include "src/core/transport/transport.h" /* Create a server */ -grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, - size_t filter_count, - const grpc_channel_args *args); +grpc_server *grpc_server_create_from_filters( + const grpc_channel_filter **filters, size_t filter_count, + const grpc_channel_args *args); /* Add a listener to the server: when the server starts, it will call start, and when it shuts down, it will call destroy */ diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index b7390675ad..0433164053 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -32,9 +32,12 @@ */ #include <grpc/grpc.h> + +#include "src/core/channel/compress_filter.h" #include "src/core/surface/completion_queue.h" #include "src/core/surface/server.h" grpc_server *grpc_server_create(const grpc_channel_args *args) { - return grpc_server_create_from_filters(NULL, 0, args); + const grpc_channel_filter *filters[] = {&grpc_compress_filter}; + return grpc_server_create_from_filters(filters, 0, args); } |