Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--  src/core/lib/surface/call.cc              | 180
-rw-r--r--  src/core/lib/surface/call.h               |   4
-rw-r--r--  src/core/lib/surface/channel.cc           |  26
-rw-r--r--  src/core/lib/surface/channel.h            |   3
-rw-r--r--  src/core/lib/surface/completion_queue.cc  |   2
-rw-r--r--  src/core/lib/surface/init.cc              |   2
-rw-r--r--  src/core/lib/surface/server.cc            | 248
-rw-r--r--  src/core/lib/surface/server.h             |  13
-rw-r--r--  src/core/lib/surface/version.cc           |   2
9 files changed, 293 insertions(+), 187 deletions(-)
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index a9349afa68..89b3f77822 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -72,8 +72,11 @@
// Used to create arena for the first call.
#define ESTIMATED_MDELEM_COUNT 16
-typedef struct batch_control {
- grpc_call* call;
+struct batch_control {
+ batch_control() { gpr_ref_init(&steps_to_complete, 0); }
+
+ grpc_call* call = nullptr;
+ grpc_transport_stream_op_batch op;
/* Share memory for cq_completion and notify_tag as they are never needed
simultaneously. Each byte used in this data structure count as six bytes
per call, so any savings we can make are worthwhile,
@@ -96,84 +99,110 @@ typedef struct batch_control {
grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
- gpr_atm batch_error;
- grpc_transport_stream_op_batch op;
-} batch_control;
+ gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
+};
+
+struct parent_call {
+ parent_call() { gpr_mu_init(&child_list_mu); }
+ ~parent_call() { gpr_mu_destroy(&child_list_mu); }
-typedef struct {
gpr_mu child_list_mu;
- grpc_call* first_child;
-} parent_call;
+ grpc_call* first_child = nullptr;
+};
-typedef struct {
+struct child_call {
+ child_call(grpc_call* parent) : parent(parent) {}
grpc_call* parent;
/** siblings: children of the same parent form a list, and this list is
protected under
parent->mu */
- grpc_call* sibling_next;
- grpc_call* sibling_prev;
-} child_call;
+ grpc_call* sibling_next = nullptr;
+ grpc_call* sibling_prev = nullptr;
+};
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
struct grpc_call {
+ grpc_call(gpr_arena* arena, const grpc_call_create_args& args)
+ : arena(arena),
+ cq(args.cq),
+ channel(args.channel),
+ is_client(args.server_transport_data == nullptr),
+ stream_op_payload(context) {
+ gpr_ref_init(&ext_ref, 1);
+ grpc_call_combiner_init(&call_combiner);
+ for (int i = 0; i < 2; i++) {
+ for (int j = 0; j < 2; j++) {
+ metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
+ }
+ }
+ }
+
+ ~grpc_call() {
+ gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
+ grpc_call_combiner_destroy(&call_combiner);
+ }
+
gpr_refcount ext_ref;
gpr_arena* arena;
grpc_call_combiner call_combiner;
grpc_completion_queue* cq;
grpc_polling_entity pollent;
grpc_channel* channel;
- gpr_timespec start_time;
- /* parent_call* */ gpr_atm parent_call_atm;
- child_call* child;
+ gpr_timespec start_time = gpr_now(GPR_CLOCK_MONOTONIC);
+ /* parent_call* */ gpr_atm parent_call_atm = 0;
+ child_call* child = nullptr;
/* client or server call */
bool is_client;
/** has grpc_call_unref been called */
- bool destroy_called;
+ bool destroy_called = false;
/** flag indicating that cancellation is inherited */
- bool cancellation_is_inherited;
+ bool cancellation_is_inherited = false;
/** which ops are in-flight */
- bool sent_initial_metadata;
- bool sending_message;
- bool sent_final_op;
- bool received_initial_metadata;
- bool receiving_message;
- bool requested_final_op;
- gpr_atm any_ops_sent_atm;
- gpr_atm received_final_op_atm;
-
- batch_control* active_batches[MAX_CONCURRENT_BATCHES];
+ bool sent_initial_metadata = false;
+ bool sending_message = false;
+ bool sent_final_op = false;
+ bool received_initial_metadata = false;
+ bool receiving_message = false;
+ bool requested_final_op = false;
+ gpr_atm any_ops_sent_atm = 0;
+ gpr_atm received_final_op_atm = 0;
+
+ batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
grpc_transport_stream_op_batch_payload stream_op_payload;
/* first idx: is_receiving, second idx: is_trailing */
- grpc_metadata_batch metadata_batch[2][2];
+ grpc_metadata_batch metadata_batch[2][2] = {};
/* Buffered read metadata waiting to be returned to the application.
Element 0 is initial metadata, element 1 is trailing metadata. */
- grpc_metadata_array* buffered_metadata[2];
+ grpc_metadata_array* buffered_metadata[2] = {};
grpc_metadata compression_md;
// A char* indicating the peer name.
- gpr_atm peer_string;
+ gpr_atm peer_string = 0;
/* Call data useful used for reporting. Only valid after the call has
* completed */
grpc_call_final_info final_info;
/* Compression algorithm for *incoming* data */
- grpc_message_compression_algorithm incoming_message_compression_algorithm;
+ grpc_message_compression_algorithm incoming_message_compression_algorithm =
+ GRPC_MESSAGE_COMPRESS_NONE;
/* Stream compression algorithm for *incoming* data */
- grpc_stream_compression_algorithm incoming_stream_compression_algorithm;
- /* Supported encodings (compression algorithms), a bitset */
- uint32_t encodings_accepted_by_peer;
+ grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
+ GRPC_STREAM_COMPRESS_NONE;
+ /* Supported encodings (compression algorithms), a bitset.
+ * Always support no compression. */
+ uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
/* Supported stream encodings (stream compression algorithms), a bitset */
- uint32_t stream_encodings_accepted_by_peer;
+ uint32_t stream_encodings_accepted_by_peer = 0;
/* Contexts for various subsystems (security, tracing, ...). */
- grpc_call_context_element context[GRPC_CONTEXT_COUNT];
+ grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};
/* for the client, extra metadata is initial metadata; for the
server, it's trailing metadata */
@@ -184,14 +213,14 @@ struct grpc_call {
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
- grpc_byte_buffer** receiving_buffer;
- grpc_slice receiving_slice;
+ grpc_byte_buffer** receiving_buffer = nullptr;
+ grpc_slice receiving_slice = grpc_empty_slice();
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
grpc_closure receiving_trailing_metadata_ready;
- uint32_t test_only_last_message_flags;
- gpr_atm cancelled;
+ uint32_t test_only_last_message_flags = 0;
+ gpr_atm cancelled = 0;
grpc_closure release_call;
@@ -207,7 +236,7 @@ struct grpc_call {
grpc_server* server;
} server;
} final_op;
- gpr_atm status_error;
+ gpr_atm status_error = 0;
/* recv_state can contain one of the following values:
RECV_NONE : : no initial metadata and messages received
@@ -225,7 +254,7 @@ struct grpc_call {
For 1, 4: See receiving_initial_metadata_ready() function
For 2, 3: See receiving_stream_ready() function */
- gpr_atm recv_state;
+ gpr_atm recv_state = 0;
};
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
@@ -269,11 +298,10 @@ void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
static parent_call* get_or_create_parent_call(grpc_call* call) {
parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
if (p == nullptr) {
- p = static_cast<parent_call*>(gpr_arena_alloc(call->arena, sizeof(*p)));
- gpr_mu_init(&p->child_list_mu);
+ p = new (gpr_arena_alloc(call->arena, sizeof(*p))) parent_call();
if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
(gpr_atm)p)) {
- gpr_mu_destroy(&p->child_list_mu);
+ p->~parent_call();
p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
}
@@ -292,7 +320,9 @@ size_t grpc_call_get_initial_size_estimate() {
grpc_error* grpc_call_create(const grpc_call_create_args* args,
grpc_call** out_call) {
GPR_TIMER_SCOPE("grpc_call_create", 0);
- size_t i, j;
+
+ GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
+
grpc_error* error = GRPC_ERROR_NONE;
grpc_channel_stack* channel_stack =
grpc_channel_get_channel_stack(args->channel);
@@ -300,27 +330,19 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
gpr_arena* arena = gpr_arena_create(initial_size);
- call = static_cast<grpc_call*>(
- gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
- channel_stack->call_stack_size));
- gpr_ref_init(&call->ext_ref, 1);
- gpr_atm_no_barrier_store(&call->cancelled, 0);
- call->arena = arena;
- grpc_call_combiner_init(&call->call_combiner);
+ call = new (gpr_arena_alloc(
+ arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
+ channel_stack->call_stack_size)) grpc_call(arena, *args);
*out_call = call;
- call->channel = args->channel;
- call->cq = args->cq;
- call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
- /* Always support no compression */
- GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE);
- call->is_client = args->server_transport_data == nullptr;
- call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
+ call->final_op.client.status_details = nullptr;
+ call->final_op.client.status = nullptr;
+ call->final_op.client.error_string = nullptr;
GRPC_STATS_INC_CLIENT_CALLS_CREATED();
GPR_ASSERT(args->add_initial_metadata_count <
MAX_SEND_EXTRA_METADATA_COUNT);
- for (i = 0; i < args->add_initial_metadata_count; i++) {
+ for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
call->send_extra_metadata[i].md = args->add_initial_metadata[i];
if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]),
GRPC_MDSTR_PATH)) {
@@ -332,23 +354,18 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
static_cast<int>(args->add_initial_metadata_count);
} else {
GRPC_STATS_INC_SERVER_CALLS_CREATED();
+ call->final_op.server.cancelled = nullptr;
call->final_op.server.server = args->server;
GPR_ASSERT(args->add_initial_metadata_count == 0);
call->send_extra_metadata_count = 0;
}
- for (i = 0; i < 2; i++) {
- for (j = 0; j < 2; j++) {
- call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
- }
- }
- grpc_millis send_deadline = args->send_deadline;
+ grpc_millis send_deadline = args->send_deadline;
bool immediately_cancel = false;
if (args->parent != nullptr) {
- call->child =
- static_cast<child_call*>(gpr_arena_alloc(arena, sizeof(child_call)));
- call->child->parent = args->parent;
+ call->child = new (gpr_arena_alloc(arena, sizeof(child_call)))
+ child_call(args->parent);
GRPC_CALL_INTERNAL_REF(args->parent, "child");
GPR_ASSERT(call->is_client);
@@ -382,10 +399,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
}
}
}
-
call->send_deadline = send_deadline;
-
- GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_unref */
grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
args->server_transport_data,
@@ -413,6 +427,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
}
gpr_mu_unlock(&pc->child_list_mu);
}
+
if (error != GRPC_ERROR_NONE) {
cancel_with_error(call, GRPC_ERROR_REF(error));
}
@@ -487,9 +502,9 @@ void grpc_call_internal_unref(grpc_call* c REF_ARG) {
static void release_call(void* call, grpc_error* error) {
grpc_call* c = static_cast<grpc_call*>(call);
grpc_channel* channel = c->channel;
- gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string)));
- grpc_call_combiner_destroy(&c->call_combiner);
- grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
+ gpr_arena* arena = c->arena;
+ c->~grpc_call();
+ grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(arena));
GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
@@ -505,7 +520,7 @@ static void destroy_call(void* call, grpc_error* error) {
c->receiving_stream.reset();
parent_call* pc = get_parent_call(c);
if (pc != nullptr) {
- gpr_mu_destroy(&pc->child_list_mu);
+ pc->~parent_call();
}
for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
@@ -679,6 +694,10 @@ static void cancel_with_error(grpc_call* c, grpc_error* error) {
execute_batch(c, op, &state->start_batch);
}
+void grpc_call_cancel_internal(grpc_call* call) {
+ cancel_with_error(call, GRPC_ERROR_CANCELLED);
+}
+
static grpc_error* error_from_status(grpc_status_code status,
const char* description) {
// copying 'description' is needed to ensure the grpc_call_cancel_with_status
@@ -1100,10 +1119,11 @@ static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
if (bctl->call != nullptr) {
return nullptr;
}
- memset(bctl, 0, sizeof(*bctl));
+ bctl->~batch_control();
+ bctl->op = {};
} else {
- bctl = static_cast<batch_control*>(
- gpr_arena_alloc(call->arena, sizeof(batch_control)));
+ bctl = new (gpr_arena_alloc(call->arena, sizeof(batch_control)))
+ batch_control();
*pslot = bctl;
}
bctl->call = call;
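[Note: most of the call.cc changes replace memset-style setup with real constructors, destructors, and default member initializers for objects that live in the call's arena. Because the arena releases its memory in bulk, delete is never called; objects are created with placement new and torn down with an explicit destructor call. A minimal sketch of that lifetime pattern; the Arena type below is a toy stand-in, not gRPC's gpr_arena API.]

#include <cstddef>
#include <cstdlib>
#include <new>

// Toy bump allocator standing in for gpr_arena: memory is released all at once.
class Arena {
 public:
  explicit Arena(size_t size) : buf_(static_cast<char*>(std::malloc(size))) {}
  ~Arena() { std::free(buf_); }
  void* Alloc(size_t size) {
    void* p = buf_ + used_;
    used_ += (size + 15) & ~size_t{15};  // keep 16-byte alignment
    return p;
  }
 private:
  char* buf_;
  size_t used_ = 0;
};

struct Call {
  explicit Call(int id) : id(id) {}
  ~Call() { /* release per-call resources, but not the arena memory itself */ }
  int id;
  bool sent_initial_metadata = false;  // default member initializers replace memset
};

int main() {
  Arena arena(1024);
  Call* call = new (arena.Alloc(sizeof(Call))) Call(42);  // placement new into the arena
  call->~Call();  // explicit destructor; no delete, the arena owns the bytes
}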
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index b34260505a..bd7295fe11 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -81,6 +81,10 @@ grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
size_t nops,
grpc_closure* closure);
+/* gRPC core internal version of grpc_call_cancel that does not create
+ * exec_ctx. */
+void grpc_call_cancel_internal(grpc_call* call);
+
/* Given the top call_element, get the call object. */
grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element);
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index d7095c24d4..e47cb4360e 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -39,6 +39,7 @@
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -63,6 +64,7 @@ struct grpc_channel {
grpc_compression_options compression_options;
gpr_atm call_size_estimate;
+ grpc_resource_user* resource_user;
gpr_mu registered_call_mu;
registered_call* registered_calls;
@@ -82,6 +84,8 @@ grpc_channel* grpc_channel_create_with_builder(
char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder));
grpc_channel_args* args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
+ grpc_resource_user* resource_user =
+ grpc_channel_stack_builder_get_resource_user(builder);
grpc_channel* channel;
if (channel_stack_type == GRPC_SERVER_CHANNEL) {
GRPC_STATS_INC_SERVER_CHANNELS_CREATED();
@@ -101,9 +105,11 @@ grpc_channel* grpc_channel_create_with_builder(
}
channel->target = target;
+ channel->resource_user = resource_user;
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
- size_t channel_tracer_max_memory = 0; // default to off
+ size_t channel_tracer_max_memory =
+ GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT;
bool internal_channel = false;
// this creates the default ChannelNode. Different types of channels may
// override this to ensure a correct ChannelNode is created.
@@ -142,7 +148,6 @@ grpc_channel* grpc_channel_create_with_builder(
0x1; /* always support no compression */
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)) {
- GPR_ASSERT(channel_tracer_max_memory == 0);
const grpc_integer_options options = {
GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX};
channel_tracer_max_memory =
@@ -217,7 +222,8 @@ grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* input_args,
grpc_channel_stack_type channel_stack_type,
- grpc_transport* optional_transport) {
+ grpc_transport* optional_transport,
+ grpc_resource_user* resource_user) {
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
const grpc_core::UniquePtr<char> default_authority =
get_default_authority(input_args);
@@ -227,11 +233,17 @@ grpc_channel* grpc_channel_create(const char* target,
grpc_channel_args_destroy(args);
grpc_channel_stack_builder_set_target(builder, target);
grpc_channel_stack_builder_set_transport(builder, optional_transport);
+ grpc_channel_stack_builder_set_resource_user(builder, resource_user);
if (!grpc_channel_init_create_stack(builder, channel_stack_type)) {
grpc_channel_stack_builder_destroy(builder);
+ if (resource_user != nullptr) {
+ grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
+ }
return nullptr;
}
- return grpc_channel_create_with_builder(builder, channel_stack_type);
+ grpc_channel* channel =
+ grpc_channel_create_with_builder(builder, channel_stack_type);
+ return channel;
}
size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) {
@@ -310,8 +322,8 @@ static grpc_call* grpc_channel_create_call_internal(
}
grpc_call_create_args args;
- memset(&args, 0, sizeof(args));
args.channel = channel;
+ args.server = nullptr;
args.parent = parent_call;
args.propagation_mask = propagation_mask;
args.cq = cq;
@@ -441,6 +453,10 @@ static void destroy_channel(void* arg, grpc_error* error) {
GRPC_MDELEM_UNREF(rc->authority);
gpr_free(rc);
}
+ if (channel->resource_user != nullptr) {
+ grpc_resource_user_free(channel->resource_user,
+ GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
+ }
gpr_mu_destroy(&channel->registered_call_mu);
gpr_free(channel->target);
gpr_free(channel);
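[Note: channel.cc now threads a grpc_resource_user through channel creation. On success the channel keeps it and releases the reserved quota in destroy_channel; if building the channel stack fails, grpc_channel_create releases it immediately so the reservation does not leak. A hedged sketch of that "reserve, then hand off or release" shape; Quota and Channel below are placeholders, not the gRPC resource-quota API.]

#include <cstddef>
#include <memory>

struct Quota {
  void Free(size_t n) { reserved -= n; }
  size_t reserved = 0;
};

struct Channel {
  // On the success path the channel owns the reservation and frees it on destruction.
  Channel(Quota* q, size_t n) : quota(q), size(n) {}
  ~Channel() { if (quota != nullptr) quota->Free(size); }
  Quota* quota;
  size_t size;
};

std::unique_ptr<Channel> CreateChannel(Quota* quota, size_t size, bool stack_ok) {
  if (!stack_ok) {
    // Failure path: nothing will own the reservation, so release it here.
    if (quota != nullptr) quota->Free(size);
    return nullptr;
  }
  return std::make_unique<Channel>(quota, size);  // success: ownership moves in
}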
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 4ac76b8a29..ab00b8e94f 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -29,7 +29,8 @@
grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* args,
grpc_channel_stack_type channel_stack_type,
- grpc_transport* optional_transport);
+ grpc_transport* optional_transport,
+ grpc_resource_user* resource_user = nullptr);
grpc_channel* grpc_channel_create_with_builder(
grpc_channel_stack_builder* builder,
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index b81ae73b4d..661022ec5f 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -859,8 +859,8 @@ static void cq_end_op_for_callback(
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_callback(cq);
gpr_mu_unlock(cq->mu);
+ cq_finish_shutdown_callback(cq);
} else {
gpr_mu_unlock(cq->mu);
}
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 0ad82fed99..67cf5d89bf 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -123,6 +123,7 @@ void grpc_init(void) {
grpc_core::Fork::GlobalInit();
grpc_fork_handlers_auto_register();
gpr_time_init();
+ gpr_arena_init();
grpc_stats_init();
grpc_slice_intern_init();
grpc_mdctx_global_init();
@@ -160,6 +161,7 @@ void grpc_shutdown(void) {
if (--g_initializations == 0) {
{
grpc_core::ExecCtx exec_ctx(0);
+ grpc_iomgr_shutdown_background_closure();
{
grpc_timer_manager_set_threading(
false); // shutdown timer_manager thread
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 35ab2c3bce..67b38e6f0c 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -28,6 +28,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <utility>
+
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h"
@@ -47,6 +49,10 @@
grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");
+static void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
+static void server_recv_trailing_metadata_ready(void* user_data,
+ grpc_error* error);
+
namespace {
struct listener {
void* arg;
@@ -105,7 +111,7 @@ struct channel_data {
uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure;
grpc_closure channel_connectivity_changed;
- intptr_t socket_uuid;
+ grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node;
};
typedef struct shutdown_tag {
@@ -128,46 +134,73 @@ typedef enum {
typedef struct request_matcher request_matcher;
struct call_data {
+ call_data(grpc_call_element* elem, const grpc_call_element_args& args)
+ : call(grpc_call_from_top_element(elem)),
+ call_combiner(args.call_combiner) {
+ GRPC_CLOSURE_INIT(&server_on_recv_initial_metadata,
+ ::server_on_recv_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
+ server_recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ }
+ ~call_data() {
+ GPR_ASSERT(state != PENDING);
+ GRPC_ERROR_UNREF(recv_initial_metadata_error);
+ if (host_set) {
+ grpc_slice_unref_internal(host);
+ }
+ if (path_set) {
+ grpc_slice_unref_internal(path);
+ }
+ grpc_metadata_array_destroy(&initial_metadata);
+ grpc_byte_buffer_destroy(payload);
+ }
+
grpc_call* call;
- gpr_atm state;
+ gpr_atm state = NOT_STARTED;
- bool path_set;
- bool host_set;
+ bool path_set = false;
+ bool host_set = false;
grpc_slice path;
grpc_slice host;
- grpc_millis deadline;
+ grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
- grpc_completion_queue* cq_new;
+ grpc_completion_queue* cq_new = nullptr;
- grpc_metadata_batch* recv_initial_metadata;
- uint32_t recv_initial_metadata_flags;
- grpc_metadata_array initial_metadata;
+ grpc_metadata_batch* recv_initial_metadata = nullptr;
+ uint32_t recv_initial_metadata_flags = 0;
+ grpc_metadata_array initial_metadata =
+ grpc_metadata_array(); // Zero-initialize the C struct.
- request_matcher* matcher;
- grpc_byte_buffer* payload;
+ request_matcher* matcher = nullptr;
+ grpc_byte_buffer* payload = nullptr;
grpc_closure got_initial_metadata;
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
grpc_closure* on_done_recv_initial_metadata;
grpc_closure recv_trailing_metadata_ready;
- grpc_error* recv_initial_metadata_error;
+ grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
grpc_closure* original_recv_trailing_metadata_ready;
- grpc_error* recv_trailing_metadata_error;
- bool seen_recv_trailing_metadata_ready;
+ grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE;
+ bool seen_recv_trailing_metadata_ready = false;
grpc_closure publish;
- call_data* pending_next;
+ call_data* pending_next = nullptr;
grpc_call_combiner* call_combiner;
};
struct request_matcher {
+ request_matcher(grpc_server* server);
+ ~request_matcher();
+
grpc_server* server;
- call_data* pending_head;
- call_data* pending_tail;
- gpr_locked_mpscq* requests_per_cq;
+ std::atomic<call_data*> pending_head{nullptr};
+ call_data* pending_tail = nullptr;
+ gpr_locked_mpscq* requests_per_cq = nullptr;
};
struct registered_method {
@@ -189,6 +222,8 @@ typedef struct {
struct grpc_server {
grpc_channel_args* channel_args;
+ grpc_resource_user* default_resource_user;
+
grpc_completion_queue** cqs;
grpc_pollset** pollsets;
size_t cq_count;
@@ -314,22 +349,30 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb,
* request_matcher
*/
-static void request_matcher_init(request_matcher* rm, grpc_server* server) {
- memset(rm, 0, sizeof(*rm));
- rm->server = server;
- rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
- gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
+namespace {
+request_matcher::request_matcher(grpc_server* server) : server(server) {
+ requests_per_cq = static_cast<gpr_locked_mpscq*>(
+ gpr_malloc(sizeof(*requests_per_cq) * server->cq_count));
for (size_t i = 0; i < server->cq_count; i++) {
- gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
+ gpr_locked_mpscq_init(&requests_per_cq[i]);
}
}
-static void request_matcher_destroy(request_matcher* rm) {
- for (size_t i = 0; i < rm->server->cq_count; i++) {
- GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
- gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
+request_matcher::~request_matcher() {
+ for (size_t i = 0; i < server->cq_count; i++) {
+ GPR_ASSERT(gpr_locked_mpscq_pop(&requests_per_cq[i]) == nullptr);
+ gpr_locked_mpscq_destroy(&requests_per_cq[i]);
}
- gpr_free(rm->requests_per_cq);
+ gpr_free(requests_per_cq);
+}
+} // namespace
+
+static void request_matcher_init(request_matcher* rm, grpc_server* server) {
+ new (rm) request_matcher(server);
+}
+
+static void request_matcher_destroy(request_matcher* rm) {
+ rm->~request_matcher();
}
static void kill_zombie(void* elem, grpc_error* error) {
@@ -338,9 +381,10 @@ static void kill_zombie(void* elem, grpc_error* error) {
}
static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
- while (rm->pending_head) {
- call_data* calld = rm->pending_head;
- rm->pending_head = calld->pending_next;
+ call_data* calld;
+ while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
+ nullptr) {
+ rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
@@ -538,8 +582,9 @@ static void publish_new_rpc(void* arg, grpc_error* error) {
}
gpr_atm_no_barrier_store(&calld->state, PENDING);
- if (rm->pending_head == nullptr) {
- rm->pending_tail = rm->pending_head = calld;
+ if (rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
+ rm->pending_head.store(calld, std::memory_order_relaxed);
+ rm->pending_tail = calld;
} else {
rm->pending_tail->pending_next = calld;
rm->pending_tail = calld;
@@ -822,11 +867,16 @@ static void accept_stream(void* cd, grpc_transport* transport,
channel_data* chand = static_cast<channel_data*>(cd);
/* create a call */
grpc_call_create_args args;
- memset(&args, 0, sizeof(args));
args.channel = chand->channel;
+ args.server = chand->server;
+ args.parent = nullptr;
+ args.propagation_mask = 0;
+ args.cq = nullptr;
+ args.pollset_set_alternative = nullptr;
args.server_transport_data = transport_server_data;
+ args.add_initial_metadata = nullptr;
+ args.add_initial_metadata_count = 0;
args.send_deadline = GRPC_MILLIS_INF_FUTURE;
- args.server = chand->server;
grpc_call* call;
grpc_error* error = grpc_call_create(&args, &call);
grpc_call_element* elem =
@@ -838,8 +888,9 @@ static void accept_stream(void* cd, grpc_transport* transport,
}
call_data* calld = static_cast<call_data*>(elem->call_data);
grpc_op op;
- memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_INITIAL_METADATA;
+ op.flags = 0;
+ op.reserved = nullptr;
op.data.recv_initial_metadata.recv_initial_metadata =
&calld->initial_metadata;
GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
@@ -867,40 +918,18 @@ static void channel_connectivity_changed(void* cd, grpc_error* error) {
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- memset(calld, 0, sizeof(call_data));
- calld->deadline = GRPC_MILLIS_INF_FUTURE;
- calld->call = grpc_call_from_top_element(elem);
- calld->call_combiner = args->call_combiner;
-
- GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
- server_on_recv_initial_metadata, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
- server_recv_trailing_metadata_ready, elem,
- grpc_schedule_on_exec_ctx);
server_ref(chand->server);
+ new (elem->call_data) call_data(elem, *args);
return GRPC_ERROR_NONE;
}
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
-
- GPR_ASSERT(calld->state != PENDING);
- GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
- if (calld->host_set) {
- grpc_slice_unref_internal(calld->host);
- }
- if (calld->path_set) {
- grpc_slice_unref_internal(calld->path);
- }
- grpc_metadata_array_destroy(&calld->initial_metadata);
- grpc_byte_buffer_destroy(calld->payload);
-
+ calld->~call_data();
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
server_unref(chand->server);
}
@@ -923,6 +952,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
static void destroy_channel_elem(grpc_channel_element* elem) {
size_t i;
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ chand->socket_node.reset();
if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) {
grpc_slice_unref_internal(chand->registered_methods[i].method);
@@ -1024,6 +1054,15 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
grpc_slice_from_static_string("Server created"));
}
+ if (args != nullptr) {
+ grpc_resource_quota* resource_quota =
+ grpc_resource_quota_from_channel_args(args, false /* create */);
+ if (resource_quota != nullptr) {
+ server->default_resource_user =
+ grpc_resource_user_create(resource_quota, "default");
+ }
+ }
+
return server;
}
@@ -1119,10 +1158,11 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
*pollsets = server->pollsets;
}
-void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
- grpc_pollset* accepting_pollset,
- const grpc_channel_args* args,
- intptr_t socket_uuid) {
+void grpc_server_setup_transport(
+ grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
+ const grpc_channel_args* args,
+ grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node,
+ grpc_resource_user* resource_user) {
size_t num_registered_methods;
size_t alloc;
registered_method* rm;
@@ -1135,14 +1175,15 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
uint32_t max_probes = 0;
grpc_transport_op* op = nullptr;
- channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport);
+ channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,
+ resource_user);
chand = static_cast<channel_data*>(
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
->channel_data);
chand->server = s;
server_ref(s);
chand->channel = channel;
- chand->socket_uuid = socket_uuid;
+ chand->socket_node = std::move(socket_node);
size_t cq_idx;
for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
@@ -1218,14 +1259,13 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
}
void grpc_server_populate_server_sockets(
- grpc_server* s, grpc_core::channelz::ChildRefsList* server_sockets,
+ grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets,
intptr_t start_idx) {
gpr_mu_lock(&s->mu_global);
channel_data* c = nullptr;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
- intptr_t socket_uuid = c->socket_uuid;
- if (socket_uuid >= start_idx) {
- server_sockets->push_back(socket_uuid);
+ if (c->socket_node != nullptr && c->socket_node->uuid() >= start_idx) {
+ server_sockets->push_back(c->socket_node.get());
}
}
gpr_mu_unlock(&s->mu_global);
@@ -1330,6 +1370,13 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
GRPC_ERROR_NONE);
+
+ if (server->default_resource_user != nullptr) {
+ grpc_resource_quota_unref(
+ grpc_resource_user_quota(server->default_resource_user));
+ grpc_resource_user_shutdown(server->default_resource_user);
+ grpc_resource_user_unref(server->default_resource_user);
+ }
}
void grpc_server_cancel_all_calls(grpc_server* server) {
@@ -1401,30 +1448,39 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
rm = &rc->data.registered.method->matcher;
break;
}
- if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
- /* this was the first queued request: we need to lock and start
- matching calls */
- gpr_mu_lock(&server->mu_call);
- while ((calld = rm->pending_head) != nullptr) {
- rc = reinterpret_cast<requested_call*>(
- gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
- if (rc == nullptr) break;
- rm->pending_head = calld->pending_next;
- gpr_mu_unlock(&server->mu_call);
- if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
- // Zombied Call
- GRPC_CLOSURE_INIT(
- &calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
- } else {
- publish_call(server, calld, cq_idx, rc);
- }
- gpr_mu_lock(&server->mu_call);
- }
+
+ // Fast path: if there is no pending request to be processed, immediately
+ // return.
+ if (!gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link) ||
+ // Note: We are reading the pending_head without holding the server's call
+ // mutex. Even if we read a non-null value here due to reordering,
+ // we will check it below again after grabbing the lock.
+ rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
+ return GRPC_CALL_OK;
+ }
+ // Slow path: This was the first queued request and there are pendings:
+ // We need to lock and start matching calls.
+ gpr_mu_lock(&server->mu_call);
+ while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
+ nullptr) {
+ rc = reinterpret_cast<requested_call*>(
+ gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
+ if (rc == nullptr) break;
+ rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
gpr_mu_unlock(&server->mu_call);
+ if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
+ // Zombied Call
+ GRPC_CLOSURE_INIT(
+ &calld->kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
+ } else {
+ publish_call(server, calld, cq_idx, rc);
+ }
+ gpr_mu_lock(&server->mu_call);
}
+ gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
@@ -1546,6 +1602,10 @@ const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
return server->channel_args;
}
+grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) {
+ return server->default_resource_user;
+}
+
int grpc_server_has_open_connections(grpc_server* server) {
int r;
gpr_mu_lock(&server->mu_global);
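[Note: server.cc converts request_matcher::pending_head to std::atomic<call_data*> so queue_call_request can take a lock-free fast path: push the request, peek at pending_head with a relaxed load, and only take mu_call when a pending call might be waiting; the authoritative check is repeated under the lock. A small sketch of that check-then-lock shape; the types below are placeholders, not the gRPC ones.]

#include <atomic>
#include <mutex>

struct Pending { Pending* next = nullptr; };

class Matcher {
 public:
  void OnNewRequest() {
    // Fast path: relaxed peek. A stale nullptr is acceptable because a thread
    // that enqueues a pending call afterwards will run this matching loop itself.
    if (head_.load(std::memory_order_relaxed) == nullptr) return;
    std::lock_guard<std::mutex> lock(mu_);
    // Slow path: re-check and drain under the lock, where the list is stable.
    while (Pending* p = head_.load(std::memory_order_relaxed)) {
      head_.store(p->next, std::memory_order_relaxed);
      Dispatch(p);  // the real code drops the lock around publishing the call
    }
  }
 private:
  void Dispatch(Pending*) { /* match the pending call with the new request */ }
  std::atomic<Pending*> head_{nullptr};
  std::mutex mu_;
};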
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index 33c205417e..393bb24214 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -44,14 +44,15 @@ void grpc_server_add_listener(grpc_server* server, void* listener,
/* Setup a transport - creates a channel stack, binds the transport to the
server */
-void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
- grpc_pollset* accepting_pollset,
- const grpc_channel_args* args,
- intptr_t socket_uuid);
+void grpc_server_setup_transport(
+ grpc_server* server, grpc_transport* transport,
+ grpc_pollset* accepting_pollset, const grpc_channel_args* args,
+ grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node,
+ grpc_resource_user* resource_user = nullptr);
/* fills in the uuids of all sockets used for connections on this server */
void grpc_server_populate_server_sockets(
- grpc_server* server, grpc_core::channelz::ChildRefsList* server_sockets,
+ grpc_server* server, grpc_core::channelz::ChildSocketsList* server_sockets,
intptr_t start_idx);
/* fills in the uuids of all listen sockets on this server */
@@ -63,6 +64,8 @@ grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
+grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server);
+
int grpc_server_has_open_connections(grpc_server* server);
/* Do not call this before grpc_server_start. Returns the pollsets and the
diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc
index 66890ce65a..4829cc80a5 100644
--- a/src/core/lib/surface/version.cc
+++ b/src/core/lib/surface/version.cc
@@ -25,4 +25,4 @@
const char* grpc_version_string(void) { return "7.0.0-dev"; }
-const char* grpc_g_stands_for(void) { return "gizmo"; }
+const char* grpc_g_stands_for(void) { return "goose"; }