aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/server.cc')
-rw-r--r--src/core/lib/surface/server.cc212
1 files changed, 121 insertions, 91 deletions
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 93b1933809..67b38e6f0c 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -28,6 +28,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <utility>
+
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h"
@@ -47,6 +49,10 @@
grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");
+static void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
+static void server_recv_trailing_metadata_ready(void* user_data,
+ grpc_error* error);
+
namespace {
struct listener {
void* arg;
@@ -105,7 +111,7 @@ struct channel_data {
uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure;
grpc_closure channel_connectivity_changed;
- intptr_t socket_uuid;
+ grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node;
};
typedef struct shutdown_tag {
@@ -128,46 +134,73 @@ typedef enum {
typedef struct request_matcher request_matcher;
struct call_data {
+ call_data(grpc_call_element* elem, const grpc_call_element_args& args)
+ : call(grpc_call_from_top_element(elem)),
+ call_combiner(args.call_combiner) {
+ GRPC_CLOSURE_INIT(&server_on_recv_initial_metadata,
+ ::server_on_recv_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
+ server_recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ }
+ ~call_data() {
+ GPR_ASSERT(state != PENDING);
+ GRPC_ERROR_UNREF(recv_initial_metadata_error);
+ if (host_set) {
+ grpc_slice_unref_internal(host);
+ }
+ if (path_set) {
+ grpc_slice_unref_internal(path);
+ }
+ grpc_metadata_array_destroy(&initial_metadata);
+ grpc_byte_buffer_destroy(payload);
+ }
+
grpc_call* call;
- gpr_atm state;
+ gpr_atm state = NOT_STARTED;
- bool path_set;
- bool host_set;
+ bool path_set = false;
+ bool host_set = false;
grpc_slice path;
grpc_slice host;
- grpc_millis deadline;
+ grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
- grpc_completion_queue* cq_new;
+ grpc_completion_queue* cq_new = nullptr;
- grpc_metadata_batch* recv_initial_metadata;
- uint32_t recv_initial_metadata_flags;
- grpc_metadata_array initial_metadata;
+ grpc_metadata_batch* recv_initial_metadata = nullptr;
+ uint32_t recv_initial_metadata_flags = 0;
+ grpc_metadata_array initial_metadata =
+ grpc_metadata_array(); // Zero-initialize the C struct.
- request_matcher* matcher;
- grpc_byte_buffer* payload;
+ request_matcher* matcher = nullptr;
+ grpc_byte_buffer* payload = nullptr;
grpc_closure got_initial_metadata;
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
grpc_closure* on_done_recv_initial_metadata;
grpc_closure recv_trailing_metadata_ready;
- grpc_error* recv_initial_metadata_error;
+ grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
grpc_closure* original_recv_trailing_metadata_ready;
- grpc_error* recv_trailing_metadata_error;
- bool seen_recv_trailing_metadata_ready;
+ grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE;
+ bool seen_recv_trailing_metadata_ready = false;
grpc_closure publish;
- call_data* pending_next;
+ call_data* pending_next = nullptr;
grpc_call_combiner* call_combiner;
};
struct request_matcher {
+ request_matcher(grpc_server* server);
+ ~request_matcher();
+
grpc_server* server;
- call_data* pending_head;
- call_data* pending_tail;
- gpr_locked_mpscq* requests_per_cq;
+ std::atomic<call_data*> pending_head{nullptr};
+ call_data* pending_tail = nullptr;
+ gpr_locked_mpscq* requests_per_cq = nullptr;
};
struct registered_method {
@@ -316,22 +349,30 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb,
* request_matcher
*/
-static void request_matcher_init(request_matcher* rm, grpc_server* server) {
- memset(rm, 0, sizeof(*rm));
- rm->server = server;
- rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
- gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
+namespace {
+request_matcher::request_matcher(grpc_server* server) : server(server) {
+ requests_per_cq = static_cast<gpr_locked_mpscq*>(
+ gpr_malloc(sizeof(*requests_per_cq) * server->cq_count));
for (size_t i = 0; i < server->cq_count; i++) {
- gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
+ gpr_locked_mpscq_init(&requests_per_cq[i]);
}
}
-static void request_matcher_destroy(request_matcher* rm) {
- for (size_t i = 0; i < rm->server->cq_count; i++) {
- GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
- gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
+request_matcher::~request_matcher() {
+ for (size_t i = 0; i < server->cq_count; i++) {
+ GPR_ASSERT(gpr_locked_mpscq_pop(&requests_per_cq[i]) == nullptr);
+ gpr_locked_mpscq_destroy(&requests_per_cq[i]);
}
- gpr_free(rm->requests_per_cq);
+ gpr_free(requests_per_cq);
+}
+} // namespace
+
+static void request_matcher_init(request_matcher* rm, grpc_server* server) {
+ new (rm) request_matcher(server);
+}
+
+static void request_matcher_destroy(request_matcher* rm) {
+ rm->~request_matcher();
}
static void kill_zombie(void* elem, grpc_error* error) {
@@ -340,9 +381,10 @@ static void kill_zombie(void* elem, grpc_error* error) {
}
static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
- while (rm->pending_head) {
- call_data* calld = rm->pending_head;
- rm->pending_head = calld->pending_next;
+ call_data* calld;
+ while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
+ nullptr) {
+ rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
@@ -540,8 +582,9 @@ static void publish_new_rpc(void* arg, grpc_error* error) {
}
gpr_atm_no_barrier_store(&calld->state, PENDING);
- if (rm->pending_head == nullptr) {
- rm->pending_tail = rm->pending_head = calld;
+ if (rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
+ rm->pending_head.store(calld, std::memory_order_relaxed);
+ rm->pending_tail = calld;
} else {
rm->pending_tail->pending_next = calld;
rm->pending_tail = calld;
@@ -875,40 +918,18 @@ static void channel_connectivity_changed(void* cd, grpc_error* error) {
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- memset(calld, 0, sizeof(call_data));
- calld->deadline = GRPC_MILLIS_INF_FUTURE;
- calld->call = grpc_call_from_top_element(elem);
- calld->call_combiner = args->call_combiner;
-
- GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
- server_on_recv_initial_metadata, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
- server_recv_trailing_metadata_ready, elem,
- grpc_schedule_on_exec_ctx);
server_ref(chand->server);
+ new (elem->call_data) call_data(elem, *args);
return GRPC_ERROR_NONE;
}
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
-
- GPR_ASSERT(calld->state != PENDING);
- GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
- if (calld->host_set) {
- grpc_slice_unref_internal(calld->host);
- }
- if (calld->path_set) {
- grpc_slice_unref_internal(calld->path);
- }
- grpc_metadata_array_destroy(&calld->initial_metadata);
- grpc_byte_buffer_destroy(calld->payload);
-
+ calld->~call_data();
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
server_unref(chand->server);
}
@@ -931,6 +952,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
static void destroy_channel_elem(grpc_channel_element* elem) {
size_t i;
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ chand->socket_node.reset();
if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) {
grpc_slice_unref_internal(chand->registered_methods[i].method);
@@ -1136,11 +1158,11 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
*pollsets = server->pollsets;
}
-void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
- grpc_pollset* accepting_pollset,
- const grpc_channel_args* args,
- intptr_t socket_uuid,
- grpc_resource_user* resource_user) {
+void grpc_server_setup_transport(
+ grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
+ const grpc_channel_args* args,
+ grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node,
+ grpc_resource_user* resource_user) {
size_t num_registered_methods;
size_t alloc;
registered_method* rm;
@@ -1161,7 +1183,7 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
chand->server = s;
server_ref(s);
chand->channel = channel;
- chand->socket_uuid = socket_uuid;
+ chand->socket_node = std::move(socket_node);
size_t cq_idx;
for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
@@ -1237,14 +1259,13 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
}
void grpc_server_populate_server_sockets(
- grpc_server* s, grpc_core::channelz::ChildRefsList* server_sockets,
+ grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets,
intptr_t start_idx) {
gpr_mu_lock(&s->mu_global);
channel_data* c = nullptr;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
- intptr_t socket_uuid = c->socket_uuid;
- if (socket_uuid >= start_idx) {
- server_sockets->push_back(socket_uuid);
+ if (c->socket_node != nullptr && c->socket_node->uuid() >= start_idx) {
+ server_sockets->push_back(c->socket_node.get());
}
}
gpr_mu_unlock(&s->mu_global);
@@ -1427,30 +1448,39 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
rm = &rc->data.registered.method->matcher;
break;
}
- if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
- /* this was the first queued request: we need to lock and start
- matching calls */
- gpr_mu_lock(&server->mu_call);
- while ((calld = rm->pending_head) != nullptr) {
- rc = reinterpret_cast<requested_call*>(
- gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
- if (rc == nullptr) break;
- rm->pending_head = calld->pending_next;
- gpr_mu_unlock(&server->mu_call);
- if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
- // Zombied Call
- GRPC_CLOSURE_INIT(
- &calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
- } else {
- publish_call(server, calld, cq_idx, rc);
- }
- gpr_mu_lock(&server->mu_call);
- }
+
+ // Fast path: if there is no pending request to be processed, immediately
+ // return.
+ if (!gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link) ||
+ // Note: We are reading the pending_head without holding the server's call
+ // mutex. Even if we read a non-null value here due to reordering,
+ // we will check it below again after grabbing the lock.
+ rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
+ return GRPC_CALL_OK;
+ }
+ // Slow path: This was the first queued request and there are pendings:
+ // We need to lock and start matching calls.
+ gpr_mu_lock(&server->mu_call);
+ while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
+ nullptr) {
+ rc = reinterpret_cast<requested_call*>(
+ gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
+ if (rc == nullptr) break;
+ rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
gpr_mu_unlock(&server->mu_call);
+ if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
+ // Zombied Call
+ GRPC_CLOSURE_INIT(
+ &calld->kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
+ } else {
+ publish_call(server, calld, cq_idx, rc);
+ }
+ gpr_mu_lock(&server->mu_call);
}
+ gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}