aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.cc533
-rw-r--r--src/core/lib/surface/call.h8
-rw-r--r--src/core/lib/surface/channel.cc73
-rw-r--r--src/core/lib/surface/channel.h4
-rw-r--r--src/core/lib/surface/completion_queue.cc174
-rw-r--r--src/core/lib/surface/completion_queue.h5
-rw-r--r--src/core/lib/surface/completion_queue_factory.cc17
-rw-r--r--src/core/lib/surface/init.cc15
-rw-r--r--src/core/lib/surface/init.h1
-rw-r--r--src/core/lib/surface/server.cc80
-rw-r--r--src/core/lib/surface/server.h4
-rw-r--r--src/core/lib/surface/version.cc2
12 files changed, 572 insertions, 344 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index da488034ca..a9349afa68 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -34,6 +34,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/compression/algorithm_metadata.h"
#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/arena.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
@@ -47,6 +48,7 @@
#include "src/core/lib/surface/call_test_only.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/server.h"
#include "src/core/lib/surface/validate_metadata.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
@@ -67,45 +69,8 @@
#define MAX_SEND_EXTRA_METADATA_COUNT 3
-/* Status data for a request can come from several sources; this
- enumerates them all, and acts as a priority sorting for which
- status to return to the application - earlier entries override
- later ones */
-typedef enum {
- /* Status came from the application layer overriding whatever
- the wire says */
- STATUS_FROM_API_OVERRIDE = 0,
- /* Status came from 'the wire' - or somewhere below the surface
- layer */
- STATUS_FROM_WIRE,
- /* Status was created by some internal channel stack operation: must come via
- add_batch_error */
- STATUS_FROM_CORE,
- /* Status was created by some surface error */
- STATUS_FROM_SURFACE,
- /* Status came from the server sending status */
- STATUS_FROM_SERVER_STATUS,
- STATUS_SOURCE_COUNT
-} status_source;
-
-typedef struct {
- bool is_set;
- grpc_error* error;
-} received_status;
-
-static gpr_atm pack_received_status(received_status r) {
- return r.is_set ? (1 | (gpr_atm)r.error) : 0;
-}
-
-static received_status unpack_received_status(gpr_atm atm) {
- if ((atm & 1) == 0) {
- return {false, GRPC_ERROR_NONE};
- } else {
- return {true, (grpc_error*)(atm & ~static_cast<gpr_atm>(1))};
- }
-}
-
-#define MAX_ERRORS_PER_BATCH 4
+// Used to create arena for the first call.
+#define ESTIMATED_MDELEM_COUNT 16
typedef struct batch_control {
grpc_call* call;
@@ -131,10 +96,7 @@ typedef struct batch_control {
grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
-
- grpc_error* errors[MAX_ERRORS_PER_BATCH];
- gpr_atm num_errors;
-
+ gpr_atm batch_error;
grpc_transport_stream_op_batch op;
} batch_control;
@@ -197,9 +159,6 @@ struct grpc_call {
// A char* indicating the peer name.
gpr_atm peer_string;
- /* Packed received call statuses from various sources */
- gpr_atm status[STATUS_SOURCE_COUNT];
-
/* Call data useful used for reporting. Only valid after the call has
* completed */
grpc_call_final_info final_info;
@@ -230,7 +189,9 @@ struct grpc_call {
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
+ grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
+ gpr_atm cancelled;
grpc_closure release_call;
@@ -242,8 +203,11 @@ struct grpc_call {
} client;
struct {
int* cancelled;
+ // backpointer to owning server if this is a server side call.
+ grpc_server* server;
} server;
} final_op;
+ gpr_atm status_error;
/* recv_state can contain one of the following values:
RECV_NONE : : no initial metadata and messages received
@@ -267,8 +231,13 @@ struct grpc_call {
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1))
-#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1)
+#define CALL_STACK_FROM_CALL(call) \
+ (grpc_call_stack*)((char*)(call) + \
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
+#define CALL_FROM_CALL_STACK(call_stack) \
+ (grpc_call*)(((char*)(call_stack)) - \
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
+
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
@@ -276,23 +245,15 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression");
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
grpc_closure* start_batch_closure);
-static void cancel_with_status(grpc_call* c, status_source source,
- grpc_status_code status,
+
+static void cancel_with_status(grpc_call* c, grpc_status_code status,
const char* description);
-static void cancel_with_error(grpc_call* c, status_source source,
- grpc_error* error);
+static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
-static void get_final_status(
- grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string);
-static void set_status_value_directly(grpc_status_code status, void* dest);
-static void set_status_from_error(grpc_call* call, status_source source,
- grpc_error* error);
+static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
-static void add_batch_error(batch_control* bctl, grpc_error* error,
- bool has_cancelled);
static void add_init_error(grpc_error** composite, grpc_error* new_err) {
if (new_err == GRPC_ERROR_NONE) return;
@@ -323,6 +284,11 @@ static parent_call* get_parent_call(grpc_call* call) {
return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
+size_t grpc_call_get_initial_size_estimate() {
+ return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
+ sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
+}
+
grpc_error* grpc_call_create(const grpc_call_create_args* args,
grpc_call** out_call) {
GPR_TIMER_SCOPE("grpc_call_create", 0);
@@ -334,9 +300,11 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
gpr_arena* arena = gpr_arena_create(initial_size);
- call = static_cast<grpc_call*>(gpr_arena_alloc(
- arena, sizeof(grpc_call) + channel_stack->call_stack_size));
+ call = static_cast<grpc_call*>(
+ gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
+ channel_stack->call_stack_size));
gpr_ref_init(&call->ext_ref, 1);
+ gpr_atm_no_barrier_store(&call->cancelled, 0);
call->arena = arena;
grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
@@ -346,14 +314,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE);
call->is_client = args->server_transport_data == nullptr;
- if (call->is_client) {
- GRPC_STATS_INC_CLIENT_CALLS_CREATED();
- } else {
- GRPC_STATS_INC_SERVER_CALLS_CREATED();
- }
call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
+ GRPC_STATS_INC_CLIENT_CALLS_CREATED();
GPR_ASSERT(args->add_initial_metadata_count <
MAX_SEND_EXTRA_METADATA_COUNT);
for (i = 0; i < args->add_initial_metadata_count; i++) {
@@ -367,6 +331,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
call->send_extra_metadata_count =
static_cast<int>(args->add_initial_metadata_count);
} else {
+ GRPC_STATS_INC_SERVER_CALLS_CREATED();
+ call->final_op.server.server = args->server;
GPR_ASSERT(args->add_initial_metadata_count == 0);
call->send_extra_metadata_count = 0;
}
@@ -448,10 +414,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
gpr_mu_unlock(&pc->child_list_mu);
}
if (error != GRPC_ERROR_NONE) {
- cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+ cancel_with_error(call, GRPC_ERROR_REF(error));
}
if (immediately_cancel) {
- cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+ cancel_with_error(call, GRPC_ERROR_CANCELLED);
}
if (args->cq != nullptr) {
GPR_ASSERT(args->pollset_set_alternative == nullptr &&
@@ -470,6 +436,20 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
&call->pollent);
}
+ if (call->is_client) {
+ grpc_core::channelz::ChannelNode* channelz_channel =
+ grpc_channel_get_channelz_node(call->channel);
+ if (channelz_channel != nullptr) {
+ channelz_channel->RecordCallStarted();
+ }
+ } else {
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
+ channelz_server->RecordCallStarted();
+ }
+ }
+
grpc_slice_unref_internal(path);
return error;
@@ -507,13 +487,12 @@ void grpc_call_internal_unref(grpc_call* c REF_ARG) {
static void release_call(void* call, grpc_error* error) {
grpc_call* c = static_cast<grpc_call*>(call);
grpc_channel* channel = c->channel;
+ gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string)));
grpc_call_combiner_destroy(&c->call_combiner);
- gpr_free((char*)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
-static void set_status_value_directly(grpc_status_code status, void* dest);
static void destroy_call(void* call, grpc_error* error) {
GPR_TIMER_SCOPE("destroy_call", 0);
size_t i;
@@ -540,16 +519,15 @@ static void destroy_call(void* call, grpc_error* error) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- get_final_status(c, set_status_value_directly, &c->final_info.final_status,
- nullptr, c->final_info.error_string);
+ grpc_error* status_error =
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
+ grpc_error_get_status(status_error, c->send_deadline,
+ &c->final_info.final_status, nullptr, nullptr,
+ &(c->final_info.error_string));
+ GRPC_ERROR_UNREF(status_error);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- GRPC_ERROR_UNREF(
- unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
- }
-
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
grpc_schedule_on_exec_ctx));
@@ -587,13 +565,16 @@ void grpc_call_unref(grpc_call* c) {
bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
gpr_atm_acq_load(&c->received_final_op_atm) == 0;
if (cancel) {
- cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+ cancel_with_error(c, GRPC_ERROR_CANCELLED);
} else {
// Unset the call combiner cancellation closure. This has the
// effect of scheduling the previously set cancellation closure, if
// any, so that it can release any internal references it may be
- // holding to the call stack.
+ // holding to the call stack. Also flush the closures on exec_ctx so that
+ // filters that schedule cancel notification closures on exec_ctx do not
+ // need to take a ref of the call stack to guarantee closure liveness.
grpc_call_combiner_set_notify_on_cancel(&c->call_combiner, nullptr);
+ grpc_core::ExecCtx::Get()->Flush();
}
GRPC_CALL_INTERNAL_UNREF(c, "destroy");
}
@@ -602,8 +583,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
grpc_core::ExecCtx exec_ctx;
- cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
-
+ cancel_with_error(call, GRPC_ERROR_CANCELLED);
return GRPC_CALL_OK;
}
@@ -657,8 +637,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
"c=%p, status=%d, description=%s, reserved=%p)",
4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == nullptr);
- cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description);
-
+ cancel_with_status(c, status, description);
return GRPC_CALL_OK;
}
@@ -678,15 +657,17 @@ static void done_termination(void* arg, grpc_error* error) {
gpr_free(state);
}
-static void cancel_with_error(grpc_call* c, status_source source,
- grpc_error* error) {
+static void cancel_with_error(grpc_call* c, grpc_error* error) {
+ if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
GRPC_CALL_INTERNAL_REF(c, "termination");
// Inform the call combiner of the cancellation, so that it can cancel
// any in-flight asynchronous actions that may be holding the call
// combiner. This ensures that the cancel_stream batch can be sent
// down the filter stack in a timely manner.
grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
- set_status_from_error(c, source, GRPC_ERROR_REF(error));
cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
state->call = c;
GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
@@ -709,90 +690,47 @@ static grpc_error* error_from_status(grpc_status_code status,
GRPC_ERROR_INT_GRPC_STATUS, status);
}
-static void cancel_with_status(grpc_call* c, status_source source,
- grpc_status_code status,
+static void cancel_with_status(grpc_call* c, grpc_status_code status,
const char* description) {
- cancel_with_error(c, source, error_from_status(status, description));
+ cancel_with_error(c, error_from_status(status, description));
}
-/*******************************************************************************
- * FINAL STATUS CODE MANIPULATION
- */
-
-static bool get_final_status_from(
- grpc_call* call, grpc_error* error, bool allow_ok_status,
- void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string) {
- grpc_status_code code;
- grpc_slice slice = grpc_empty_slice();
- grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr,
- error_string);
- if (code == GRPC_STATUS_OK && !allow_ok_status) {
- return false;
- }
-
- set_value(code, set_value_user_data);
- if (details != nullptr) {
- *details = grpc_slice_ref_internal(slice);
- }
- return true;
-}
-
-static void get_final_status(
- grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string) {
- int i;
- received_status status[STATUS_SOURCE_COUNT];
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
- }
+static void set_final_status(grpc_call* call, grpc_error* error) {
if (grpc_call_error_trace.enabled()) {
- gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR");
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (status[i].is_set) {
- gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error));
- }
- }
+ gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
+ gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
}
- /* first search through ignoring "OK" statuses: if something went wrong,
- * ensure we report it */
- for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
- /* search for the best status we can present: ideally the error we use has a
- clearly defined grpc-status, and we'll prefer that. */
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (status[i].is_set &&
- grpc_error_has_clear_grpc_status(status[i].error)) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details,
- error_string)) {
- return;
- }
+ if (call->is_client) {
+ grpc_error_get_status(error, call->send_deadline,
+ call->final_op.client.status,
+ call->final_op.client.status_details, nullptr,
+ call->final_op.client.error_string);
+ // explicitly take a ref
+ grpc_slice_ref_internal(*call->final_op.client.status_details);
+ gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
+ grpc_core::channelz::ChannelNode* channelz_channel =
+ grpc_channel_get_channelz_node(call->channel);
+ if (channelz_channel != nullptr) {
+ if (*call->final_op.client.status != GRPC_STATUS_OK) {
+ channelz_channel->RecordCallFailed();
+ } else {
+ channelz_channel->RecordCallSucceeded();
}
}
- /* If no clearly defined status exists, search for 'anything' */
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (status[i].is_set) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details,
- error_string)) {
- return;
- }
+ } else {
+ *call->final_op.server.cancelled =
+ error != GRPC_ERROR_NONE ||
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) !=
+ GRPC_ERROR_NONE;
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
+ if (*call->final_op.server.cancelled) {
+ channelz_server->RecordCallFailed();
+ } else {
+ channelz_server->RecordCallSucceeded();
}
}
- }
- /* If nothing exists, set some default */
- if (call->is_client) {
- set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
- } else {
- set_value(GRPC_STATUS_OK, set_value_user_data);
- }
-}
-
-static void set_status_from_error(grpc_call* call, status_source source,
- grpc_error* error) {
- if (!gpr_atm_rel_cas(&call->status[source],
- pack_received_status({false, GRPC_ERROR_NONE}),
- pack_received_status({true, error}))) {
GRPC_ERROR_UNREF(error);
}
}
@@ -1011,6 +949,7 @@ static grpc_stream_compression_algorithm decode_stream_compression(
static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
int is_trailing) {
if (b->list.count == 0) return;
+ if (!call->is_client && is_trailing) return;
if (is_trailing && call->buffered_metadata[1] == nullptr) return;
GPR_TIMER_SCOPE("publish_app_metadata", 0);
grpc_metadata_array* dest;
@@ -1064,18 +1003,20 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
publish_app_metadata(call, b, false);
}
-static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
+static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
+ grpc_error* batch_error) {
grpc_call* call = static_cast<grpc_call*>(args);
- if (b->idx.named.grpc_status != nullptr) {
+ if (batch_error != GRPC_ERROR_NONE) {
+ set_final_status(call, batch_error);
+ } else if (b->idx.named.grpc_status != nullptr) {
grpc_status_code status_code =
grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
- grpc_error* error =
- status_code == GRPC_STATUS_OK
- ? GRPC_ERROR_NONE
- : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Error received from peer"),
- GRPC_ERROR_INT_GRPC_STATUS,
- static_cast<intptr_t>(status_code));
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (status_code != GRPC_STATUS_OK) {
+ error = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error received from peer"),
+ GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
+ }
if (b->idx.named.grpc_message != nullptr) {
error = grpc_error_set_str(
error, GRPC_ERROR_STR_GRPC_MESSAGE,
@@ -1085,12 +1026,24 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_empty_slice());
}
- set_status_from_error(call, STATUS_FROM_WIRE, error);
+ set_final_status(call, GRPC_ERROR_REF(error));
grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
+ GRPC_ERROR_UNREF(error);
+ } else if (!call->is_client) {
+ set_final_status(call, GRPC_ERROR_NONE);
+ } else {
+ gpr_log(GPR_DEBUG,
+ "Received trailing metadata with no error and no status");
+ set_final_status(
+ call, grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
}
publish_app_metadata(call, b, true);
}
+gpr_arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
+
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
return CALL_STACK_FROM_CALL(call);
}
@@ -1099,14 +1052,6 @@ grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
* BATCH API IMPLEMENTATION
*/
-static void set_status_value_directly(grpc_status_code status, void* dest) {
- *static_cast<grpc_status_code*>(dest) = status;
-}
-
-static void set_cancelled_value(grpc_status_code status, void* dest) {
- *static_cast<int*>(dest) = (status != GRPC_STATUS_OK);
-}
-
static bool are_write_flags_valid(uint32_t flags) {
/* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
const uint32_t allowed_write_positions =
@@ -1124,7 +1069,7 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
return !(flags & invalid_positions);
}
-static int batch_slot_for_op(grpc_op_type type) {
+static size_t batch_slot_for_op(grpc_op_type type) {
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
return 0;
@@ -1144,20 +1089,23 @@ static int batch_slot_for_op(grpc_op_type type) {
GPR_UNREACHABLE_CODE(return 123456789);
}
-static batch_control* allocate_batch_control(grpc_call* call,
- const grpc_op* ops,
- size_t num_ops) {
- int slot = batch_slot_for_op(ops[0].op);
- batch_control** pslot = &call->active_batches[slot];
- if (*pslot == nullptr) {
- *pslot = static_cast<batch_control*>(
+static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
+ const grpc_op* ops,
+ size_t num_ops) {
+ size_t slot_idx = batch_slot_for_op(ops[0].op);
+ batch_control** pslot = &call->active_batches[slot_idx];
+ batch_control* bctl;
+ if (*pslot != nullptr) {
+ bctl = *pslot;
+ if (bctl->call != nullptr) {
+ return nullptr;
+ }
+ memset(bctl, 0, sizeof(*bctl));
+ } else {
+ bctl = static_cast<batch_control*>(
gpr_arena_alloc(call->arena, sizeof(batch_control)));
+ *pslot = bctl;
}
- batch_control* bctl = *pslot;
- if (bctl->call != nullptr) {
- return nullptr;
- }
- memset(bctl, 0, sizeof(*bctl));
bctl->call = call;
bctl->op.payload = &call->stream_op_payload;
return bctl;
@@ -1171,35 +1119,21 @@ static void finish_batch_completion(void* user_data,
GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
-static grpc_error* consolidate_batch_errors(batch_control* bctl) {
- size_t n = static_cast<size_t>(gpr_atm_acq_load(&bctl->num_errors));
- if (n == 0) {
- return GRPC_ERROR_NONE;
- } else if (n == 1) {
- /* Skip creating a composite error in the case that only one error was
- logged */
- grpc_error* e = bctl->errors[0];
- bctl->errors[0] = nullptr;
- return e;
- } else {
- grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Call batch failed", bctl->errors, n);
- for (size_t i = 0; i < n; i++) {
- GRPC_ERROR_UNREF(bctl->errors[i]);
- bctl->errors[i] = nullptr;
- }
- return error;
- }
+static void reset_batch_errors(batch_control* bctl) {
+ GRPC_ERROR_UNREF(
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
}
static void post_batch_completion(batch_control* bctl) {
grpc_call* next_child_call;
grpc_call* call = bctl->call;
- grpc_error* error = consolidate_batch_errors(bctl);
+ grpc_error* error = GRPC_ERROR_REF(
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1207,14 +1141,9 @@ static void post_batch_completion(batch_control* bctl) {
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
- grpc_metadata_batch* md =
- &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
-
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call);
@@ -1227,8 +1156,7 @@ static void post_batch_completion(batch_control* bctl) {
next_child_call = child->child->sibling_next;
if (child->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
- cancel_with_error(child, STATUS_FROM_API_OVERRIDE,
- GRPC_ERROR_CANCELLED);
+ cancel_with_error(child, GRPC_ERROR_CANCELLED);
GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
}
child = next_child_call;
@@ -1236,17 +1164,6 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
-
- if (call->is_client) {
- get_final_status(call, set_status_value_directly,
- call->final_op.client.status,
- call->final_op.client.status_details,
- call->final_op.client.error_string);
- } else {
- get_final_status(call, set_cancelled_value,
- call->final_op.server.cancelled, nullptr, nullptr);
- }
-
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1255,15 +1172,20 @@ static void post_batch_completion(batch_control* bctl) {
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
}
+ reset_batch_errors(bctl);
if (bctl->completion_data.notify_tag.is_closure) {
- /* unrefs bctl->error */
+ /* unrefs error */
bctl->call = nullptr;
- GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
- error);
+ /* This closure may be meant to be run within some combiner. Since we aren't
+ * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
+ * of GRPC_CLOSURE_RUN.
+ */
+ GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
+ error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
- /* unrefs bctl->error */
+ /* unrefs error */
grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
finish_batch_completion, bctl,
&bctl->completion_data.cq_completion);
@@ -1372,8 +1294,12 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
call->receiving_stream.reset();
- add_batch_error(bctl, GRPC_ERROR_REF(error), true);
- cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+ if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
+ GRPC_ERROR_NONE) {
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
+ }
+ cancel_with_error(call, GRPC_ERROR_REF(error));
}
/* If recv_state is RECV_NONE, we will save the batch_control
* object with rel_cas, and will not use it after the cas. Its corresponding
@@ -1409,8 +1335,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
call->incoming_stream_compression_algorithm,
call->incoming_message_compression_algorithm);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
gpr_free(error_msg);
} else if (
grpc_compression_algorithm_from_message_stream_compression_algorithm(
@@ -1422,8 +1347,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
"compression (%d).",
call->incoming_stream_compression_algorithm,
call->incoming_message_compression_algorithm);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
gpr_free(error_msg);
} else {
char* error_msg = nullptr;
@@ -1433,8 +1357,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
compression_algorithm);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, compression_algorithm) == 0) {
/* check if algorithm is supported by current channel config */
@@ -1443,8 +1366,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
}
gpr_free(error_msg);
@@ -1462,23 +1384,12 @@ static void validate_filtered_metadata(batch_control* bctl) {
}
}
-static void add_batch_error(batch_control* bctl, grpc_error* error,
- bool has_cancelled) {
- if (error == GRPC_ERROR_NONE) return;
- int idx = static_cast<int>(gpr_atm_full_fetch_add(&bctl->num_errors, 1));
- if (idx == 0 && !has_cancelled) {
- cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error));
- }
- bctl->errors[idx] = error;
-}
-
static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1491,6 +1402,13 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
call->send_deadline = md->deadline;
}
+ } else {
+ if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
+ GRPC_ERROR_NONE) {
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
+ }
+ cancel_with_error(call, GRPC_ERROR_REF(error));
}
grpc_closure* saved_rsr_closure = nullptr;
@@ -1524,11 +1442,28 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
finish_batch_step(bctl);
}
+static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
+ batch_control* bctl = static_cast<batch_control*>(bctlp);
+ grpc_call* call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
+ grpc_metadata_batch* md =
+ &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
+ finish_batch_step(bctl);
+}
+
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
+ GRPC_ERROR_NONE) {
+ gpr_atm_rel_store(&bctl->batch_error,
+ reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
+ }
+ if (error != GRPC_ERROR_NONE) {
+ cancel_with_error(call, GRPC_ERROR_REF(error));
+ }
finish_batch_step(bctl);
}
@@ -1544,7 +1479,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t i;
const grpc_op* op;
batch_control* bctl;
- int num_completion_callbacks_needed = 1;
+ bool has_send_ops = false;
+ int num_recv_ops = 0;
grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1565,7 +1501,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
goto done;
}
- bctl = allocate_batch_control(call, ops, nops);
+ bctl = reuse_or_allocate_batch_control(call, ops, nops);
if (bctl == nullptr) {
return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
}
@@ -1650,6 +1586,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string;
}
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_MESSAGE: {
@@ -1679,6 +1616,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1699,6 +1637,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1726,28 +1665,33 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->send_extra_metadata_count = 1;
call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
call->channel, op->data.send_status_from_server.status);
- {
- grpc_error* override_error = GRPC_ERROR_NONE;
- if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
- override_error =
- error_from_status(op->data.send_status_from_server.status,
- "Returned non-ok status");
- }
- if (op->data.send_status_from_server.status_details != nullptr) {
- call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
- GRPC_MDSTR_GRPC_MESSAGE,
- grpc_slice_ref_internal(
- *op->data.send_status_from_server.status_details));
- call->send_extra_metadata_count++;
+ grpc_error* status_error =
+ op->data.send_status_from_server.status == GRPC_STATUS_OK
+ ? GRPC_ERROR_NONE
+ : grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Server returned error"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ static_cast<intptr_t>(
+ op->data.send_status_from_server.status));
+ if (op->data.send_status_from_server.status_details != nullptr) {
+ call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
+ GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_slice_ref_internal(
+ *op->data.send_status_from_server.status_details));
+ call->send_extra_metadata_count++;
+ if (status_error != GRPC_ERROR_NONE) {
char* msg = grpc_slice_to_c_string(
GRPC_MDVALUE(call->send_extra_metadata[1].md));
- override_error =
- grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE,
+ status_error =
+ grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
}
- set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error);
}
+
+ gpr_atm_rel_store(&call->status_error,
+ reinterpret_cast<gpr_atm>(status_error));
if (!prepare_application_metadata(
call,
static_cast<int>(
@@ -1763,6 +1707,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1790,7 +1735,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string;
}
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_MESSAGE: {
@@ -1812,7 +1757,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1838,11 +1783,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.client.error_string =
op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1863,11 +1813,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
}
@@ -1877,20 +1832,22 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
- gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+ gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
- GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op->on_complete = &bctl->finish_batch;
- gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
+ if (has_send_ops) {
+ GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op->on_complete = &bctl->finish_batch;
+ }
+ gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
execute_batch(call, stream_op, &bctl->start_batch);
done:
return error;
done_with_error:
- /* reverse any mutations that occured */
+ /* reverse any mutations that occurred */
if (stream_op->send_initial_metadata) {
call->sent_initial_metadata = false;
grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 793cce4efa..b34260505a 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -33,6 +33,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
typedef struct grpc_call_create_args {
grpc_channel* channel;
+ grpc_server* server;
grpc_call* parent;
uint32_t propagation_mask;
@@ -71,6 +72,8 @@ void grpc_call_internal_unref(grpc_call* call);
#define GRPC_CALL_INTERNAL_UNREF(call, reason) grpc_call_internal_unref(call)
#endif
+gpr_arena* grpc_call_get_arena(grpc_call* call);
+
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call);
grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
@@ -98,6 +101,11 @@ void* grpc_call_context_get(grpc_call* call, grpc_context_index elem);
uint8_t grpc_call_is_client(grpc_call* call);
+/* Get the estimated memory size for a call BESIDES the call stack. Combined
+ * with the size of the call stack, it helps estimate the arena size for the
+ * initial call. */
+size_t grpc_call_get_initial_size_estimate();
+
/* Return an appropriate compression algorithm for the requested compression \a
* level in the context of \a call. */
grpc_compression_algorithm grpc_call_compression_for_level(
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index d740ebd411..054fe105c3 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -32,6 +32,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_trace.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
@@ -66,16 +67,12 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call* registered_calls;
- grpc_core::RefCountedPtr<grpc_core::ChannelTrace> tracer;
+ grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_channel;
char* target;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack*)((c) + 1))
-#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \
- (((grpc_channel*)(channel_stack)) - 1)
-#define CHANNEL_FROM_TOP_ELEM(top_elem) \
- CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
static void destroy_channel(void* arg, grpc_error* error);
@@ -103,16 +100,22 @@ grpc_channel* grpc_channel_create_with_builder(
return channel;
}
- memset(channel, 0, sizeof(*channel));
channel->target = target;
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
size_t channel_tracer_max_nodes = 0; // default to off
+ bool channelz_enabled = false;
+ bool internal_channel = false;
+ // this creates the default ChannelNode. Different types of channels may
+ // override this to ensure a correct ChannelNode is created.
+ grpc_core::channelz::ChannelNodeCreationFunc channel_node_create_func =
+ grpc_core::channelz::ChannelNode::MakeChannelNode;
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = nullptr;
gpr_atm_no_barrier_store(
&channel->call_size_estimate,
- (gpr_atm)CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size);
+ (gpr_atm)CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size +
+ grpc_call_get_initial_size_estimate());
grpc_compression_options_init(&channel->compression_options);
for (size_t i = 0; i < args->num_args; i++) {
@@ -144,15 +147,33 @@ grpc_channel* grpc_channel_create_with_builder(
const grpc_integer_options options = {0, 0, INT_MAX};
channel_tracer_max_nodes =
(size_t)grpc_channel_arg_get_integer(&args->args[i], options);
+ } else if (0 == strcmp(args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
+ // channelz will not be enabled by default until all concerns in
+ // https://github.com/grpc/grpc/issues/15986 are addressed.
+ channelz_enabled = grpc_channel_arg_get_bool(&args->args[i], false);
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC)) {
+ GPR_ASSERT(args->args[i].type == GRPC_ARG_POINTER);
+ GPR_ASSERT(args->args[i].value.pointer.p != nullptr);
+ channel_node_create_func =
+ reinterpret_cast<grpc_core::channelz::ChannelNodeCreationFunc>(
+ args->args[i].value.pointer.p);
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL)) {
+ internal_channel = grpc_channel_arg_get_bool(&args->args[i], false);
}
}
grpc_channel_args_destroy(args);
- channel->tracer = grpc_core::MakeRefCounted<grpc_core::ChannelTrace>(
- channel_tracer_max_nodes);
- channel->tracer->AddTraceEvent(
- grpc_core::ChannelTrace::Severity::Info,
- grpc_slice_from_static_string("Channel created"));
+ // we only need to do the channelz bookkeeping for clients here. The channelz
+ // bookkeeping for server channels occurs in src/core/lib/surface/server.cc
+ if (channelz_enabled && channel->is_client) {
+ channel->channelz_channel = channel_node_create_func(
+ channel, channel_tracer_max_nodes, !internal_channel);
+ channel->channelz_channel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Channel created"));
+ }
return channel;
}
@@ -187,12 +208,9 @@ static grpc_channel_args* build_channel_args(
return grpc_channel_args_copy_and_add(input_args, new_args, num_new_args);
}
-char* grpc_channel_get_trace(grpc_channel* channel) {
- return channel->tracer->RenderTrace();
-}
-
-intptr_t grpc_channel_get_uuid(grpc_channel* channel) {
- return channel->tracer->GetUuid();
+grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
+ grpc_channel* channel) {
+ return channel->channelz_channel.get();
}
grpc_channel* grpc_channel_create(const char* target,
@@ -263,6 +281,17 @@ void grpc_channel_get_info(grpc_channel* channel,
elem->filter->get_channel_info(elem, channel_info);
}
+void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_API_TRACE("grpc_channel_reset_connect_backoff(channel=%p)", 1,
+ (channel));
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ op->reset_connect_backoff = true;
+ grpc_channel_element* elem =
+ grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
+ elem->filter->start_transport_op(elem, op);
+}
+
static grpc_call* grpc_channel_create_call_internal(
grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
@@ -398,6 +427,13 @@ void grpc_channel_internal_unref(grpc_channel* c REF_ARG) {
static void destroy_channel(void* arg, grpc_error* error) {
grpc_channel* channel = static_cast<grpc_channel*>(arg);
+ if (channel->channelz_channel != nullptr) {
+ channel->channelz_channel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Channel destroyed"));
+ channel->channelz_channel->MarkChannelDestroyed();
+ channel->channelz_channel.reset();
+ }
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {
registered_call* rc = channel->registered_calls;
@@ -406,7 +442,6 @@ static void destroy_channel(void* arg, grpc_error* error) {
GRPC_MDELEM_UNREF(rc->authority);
gpr_free(rc);
}
- channel->tracer.reset();
gpr_mu_destroy(&channel->registered_call_mu);
gpr_free(channel->target);
gpr_free(channel);
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 288313951e..e5ff2c3596 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -23,6 +23,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/surface/channel_stack_type.h"
grpc_channel* grpc_channel_create(const char* target,
@@ -50,6 +51,9 @@ grpc_call* grpc_channel_create_pollset_set_call(
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel);
+grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
+ grpc_channel* channel);
+
/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of
status_code.
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index f751741712..5dc9991f70 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
- void (*init)(void* data);
+ void (*init)(void* data,
+ grpc_experimental_completion_queue_functor* shutdown_callback);
void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
@@ -253,6 +254,23 @@ typedef struct cq_pluck_data {
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
} cq_pluck_data;
+typedef struct cq_callback_data {
+ /** No actual completed events queue, unlike other types */
+
+ /** Number of pending events (+1 if we're not shutdown) */
+ gpr_atm pending_events;
+
+ /** Counter of how many things have ever been queued on this completion queue
+ useful for avoiding locks to check the queue */
+ gpr_atm things_queued_ever;
+
+ /** 0 initially. 1 once we initiated shutdown */
+ bool shutdown_called;
+
+ /** A callback that gets invoked when the CQ completes shutdown */
+ grpc_experimental_completion_queue_functor* shutdown_callback;
+} cq_callback_data;
+
/* Completion queue structure */
struct grpc_completion_queue {
/** Once owning_refs drops to zero, we will destroy the cq */
@@ -276,12 +294,21 @@ struct grpc_completion_queue {
/* Forward declarations */
static void cq_finish_shutdown_next(grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
+static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_completion_queue* cq);
+static void cq_shutdown_callback(grpc_completion_queue* cq);
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
-
+static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
+
+// A cq_end_op function is called when an operation on a given CQ with
+// a given tag has completed. The storage argument is a reference to the
+// space reserved for this completion as it is placed into the corresponding
+// queue. The done argument is a callback that will be invoked when it is
+// safe to free up that storage. The storage MUST NOT be freed until the
+// done callback is invoked.
static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
grpc_error* error,
void (*done)(void* done_arg,
@@ -294,16 +321,28 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
+static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage);
+
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved);
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
-static void cq_init_next(void* data);
-static void cq_init_pluck(void* data);
+// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_callback(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
+static void cq_destroy_callback(void* data);
/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
@@ -315,14 +354,17 @@ static const cq_vtable g_cq_vtable[] = {
{GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
cq_pluck},
+ /* GRPC_CQ_CALLBACK */
+ {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
+ cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
+ cq_end_op_for_callback, nullptr, nullptr},
};
#define DATA_FROM_CQ(cq) ((void*)(cq + 1))
#define POLLSET_FROM_CQ(cq) \
((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
-grpc_core::TraceFlag grpc_cq_pluck_trace(true, "queue_pluck");
-grpc_core::TraceFlag grpc_cq_event_timeout_trace(true, "queue_timeout");
+grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
if (grpc_api_trace.enabled() && (grpc_cq_pluck_trace.enabled() || \
@@ -420,8 +462,8 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
}
grpc_completion_queue* grpc_completion_queue_create_internal(
- grpc_cq_completion_type completion_type,
- grpc_cq_polling_type polling_type) {
+ grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
+ grpc_experimental_completion_queue_functor* shutdown_callback) {
GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
grpc_completion_queue* cq;
@@ -449,15 +491,16 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
gpr_ref_init(&cq->owning_refs, 2);
poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
- vtable->init(DATA_FROM_CQ(cq));
+ vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
grpc_schedule_on_exec_ctx);
return cq;
}
-static void cq_init_next(void* ptr) {
- cq_next_data* cqd = static_cast<cq_next_data*>(ptr);
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
+ cq_next_data* cqd = static_cast<cq_next_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
@@ -465,14 +508,15 @@ static void cq_init_next(void* ptr) {
cq_event_queue_init(&cqd->queue);
}
-static void cq_destroy_next(void* ptr) {
- cq_next_data* cqd = static_cast<cq_next_data*>(ptr);
+static void cq_destroy_next(void* data) {
+ cq_next_data* cqd = static_cast<cq_next_data*>(data);
GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
cq_event_queue_destroy(&cqd->queue);
}
-static void cq_init_pluck(void* ptr) {
- cq_pluck_data* cqd = static_cast<cq_pluck_data*>(ptr);
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
+ cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->completed_tail = &cqd->completed_head;
@@ -483,11 +527,23 @@ static void cq_init_pluck(void* ptr) {
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
}
-static void cq_destroy_pluck(void* ptr) {
- cq_pluck_data* cqd = static_cast<cq_pluck_data*>(ptr);
+static void cq_destroy_pluck(void* data) {
+ cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
}
+static void cq_init_callback(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
+ cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
+ /* Initial count is dropped by grpc_completion_queue_shutdown */
+ gpr_atm_no_barrier_store(&cqd->pending_events, 1);
+ cqd->shutdown_called = false;
+ gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
+ cqd->shutdown_callback = shutdown_callback;
+}
+
+static void cq_destroy_callback(void* data) {}
+
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
return cq->vtable->cq_completion_type;
}
@@ -597,6 +653,11 @@ static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
return atm_inc_if_nonzero(&cqd->pending_events);
}
+static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) {
+ cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
+ return atm_inc_if_nonzero(&cqd->pending_events);
+}
+
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
#ifndef NDEBUG
gpr_mu_lock(cq->mu);
@@ -760,6 +821,49 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
GRPC_ERROR_UNREF(error);
}
+/* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
+static void cq_end_op_for_callback(
+ grpc_completion_queue* cq, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
+ grpc_cq_completion* storage) {
+ GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
+
+ cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
+ bool is_success = (error == GRPC_ERROR_NONE);
+
+ if (grpc_api_trace.enabled() ||
+ (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
+ const char* errmsg = grpc_error_string(error);
+ GRPC_API_TRACE(
+ "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
+ "done=%p, done_arg=%p, storage=%p)",
+ 6, (cq, tag, errmsg, done, done_arg, storage));
+ if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
+ }
+ }
+
+ // The callback-based CQ isn't really a queue at all and thus has no need
+ // for reserved storage. Invoke the done callback right away to release it.
+ done(done_arg, storage);
+
+ gpr_mu_lock(cq->mu);
+ cq_check_tag(cq, tag, false); /* Used in debug builds only */
+
+ gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_callback(cq);
+ gpr_mu_unlock(cq->mu);
+ } else {
+ gpr_mu_unlock(cq->mu);
+ }
+
+ GRPC_ERROR_UNREF(error);
+
+ auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
+ (*functor->functor_run)(functor, is_success);
+}
+
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
@@ -1234,6 +1338,42 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) {
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
}
+static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
+ cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
+ auto* callback = cqd->shutdown_callback;
+
+ GPR_ASSERT(cqd->shutdown_called);
+
+ cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
+ (*callback->functor_run)(callback, true);
+}
+
+static void cq_shutdown_callback(grpc_completion_queue* cq) {
+ cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
+
+ /* Need an extra ref for cq here because:
+ * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
+ * Pollset shutdown decrements the cq ref count which can potentially destroy
+ * the cq (if that happens to be the last ref).
+ * Creating an extra ref here prevents the cq from getting destroyed while
+ * this function is still active */
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
+ gpr_mu_lock(cq->mu);
+ if (cqd->shutdown_called) {
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
+ return;
+ }
+ cqd->shutdown_called = true;
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ gpr_mu_unlock(cq->mu);
+ cq_finish_shutdown_callback(cq);
+ } else {
+ gpr_mu_unlock(cq->mu);
+ }
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
+}
+
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index c9dc2d93c1..d60fe6d6ef 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -25,12 +25,12 @@
#include <grpc/grpc.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/iomgr/pollset.h"
/* These trace flags default to 1. The corresponding lines are only traced
if grpc_api_trace is also truthy */
extern grpc_core::TraceFlag grpc_cq_pluck_trace;
-extern grpc_core::TraceFlag grpc_cq_event_timeout_trace;
extern grpc_core::TraceFlag grpc_trace_operation_failures;
extern grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags;
extern grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount;
@@ -88,6 +88,7 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cc);
int grpc_get_cq_poll_num(grpc_completion_queue* cc);
grpc_completion_queue* grpc_completion_queue_create_internal(
- grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
+ grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
+ grpc_experimental_completion_queue_functor* shutdown_callback);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc
index 51c1183c5f..2616c156e4 100644
--- a/src/core/lib/surface/completion_queue_factory.cc
+++ b/src/core/lib/surface/completion_queue_factory.cc
@@ -30,8 +30,8 @@
static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attr) {
- return grpc_completion_queue_create_internal(attr->cq_completion_type,
- attr->cq_polling_type);
+ return grpc_completion_queue_create_internal(
+ attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb);
}
static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@@ -60,14 +60,23 @@ const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup(
grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT,
- GRPC_CQ_DEFAULT_POLLING};
+ GRPC_CQ_DEFAULT_POLLING, nullptr};
return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
}
grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {1, GRPC_CQ_PLUCK,
- GRPC_CQ_DEFAULT_POLLING};
+ GRPC_CQ_DEFAULT_POLLING, nullptr};
+ return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
+}
+
+grpc_completion_queue* grpc_completion_queue_create_for_callback(
+ grpc_experimental_completion_queue_functor* shutdown_callback,
+ void* reserved) {
+ GPR_ASSERT(!reserved);
+ grpc_completion_queue_attributes attr = {
+ 2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback};
return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
}
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index bd436d6857..0ad82fed99 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -27,13 +27,12 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/channel/channel_trace_registry.h"
+#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gpr/fork.h"
-#include "src/core/lib/gprpp/thd.h"
+#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -65,12 +64,10 @@ static int g_initializations;
static void do_basic_init(void) {
gpr_log_verbosity_init();
- grpc_fork_support_init();
gpr_mu_init(&g_init_mu);
grpc_register_built_in_plugins();
grpc_cq_global_init();
g_initializations = 0;
- grpc_fork_handlers_auto_register();
}
static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
@@ -123,13 +120,14 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
+ grpc_core::Fork::GlobalInit();
+ grpc_fork_handlers_auto_register();
gpr_time_init();
- grpc_core::Thread::Init();
grpc_stats_init();
grpc_slice_intern_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
- grpc_channel_trace_registry_init();
+ grpc_core::channelz::ChannelzRegistry::Init();
grpc_security_pre_init();
grpc_core::ExecCtx::GlobalInit();
grpc_iomgr_init();
@@ -178,8 +176,9 @@ void grpc_shutdown(void) {
grpc_mdctx_global_shutdown();
grpc_handshaker_factory_registry_shutdown();
grpc_slice_intern_shutdown();
- grpc_channel_trace_registry_shutdown();
+ grpc_core::channelz::ChannelzRegistry::Shutdown();
grpc_stats_shutdown();
+ grpc_core::Fork::GlobalShutdown();
}
grpc_core::ExecCtx::GlobalShutdown();
}
diff --git a/src/core/lib/surface/init.h b/src/core/lib/surface/init.h
index 9353208332..193f51447d 100644
--- a/src/core/lib/surface/init.h
+++ b/src/core/lib/surface/init.h
@@ -22,6 +22,5 @@
void grpc_register_security_filters(void);
void grpc_security_pre_init(void);
void grpc_security_init(void);
-int grpc_is_initialized(void);
#endif /* GRPC_CORE_LIB_SURFACE_INIT_H */
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index cb34def740..72ddc2648d 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -149,10 +149,16 @@ struct call_data {
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
grpc_closure* on_done_recv_initial_metadata;
+ grpc_closure recv_trailing_metadata_ready;
+ grpc_error* recv_initial_metadata_error;
+ grpc_closure* original_recv_trailing_metadata_ready;
+ grpc_error* recv_trailing_metadata_error;
+ bool seen_recv_trailing_metadata_ready;
grpc_closure publish;
call_data* pending_next;
+ grpc_call_combiner* call_combiner;
};
struct request_matcher {
@@ -219,6 +225,8 @@ struct grpc_server {
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time;
+
+ grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
};
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -364,6 +372,7 @@ static void server_ref(grpc_server* server) {
static void server_delete(grpc_server* server) {
registered_method* rm;
size_t i;
+ server->channelz_server.reset();
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
@@ -721,13 +730,43 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
if (calld->host_set && calld->path_set) {
/* do nothing */
} else {
+ /* Pass the error reference to calld->recv_initial_metadata_error */
grpc_error* src_error = error;
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Missing :authority or :path", &error, 1);
+ "Missing :authority or :path", &src_error, 1);
GRPC_ERROR_UNREF(src_error);
+ calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
+ }
+ grpc_closure* closure = calld->on_done_recv_initial_metadata;
+ calld->on_done_recv_initial_metadata = nullptr;
+ if (calld->seen_recv_trailing_metadata_ready) {
+ GRPC_CALL_COMBINER_START(calld->call_combiner,
+ &calld->recv_trailing_metadata_ready,
+ calld->recv_trailing_metadata_error,
+ "continue server_recv_trailing_metadata_ready");
}
+ GRPC_CLOSURE_RUN(closure, error);
+}
- GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
+static void server_recv_trailing_metadata_ready(void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (calld->on_done_recv_initial_metadata != nullptr) {
+ calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
+ calld->seen_recv_trailing_metadata_ready = true;
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ server_recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+ "deferring server_recv_trailing_metadata_ready "
+ "until after server_on_recv_initial_metadata");
+ return;
+ }
+ error =
+ grpc_error_add_child(GRPC_ERROR_REF(error),
+ GRPC_ERROR_REF(calld->recv_initial_metadata_error));
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
static void server_mutate_op(grpc_call_element* elem,
@@ -745,6 +784,12 @@ static void server_mutate_op(grpc_call_element* elem,
op->payload->recv_initial_metadata.recv_flags =
&calld->recv_initial_metadata_flags;
}
+ if (op->recv_trailing_metadata) {
+ calld->original_recv_trailing_metadata_ready =
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready;
+ }
}
static void server_start_transport_stream_op_batch(
@@ -779,6 +824,7 @@ static void accept_stream(void* cd, grpc_transport* transport,
args.channel = chand->channel;
args.server_transport_data = transport_server_data;
args.send_deadline = GRPC_MILLIS_INF_FUTURE;
+ args.server = chand->server;
grpc_call* call;
grpc_error* error = grpc_call_create(&args, &call);
grpc_call_element* elem =
@@ -824,11 +870,14 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
memset(calld, 0, sizeof(call_data));
calld->deadline = GRPC_MILLIS_INF_FUTURE;
calld->call = grpc_call_from_top_element(elem);
+ calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
server_on_recv_initial_metadata, elem,
grpc_schedule_on_exec_ctx);
-
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ server_recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
server_ref(chand->server);
return GRPC_ERROR_NONE;
}
@@ -840,7 +889,7 @@ static void destroy_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
GPR_ASSERT(calld->state != PENDING);
-
+ GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
if (calld->host_set) {
grpc_slice_unref_internal(calld->host);
}
@@ -941,6 +990,7 @@ void grpc_server_register_completion_queue(grpc_server* server,
}
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server* server =
@@ -957,6 +1007,20 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
server->channel_args = grpc_channel_args_copy(args);
+ const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
+ if (grpc_channel_arg_get_bool(arg, false)) {
+ arg = grpc_channel_args_find(args,
+ GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
+ size_t trace_events_per_node =
+ grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
+ server->channelz_server =
+ grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
+ trace_events_per_node);
+ server->channelz_server->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Server created"));
+ }
+
return server;
}
@@ -1459,3 +1523,11 @@ int grpc_server_has_open_connections(grpc_server* server) {
gpr_mu_unlock(&server->mu_global);
return r;
}
+
+grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
+ grpc_server* server) {
+ if (server == nullptr) {
+ return nullptr;
+ }
+ return server->channelz_server.get();
+}
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index c617cc223e..0196743ff9 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -23,6 +23,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/transport.h"
@@ -46,6 +47,9 @@ void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args);
+grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
+ grpc_server* server);
+
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
int grpc_server_has_open_connections(grpc_server* server);
diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc
index 306b7c395e..a44f9acdc3 100644
--- a/src/core/lib/surface/version.cc
+++ b/src/core/lib/surface/version.cc
@@ -25,4 +25,4 @@
const char* grpc_version_string(void) { return "6.0.0-dev"; }
-const char* grpc_g_stands_for(void) { return "gloriosa"; }
+const char* grpc_g_stands_for(void) { return "gao"; }