diff options
Diffstat (limited to 'src/core/lib/surface/completion_queue.cc')
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 328 |
1 files changed, 173 insertions, 155 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 12385b7130..98d7e35943 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -62,12 +62,13 @@ typedef struct { bool can_listen; size_t (*size)(void); void (*init)(grpc_pollset* pollset, gpr_mu** mu); - grpc_error* (*kick)(grpc_pollset* pollset, + grpc_error* (*kick)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_pollset_worker* specific_worker); - grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker, - grpc_millis deadline); - void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure); - void (*destroy)(grpc_pollset* pollset); + grpc_error* (*work)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_pollset_worker** worker, grpc_millis deadline); + void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_closure* closure); + void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset); } cq_poller_vtable; typedef struct non_polling_worker { @@ -93,12 +94,14 @@ static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) { *mu = &npp->mu; } -static void non_polling_poller_destroy(grpc_pollset* pollset) { +static void non_polling_poller_destroy(grpc_exec_ctx* exec_ctx, + grpc_pollset* pollset) { non_polling_poller* npp = (non_polling_poller*)pollset; gpr_mu_destroy(&npp->mu); } -static grpc_error* non_polling_poller_work(grpc_pollset* pollset, +static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx, + grpc_pollset* pollset, grpc_pollset_worker** worker, grpc_millis deadline) { non_polling_poller* npp = (non_polling_poller*)pollset; @@ -119,12 +122,12 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset, while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) ; - grpc_core::ExecCtx::Get()->InvalidateNow(); + grpc_exec_ctx_invalidate_now(exec_ctx); if (&w == npp->root) { npp->root = w.next; if (&w == npp->root) { if (npp->shutdown) { - GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE); } npp->root = nullptr; } @@ -137,7 +140,8 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset, } static grpc_error* non_polling_poller_kick( - grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { + grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_pollset_worker* specific_worker) { non_polling_poller* p = (non_polling_poller*)pollset; if (specific_worker == nullptr) specific_worker = (grpc_pollset_worker*)p->root; @@ -151,13 +155,14 @@ static grpc_error* non_polling_poller_kick( return GRPC_ERROR_NONE; } -static void non_polling_poller_shutdown(grpc_pollset* pollset, +static void non_polling_poller_shutdown(grpc_exec_ctx* exec_ctx, + grpc_pollset* pollset, grpc_closure* closure) { non_polling_poller* p = (non_polling_poller*)pollset; GPR_ASSERT(closure != nullptr); p->shutdown = closure; if (p->root == nullptr) { - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); } else { non_polling_worker* w = p->root; do { @@ -184,11 +189,13 @@ typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; void (*init)(void* data); - void (*shutdown)(grpc_completion_queue* cq); + void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq); void (*destroy)(void* data); bool (*begin_op)(grpc_completion_queue* cq, void* tag); - void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), + void (*end_op)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, void* tag, + grpc_error* error, + void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg, + grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage); grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); @@ -273,23 +280,31 @@ struct grpc_completion_queue { }; /* Forward declarations */ -static void cq_finish_shutdown_next(grpc_completion_queue* cq); -static void cq_finish_shutdown_pluck(grpc_completion_queue* cq); -static void cq_shutdown_next(grpc_completion_queue* cq); -static void cq_shutdown_pluck(grpc_completion_queue* cq); +static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq); +static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq); +static void cq_shutdown_next(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq); +static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq); static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); -static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, +static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, + void (*done)(grpc_exec_ctx* exec_ctx, + void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage); -static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, +static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, + void (*done)(grpc_exec_ctx* exec_ctx, + void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage); @@ -331,7 +346,8 @@ grpc_core::TraceFlag grpc_cq_event_timeout_trace(true, "queue_timeout"); gpr_free(_ev); \ } -static void on_pollset_shutdown_done(void* cq, grpc_error* error); +static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* cq, + grpc_error* error); void grpc_cq_global_init() { gpr_tls_init(&g_cached_event); @@ -353,18 +369,19 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq, if (storage != nullptr && (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) { *tag = storage->tag; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; *ok = (storage->next & (uintptr_t)(1)) == 1; - storage->done(storage->done_arg, storage); + storage->done(&exec_ctx, storage->done_arg, storage); ret = 1; cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(cq); + cq_finish_shutdown_next(&exec_ctx, cq); gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down"); } + grpc_exec_ctx_finish(&exec_ctx); } gpr_tls_set(&g_cached_event, (intptr_t)0); gpr_tls_set(&g_cached_cq, (intptr_t)0); @@ -389,22 +406,24 @@ static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) { static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) { grpc_cq_completion* c = nullptr; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (gpr_spinlock_trylock(&q->queue_lock)) { - GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx); bool is_empty = false; c = (grpc_cq_completion*)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty); gpr_spinlock_unlock(&q->queue_lock); if (c == nullptr && !is_empty) { - GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(); + GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx); } } else { - GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx); } + grpc_exec_ctx_finish(&exec_ctx); + if (c) { gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); } @@ -434,8 +453,9 @@ grpc_completion_queue* grpc_completion_queue_create_internal( const cq_poller_vtable* poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; - grpc_core::ExecCtx exec_ctx; - GRPC_STATS_INC_CQS_CREATED(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_STATS_INC_CQS_CREATED(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + @@ -517,14 +537,15 @@ void grpc_cq_internal_ref(grpc_completion_queue* cq) { gpr_ref(&cq->owning_refs); } -static void on_pollset_shutdown_done(void* arg, grpc_error* error) { +static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_completion_queue* cq = (grpc_completion_queue*)arg; - GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy"); } #ifndef NDEBUG -void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason, - const char* file, int line) { +void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, + const char* reason, const char* file, int line) { if (grpc_trace_cq_refcount.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, @@ -532,11 +553,12 @@ void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason, reason); } #else -void grpc_cq_internal_unref(grpc_completion_queue* cq) { +void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq) { #endif if (gpr_unref(&cq->owning_refs)) { cq->vtable->destroy(DATA_FROM_CQ(cq)); - cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq)); + cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq)); #ifndef NDEBUG gpr_free(cq->outstanding_tags); #endif @@ -617,9 +639,11 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ -static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, +static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, + void (*done)(grpc_exec_ctx* exec_ctx, + void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage) { GPR_TIMER_BEGIN("cq_end_op_for_next", 0); @@ -628,9 +652,9 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) { const char* errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_next(cq=%p, tag=%p, error=%s, " + "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 6, (cq, tag, errmsg, done, done_arg, storage)); + 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } @@ -665,7 +689,7 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, if (is_first) { gpr_mu_lock(cq->mu); grpc_error* kick_error = - cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr); + cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), nullptr); gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { @@ -677,17 +701,17 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(cq); + cq_finish_shutdown_next(exec_ctx, cq); gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } } else { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_atm_rel_store(&cqd->pending_events, 0); gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(cq); + cq_finish_shutdown_next(exec_ctx, cq); gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } } @@ -699,9 +723,11 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_PLUCK) */ -static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, +static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, + void (*done)(grpc_exec_ctx* exec_ctx, + void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage) { cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); @@ -713,9 +739,9 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) { const char* errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, " + "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 6, (cq, tag, errmsg, done, done_arg, storage)); + 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } @@ -736,7 +762,7 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, cqd->completed_tail = storage; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_pluck(cq); + cq_finish_shutdown_pluck(exec_ctx, cq); gpr_mu_unlock(cq->mu); } else { grpc_pollset_worker* pluck_worker = nullptr; @@ -748,7 +774,7 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, } grpc_error* kick_error = - cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); + cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker); gpr_mu_unlock(cq->mu); @@ -765,10 +791,12 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, GRPC_ERROR_UNREF(error); } -void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), +void grpc_cq_end_op(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, + void* tag, grpc_error* error, + void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg, + grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage) { - cq->vtable->end_op(cq, tag, error, done, done_arg, storage); + cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage); } typedef struct { @@ -780,40 +808,31 @@ typedef struct { bool first_loop; } cq_is_finished_arg; -class ExecCtxNext : public grpc_core::ExecCtx { - public: - ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {} +static bool cq_is_next_finished(grpc_exec_ctx* exec_ctx, void* arg) { + cq_is_finished_arg* a = (cq_is_finished_arg*)arg; + grpc_completion_queue* cq = a->cq; + cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); + GPR_ASSERT(a->stolen_completion == nullptr); - bool CheckReadyToFinish() override { - cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_; - grpc_completion_queue* cq = a->cq; - cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); - GPR_ASSERT(a->stolen_completion == nullptr); + gpr_atm current_last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cqd->things_queued_ever); - gpr_atm current_last_seen_things_queued_ever = + if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { + a->last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); - if (current_last_seen_things_queued_ever != - a->last_seen_things_queued_ever) { - a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); - - /* Pop a cq_completion from the queue. Returns NULL if the queue is empty - * might return NULL in some cases even if the queue is not empty; but - * that - * is ok and doesn't affect correctness. Might effect the tail latencies a - * bit) */ - a->stolen_completion = cq_event_queue_pop(&cqd->queue); - if (a->stolen_completion != nullptr) { - return true; - } + /* Pop a cq_completion from the queue. Returns NULL if the queue is empty + * might return NULL in some cases even if the queue is not empty; but + * that + * is ok and doesn't affect correctness. Might effect the tail latencies a + * bit) */ + a->stolen_completion = cq_event_queue_pop(&cqd->queue); + if (a->stolen_completion != nullptr) { + return true; } - return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); } - - private: - void* check_ready_to_finish_arg_; -}; + return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); +} #ifndef NDEBUG static void dump_pending_tags(grpc_completion_queue* cq) { @@ -868,7 +887,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, nullptr, nullptr, true}; - ExecCtxNext exec_ctx(&is_finished_arg); + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); for (;;) { grpc_millis iteration_deadline = deadline_millis; @@ -878,7 +898,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(&exec_ctx, c->done_arg, c); break; } @@ -888,7 +908,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(&exec_ctx, c->done_arg, c); break; } else { /* If c == NULL it means either the queue is empty OR in an transient @@ -919,7 +939,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, } if (!is_finished_arg.first_loop && - grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { + grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cq); @@ -929,8 +949,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, /* The main polling work happens in grpc_pollset_work */ gpr_mu_lock(cq->mu); cq->num_polls++; - grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr, - iteration_deadline); + grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), + nullptr, iteration_deadline); gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { @@ -949,13 +969,13 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, if (cq_event_queue_num_items(&cqd->queue) > 0 && gpr_atm_acq_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); - cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr); + cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), nullptr); gpr_mu_unlock(cq->mu); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); - GRPC_CQ_INTERNAL_UNREF(cq, "next"); - + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == nullptr); GPR_TIMER_END("grpc_completion_queue_next", 0); @@ -969,16 +989,19 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, - Must be called only once in completion queue's lifetime - grpc_completion_queue_shutdown() MUST have been called before calling this function */ -static void cq_finish_shutdown_next(grpc_completion_queue* cq) { +static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq) { cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); - cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); } -static void cq_shutdown_next(grpc_completion_queue* cq) { +static void cq_shutdown_next(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq) { cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: @@ -991,7 +1014,7 @@ static void cq_shutdown_next(grpc_completion_queue* cq) { gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); return; } cqd->shutdown_called = true; @@ -999,10 +1022,10 @@ static void cq_shutdown_next(grpc_completion_queue* cq) { * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write * on this counter without necessarily holding a lock on cq */ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_next(cq); + cq_finish_shutdown_next(exec_ctx, cq); } gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } grpc_event grpc_completion_queue_next(grpc_completion_queue* cq, @@ -1035,46 +1058,37 @@ static void del_plucker(grpc_completion_queue* cq, void* tag, GPR_UNREACHABLE_CODE(return ); } -class ExecCtxPluck : public grpc_core::ExecCtx { - public: - ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {} - - bool CheckReadyToFinish() override { - cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_; - grpc_completion_queue* cq = a->cq; - cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); +static bool cq_is_pluck_finished(grpc_exec_ctx* exec_ctx, void* arg) { + cq_is_finished_arg* a = (cq_is_finished_arg*)arg; + grpc_completion_queue* cq = a->cq; + cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); - GPR_ASSERT(a->stolen_completion == nullptr); - gpr_atm current_last_seen_things_queued_ever = + GPR_ASSERT(a->stolen_completion == nullptr); + gpr_atm current_last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cqd->things_queued_ever); + if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { + gpr_mu_lock(cq->mu); + a->last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); - if (current_last_seen_things_queued_ever != - a->last_seen_things_queued_ever) { - gpr_mu_lock(cq->mu); - a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); - grpc_cq_completion* c; - grpc_cq_completion* prev = &cqd->completed_head; - while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) != - &cqd->completed_head) { - if (c->tag == a->tag) { - prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); - if (c == cqd->completed_tail) { - cqd->completed_tail = prev; - } - gpr_mu_unlock(cq->mu); - a->stolen_completion = c; - return true; + grpc_cq_completion* c; + grpc_cq_completion* prev = &cqd->completed_head; + while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) != + &cqd->completed_head) { + if (c->tag == a->tag) { + prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); + if (c == cqd->completed_tail) { + cqd->completed_tail = prev; } - prev = c; + gpr_mu_unlock(cq->mu); + a->stolen_completion = c; + return true; } - gpr_mu_unlock(cq->mu); + prev = c; } - return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); + gpr_mu_unlock(cq->mu); } - - private: - void* check_ready_to_finish_arg_; -}; + return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); +} static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved) { @@ -1111,7 +1125,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, nullptr, tag, true}; - ExecCtxPluck exec_ctx(&is_finished_arg); + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != nullptr) { gpr_mu_unlock(cq->mu); @@ -1120,7 +1135,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(&exec_ctx, c->done_arg, c); break; } prev = &cqd->completed_head; @@ -1135,7 +1150,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(&exec_ctx, c->done_arg, c); goto done; } prev = c; @@ -1159,7 +1174,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, break; } if (!is_finished_arg.first_loop && - grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { + grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); @@ -1168,8 +1183,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, break; } cq->num_polls++; - grpc_error* err = - cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis); + grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), + &worker, deadline_millis); if (err != GRPC_ERROR_NONE) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); @@ -1187,8 +1202,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); - GRPC_CQ_INTERNAL_UNREF(cq, "pluck"); - + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck"); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == nullptr); GPR_TIMER_END("grpc_completion_queue_pluck", 0); @@ -1201,19 +1216,22 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, return cq->vtable->pluck(cq, tag, deadline, reserved); } -static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) { +static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq) { cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); gpr_atm_no_barrier_store(&cqd->shutdown, 1); - cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); } /* NOTE: This function is almost exactly identical to cq_shutdown_next() but * merging them is a bit tricky and probably not worth it */ -static void cq_shutdown_pluck(grpc_completion_queue* cq) { +static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx, + grpc_completion_queue* cq) { cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: @@ -1226,25 +1244,25 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) { gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); return; } cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_pluck(cq); + cq_finish_shutdown_pluck(exec_ctx, cq); } gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); } /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); - cq->vtable->shutdown(cq); - + cq->vtable->shutdown(&exec_ctx, cq); + grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -1253,9 +1271,9 @@ void grpc_completion_queue_destroy(grpc_completion_queue* cq) { GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cq); - grpc_core::ExecCtx exec_ctx; - GRPC_CQ_INTERNAL_UNREF(cq, "destroy"); - + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy"); + grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_destroy", 0); } |