aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/completion_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/completion_queue.c')
-rw-r--r--src/core/lib/surface/completion_queue.c50
1 files changed, 23 insertions, 27 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 978d7b4171..cefa8a2a89 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -57,8 +57,7 @@ typedef struct {
grpc_error *(*kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, gpr_timespec now,
- gpr_timespec deadline);
+ grpc_pollset_worker **worker, grpc_millis deadline);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
@@ -96,8 +95,7 @@ static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx,
static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_pollset_worker **worker,
- gpr_timespec now,
- gpr_timespec deadline) {
+ grpc_millis deadline) {
non_polling_poller *npp = (non_polling_poller *)pollset;
if (npp->shutdown) return GRPC_ERROR_NONE;
non_polling_worker w;
@@ -111,7 +109,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
w.next->prev = w.prev->next = &w;
}
w.kicked = false;
- while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
+ gpr_timespec deadline_ts =
+ grpc_millis_to_timespec(exec_ctx, deadline, GPR_CLOCK_REALTIME);
+ while (!npp->shutdown && !w.kicked &&
+ !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
;
if (&w == npp->root) {
npp->root = w.next;
@@ -738,7 +739,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
typedef struct {
gpr_atm last_seen_things_queued_ever;
grpc_completion_queue *cq;
- gpr_timespec deadline;
+ grpc_millis deadline;
grpc_cq_completion *stolen_completion;
void *tag; /* for pluck */
bool first_loop;
@@ -767,8 +768,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
return true;
}
}
- return !a->first_loop &&
- gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
}
#ifndef NDEBUG
@@ -797,7 +797,6 @@ static void dump_pending_tags(grpc_completion_queue *cq) {}
static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved) {
grpc_event ret;
- gpr_timespec now;
cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -814,23 +813,22 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
dump_pending_tags(cq);
- deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
-
GRPC_CQ_INTERNAL_REF(cq, "next");
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
.cq = cq,
- .deadline = deadline,
+ .deadline = 0,
.stolen_completion = NULL,
.tag = NULL,
.first_loop = true};
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
-
+ grpc_millis deadline_millis = is_finished_arg.deadline =
+ grpc_timespec_to_millis(&exec_ctx, deadline);
for (;;) {
- gpr_timespec iteration_deadline = deadline;
+ grpc_millis iteration_deadline = deadline_millis;
if (is_finished_arg.stolen_completion != NULL) {
grpc_cq_completion *c = is_finished_arg.stolen_completion;
@@ -858,7 +856,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
thread
forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
- iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
+ iteration_deadline = 0;
}
}
@@ -881,8 +879,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
break;
}
- now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop &&
+ grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cq);
@@ -893,7 +891,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
gpr_mu_lock(cq->mu);
cq->num_polls++;
grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- NULL, now, iteration_deadline);
+ NULL, iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
@@ -1022,8 +1020,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
gpr_mu_unlock(cq->mu);
}
- return !a->first_loop &&
- gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
}
static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
@@ -1032,7 +1029,6 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
- gpr_timespec now;
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -1059,12 +1055,14 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
.cq = cq,
- .deadline = deadline,
+ .deadline = 0,
.stolen_completion = NULL,
.tag = tag,
.first_loop = true};
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
+ grpc_millis deadline_millis = is_finished_arg.deadline =
+ grpc_timespec_to_millis(&exec_ctx, deadline);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cq->mu);
@@ -1111,8 +1109,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
dump_pending_tags(cq);
break;
}
- now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop &&
+ grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
@@ -1120,10 +1118,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
dump_pending_tags(cq);
break;
}
-
- cq->num_polls++;
grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- &worker, now, deadline);
+ &worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);