aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-10-03 21:17:37 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-10-04 12:13:26 -0700
commit0dfbdf6e54e73a71823907a60aeb62717a4c81df (patch)
tree4beb624c10a6f5303215410d34e04a68780d4a6e /src/core/surface
parentf8460df564385af2197ffccb4d4004fd96d0eddd (diff)
Added grpc_cq_alarm
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/completion_queue.c35
-rw-r--r--src/core/surface/completion_queue.h27
2 files changed, 62 insertions, 0 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 5dac8ebcf8..368838860f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -44,6 +44,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+#include <grpc/support/time.h>
typedef struct {
grpc_pollset_worker *worker;
@@ -354,3 +355,37 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
+
+static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_cq_completion *c) {}
+
+static void cq_alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ grpc_cq_alarm *cq_alarm = arg;
+ grpc_cq_end_op(exec_ctx, cq_alarm->cq, cq_alarm->tag, success,
+ do_nothing_end_completion, NULL, &cq_alarm->completion);
+}
+
+grpc_cq_alarm *grpc_cq_alarm_create(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq,
+ gpr_timespec deadline, void *tag) {
+ grpc_cq_alarm *cq_alarm = gpr_malloc(sizeof(grpc_cq_alarm));
+
+ GRPC_CQ_INTERNAL_REF(cq, "cq_alarm");
+ cq_alarm->cq = cq;
+ cq_alarm->tag = tag;
+
+ grpc_alarm_init(exec_ctx, &cq_alarm->alarm, deadline, cq_alarm_cb, cq_alarm,
+ gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_cq_begin_op(cq);
+ return cq_alarm;
+}
+
+void grpc_cq_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm) {
+ grpc_alarm_cancel(exec_ctx, &cq_alarm->alarm);
+}
+
+void grpc_cq_alarm_destroy(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm) {
+ grpc_cq_alarm_cancel(exec_ctx, cq_alarm);
+ GRPC_CQ_INTERNAL_UNREF(cq_alarm->cq, "cq_alarm");
+ gpr_free(cq_alarm);
+}
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 5f8282e542..ab5b87c5c8 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -36,6 +36,7 @@
/* Internal API for completion queues */
+#include "src/core/iomgr/alarm.h"
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
@@ -51,6 +52,16 @@ typedef struct grpc_cq_completion {
gpr_uintptr next;
} grpc_cq_completion;
+/** An alarm associated with a completion queue. */
+typedef struct grpc_cq_alarm {
+ grpc_alarm alarm;
+ grpc_cq_completion completion;
+ /** completion queue where events about this alarm will be posted */
+ grpc_completion_queue *cq;
+ /** user supplied tag */
+ void *tag;
+} grpc_cq_alarm;
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);
@@ -83,4 +94,20 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+/** Create a completion queue alarm instance associated to \a cq.
+ *
+ * Once the alarm expires (at \a deadline) or it's cancelled (see ...), an event
+ * with tag \a tag will be added to \a cq. If the alarm expired, the event's
+ * success bit will be true, false otherwise (ie, upon cancellation). */
+grpc_cq_alarm *grpc_cq_alarm_create(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq,
+ gpr_timespec deadline, void *tag);
+
+/** Cancel a completion queue alarm. Calling this function ove an alarm that has
+ * already run has no effect. */
+void grpc_cq_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm);
+
+/** Destroy the given completion queue alarm, cancelling it in the process. */
+void grpc_cq_alarm_destroy(grpc_exec_ctx *exec_ctx, grpc_cq_alarm *cq_alarm);
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */