diff options
author | 2017-12-08 09:30:40 -0800 | |
---|---|---|
committer | 2017-12-08 09:30:40 -0800 | |
commit | 94e676e10f8c739289924b8458a246699e3623ce (patch) | |
tree | e223fa15a4c6e22582771a456bfb58d844c96485 /src/core/lib/surface/completion_queue.cc | |
parent | a25697095b39ce4014457a64c2917c5abadbe998 (diff) | |
parent | 9e5dc246eebdc3b7c7dc4d75f35d3697bee90d9a (diff) |
Merge pull request #13659 from grpc/revert-13658-revert-13058-execctx
Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
Diffstat (limited to 'src/core/lib/surface/completion_queue.cc')
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 328 |
1 files changed, 155 insertions, 173 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 98d7e35943..12385b7130 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -62,13 +62,12 @@ typedef struct { bool can_listen; size_t (*size)(void); void (*init)(grpc_pollset* pollset, gpr_mu** mu); - grpc_error* (*kick)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_error* (*kick)(grpc_pollset* pollset, grpc_pollset_worker* specific_worker); - grpc_error* (*work)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, - grpc_pollset_worker** worker, grpc_millis deadline); - void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, - grpc_closure* closure); - void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset); + grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker, + grpc_millis deadline); + void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure); + void (*destroy)(grpc_pollset* pollset); } cq_poller_vtable; typedef struct non_polling_worker { @@ -94,14 +93,12 @@ static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) { *mu = &npp->mu; } -static void non_polling_poller_destroy(grpc_exec_ctx* exec_ctx, - grpc_pollset* pollset) { +static void non_polling_poller_destroy(grpc_pollset* pollset) { non_polling_poller* npp = (non_polling_poller*)pollset; gpr_mu_destroy(&npp->mu); } -static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx, - grpc_pollset* pollset, +static grpc_error* non_polling_poller_work(grpc_pollset* pollset, grpc_pollset_worker** worker, grpc_millis deadline) { non_polling_poller* npp = (non_polling_poller*)pollset; @@ -122,12 +119,12 @@ static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx, while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) ; - grpc_exec_ctx_invalidate_now(exec_ctx); + grpc_core::ExecCtx::Get()->InvalidateNow(); if (&w == npp->root) { npp->root = w.next; if (&w == npp->root) { if (npp->shutdown) { - GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE); } npp->root = nullptr; } @@ -140,8 +137,7 @@ static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx, } static grpc_error* non_polling_poller_kick( - grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, - grpc_pollset_worker* specific_worker) { + grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { non_polling_poller* p = (non_polling_poller*)pollset; if (specific_worker == nullptr) specific_worker = (grpc_pollset_worker*)p->root; @@ -155,14 +151,13 @@ static grpc_error* non_polling_poller_kick( return GRPC_ERROR_NONE; } -static void non_polling_poller_shutdown(grpc_exec_ctx* exec_ctx, - grpc_pollset* pollset, +static void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) { non_polling_poller* p = (non_polling_poller*)pollset; GPR_ASSERT(closure != nullptr); p->shutdown = closure; if (p->root == nullptr) { - GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); } else { non_polling_worker* w = p->root; do { @@ -189,13 +184,11 @@ typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; void (*init)(void* data); - void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq); + void (*shutdown)(grpc_completion_queue* cq); void (*destroy)(void* data); bool (*begin_op)(grpc_completion_queue* cq, void* tag); - void (*end_op)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg, - grpc_cq_completion* storage), + void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage); grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); @@ -280,31 +273,23 @@ struct grpc_completion_queue { }; /* Forward declarations */ -static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq); -static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq); -static void cq_shutdown_next(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq); -static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq); +static void cq_finish_shutdown_next(grpc_completion_queue* cq); +static void cq_finish_shutdown_pluck(grpc_completion_queue* cq); +static void cq_shutdown_next(grpc_completion_queue* cq); +static void cq_shutdown_pluck(grpc_completion_queue* cq); static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); -static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq, void* tag, +static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(grpc_exec_ctx* exec_ctx, - void* done_arg, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage); -static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq, void* tag, +static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(grpc_exec_ctx* exec_ctx, - void* done_arg, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage); @@ -346,8 +331,7 @@ grpc_core::TraceFlag grpc_cq_event_timeout_trace(true, "queue_timeout"); gpr_free(_ev); \ } -static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* cq, - grpc_error* error); +static void on_pollset_shutdown_done(void* cq, grpc_error* error); void grpc_cq_global_init() { gpr_tls_init(&g_cached_event); @@ -369,19 +353,18 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq, if (storage != nullptr && (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) { *tag = storage->tag; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; *ok = (storage->next & (uintptr_t)(1)) == 1; - storage->done(&exec_ctx, storage->done_arg, storage); + storage->done(storage->done_arg, storage); ret = 1; cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(&exec_ctx, cq); + cq_finish_shutdown_next(cq); gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); } - grpc_exec_ctx_finish(&exec_ctx); } gpr_tls_set(&g_cached_event, (intptr_t)0); gpr_tls_set(&g_cached_cq, (intptr_t)0); @@ -406,24 +389,22 @@ static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) { static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) { grpc_cq_completion* c = nullptr; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; if (gpr_spinlock_trylock(&q->queue_lock)) { - GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(); bool is_empty = false; c = (grpc_cq_completion*)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty); gpr_spinlock_unlock(&q->queue_lock); if (c == nullptr && !is_empty) { - GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx); + GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(); } } else { - GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(); } - grpc_exec_ctx_finish(&exec_ctx); - if (c) { gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); } @@ -453,9 +434,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal( const cq_poller_vtable* poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_STATS_INC_CQS_CREATED(&exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); + grpc_core::ExecCtx exec_ctx; + GRPC_STATS_INC_CQS_CREATED(); cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + @@ -537,15 +517,14 @@ void grpc_cq_internal_ref(grpc_completion_queue* cq) { gpr_ref(&cq->owning_refs); } -static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void on_pollset_shutdown_done(void* arg, grpc_error* error) { grpc_completion_queue* cq = (grpc_completion_queue*)arg; - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy"); + GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy"); } #ifndef NDEBUG -void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, - const char* reason, const char* file, int line) { +void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason, + const char* file, int line) { if (grpc_trace_cq_refcount.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, @@ -553,12 +532,11 @@ void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, reason); } #else -void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq) { +void grpc_cq_internal_unref(grpc_completion_queue* cq) { #endif if (gpr_unref(&cq->owning_refs)) { cq->vtable->destroy(DATA_FROM_CQ(cq)); - cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq)); + cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq)); #ifndef NDEBUG gpr_free(cq->outstanding_tags); #endif @@ -639,11 +617,9 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ -static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq, void* tag, +static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(grpc_exec_ctx* exec_ctx, - void* done_arg, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage) { GPR_TIMER_BEGIN("cq_end_op_for_next", 0); @@ -652,9 +628,9 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx, (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) { const char* errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, " + "cq_end_op_for_next(cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); + 6, (cq, tag, errmsg, done, done_arg, storage)); if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } @@ -689,7 +665,7 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx, if (is_first) { gpr_mu_lock(cq->mu); grpc_error* kick_error = - cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), nullptr); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr); gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { @@ -701,17 +677,17 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx, if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(exec_ctx, cq); + cq_finish_shutdown_next(cq); gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); } } else { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_atm_rel_store(&cqd->pending_events, 0); gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(exec_ctx, cq); + cq_finish_shutdown_next(cq); gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); } } @@ -723,11 +699,9 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx, /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_PLUCK) */ -static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq, void* tag, +static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(grpc_exec_ctx* exec_ctx, - void* done_arg, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage) { cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); @@ -739,9 +713,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx, (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) { const char* errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, " + "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); + 6, (cq, tag, errmsg, done, done_arg, storage)); if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } @@ -762,7 +736,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx, cqd->completed_tail = storage; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_pluck(exec_ctx, cq); + cq_finish_shutdown_pluck(cq); gpr_mu_unlock(cq->mu); } else { grpc_pollset_worker* pluck_worker = nullptr; @@ -774,7 +748,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx, } grpc_error* kick_error = - cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); gpr_mu_unlock(cq->mu); @@ -791,12 +765,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx, GRPC_ERROR_UNREF(error); } -void grpc_cq_end_op(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, - void* tag, grpc_error* error, - void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg, - grpc_cq_completion* storage), +void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage) { - cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage); + cq->vtable->end_op(cq, tag, error, done, done_arg, storage); } typedef struct { @@ -808,31 +780,40 @@ typedef struct { bool first_loop; } cq_is_finished_arg; -static bool cq_is_next_finished(grpc_exec_ctx* exec_ctx, void* arg) { - cq_is_finished_arg* a = (cq_is_finished_arg*)arg; - grpc_completion_queue* cq = a->cq; - cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); - GPR_ASSERT(a->stolen_completion == nullptr); +class ExecCtxNext : public grpc_core::ExecCtx { + public: + ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {} - gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); + bool CheckReadyToFinish() override { + cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_; + grpc_completion_queue* cq = a->cq; + cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); + GPR_ASSERT(a->stolen_completion == nullptr); - if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - a->last_seen_things_queued_ever = + gpr_atm current_last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); - /* Pop a cq_completion from the queue. Returns NULL if the queue is empty - * might return NULL in some cases even if the queue is not empty; but - * that - * is ok and doesn't affect correctness. Might effect the tail latencies a - * bit) */ - a->stolen_completion = cq_event_queue_pop(&cqd->queue); - if (a->stolen_completion != nullptr) { - return true; + if (current_last_seen_things_queued_ever != + a->last_seen_things_queued_ever) { + a->last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cqd->things_queued_ever); + + /* Pop a cq_completion from the queue. Returns NULL if the queue is empty + * might return NULL in some cases even if the queue is not empty; but + * that + * is ok and doesn't affect correctness. Might effect the tail latencies a + * bit) */ + a->stolen_completion = cq_event_queue_pop(&cqd->queue); + if (a->stolen_completion != nullptr) { + return true; + } } + return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); } - return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); -} + + private: + void* check_ready_to_finish_arg_; +}; #ifndef NDEBUG static void dump_pending_tags(grpc_completion_queue* cq) { @@ -887,8 +868,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, nullptr, nullptr, true}; - grpc_exec_ctx exec_ctx = - GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); + ExecCtxNext exec_ctx(&is_finished_arg); for (;;) { grpc_millis iteration_deadline = deadline_millis; @@ -898,7 +878,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(&exec_ctx, c->done_arg, c); + c->done(c->done_arg, c); break; } @@ -908,7 +888,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(&exec_ctx, c->done_arg, c); + c->done(c->done_arg, c); break; } else { /* If c == NULL it means either the queue is empty OR in an transient @@ -939,7 +919,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, } if (!is_finished_arg.first_loop && - grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { + grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cq); @@ -949,8 +929,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, /* The main polling work happens in grpc_pollset_work */ gpr_mu_lock(cq->mu); cq->num_polls++; - grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - nullptr, iteration_deadline); + grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr, + iteration_deadline); gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { @@ -969,13 +949,13 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, if (cq_event_queue_num_items(&cqd->queue) > 0 && gpr_atm_acq_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); - cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), nullptr); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr); gpr_mu_unlock(cq->mu); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_CQ_INTERNAL_UNREF(cq, "next"); + GPR_ASSERT(is_finished_arg.stolen_completion == nullptr); GPR_TIMER_END("grpc_completion_queue_next", 0); @@ -989,19 +969,16 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, - Must be called only once in completion queue's lifetime - grpc_completion_queue_shutdown() MUST have been called before calling this function */ -static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq) { +static void cq_finish_shutdown_next(grpc_completion_queue* cq) { cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); - cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), - &cq->pollset_shutdown_done); + cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); } -static void cq_shutdown_next(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq) { +static void cq_shutdown_next(grpc_completion_queue* cq) { cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: @@ -1014,7 +991,7 @@ static void cq_shutdown_next(grpc_exec_ctx* exec_ctx, gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); return; } cqd->shutdown_called = true; @@ -1022,10 +999,10 @@ static void cq_shutdown_next(grpc_exec_ctx* exec_ctx, * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write * on this counter without necessarily holding a lock on cq */ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_next(exec_ctx, cq); + cq_finish_shutdown_next(cq); } gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); } grpc_event grpc_completion_queue_next(grpc_completion_queue* cq, @@ -1058,37 +1035,46 @@ static void del_plucker(grpc_completion_queue* cq, void* tag, GPR_UNREACHABLE_CODE(return ); } -static bool cq_is_pluck_finished(grpc_exec_ctx* exec_ctx, void* arg) { - cq_is_finished_arg* a = (cq_is_finished_arg*)arg; - grpc_completion_queue* cq = a->cq; - cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); +class ExecCtxPluck : public grpc_core::ExecCtx { + public: + ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {} - GPR_ASSERT(a->stolen_completion == nullptr); - gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); - if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cq->mu); - a->last_seen_things_queued_ever = + bool CheckReadyToFinish() override { + cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_; + grpc_completion_queue* cq = a->cq; + cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); + + GPR_ASSERT(a->stolen_completion == nullptr); + gpr_atm current_last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); - grpc_cq_completion* c; - grpc_cq_completion* prev = &cqd->completed_head; - while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) != - &cqd->completed_head) { - if (c->tag == a->tag) { - prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); - if (c == cqd->completed_tail) { - cqd->completed_tail = prev; + if (current_last_seen_things_queued_ever != + a->last_seen_things_queued_ever) { + gpr_mu_lock(cq->mu); + a->last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cqd->things_queued_ever); + grpc_cq_completion* c; + grpc_cq_completion* prev = &cqd->completed_head; + while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) != + &cqd->completed_head) { + if (c->tag == a->tag) { + prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); + if (c == cqd->completed_tail) { + cqd->completed_tail = prev; + } + gpr_mu_unlock(cq->mu); + a->stolen_completion = c; + return true; } - gpr_mu_unlock(cq->mu); - a->stolen_completion = c; - return true; + prev = c; } - prev = c; + gpr_mu_unlock(cq->mu); } - gpr_mu_unlock(cq->mu); + return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); } - return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); -} + + private: + void* check_ready_to_finish_arg_; +}; static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved) { @@ -1125,8 +1111,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, nullptr, tag, true}; - grpc_exec_ctx exec_ctx = - GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); + ExecCtxPluck exec_ctx(&is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != nullptr) { gpr_mu_unlock(cq->mu); @@ -1135,7 +1120,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(&exec_ctx, c->done_arg, c); + c->done(c->done_arg, c); break; } prev = &cqd->completed_head; @@ -1150,7 +1135,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(&exec_ctx, c->done_arg, c); + c->done(c->done_arg, c); goto done; } prev = c; @@ -1174,7 +1159,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, break; } if (!is_finished_arg.first_loop && - grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { + grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); @@ -1183,8 +1168,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, break; } cq->num_polls++; - grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - &worker, deadline_millis); + grpc_error* err = + cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis); if (err != GRPC_ERROR_NONE) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); @@ -1202,8 +1187,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck"); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_CQ_INTERNAL_UNREF(cq, "pluck"); + GPR_ASSERT(is_finished_arg.stolen_completion == nullptr); GPR_TIMER_END("grpc_completion_queue_pluck", 0); @@ -1216,22 +1201,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, return cq->vtable->pluck(cq, tag, deadline, reserved); } -static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq) { +static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) { cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); gpr_atm_no_barrier_store(&cqd->shutdown, 1); - cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), - &cq->pollset_shutdown_done); + cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); } /* NOTE: This function is almost exactly identical to cq_shutdown_next() but * merging them is a bit tricky and probably not worth it */ -static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx, - grpc_completion_queue* cq) { +static void cq_shutdown_pluck(grpc_completion_queue* cq) { cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: @@ -1244,25 +1226,25 @@ static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx, gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)"); return; } cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_pluck(exec_ctx, cq); + cq_finish_shutdown_pluck(cq); } gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)"); } /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); - cq->vtable->shutdown(&exec_ctx, cq); - grpc_exec_ctx_finish(&exec_ctx); + cq->vtable->shutdown(cq); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -1271,9 +1253,9 @@ void grpc_completion_queue_destroy(grpc_completion_queue* cq) { GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cq); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); + grpc_core::ExecCtx exec_ctx; + GRPC_CQ_INTERNAL_UNREF(cq, "destroy"); + GPR_TIMER_END("grpc_completion_queue_destroy", 0); } |