diff options
Diffstat (limited to 'src/core/lib/surface/server.cc')
-rw-r--r-- | src/core/lib/surface/server.cc | 128 |
1 files changed, 76 insertions, 52 deletions
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 5dc81b29bb..67b38e6f0c 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -28,6 +28,8 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <utility> + #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/debug/stats.h" @@ -109,7 +111,7 @@ struct channel_data { uint32_t registered_method_max_probes; grpc_closure finish_destroy_channel_closure; grpc_closure channel_connectivity_changed; - intptr_t socket_uuid; + grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node; }; typedef struct shutdown_tag { @@ -192,10 +194,13 @@ struct call_data { }; struct request_matcher { + request_matcher(grpc_server* server); + ~request_matcher(); + grpc_server* server; - call_data* pending_head; - call_data* pending_tail; - gpr_locked_mpscq* requests_per_cq; + std::atomic<call_data*> pending_head{nullptr}; + call_data* pending_tail = nullptr; + gpr_locked_mpscq* requests_per_cq = nullptr; }; struct registered_method { @@ -344,22 +349,30 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb, * request_matcher */ -static void request_matcher_init(request_matcher* rm, grpc_server* server) { - memset(rm, 0, sizeof(*rm)); - rm->server = server; - rm->requests_per_cq = static_cast<gpr_locked_mpscq*>( - gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count)); +namespace { +request_matcher::request_matcher(grpc_server* server) : server(server) { + requests_per_cq = static_cast<gpr_locked_mpscq*>( + gpr_malloc(sizeof(*requests_per_cq) * server->cq_count)); for (size_t i = 0; i < server->cq_count; i++) { - gpr_locked_mpscq_init(&rm->requests_per_cq[i]); + gpr_locked_mpscq_init(&requests_per_cq[i]); } } -static void request_matcher_destroy(request_matcher* rm) { - for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr); - gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); +request_matcher::~request_matcher() { + for (size_t i = 0; i < server->cq_count; i++) { + GPR_ASSERT(gpr_locked_mpscq_pop(&requests_per_cq[i]) == nullptr); + gpr_locked_mpscq_destroy(&requests_per_cq[i]); } - gpr_free(rm->requests_per_cq); + gpr_free(requests_per_cq); +} +} // namespace + +static void request_matcher_init(request_matcher* rm, grpc_server* server) { + new (rm) request_matcher(server); +} + +static void request_matcher_destroy(request_matcher* rm) { + rm->~request_matcher(); } static void kill_zombie(void* elem, grpc_error* error) { @@ -368,9 +381,10 @@ static void kill_zombie(void* elem, grpc_error* error) { } static void request_matcher_zombify_all_pending_calls(request_matcher* rm) { - while (rm->pending_head) { - call_data* calld = rm->pending_head; - rm->pending_head = calld->pending_next; + call_data* calld; + while ((calld = rm->pending_head.load(std::memory_order_relaxed)) != + nullptr) { + rm->pending_head.store(calld->pending_next, std::memory_order_relaxed); gpr_atm_no_barrier_store(&calld->state, ZOMBIED); GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, @@ -568,8 +582,9 @@ static void publish_new_rpc(void* arg, grpc_error* error) { } gpr_atm_no_barrier_store(&calld->state, PENDING); - if (rm->pending_head == nullptr) { - rm->pending_tail = rm->pending_head = calld; + if (rm->pending_head.load(std::memory_order_relaxed) == nullptr) { + rm->pending_head.store(calld, std::memory_order_relaxed); + rm->pending_tail = calld; } else { rm->pending_tail->pending_next = calld; rm->pending_tail = calld; @@ -937,6 +952,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, static void destroy_channel_elem(grpc_channel_element* elem) { size_t i; channel_data* chand = static_cast<channel_data*>(elem->channel_data); + chand->socket_node.reset(); if (chand->registered_methods) { for (i = 0; i < chand->registered_method_slots; i++) { grpc_slice_unref_internal(chand->registered_methods[i].method); @@ -1142,11 +1158,11 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets, *pollsets = server->pollsets; } -void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, - grpc_pollset* accepting_pollset, - const grpc_channel_args* args, - intptr_t socket_uuid, - grpc_resource_user* resource_user) { +void grpc_server_setup_transport( + grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset, + const grpc_channel_args* args, + grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node, + grpc_resource_user* resource_user) { size_t num_registered_methods; size_t alloc; registered_method* rm; @@ -1167,7 +1183,7 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, chand->server = s; server_ref(s); chand->channel = channel; - chand->socket_uuid = socket_uuid; + chand->socket_node = std::move(socket_node); size_t cq_idx; for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { @@ -1243,14 +1259,13 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, } void grpc_server_populate_server_sockets( - grpc_server* s, grpc_core::channelz::ChildRefsList* server_sockets, + grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets, intptr_t start_idx) { gpr_mu_lock(&s->mu_global); channel_data* c = nullptr; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { - intptr_t socket_uuid = c->socket_uuid; - if (socket_uuid >= start_idx) { - server_sockets->push_back(socket_uuid); + if (c->socket_node != nullptr && c->socket_node->uuid() >= start_idx) { + server_sockets->push_back(c->socket_node.get()); } } gpr_mu_unlock(&s->mu_global); @@ -1433,30 +1448,39 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, rm = &rc->data.registered.method->matcher; break; } - if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { - /* this was the first queued request: we need to lock and start - matching calls */ - gpr_mu_lock(&server->mu_call); - while ((calld = rm->pending_head) != nullptr) { - rc = reinterpret_cast<requested_call*>( - gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx])); - if (rc == nullptr) break; - rm->pending_head = calld->pending_next; - gpr_mu_unlock(&server->mu_call); - if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { - // Zombied Call - GRPC_CLOSURE_INIT( - &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); - } else { - publish_call(server, calld, cq_idx, rc); - } - gpr_mu_lock(&server->mu_call); - } + + // Fast path: if there is no pending request to be processed, immediately + // return. + if (!gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link) || + // Note: We are reading the pending_head without holding the server's call + // mutex. Even if we read a non-null value here due to reordering, + // we will check it below again after grabbing the lock. + rm->pending_head.load(std::memory_order_relaxed) == nullptr) { + return GRPC_CALL_OK; + } + // Slow path: This was the first queued request and there are pendings: + // We need to lock and start matching calls. + gpr_mu_lock(&server->mu_call); + while ((calld = rm->pending_head.load(std::memory_order_relaxed)) != + nullptr) { + rc = reinterpret_cast<requested_call*>( + gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx])); + if (rc == nullptr) break; + rm->pending_head.store(calld->pending_next, std::memory_order_relaxed); gpr_mu_unlock(&server->mu_call); + if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { + // Zombied Call + GRPC_CLOSURE_INIT( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + } else { + publish_call(server, calld, cq_idx, rc); + } + gpr_mu_lock(&server->mu_call); } + gpr_mu_unlock(&server->mu_call); return GRPC_CALL_OK; } |