aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r--src/core/lib/surface/call.c320
1 files changed, 245 insertions, 75 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 74a09cf6ed..6fc91cb7ea 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -40,6 +40,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/slice.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
@@ -52,7 +53,9 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/transport.h"
/** The maximum number of concurrent batches possible.
Based upon the maximum number of individually queueable ops in the batch
@@ -153,8 +156,8 @@ struct grpc_call {
/* Call stats: only valid after trailing metadata received */
grpc_call_stats stats;
- /* Compression algorithm for the call */
- grpc_compression_algorithm compression_algorithm;
+ /* Compression algorithm for *incoming* data */
+ grpc_compression_algorithm incoming_compression_algorithm;
/* Supported encodings (compression algorithms), a bitset */
uint32_t encodings_accepted_by_peer;
@@ -213,6 +216,9 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description);
+static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
+ grpc_status_code status,
+ const char *description);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
bool success);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
@@ -383,21 +389,27 @@ static void set_status_code(grpc_call *call, status_source source,
/* TODO(ctiller): what to do about the flush that was previously here */
}
-static void set_compression_algorithm(grpc_call *call,
- grpc_compression_algorithm algo) {
+static void set_incoming_compression_algorithm(
+ grpc_call *call, grpc_compression_algorithm algo) {
GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT);
- call->compression_algorithm = algo;
+ call->incoming_compression_algorithm = algo;
}
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
grpc_call *call) {
grpc_compression_algorithm algorithm;
gpr_mu_lock(&call->mu);
- algorithm = call->compression_algorithm;
+ algorithm = call->incoming_compression_algorithm;
gpr_mu_unlock(&call->mu);
return algorithm;
}
+static grpc_compression_algorithm compression_algorithm_for_level_locked(
+ grpc_call *call, grpc_compression_level level) {
+ return grpc_compression_algorithm_for_level(level,
+ call->encodings_accepted_by_peer);
+}
+
uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
uint32_t flags;
gpr_mu_lock(&call->mu);
@@ -523,15 +535,28 @@ static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
return (grpc_linked_mdelem *)&md->internal_data;
}
+static grpc_metadata *get_md_elem(grpc_metadata *metadata,
+ grpc_metadata *additional_metadata, int i,
+ int count) {
+ grpc_metadata *res =
+ i < count ? &metadata[i] : &additional_metadata[i - count];
+ GPR_ASSERT(res);
+ return res;
+}
+
static int prepare_application_metadata(grpc_call *call, int count,
grpc_metadata *metadata,
int is_trailing,
- int prepend_extra_metadata) {
+ int prepend_extra_metadata,
+ grpc_metadata *additional_metadata,
+ int additional_metadata_count) {
+ int total_count = count + additional_metadata_count;
int i;
grpc_metadata_batch *batch =
&call->metadata_batch[0 /* is_receiving */][is_trailing];
- for (i = 0; i < count; i++) {
- grpc_metadata *md = &metadata[i];
+ for (i = 0; i < total_count; i++) {
+ const grpc_metadata *md =
+ get_md_elem(metadata, additional_metadata, i, count);
grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
l->md = grpc_mdelem_from_string_and_buffer(
@@ -550,9 +575,10 @@ static int prepare_application_metadata(grpc_call *call, int count,
break;
}
}
- if (i != count) {
+ if (i != total_count) {
for (int j = 0; j <= i; j++) {
- grpc_metadata *md = &metadata[j];
+ const grpc_metadata *md =
+ get_md_elem(metadata, additional_metadata, j, count);
grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
GRPC_MDELEM_UNREF(l->md);
}
@@ -573,24 +599,36 @@ static int prepare_application_metadata(grpc_call *call, int count,
}
}
}
- for (i = 1; i < count; i++) {
- linked_from_md(&metadata[i])->prev = linked_from_md(&metadata[i - 1]);
+ for (i = 1; i < total_count; i++) {
+ grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
+ grpc_metadata *prev_md =
+ get_md_elem(metadata, additional_metadata, i - 1, count);
+ linked_from_md(md)->prev = linked_from_md(prev_md);
}
- for (i = 0; i < count - 1; i++) {
- linked_from_md(&metadata[i])->next = linked_from_md(&metadata[i + 1]);
+ for (i = 0; i < total_count - 1; i++) {
+ grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
+ grpc_metadata *next_md =
+ get_md_elem(metadata, additional_metadata, i + 1, count);
+ linked_from_md(md)->next = linked_from_md(next_md);
}
- switch (prepend_extra_metadata * 2 + (count != 0)) {
+
+ switch (prepend_extra_metadata * 2 + (total_count != 0)) {
case 0:
/* no prepend, no metadata => nothing to do */
batch->list.head = batch->list.tail = NULL;
break;
- case 1:
+ case 1: {
/* metadata, but no prepend */
- batch->list.head = linked_from_md(&metadata[0]);
- batch->list.tail = linked_from_md(&metadata[count - 1]);
+ grpc_metadata *first_md =
+ get_md_elem(metadata, additional_metadata, 0, count);
+ grpc_metadata *last_md =
+ get_md_elem(metadata, additional_metadata, total_count - 1, count);
+ batch->list.head = linked_from_md(first_md);
+ batch->list.tail = linked_from_md(last_md);
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
break;
+ }
case 2:
/* prepend, but no md */
batch->list.head = &call->send_extra_metadata[0];
@@ -599,17 +637,22 @@ static int prepare_application_metadata(grpc_call *call, int count,
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
break;
- case 3:
+ case 3: {
/* prepend AND md */
+ grpc_metadata *first_md =
+ get_md_elem(metadata, additional_metadata, 0, count);
+ grpc_metadata *last_md =
+ get_md_elem(metadata, additional_metadata, total_count - 1, count);
batch->list.head = &call->send_extra_metadata[0];
call->send_extra_metadata[call->send_extra_metadata_count - 1].next =
- linked_from_md(&metadata[0]);
- linked_from_md(&metadata[0])->prev =
+ linked_from_md(first_md);
+ linked_from_md(first_md)->prev =
&call->send_extra_metadata[call->send_extra_metadata_count - 1];
- batch->list.tail = linked_from_md(&metadata[count - 1]);
+ batch->list.tail = linked_from_md(last_md);
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
break;
+ }
default:
GPR_UNREACHABLE_CODE(return 0);
}
@@ -678,48 +721,102 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return r;
}
-typedef struct cancel_closure {
+typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
grpc_status_code status;
-} cancel_closure;
+ gpr_slice optional_message;
+ grpc_closure *op_closure;
+ enum { TC_CANCEL, TC_CLOSE } type;
+} termination_closure;
+
+static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, bool success) {
+ termination_closure *tc = tcp;
+ if (tc->type == TC_CANCEL) {
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "cancel");
+ }
+ if (tc->type == TC_CLOSE) {
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close");
+ }
+ gpr_slice_unref(tc->optional_message);
+ if (tc->op_closure != NULL) {
+ grpc_exec_ctx_enqueue(exec_ctx, tc->op_closure, true, NULL);
+ }
+ gpr_free(tc);
+}
-static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
- cancel_closure *cc = ccp;
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel");
- gpr_free(cc);
+static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, bool success) {
+ grpc_transport_stream_op op;
+ termination_closure *tc = tcp;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = tc->status;
+ /* reuse closure to catch completion */
+ grpc_closure_init(&tc->closure, done_termination, tc);
+ op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &op);
}
-static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
+static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, bool success) {
grpc_transport_stream_op op;
- cancel_closure *cc = ccp;
+ termination_closure *tc = tcp;
memset(&op, 0, sizeof(op));
- op.cancel_with_status = cc->status;
+ tc->optional_message = gpr_slice_ref(tc->optional_message);
+ grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message);
/* reuse closure to catch completion */
- grpc_closure_init(&cc->closure, done_cancel, cc);
- op.on_complete = &cc->closure;
- execute_op(exec_ctx, cc->call, &op);
+ grpc_closure_init(&tc->closure, done_termination, tc);
+ tc->op_closure = op.on_complete;
+ op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &op);
+}
+
+static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
+ termination_closure *tc) {
+ grpc_mdstr *details = NULL;
+ if (GPR_SLICE_LENGTH(tc->optional_message) > 0) {
+ tc->optional_message = gpr_slice_ref(tc->optional_message);
+ details = grpc_mdstr_from_slice(tc->optional_message);
+ }
+
+ set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status);
+ set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details);
+
+ if (tc->type == TC_CANCEL) {
+ grpc_closure_init(&tc->closure, send_cancel, tc);
+ GRPC_CALL_INTERNAL_REF(tc->call, "cancel");
+ } else if (tc->type == TC_CLOSE) {
+ grpc_closure_init(&tc->closure, send_close, tc);
+ GRPC_CALL_INTERNAL_REF(tc->call, "close");
+ }
+ grpc_exec_ctx_enqueue(exec_ctx, &tc->closure, true, NULL);
+ return GRPC_CALL_OK;
}
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
- grpc_mdstr *details =
- description ? grpc_mdstr_from_string(description) : NULL;
- cancel_closure *cc = gpr_malloc(sizeof(*cc));
-
+ termination_closure *tc = gpr_malloc(sizeof(*tc));
+ memset(tc, 0, sizeof(termination_closure));
+ tc->type = TC_CANCEL;
+ tc->call = c;
+ tc->optional_message = gpr_slice_from_copied_string(description);
GPR_ASSERT(status != GRPC_STATUS_OK);
+ tc->status = status;
- set_status_code(c, STATUS_FROM_API_OVERRIDE, (uint32_t)status);
- set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
+ return terminate_with_status(exec_ctx, tc);
+}
- grpc_closure_init(&cc->closure, send_cancel, cc);
- cc->call = c;
- cc->status = status;
- GRPC_CALL_INTERNAL_REF(c, "cancel");
- grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL);
+static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
+ grpc_status_code status,
+ const char *description) {
+ termination_closure *tc = gpr_malloc(sizeof(*tc));
+ memset(tc, 0, sizeof(termination_closure));
+ tc->type = TC_CLOSE;
+ tc->call = c;
+ tc->optional_message = gpr_slice_from_copied_string(description);
+ GPR_ASSERT(status != GRPC_STATUS_OK);
+ tc->status = status;
- return GRPC_CALL_OK;
+ return terminate_with_status(exec_ctx, tc);
}
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
@@ -860,9 +957,9 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) {
if (elem == NULL) {
return NULL;
} else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) {
- GPR_TIMER_BEGIN("compression_algorithm", 0);
- set_compression_algorithm(call, decode_compression(elem));
- GPR_TIMER_END("compression_algorithm", 0);
+ GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
+ set_incoming_compression_algorithm(call, decode_compression(elem));
+ GPR_TIMER_END("incoming_compression_algorithm", 0);
return NULL;
} else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) {
GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
@@ -1025,9 +1122,9 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
} else {
call->test_only_last_message_flags = call->receiving_stream->flags;
if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
- (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+ (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) {
*call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
- NULL, 0, call->compression_algorithm);
+ NULL, 0, call->incoming_compression_algorithm);
} else {
*call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
}
@@ -1055,6 +1152,56 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
+static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
+ batch_control *bctl) {
+ grpc_call *call = bctl->call;
+ /* validate call->incoming_compression_algorithm */
+ if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
+ const grpc_compression_algorithm algo =
+ call->incoming_compression_algorithm;
+ char *error_msg = NULL;
+ const grpc_compression_options compression_options =
+ grpc_channel_compression_options(call->channel);
+ /* check if algorithm is known */
+ if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) {
+ gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
+ algo);
+ gpr_log(GPR_ERROR, error_msg);
+ close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ } else if (grpc_compression_options_is_algorithm_enabled(
+ &compression_options, algo) == 0) {
+ /* check if algorithm is supported by current channel config */
+ char *algo_name;
+ grpc_compression_algorithm_name(algo, &algo_name);
+ gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
+ algo_name);
+ gpr_log(GPR_ERROR, error_msg);
+ close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ } else {
+ call->incoming_compression_algorithm = algo;
+ }
+ gpr_free(error_msg);
+ }
+
+ /* make sure the received grpc-encoding is amongst the ones listed in
+ * grpc-accept-encoding */
+ GPR_ASSERT(call->encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->encodings_accepted_by_peer,
+ call->incoming_compression_algorithm)) {
+ extern int grpc_compression_trace;
+ if (grpc_compression_trace) {
+ char *algo_name;
+ grpc_compression_algorithm_name(call->incoming_compression_algorithm,
+ &algo_name);
+ gpr_log(GPR_ERROR,
+ "Compression algorithm (grpc-encoding = '%s') not present in "
+ "the bitset of accepted encodings (grpc-accept-encodings: "
+ "'0x%x')",
+ algo_name, call->encodings_accepted_by_peer);
+ }
+ }
+}
+
static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
void *bctlp, bool success) {
batch_control *bctl = bctlp;
@@ -1069,24 +1216,10 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
grpc_metadata_batch_filter(md, recv_initial_filter, call);
- /* make sure the received grpc-encoding is amongst the ones listed in
- * grpc-accept-encoding */
-
- GPR_ASSERT(call->encodings_accepted_by_peer != 0);
- if (!GPR_BITGET(call->encodings_accepted_by_peer,
- call->compression_algorithm)) {
- extern int grpc_compression_trace;
- if (grpc_compression_trace) {
- char *algo_name;
- grpc_compression_algorithm_name(call->compression_algorithm,
- &algo_name);
- gpr_log(GPR_ERROR,
- "Compression algorithm (grpc-encoding = '%s') not present in "
- "the bitset of accepted encodings (grpc-accept-encodings: "
- "'0x%x')",
- algo_name, call->encodings_accepted_by_peer);
- }
- }
+ GPR_TIMER_BEGIN("validate_filtered_metadata", 0);
+ validate_filtered_metadata(exec_ctx, bctl);
+ GPR_TIMER_END("validate_filtered_metadata", 0);
+
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0 &&
!call->is_client) {
@@ -1229,7 +1362,41 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- if (op->data.send_initial_metadata.count > INT_MAX) {
+ /* process compression level */
+ grpc_metadata compression_md;
+ memset(&compression_md, 0, sizeof(grpc_metadata));
+ size_t additional_metadata_count = 0;
+ grpc_compression_level effective_compression_level;
+ bool level_set = false;
+ if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
+ effective_compression_level =
+ op->data.send_initial_metadata.maybe_compression_level
+ .compression_level;
+ level_set = true;
+ } else {
+ const grpc_compression_options copts =
+ grpc_channel_compression_options(call->channel);
+ level_set = copts.default_level.is_set;
+ if (level_set) {
+ effective_compression_level = copts.default_level.level;
+ }
+ }
+ if (level_set && !call->is_client) {
+ const grpc_compression_algorithm calgo =
+ compression_algorithm_for_level_locked(
+ call, effective_compression_level);
+ char *calgo_name;
+ grpc_compression_algorithm_name(calgo, &calgo_name);
+ // the following will be picked up by the compress filter and used as
+ // the call's compression algorithm.
+ compression_md.key = GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY;
+ compression_md.value = calgo_name;
+ compression_md.value_length = strlen(calgo_name);
+ additional_metadata_count++;
+ }
+
+ if (op->data.send_initial_metadata.count + additional_metadata_count >
+ INT_MAX) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1237,7 +1404,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->sent_initial_metadata = 1;
if (!prepare_application_metadata(
call, (int)op->data.send_initial_metadata.count,
- op->data.send_initial_metadata.metadata, 0, call->is_client)) {
+ op->data.send_initial_metadata.metadata, 0, call->is_client,
+ &compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1325,7 +1493,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
call,
(int)op->data.send_status_from_server.trailing_metadata_count,
- op->data.send_status_from_server.trailing_metadata, 1, 1)) {
+ op->data.send_status_from_server.trailing_metadata, 1, 1, NULL,
+ 0)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1514,9 +1683,10 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
grpc_compression_algorithm grpc_call_compression_for_level(
grpc_call *call, grpc_compression_level level) {
gpr_mu_lock(&call->mu);
- const uint32_t accepted_encodings = call->encodings_accepted_by_peer;
+ grpc_compression_algorithm algo =
+ compression_algorithm_for_level_locked(call, level);
gpr_mu_unlock(&call->mu);
- return grpc_compression_algorithm_for_level(level, accepted_encodings);
+ return algo;
}
const char *grpc_call_error_to_string(grpc_call_error error) {