Diffstat (limited to 'src/core/lib/surface/call.cc')
-rw-r--r--  src/core/lib/surface/call.cc  570
1 file changed, 243 insertions, 327 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 2923a86646..89b3f77822 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -48,6 +48,7 @@
#include "src/core/lib/surface/call_test_only.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/server.h"
#include "src/core/lib/surface/validate_metadata.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
@@ -71,48 +72,11 @@
// Used to create arena for the first call.
#define ESTIMATED_MDELEM_COUNT 16
-/* Status data for a request can come from several sources; this
- enumerates them all, and acts as a priority sorting for which
- status to return to the application - earlier entries override
- later ones */
-typedef enum {
- /* Status came from the application layer overriding whatever
- the wire says */
- STATUS_FROM_API_OVERRIDE = 0,
- /* Status came from 'the wire' - or somewhere below the surface
- layer */
- STATUS_FROM_WIRE,
- /* Status was created by some internal channel stack operation: must come via
- add_batch_error */
- STATUS_FROM_CORE,
- /* Status was created by some surface error */
- STATUS_FROM_SURFACE,
- /* Status came from the server sending status */
- STATUS_FROM_SERVER_STATUS,
- STATUS_SOURCE_COUNT
-} status_source;
+struct batch_control {
+ batch_control() { gpr_ref_init(&steps_to_complete, 0); }
-typedef struct {
- bool is_set;
- grpc_error* error;
-} received_status;
-
-static gpr_atm pack_received_status(received_status r) {
- return r.is_set ? (1 | (gpr_atm)r.error) : 0;
-}
-
-static received_status unpack_received_status(gpr_atm atm) {
- if ((atm & 1) == 0) {
- return {false, GRPC_ERROR_NONE};
- } else {
- return {true, (grpc_error*)(atm & ~static_cast<gpr_atm>(1))};
- }
-}
-
-#define MAX_ERRORS_PER_BATCH 4
-
-typedef struct batch_control {
- grpc_call* call;
+ grpc_call* call = nullptr;
+ grpc_transport_stream_op_batch op;
/* Share memory for cq_completion and notify_tag as they are never needed
   simultaneously. Each byte used in this data structure counts as six bytes
per call, so any savings we can make are worthwhile,
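
Note on the removal above: the deleted pack_received_status/unpack_received_status helpers relied on grpc_error* values being at least 2-byte aligned, so bit 0 of the pointer was free to act as an "is_set" flag inside a single gpr_atm. A minimal stand-alone sketch of that tagged-pointer idea (the type and function names below are illustrative, not gRPC APIs):

    // Sketch only: pack an "is_set" flag into bit 0 of an aligned pointer,
    // mirroring what the removed pack/unpack helpers did.
    #include <cassert>
    #include <cstdint>

    struct my_error;  // stand-in for grpc_error; assumed at least 2-byte aligned

    inline uintptr_t pack_status(my_error* err, bool is_set) {
      // Alignment keeps bit 0 clear, so it can carry the flag.
      assert((reinterpret_cast<uintptr_t>(err) & 1) == 0);
      return is_set ? (reinterpret_cast<uintptr_t>(err) | 1) : 0;
    }

    inline my_error* unpack_status(uintptr_t packed, bool* is_set) {
      *is_set = (packed & 1) != 0;
      return reinterpret_cast<my_error*>(packed & ~static_cast<uintptr_t>(1));
    }
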
@@ -135,90 +99,110 @@ typedef struct batch_control {
grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
+ gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
+};
- grpc_error* errors[MAX_ERRORS_PER_BATCH];
- gpr_atm num_errors;
-
- grpc_transport_stream_op_batch op;
-} batch_control;
+struct parent_call {
+ parent_call() { gpr_mu_init(&child_list_mu); }
+ ~parent_call() { gpr_mu_destroy(&child_list_mu); }
-typedef struct {
gpr_mu child_list_mu;
- grpc_call* first_child;
-} parent_call;
+ grpc_call* first_child = nullptr;
+};
-typedef struct {
+struct child_call {
+ child_call(grpc_call* parent) : parent(parent) {}
grpc_call* parent;
/** siblings: children of the same parent form a list, and this list is
protected under
parent->mu */
- grpc_call* sibling_next;
- grpc_call* sibling_prev;
-} child_call;
+ grpc_call* sibling_next = nullptr;
+ grpc_call* sibling_prev = nullptr;
+};
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
struct grpc_call {
+ grpc_call(gpr_arena* arena, const grpc_call_create_args& args)
+ : arena(arena),
+ cq(args.cq),
+ channel(args.channel),
+ is_client(args.server_transport_data == nullptr),
+ stream_op_payload(context) {
+ gpr_ref_init(&ext_ref, 1);
+ grpc_call_combiner_init(&call_combiner);
+ for (int i = 0; i < 2; i++) {
+ for (int j = 0; j < 2; j++) {
+ metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
+ }
+ }
+ }
+
+ ~grpc_call() {
+ gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
+ grpc_call_combiner_destroy(&call_combiner);
+ }
+
gpr_refcount ext_ref;
gpr_arena* arena;
grpc_call_combiner call_combiner;
grpc_completion_queue* cq;
grpc_polling_entity pollent;
grpc_channel* channel;
- gpr_timespec start_time;
- /* parent_call* */ gpr_atm parent_call_atm;
- child_call* child;
+ gpr_timespec start_time = gpr_now(GPR_CLOCK_MONOTONIC);
+ /* parent_call* */ gpr_atm parent_call_atm = 0;
+ child_call* child = nullptr;
/* client or server call */
bool is_client;
/** has grpc_call_unref been called */
- bool destroy_called;
+ bool destroy_called = false;
/** flag indicating that cancellation is inherited */
- bool cancellation_is_inherited;
+ bool cancellation_is_inherited = false;
/** which ops are in-flight */
- bool sent_initial_metadata;
- bool sending_message;
- bool sent_final_op;
- bool received_initial_metadata;
- bool receiving_message;
- bool requested_final_op;
- gpr_atm any_ops_sent_atm;
- gpr_atm received_final_op_atm;
-
- batch_control* active_batches[MAX_CONCURRENT_BATCHES];
+ bool sent_initial_metadata = false;
+ bool sending_message = false;
+ bool sent_final_op = false;
+ bool received_initial_metadata = false;
+ bool receiving_message = false;
+ bool requested_final_op = false;
+ gpr_atm any_ops_sent_atm = 0;
+ gpr_atm received_final_op_atm = 0;
+
+ batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
grpc_transport_stream_op_batch_payload stream_op_payload;
/* first idx: is_receiving, second idx: is_trailing */
- grpc_metadata_batch metadata_batch[2][2];
+ grpc_metadata_batch metadata_batch[2][2] = {};
/* Buffered read metadata waiting to be returned to the application.
Element 0 is initial metadata, element 1 is trailing metadata. */
- grpc_metadata_array* buffered_metadata[2];
+ grpc_metadata_array* buffered_metadata[2] = {};
grpc_metadata compression_md;
// A char* indicating the peer name.
- gpr_atm peer_string;
-
- /* Packed received call statuses from various sources */
- gpr_atm status[STATUS_SOURCE_COUNT];
+ gpr_atm peer_string = 0;
  /* Call data useful for reporting. Only valid after the call has
* completed */
grpc_call_final_info final_info;
/* Compression algorithm for *incoming* data */
- grpc_message_compression_algorithm incoming_message_compression_algorithm;
+ grpc_message_compression_algorithm incoming_message_compression_algorithm =
+ GRPC_MESSAGE_COMPRESS_NONE;
/* Stream compression algorithm for *incoming* data */
- grpc_stream_compression_algorithm incoming_stream_compression_algorithm;
- /* Supported encodings (compression algorithms), a bitset */
- uint32_t encodings_accepted_by_peer;
+ grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
+ GRPC_STREAM_COMPRESS_NONE;
+ /* Supported encodings (compression algorithms), a bitset.
+ * Always support no compression. */
+ uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
/* Supported stream encodings (stream compression algorithms), a bitset */
- uint32_t stream_encodings_accepted_by_peer;
+ uint32_t stream_encodings_accepted_by_peer = 0;
/* Contexts for various subsystems (security, tracing, ...). */
- grpc_call_context_element context[GRPC_CONTEXT_COUNT];
+ grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};
/* for the client, extra metadata is initial metadata; for the
server, it's trailing metadata */
@@ -229,13 +213,14 @@ struct grpc_call {
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
- grpc_byte_buffer** receiving_buffer;
- grpc_slice receiving_slice;
+ grpc_byte_buffer** receiving_buffer = nullptr;
+ grpc_slice receiving_slice = grpc_empty_slice();
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
grpc_closure receiving_trailing_metadata_ready;
- uint32_t test_only_last_message_flags;
+ uint32_t test_only_last_message_flags = 0;
+ gpr_atm cancelled = 0;
grpc_closure release_call;
@@ -247,8 +232,11 @@ struct grpc_call {
} client;
struct {
int* cancelled;
+ // backpointer to owning server if this is a server side call.
+ grpc_server* server;
} server;
} final_op;
+ gpr_atm status_error = 0;
/* recv_state can contain one of the following values:
RECV_NONE : : no initial metadata and messages received
@@ -266,7 +254,7 @@ struct grpc_call {
For 1, 4: See receiving_initial_metadata_ready() function
For 2, 3: See receiving_stream_ready() function */
- gpr_atm recv_state;
+ gpr_atm recv_state = 0;
};
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
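
The recv_state comment above describes a small handoff between the message path and the initial-metadata path: whichever completes first either marks the state or parks its pending work in the same atomic word. The following is a simplified, illustrative sketch of that handoff, not the exact call.cc code; std::atomic stands in for gpr_atm and pending_batch for batch_control:

    #include <atomic>
    #include <cstdint>

    static constexpr uintptr_t kRecvNone = 0;                  // RECV_NONE
    static constexpr uintptr_t kRecvInitialMetadataFirst = 1;  // RECV_INITIAL_METADATA_FIRST

    struct pending_batch;  // stand-in for batch_control

    void on_message_ready(std::atomic<uintptr_t>* recv_state, pending_batch* b) {
      uintptr_t expected = kRecvNone;
      // If initial metadata has not arrived yet, park the batch so the
      // metadata path can resume it; otherwise process the message now.
      if (!recv_state->compare_exchange_strong(expected,
                                               reinterpret_cast<uintptr_t>(b),
                                               std::memory_order_release)) {
        /* metadata already arrived: handle the message immediately */
      }
    }

    void on_initial_metadata_ready(std::atomic<uintptr_t>* recv_state) {
      uintptr_t prev = recv_state->exchange(kRecvInitialMetadataFirst,
                                            std::memory_order_acq_rel);
      if (prev != kRecvNone) {
        /* a batch was parked while metadata was pending: resume it now */
      }
    }
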
@@ -286,23 +274,15 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression");
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
grpc_closure* start_batch_closure);
-static void cancel_with_status(grpc_call* c, status_source source,
- grpc_status_code status,
+
+static void cancel_with_status(grpc_call* c, grpc_status_code status,
const char* description);
-static void cancel_with_error(grpc_call* c, status_source source,
- grpc_error* error);
+static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
-static void get_final_status(
- grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string);
-static void set_status_value_directly(grpc_status_code status, void* dest);
-static void set_status_from_error(grpc_call* call, status_source source,
- grpc_error* error);
+static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
-static void add_batch_error(batch_control* bctl, grpc_error* error,
- bool has_cancelled);
static void add_init_error(grpc_error** composite, grpc_error* new_err) {
if (new_err == GRPC_ERROR_NONE) return;
@@ -318,11 +298,10 @@ void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
static parent_call* get_or_create_parent_call(grpc_call* call) {
parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
if (p == nullptr) {
- p = static_cast<parent_call*>(gpr_arena_alloc(call->arena, sizeof(*p)));
- gpr_mu_init(&p->child_list_mu);
+ p = new (gpr_arena_alloc(call->arena, sizeof(*p))) parent_call();
if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
(gpr_atm)p)) {
- gpr_mu_destroy(&p->child_list_mu);
+ p->~parent_call();
p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
}
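
The hunk above shows the arena-backed lazy-initialization pattern this change standardizes on: construct with placement new, publish with a release CAS, and explicitly destroy the local instance if another thread won the race. A self-contained sketch of the same pattern, with std::atomic standing in for gpr_atm and an illustrative struct in place of parent_call:

    #include <atomic>
    #include <mutex>
    #include <new>

    struct parent_state {
      std::mutex child_list_mu;
    };

    // Sketch: lazily create a parent_state in caller-provided (arena-style)
    // storage; the loser of the race destroys its copy and adopts the winner's.
    parent_state* get_or_create(std::atomic<parent_state*>* slot, void* storage) {
      parent_state* p = slot->load(std::memory_order_acquire);
      if (p == nullptr) {
        p = new (storage) parent_state();  // placement new, no separate heap allocation
        parent_state* expected = nullptr;
        if (!slot->compare_exchange_strong(expected, p,
                                           std::memory_order_release,
                                           std::memory_order_acquire)) {
          p->~parent_state();  // lost the race: tear down our instance
          p = expected;        // use the instance another thread published
        }
      }
      return p;
    }
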
@@ -341,7 +320,9 @@ size_t grpc_call_get_initial_size_estimate() {
grpc_error* grpc_call_create(const grpc_call_create_args* args,
grpc_call** out_call) {
GPR_TIMER_SCOPE("grpc_call_create", 0);
- size_t i, j;
+
+ GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
+
grpc_error* error = GRPC_ERROR_NONE;
grpc_channel_stack* channel_stack =
grpc_channel_get_channel_stack(args->channel);
@@ -349,30 +330,19 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
gpr_arena* arena = gpr_arena_create(initial_size);
- call = static_cast<grpc_call*>(
- gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
- channel_stack->call_stack_size));
- gpr_ref_init(&call->ext_ref, 1);
- call->arena = arena;
- grpc_call_combiner_init(&call->call_combiner);
+ call = new (gpr_arena_alloc(
+ arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
+ channel_stack->call_stack_size)) grpc_call(arena, *args);
*out_call = call;
- call->channel = args->channel;
- call->cq = args->cq;
- call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
- /* Always support no compression */
- GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE);
- call->is_client = args->server_transport_data == nullptr;
- if (call->is_client) {
- GRPC_STATS_INC_CLIENT_CALLS_CREATED();
- } else {
- GRPC_STATS_INC_SERVER_CALLS_CREATED();
- }
- call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
+ call->final_op.client.status_details = nullptr;
+ call->final_op.client.status = nullptr;
+ call->final_op.client.error_string = nullptr;
+ GRPC_STATS_INC_CLIENT_CALLS_CREATED();
GPR_ASSERT(args->add_initial_metadata_count <
MAX_SEND_EXTRA_METADATA_COUNT);
- for (i = 0; i < args->add_initial_metadata_count; i++) {
+ for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
call->send_extra_metadata[i].md = args->add_initial_metadata[i];
if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]),
GRPC_MDSTR_PATH)) {
@@ -383,22 +353,19 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
call->send_extra_metadata_count =
static_cast<int>(args->add_initial_metadata_count);
} else {
+ GRPC_STATS_INC_SERVER_CALLS_CREATED();
+ call->final_op.server.cancelled = nullptr;
+ call->final_op.server.server = args->server;
GPR_ASSERT(args->add_initial_metadata_count == 0);
call->send_extra_metadata_count = 0;
}
- for (i = 0; i < 2; i++) {
- for (j = 0; j < 2; j++) {
- call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
- }
- }
- grpc_millis send_deadline = args->send_deadline;
+ grpc_millis send_deadline = args->send_deadline;
bool immediately_cancel = false;
if (args->parent != nullptr) {
- call->child =
- static_cast<child_call*>(gpr_arena_alloc(arena, sizeof(child_call)));
- call->child->parent = args->parent;
+ call->child = new (gpr_arena_alloc(arena, sizeof(child_call)))
+ child_call(args->parent);
GRPC_CALL_INTERNAL_REF(args->parent, "child");
GPR_ASSERT(call->is_client);
@@ -432,10 +399,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
}
}
}
-
call->send_deadline = send_deadline;
-
- GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_unref */
grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
args->server_transport_data,
@@ -463,11 +427,12 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
}
gpr_mu_unlock(&pc->child_list_mu);
}
+
if (error != GRPC_ERROR_NONE) {
- cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+ cancel_with_error(call, GRPC_ERROR_REF(error));
}
if (immediately_cancel) {
- cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+ cancel_with_error(call, GRPC_ERROR_CANCELLED);
}
if (args->cq != nullptr) {
GPR_ASSERT(args->pollset_set_alternative == nullptr &&
@@ -486,10 +451,18 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
&call->pollent);
}
- grpc_core::channelz::ChannelNode* channelz_channel =
- grpc_channel_get_channelz_node(call->channel);
- if (channelz_channel != nullptr) {
- channelz_channel->RecordCallStarted();
+ if (call->is_client) {
+ grpc_core::channelz::ChannelNode* channelz_channel =
+ grpc_channel_get_channelz_node(call->channel);
+ if (channelz_channel != nullptr) {
+ channelz_channel->RecordCallStarted();
+ }
+ } else {
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
+ channelz_server->RecordCallStarted();
+ }
}
grpc_slice_unref_internal(path);
@@ -529,9 +502,9 @@ void grpc_call_internal_unref(grpc_call* c REF_ARG) {
static void release_call(void* call, grpc_error* error) {
grpc_call* c = static_cast<grpc_call*>(call);
grpc_channel* channel = c->channel;
- gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string)));
- grpc_call_combiner_destroy(&c->call_combiner);
- grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
+ gpr_arena* arena = c->arena;
+ c->~grpc_call();
+ grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(arena));
GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
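
Because the grpc_call object now lives inside the arena it owns, release_call above must copy the arena pointer out before running the destructor, and only destroy the arena afterwards. A hedged sketch of that ordering constraint with illustrative types (arena_ptr and arena_destroy are stand-ins, not gRPC names):

    #include <cstddef>

    struct arena;                     // stand-in for gpr_arena
    size_t arena_destroy(arena* a);   // stand-in for gpr_arena_destroy; returns bytes used

    // Sketch: an object placed inside its own arena must not be touched after
    // ~T() runs, and the arena must outlive the destructor call.
    template <typename T>
    size_t release_in_arena(T* obj) {
      arena* a = obj->arena_ptr;  // read everything needed before destruction
      obj->~T();                  // destructor may release members, not the arena itself
      return arena_destroy(a);    // finally free the storage the object lived in
    }
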
@@ -547,7 +520,7 @@ static void destroy_call(void* call, grpc_error* error) {
c->receiving_stream.reset();
parent_call* pc = get_parent_call(c);
if (pc != nullptr) {
- gpr_mu_destroy(&pc->child_list_mu);
+ pc->~parent_call();
}
for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
@@ -561,16 +534,15 @@ static void destroy_call(void* call, grpc_error* error) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- get_final_status(c, set_status_value_directly, &c->final_info.final_status,
- nullptr, &(c->final_info.error_string));
+ grpc_error* status_error =
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
+ grpc_error_get_status(status_error, c->send_deadline,
+ &c->final_info.final_status, nullptr, nullptr,
+ &(c->final_info.error_string));
+ GRPC_ERROR_UNREF(status_error);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- GRPC_ERROR_UNREF(
- unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
- }
-
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
grpc_schedule_on_exec_ctx));
@@ -608,7 +580,7 @@ void grpc_call_unref(grpc_call* c) {
bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
gpr_atm_acq_load(&c->received_final_op_atm) == 0;
if (cancel) {
- cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+ cancel_with_error(c, GRPC_ERROR_CANCELLED);
} else {
// Unset the call combiner cancellation closure. This has the
// effect of scheduling the previously set cancellation closure, if
@@ -626,8 +598,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
grpc_core::ExecCtx exec_ctx;
- cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
-
+ cancel_with_error(call, GRPC_ERROR_CANCELLED);
return GRPC_CALL_OK;
}
@@ -681,8 +652,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
"c=%p, status=%d, description=%s, reserved=%p)",
4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == nullptr);
- cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description);
-
+ cancel_with_status(c, status, description);
return GRPC_CALL_OK;
}
@@ -702,15 +672,17 @@ static void done_termination(void* arg, grpc_error* error) {
gpr_free(state);
}
-static void cancel_with_error(grpc_call* c, status_source source,
- grpc_error* error) {
+static void cancel_with_error(grpc_call* c, grpc_error* error) {
+ if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
GRPC_CALL_INTERNAL_REF(c, "termination");
// Inform the call combiner of the cancellation, so that it can cancel
// any in-flight asynchronous actions that may be holding the call
// combiner. This ensures that the cancel_stream batch can be sent
// down the filter stack in a timely manner.
grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
- set_status_from_error(c, source, GRPC_ERROR_REF(error));
cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
state->call = c;
GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
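
With the set_status_from_error call gone, cancel_with_error is made idempotent by the new one-shot cancelled flag: the first caller flips it with a release CAS and proceeds, later callers just drop their error reference. A minimal self-contained sketch of that guard (std::atomic standing in for gpr_atm, callbacks standing in for the cancel batch and GRPC_ERROR_UNREF):

    #include <atomic>
    #include <functional>

    // Sketch: only the first cancellation is acted on; later calls are no-ops.
    bool try_cancel_once(std::atomic<int>& cancelled,
                         const std::function<void()>& do_cancel,
                         const std::function<void()>& drop_error_ref) {
      int expected = 0;
      if (!cancelled.compare_exchange_strong(expected, 1,
                                             std::memory_order_release)) {
        drop_error_ref();  // someone already cancelled: just release our reference
        return false;
      }
      do_cancel();  // first caller wins and pushes the cancel batch down the stack
      return true;
    }
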
@@ -722,6 +694,10 @@ static void cancel_with_error(grpc_call* c, status_source source,
execute_batch(c, op, &state->start_batch);
}
+void grpc_call_cancel_internal(grpc_call* call) {
+ cancel_with_error(call, GRPC_ERROR_CANCELLED);
+}
+
static grpc_error* error_from_status(grpc_status_code status,
const char* description) {
// copying 'description' is needed to ensure the grpc_call_cancel_with_status
@@ -733,90 +709,47 @@ static grpc_error* error_from_status(grpc_status_code status,
GRPC_ERROR_INT_GRPC_STATUS, status);
}
-static void cancel_with_status(grpc_call* c, status_source source,
- grpc_status_code status,
+static void cancel_with_status(grpc_call* c, grpc_status_code status,
const char* description) {
- cancel_with_error(c, source, error_from_status(status, description));
-}
-
-/*******************************************************************************
- * FINAL STATUS CODE MANIPULATION
- */
-
-static bool get_final_status_from(
- grpc_call* call, grpc_error* error, bool allow_ok_status,
- void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string) {
- grpc_status_code code;
- grpc_slice slice = grpc_empty_slice();
- grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr,
- error_string);
- if (code == GRPC_STATUS_OK && !allow_ok_status) {
- return false;
- }
-
- set_value(code, set_value_user_data);
- if (details != nullptr) {
- *details = grpc_slice_ref_internal(slice);
- }
- return true;
+ cancel_with_error(c, error_from_status(status, description));
}
-static void get_final_status(
- grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string) {
- int i;
- received_status status[STATUS_SOURCE_COUNT];
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
- }
+static void set_final_status(grpc_call* call, grpc_error* error) {
if (grpc_call_error_trace.enabled()) {
- gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR");
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (status[i].is_set) {
- gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error));
- }
- }
+ gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
+ gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
}
- /* first search through ignoring "OK" statuses: if something went wrong,
- * ensure we report it */
- for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
- /* search for the best status we can present: ideally the error we use has a
- clearly defined grpc-status, and we'll prefer that. */
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (status[i].is_set &&
- grpc_error_has_clear_grpc_status(status[i].error)) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details,
- error_string)) {
- return;
- }
+ if (call->is_client) {
+ grpc_error_get_status(error, call->send_deadline,
+ call->final_op.client.status,
+ call->final_op.client.status_details, nullptr,
+ call->final_op.client.error_string);
+ // explicitly take a ref
+ grpc_slice_ref_internal(*call->final_op.client.status_details);
+ gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
+ grpc_core::channelz::ChannelNode* channelz_channel =
+ grpc_channel_get_channelz_node(call->channel);
+ if (channelz_channel != nullptr) {
+ if (*call->final_op.client.status != GRPC_STATUS_OK) {
+ channelz_channel->RecordCallFailed();
+ } else {
+ channelz_channel->RecordCallSucceeded();
}
}
- /* If no clearly defined status exists, search for 'anything' */
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (status[i].is_set) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details,
- error_string)) {
- return;
- }
+ } else {
+ *call->final_op.server.cancelled =
+ error != GRPC_ERROR_NONE ||
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) !=
+ GRPC_ERROR_NONE;
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
+ if (*call->final_op.server.cancelled) {
+ channelz_server->RecordCallFailed();
+ } else {
+ channelz_server->RecordCallSucceeded();
}
}
- }
- /* If nothing exists, set some default */
- if (call->is_client) {
- set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
- } else {
- set_value(GRPC_STATUS_OK, set_value_user_data);
- }
-}
-
-static void set_status_from_error(grpc_call* call, status_source source,
- grpc_error* error) {
- if (!gpr_atm_rel_cas(&call->status[source],
- pack_received_status({false, GRPC_ERROR_NONE}),
- pack_received_status({true, error}))) {
GRPC_ERROR_UNREF(error);
}
}
@@ -1035,6 +968,7 @@ static grpc_stream_compression_algorithm decode_stream_compression(
static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
int is_trailing) {
if (b->list.count == 0) return;
+ if (!call->is_client && is_trailing) return;
if (is_trailing && call->buffered_metadata[1] == nullptr) return;
GPR_TIMER_SCOPE("publish_app_metadata", 0);
grpc_metadata_array* dest;
@@ -1088,9 +1022,12 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
publish_app_metadata(call, b, false);
}
-static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
+static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
+ grpc_error* batch_error) {
grpc_call* call = static_cast<grpc_call*>(args);
- if (b->idx.named.grpc_status != nullptr) {
+ if (batch_error != GRPC_ERROR_NONE) {
+ set_final_status(call, batch_error);
+ } else if (b->idx.named.grpc_status != nullptr) {
grpc_status_code status_code =
grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
grpc_error* error = GRPC_ERROR_NONE;
@@ -1108,8 +1045,18 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_empty_slice());
}
- set_status_from_error(call, STATUS_FROM_WIRE, error);
+ set_final_status(call, GRPC_ERROR_REF(error));
grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
+ GRPC_ERROR_UNREF(error);
+ } else if (!call->is_client) {
+ set_final_status(call, GRPC_ERROR_NONE);
+ } else {
+ gpr_log(GPR_DEBUG,
+ "Received trailing metadata with no error and no status");
+ set_final_status(
+ call, grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
}
publish_app_metadata(call, b, true);
}
@@ -1124,14 +1071,6 @@ grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
* BATCH API IMPLEMENTATION
*/
-static void set_status_value_directly(grpc_status_code status, void* dest) {
- *static_cast<grpc_status_code*>(dest) = status;
-}
-
-static void set_cancelled_value(grpc_status_code status, void* dest) {
- *static_cast<int*>(dest) = (status != GRPC_STATUS_OK);
-}
-
static bool are_write_flags_valid(uint32_t flags) {
/* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
const uint32_t allowed_write_positions =
@@ -1180,10 +1119,11 @@ static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
if (bctl->call != nullptr) {
return nullptr;
}
- memset(bctl, 0, sizeof(*bctl));
+ bctl->~batch_control();
+ bctl->op = {};
} else {
- bctl = static_cast<batch_control*>(
- gpr_arena_alloc(call->arena, sizeof(batch_control)));
+ bctl = new (gpr_arena_alloc(call->arena, sizeof(batch_control)))
+ batch_control();
*pslot = bctl;
}
bctl->call = call;
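
reuse_or_allocate_batch_control now follows the same arena idiom: a fresh slot is constructed with placement new, while a reused slot is torn down with an explicit destructor call and re-initialized instead of being memset. A small illustrative sketch of that reuse pattern; the struct below is a stand-in (not batch_control), and it reconstructs with placement new after the destructor, which is the standard form of the idiom:

    #include <new>

    struct slot_state {
      void* owner = nullptr;  // non-null while the slot is in use
      int payload = 0;
    };

    // Sketch: reuse a previously constructed slot if it is free, otherwise
    // construct one in caller-provided (arena-style) storage.
    slot_state* reuse_or_allocate(slot_state** pslot, void* arena_storage) {
      slot_state* s = *pslot;
      if (s != nullptr) {
        if (s->owner != nullptr) return nullptr;  // still busy: caller must fail the batch
        s->~slot_state();                         // tear down the old instance...
        s = new (s) slot_state();                 // ...and rebuild it in place
      } else {
        s = new (arena_storage) slot_state();     // first use: construct in arena storage
        *pslot = s;
      }
      return s;
    }
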
@@ -1199,31 +1139,18 @@ static void finish_batch_completion(void* user_data,
GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
-static grpc_error* consolidate_batch_errors(batch_control* bctl) {
- size_t n = static_cast<size_t>(gpr_atm_acq_load(&bctl->num_errors));
- if (n == 0) {
- return GRPC_ERROR_NONE;
- } else if (n == 1) {
- /* Skip creating a composite error in the case that only one error was
- logged */
- grpc_error* e = bctl->errors[0];
- bctl->errors[0] = nullptr;
- return e;
- } else {
- grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Call batch failed", bctl->errors, n);
- for (size_t i = 0; i < n; i++) {
- GRPC_ERROR_UNREF(bctl->errors[i]);
- bctl->errors[i] = nullptr;
- }
- return error;
- }
+static void reset_batch_errors(batch_control* bctl) {
+ GRPC_ERROR_UNREF(
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
}
static void post_batch_completion(batch_control* bctl) {
grpc_call* next_child_call;
grpc_call* call = bctl->call;
- grpc_error* error = consolidate_batch_errors(bctl);
+ grpc_error* error = GRPC_ERROR_REF(
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
@@ -1249,8 +1176,7 @@ static void post_batch_completion(batch_control* bctl) {
next_child_call = child->child->sibling_next;
if (child->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
- cancel_with_error(child, STATUS_FROM_API_OVERRIDE,
- GRPC_ERROR_CANCELLED);
+ cancel_with_error(child, GRPC_ERROR_CANCELLED);
GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
}
child = next_child_call;
@@ -1258,24 +1184,6 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
- if (call->is_client) {
- get_final_status(call, set_status_value_directly,
- call->final_op.client.status,
- call->final_op.client.status_details,
- call->final_op.client.error_string);
- } else {
- get_final_status(call, set_cancelled_value,
- call->final_op.server.cancelled, nullptr, nullptr);
- }
- grpc_core::channelz::ChannelNode* channelz_channel =
- grpc_channel_get_channelz_node(call->channel);
- if (channelz_channel != nullptr) {
- if (*call->final_op.client.status != GRPC_STATUS_OK) {
- channelz_channel->RecordCallFailed();
- } else {
- channelz_channel->RecordCallSucceeded();
- }
- }
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1284,9 +1192,10 @@ static void post_batch_completion(batch_control* bctl) {
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
}
+ reset_batch_errors(bctl);
if (bctl->completion_data.notify_tag.is_closure) {
- /* unrefs bctl->error */
+ /* unrefs error */
bctl->call = nullptr;
/* This closure may be meant to be run within some combiner. Since we aren't
* running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
@@ -1296,7 +1205,7 @@ static void post_batch_completion(batch_control* bctl) {
error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
- /* unrefs bctl->error */
+ /* unrefs error */
grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
finish_batch_completion, bctl,
&bctl->completion_data.cq_completion);
@@ -1405,8 +1314,12 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
call->receiving_stream.reset();
- add_batch_error(bctl, GRPC_ERROR_REF(error), true);
- cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+ if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
+ GRPC_ERROR_NONE) {
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
+ }
+ cancel_with_error(call, GRPC_ERROR_REF(error));
}
/* If recv_state is RECV_NONE, we will save the batch_control
* object with rel_cas, and will not use it after the cas. Its corresponding
@@ -1442,8 +1355,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
call->incoming_stream_compression_algorithm,
call->incoming_message_compression_algorithm);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
gpr_free(error_msg);
} else if (
grpc_compression_algorithm_from_message_stream_compression_algorithm(
@@ -1455,8 +1367,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
"compression (%d).",
call->incoming_stream_compression_algorithm,
call->incoming_message_compression_algorithm);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
gpr_free(error_msg);
} else {
char* error_msg = nullptr;
@@ -1466,8 +1377,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
compression_algorithm);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, compression_algorithm) == 0) {
/* check if algorithm is supported by current channel config */
@@ -1476,8 +1386,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
}
gpr_free(error_msg);
@@ -1495,23 +1404,12 @@ static void validate_filtered_metadata(batch_control* bctl) {
}
}
-static void add_batch_error(batch_control* bctl, grpc_error* error,
- bool has_cancelled) {
- if (error == GRPC_ERROR_NONE) return;
- int idx = static_cast<int>(gpr_atm_full_fetch_add(&bctl->num_errors, 1));
- if (idx == 0 && !has_cancelled) {
- cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error));
- }
- bctl->errors[idx] = error;
-}
-
static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1524,6 +1422,13 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
call->send_deadline = md->deadline;
}
+ } else {
+ if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
+ GRPC_ERROR_NONE) {
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
+ }
+ cancel_with_error(call, GRPC_ERROR_REF(error));
}
grpc_closure* saved_rsr_closure = nullptr;
@@ -1561,10 +1466,9 @@ static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
+ recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
finish_batch_step(bctl);
}
@@ -1572,7 +1476,14 @@ static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
+ GRPC_ERROR_NONE) {
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
+ }
+ if (error != GRPC_ERROR_NONE) {
+ cancel_with_error(call, GRPC_ERROR_REF(error));
+ }
finish_batch_step(bctl);
}
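
The same "record only the first error" update appears three times in this diff (receiving_stream_ready, receiving_initial_metadata_ready, and finish_batch above): load-acquire the batch_error slot and, only if it still holds no error, store a new reference with release ordering. A compact sketch of that pattern, with std::atomic in place of gpr_atm and a forward-declared stand-in for grpc_error; the check-then-store (rather than a CAS) assumes callers are serialized, as they appear to be under the call combiner in call.cc:

    #include <atomic>

    struct ref_error;  // stand-in for grpc_error; nullptr means "no error recorded"

    // Sketch: keep only the first error reported for a batch. The atomics here
    // order publication; a compare-exchange would be needed if callers raced.
    void record_first_error(std::atomic<ref_error*>& batch_error, ref_error* err) {
      if (err == nullptr) return;
      if (batch_error.load(std::memory_order_acquire) == nullptr) {
        batch_error.store(err, std::memory_order_release);  // take ownership of err's ref
      } else {
        /* a prior error already won; the caller drops its reference to err */
      }
    }
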
@@ -1774,28 +1685,33 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->send_extra_metadata_count = 1;
call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
call->channel, op->data.send_status_from_server.status);
- {
- grpc_error* override_error = GRPC_ERROR_NONE;
- if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
- override_error =
- error_from_status(op->data.send_status_from_server.status,
- "Returned non-ok status");
- }
- if (op->data.send_status_from_server.status_details != nullptr) {
- call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
- GRPC_MDSTR_GRPC_MESSAGE,
- grpc_slice_ref_internal(
- *op->data.send_status_from_server.status_details));
- call->send_extra_metadata_count++;
+ grpc_error* status_error =
+ op->data.send_status_from_server.status == GRPC_STATUS_OK
+ ? GRPC_ERROR_NONE
+ : grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Server returned error"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ static_cast<intptr_t>(
+ op->data.send_status_from_server.status));
+ if (op->data.send_status_from_server.status_details != nullptr) {
+ call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
+ GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_slice_ref_internal(
+ *op->data.send_status_from_server.status_details));
+ call->send_extra_metadata_count++;
+ if (status_error != GRPC_ERROR_NONE) {
char* msg = grpc_slice_to_c_string(
GRPC_MDVALUE(call->send_extra_metadata[1].md));
- override_error =
- grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE,
+ status_error =
+ grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
}
- set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error);
}
+
+ gpr_atm_rel_store(&call->status_error,
+ reinterpret_cast<gpr_atm>(status_error));
if (!prepare_application_metadata(
call,
static_cast<int>(