Diffstat (limited to 'src/core/lib/surface/completion_queue.cc')
-rw-r--r--    src/core/lib/surface/completion_queue.cc    328
1 file changed, 155 insertions, 173 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 98d7e35943..12385b7130 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -62,13 +62,12 @@ typedef struct {
bool can_listen;
size_t (*size)(void);
void (*init)(grpc_pollset* pollset, gpr_mu** mu);
- grpc_error* (*kick)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+ grpc_error* (*kick)(grpc_pollset* pollset,
grpc_pollset_worker* specific_worker);
- grpc_error* (*work)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
- grpc_pollset_worker** worker, grpc_millis deadline);
- void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
- grpc_closure* closure);
- void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset);
+ grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
+ grpc_millis deadline);
+ void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
+ void (*destroy)(grpc_pollset* pollset);
} cq_poller_vtable;
typedef struct non_polling_worker {
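
Note on the hunk above: dropping grpc_exec_ctx* exec_ctx from every cq_poller_vtable callback works because the execution context moves from an explicit parameter to a per-thread slot reachable through grpc_core::ExecCtx::Get(). The following is a minimal, self-contained sketch of that pattern; ExecCtx here is a hypothetical stand-in, not gRPC's actual class:

#include <cassert>

class ExecCtx {
 public:
  ExecCtx() { current_ = this; }      // install on construction
  ~ExecCtx() { current_ = nullptr; }  // uninstall on destruction
  static ExecCtx* Get() { return current_; }

 private:
  static thread_local ExecCtx* current_;  // one context per thread
};
thread_local ExecCtx* ExecCtx::current_ = nullptr;

// Old style: grpc_error* (*kick)(grpc_exec_ctx* exec_ctx, grpc_pollset* p, ...)
// New style: the signature shrinks, and the body asks the thread instead.
void poller_kick_body() {
  ExecCtx* ctx = ExecCtx::Get();  // same context the caller established
  assert(ctx != nullptr);
  (void)ctx;
}

int main() {
  ExecCtx ctx;         // established once near the API surface
  poller_kick_body();  // deep callees no longer need an exec_ctx parameter
}
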
@@ -94,14 +93,12 @@ static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
*mu = &npp->mu;
}
-static void non_polling_poller_destroy(grpc_exec_ctx* exec_ctx,
- grpc_pollset* pollset) {
+static void non_polling_poller_destroy(grpc_pollset* pollset) {
non_polling_poller* npp = (non_polling_poller*)pollset;
gpr_mu_destroy(&npp->mu);
}
-static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx,
- grpc_pollset* pollset,
+static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
grpc_pollset_worker** worker,
grpc_millis deadline) {
non_polling_poller* npp = (non_polling_poller*)pollset;
@@ -122,12 +119,12 @@ static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx,
while (!npp->shutdown && !w.kicked &&
!gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
;
- grpc_exec_ctx_invalidate_now(exec_ctx);
+ grpc_core::ExecCtx::Get()->InvalidateNow();
if (&w == npp->root) {
npp->root = w.next;
if (&w == npp->root) {
if (npp->shutdown) {
- GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
}
npp->root = nullptr;
}
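
The grpc_exec_ctx_invalidate_now(exec_ctx) → grpc_core::ExecCtx::Get()->InvalidateNow() swap above keeps the same idea: the context caches "now" so repeated time reads are cheap, and the cache has to be refreshed after the thread slept in gpr_cv_wait(). A rough, self-contained sketch of such a cached clock, with hypothetical names (gRPC's grpc_millis is a millisecond count):

#include <chrono>
#include <cstdint>

class CachedClockCtx {
 public:
  int64_t Now() {  // milliseconds, cached per context
    if (!now_valid_) {
      now_ = std::chrono::duration_cast<std::chrono::milliseconds>(
                 std::chrono::steady_clock::now().time_since_epoch())
                 .count();
      now_valid_ = true;
    }
    return now_;
  }
  void InvalidateNow() { now_valid_ = false; }  // force a re-read after blocking

 private:
  bool now_valid_ = false;
  int64_t now_ = 0;
};
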
@@ -140,8 +137,7 @@ static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx,
}
static grpc_error* non_polling_poller_kick(
- grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
- grpc_pollset_worker* specific_worker) {
+ grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
non_polling_poller* p = (non_polling_poller*)pollset;
if (specific_worker == nullptr)
specific_worker = (grpc_pollset_worker*)p->root;
@@ -155,14 +151,13 @@ static grpc_error* non_polling_poller_kick(
return GRPC_ERROR_NONE;
}
-static void non_polling_poller_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_pollset* pollset,
+static void non_polling_poller_shutdown(grpc_pollset* pollset,
grpc_closure* closure) {
non_polling_poller* p = (non_polling_poller*)pollset;
GPR_ASSERT(closure != nullptr);
p->shutdown = closure;
if (p->root == nullptr) {
- GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
} else {
non_polling_worker* w = p->root;
do {
@@ -189,13 +184,11 @@ typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
void (*init)(void* data);
- void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq);
+ void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
- void (*end_op)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, void* tag,
- grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
- grpc_cq_completion* storage),
+ void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved);
@@ -280,31 +273,23 @@ struct grpc_completion_queue {
};
/* Forward declarations */
-static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq);
-static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq);
-static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq);
-static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq);
+static void cq_finish_shutdown_next(grpc_completion_queue* cq);
+static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
+static void cq_shutdown_next(grpc_completion_queue* cq);
+static void cq_shutdown_pluck(grpc_completion_queue* cq);
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
-static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx,
- void* done_arg,
+ void (*done)(void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
-static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx,
- void* done_arg,
+ void (*done)(void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
@@ -346,8 +331,7 @@ grpc_core::TraceFlag grpc_cq_event_timeout_trace(true, "queue_timeout");
gpr_free(_ev); \
}
-static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* cq,
- grpc_error* error);
+static void on_pollset_shutdown_done(void* cq, grpc_error* error);
void grpc_cq_global_init() {
gpr_tls_init(&g_cached_event);
@@ -369,19 +353,18 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
if (storage != nullptr &&
(grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
*tag = storage->tag;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
*ok = (storage->next & (uintptr_t)(1)) == 1;
- storage->done(&exec_ctx, storage->done_arg, storage);
+ storage->done(storage->done_arg, storage);
ret = 1;
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(&exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
- grpc_exec_ctx_finish(&exec_ctx);
}
gpr_tls_set(&g_cached_event, (intptr_t)0);
gpr_tls_set(&g_cached_cq, (intptr_t)0);
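
This hunk shows the core shape of the migration: the GRPC_EXEC_CTX_INIT / grpc_exec_ctx_finish() pair becomes a single stack object, grpc_core::ExecCtx exec_ctx;, whose destructor takes over the finish step. A self-contained sketch of that RAII shape (ScopedCtx is hypothetical, not the real ExecCtx):

#include <functional>
#include <vector>

class ScopedCtx {
 public:
  ~ScopedCtx() { Flush(); }  // pending work is drained when the scope ends
  void Schedule(std::function<void()> fn) { pending_.push_back(std::move(fn)); }

 private:
  void Flush() {
    for (auto& fn : pending_) fn();
    pending_.clear();
  }
  std::vector<std::function<void()>> pending_;
};

void cache_flush_like_path() {
  ScopedCtx exec_ctx;  // analogous to: grpc_core::ExecCtx exec_ctx;
  exec_ctx.Schedule([] { /* completion callback */ });
}  // flush happens here, even on early return -- no explicit finish call to forget
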
@@ -406,24 +389,22 @@ static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
grpc_cq_completion* c = nullptr;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
if (gpr_spinlock_trylock(&q->queue_lock)) {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
bool is_empty = false;
c = (grpc_cq_completion*)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
gpr_spinlock_unlock(&q->queue_lock);
if (c == nullptr && !is_empty) {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
}
} else {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
}
- grpc_exec_ctx_finish(&exec_ctx);
-
if (c) {
gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
}
@@ -453,9 +434,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
const cq_poller_vtable* poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_STATS_INC_CQS_CREATED(&exec_ctx);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_STATS_INC_CQS_CREATED();
cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) +
vtable->data_size +
@@ -537,15 +517,14 @@ void grpc_cq_internal_ref(grpc_completion_queue* cq) {
gpr_ref(&cq->owning_refs);
}
-static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
grpc_completion_queue* cq = (grpc_completion_queue*)arg;
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
+ GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
}
#ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
- const char* reason, const char* file, int line) {
+void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
+ const char* file, int line) {
if (grpc_trace_cq_refcount.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -553,12 +532,11 @@ void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
reason);
}
#else
-void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+void grpc_cq_internal_unref(grpc_completion_queue* cq) {
#endif
if (gpr_unref(&cq->owning_refs)) {
cq->vtable->destroy(DATA_FROM_CQ(cq));
- cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
+ cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
gpr_free(cq->outstanding_tags);
#endif
@@ -639,11 +617,9 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
-static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx,
- void* done_arg,
+ void (*done)(void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
GPR_TIMER_BEGIN("cq_end_op_for_next", 0);
@@ -652,9 +628,9 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
(grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
+ "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+ 6, (cq, tag, errmsg, done, done_arg, storage));
if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
@@ -689,7 +665,7 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
if (is_first) {
gpr_mu_lock(cq->mu);
grpc_error* kick_error =
- cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), nullptr);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
@@ -701,17 +677,17 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
} else {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_atm_rel_store(&cqd->pending_events, 0);
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
}
@@ -723,11 +699,9 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_PLUCK) */
-static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx,
- void* done_arg,
+ void (*done)(void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
@@ -739,9 +713,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
(grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
+ "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+ 6, (cq, tag, errmsg, done, done_arg, storage));
if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
@@ -762,7 +736,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
cqd->completed_tail = storage;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_pluck(exec_ctx, cq);
+ cq_finish_shutdown_pluck(cq);
gpr_mu_unlock(cq->mu);
} else {
grpc_pollset_worker* pluck_worker = nullptr;
@@ -774,7 +748,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
}
grpc_error* kick_error =
- cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
gpr_mu_unlock(cq->mu);
@@ -791,12 +765,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
GRPC_ERROR_UNREF(error);
}
-void grpc_cq_end_op(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
- void* tag, grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
- grpc_cq_completion* storage),
+void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
- cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
+ cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
}
typedef struct {
@@ -808,31 +780,40 @@ typedef struct {
bool first_loop;
} cq_is_finished_arg;
-static bool cq_is_next_finished(grpc_exec_ctx* exec_ctx, void* arg) {
- cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
- grpc_completion_queue* cq = a->cq;
- cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
- GPR_ASSERT(a->stolen_completion == nullptr);
+class ExecCtxNext : public grpc_core::ExecCtx {
+ public:
+ ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
- gpr_atm current_last_seen_things_queued_ever =
- gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+ bool CheckReadyToFinish() override {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_;
+ grpc_completion_queue* cq = a->cq;
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
+ GPR_ASSERT(a->stolen_completion == nullptr);
- if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
- a->last_seen_things_queued_ever =
+ gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
- * might return NULL in some cases even if the queue is not empty; but
- * that
- * is ok and doesn't affect correctness. Might effect the tail latencies a
- * bit) */
- a->stolen_completion = cq_event_queue_pop(&cqd->queue);
- if (a->stolen_completion != nullptr) {
- return true;
+ if (current_last_seen_things_queued_ever !=
+ a->last_seen_things_queued_ever) {
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+
+ /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
+ * might return NULL in some cases even if the queue is not empty; but
+ * that
+ * is ok and doesn't affect correctness. Might effect the tail latencies a
+ * bit) */
+ a->stolen_completion = cq_event_queue_pop(&cqd->queue);
+ if (a->stolen_completion != nullptr) {
+ return true;
+ }
}
+ return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
}
- return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
-}
+
+ private:
+ void* check_ready_to_finish_arg_;
+};
#ifndef NDEBUG
static void dump_pending_tags(grpc_completion_queue* cq) {
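
In the hunk above, the free function cq_is_next_finished plus GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg) is replaced by an ExecCtx subclass, ExecCtxNext, that stores the argument as a member and overrides CheckReadyToFinish(). A stripped-down sketch of that callback-to-virtual-method move, using hypothetical names:

class BaseCtx {
 public:
  virtual ~BaseCtx() = default;
  virtual bool CheckReadyToFinish() { return false; }  // default: keep polling
};

struct IsFinishedArg {
  bool done = false;
};

class NextCtx : public BaseCtx {
 public:
  explicit NextCtx(IsFinishedArg* arg) : arg_(arg) {}
  bool CheckReadyToFinish() override { return arg_->done; }  // same predicate, now a method

 private:
  IsFinishedArg* arg_;
};

// Usage mirrors cq_next(): the context lives on the stack for one API call.
//   IsFinishedArg arg;
//   NextCtx exec_ctx(&arg);
//   while (!exec_ctx.CheckReadyToFinish()) { /* poll */ }
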
@@ -887,8 +868,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
nullptr,
nullptr,
true};
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
+ ExecCtxNext exec_ctx(&is_finished_arg);
for (;;) {
grpc_millis iteration_deadline = deadline_millis;
@@ -898,7 +878,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
break;
}
@@ -908,7 +888,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
break;
} else {
/* If c == NULL it means either the queue is empty OR in an transient
@@ -939,7 +919,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
}
if (!is_finished_arg.first_loop &&
- grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
+ grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cq);
@@ -949,8 +929,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock(cq->mu);
cq->num_polls++;
- grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- nullptr, iteration_deadline);
+ grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
+ iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
@@ -969,13 +949,13 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_acq_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
- cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), nullptr);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
gpr_mu_unlock(cq->mu);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_CQ_INTERNAL_UNREF(cq, "next");
+
GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
GPR_TIMER_END("grpc_completion_queue_next", 0);
@@ -989,19 +969,16 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
- Must be called only once in completion queue's lifetime
- grpc_completion_queue_shutdown() MUST have been called before calling
this function */
-static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
- cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
- &cq->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
-static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+static void cq_shutdown_next(grpc_completion_queue* cq) {
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
@@ -1014,7 +991,7 @@ static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
return;
}
cqd->shutdown_called = true;
@@ -1022,10 +999,10 @@ static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
* cq_begin_op_for_next and and cq_end_op_for_next functions which read/write
* on this counter without necessarily holding a lock on cq */
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_next(exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
}
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
@@ -1058,37 +1035,46 @@ static void del_plucker(grpc_completion_queue* cq, void* tag,
GPR_UNREACHABLE_CODE(return );
}
-static bool cq_is_pluck_finished(grpc_exec_ctx* exec_ctx, void* arg) {
- cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
- grpc_completion_queue* cq = a->cq;
- cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
+class ExecCtxPluck : public grpc_core::ExecCtx {
+ public:
+ ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
- GPR_ASSERT(a->stolen_completion == nullptr);
- gpr_atm current_last_seen_things_queued_ever =
- gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
- gpr_mu_lock(cq->mu);
- a->last_seen_things_queued_ever =
+ bool CheckReadyToFinish() override {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_;
+ grpc_completion_queue* cq = a->cq;
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
+
+ GPR_ASSERT(a->stolen_completion == nullptr);
+ gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- grpc_cq_completion* c;
- grpc_cq_completion* prev = &cqd->completed_head;
- while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
- &cqd->completed_head) {
- if (c->tag == a->tag) {
- prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
- if (c == cqd->completed_tail) {
- cqd->completed_tail = prev;
+ if (current_last_seen_things_queued_ever !=
+ a->last_seen_things_queued_ever) {
+ gpr_mu_lock(cq->mu);
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+ grpc_cq_completion* c;
+ grpc_cq_completion* prev = &cqd->completed_head;
+ while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
+ &cqd->completed_head) {
+ if (c->tag == a->tag) {
+ prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
+ if (c == cqd->completed_tail) {
+ cqd->completed_tail = prev;
+ }
+ gpr_mu_unlock(cq->mu);
+ a->stolen_completion = c;
+ return true;
}
- gpr_mu_unlock(cq->mu);
- a->stolen_completion = c;
- return true;
+ prev = c;
}
- prev = c;
+ gpr_mu_unlock(cq->mu);
}
- gpr_mu_unlock(cq->mu);
+ return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
}
- return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
-}
+
+ private:
+ void* check_ready_to_finish_arg_;
+};
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved) {
@@ -1125,8 +1111,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
nullptr,
tag,
true};
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
+ ExecCtxPluck exec_ctx(&is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != nullptr) {
gpr_mu_unlock(cq->mu);
@@ -1135,7 +1120,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
break;
}
prev = &cqd->completed_head;
@@ -1150,7 +1135,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
goto done;
}
prev = c;
@@ -1174,7 +1159,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
break;
}
if (!is_finished_arg.first_loop &&
- grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
+ grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
@@ -1183,8 +1168,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
break;
}
cq->num_polls++;
- grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- &worker, deadline_millis);
+ grpc_error* err =
+ cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
@@ -1202,8 +1187,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
+
GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
GPR_TIMER_END("grpc_completion_queue_pluck", 0);
@@ -1216,22 +1201,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
return cq->vtable->pluck(cq, tag, deadline, reserved);
}
-static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
gpr_atm_no_barrier_store(&cqd->shutdown, 1);
- cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
- &cq->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
* merging them is a bit tricky and probably not worth it */
-static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+static void cq_shutdown_pluck(grpc_completion_queue* cq) {
cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
@@ -1244,25 +1226,25 @@ static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
return;
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_pluck(exec_ctx, cq);
+ cq_finish_shutdown_pluck(cq);
}
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
- cq->vtable->shutdown(&exec_ctx, cq);
- grpc_exec_ctx_finish(&exec_ctx);
+ cq->vtable->shutdown(cq);
+
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
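
The shutdown entry point above illustrates the new calling convention at the public API boundary: a scoped grpc_core::ExecCtx is installed for the duration of the call and, as far as this diff shows, its destructor performs the flush that grpc_exec_ctx_finish() used to do, so closures scheduled under cq->vtable->shutdown(cq) (such as pollset_shutdown_done) run before the function returns. Paraphrasing the hunk with the trace/timer macros omitted:

// Paraphrase of grpc_completion_queue_shutdown after this change
// (GPR_TIMER_* and GRPC_API_TRACE omitted for brevity).
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
  grpc_core::ExecCtx exec_ctx;  // installed on this thread for this call
  cq->vtable->shutdown(cq);     // may GRPC_CLOSURE_SCHED() deferred work
}  // ~ExecCtx(): deferred closures drain here, replacing grpc_exec_ctx_finish()
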
@@ -1271,9 +1253,9 @@ void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
grpc_completion_queue_shutdown(cq);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
+
GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}