aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c102
1 files changed, 66 insertions, 36 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 71f4235571..6e643b591c 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -30,24 +30,25 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/compression.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/census/grpc_context.h"
-#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
+#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <assert.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
/** The maximum number of completions possible.
Based upon the maximum number of individually queueable ops in the batch
@@ -235,8 +236,8 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
- /* Compression level for the call */
- grpc_compression_level compression_level;
+ /* Compression algorithm for the call */
+ grpc_compression_algorithm compression_algorithm;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -347,7 +348,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
- if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(send_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
set_deadline_alarm(call, send_deadline);
}
return call;
@@ -469,9 +470,14 @@ static void set_status_code(grpc_call *call, status_source source,
}
}
-static void set_decode_compression_level(grpc_call *call,
- grpc_compression_level clevel) {
- call->compression_level = clevel;
+static void set_compression_algorithm(grpc_call *call,
+ grpc_compression_algorithm algo) {
+ call->compression_algorithm = algo;
+}
+
+grpc_compression_algorithm grpc_call_get_compression_algorithm(
+ const grpc_call *call) {
+ return call->compression_algorithm;
}
static void set_status_details(grpc_call *call, status_source source,
@@ -762,8 +768,18 @@ static void call_on_done_send(void *pc, int success) {
static void finish_message(grpc_call *call) {
if (call->error_status_set == 0) {
/* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
- call->incoming_message.slices, call->incoming_message.count);
+ grpc_byte_buffer *byte_buffer;
+ /* some aliases for readability */
+ gpr_slice *slices = call->incoming_message.slices;
+ const size_t nslices = call->incoming_message.count;
+
+ if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+ byte_buffer = grpc_raw_compressed_byte_buffer_create(
+ slices, nslices, call->compression_algorithm);
+ } else {
+ byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
+ }
grpc_bbq_push(&call->incoming_queue, byte_buffer);
}
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
@@ -782,6 +798,25 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
gpr_free(message);
return 0;
}
+ /* sanity check: if message flags indicate a compressed message, the
+ * compression level should already be present in the call, as parsed off its
+ * corresponding metadata. */
+ if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
+ char *message = NULL;
+ char *alg_name;
+ if (!grpc_compression_algorithm_name(call->compression_algorithm,
+ &alg_name)) {
+ /* This shouldn't happen, other than due to data corruption */
+ alg_name = "<unknown>";
+ }
+ gpr_asprintf(&message,
+ "Invalid compression algorithm (%s) for compressed message.",
+ alg_name);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, message);
+ gpr_free(message);
+ return 0;
+ }
/* stash away parameters, and prepare for incoming slices */
if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
char *message = NULL;
@@ -985,7 +1020,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
data.send_metadata.metadata);
mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future;
+ mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
@@ -1276,25 +1311,20 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
static void destroy_compression(void *ignored) {}
static gpr_uint32 decode_compression(grpc_mdelem *md) {
- grpc_compression_level clevel;
- void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
+ grpc_compression_algorithm algorithm;
+ void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
if (user_data) {
- clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
- gpr_uint32 parsed_clevel_bytes;
- if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
- GPR_SLICE_LENGTH(md->value->slice),
- &parsed_clevel_bytes)) {
- /* the following cast is safe, as a gpr_uint32 should be able to hold all
- * possible values of the grpc_compression_level enum */
- clevel = (grpc_compression_level)parsed_clevel_bytes;
- } else {
- clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
+ const char *md_c_str = grpc_mdstr_as_c_string(md->value);
+ if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
+ gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
+ assert(0);
}
grpc_mdelem_set_user_data(md, destroy_compression,
- (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
+ (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
}
- return clevel;
+ return algorithm;
}
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
@@ -1313,8 +1343,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value));
} else if (key ==
- grpc_channel_get_compresssion_level_string(call->channel)) {
- set_decode_compression_level(call, decode_compression(md));
+ grpc_channel_get_compression_algorithm_string(call->channel)) {
+ set_compression_algorithm(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1338,7 +1368,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
l->md = 0;
}
}
- if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(md->deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
@@ -1429,7 +1459,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
- req->flags = ops->flags;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */