Diffstat (limited to 'src/core/lib/surface/completion_queue.cc')
-rw-r--r--  src/core/lib/surface/completion_queue.cc  328
1 file changed, 173 insertions, 155 deletions
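
Editor's note (not part of the commit): this diff threads an explicit grpc_exec_ctx* parameter through the poller and completion-queue vtables and their callbacks, replacing uses of the thread-local grpc_core::ExecCtx helpers (grpc_core::ExecCtx::Get()->Now(), the context-less GRPC_CLOSURE_SCHED, the ExecCtxNext/ExecCtxPluck subclasses). A minimal sketch of the calling convention the + lines follow is given here; grpc_exec_ctx, GRPC_EXEC_CTX_INIT and grpc_exec_ctx_finish are the real symbols used in the diff, while do_work() and caller() are hypothetical stand-ins, and the header path assumes this source tree's layout.

    #include "src/core/lib/iomgr/exec_ctx.h"

    /* Stand-in for any callee that now receives the context as its first
     * argument (the vtable entries, GRPC_CLOSURE_SCHED, grpc_exec_ctx_now). */
    static void do_work(grpc_exec_ctx* exec_ctx) {
      /* anything scheduled on exec_ctx runs no later than
       * grpc_exec_ctx_finish() in the caller below */
    }

    void caller(void) {
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; /* stack-allocated context */
      do_work(&exec_ctx);                          /* thread it explicitly */
      grpc_exec_ctx_finish(&exec_ctx);             /* flush pending closures */
    }
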
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 12385b7130..98d7e35943 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -62,12 +62,13 @@ typedef struct {
bool can_listen;
size_t (*size)(void);
void (*init)(grpc_pollset* pollset, gpr_mu** mu);
- grpc_error* (*kick)(grpc_pollset* pollset,
+ grpc_error* (*kick)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
grpc_pollset_worker* specific_worker);
- grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
- grpc_millis deadline);
- void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
- void (*destroy)(grpc_pollset* pollset);
+ grpc_error* (*work)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+ grpc_pollset_worker** worker, grpc_millis deadline);
+ void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+ grpc_closure* closure);
+ void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset);
} cq_poller_vtable;
typedef struct non_polling_worker {
@@ -93,12 +94,14 @@ static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
*mu = &npp->mu;
}
-static void non_polling_poller_destroy(grpc_pollset* pollset) {
+static void non_polling_poller_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_pollset* pollset) {
non_polling_poller* npp = (non_polling_poller*)pollset;
gpr_mu_destroy(&npp->mu);
}
-static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
+static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx,
+ grpc_pollset* pollset,
grpc_pollset_worker** worker,
grpc_millis deadline) {
non_polling_poller* npp = (non_polling_poller*)pollset;
@@ -119,12 +122,12 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
while (!npp->shutdown && !w.kicked &&
!gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
;
- grpc_core::ExecCtx::Get()->InvalidateNow();
+ grpc_exec_ctx_invalidate_now(exec_ctx);
if (&w == npp->root) {
npp->root = w.next;
if (&w == npp->root) {
if (npp->shutdown) {
- GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
}
npp->root = nullptr;
}
@@ -137,7 +140,8 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
}
static grpc_error* non_polling_poller_kick(
- grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
+ grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker) {
non_polling_poller* p = (non_polling_poller*)pollset;
if (specific_worker == nullptr)
specific_worker = (grpc_pollset_worker*)p->root;
@@ -151,13 +155,14 @@ static grpc_error* non_polling_poller_kick(
return GRPC_ERROR_NONE;
}
-static void non_polling_poller_shutdown(grpc_pollset* pollset,
+static void non_polling_poller_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_pollset* pollset,
grpc_closure* closure) {
non_polling_poller* p = (non_polling_poller*)pollset;
GPR_ASSERT(closure != nullptr);
p->shutdown = closure;
if (p->root == nullptr) {
- GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
non_polling_worker* w = p->root;
do {
@@ -184,11 +189,13 @@ typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
void (*init)(void* data);
- void (*shutdown)(grpc_completion_queue* cq);
+ void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
- void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
- void (*done)(void* done_arg, grpc_cq_completion* storage),
+ void (*end_op)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
+ grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved);
@@ -273,23 +280,31 @@ struct grpc_completion_queue {
};
/* Forward declarations */
-static void cq_finish_shutdown_next(grpc_completion_queue* cq);
-static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
-static void cq_shutdown_next(grpc_completion_queue* cq);
-static void cq_shutdown_pluck(grpc_completion_queue* cq);
+static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq);
+static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq);
+static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq);
+static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq);
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
-static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(void* done_arg,
+ void (*done)(grpc_exec_ctx* exec_ctx,
+ void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
-static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(void* done_arg,
+ void (*done)(grpc_exec_ctx* exec_ctx,
+ void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
@@ -331,7 +346,8 @@ grpc_core::TraceFlag grpc_cq_event_timeout_trace(true, "queue_timeout");
gpr_free(_ev); \
}
-static void on_pollset_shutdown_done(void* cq, grpc_error* error);
+static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* cq,
+ grpc_error* error);
void grpc_cq_global_init() {
gpr_tls_init(&g_cached_event);
@@ -353,18 +369,19 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
if (storage != nullptr &&
(grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
*tag = storage->tag;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
*ok = (storage->next & (uintptr_t)(1)) == 1;
- storage->done(storage->done_arg, storage);
+ storage->done(&exec_ctx, storage->done_arg, storage);
ret = 1;
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(cq);
+ cq_finish_shutdown_next(&exec_ctx, cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down");
}
+ grpc_exec_ctx_finish(&exec_ctx);
}
gpr_tls_set(&g_cached_event, (intptr_t)0);
gpr_tls_set(&g_cached_cq, (intptr_t)0);
@@ -389,22 +406,24 @@ static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
grpc_cq_completion* c = nullptr;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (gpr_spinlock_trylock(&q->queue_lock)) {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);
bool is_empty = false;
c = (grpc_cq_completion*)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
gpr_spinlock_unlock(&q->queue_lock);
if (c == nullptr && !is_empty) {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx);
}
} else {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx);
}
+ grpc_exec_ctx_finish(&exec_ctx);
+
if (c) {
gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
}
@@ -434,8 +453,9 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
const cq_poller_vtable* poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
- grpc_core::ExecCtx exec_ctx;
- GRPC_STATS_INC_CQS_CREATED();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_STATS_INC_CQS_CREATED(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) +
vtable->data_size +
@@ -517,14 +537,15 @@ void grpc_cq_internal_ref(grpc_completion_queue* cq) {
gpr_ref(&cq->owning_refs);
}
-static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
+static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_completion_queue* cq = (grpc_completion_queue*)arg;
- GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
}
#ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
- const char* file, int line) {
+void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
+ const char* reason, const char* file, int line) {
if (grpc_trace_cq_refcount.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -532,11 +553,12 @@ void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
reason);
}
#else
-void grpc_cq_internal_unref(grpc_completion_queue* cq) {
+void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
#endif
if (gpr_unref(&cq->owning_refs)) {
cq->vtable->destroy(DATA_FROM_CQ(cq));
- cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
+ cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
gpr_free(cq->outstanding_tags);
#endif
@@ -617,9 +639,11 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
-static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(void* done_arg,
+ void (*done)(grpc_exec_ctx* exec_ctx,
+ void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
GPR_TIMER_BEGIN("cq_end_op_for_next", 0);
@@ -628,9 +652,9 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
(grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
+ "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 6, (cq, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
@@ -665,7 +689,7 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
if (is_first) {
gpr_mu_lock(cq->mu);
grpc_error* kick_error =
- cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
+ cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), nullptr);
gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
@@ -677,17 +701,17 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(cq);
+ cq_finish_shutdown_next(exec_ctx, cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
} else {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_atm_rel_store(&cqd->pending_events, 0);
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(cq);
+ cq_finish_shutdown_next(exec_ctx, cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
}
@@ -699,9 +723,11 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_PLUCK) */
-static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(void* done_arg,
+ void (*done)(grpc_exec_ctx* exec_ctx,
+ void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
@@ -713,9 +739,9 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
(grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
+ "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 6, (cq, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
@@ -736,7 +762,7 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
cqd->completed_tail = storage;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_pluck(cq);
+ cq_finish_shutdown_pluck(exec_ctx, cq);
gpr_mu_unlock(cq->mu);
} else {
grpc_pollset_worker* pluck_worker = nullptr;
@@ -748,7 +774,7 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
}
grpc_error* kick_error =
- cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
+ cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker);
gpr_mu_unlock(cq->mu);
@@ -765,10 +791,12 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
GRPC_ERROR_UNREF(error);
}
-void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
- void (*done)(void* done_arg, grpc_cq_completion* storage),
+void grpc_cq_end_op(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
+ void* tag, grpc_error* error,
+ void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
+ grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
- cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
+ cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
}
typedef struct {
@@ -780,40 +808,31 @@ typedef struct {
bool first_loop;
} cq_is_finished_arg;
-class ExecCtxNext : public grpc_core::ExecCtx {
- public:
- ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
+static bool cq_is_next_finished(grpc_exec_ctx* exec_ctx, void* arg) {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
+ grpc_completion_queue* cq = a->cq;
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
+ GPR_ASSERT(a->stolen_completion == nullptr);
- bool CheckReadyToFinish() override {
- cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_;
- grpc_completion_queue* cq = a->cq;
- cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
- GPR_ASSERT(a->stolen_completion == nullptr);
+ gpr_atm current_last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- gpr_atm current_last_seen_things_queued_ever =
+ if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
+ a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- if (current_last_seen_things_queued_ever !=
- a->last_seen_things_queued_ever) {
- a->last_seen_things_queued_ever =
- gpr_atm_no_barrier_load(&cqd->things_queued_ever);
-
- /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
- * might return NULL in some cases even if the queue is not empty; but
- * that
- * is ok and doesn't affect correctness. Might effect the tail latencies a
- * bit) */
- a->stolen_completion = cq_event_queue_pop(&cqd->queue);
- if (a->stolen_completion != nullptr) {
- return true;
- }
+ /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
+ * might return NULL in some cases even if the queue is not empty; but
+ * that
+ * is ok and doesn't affect correctness. Might effect the tail latencies a
+ * bit) */
+ a->stolen_completion = cq_event_queue_pop(&cqd->queue);
+ if (a->stolen_completion != nullptr) {
+ return true;
}
- return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
}
-
- private:
- void* check_ready_to_finish_arg_;
-};
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
+}
#ifndef NDEBUG
static void dump_pending_tags(grpc_completion_queue* cq) {
@@ -868,7 +887,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
nullptr,
nullptr,
true};
- ExecCtxNext exec_ctx(&is_finished_arg);
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
for (;;) {
grpc_millis iteration_deadline = deadline_millis;
@@ -878,7 +898,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(&exec_ctx, c->done_arg, c);
break;
}
@@ -888,7 +908,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(&exec_ctx, c->done_arg, c);
break;
} else {
/* If c == NULL it means either the queue is empty OR in an transient
@@ -919,7 +939,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
}
if (!is_finished_arg.first_loop &&
- grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
+ grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cq);
@@ -929,8 +949,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock(cq->mu);
cq->num_polls++;
- grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
- iteration_deadline);
+ grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
+ nullptr, iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
@@ -949,13 +969,13 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_acq_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
- cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
+ cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), nullptr);
gpr_mu_unlock(cq->mu);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(cq, "next");
-
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
GPR_TIMER_END("grpc_completion_queue_next", 0);
@@ -969,16 +989,19 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
- Must be called only once in completion queue's lifetime
- grpc_completion_queue_shutdown() MUST have been called before calling
this function */
-static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
+static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
- cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+ &cq->pollset_shutdown_done);
}
-static void cq_shutdown_next(grpc_completion_queue* cq) {
+static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
@@ -991,7 +1014,7 @@ static void cq_shutdown_next(grpc_completion_queue* cq) {
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
return;
}
cqd->shutdown_called = true;
@@ -999,10 +1022,10 @@ static void cq_shutdown_next(grpc_completion_queue* cq) {
* cq_begin_op_for_next and and cq_end_op_for_next functions which read/write
* on this counter without necessarily holding a lock on cq */
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_next(cq);
+ cq_finish_shutdown_next(exec_ctx, cq);
}
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
@@ -1035,46 +1058,37 @@ static void del_plucker(grpc_completion_queue* cq, void* tag,
GPR_UNREACHABLE_CODE(return );
}
-class ExecCtxPluck : public grpc_core::ExecCtx {
- public:
- ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
-
- bool CheckReadyToFinish() override {
- cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_;
- grpc_completion_queue* cq = a->cq;
- cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
+static bool cq_is_pluck_finished(grpc_exec_ctx* exec_ctx, void* arg) {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
+ grpc_completion_queue* cq = a->cq;
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
- GPR_ASSERT(a->stolen_completion == nullptr);
- gpr_atm current_last_seen_things_queued_ever =
+ GPR_ASSERT(a->stolen_completion == nullptr);
+ gpr_atm current_last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+ if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
+ gpr_mu_lock(cq->mu);
+ a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- if (current_last_seen_things_queued_ever !=
- a->last_seen_things_queued_ever) {
- gpr_mu_lock(cq->mu);
- a->last_seen_things_queued_ever =
- gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- grpc_cq_completion* c;
- grpc_cq_completion* prev = &cqd->completed_head;
- while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
- &cqd->completed_head) {
- if (c->tag == a->tag) {
- prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
- if (c == cqd->completed_tail) {
- cqd->completed_tail = prev;
- }
- gpr_mu_unlock(cq->mu);
- a->stolen_completion = c;
- return true;
+ grpc_cq_completion* c;
+ grpc_cq_completion* prev = &cqd->completed_head;
+ while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
+ &cqd->completed_head) {
+ if (c->tag == a->tag) {
+ prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
+ if (c == cqd->completed_tail) {
+ cqd->completed_tail = prev;
}
- prev = c;
+ gpr_mu_unlock(cq->mu);
+ a->stolen_completion = c;
+ return true;
}
- gpr_mu_unlock(cq->mu);
+ prev = c;
}
- return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
+ gpr_mu_unlock(cq->mu);
}
-
- private:
- void* check_ready_to_finish_arg_;
-};
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
+}
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved) {
@@ -1111,7 +1125,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
nullptr,
tag,
true};
- ExecCtxPluck exec_ctx(&is_finished_arg);
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != nullptr) {
gpr_mu_unlock(cq->mu);
@@ -1120,7 +1135,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(&exec_ctx, c->done_arg, c);
break;
}
prev = &cqd->completed_head;
@@ -1135,7 +1150,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(&exec_ctx, c->done_arg, c);
goto done;
}
prev = c;
@@ -1159,7 +1174,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
break;
}
if (!is_finished_arg.first_loop &&
- grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
+ grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
@@ -1168,8 +1183,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
break;
}
cq->num_polls++;
- grpc_error* err =
- cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
+ grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
+ &worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
@@ -1187,8 +1202,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
-
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
GPR_TIMER_END("grpc_completion_queue_pluck", 0);
@@ -1201,19 +1216,22 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
return cq->vtable->pluck(cq, tag, deadline, reserved);
}
-static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
+static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
gpr_atm_no_barrier_store(&cqd->shutdown, 1);
- cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+ &cq->pollset_shutdown_done);
}
/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
* merging them is a bit tricky and probably not worth it */
-static void cq_shutdown_pluck(grpc_completion_queue* cq) {
+static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
@@ -1226,25 +1244,25 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) {
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
return;
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_pluck(cq);
+ cq_finish_shutdown_pluck(exec_ctx, cq);
}
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
- cq->vtable->shutdown(cq);
-
+ cq->vtable->shutdown(&exec_ctx, cq);
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
@@ -1253,9 +1271,9 @@ void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
grpc_completion_queue_shutdown(cq);
- grpc_core::ExecCtx exec_ctx;
- GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
-
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}
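
Usage note (editor's sketch, not part of the commit): with the grpc_cq_end_op signature shown in the diff, an internal call site passes the context through and supplies a done callback of the matching type. grpc_cq_end_op, grpc_cq_completion, grpc_exec_ctx and GRPC_ERROR_NONE are the real symbols touched above; finish_op and my_done are hypothetical placeholders, and the include path assumes this source tree's layout.

    #include "src/core/lib/surface/completion_queue.h"

    /* hypothetical per-operation cleanup; its type matches the 'done'
     * parameter: void (*)(grpc_exec_ctx*, void*, grpc_cq_completion*) */
    static void my_done(grpc_exec_ctx* exec_ctx, void* done_arg,
                        grpc_cq_completion* storage) {
      /* free or recycle 'storage' here; it is owned by the queue until now */
    }

    /* hypothetical caller: completes 'tag' on 'cq', handing 'storage' and
     * the error to the completion queue */
    void finish_op(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, void* tag,
                   grpc_cq_completion* storage) {
      grpc_cq_end_op(exec_ctx, cq, tag, GRPC_ERROR_NONE, my_done, NULL, storage);
    }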