diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.cc | 180 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 4 | ||||
-rw-r--r-- | src/core/lib/surface/channel.cc | 26 | ||||
-rw-r--r-- | src/core/lib/surface/channel.h | 3 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 2 | ||||
-rw-r--r-- | src/core/lib/surface/init.cc | 2 | ||||
-rw-r--r-- | src/core/lib/surface/server.cc | 248 | ||||
-rw-r--r-- | src/core/lib/surface/server.h | 13 | ||||
-rw-r--r-- | src/core/lib/surface/version.cc | 2 |
9 files changed, 293 insertions, 187 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index a9349afa68..89b3f77822 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -72,8 +72,11 @@ // Used to create arena for the first call. #define ESTIMATED_MDELEM_COUNT 16 -typedef struct batch_control { - grpc_call* call; +struct batch_control { + batch_control() { gpr_ref_init(&steps_to_complete, 0); } + + grpc_call* call = nullptr; + grpc_transport_stream_op_batch op; /* Share memory for cq_completion and notify_tag as they are never needed simultaneously. Each byte used in this data structure count as six bytes per call, so any savings we can make are worthwhile, @@ -96,84 +99,110 @@ typedef struct batch_control { grpc_closure start_batch; grpc_closure finish_batch; gpr_refcount steps_to_complete; - gpr_atm batch_error; - grpc_transport_stream_op_batch op; -} batch_control; + gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE); +}; + +struct parent_call { + parent_call() { gpr_mu_init(&child_list_mu); } + ~parent_call() { gpr_mu_destroy(&child_list_mu); } -typedef struct { gpr_mu child_list_mu; - grpc_call* first_child; -} parent_call; + grpc_call* first_child = nullptr; +}; -typedef struct { +struct child_call { + child_call(grpc_call* parent) : parent(parent) {} grpc_call* parent; /** siblings: children of the same parent form a list, and this list is protected under parent->mu */ - grpc_call* sibling_next; - grpc_call* sibling_prev; -} child_call; + grpc_call* sibling_next = nullptr; + grpc_call* sibling_prev = nullptr; +}; #define RECV_NONE ((gpr_atm)0) #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1) struct grpc_call { + grpc_call(gpr_arena* arena, const grpc_call_create_args& args) + : arena(arena), + cq(args.cq), + channel(args.channel), + is_client(args.server_transport_data == nullptr), + stream_op_payload(context) { + gpr_ref_init(&ext_ref, 1); + grpc_call_combiner_init(&call_combiner); + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE; + } + } + } + + ~grpc_call() { + gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string))); + grpc_call_combiner_destroy(&call_combiner); + } + gpr_refcount ext_ref; gpr_arena* arena; grpc_call_combiner call_combiner; grpc_completion_queue* cq; grpc_polling_entity pollent; grpc_channel* channel; - gpr_timespec start_time; - /* parent_call* */ gpr_atm parent_call_atm; - child_call* child; + gpr_timespec start_time = gpr_now(GPR_CLOCK_MONOTONIC); + /* parent_call* */ gpr_atm parent_call_atm = 0; + child_call* child = nullptr; /* client or server call */ bool is_client; /** has grpc_call_unref been called */ - bool destroy_called; + bool destroy_called = false; /** flag indicating that cancellation is inherited */ - bool cancellation_is_inherited; + bool cancellation_is_inherited = false; /** which ops are in-flight */ - bool sent_initial_metadata; - bool sending_message; - bool sent_final_op; - bool received_initial_metadata; - bool receiving_message; - bool requested_final_op; - gpr_atm any_ops_sent_atm; - gpr_atm received_final_op_atm; - - batch_control* active_batches[MAX_CONCURRENT_BATCHES]; + bool sent_initial_metadata = false; + bool sending_message = false; + bool sent_final_op = false; + bool received_initial_metadata = false; + bool receiving_message = false; + bool requested_final_op = false; + gpr_atm any_ops_sent_atm = 0; + gpr_atm received_final_op_atm = 0; + + batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {}; grpc_transport_stream_op_batch_payload stream_op_payload; /* first idx: is_receiving, second idx: is_trailing */ - grpc_metadata_batch metadata_batch[2][2]; + grpc_metadata_batch metadata_batch[2][2] = {}; /* Buffered read metadata waiting to be returned to the application. Element 0 is initial metadata, element 1 is trailing metadata. */ - grpc_metadata_array* buffered_metadata[2]; + grpc_metadata_array* buffered_metadata[2] = {}; grpc_metadata compression_md; // A char* indicating the peer name. - gpr_atm peer_string; + gpr_atm peer_string = 0; /* Call data useful used for reporting. Only valid after the call has * completed */ grpc_call_final_info final_info; /* Compression algorithm for *incoming* data */ - grpc_message_compression_algorithm incoming_message_compression_algorithm; + grpc_message_compression_algorithm incoming_message_compression_algorithm = + GRPC_MESSAGE_COMPRESS_NONE; /* Stream compression algorithm for *incoming* data */ - grpc_stream_compression_algorithm incoming_stream_compression_algorithm; - /* Supported encodings (compression algorithms), a bitset */ - uint32_t encodings_accepted_by_peer; + grpc_stream_compression_algorithm incoming_stream_compression_algorithm = + GRPC_STREAM_COMPRESS_NONE; + /* Supported encodings (compression algorithms), a bitset. + * Always support no compression. */ + uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE; /* Supported stream encodings (stream compression algorithms), a bitset */ - uint32_t stream_encodings_accepted_by_peer; + uint32_t stream_encodings_accepted_by_peer = 0; /* Contexts for various subsystems (security, tracing, ...). */ - grpc_call_context_element context[GRPC_CONTEXT_COUNT]; + grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {}; /* for the client, extra metadata is initial metadata; for the server, it's trailing metadata */ @@ -184,14 +213,14 @@ struct grpc_call { grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream; grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream; - grpc_byte_buffer** receiving_buffer; - grpc_slice receiving_slice; + grpc_byte_buffer** receiving_buffer = nullptr; + grpc_slice receiving_slice = grpc_empty_slice(); grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; grpc_closure receiving_initial_metadata_ready; grpc_closure receiving_trailing_metadata_ready; - uint32_t test_only_last_message_flags; - gpr_atm cancelled; + uint32_t test_only_last_message_flags = 0; + gpr_atm cancelled = 0; grpc_closure release_call; @@ -207,7 +236,7 @@ struct grpc_call { grpc_server* server; } server; } final_op; - gpr_atm status_error; + gpr_atm status_error = 0; /* recv_state can contain one of the following values: RECV_NONE : : no initial metadata and messages received @@ -225,7 +254,7 @@ struct grpc_call { For 1, 4: See receiving_initial_metadata_ready() function For 2, 3: See receiving_stream_ready() function */ - gpr_atm recv_state; + gpr_atm recv_state = 0; }; grpc_core::TraceFlag grpc_call_error_trace(false, "call_error"); @@ -269,11 +298,10 @@ void* grpc_call_arena_alloc(grpc_call* call, size_t size) { static parent_call* get_or_create_parent_call(grpc_call* call) { parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm); if (p == nullptr) { - p = static_cast<parent_call*>(gpr_arena_alloc(call->arena, sizeof(*p))); - gpr_mu_init(&p->child_list_mu); + p = new (gpr_arena_alloc(call->arena, sizeof(*p))) parent_call(); if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr, (gpr_atm)p)) { - gpr_mu_destroy(&p->child_list_mu); + p->~parent_call(); p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm); } } @@ -292,7 +320,9 @@ size_t grpc_call_get_initial_size_estimate() { grpc_error* grpc_call_create(const grpc_call_create_args* args, grpc_call** out_call) { GPR_TIMER_SCOPE("grpc_call_create", 0); - size_t i, j; + + GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); + grpc_error* error = GRPC_ERROR_NONE; grpc_channel_stack* channel_stack = grpc_channel_get_channel_stack(args->channel); @@ -300,27 +330,19 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, size_t initial_size = grpc_channel_get_call_size_estimate(args->channel); GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size); gpr_arena* arena = gpr_arena_create(initial_size); - call = static_cast<grpc_call*>( - gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + - channel_stack->call_stack_size)); - gpr_ref_init(&call->ext_ref, 1); - gpr_atm_no_barrier_store(&call->cancelled, 0); - call->arena = arena; - grpc_call_combiner_init(&call->call_combiner); + call = new (gpr_arena_alloc( + arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + + channel_stack->call_stack_size)) grpc_call(arena, *args); *out_call = call; - call->channel = args->channel; - call->cq = args->cq; - call->start_time = gpr_now(GPR_CLOCK_MONOTONIC); - /* Always support no compression */ - GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE); - call->is_client = args->server_transport_data == nullptr; - call->stream_op_payload.context = call->context; grpc_slice path = grpc_empty_slice(); if (call->is_client) { + call->final_op.client.status_details = nullptr; + call->final_op.client.status = nullptr; + call->final_op.client.error_string = nullptr; GRPC_STATS_INC_CLIENT_CALLS_CREATED(); GPR_ASSERT(args->add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); - for (i = 0; i < args->add_initial_metadata_count; i++) { + for (size_t i = 0; i < args->add_initial_metadata_count; i++) { call->send_extra_metadata[i].md = args->add_initial_metadata[i]; if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) { @@ -332,23 +354,18 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, static_cast<int>(args->add_initial_metadata_count); } else { GRPC_STATS_INC_SERVER_CALLS_CREATED(); + call->final_op.server.cancelled = nullptr; call->final_op.server.server = args->server; GPR_ASSERT(args->add_initial_metadata_count == 0); call->send_extra_metadata_count = 0; } - for (i = 0; i < 2; i++) { - for (j = 0; j < 2; j++) { - call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE; - } - } - grpc_millis send_deadline = args->send_deadline; + grpc_millis send_deadline = args->send_deadline; bool immediately_cancel = false; if (args->parent != nullptr) { - call->child = - static_cast<child_call*>(gpr_arena_alloc(arena, sizeof(child_call))); - call->child->parent = args->parent; + call->child = new (gpr_arena_alloc(arena, sizeof(child_call))) + child_call(args->parent); GRPC_CALL_INTERNAL_REF(args->parent, "child"); GPR_ASSERT(call->is_client); @@ -382,10 +399,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, } } } - call->send_deadline = send_deadline; - - GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_unref */ grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call), args->server_transport_data, @@ -413,6 +427,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, } gpr_mu_unlock(&pc->child_list_mu); } + if (error != GRPC_ERROR_NONE) { cancel_with_error(call, GRPC_ERROR_REF(error)); } @@ -487,9 +502,9 @@ void grpc_call_internal_unref(grpc_call* c REF_ARG) { static void release_call(void* call, grpc_error* error) { grpc_call* c = static_cast<grpc_call*>(call); grpc_channel* channel = c->channel; - gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string))); - grpc_call_combiner_destroy(&c->call_combiner); - grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); + gpr_arena* arena = c->arena; + c->~grpc_call(); + grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(arena)); GRPC_CHANNEL_INTERNAL_UNREF(channel, "call"); } @@ -505,7 +520,7 @@ static void destroy_call(void* call, grpc_error* error) { c->receiving_stream.reset(); parent_call* pc = get_parent_call(c); if (pc != nullptr) { - gpr_mu_destroy(&pc->child_list_mu); + pc->~parent_call(); } for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md); @@ -679,6 +694,10 @@ static void cancel_with_error(grpc_call* c, grpc_error* error) { execute_batch(c, op, &state->start_batch); } +void grpc_call_cancel_internal(grpc_call* call) { + cancel_with_error(call, GRPC_ERROR_CANCELLED); +} + static grpc_error* error_from_status(grpc_status_code status, const char* description) { // copying 'description' is needed to ensure the grpc_call_cancel_with_status @@ -1100,10 +1119,11 @@ static batch_control* reuse_or_allocate_batch_control(grpc_call* call, if (bctl->call != nullptr) { return nullptr; } - memset(bctl, 0, sizeof(*bctl)); + bctl->~batch_control(); + bctl->op = {}; } else { - bctl = static_cast<batch_control*>( - gpr_arena_alloc(call->arena, sizeof(batch_control))); + bctl = new (gpr_arena_alloc(call->arena, sizeof(batch_control))) + batch_control(); *pslot = bctl; } bctl->call = call; diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index b34260505a..bd7295fe11 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -81,6 +81,10 @@ grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call, size_t nops, grpc_closure* closure); +/* gRPC core internal version of grpc_call_cancel that does not create + * exec_ctx. */ +void grpc_call_cancel_internal(grpc_call* call); + /* Given the top call_element, get the call object. */ grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element); diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index d7095c24d4..e47cb4360e 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -39,6 +39,7 @@ #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -63,6 +64,7 @@ struct grpc_channel { grpc_compression_options compression_options; gpr_atm call_size_estimate; + grpc_resource_user* resource_user; gpr_mu registered_call_mu; registered_call* registered_calls; @@ -82,6 +84,8 @@ grpc_channel* grpc_channel_create_with_builder( char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder)); grpc_channel_args* args = grpc_channel_args_copy( grpc_channel_stack_builder_get_channel_arguments(builder)); + grpc_resource_user* resource_user = + grpc_channel_stack_builder_get_resource_user(builder); grpc_channel* channel; if (channel_stack_type == GRPC_SERVER_CHANNEL) { GRPC_STATS_INC_SERVER_CHANNELS_CREATED(); @@ -101,9 +105,11 @@ grpc_channel* grpc_channel_create_with_builder( } channel->target = target; + channel->resource_user = resource_user; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT; - size_t channel_tracer_max_memory = 0; // default to off + size_t channel_tracer_max_memory = + GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT; bool internal_channel = false; // this creates the default ChannelNode. Different types of channels may // override this to ensure a correct ChannelNode is created. @@ -142,7 +148,6 @@ grpc_channel* grpc_channel_create_with_builder( 0x1; /* always support no compression */ } else if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)) { - GPR_ASSERT(channel_tracer_max_memory == 0); const grpc_integer_options options = { GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX}; channel_tracer_max_memory = @@ -217,7 +222,8 @@ grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node( grpc_channel* grpc_channel_create(const char* target, const grpc_channel_args* input_args, grpc_channel_stack_type channel_stack_type, - grpc_transport* optional_transport) { + grpc_transport* optional_transport, + grpc_resource_user* resource_user) { grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); const grpc_core::UniquePtr<char> default_authority = get_default_authority(input_args); @@ -227,11 +233,17 @@ grpc_channel* grpc_channel_create(const char* target, grpc_channel_args_destroy(args); grpc_channel_stack_builder_set_target(builder, target); grpc_channel_stack_builder_set_transport(builder, optional_transport); + grpc_channel_stack_builder_set_resource_user(builder, resource_user); if (!grpc_channel_init_create_stack(builder, channel_stack_type)) { grpc_channel_stack_builder_destroy(builder); + if (resource_user != nullptr) { + grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); + } return nullptr; } - return grpc_channel_create_with_builder(builder, channel_stack_type); + grpc_channel* channel = + grpc_channel_create_with_builder(builder, channel_stack_type); + return channel; } size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) { @@ -310,8 +322,8 @@ static grpc_call* grpc_channel_create_call_internal( } grpc_call_create_args args; - memset(&args, 0, sizeof(args)); args.channel = channel; + args.server = nullptr; args.parent = parent_call; args.propagation_mask = propagation_mask; args.cq = cq; @@ -441,6 +453,10 @@ static void destroy_channel(void* arg, grpc_error* error) { GRPC_MDELEM_UNREF(rc->authority); gpr_free(rc); } + if (channel->resource_user != nullptr) { + grpc_resource_user_free(channel->resource_user, + GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); + } gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel->target); gpr_free(channel); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 4ac76b8a29..ab00b8e94f 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -29,7 +29,8 @@ grpc_channel* grpc_channel_create(const char* target, const grpc_channel_args* args, grpc_channel_stack_type channel_stack_type, - grpc_transport* optional_transport); + grpc_transport* optional_transport, + grpc_resource_user* resource_user = nullptr); grpc_channel* grpc_channel_create_with_builder( grpc_channel_stack_builder* builder, diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index b81ae73b4d..661022ec5f 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -859,8 +859,8 @@ static void cq_end_op_for_callback( gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_callback(cq); gpr_mu_unlock(cq->mu); + cq_finish_shutdown_callback(cq); } else { gpr_mu_unlock(cq->mu); } diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 0ad82fed99..67cf5d89bf 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -123,6 +123,7 @@ void grpc_init(void) { grpc_core::Fork::GlobalInit(); grpc_fork_handlers_auto_register(); gpr_time_init(); + gpr_arena_init(); grpc_stats_init(); grpc_slice_intern_init(); grpc_mdctx_global_init(); @@ -160,6 +161,7 @@ void grpc_shutdown(void) { if (--g_initializations == 0) { { grpc_core::ExecCtx exec_ctx(0); + grpc_iomgr_shutdown_background_closure(); { grpc_timer_manager_set_threading( false); // shutdown timer_manager thread diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 35ab2c3bce..67b38e6f0c 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -28,6 +28,8 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <utility> + #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/debug/stats.h" @@ -47,6 +49,10 @@ grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel"); +static void server_on_recv_initial_metadata(void* ptr, grpc_error* error); +static void server_recv_trailing_metadata_ready(void* user_data, + grpc_error* error); + namespace { struct listener { void* arg; @@ -105,7 +111,7 @@ struct channel_data { uint32_t registered_method_max_probes; grpc_closure finish_destroy_channel_closure; grpc_closure channel_connectivity_changed; - intptr_t socket_uuid; + grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node; }; typedef struct shutdown_tag { @@ -128,46 +134,73 @@ typedef enum { typedef struct request_matcher request_matcher; struct call_data { + call_data(grpc_call_element* elem, const grpc_call_element_args& args) + : call(grpc_call_from_top_element(elem)), + call_combiner(args.call_combiner) { + GRPC_CLOSURE_INIT(&server_on_recv_initial_metadata, + ::server_on_recv_initial_metadata, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready, + server_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + } + ~call_data() { + GPR_ASSERT(state != PENDING); + GRPC_ERROR_UNREF(recv_initial_metadata_error); + if (host_set) { + grpc_slice_unref_internal(host); + } + if (path_set) { + grpc_slice_unref_internal(path); + } + grpc_metadata_array_destroy(&initial_metadata); + grpc_byte_buffer_destroy(payload); + } + grpc_call* call; - gpr_atm state; + gpr_atm state = NOT_STARTED; - bool path_set; - bool host_set; + bool path_set = false; + bool host_set = false; grpc_slice path; grpc_slice host; - grpc_millis deadline; + grpc_millis deadline = GRPC_MILLIS_INF_FUTURE; - grpc_completion_queue* cq_new; + grpc_completion_queue* cq_new = nullptr; - grpc_metadata_batch* recv_initial_metadata; - uint32_t recv_initial_metadata_flags; - grpc_metadata_array initial_metadata; + grpc_metadata_batch* recv_initial_metadata = nullptr; + uint32_t recv_initial_metadata_flags = 0; + grpc_metadata_array initial_metadata = + grpc_metadata_array(); // Zero-initialize the C struct. - request_matcher* matcher; - grpc_byte_buffer* payload; + request_matcher* matcher = nullptr; + grpc_byte_buffer* payload = nullptr; grpc_closure got_initial_metadata; grpc_closure server_on_recv_initial_metadata; grpc_closure kill_zombie_closure; grpc_closure* on_done_recv_initial_metadata; grpc_closure recv_trailing_metadata_ready; - grpc_error* recv_initial_metadata_error; + grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE; grpc_closure* original_recv_trailing_metadata_ready; - grpc_error* recv_trailing_metadata_error; - bool seen_recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE; + bool seen_recv_trailing_metadata_ready = false; grpc_closure publish; - call_data* pending_next; + call_data* pending_next = nullptr; grpc_call_combiner* call_combiner; }; struct request_matcher { + request_matcher(grpc_server* server); + ~request_matcher(); + grpc_server* server; - call_data* pending_head; - call_data* pending_tail; - gpr_locked_mpscq* requests_per_cq; + std::atomic<call_data*> pending_head{nullptr}; + call_data* pending_tail = nullptr; + gpr_locked_mpscq* requests_per_cq = nullptr; }; struct registered_method { @@ -189,6 +222,8 @@ typedef struct { struct grpc_server { grpc_channel_args* channel_args; + grpc_resource_user* default_resource_user; + grpc_completion_queue** cqs; grpc_pollset** pollsets; size_t cq_count; @@ -314,22 +349,30 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb, * request_matcher */ -static void request_matcher_init(request_matcher* rm, grpc_server* server) { - memset(rm, 0, sizeof(*rm)); - rm->server = server; - rm->requests_per_cq = static_cast<gpr_locked_mpscq*>( - gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count)); +namespace { +request_matcher::request_matcher(grpc_server* server) : server(server) { + requests_per_cq = static_cast<gpr_locked_mpscq*>( + gpr_malloc(sizeof(*requests_per_cq) * server->cq_count)); for (size_t i = 0; i < server->cq_count; i++) { - gpr_locked_mpscq_init(&rm->requests_per_cq[i]); + gpr_locked_mpscq_init(&requests_per_cq[i]); } } -static void request_matcher_destroy(request_matcher* rm) { - for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr); - gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); +request_matcher::~request_matcher() { + for (size_t i = 0; i < server->cq_count; i++) { + GPR_ASSERT(gpr_locked_mpscq_pop(&requests_per_cq[i]) == nullptr); + gpr_locked_mpscq_destroy(&requests_per_cq[i]); } - gpr_free(rm->requests_per_cq); + gpr_free(requests_per_cq); +} +} // namespace + +static void request_matcher_init(request_matcher* rm, grpc_server* server) { + new (rm) request_matcher(server); +} + +static void request_matcher_destroy(request_matcher* rm) { + rm->~request_matcher(); } static void kill_zombie(void* elem, grpc_error* error) { @@ -338,9 +381,10 @@ static void kill_zombie(void* elem, grpc_error* error) { } static void request_matcher_zombify_all_pending_calls(request_matcher* rm) { - while (rm->pending_head) { - call_data* calld = rm->pending_head; - rm->pending_head = calld->pending_next; + call_data* calld; + while ((calld = rm->pending_head.load(std::memory_order_relaxed)) != + nullptr) { + rm->pending_head.store(calld->pending_next, std::memory_order_relaxed); gpr_atm_no_barrier_store(&calld->state, ZOMBIED); GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, @@ -538,8 +582,9 @@ static void publish_new_rpc(void* arg, grpc_error* error) { } gpr_atm_no_barrier_store(&calld->state, PENDING); - if (rm->pending_head == nullptr) { - rm->pending_tail = rm->pending_head = calld; + if (rm->pending_head.load(std::memory_order_relaxed) == nullptr) { + rm->pending_head.store(calld, std::memory_order_relaxed); + rm->pending_tail = calld; } else { rm->pending_tail->pending_next = calld; rm->pending_tail = calld; @@ -822,11 +867,16 @@ static void accept_stream(void* cd, grpc_transport* transport, channel_data* chand = static_cast<channel_data*>(cd); /* create a call */ grpc_call_create_args args; - memset(&args, 0, sizeof(args)); args.channel = chand->channel; + args.server = chand->server; + args.parent = nullptr; + args.propagation_mask = 0; + args.cq = nullptr; + args.pollset_set_alternative = nullptr; args.server_transport_data = transport_server_data; + args.add_initial_metadata = nullptr; + args.add_initial_metadata_count = 0; args.send_deadline = GRPC_MILLIS_INF_FUTURE; - args.server = chand->server; grpc_call* call; grpc_error* error = grpc_call_create(&args, &call); grpc_call_element* elem = @@ -838,8 +888,9 @@ static void accept_stream(void* cd, grpc_transport* transport, } call_data* calld = static_cast<call_data*>(elem->call_data); grpc_op op; - memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_INITIAL_METADATA; + op.flags = 0; + op.reserved = nullptr; op.data.recv_initial_metadata.recv_initial_metadata = &calld->initial_metadata; GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem, @@ -867,40 +918,18 @@ static void channel_connectivity_changed(void* cd, grpc_error* error) { static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data); - memset(calld, 0, sizeof(call_data)); - calld->deadline = GRPC_MILLIS_INF_FUTURE; - calld->call = grpc_call_from_top_element(elem); - calld->call_combiner = args->call_combiner; - - GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata, - server_on_recv_initial_metadata, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, - server_recv_trailing_metadata_ready, elem, - grpc_schedule_on_exec_ctx); server_ref(chand->server); + new (elem->call_data) call_data(elem, *args); return GRPC_ERROR_NONE; } static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); - - GPR_ASSERT(calld->state != PENDING); - GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); - if (calld->host_set) { - grpc_slice_unref_internal(calld->host); - } - if (calld->path_set) { - grpc_slice_unref_internal(calld->path); - } - grpc_metadata_array_destroy(&calld->initial_metadata); - grpc_byte_buffer_destroy(calld->payload); - + calld->~call_data(); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); server_unref(chand->server); } @@ -923,6 +952,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, static void destroy_channel_elem(grpc_channel_element* elem) { size_t i; channel_data* chand = static_cast<channel_data*>(elem->channel_data); + chand->socket_node.reset(); if (chand->registered_methods) { for (i = 0; i < chand->registered_method_slots; i++) { grpc_slice_unref_internal(chand->registered_methods[i].method); @@ -1024,6 +1054,15 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { grpc_slice_from_static_string("Server created")); } + if (args != nullptr) { + grpc_resource_quota* resource_quota = + grpc_resource_quota_from_channel_args(args, false /* create */); + if (resource_quota != nullptr) { + server->default_resource_user = + grpc_resource_user_create(resource_quota, "default"); + } + } + return server; } @@ -1119,10 +1158,11 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets, *pollsets = server->pollsets; } -void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, - grpc_pollset* accepting_pollset, - const grpc_channel_args* args, - intptr_t socket_uuid) { +void grpc_server_setup_transport( + grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset, + const grpc_channel_args* args, + grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node, + grpc_resource_user* resource_user) { size_t num_registered_methods; size_t alloc; registered_method* rm; @@ -1135,14 +1175,15 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, uint32_t max_probes = 0; grpc_transport_op* op = nullptr; - channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport); + channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport, + resource_user); chand = static_cast<channel_data*>( grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0) ->channel_data); chand->server = s; server_ref(s); chand->channel = channel; - chand->socket_uuid = socket_uuid; + chand->socket_node = std::move(socket_node); size_t cq_idx; for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { @@ -1218,14 +1259,13 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, } void grpc_server_populate_server_sockets( - grpc_server* s, grpc_core::channelz::ChildRefsList* server_sockets, + grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets, intptr_t start_idx) { gpr_mu_lock(&s->mu_global); channel_data* c = nullptr; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { - intptr_t socket_uuid = c->socket_uuid; - if (socket_uuid >= start_idx) { - server_sockets->push_back(socket_uuid); + if (c->socket_node != nullptr && c->socket_node->uuid() >= start_idx) { + server_sockets->push_back(c->socket_node.get()); } } gpr_mu_unlock(&s->mu_global); @@ -1330,6 +1370,13 @@ void grpc_server_shutdown_and_notify(grpc_server* server, channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */, GRPC_ERROR_NONE); + + if (server->default_resource_user != nullptr) { + grpc_resource_quota_unref( + grpc_resource_user_quota(server->default_resource_user)); + grpc_resource_user_shutdown(server->default_resource_user); + grpc_resource_user_unref(server->default_resource_user); + } } void grpc_server_cancel_all_calls(grpc_server* server) { @@ -1401,30 +1448,39 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, rm = &rc->data.registered.method->matcher; break; } - if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { - /* this was the first queued request: we need to lock and start - matching calls */ - gpr_mu_lock(&server->mu_call); - while ((calld = rm->pending_head) != nullptr) { - rc = reinterpret_cast<requested_call*>( - gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx])); - if (rc == nullptr) break; - rm->pending_head = calld->pending_next; - gpr_mu_unlock(&server->mu_call); - if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { - // Zombied Call - GRPC_CLOSURE_INIT( - &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); - } else { - publish_call(server, calld, cq_idx, rc); - } - gpr_mu_lock(&server->mu_call); - } + + // Fast path: if there is no pending request to be processed, immediately + // return. + if (!gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link) || + // Note: We are reading the pending_head without holding the server's call + // mutex. Even if we read a non-null value here due to reordering, + // we will check it below again after grabbing the lock. + rm->pending_head.load(std::memory_order_relaxed) == nullptr) { + return GRPC_CALL_OK; + } + // Slow path: This was the first queued request and there are pendings: + // We need to lock and start matching calls. + gpr_mu_lock(&server->mu_call); + while ((calld = rm->pending_head.load(std::memory_order_relaxed)) != + nullptr) { + rc = reinterpret_cast<requested_call*>( + gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx])); + if (rc == nullptr) break; + rm->pending_head.store(calld->pending_next, std::memory_order_relaxed); gpr_mu_unlock(&server->mu_call); + if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { + // Zombied Call + GRPC_CLOSURE_INIT( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + } else { + publish_call(server, calld, cq_idx, rc); + } + gpr_mu_lock(&server->mu_call); } + gpr_mu_unlock(&server->mu_call); return GRPC_CALL_OK; } @@ -1546,6 +1602,10 @@ const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) { return server->channel_args; } +grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) { + return server->default_resource_user; +} + int grpc_server_has_open_connections(grpc_server* server) { int r; gpr_mu_lock(&server->mu_global); diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 33c205417e..393bb24214 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -44,14 +44,15 @@ void grpc_server_add_listener(grpc_server* server, void* listener, /* Setup a transport - creates a channel stack, binds the transport to the server */ -void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport, - grpc_pollset* accepting_pollset, - const grpc_channel_args* args, - intptr_t socket_uuid); +void grpc_server_setup_transport( + grpc_server* server, grpc_transport* transport, + grpc_pollset* accepting_pollset, const grpc_channel_args* args, + grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node, + grpc_resource_user* resource_user = nullptr); /* fills in the uuids of all sockets used for connections on this server */ void grpc_server_populate_server_sockets( - grpc_server* server, grpc_core::channelz::ChildRefsList* server_sockets, + grpc_server* server, grpc_core::channelz::ChildSocketsList* server_sockets, intptr_t start_idx); /* fills in the uuids of all listen sockets on this server */ @@ -63,6 +64,8 @@ grpc_core::channelz::ServerNode* grpc_server_get_channelz_node( const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server); +grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server); + int grpc_server_has_open_connections(grpc_server* server); /* Do not call this before grpc_server_start. Returns the pollsets and the diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc index 66890ce65a..4829cc80a5 100644 --- a/src/core/lib/surface/version.cc +++ b/src/core/lib/surface/version.cc @@ -25,4 +25,4 @@ const char* grpc_version_string(void) { return "7.0.0-dev"; } -const char* grpc_g_stands_for(void) { return "gizmo"; } +const char* grpc_g_stands_for(void) { return "goose"; } |