aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/completion_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/completion_queue.c')
-rw-r--r--src/core/lib/surface/completion_queue.c167
1 files changed, 150 insertions, 17 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 5978884db8..4e0feb56ac 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -39,6 +39,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/pollset.h"
@@ -50,6 +51,9 @@
#include "src/core/lib/surface/event_string.h"
int grpc_trace_operation_failures;
+#ifndef NDEBUG
+int grpc_trace_pending_tags;
+#endif
typedef struct {
grpc_pollset_worker **worker;
@@ -67,6 +71,9 @@ struct grpc_completion_queue {
gpr_refcount pending_events;
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
+ /** counter of how many things have ever been queued on this completion queue
+ useful for avoiding locks to check the queue */
+ gpr_atm things_queued_ever;
/** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
@@ -121,15 +128,6 @@ void grpc_cq_global_shutdown(void) {
}
}
-struct grpc_cq_alarm {
- grpc_timer alarm;
- grpc_cq_completion completion;
- /** completion queue where events about this alarm will be posted */
- grpc_completion_queue *cq;
- /** user supplied tag */
- void *tag;
-};
-
grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc;
GPR_ASSERT(!reserved);
@@ -166,6 +164,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->is_server_cq = 0;
cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
+ gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
#endif
@@ -276,6 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(found);
#endif
shutdown = gpr_unref(&cc->pending_events);
+ gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
if (!shutdown) {
cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
@@ -313,13 +313,66 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GRPC_ERROR_UNREF(error);
}
+typedef struct {
+ gpr_atm last_seen_things_queued_ever;
+ grpc_completion_queue *cq;
+ gpr_timespec deadline;
+ grpc_cq_completion *stolen_completion;
+ void *tag; /* for pluck */
+ bool first_loop;
+} cq_is_finished_arg;
+
+static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
+ cq_is_finished_arg *a = arg;
+ grpc_completion_queue *cq = a->cq;
+ GPR_ASSERT(a->stolen_completion == NULL);
+ gpr_atm current_last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
+ gpr_mu_lock(cq->mu);
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ if (cq->completed_tail != &cq->completed_head) {
+ a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
+ cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
+ if (a->stolen_completion == cq->completed_tail) {
+ cq->completed_tail = &cq->completed_head;
+ }
+ gpr_mu_unlock(cq->mu);
+ return true;
+ }
+ gpr_mu_unlock(cq->mu);
+ }
+ return !a->first_loop &&
+ gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+}
+
+#ifndef NDEBUG
+static void dump_pending_tags(grpc_completion_queue *cc) {
+ if (!grpc_trace_pending_tags) return;
+
+ gpr_strvec v;
+ gpr_strvec_init(&v);
+ gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
+ for (size_t i = 0; i < cc->outstanding_tag_count; i++) {
+ char *s;
+ gpr_asprintf(&s, " %p", cc->outstanding_tags[i]);
+ gpr_strvec_add(&v, s);
+ }
+ char *out = gpr_strvec_flatten(&v, NULL);
+ gpr_strvec_destroy(&v);
+ gpr_log(GPR_DEBUG, "%s", out);
+ gpr_free(out);
+}
+#else
+static void dump_pending_tags(grpc_completion_queue *cc) {}
+#endif
+
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker *worker = NULL;
- int first_loop = 1;
gpr_timespec now;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -333,11 +386,33 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
reserved));
GPR_ASSERT(!reserved);
+ dump_pending_tags(cc);
+
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(cc->mu);
+ cq_is_finished_arg is_finished_arg = {
+ .last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cc->things_queued_ever),
+ .cq = cc,
+ .deadline = deadline,
+ .stolen_completion = NULL,
+ .tag = NULL,
+ .first_loop = true};
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
+ cq_is_next_finished, &is_finished_arg);
for (;;) {
+ if (is_finished_arg.stolen_completion != NULL) {
+ gpr_mu_unlock(cc->mu);
+ grpc_cq_completion *c = is_finished_arg.stolen_completion;
+ is_finished_arg.stolen_completion = NULL;
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(&exec_ctx, c->done_arg, c);
+ break;
+ }
if (cc->completed_tail != &cc->completed_head) {
grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
cc->completed_head.next = c->next & ~(uintptr_t)1;
@@ -358,13 +433,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
- first_loop = 0;
/* Check alarms - these are a global resource so we just ping
each time through on every pollset.
May update deadline to ensure timely wakeups.
@@ -387,13 +462,16 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
}
+ is_finished_arg.first_loop = false;
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
GPR_TIMER_END("grpc_completion_queue_next", 0);
@@ -424,6 +502,37 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,
GPR_UNREACHABLE_CODE(return );
}
+static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
+ cq_is_finished_arg *a = arg;
+ grpc_completion_queue *cq = a->cq;
+ GPR_ASSERT(a->stolen_completion == NULL);
+ gpr_atm current_last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
+ gpr_mu_lock(cq->mu);
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ grpc_cq_completion *c;
+ grpc_cq_completion *prev = &cq->completed_head;
+ while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
+ &cq->completed_head) {
+ if (c->tag == a->tag) {
+ prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
+ if (c == cq->completed_tail) {
+ cq->completed_tail = prev;
+ }
+ gpr_mu_unlock(cq->mu);
+ a->stolen_completion = c;
+ return true;
+ }
+ prev = c;
+ }
+ gpr_mu_unlock(cq->mu);
+ }
+ return !a->first_loop &&
+ gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+}
+
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
@@ -431,8 +540,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
- int first_loop = 1;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -448,11 +555,33 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
}
GPR_ASSERT(!reserved);
+ dump_pending_tags(cc);
+
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(cc->mu);
+ cq_is_finished_arg is_finished_arg = {
+ .last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cc->things_queued_ever),
+ .cq = cc,
+ .deadline = deadline,
+ .stolen_completion = NULL,
+ .tag = tag,
+ .first_loop = true};
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
+ cq_is_pluck_finished, &is_finished_arg);
for (;;) {
+ if (is_finished_arg.stolen_completion != NULL) {
+ gpr_mu_unlock(cc->mu);
+ c = is_finished_arg.stolen_completion;
+ is_finished_arg.stolen_completion = NULL;
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(&exec_ctx, c->done_arg, c);
+ break;
+ }
prev = &cc->completed_head;
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
&cc->completed_head) {
@@ -485,17 +614,18 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
memset(&ret, 0, sizeof(ret));
/* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
- first_loop = 0;
/* Check alarms - these are a global resource so we just ping
each time through on every pollset.
May update deadline to ensure timely wakeups.
@@ -518,15 +648,18 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
}
+ is_finished_arg.first_loop = false;
del_plucker(cc, tag, &worker);
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
GPR_TIMER_END("grpc_completion_queue_pluck", 0);