diff options
author | 2015-10-03 21:17:37 -0700 | |
---|---|---|
committer | 2015-10-04 12:13:26 -0700 | |
commit | 0dfbdf6e54e73a71823907a60aeb62717a4c81df (patch) | |
tree | 4beb624c10a6f5303215410d34e04a68780d4a6e /src/core/surface | |
parent | f8460df564385af2197ffccb4d4004fd96d0eddd (diff) |
Added grpc_cq_alarm
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/completion_queue.c | 35 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 27 |
2 files changed, 62 insertions, 0 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 5dac8ebcf8..368838860f 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -44,6 +44,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/time.h> typedef struct { grpc_pollset_worker *worker; @@ -354,3 +355,37 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } + +static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg, + grpc_cq_completion *c) {} + +static void cq_alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_cq_alarm *cq_alarm = arg; + grpc_cq_end_op(exec_ctx, cq_alarm->cq, cq_alarm->tag, success, + do_nothing_end_completion, NULL, &cq_alarm->completion); +} + +grpc_cq_alarm *grpc_cq_alarm_create(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq, + gpr_timespec deadline, void *tag) { + grpc_cq_alarm *cq_alarm = gpr_malloc(sizeof(grpc_cq_alarm)); + + GRPC_CQ_INTERNAL_REF(cq, "cq_alarm"); + cq_alarm->cq = cq; + cq_alarm->tag = tag; + + grpc_alarm_init(exec_ctx, &cq_alarm->alarm, deadline, cq_alarm_cb, cq_alarm, + gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_cq_begin_op(cq); + return cq_alarm; +} + +void grpc_cq_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm) { + grpc_alarm_cancel(exec_ctx, &cq_alarm->alarm); +} + +void grpc_cq_alarm_destroy(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm) { + grpc_cq_alarm_cancel(exec_ctx, cq_alarm); + GRPC_CQ_INTERNAL_UNREF(cq_alarm->cq, "cq_alarm"); + gpr_free(cq_alarm); +} diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 5f8282e542..ab5b87c5c8 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -36,6 +36,7 @@ /* Internal API for completion queues */ +#include "src/core/iomgr/alarm.h" #include "src/core/iomgr/pollset.h" #include <grpc/grpc.h> @@ -51,6 +52,16 @@ typedef struct grpc_cq_completion { gpr_uintptr next; } grpc_cq_completion; +/** An alarm associated with a completion queue. */ +typedef struct grpc_cq_alarm { + grpc_alarm alarm; + grpc_cq_completion completion; + /** completion queue where events about this alarm will be posted */ + grpc_completion_queue *cq; + /** user supplied tag */ + void *tag; +} grpc_cq_alarm; + #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line); @@ -83,4 +94,20 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); +/** Create a completion queue alarm instance associated to \a cq. + * + * Once the alarm expires (at \a deadline) or it's cancelled (see ...), an event + * with tag \a tag will be added to \a cq. If the alarm expired, the event's + * success bit will be true, false otherwise (ie, upon cancellation). */ +grpc_cq_alarm *grpc_cq_alarm_create(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq, + gpr_timespec deadline, void *tag); + +/** Cancel a completion queue alarm. Calling this function ove an alarm that has + * already run has no effect. */ +void grpc_cq_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm); + +/** Destroy the given completion queue alarm, cancelling it in the process. */ +void grpc_cq_alarm_destroy(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm); + #endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */ |