aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/completion_queue.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-08-24 15:13:53 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-08-24 15:13:53 -0700
commitaef3a79ae4cfa7236673407e09aff001a8d2e02f (patch)
tree5552edb0da25f460b769b9fca1967ab2344e8ab0 /src/core/lib/surface/completion_queue.c
parentdfd3a8f7a566aaf59b676caf583d4048b8e9ab5b (diff)
Remove extraneous locks on cq checks
Diffstat (limited to 'src/core/lib/surface/completion_queue.c')
-rw-r--r--src/core/lib/surface/completion_queue.c89
1 files changed, 50 insertions, 39 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 28450d966c..1124290699 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -71,6 +71,9 @@ struct grpc_completion_queue {
gpr_refcount pending_events;
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
+ /** counter of how many things have ever been queued on this completion queue
+ useful for avoiding locks to check the queue */
+ gpr_atm things_queued_ever;
/** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
@@ -125,15 +128,6 @@ void grpc_cq_global_shutdown(void) {
}
}
-struct grpc_cq_alarm {
- grpc_timer alarm;
- grpc_cq_completion completion;
- /** completion queue where events about this alarm will be posted */
- grpc_completion_queue *cq;
- /** user supplied tag */
- void *tag;
-};
-
grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc;
GPR_ASSERT(!reserved);
@@ -170,6 +164,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->is_server_cq = 0;
cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
+ gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
#endif
@@ -280,6 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(found);
#endif
shutdown = gpr_unref(&cc->pending_events);
+ gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
if (!shutdown) {
cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
@@ -318,6 +314,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
}
typedef struct {
+ gpr_atm last_seen_things_queued_ever;
grpc_completion_queue *cq;
gpr_timespec deadline;
grpc_cq_completion *stolen_completion;
@@ -328,17 +325,23 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
GPR_ASSERT(a->stolen_completion == NULL);
- gpr_mu_lock(cq->mu);
- if (cq->completed_tail != &cq->completed_head) {
- a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
- cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
- if (a->stolen_completion == cq->completed_tail) {
- cq->completed_tail = &cq->completed_head;
+ gpr_atm current_last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
+ gpr_mu_lock(cq->mu);
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ if (cq->completed_tail != &cq->completed_head) {
+ a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
+ cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
+ if (a->stolen_completion == cq->completed_tail) {
+ cq->completed_tail = &cq->completed_head;
+ }
+ gpr_mu_unlock(cq->mu);
+ return true;
}
gpr_mu_unlock(cq->mu);
- return true;
}
- gpr_mu_unlock(cq->mu);
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
@@ -386,12 +389,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, NULL};
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
- cq_is_next_finished, &is_finished_arg);
-
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(cc->mu);
+ cq_is_finished_arg is_finished_arg = {
+ gpr_atm_no_barrier_load(&cc->things_queued_ever), cc, deadline, NULL,
+ NULL};
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
+ cq_is_next_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);
@@ -496,23 +500,29 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
GPR_ASSERT(a->stolen_completion == NULL);
- gpr_mu_lock(cq->mu);
- grpc_cq_completion *c;
- grpc_cq_completion *prev = &cq->completed_head;
- while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
- &cq->completed_head) {
- if (c->tag == a->tag) {
- prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
- if (c == cq->completed_tail) {
- cq->completed_tail = prev;
+ gpr_atm current_last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
+ gpr_mu_lock(cq->mu);
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ grpc_cq_completion *c;
+ grpc_cq_completion *prev = &cq->completed_head;
+ while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
+ &cq->completed_head) {
+ if (c->tag == a->tag) {
+ prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
+ if (c == cq->completed_tail) {
+ cq->completed_tail = prev;
+ }
+ gpr_mu_unlock(cq->mu);
+ a->stolen_completion = c;
+ return true;
}
- gpr_mu_unlock(cq->mu);
- a->stolen_completion = c;
- return true;
+ prev = c;
}
- prev = c;
+ gpr_mu_unlock(cq->mu);
}
- gpr_mu_unlock(cq->mu);
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
@@ -543,12 +553,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, tag};
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
- cq_is_pluck_finished, &is_finished_arg);
-
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(cc->mu);
+ cq_is_finished_arg is_finished_arg = {
+ gpr_atm_no_barrier_load(&cc->things_queued_ever), cc, deadline, NULL,
+ tag};
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
+ cq_is_pluck_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);