aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c96
1 files changed, 63 insertions, 33 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 0a551ac47f..0dfc4313bb 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -30,24 +30,25 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/compression.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/census/grpc_context.h"
-#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
+#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <assert.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
/** The maximum number of completions possible.
Based upon the maximum number of individually queueable ops in the batch
@@ -235,8 +236,8 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
- /* Compression level for the call */
- grpc_compression_level compression_level;
+ /* Compression algorithm for the call */
+ grpc_compression_algorithm compression_algorithm;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -469,9 +470,14 @@ static void set_status_code(grpc_call *call, status_source source,
}
}
-static void set_decode_compression_level(grpc_call *call,
- grpc_compression_level clevel) {
- call->compression_level = clevel;
+static void set_compression_algorithm(grpc_call *call,
+ grpc_compression_algorithm algo) {
+ call->compression_algorithm = algo;
+}
+
+grpc_compression_algorithm grpc_call_get_compression_algorithm(
+ const grpc_call *call) {
+ return call->compression_algorithm;
}
static void set_status_details(grpc_call *call, status_source source,
@@ -749,8 +755,18 @@ static void call_on_done_send(void *pc, int success) {
static void finish_message(grpc_call *call) {
if (call->error_status_set == 0) {
/* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
- call->incoming_message.slices, call->incoming_message.count);
+ grpc_byte_buffer *byte_buffer;
+ /* some aliases for readability */
+ gpr_slice *slices = call->incoming_message.slices;
+ const size_t nslices = call->incoming_message.count;
+
+ if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+ byte_buffer = grpc_raw_compressed_byte_buffer_create(
+ slices, nslices, call->compression_algorithm);
+ } else {
+ byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
+ }
grpc_bbq_push(&call->incoming_queue, byte_buffer);
}
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
@@ -769,6 +785,25 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
gpr_free(message);
return 0;
}
+ /* sanity check: if message flags indicate a compressed message, the
+ * compression level should already be present in the call, as parsed off its
+ * corresponding metadata. */
+ if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
+ char *message = NULL;
+ char *alg_name;
+ if (!grpc_compression_algorithm_name(call->compression_algorithm,
+ &alg_name)) {
+ /* This shouldn't happen, other than due to data corruption */
+ alg_name = "<unknown>";
+ }
+ gpr_asprintf(&message,
+ "Invalid compression algorithm (%s) for compressed message.",
+ alg_name);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, message);
+ gpr_free(message);
+ return 0;
+ }
/* stash away parameters, and prepare for incoming slices */
if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
char *message = NULL;
@@ -1263,25 +1298,20 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
static void destroy_compression(void *ignored) {}
static gpr_uint32 decode_compression(grpc_mdelem *md) {
- grpc_compression_level clevel;
- void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
+ grpc_compression_algorithm algorithm;
+ void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
if (user_data) {
- clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
- gpr_uint32 parsed_clevel_bytes;
- if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
- GPR_SLICE_LENGTH(md->value->slice),
- &parsed_clevel_bytes)) {
- /* the following cast is safe, as a gpr_uint32 should be able to hold all
- * possible values of the grpc_compression_level enum */
- clevel = (grpc_compression_level)parsed_clevel_bytes;
- } else {
- clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
+ const char *md_c_str = grpc_mdstr_as_c_string(md->value);
+ if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
+ gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
+ assert(0);
}
grpc_mdelem_set_user_data(md, destroy_compression,
- (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
+ (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
}
- return clevel;
+ return algorithm;
}
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
@@ -1300,8 +1330,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value));
} else if (key ==
- grpc_channel_get_compresssion_level_string(call->channel)) {
- set_decode_compression_level(call, decode_compression(md));
+ grpc_channel_get_compression_algorithm_string(call->channel)) {
+ set_compression_algorithm(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1416,7 +1446,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
- req->flags = ops->flags;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */