diff options
-rw-r--r-- | src/core/surface/completion_queue.c | 35 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 27 | ||||
-rw-r--r-- | test/core/surface/completion_queue_test.c | 42 |
3 files changed, 104 insertions, 0 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 5dac8ebcf8..368838860f 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -44,6 +44,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/time.h> typedef struct { grpc_pollset_worker *worker; @@ -354,3 +355,37 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } + +static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg, + grpc_cq_completion *c) {} + +static void cq_alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_cq_alarm *cq_alarm = arg; + grpc_cq_end_op(exec_ctx, cq_alarm->cq, cq_alarm->tag, success, + do_nothing_end_completion, NULL, &cq_alarm->completion); +} + +grpc_cq_alarm *grpc_cq_alarm_create(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq, + gpr_timespec deadline, void *tag) { + grpc_cq_alarm *cq_alarm = gpr_malloc(sizeof(grpc_cq_alarm)); + + GRPC_CQ_INTERNAL_REF(cq, "cq_alarm"); + cq_alarm->cq = cq; + cq_alarm->tag = tag; + + grpc_alarm_init(exec_ctx, &cq_alarm->alarm, deadline, cq_alarm_cb, cq_alarm, + gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_cq_begin_op(cq); + return cq_alarm; +} + +void grpc_cq_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm) { + grpc_alarm_cancel(exec_ctx, &cq_alarm->alarm); +} + +void grpc_cq_alarm_destroy(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm) { + grpc_cq_alarm_cancel(exec_ctx, cq_alarm); + GRPC_CQ_INTERNAL_UNREF(cq_alarm->cq, "cq_alarm"); + gpr_free(cq_alarm); +} diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 5f8282e542..ab5b87c5c8 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -36,6 +36,7 @@ /* Internal API for completion queues */ +#include "src/core/iomgr/alarm.h" #include "src/core/iomgr/pollset.h" #include <grpc/grpc.h> @@ -51,6 +52,16 @@ typedef struct grpc_cq_completion { gpr_uintptr next; } grpc_cq_completion; +/** An alarm associated with a completion queue. */ +typedef struct grpc_cq_alarm { + grpc_alarm alarm; + grpc_cq_completion completion; + /** completion queue where events about this alarm will be posted */ + grpc_completion_queue *cq; + /** user supplied tag */ + void *tag; +} grpc_cq_alarm; + #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line); @@ -83,4 +94,20 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); +/** Create a completion queue alarm instance associated to \a cq. + * + * Once the alarm expires (at \a deadline) or it's cancelled (see ...), an event + * with tag \a tag will be added to \a cq. If the alarm expired, the event's + * success bit will be true, false otherwise (ie, upon cancellation). */ +grpc_cq_alarm *grpc_cq_alarm_create(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq, + gpr_timespec deadline, void *tag); + +/** Cancel a completion queue alarm. Calling this function ove an alarm that has + * already run has no effect. */ +void grpc_cq_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm); + +/** Destroy the given completion queue alarm, cancelling it in the process. */ +void grpc_cq_alarm_destroy(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm); + #endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */ diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index e3fc789788..785093c60f 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -102,6 +102,47 @@ static void test_cq_end_op(void) { grpc_exec_ctx_finish(&exec_ctx); } +static void test_cq_alarm(void) { + grpc_completion_queue *cc; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + LOG_TEST("test_cq_alarm"); + cc = grpc_completion_queue_create(NULL); + { + /* regular expiry */ + grpc_event ev; + void *tag = create_test_tag(); + grpc_cq_alarm *cq_alarm = grpc_cq_alarm_create( + &exec_ctx, cc, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), tag); + + ev = grpc_completion_queue_next(cc, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), + NULL); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + GPR_ASSERT(ev.tag == tag); + GPR_ASSERT(ev.success); + grpc_cq_alarm_destroy(&exec_ctx, cq_alarm); + } + { + /* cancellation */ + grpc_event ev; + void *tag = create_test_tag(); + grpc_cq_alarm *cq_alarm = grpc_cq_alarm_create( + &exec_ctx, cc, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), tag); + + grpc_cq_alarm_cancel(&exec_ctx, cq_alarm); + GPR_ASSERT(grpc_exec_ctx_flush(&exec_ctx) == 1); + ev = grpc_completion_queue_next(cc, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), + NULL); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + GPR_ASSERT(ev.tag == tag); + GPR_ASSERT(ev.success == 0); + grpc_cq_alarm_destroy(&exec_ctx, cq_alarm); + } + + shutdown_and_destroy(cc); + grpc_exec_ctx_finish(&exec_ctx); +} + static void test_shutdown_then_next_polling(void) { grpc_completion_queue *cc; grpc_event event; @@ -343,6 +384,7 @@ int main(int argc, char **argv) { test_shutdown_then_next_with_timeout(); test_cq_end_op(); test_pluck(); + test_cq_alarm(); test_threading(1, 1); test_threading(1, 10); test_threading(10, 1); |