diff options
author | Sree Kuchibhotla <sreek@google.com> | 2017-04-24 19:45:20 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2017-04-24 19:45:20 -0700 |
commit | a72d79b1fc9528918b978e0df8aa7ebc4d375ffe (patch) | |
tree | 749f97941477e86623232ebee027bf06ba9c5c08 /src/core/lib/surface | |
parent | 76b1c0d84454a301bd9b45227a153d36e98f4232 (diff) | |
parent | c090c619c117912b8c9e88a3c4bc8f778a94d582 (diff) |
Merge branch 'master' into cq_mpsc_based
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 201 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 6 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 19 |
3 files changed, 181 insertions, 45 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index c4ee222043..122156e93c 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -61,6 +61,146 @@ typedef struct { void *tag; } plucker; +typedef struct { + bool can_get_pollset; + bool can_listen; + size_t (*size)(void); + void (*init)(grpc_pollset *pollset, gpr_mu **mu); + grpc_error *(*kick)(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); + grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure); + void (*destroy)(grpc_pollset *pollset); +} cq_poller_vtable; + +typedef struct non_polling_worker { + gpr_cv cv; + bool kicked; + struct non_polling_worker *next; + struct non_polling_worker *prev; +} non_polling_worker; + +typedef struct { + gpr_mu mu; + non_polling_worker *root; + grpc_closure *shutdown; +} non_polling_poller; + +static size_t non_polling_poller_size(void) { + return sizeof(non_polling_poller); +} + +static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) { + non_polling_poller *npp = (non_polling_poller *)pollset; + gpr_mu_init(&npp->mu); + *mu = &npp->mu; +} + +static void non_polling_poller_destroy(grpc_pollset *pollset) { + non_polling_poller *npp = (non_polling_poller *)pollset; + gpr_mu_destroy(&npp->mu); +} + +static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker **worker, + gpr_timespec now, + gpr_timespec deadline) { + non_polling_poller *npp = (non_polling_poller *)pollset; + if (npp->shutdown) return GRPC_ERROR_NONE; + non_polling_worker w; + gpr_cv_init(&w.cv); + if (worker != NULL) *worker = (grpc_pollset_worker *)&w; + if (npp->root == NULL) { + npp->root = w.next = w.prev = &w; + } else { + w.next = npp->root; + w.prev = w.next->prev; + w.next->prev = w.prev->next = &w; + } + w.kicked = false; + while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline)) + ; + if (&w == npp->root) { + npp->root = w.next; + if (&w == npp->root) { + if (npp->shutdown) { + grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE); + } + npp->root = NULL; + } + } + w.next->prev = w.prev; + w.prev->next = w.next; + gpr_cv_destroy(&w.cv); + if (worker != NULL) *worker = NULL; + return GRPC_ERROR_NONE; +} + +static grpc_error *non_polling_poller_kick( + grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + non_polling_poller *p = (non_polling_poller *)pollset; + if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root; + if (specific_worker != NULL) { + non_polling_worker *w = (non_polling_worker *)specific_worker; + if (!w->kicked) { + w->kicked = true; + gpr_cv_signal(&w->cv); + } + } + return GRPC_ERROR_NONE; +} + +static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_closure *closure) { + non_polling_poller *p = (non_polling_poller *)pollset; + GPR_ASSERT(closure != NULL); + p->shutdown = closure; + if (p->root == NULL) { + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + } else { + non_polling_worker *w = p->root; + do { + gpr_cv_signal(&w->cv); + w = w->next; + } while (w != p->root); + } +} + +static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { + /* GRPC_CQ_DEFAULT_POLLING */ + {.can_get_pollset = true, + .can_listen = true, + .size = grpc_pollset_size, + .init = grpc_pollset_init, + .kick = grpc_pollset_kick, + .work = grpc_pollset_work, + .shutdown = grpc_pollset_shutdown, + .destroy = grpc_pollset_destroy}, + /* GRPC_CQ_NON_LISTENING */ + {.can_get_pollset = true, + .can_listen = false, + .size = grpc_pollset_size, + .init = grpc_pollset_init, + .kick = grpc_pollset_kick, + .work = grpc_pollset_work, + .shutdown = grpc_pollset_shutdown, + .destroy = grpc_pollset_destroy}, + /* GRPC_CQ_NON_POLLING */ + {.can_get_pollset = false, + .can_listen = false, + .size = non_polling_poller_size, + .init = non_polling_poller_init, + .kick = non_polling_poller_kick, + .work = non_polling_poller_work, + .shutdown = non_polling_poller_shutdown, + .destroy = non_polling_poller_destroy}, +}; + /* Queue that holds the cq_completion_events. This internally uses gpr_mpscq * queue (a lockfree multiproducer single consumer queue). However this queue * supports multiple consumers too. As such, it uses the queue_mu to serialize @@ -86,10 +226,10 @@ struct grpc_completion_queue { gpr_mu *mu; grpc_cq_completion_type completion_type; - grpc_cq_polling_type polling_type; - /** Completed events (Only relevant if the completion_type is NOT - * GRPC_CQ_NEXT) */ + const cq_poller_vtable *poller_vtable; + + /** completed events */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; @@ -188,15 +328,18 @@ grpc_completion_queue *grpc_completion_queue_create_internal( "polling_type=%d)", 2, (completion_type, polling_type)); - cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); - grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); + const cq_poller_vtable *poller_vtable = + &g_poller_vtable_by_poller_type[polling_type]; + + cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); + poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; cc->outstanding_tag_capacity = 0; #endif cc->completion_type = completion_type; - cc->polling_type = polling_type; + cc->poller_vtable = poller_vtable; /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); @@ -257,7 +400,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_destroy(POLLSET_FROM_CQ(cc)); + cc->poller_vtable->destroy(POLLSET_FROM_CQ(cc)); cq_event_queue_destroy(&cc->queue); #ifndef NDEBUG gpr_free(cc->outstanding_tags); @@ -331,7 +474,8 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, int shutdown = gpr_unref(&cc->pending_events); if (!shutdown) { gpr_mu_lock(cc->mu); - grpc_error *kick_error = grpc_pollset_kick(POLLSET_FROM_CQ(cc), NULL); + + grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL); gpr_mu_unlock(cc->mu); if (kick_error != GRPC_ERROR_NONE) { @@ -347,8 +491,8 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, gpr_atm_no_barrier_store(&cc->shutdown, 1); gpr_mu_lock(cc->mu); - grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); gpr_mu_unlock(cc->mu); } } @@ -387,7 +531,8 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, } grpc_error *kick_error = - grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); + gpr_mu_unlock(cc->mu); if (kick_error != GRPC_ERROR_NONE) { @@ -400,8 +545,8 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown)); GPR_ASSERT(cc->shutdown_called); gpr_atm_no_barrier_store(&cc->shutdown, 1); - grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); gpr_mu_unlock(cc->mu); } } @@ -477,7 +622,6 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { return true; } } - return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; } @@ -534,6 +678,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); + cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cc->things_queued_ever), @@ -601,9 +746,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, dump_pending_tags(cc); break; } - - /* Check alarms - these are a global resource so we just ping each time - through on every pollset. May update deadline to ensure timely wakeups.*/ + /* Check alarms - these are a global resource so we just ping + each time through on every pollset. + May update deadline to ensure timely wakeups. if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); grpc_exec_ctx_flush(&exec_ctx); @@ -612,8 +757,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, /* The main polling work happens in grpc_pollset_work */ gpr_mu_lock(cc->mu); - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, - now, iteration_deadline); + grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + NULL, now, iteration_deadline); gpr_mu_unlock(cc->mu); if (err != GRPC_ERROR_NONE) { @@ -805,8 +950,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(cc->mu); } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), - &worker, now, iteration_deadline); + grpc_error *err = cc->poller_vtable->work( + &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); if (err != GRPC_ERROR_NONE) { del_plucker(cc, tag, &worker); gpr_mu_unlock(cc->mu); @@ -850,8 +995,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->pending_events)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); } gpr_mu_unlock(cc->mu); grpc_exec_ctx_finish(&exec_ctx); @@ -872,7 +1017,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return POLLSET_FROM_CQ(cc); + return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; } grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { @@ -893,4 +1038,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } -int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } +bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { + return cc->is_server_cq; +} + +bool grpc_cq_can_listen(grpc_completion_queue *cc) { + return cc->poller_vtable->can_listen; +} diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 9c8bc3b53a..f7eb148982 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -96,13 +96,11 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); -void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc); -bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); -int grpc_cq_is_server_cq(grpc_completion_queue *cc); +bool grpc_cq_is_server_cq(grpc_completion_queue *cc); +bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); -grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc); grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index e133d1d2a4..934ca0431a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -981,7 +981,7 @@ const grpc_channel_filter grpc_server_top_filter = { static void register_completion_queue(grpc_server *server, grpc_completion_queue *cq, - bool is_non_listening, void *reserved) { + void *reserved) { size_t i, n; GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { @@ -990,10 +990,6 @@ static void register_completion_queue(grpc_server *server, grpc_cq_mark_server_cq(cq); - if (is_non_listening) { - grpc_cq_mark_non_listening_server_cq(cq); - } - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -1016,16 +1012,7 @@ void grpc_server_register_completion_queue(grpc_server *server, calls grpc_completion_queue_pluck() on server completion queues */ } - register_completion_queue(server, cq, false, reserved); -} - -void grpc_server_register_non_listening_completion_queue( - grpc_server *server, grpc_completion_queue *cq, void *reserved) { - GRPC_API_TRACE( - "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " - "reserved=%p)", - 3, (server, cq, reserved)); - register_completion_queue(server, cq, true, reserved); + register_completion_queue(server, cq, reserved); } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { @@ -1121,7 +1108,7 @@ void grpc_server_start(grpc_server *server) { server->requested_calls_per_cq = gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { - if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { + if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } |