aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2019-01-02 23:03:30 +0100
committerGravatar GitHub <noreply@github.com>2019-01-02 23:03:30 +0100
commit9c5ca8365c650c8891ef222b43edb1514e7d36f3 (patch)
tree1a96de7bc9fbbd31adb84e8e6f3fde1ec723c3d1 /src
parent95ebd80bbd4f1622e64a6c485bb911e8f7abcf3f (diff)
Revert "Implement a lock-free fast path for queue_call_request()"
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/surface/server.cc104
1 files changed, 41 insertions, 63 deletions
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 67b38e6f0c..7ae6e51a5f 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -194,13 +194,10 @@ struct call_data {
};
struct request_matcher {
- request_matcher(grpc_server* server);
- ~request_matcher();
-
grpc_server* server;
- std::atomic<call_data*> pending_head{nullptr};
- call_data* pending_tail = nullptr;
- gpr_locked_mpscq* requests_per_cq = nullptr;
+ call_data* pending_head;
+ call_data* pending_tail;
+ gpr_locked_mpscq* requests_per_cq;
};
struct registered_method {
@@ -349,30 +346,22 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb,
* request_matcher
*/
-namespace {
-request_matcher::request_matcher(grpc_server* server) : server(server) {
- requests_per_cq = static_cast<gpr_locked_mpscq*>(
- gpr_malloc(sizeof(*requests_per_cq) * server->cq_count));
- for (size_t i = 0; i < server->cq_count; i++) {
- gpr_locked_mpscq_init(&requests_per_cq[i]);
- }
-}
-
-request_matcher::~request_matcher() {
+static void request_matcher_init(request_matcher* rm, grpc_server* server) {
+ memset(rm, 0, sizeof(*rm));
+ rm->server = server;
+ rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
+ gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
for (size_t i = 0; i < server->cq_count; i++) {
- GPR_ASSERT(gpr_locked_mpscq_pop(&requests_per_cq[i]) == nullptr);
- gpr_locked_mpscq_destroy(&requests_per_cq[i]);
+ gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
}
- gpr_free(requests_per_cq);
-}
-} // namespace
-
-static void request_matcher_init(request_matcher* rm, grpc_server* server) {
- new (rm) request_matcher(server);
}
static void request_matcher_destroy(request_matcher* rm) {
- rm->~request_matcher();
+ for (size_t i = 0; i < rm->server->cq_count; i++) {
+ GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
+ gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
+ }
+ gpr_free(rm->requests_per_cq);
}
static void kill_zombie(void* elem, grpc_error* error) {
@@ -381,10 +370,9 @@ static void kill_zombie(void* elem, grpc_error* error) {
}
static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
- call_data* calld;
- while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
- nullptr) {
- rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
+ while (rm->pending_head) {
+ call_data* calld = rm->pending_head;
+ rm->pending_head = calld->pending_next;
gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
@@ -582,9 +570,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) {
}
gpr_atm_no_barrier_store(&calld->state, PENDING);
- if (rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
- rm->pending_head.store(calld, std::memory_order_relaxed);
- rm->pending_tail = calld;
+ if (rm->pending_head == nullptr) {
+ rm->pending_tail = rm->pending_head = calld;
} else {
rm->pending_tail->pending_next = calld;
rm->pending_tail = calld;
@@ -1448,39 +1435,30 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
rm = &rc->data.registered.method->matcher;
break;
}
-
- // Fast path: if there is no pending request to be processed, immediately
- // return.
- if (!gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link) ||
- // Note: We are reading the pending_head without holding the server's call
- // mutex. Even if we read a non-null value here due to reordering,
- // we will check it below again after grabbing the lock.
- rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
- return GRPC_CALL_OK;
- }
- // Slow path: This was the first queued request and there are pendings:
- // We need to lock and start matching calls.
- gpr_mu_lock(&server->mu_call);
- while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
- nullptr) {
- rc = reinterpret_cast<requested_call*>(
- gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
- if (rc == nullptr) break;
- rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
- gpr_mu_unlock(&server->mu_call);
- if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
- // Zombied Call
- GRPC_CLOSURE_INIT(
- &calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
- } else {
- publish_call(server, calld, cq_idx, rc);
- }
+ if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
+ /* this was the first queued request: we need to lock and start
+ matching calls */
gpr_mu_lock(&server->mu_call);
+ while ((calld = rm->pending_head) != nullptr) {
+ rc = reinterpret_cast<requested_call*>(
+ gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
+ if (rc == nullptr) break;
+ rm->pending_head = calld->pending_next;
+ gpr_mu_unlock(&server->mu_call);
+ if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
+ // Zombied Call
+ GRPC_CLOSURE_INIT(
+ &calld->kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
+ } else {
+ publish_call(server, calld, cq_idx, rc);
+ }
+ gpr_mu_lock(&server->mu_call);
+ }
+ gpr_mu_unlock(&server->mu_call);
}
- gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}