aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2017-10-25 10:28:02 -0700
committerGravatar GitHub <noreply@github.com>2017-10-25 10:28:02 -0700
commit1bf7207852b4138c8a30e5a2f8f2c4bfffbba262 (patch)
treea49ce2246b55aa230802c909898a20d136f3561d /src/core
parent1bda5106421c5e6f449e6dbdd2214cdf31b26fcf (diff)
parent42bd87e376913939850bfa78a3c7f96ce83af11e (diff)
Merge pull request #13084 from kpayson64/cq_lambda
CompletionQueue DoThenAsyncNext
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lib/surface/completion_queue.cc116
-rw-r--r--src/core/lib/surface/completion_queue.h3
-rw-r--r--src/core/lib/surface/init.cc1
3 files changed, 89 insertions, 31 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 21664f03c8..5009f786e6 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include <grpc/support/tls.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/pollset.h"
@@ -48,6 +49,14 @@ grpc_tracer_flag grpc_trace_cq_refcount =
GRPC_TRACER_INITIALIZER(false, "cq_refcount");
#endif
+// Specifies a cq thread local cache.
+// The first event that occurs on a thread
+// with a cq cache will go into that cache, and
+// will only be returned on the thread that initialized the cache.
+// NOTE: Only one event will ever be cached.
+GPR_TLS_DECL(g_cached_event);
+GPR_TLS_DECL(g_cached_cq);
+
typedef struct {
grpc_pollset_worker **worker;
void *tag;
@@ -345,6 +354,46 @@ grpc_tracer_flag grpc_cq_event_timeout_trace =
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
grpc_error *error);
+void grpc_cq_global_init() {
+ gpr_tls_init(&g_cached_event);
+ gpr_tls_init(&g_cached_cq);
+}
+
+void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq) {
+ if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == nullptr) {
+ gpr_tls_set(&g_cached_event, (intptr_t)0);
+ gpr_tls_set(&g_cached_cq, (intptr_t)cq);
+ }
+}
+
+int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq,
+ void **tag, int *ok) {
+ grpc_cq_completion *storage =
+ (grpc_cq_completion *)gpr_tls_get(&g_cached_event);
+ int ret = 0;
+ if (storage != NULL &&
+ (grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq) {
+ *tag = storage->tag;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ storage->done(&exec_ctx, storage->done_arg, storage);
+ *ok = (storage->next & (uintptr_t)(1)) == 1;
+ ret = 1;
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(&exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down");
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+ gpr_tls_set(&g_cached_event, (intptr_t)0);
+ gpr_tls_set(&g_cached_cq, (intptr_t)0);
+
+ return ret;
+}
+
static void cq_event_queue_init(grpc_cq_event_queue *q) {
gpr_mpscq_init(&q->queue);
q->queue_lock = GPR_SPINLOCK_INITIALIZER;
@@ -617,7 +666,6 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
}
-
cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
@@ -628,44 +676,50 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
cq_check_tag(cq, tag, true); /* Used in debug builds only */
- /* Add the completion to the queue */
- bool is_first = cq_event_queue_push(&cqd->queue, storage);
- gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
-
- /* Since we do not hold the cq lock here, it is important to do an 'acquire'
- load here (instead of a 'no_barrier' load) to match with the release store
- (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
- */
- bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
-
- if (!will_definitely_shutdown) {
- /* Only kick if this is the first item queued */
- if (is_first) {
- gpr_mu_lock(cq->mu);
- grpc_error *kick_error =
- cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL);
- gpr_mu_unlock(cq->mu);
+ if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq &&
+ (grpc_cq_completion *)gpr_tls_get(&g_cached_event) == nullptr) {
+ gpr_tls_set(&g_cached_event, (intptr_t)storage);
+ } else {
+ /* Add the completion to the queue */
+ bool is_first = cq_event_queue_push(&cqd->queue, storage);
+ gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
+
+ /* Since we do not hold the cq lock here, it is important to do an 'acquire'
+ load here (instead of a 'no_barrier' load) to match with the release
+ store
+ (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
+ */
+ bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
+
+ if (!will_definitely_shutdown) {
+ /* Only kick if this is the first item queued */
+ if (is_first) {
+ gpr_mu_lock(cq->mu);
+ grpc_error *kick_error =
+ cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL);
+ gpr_mu_unlock(cq->mu);
- if (kick_error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(kick_error);
- gpr_log(GPR_ERROR, "Kick failed: %s", msg);
- GRPC_ERROR_UNREF(kick_error);
+ if (kick_error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(kick_error);
+ gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+ GRPC_ERROR_UNREF(kick_error);
+ }
}
- }
- if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ }
+ } else {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_atm_rel_store(&cqd->pending_events, 0);
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(exec_ctx, cq);
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
- } else {
- GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
- gpr_atm_rel_store(&cqd->pending_events, 0);
- gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(exec_ctx, cq);
- gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
GPR_TIMER_END("cq_end_op_for_next", 0);
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 69d144bd95..c02bc5da07 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -70,6 +70,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);
#define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc)
#endif
+/* Initializes global variables used by completion queues */
+void grpc_cq_global_init();
+
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
\a tag is currently used only in debug builds. Return true on success, and
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index b089da2c54..058e88f804 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -64,6 +64,7 @@ static void do_basic_init(void) {
gpr_log_verbosity_init();
gpr_mu_init(&g_init_mu);
grpc_register_built_in_plugins();
+ grpc_cq_global_init();
g_initializations = 0;
}