aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/completion_queue.c
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2017-04-24 19:45:20 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2017-04-24 19:45:20 -0700
commita72d79b1fc9528918b978e0df8aa7ebc4d375ffe (patch)
tree749f97941477e86623232ebee027bf06ba9c5c08 /src/core/lib/surface/completion_queue.c
parent76b1c0d84454a301bd9b45227a153d36e98f4232 (diff)
parentc090c619c117912b8c9e88a3c4bc8f778a94d582 (diff)
Merge branch 'master' into cq_mpsc_based
Diffstat (limited to 'src/core/lib/surface/completion_queue.c')
-rw-r--r--src/core/lib/surface/completion_queue.c201
1 files changed, 176 insertions, 25 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index c4ee222043..122156e93c 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -61,6 +61,146 @@ typedef struct {
void *tag;
} plucker;
+typedef struct {
+ bool can_get_pollset;
+ bool can_listen;
+ size_t (*size)(void);
+ void (*init)(grpc_pollset *pollset, gpr_mu **mu);
+ grpc_error *(*kick)(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker);
+ grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker, gpr_timespec now,
+ gpr_timespec deadline);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure);
+ void (*destroy)(grpc_pollset *pollset);
+} cq_poller_vtable;
+
+typedef struct non_polling_worker {
+ gpr_cv cv;
+ bool kicked;
+ struct non_polling_worker *next;
+ struct non_polling_worker *prev;
+} non_polling_worker;
+
+typedef struct {
+ gpr_mu mu;
+ non_polling_worker *root;
+ grpc_closure *shutdown;
+} non_polling_poller;
+
+static size_t non_polling_poller_size(void) {
+ return sizeof(non_polling_poller);
+}
+
+static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) {
+ non_polling_poller *npp = (non_polling_poller *)pollset;
+ gpr_mu_init(&npp->mu);
+ *mu = &npp->mu;
+}
+
+static void non_polling_poller_destroy(grpc_pollset *pollset) {
+ non_polling_poller *npp = (non_polling_poller *)pollset;
+ gpr_mu_destroy(&npp->mu);
+}
+
+static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_pollset_worker **worker,
+ gpr_timespec now,
+ gpr_timespec deadline) {
+ non_polling_poller *npp = (non_polling_poller *)pollset;
+ if (npp->shutdown) return GRPC_ERROR_NONE;
+ non_polling_worker w;
+ gpr_cv_init(&w.cv);
+ if (worker != NULL) *worker = (grpc_pollset_worker *)&w;
+ if (npp->root == NULL) {
+ npp->root = w.next = w.prev = &w;
+ } else {
+ w.next = npp->root;
+ w.prev = w.next->prev;
+ w.next->prev = w.prev->next = &w;
+ }
+ w.kicked = false;
+ while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
+ ;
+ if (&w == npp->root) {
+ npp->root = w.next;
+ if (&w == npp->root) {
+ if (npp->shutdown) {
+ grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
+ }
+ npp->root = NULL;
+ }
+ }
+ w.next->prev = w.prev;
+ w.prev->next = w.next;
+ gpr_cv_destroy(&w.cv);
+ if (worker != NULL) *worker = NULL;
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error *non_polling_poller_kick(
+ grpc_pollset *pollset, grpc_pollset_worker *specific_worker) {
+ non_polling_poller *p = (non_polling_poller *)pollset;
+ if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root;
+ if (specific_worker != NULL) {
+ non_polling_worker *w = (non_polling_worker *)specific_worker;
+ if (!w->kicked) {
+ w->kicked = true;
+ gpr_cv_signal(&w->cv);
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_closure *closure) {
+ non_polling_poller *p = (non_polling_poller *)pollset;
+ GPR_ASSERT(closure != NULL);
+ p->shutdown = closure;
+ if (p->root == NULL) {
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ } else {
+ non_polling_worker *w = p->root;
+ do {
+ gpr_cv_signal(&w->cv);
+ w = w->next;
+ } while (w != p->root);
+ }
+}
+
+static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
+ /* GRPC_CQ_DEFAULT_POLLING */
+ {.can_get_pollset = true,
+ .can_listen = true,
+ .size = grpc_pollset_size,
+ .init = grpc_pollset_init,
+ .kick = grpc_pollset_kick,
+ .work = grpc_pollset_work,
+ .shutdown = grpc_pollset_shutdown,
+ .destroy = grpc_pollset_destroy},
+ /* GRPC_CQ_NON_LISTENING */
+ {.can_get_pollset = true,
+ .can_listen = false,
+ .size = grpc_pollset_size,
+ .init = grpc_pollset_init,
+ .kick = grpc_pollset_kick,
+ .work = grpc_pollset_work,
+ .shutdown = grpc_pollset_shutdown,
+ .destroy = grpc_pollset_destroy},
+ /* GRPC_CQ_NON_POLLING */
+ {.can_get_pollset = false,
+ .can_listen = false,
+ .size = non_polling_poller_size,
+ .init = non_polling_poller_init,
+ .kick = non_polling_poller_kick,
+ .work = non_polling_poller_work,
+ .shutdown = non_polling_poller_shutdown,
+ .destroy = non_polling_poller_destroy},
+};
+
/* Queue that holds the cq_completion_events. This internally uses gpr_mpscq
* queue (a lockfree multiproducer single consumer queue). However this queue
* supports multiple consumers too. As such, it uses the queue_mu to serialize
@@ -86,10 +226,10 @@ struct grpc_completion_queue {
gpr_mu *mu;
grpc_cq_completion_type completion_type;
- grpc_cq_polling_type polling_type;
- /** Completed events (Only relevant if the completion_type is NOT
- * GRPC_CQ_NEXT) */
+ const cq_poller_vtable *poller_vtable;
+
+ /** completed events */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
@@ -188,15 +328,18 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
"polling_type=%d)",
2, (completion_type, polling_type));
- cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
- grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
+ const cq_poller_vtable *poller_vtable =
+ &g_poller_vtable_by_poller_type[polling_type];
+
+ cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
+ poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu);
#ifndef NDEBUG
cc->outstanding_tags = NULL;
cc->outstanding_tag_capacity = 0;
#endif
cc->completion_type = completion_type;
- cc->polling_type = polling_type;
+ cc->poller_vtable = poller_vtable;
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
@@ -257,7 +400,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
- grpc_pollset_destroy(POLLSET_FROM_CQ(cc));
+ cc->poller_vtable->destroy(POLLSET_FROM_CQ(cc));
cq_event_queue_destroy(&cc->queue);
#ifndef NDEBUG
gpr_free(cc->outstanding_tags);
@@ -331,7 +474,8 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
int shutdown = gpr_unref(&cc->pending_events);
if (!shutdown) {
gpr_mu_lock(cc->mu);
- grpc_error *kick_error = grpc_pollset_kick(POLLSET_FROM_CQ(cc), NULL);
+
+ grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL);
gpr_mu_unlock(cc->mu);
if (kick_error != GRPC_ERROR_NONE) {
@@ -347,8 +491,8 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
gpr_atm_no_barrier_store(&cc->shutdown, 1);
gpr_mu_lock(cc->mu);
- grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
- &cc->pollset_shutdown_done);
+ cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
+ &cc->pollset_shutdown_done);
gpr_mu_unlock(cc->mu);
}
}
@@ -387,7 +531,8 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
}
grpc_error *kick_error =
- grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
+ cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
+
gpr_mu_unlock(cc->mu);
if (kick_error != GRPC_ERROR_NONE) {
@@ -400,8 +545,8 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown));
GPR_ASSERT(cc->shutdown_called);
gpr_atm_no_barrier_store(&cc->shutdown, 1);
- grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
- &cc->pollset_shutdown_done);
+ cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
+ &cc->pollset_shutdown_done);
gpr_mu_unlock(cc->mu);
}
}
@@ -477,7 +622,6 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
return true;
}
}
-
return !a->first_loop &&
gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
@@ -534,6 +678,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "next");
+
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cc->things_queued_ever),
@@ -601,9 +746,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
dump_pending_tags(cc);
break;
}
-
- /* Check alarms - these are a global resource so we just ping each time
- through on every pollset. May update deadline to ensure timely wakeups.*/
+ /* Check alarms - these are a global resource so we just ping
+ each time through on every pollset.
+ May update deadline to ensure timely wakeups.
if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
GPR_TIMER_MARK("alarm_triggered", 0);
grpc_exec_ctx_flush(&exec_ctx);
@@ -612,8 +757,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock(cc->mu);
- grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL,
- now, iteration_deadline);
+ grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ NULL, now, iteration_deadline);
gpr_mu_unlock(cc->mu);
if (err != GRPC_ERROR_NONE) {
@@ -805,8 +950,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(cc->mu);
} else {
- grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc),
- &worker, now, iteration_deadline);
+ grpc_error *err = cc->poller_vtable->work(
+ &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu);
@@ -850,8 +995,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->pending_events)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
- &cc->pollset_shutdown_done);
+ cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
+ &cc->pollset_shutdown_done);
}
gpr_mu_unlock(cc->mu);
grpc_exec_ctx_finish(&exec_ctx);
@@ -872,7 +1017,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return POLLSET_FROM_CQ(cc);
+ return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
}
grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
@@ -893,4 +1038,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
-int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
+bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
+ return cc->is_server_cq;
+}
+
+bool grpc_cq_can_listen(grpc_completion_queue *cc) {
+ return cc->poller_vtable->can_listen;
+}