diff options
author | kpayson64 <kpayson@google.com> | 2017-10-25 10:28:02 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-25 10:28:02 -0700 |
commit | 1bf7207852b4138c8a30e5a2f8f2c4bfffbba262 (patch) | |
tree | a49ce2246b55aa230802c909898a20d136f3561d /src/core | |
parent | 1bda5106421c5e6f449e6dbdd2214cdf31b26fcf (diff) | |
parent | 42bd87e376913939850bfa78a3c7f96ce83af11e (diff) |
Merge pull request #13084 from kpayson64/cq_lambda
CompletionQueue DoThenAsyncNext
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 116 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 3 | ||||
-rw-r--r-- | src/core/lib/surface/init.cc | 1 |
3 files changed, 89 insertions, 31 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 21664f03c8..5009f786e6 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -28,6 +28,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include <grpc/support/tls.h> #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/pollset.h" @@ -48,6 +49,14 @@ grpc_tracer_flag grpc_trace_cq_refcount = GRPC_TRACER_INITIALIZER(false, "cq_refcount"); #endif +// Specifies a cq thread local cache. +// The first event that occurs on a thread +// with a cq cache will go into that cache, and +// will only be returned on the thread that initialized the cache. +// NOTE: Only one event will ever be cached. +GPR_TLS_DECL(g_cached_event); +GPR_TLS_DECL(g_cached_cq); + typedef struct { grpc_pollset_worker **worker; void *tag; @@ -345,6 +354,46 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq, grpc_error *error); +void grpc_cq_global_init() { + gpr_tls_init(&g_cached_event); + gpr_tls_init(&g_cached_cq); +} + +void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq) { + if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == nullptr) { + gpr_tls_set(&g_cached_event, (intptr_t)0); + gpr_tls_set(&g_cached_cq, (intptr_t)cq); + } +} + +int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq, + void **tag, int *ok) { + grpc_cq_completion *storage = + (grpc_cq_completion *)gpr_tls_get(&g_cached_event); + int ret = 0; + if (storage != NULL && + (grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq) { + *tag = storage->tag; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + storage->done(&exec_ctx, storage->done_arg, storage); + *ok = (storage->next & (uintptr_t)(1)) == 1; + ret = 1; + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(&exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down"); + } + grpc_exec_ctx_finish(&exec_ctx); + } + gpr_tls_set(&g_cached_event, (intptr_t)0); + gpr_tls_set(&g_cached_cq, (intptr_t)0); + + return ret; +} + static void cq_event_queue_init(grpc_cq_event_queue *q) { gpr_mpscq_init(&q->queue); q->queue_lock = GPR_SPINLOCK_INITIALIZER; @@ -617,7 +666,6 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } } - cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); @@ -628,44 +676,50 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, cq_check_tag(cq, tag, true); /* Used in debug builds only */ - /* Add the completion to the queue */ - bool is_first = cq_event_queue_push(&cqd->queue, storage); - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - - /* Since we do not hold the cq lock here, it is important to do an 'acquire' - load here (instead of a 'no_barrier' load) to match with the release store - (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next - */ - bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; - - if (!will_definitely_shutdown) { - /* Only kick if this is the first item queued */ - if (is_first) { - gpr_mu_lock(cq->mu); - grpc_error *kick_error = - cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); - gpr_mu_unlock(cq->mu); + if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq && + (grpc_cq_completion *)gpr_tls_get(&g_cached_event) == nullptr) { + gpr_tls_set(&g_cached_event, (intptr_t)storage); + } else { + /* Add the completion to the queue */ + bool is_first = cq_event_queue_push(&cqd->queue, storage); + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + + /* Since we do not hold the cq lock here, it is important to do an 'acquire' + load here (instead of a 'no_barrier' load) to match with the release + store + (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next + */ + bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; + + if (!will_definitely_shutdown) { + /* Only kick if this is the first item queued */ + if (is_first) { + gpr_mu_lock(cq->mu); + grpc_error *kick_error = + cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); - if (kick_error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(kick_error); - gpr_log(GPR_ERROR, "Kick failed: %s", msg); - GRPC_ERROR_UNREF(kick_error); + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + GRPC_ERROR_UNREF(kick_error); + } } - } - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + } + } else { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_atm_rel_store(&cqd->pending_events, 0); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(exec_ctx, cq); gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } - } else { - GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); - gpr_atm_rel_store(&cqd->pending_events, 0); - gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(exec_ctx, cq); - gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 69d144bd95..c02bc5da07 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -70,6 +70,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); #define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc) #endif +/* Initializes global variables used by completion queues */ +void grpc_cq_global_init(); + /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. \a tag is currently used only in debug builds. Return true on success, and diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index b089da2c54..058e88f804 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -64,6 +64,7 @@ static void do_basic_init(void) { gpr_log_verbosity_init(); gpr_mu_init(&g_init_mu); grpc_register_built_in_plugins(); + grpc_cq_global_init(); g_initializations = 0; } |