aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r--src/core/lib/surface/call.c518
1 files changed, 365 insertions, 153 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 9b2b94eedf..708ea3502a 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -40,6 +40,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/slice.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
@@ -52,7 +53,9 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/transport.h"
/** The maximum number of concurrent batches possible.
Based upon the maximum number of individually queueable ops in the batch
@@ -65,12 +68,6 @@
- status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6
-typedef struct {
- grpc_ioreq_completion_func on_complete;
- void *user_data;
- int success;
-} completed_request;
-
#define MAX_SEND_EXTRA_METADATA_COUNT 3
/* Status data for a request can come from several sources; this
@@ -97,31 +94,13 @@ typedef struct {
grpc_mdstr *details;
} received_status;
-/* How far through the GRPC stream have we read? */
-typedef enum {
- /* We are still waiting for initial metadata to complete */
- READ_STATE_INITIAL = 0,
- /* We have gotten initial metadata, and are reading either
- messages or trailing metadata */
- READ_STATE_GOT_INITIAL_METADATA,
- /* The stream is closed for reading */
- READ_STATE_READ_CLOSED,
- /* The stream is closed for reading & writing */
- READ_STATE_STREAM_CLOSED
-} read_state;
-
-typedef enum {
- WRITE_STATE_INITIAL = 0,
- WRITE_STATE_STARTED,
- WRITE_STATE_WRITE_CLOSED
-} write_state;
-
typedef struct batch_control {
grpc_call *call;
grpc_cq_completion cq_completion;
grpc_closure finish_batch;
void *notify_tag;
gpr_refcount steps_to_complete;
+ grpc_error *error;
uint8_t send_initial_metadata;
uint8_t send_message;
@@ -130,11 +109,11 @@ typedef struct batch_control {
uint8_t recv_message;
uint8_t recv_final_op;
uint8_t is_notify_tag_closure;
- uint8_t success;
} batch_control;
struct grpc_call {
grpc_completion_queue *cq;
+ grpc_polling_entity pollent;
grpc_channel *channel;
grpc_call *parent;
grpc_call *first_child;
@@ -176,10 +155,10 @@ struct grpc_call {
received_status status[STATUS_SOURCE_COUNT];
/* Call stats: only valid after trailing metadata received */
- grpc_transport_stream_stats stats;
+ grpc_call_stats stats;
- /* Compression algorithm for the call */
- grpc_compression_algorithm compression_algorithm;
+ /* Compression algorithm for *incoming* data */
+ grpc_compression_algorithm incoming_compression_algorithm;
/* Supported encodings (compression algorithms), a bitset */
uint32_t encodings_accepted_by_peer;
@@ -238,18 +217,19 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description);
+static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
+ grpc_status_code status,
+ const char *description);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
- bool success);
+ grpc_error *error);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- bool success);
-
-grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
- uint32_t propagation_mask,
- grpc_completion_queue *cq,
- const void *server_transport_data,
- grpc_mdelem **add_initial_metadata,
- size_t add_initial_metadata_count,
- gpr_timespec send_deadline) {
+ grpc_error *error);
+
+grpc_call *grpc_call_create(
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative,
+ const void *server_transport_data, grpc_mdelem **add_initial_metadata,
+ size_t add_initial_metadata_count, gpr_timespec send_deadline) {
size_t i, j;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -261,6 +241,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
call->channel = channel;
call->cq = cq;
call->parent = parent_call;
+ /* Always support no compression */
+ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = server_transport_data == NULL;
if (call->is_client) {
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT);
@@ -284,9 +266,20 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
call->context, server_transport_data,
CALL_STACK_FROM_CALL(call));
if (cq != NULL) {
+ GPR_ASSERT(
+ pollset_set_alternative == NULL &&
+ "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
GRPC_CQ_INTERNAL_REF(cq, "bind");
- grpc_call_stack_set_pollset(&exec_ctx, CALL_STACK_FROM_CALL(call),
- grpc_cq_pollset(cq));
+ call->pollent =
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
+ }
+ if (pollset_set_alternative != NULL) {
+ call->pollent =
+ grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
+ }
+ if (!grpc_polling_entity_is_empty(&call->pollent)) {
+ grpc_call_stack_set_pollset_or_pollset_set(
+ &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
@@ -341,10 +334,16 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_completion_queue *cq) {
GPR_ASSERT(cq);
+
+ if (grpc_polling_entity_pollset_set(&call->pollent) != NULL) {
+ gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
+ abort();
+ }
call->cq = cq;
GRPC_CQ_INTERNAL_REF(cq, "bind");
- grpc_call_stack_set_pollset(exec_ctx, CALL_STACK_FROM_CALL(call),
- grpc_cq_pollset(cq));
+ call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
+ grpc_call_stack_set_pollset_or_pollset_set(
+ exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
@@ -361,7 +360,8 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
-static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
+static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
+ grpc_error *error) {
size_t i;
int ii;
grpc_call *c = call;
@@ -391,7 +391,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
grpc_channel *channel = c->channel;
- grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c);
+ grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->stats, c);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
GPR_TIMER_END("destroy_call", 0);
}
@@ -402,24 +402,74 @@ static void set_status_code(grpc_call *call, status_source source,
call->status[source].is_set = 1;
call->status[source].code = (grpc_status_code)status;
+}
+
+static void set_status_details(grpc_call *call, status_source source,
+ grpc_mdstr *status) {
+ if (call->status[source].details != NULL) {
+ GRPC_MDSTR_UNREF(status);
+ } else {
+ call->status[source].details = status;
+ }
+}
+
+static void get_final_status(grpc_call *call,
+ void (*set_value)(grpc_status_code code,
+ void *user_data),
+ void *set_value_user_data) {
+ int i;
+ for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+ if (call->status[i].is_set) {
+ set_value(call->status[i].code, set_value_user_data);
+ return;
+ }
+ }
+ if (call->is_client) {
+ set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
+ } else {
+ set_value(GRPC_STATUS_OK, set_value_user_data);
+ }
+}
- /* TODO(ctiller): what to do about the flush that was previously here */
+static void set_status_from_error(grpc_call *call, status_source source,
+ grpc_error *error) {
+ intptr_t status;
+ if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
+ set_status_code(call, source, (uint32_t)status);
+ } else {
+ set_status_code(call, source, GRPC_STATUS_INTERNAL);
+ }
+ const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
+ bool free_msg = false;
+ if (msg == NULL) {
+ free_msg = true;
+ msg = grpc_error_string(error);
+ }
+ set_status_details(call, source, grpc_mdstr_from_string(msg));
+ if (free_msg) grpc_error_free_string(msg);
}
-static void set_compression_algorithm(grpc_call *call,
- grpc_compression_algorithm algo) {
- call->compression_algorithm = algo;
+static void set_incoming_compression_algorithm(
+ grpc_call *call, grpc_compression_algorithm algo) {
+ GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT);
+ call->incoming_compression_algorithm = algo;
}
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
grpc_call *call) {
grpc_compression_algorithm algorithm;
gpr_mu_lock(&call->mu);
- algorithm = call->compression_algorithm;
+ algorithm = call->incoming_compression_algorithm;
gpr_mu_unlock(&call->mu);
return algorithm;
}
+static grpc_compression_algorithm compression_algorithm_for_level_locked(
+ grpc_call *call, grpc_compression_level level) {
+ return grpc_compression_algorithm_for_level(level,
+ call->encodings_accepted_by_peer);
+}
+
uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
uint32_t flags;
gpr_mu_lock(&call->mu);
@@ -485,32 +535,6 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
return encodings_accepted_by_peer;
}
-static void set_status_details(grpc_call *call, status_source source,
- grpc_mdstr *status) {
- if (call->status[source].details != NULL) {
- GRPC_MDSTR_UNREF(call->status[source].details);
- }
- call->status[source].details = status;
-}
-
-static void get_final_status(grpc_call *call,
- void (*set_value)(grpc_status_code code,
- void *user_data),
- void *set_value_user_data) {
- int i;
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set) {
- set_value(call->status[i].code, set_value_user_data);
- return;
- }
- }
- if (call->is_client) {
- set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
- } else {
- set_value(GRPC_STATUS_OK, set_value_user_data);
- }
-}
-
static void get_final_details(grpc_call *call, char **out_details,
size_t *out_details_capacity) {
int i;
@@ -545,15 +569,28 @@ static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
return (grpc_linked_mdelem *)&md->internal_data;
}
+static grpc_metadata *get_md_elem(grpc_metadata *metadata,
+ grpc_metadata *additional_metadata, int i,
+ int count) {
+ grpc_metadata *res =
+ i < count ? &metadata[i] : &additional_metadata[i - count];
+ GPR_ASSERT(res);
+ return res;
+}
+
static int prepare_application_metadata(grpc_call *call, int count,
grpc_metadata *metadata,
int is_trailing,
- int prepend_extra_metadata) {
+ int prepend_extra_metadata,
+ grpc_metadata *additional_metadata,
+ int additional_metadata_count) {
+ int total_count = count + additional_metadata_count;
int i;
grpc_metadata_batch *batch =
&call->metadata_batch[0 /* is_receiving */][is_trailing];
- for (i = 0; i < count; i++) {
- grpc_metadata *md = &metadata[i];
+ for (i = 0; i < total_count; i++) {
+ const grpc_metadata *md =
+ get_md_elem(metadata, additional_metadata, i, count);
grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
l->md = grpc_mdelem_from_string_and_buffer(
@@ -572,9 +609,10 @@ static int prepare_application_metadata(grpc_call *call, int count,
break;
}
}
- if (i != count) {
+ if (i != total_count) {
for (int j = 0; j <= i; j++) {
- grpc_metadata *md = &metadata[j];
+ const grpc_metadata *md =
+ get_md_elem(metadata, additional_metadata, j, count);
grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
GRPC_MDELEM_UNREF(l->md);
}
@@ -595,24 +633,36 @@ static int prepare_application_metadata(grpc_call *call, int count,
}
}
}
- for (i = 1; i < count; i++) {
- linked_from_md(&metadata[i])->prev = linked_from_md(&metadata[i - 1]);
+ for (i = 1; i < total_count; i++) {
+ grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
+ grpc_metadata *prev_md =
+ get_md_elem(metadata, additional_metadata, i - 1, count);
+ linked_from_md(md)->prev = linked_from_md(prev_md);
}
- for (i = 0; i < count - 1; i++) {
- linked_from_md(&metadata[i])->next = linked_from_md(&metadata[i + 1]);
+ for (i = 0; i < total_count - 1; i++) {
+ grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
+ grpc_metadata *next_md =
+ get_md_elem(metadata, additional_metadata, i + 1, count);
+ linked_from_md(md)->next = linked_from_md(next_md);
}
- switch (prepend_extra_metadata * 2 + (count != 0)) {
+
+ switch (prepend_extra_metadata * 2 + (total_count != 0)) {
case 0:
/* no prepend, no metadata => nothing to do */
batch->list.head = batch->list.tail = NULL;
break;
- case 1:
+ case 1: {
/* metadata, but no prepend */
- batch->list.head = linked_from_md(&metadata[0]);
- batch->list.tail = linked_from_md(&metadata[count - 1]);
+ grpc_metadata *first_md =
+ get_md_elem(metadata, additional_metadata, 0, count);
+ grpc_metadata *last_md =
+ get_md_elem(metadata, additional_metadata, total_count - 1, count);
+ batch->list.head = linked_from_md(first_md);
+ batch->list.tail = linked_from_md(last_md);
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
break;
+ }
case 2:
/* prepend, but no md */
batch->list.head = &call->send_extra_metadata[0];
@@ -621,17 +671,22 @@ static int prepare_application_metadata(grpc_call *call, int count,
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
break;
- case 3:
+ case 3: {
/* prepend AND md */
+ grpc_metadata *first_md =
+ get_md_elem(metadata, additional_metadata, 0, count);
+ grpc_metadata *last_md =
+ get_md_elem(metadata, additional_metadata, total_count - 1, count);
batch->list.head = &call->send_extra_metadata[0];
call->send_extra_metadata[call->send_extra_metadata_count - 1].next =
- linked_from_md(&metadata[0]);
- linked_from_md(&metadata[0])->prev =
+ linked_from_md(first_md);
+ linked_from_md(first_md)->prev =
&call->send_extra_metadata[call->send_extra_metadata_count - 1];
- batch->list.tail = linked_from_md(&metadata[count - 1]);
+ batch->list.tail = linked_from_md(last_md);
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
break;
+ }
default:
GPR_UNREACHABLE_CODE(return 0);
}
@@ -700,48 +755,98 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return r;
}
-typedef struct cancel_closure {
+typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
- grpc_status_code status;
-} cancel_closure;
+ grpc_error *error;
+ grpc_closure *op_closure;
+ enum { TC_CANCEL, TC_CLOSE } type;
+} termination_closure;
+
+static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
+ grpc_error *error) {
+ termination_closure *tc = tcp;
+ switch (tc->type) {
+ case TC_CANCEL:
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "cancel");
+ break;
+ case TC_CLOSE:
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close");
+ break;
+ }
+ GRPC_ERROR_UNREF(tc->error);
+ grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL);
+ gpr_free(tc);
+}
-static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
- cancel_closure *cc = ccp;
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel");
- gpr_free(cc);
+static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
+ grpc_transport_stream_op op;
+ termination_closure *tc = tcp;
+ memset(&op, 0, sizeof(op));
+ op.cancel_error = tc->error;
+ /* reuse closure to catch completion */
+ grpc_closure_init(&tc->closure, done_termination, tc);
+ op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &op);
}
-static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
+static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
grpc_transport_stream_op op;
- cancel_closure *cc = ccp;
+ termination_closure *tc = tcp;
memset(&op, 0, sizeof(op));
- op.cancel_with_status = cc->status;
+ op.close_error = tc->error;
/* reuse closure to catch completion */
- grpc_closure_init(&cc->closure, done_cancel, cc);
- op.on_complete = &cc->closure;
- execute_op(exec_ctx, cc->call, &op);
+ grpc_closure_init(&tc->closure, done_termination, tc);
+ tc->op_closure = op.on_complete;
+ op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &op);
+}
+
+static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
+ termination_closure *tc) {
+ set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error);
+
+ if (tc->type == TC_CANCEL) {
+ grpc_closure_init(&tc->closure, send_cancel, tc);
+ GRPC_CALL_INTERNAL_REF(tc->call, "cancel");
+ } else if (tc->type == TC_CLOSE) {
+ grpc_closure_init(&tc->closure, send_close, tc);
+ GRPC_CALL_INTERNAL_REF(tc->call, "close");
+ }
+ grpc_exec_ctx_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE, NULL);
+ return GRPC_CALL_OK;
}
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
- grpc_mdstr *details =
- description ? grpc_mdstr_from_string(description) : NULL;
- cancel_closure *cc = gpr_malloc(sizeof(*cc));
-
GPR_ASSERT(status != GRPC_STATUS_OK);
+ termination_closure *tc = gpr_malloc(sizeof(*tc));
+ memset(tc, 0, sizeof(termination_closure));
+ tc->type = TC_CANCEL;
+ tc->call = c;
+ tc->error = grpc_error_set_int(
+ grpc_error_set_str(GRPC_ERROR_CREATE(description),
+ GRPC_ERROR_STR_GRPC_MESSAGE, description),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
+
+ return terminate_with_status(exec_ctx, tc);
+}
- set_status_code(c, STATUS_FROM_API_OVERRIDE, (uint32_t)status);
- set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
-
- grpc_closure_init(&cc->closure, send_cancel, cc);
- cc->call = c;
- cc->status = status;
- GRPC_CALL_INTERNAL_REF(c, "cancel");
- grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL);
-
- return GRPC_CALL_OK;
+static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
+ grpc_status_code status,
+ const char *description) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
+ termination_closure *tc = gpr_malloc(sizeof(*tc));
+ memset(tc, 0, sizeof(termination_closure));
+ tc->type = TC_CLOSE;
+ tc->call = c;
+ tc->error = grpc_error_set_int(
+ grpc_error_set_str(GRPC_ERROR_CREATE(description),
+ GRPC_ERROR_STR_GRPC_MESSAGE, description),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
+
+ return terminate_with_status(exec_ctx, tc);
}
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
@@ -775,11 +880,11 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
-static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_call *call = arg;
gpr_mu_lock(&call->mu);
call->have_alarm = 0;
- if (success) {
+ if (error != GRPC_ERROR_CANCELLED) {
cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded");
}
@@ -828,12 +933,16 @@ static uint32_t decode_status(grpc_mdelem *md) {
return status;
}
-static uint32_t decode_compression(grpc_mdelem *md) {
+static grpc_compression_algorithm decode_compression(grpc_mdelem *md) {
grpc_compression_algorithm algorithm =
grpc_compression_algorithm_from_mdstr(md->value);
if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
- gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
+ gpr_log(GPR_ERROR,
+ "Invalid incoming compression algorithm: '%s'. Interpreting "
+ "incoming data as uncompressed.",
+ md_c_str);
+ return GRPC_COMPRESS_NONE;
}
return algorithm;
}
@@ -878,9 +987,9 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) {
if (elem == NULL) {
return NULL;
} else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) {
- GPR_TIMER_BEGIN("compression_algorithm", 0);
- set_compression_algorithm(call, decode_compression(elem));
- GPR_TIMER_END("compression_algorithm", 0);
+ GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
+ set_incoming_compression_algorithm(call, decode_compression(elem));
+ GPR_TIMER_END("incoming_compression_algorithm", 0);
return NULL;
} else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) {
GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
@@ -961,7 +1070,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
if (bctl->is_notify_tag_closure) {
- grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL);
+ /* unrefs bctl->error */
+ grpc_exec_ctx_sched(exec_ctx, bctl->notify_tag, bctl->error, NULL);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(uint8_t)(bctl->call->used_batches &
@@ -969,7 +1079,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&call->mu);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
- grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success,
+ /* unrefs bctl->error */
+ grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->error,
finish_batch_completion, bctl, &bctl->cq_completion);
}
}
@@ -1001,15 +1112,18 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
}
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- bool success) {
+ grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
continue_receiving_slices(exec_ctx, bctl);
} else {
+ if (grpc_trace_operation_failures) {
+ GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
+ }
grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer);
@@ -1043,9 +1157,9 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
} else {
call->test_only_last_message_flags = call->receiving_stream->flags;
if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
- (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+ (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) {
*call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
- NULL, 0, call->compression_algorithm);
+ NULL, 0, call->incoming_compression_algorithm);
} else {
*call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
}
@@ -1058,35 +1172,89 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
}
static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- bool success) {
+ grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu);
- if (bctl->call->has_initial_md_been_received || !success ||
+ if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) {
gpr_mu_unlock(&bctl->call->mu);
- process_data_after_md(exec_ctx, bctlp, success);
+ process_data_after_md(exec_ctx, bctlp, error);
} else {
call->saved_receiving_stream_ready_bctlp = bctlp;
gpr_mu_unlock(&bctl->call->mu);
}
}
+static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
+ batch_control *bctl) {
+ grpc_call *call = bctl->call;
+ /* validate call->incoming_compression_algorithm */
+ if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
+ const grpc_compression_algorithm algo =
+ call->incoming_compression_algorithm;
+ char *error_msg = NULL;
+ const grpc_compression_options compression_options =
+ grpc_channel_compression_options(call->channel);
+ /* check if algorithm is known */
+ if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) {
+ gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
+ algo);
+ gpr_log(GPR_ERROR, "%s", error_msg);
+ close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ } else if (grpc_compression_options_is_algorithm_enabled(
+ &compression_options, algo) == 0) {
+ /* check if algorithm is supported by current channel config */
+ char *algo_name;
+ grpc_compression_algorithm_name(algo, &algo_name);
+ gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
+ algo_name);
+ gpr_log(GPR_ERROR, "%s", error_msg);
+ close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ } else {
+ call->incoming_compression_algorithm = algo;
+ }
+ gpr_free(error_msg);
+ }
+
+ /* make sure the received grpc-encoding is amongst the ones listed in
+ * grpc-accept-encoding */
+ GPR_ASSERT(call->encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->encodings_accepted_by_peer,
+ call->incoming_compression_algorithm)) {
+ extern int grpc_compression_trace;
+ if (grpc_compression_trace) {
+ char *algo_name;
+ grpc_compression_algorithm_name(call->incoming_compression_algorithm,
+ &algo_name);
+ gpr_log(GPR_ERROR,
+ "Compression algorithm (grpc-encoding = '%s') not present in "
+ "the bitset of accepted encodings (grpc-accept-encodings: "
+ "'0x%x')",
+ algo_name, call->encodings_accepted_by_peer);
+ }
+ }
+}
+
static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
- void *bctlp, bool success) {
+ void *bctlp, grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
gpr_mu_lock(&call->mu);
- if (!success) {
- bctl->success = false;
+ if (error != GRPC_ERROR_NONE) {
+ bctl->error = GRPC_ERROR_REF(error);
} else {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
grpc_metadata_batch_filter(md, recv_initial_filter, call);
+ GPR_TIMER_BEGIN("validate_filtered_metadata", 0);
+ validate_filtered_metadata(exec_ctx, bctl);
+ GPR_TIMER_END("validate_filtered_metadata", 0);
+
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0 &&
!call->is_client) {
@@ -1101,7 +1269,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
grpc_closure *saved_rsr_closure = grpc_closure_create(
receiving_stream_ready, call->saved_receiving_stream_ready_bctlp);
call->saved_receiving_stream_ready_bctlp = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, success, NULL);
+ grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL);
}
gpr_mu_unlock(&call->mu);
@@ -1111,15 +1279,18 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
}
}
-static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
+static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
+ grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
grpc_call *child_call;
grpc_call *next_child_call;
+ GRPC_ERROR_REF(error);
+
gpr_mu_lock(&call->mu);
if (bctl->send_initial_metadata) {
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
}
grpc_metadata_batch_destroy(
@@ -1165,13 +1336,17 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
call->final_op.server.cancelled);
}
- success = 1;
+ GRPC_ERROR_UNREF(error);
+ error = GRPC_ERROR_NONE;
}
- bctl->success = success != 0;
+ GRPC_ERROR_UNREF(bctl->error);
+ bctl->error = GRPC_ERROR_REF(error);
gpr_mu_unlock(&call->mu);
if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(exec_ctx, bctl);
}
+
+ GRPC_ERROR_UNREF(error);
}
static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
@@ -1201,7 +1376,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion");
- bctl->success = 1;
+ bctl->error = GRPC_ERROR_NONE;
if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq, notify_tag);
}
@@ -1229,7 +1404,40 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- if (op->data.send_initial_metadata.count > INT_MAX) {
+ /* process compression level */
+ grpc_metadata compression_md;
+ memset(&compression_md, 0, sizeof(grpc_metadata));
+ size_t additional_metadata_count = 0;
+ grpc_compression_level effective_compression_level;
+ bool level_set = false;
+ if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
+ effective_compression_level =
+ op->data.send_initial_metadata.maybe_compression_level.level;
+ level_set = true;
+ } else {
+ const grpc_compression_options copts =
+ grpc_channel_compression_options(call->channel);
+ level_set = copts.default_level.is_set;
+ if (level_set) {
+ effective_compression_level = copts.default_level.level;
+ }
+ }
+ if (level_set && !call->is_client) {
+ const grpc_compression_algorithm calgo =
+ compression_algorithm_for_level_locked(
+ call, effective_compression_level);
+ char *calgo_name;
+ grpc_compression_algorithm_name(calgo, &calgo_name);
+ // the following will be picked up by the compress filter and used as
+ // the call's compression algorithm.
+ compression_md.key = GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY;
+ compression_md.value = calgo_name;
+ compression_md.value_length = strlen(calgo_name);
+ additional_metadata_count++;
+ }
+
+ if (op->data.send_initial_metadata.count + additional_metadata_count >
+ INT_MAX) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1237,7 +1445,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->sent_initial_metadata = 1;
if (!prepare_application_metadata(
call, (int)op->data.send_initial_metadata.count,
- op->data.send_initial_metadata.metadata, 0, call->is_client)) {
+ op->data.send_initial_metadata.metadata, 0, call->is_client,
+ &compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1325,7 +1534,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
call,
(int)op->data.send_status_from_server.trailing_metadata_count,
- op->data.send_status_from_server.trailing_metadata, 1, 1)) {
+ op->data.send_status_from_server.trailing_metadata, 1, 1, NULL,
+ 0)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1397,7 +1607,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->recv_final_op = 1;
stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op.collect_stats = &call->stats;
+ stream_op.collect_stats = &call->stats.transport_stream_stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
/* Flag validation: currently allow no flags */
@@ -1419,7 +1629,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->recv_final_op = 1;
stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op.collect_stats = &call->stats;
+ stream_op.collect_stats = &call->stats.transport_stream_stats;
break;
}
}
@@ -1474,7 +1684,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
grpc_call_error err;
GRPC_API_TRACE(
- "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)",
+ "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
+ "reserved=%p)",
5, (call, ops, (unsigned long)nops, tag, reserved));
if (reserved != NULL) {
@@ -1513,9 +1724,10 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
grpc_compression_algorithm grpc_call_compression_for_level(
grpc_call *call, grpc_compression_level level) {
gpr_mu_lock(&call->mu);
- const uint32_t accepted_encodings = call->encodings_accepted_by_peer;
+ grpc_compression_algorithm algo =
+ compression_algorithm_for_level_locked(call, level);
gpr_mu_unlock(&call->mu);
- return grpc_compression_algorithm_for_level(level, accepted_encodings);
+ return algo;
}
const char *grpc_call_error_to_string(grpc_call_error error) {