From 9c5ca8365c650c8891ef222b43edb1514e7d36f3 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 2 Jan 2019 23:03:30 +0100 Subject: Revert "Implement a lock-free fast path for queue_call_request()" --- src/core/lib/surface/server.cc | 104 ++++++++++++++++------------------------- 1 file changed, 41 insertions(+), 63 deletions(-) diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 67b38e6f0c..7ae6e51a5f 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -194,13 +194,10 @@ struct call_data { }; struct request_matcher { - request_matcher(grpc_server* server); - ~request_matcher(); - grpc_server* server; - std::atomic pending_head{nullptr}; - call_data* pending_tail = nullptr; - gpr_locked_mpscq* requests_per_cq = nullptr; + call_data* pending_head; + call_data* pending_tail; + gpr_locked_mpscq* requests_per_cq; }; struct registered_method { @@ -349,30 +346,22 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb, * request_matcher */ -namespace { -request_matcher::request_matcher(grpc_server* server) : server(server) { - requests_per_cq = static_cast( - gpr_malloc(sizeof(*requests_per_cq) * server->cq_count)); - for (size_t i = 0; i < server->cq_count; i++) { - gpr_locked_mpscq_init(&requests_per_cq[i]); - } -} - -request_matcher::~request_matcher() { +static void request_matcher_init(request_matcher* rm, grpc_server* server) { + memset(rm, 0, sizeof(*rm)); + rm->server = server; + rm->requests_per_cq = static_cast( + gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count)); for (size_t i = 0; i < server->cq_count; i++) { - GPR_ASSERT(gpr_locked_mpscq_pop(&requests_per_cq[i]) == nullptr); - gpr_locked_mpscq_destroy(&requests_per_cq[i]); + gpr_locked_mpscq_init(&rm->requests_per_cq[i]); } - gpr_free(requests_per_cq); -} -} // namespace - -static void request_matcher_init(request_matcher* rm, grpc_server* server) { - new (rm) request_matcher(server); } static void request_matcher_destroy(request_matcher* rm) { - rm->~request_matcher(); + for (size_t i = 0; i < rm->server->cq_count; i++) { + GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr); + gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); + } + gpr_free(rm->requests_per_cq); } static void kill_zombie(void* elem, grpc_error* error) { @@ -381,10 +370,9 @@ static void kill_zombie(void* elem, grpc_error* error) { } static void request_matcher_zombify_all_pending_calls(request_matcher* rm) { - call_data* calld; - while ((calld = rm->pending_head.load(std::memory_order_relaxed)) != - nullptr) { - rm->pending_head.store(calld->pending_next, std::memory_order_relaxed); + while (rm->pending_head) { + call_data* calld = rm->pending_head; + rm->pending_head = calld->pending_next; gpr_atm_no_barrier_store(&calld->state, ZOMBIED); GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, @@ -582,9 +570,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) { } gpr_atm_no_barrier_store(&calld->state, PENDING); - if (rm->pending_head.load(std::memory_order_relaxed) == nullptr) { - rm->pending_head.store(calld, std::memory_order_relaxed); - rm->pending_tail = calld; + if (rm->pending_head == nullptr) { + rm->pending_tail = rm->pending_head = calld; } else { rm->pending_tail->pending_next = calld; rm->pending_tail = calld; @@ -1448,39 +1435,30 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, rm = &rc->data.registered.method->matcher; break; } - - // Fast path: if there is no pending request to be processed, immediately - // return. - if (!gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link) || - // Note: We are reading the pending_head without holding the server's call - // mutex. Even if we read a non-null value here due to reordering, - // we will check it below again after grabbing the lock. - rm->pending_head.load(std::memory_order_relaxed) == nullptr) { - return GRPC_CALL_OK; - } - // Slow path: This was the first queued request and there are pendings: - // We need to lock and start matching calls. - gpr_mu_lock(&server->mu_call); - while ((calld = rm->pending_head.load(std::memory_order_relaxed)) != - nullptr) { - rc = reinterpret_cast( - gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx])); - if (rc == nullptr) break; - rm->pending_head.store(calld->pending_next, std::memory_order_relaxed); - gpr_mu_unlock(&server->mu_call); - if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { - // Zombied Call - GRPC_CLOSURE_INIT( - &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); - } else { - publish_call(server, calld, cq_idx, rc); - } + if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { + /* this was the first queued request: we need to lock and start + matching calls */ gpr_mu_lock(&server->mu_call); + while ((calld = rm->pending_head) != nullptr) { + rc = reinterpret_cast( + gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx])); + if (rc == nullptr) break; + rm->pending_head = calld->pending_next; + gpr_mu_unlock(&server->mu_call); + if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { + // Zombied Call + GRPC_CLOSURE_INIT( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + } else { + publish_call(server, calld, cq_idx, rc); + } + gpr_mu_lock(&server->mu_call); + } + gpr_mu_unlock(&server->mu_call); } - gpr_mu_unlock(&server->mu_call); return GRPC_CALL_OK; } -- cgit v1.2.3