author | 2017-05-16 13:42:07 -0700
committer | 2017-05-16 13:42:07 -0700
commit | 321b7d8fb0dbe40f27685f733a3098bd4fd9c77b (patch)
tree | 3386f15aa6d894f972048bd08ab25e0cce489441 /src/core/lib/surface
parent | 8996208bc3af635160964a784ea643e28c6bd9e0 (diff)
parent | bcb65f0bd533ff0c7babf3b06a424d2bd5f1d37d (diff)
Merge github.com:grpc/grpc into thread_pool
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 687
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 12
-rw-r--r-- | src/core/lib/surface/server.c | 112
3 files changed, 525 insertions, 286 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index df5b70205c..b0a4b1fbcc 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -30,7 +30,6 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - #include "src/core/lib/surface/completion_queue.h" #include <stdio.h> @@ -45,6 +44,7 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -201,33 +201,68 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { .destroy = non_polling_poller_destroy}, }; -/* Completion queue structure */ -struct grpc_completion_queue { - /** owned by pollset */ +typedef struct cq_vtable { + grpc_cq_completion_type cq_completion_type; + size_t (*size)(); + void (*begin_op)(grpc_completion_queue *cc, void *tag); + void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline, + void *reserved); + grpc_event (*pluck)(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved); +} cq_vtable; + +/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue + * (a lockfree multiproducer single consumer queue). It uses a queue_lock + * to support multiple consumers. + * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */ +typedef struct grpc_cq_event_queue { + /* Spinlock to serialize consumers i.e pop() operations */ + gpr_spinlock queue_lock; + + gpr_mpscq queue; + + /* A lazy counter of number of items in the queue. This is NOT atomically + incremented/decremented along with push/pop operations and hence is only + eventually consistent */ + gpr_atm num_queue_items; +} grpc_cq_event_queue; + +/* TODO: sreek Refactor this based on the completion_type. Put completion-type + * specific data in a different structure (and co-allocate memory for it along + * with completion queue + pollset )*/ +typedef struct cq_data { gpr_mu *mu; - grpc_cq_completion_type completion_type; - - const cq_poller_vtable *poller_vtable; - - /** completed events */ + /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; + + /** Completed events for completion-queues of type GRPC_CQ_NEXT */ + grpc_cq_event_queue queue; + /** Number of pending events (+1 if we're not shutdown) */ gpr_refcount pending_events; + /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; - /** counter of how many things have ever been queued on this completion queue + + /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; + /** 0 initially, 1 once we've begun shutting down */ - int shutdown; + gpr_atm shutdown; int shutdown_called; + int is_server_cq; - /** Can the server cq accept incoming channels */ - /* TODO: sreek - This will no longer be needed. 
Use polling_type set */ - int is_non_listening_server_cq; + int num_pluckers; + int num_polls; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; grpc_closure pollset_shutdown_done; @@ -236,8 +271,61 @@ struct grpc_completion_queue { size_t outstanding_tag_count; size_t outstanding_tag_capacity; #endif +} cq_data; + +/* Completion queue structure */ +struct grpc_completion_queue { + cq_data data; + const cq_vtable *vtable; + const cq_poller_vtable *poller_vtable; +}; - grpc_completion_queue *next_free; +/* Forward declarations */ +static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc); + +static size_t cq_size(grpc_completion_queue *cc); + +static void cq_begin_op(grpc_completion_queue *cc, void *tag); + +static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + +static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + +static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, + void *reserved); + +static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved); + +/* Completion queue vtables based on the completion-type */ +static const cq_vtable g_cq_vtable[] = { + /* GRPC_CQ_NEXT */ + {.cq_completion_type = GRPC_CQ_NEXT, + .size = cq_size, + .begin_op = cq_begin_op, + .end_op = cq_end_op_for_next, + .next = cq_next, + .pluck = NULL}, + /* GRPC_CQ_PLUCK */ + {.cq_completion_type = GRPC_CQ_PLUCK, + .size = cq_size, + .begin_op = cq_begin_op, + .end_op = cq_end_op_for_pluck, + .next = NULL, + .pluck = cq_pluck}, }; #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) @@ -258,6 +346,47 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, grpc_error *error); +static void cq_event_queue_init(grpc_cq_event_queue *q) { + gpr_mpscq_init(&q->queue); + q->queue_lock = GPR_SPINLOCK_INITIALIZER; + gpr_atm_no_barrier_store(&q->num_queue_items, 0); +} + +static void cq_event_queue_destroy(grpc_cq_event_queue *q) { + gpr_mpscq_destroy(&q->queue); +} + +static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { + gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c); + gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1); +} + +static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { + grpc_cq_completion *c = NULL; + if (gpr_spinlock_trylock(&q->queue_lock)) { + c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); + gpr_spinlock_unlock(&q->queue_lock); + } + + if (c) { + gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); + } + + return c; +} + +/* Note: The counter is not incremented/decremented atomically with push/pop. 
+ * The count is only eventually consistent */ +static long cq_event_queue_num_items(grpc_cq_event_queue *q) { + return (long)gpr_atm_no_barrier_load(&q->num_queue_items); +} + +static size_t cq_size(grpc_completion_queue *cc) { + /* Size of the completion queue and the size of the pollset whose memory is + allocated right after that of completion queue */ + return sizeof(grpc_completion_queue) + cc->poller_vtable->size(); +} + grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type) { @@ -270,35 +399,40 @@ grpc_completion_queue *grpc_completion_queue_create_internal( "polling_type=%d)", 2, (completion_type, polling_type)); + const cq_vtable *vtable = &g_cq_vtable[completion_type]; const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); - poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu); -#ifndef NDEBUG - cc->outstanding_tags = NULL; - cc->outstanding_tag_capacity = 0; -#endif + cq_data *cqd = &cc->data; - cc->completion_type = completion_type; + cc->vtable = vtable; cc->poller_vtable = poller_vtable; + poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu); + +#ifndef NDEBUG + cqd->outstanding_tags = NULL; + cqd->outstanding_tag_capacity = 0; +#endif + /* Initial ref is dropped by grpc_completion_queue_shutdown */ - gpr_ref_init(&cc->pending_events, 1); + gpr_ref_init(&cqd->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ - gpr_ref_init(&cc->owning_refs, 2); - cc->completed_tail = &cc->completed_head; - cc->completed_head.next = (uintptr_t)cc->completed_tail; - cc->shutdown = 0; - cc->shutdown_called = 0; - cc->is_server_cq = 0; - cc->is_non_listening_server_cq = 0; - cc->num_pluckers = 0; - gpr_atm_no_barrier_store(&cc->things_queued_ever, 0); + gpr_ref_init(&cqd->owning_refs, 2); + cqd->completed_tail = &cqd->completed_head; + cqd->completed_head.next = (uintptr_t)cqd->completed_tail; + gpr_atm_no_barrier_store(&cqd->shutdown, 0); + cqd->shutdown_called = 0; + cqd->is_server_cq = 0; + cqd->num_pluckers = 0; + cqd->num_polls = 0; + gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); #ifndef NDEBUG - cc->outstanding_tag_count = 0; + cqd->outstanding_tag_count = 0; #endif - grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc, + cq_event_queue_init(&cqd->queue); + grpc_closure_init(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc, grpc_schedule_on_exec_ctx); GPR_TIMER_END("grpc_completion_queue_create_internal", 0); @@ -307,18 +441,28 @@ grpc_completion_queue *grpc_completion_queue_create_internal( } grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { - return cc->completion_type; + return cc->vtable->cq_completion_type; +} + +int grpc_get_cq_poll_num(grpc_completion_queue *cc) { + int cur_num_polls; + gpr_mu_lock(cc->data.mu); + cur_num_polls = cc->data.num_polls; + gpr_mu_unlock(cc->data.mu); + return cur_num_polls; } #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { + cq_data *cqd = &cc->data; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc, - (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason); + (int)cqd->owning_refs.count, (int)cqd->owning_refs.count + 1, reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; #endif - gpr_ref(&cc->owning_refs); + 
gpr_ref(&cqd->owning_refs); } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, @@ -328,63 +472,95 @@ static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, } #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - const char *reason, const char *file, int line) { +void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, + const char *file, int line) { + cq_data *cqd = &cc->data; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc, - (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); + (int)cqd->owning_refs.count, (int)cqd->owning_refs.count - 1, reason); #else void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; #endif - if (gpr_unref(&cc->owning_refs)) { - GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); + if (gpr_unref(&cqd->owning_refs)) { + GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc)); + cq_event_queue_destroy(&cqd->queue); #ifndef NDEBUG - gpr_free(cc->outstanding_tags); + gpr_free(cqd->outstanding_tags); #endif gpr_free(cc); } } -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { +static void cq_begin_op(grpc_completion_queue *cc, void *tag) { + cq_data *cqd = &cc->data; #ifndef NDEBUG - gpr_mu_lock(cc->mu); - GPR_ASSERT(!cc->shutdown_called); - if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { - cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); - cc->outstanding_tags = - gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) * - cc->outstanding_tag_capacity); + gpr_mu_lock(cqd->mu); + GPR_ASSERT(!cqd->shutdown_called); + if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) { + cqd->outstanding_tag_capacity = + GPR_MAX(4, 2 * cqd->outstanding_tag_capacity); + cqd->outstanding_tags = + gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) * + cqd->outstanding_tag_capacity); } - cc->outstanding_tags[cc->outstanding_tag_count++] = tag; - gpr_mu_unlock(cc->mu); + cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag; + gpr_mu_unlock(cqd->mu); #endif - gpr_ref(&cc->pending_events); + gpr_ref(&cqd->pending_events); +} + +void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { + cc->vtable->begin_op(cc, tag); } -/* Signal the end of an operation - if this is the last waiting-to-be-queued - event, then enter shutdown mode */ -/* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, - grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage) { - int shutdown; - int i; - grpc_pollset_worker *pluck_worker; #ifndef NDEBUG +static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) { + cq_data *cqd = &cc->data; int found = 0; + if (lock_cq) { + gpr_mu_lock(cqd->mu); + } + + for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) { + if (cqd->outstanding_tags[i] == tag) { + cqd->outstanding_tag_count--; + GPR_SWAP(void *, cqd->outstanding_tags[i], + cqd->outstanding_tags[cqd->outstanding_tag_count]); + found = 1; + break; + } + } + + if (lock_cq) { + gpr_mu_unlock(cqd->mu); + } + + GPR_ASSERT(found); +} +#else +static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {} #endif - 
GPR_TIMER_BEGIN("grpc_cq_end_op", 0); +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion + * type of GRPC_CQ_NEXT) */ +static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + GPR_TIMER_BEGIN("cq_end_op_for_next", 0); + if (GRPC_TRACER_ON(grpc_api_trace) || (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, " - "done_arg=%p, storage=%p)", + "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "done=%p, done_arg=%p, storage=%p)", 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { @@ -392,41 +568,100 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, } } + cq_data *cqd = &cc->data; + int is_success = (error == GRPC_ERROR_NONE); + storage->tag = tag; storage->done = done; storage->done_arg = done_arg; - storage->next = ((uintptr_t)&cc->completed_head) | - ((uintptr_t)(error == GRPC_ERROR_NONE)); + storage->next = (uintptr_t)(is_success); - gpr_mu_lock(cc->mu); -#ifndef NDEBUG - for (i = 0; i < (int)cc->outstanding_tag_count; i++) { - if (cc->outstanding_tags[i] == tag) { - cc->outstanding_tag_count--; - GPR_SWAP(void *, cc->outstanding_tags[i], - cc->outstanding_tags[cc->outstanding_tag_count]); - found = 1; - break; + cq_check_tag(cc, tag, true); /* Used in debug builds only */ + + /* Add the completion to the queue */ + cq_event_queue_push(&cqd->queue, storage); + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + + int shutdown = gpr_unref(&cqd->pending_events); + + gpr_mu_lock(cqd->mu); + if (!shutdown) { + grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL); + gpr_mu_unlock(cqd->mu); + + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + + GRPC_ERROR_UNREF(kick_error); } + } else { + cq_finish_shutdown(exec_ctx, cc); + gpr_mu_unlock(cqd->mu); } - GPR_ASSERT(found); -#endif - shutdown = gpr_unref(&cc->pending_events); - gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1); + + GPR_TIMER_END("cq_end_op_for_next", 0); + + GRPC_ERROR_UNREF(error); +} + +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion + * type of GRPC_CQ_PLUCK) */ +static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + cq_data *cqd = &cc->data; + int is_success = (error == GRPC_ERROR_NONE); + + GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); + + if (GRPC_TRACER_ON(grpc_api_trace) || + (GRPC_TRACER_ON(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE)) { + const char *errmsg = grpc_error_string(error); + GRPC_API_TRACE( + "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "done=%p, done_arg=%p, storage=%p)", + 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + if (GRPC_TRACER_ON(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + } + } + + storage->tag = tag; + 
storage->done = done; + storage->done_arg = done_arg; + storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success)); + + gpr_mu_lock(cqd->mu); + cq_check_tag(cc, tag, false); /* Used in debug builds only */ + + /* Add to the list of completions */ + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + cqd->completed_tail->next = + ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next); + cqd->completed_tail = storage; + + int shutdown = gpr_unref(&cqd->pending_events); if (!shutdown) { - cc->completed_tail->next = - ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); - cc->completed_tail = storage; - pluck_worker = NULL; - for (i = 0; i < cc->num_pluckers; i++) { - if (cc->pluckers[i].tag == tag) { - pluck_worker = *cc->pluckers[i].worker; + grpc_pollset_worker *pluck_worker = NULL; + for (int i = 0; i < cqd->num_pluckers; i++) { + if (cqd->pluckers[i].tag == tag) { + pluck_worker = *cqd->pluckers[i].worker; break; } } + grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); - gpr_mu_unlock(cc->mu); + + gpr_mu_unlock(cqd->mu); + if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); gpr_log(GPR_ERROR, "Kick failed: %s", msg); @@ -434,22 +669,23 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GRPC_ERROR_UNREF(kick_error); } } else { - cc->completed_tail->next = - ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); - cc->completed_tail = storage; - GPR_ASSERT(!cc->shutdown); - GPR_ASSERT(cc->shutdown_called); - cc->shutdown = 1; - cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); - gpr_mu_unlock(cc->mu); + cq_finish_shutdown(exec_ctx, cc); + gpr_mu_unlock(cqd->mu); } - GPR_TIMER_END("grpc_cq_end_op", 0); + GPR_TIMER_END("cq_end_op_for_pluck", 0); GRPC_ERROR_UNREF(error); } +void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, + void *tag, grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage); +} + typedef struct { gpr_atm last_seen_things_queued_ever; grpc_completion_queue *cq; @@ -462,23 +698,24 @@ typedef struct { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; + cq_data *cqd = &cq->data; GPR_ASSERT(a->stolen_completion == NULL); + gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); + if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cq->mu); a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); - if (cq->completed_tail != &cq->completed_head) { - a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next; - cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1; - if (a->stolen_completion == cq->completed_tail) { - cq->completed_tail = &cq->completed_head; - } - gpr_mu_unlock(cq->mu); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); + + /* Pop a cq_completion from the queue. Returns NULL if the queue is empty + * might return NULL in some cases even if the queue is not empty; but that + * is ok and doesn't affect correctness. 
Might effect the tail latencies a + * bit) */ + a->stolen_completion = cq_event_queue_pop(&cqd->queue); + if (a->stolen_completion != NULL) { return true; } - gpr_mu_unlock(cq->mu); } return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; @@ -488,16 +725,18 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { static void dump_pending_tags(grpc_completion_queue *cc) { if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return; + cq_data *cqd = &cc->data; + gpr_strvec v; gpr_strvec_init(&v); gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); - gpr_mu_lock(cc->mu); - for (size_t i = 0; i < cc->outstanding_tag_count; i++) { + gpr_mu_lock(cqd->mu); + for (size_t i = 0; i < cqd->outstanding_tag_count; i++) { char *s; - gpr_asprintf(&s, " %p", cc->outstanding_tags[i]); + gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]); gpr_strvec_add(&v, s); } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); char *out = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); gpr_log(GPR_DEBUG, "%s", out); @@ -507,17 +746,11 @@ static void dump_pending_tags(grpc_completion_queue *cc) { static void dump_pending_tags(grpc_completion_queue *cc) {} #endif -grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, - gpr_timespec deadline, void *reserved) { +static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, + void *reserved) { grpc_event ret; gpr_timespec now; - - if (cc->completion_type != GRPC_CQ_NEXT) { - gpr_log(GPR_ERROR, - "grpc_completion_queue_next() cannot be called on this completion " - "queue since its completion type is not GRPC_CQ_NEXT"); - abort(); - } + cq_data *cqd = &cc->data; GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -536,10 +769,10 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); - gpr_mu_lock(cc->mu); + cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cc->things_queued_ever), + gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cc, .deadline = deadline, .stolen_completion = NULL, @@ -547,9 +780,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, .first_loop = true}; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); + for (;;) { + gpr_timespec iteration_deadline = deadline; + if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cc->mu); grpc_cq_completion *c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -558,37 +793,59 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, c->done(&exec_ctx, c->done_arg, c); break; } - if (cc->completed_tail != &cc->completed_head) { - grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; - cc->completed_head.next = c->next & ~(uintptr_t)1; - if (c == cc->completed_tail) { - cc->completed_tail = &cc->completed_head; - } - gpr_mu_unlock(cc->mu); + + grpc_cq_completion *c = cq_event_queue_pop(&cqd->queue); + + if (c != NULL) { ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; c->done(&exec_ctx, c->done_arg, c); break; + } else { + /* If c == NULL it means either the queue is empty OR in an transient + inconsistent state. If it is the latter, we shold do a 0-timeout poll + so that the thread comes back quickly from poll to make a second + attempt at popping. 
Not doing this can potentially deadlock this thread + forever (if the deadline is infinity) */ + if (cq_event_queue_num_items(&cqd->queue) > 0) { + iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); + } } - if (cc->shutdown) { - gpr_mu_unlock(cc->mu); + + if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + /* Before returning, check if the queue has any items left over (since + gpr_mpscq_pop() can sometimes return NULL even if the queue is not + empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ + if (cq_event_queue_num_items(&cqd->queue) > 0) { + /* Go to the beginning of the loop. No point doing a poll because + (cc->shutdown == true) is only possible when there is no pending work + (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion + events are already queued on this cq */ + continue; + } + memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } + now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { - gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cc); break; } + + /* The main polling work happens in grpc_pollset_work */ + gpr_mu_lock(cqd->mu); + cqd->num_polls++; grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), - NULL, now, deadline); + NULL, now, iteration_deadline); + gpr_mu_unlock(cqd->mu); + if (err != GRPC_ERROR_NONE) { - gpr_mu_unlock(cc->mu); const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); @@ -600,6 +857,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, } is_finished_arg.first_loop = false; } + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next"); grpc_exec_ctx_finish(&exec_ctx); @@ -610,24 +868,30 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, return ret; } +grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, + gpr_timespec deadline, void *reserved) { + return cc->vtable->next(cc, deadline, reserved); +} + static int add_plucker(grpc_completion_queue *cc, void *tag, grpc_pollset_worker **worker) { - if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { + cq_data *cqd = &cc->data; + if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } - cc->pluckers[cc->num_pluckers].tag = tag; - cc->pluckers[cc->num_pluckers].worker = worker; - cc->num_pluckers++; + cqd->pluckers[cqd->num_pluckers].tag = tag; + cqd->pluckers[cqd->num_pluckers].worker = worker; + cqd->num_pluckers++; return 1; } static void del_plucker(grpc_completion_queue *cc, void *tag, grpc_pollset_worker **worker) { - int i; - for (i = 0; i < cc->num_pluckers; i++) { - if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) { - cc->num_pluckers--; - GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]); + cq_data *cqd = &cc->data; + for (int i = 0; i < cqd->num_pluckers; i++) { + if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { + cqd->num_pluckers--; + GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]); return; } } @@ -637,51 +901,47 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; + cq_data *cqd = &cq->data; + GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + 
gpr_atm_no_barrier_load(&cqd->things_queued_ever); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cq->mu); + gpr_mu_lock(cqd->mu); a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); grpc_cq_completion *c; - grpc_cq_completion *prev = &cq->completed_head; + grpc_cq_completion *prev = &cqd->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != - &cq->completed_head) { + &cqd->completed_head) { if (c->tag == a->tag) { prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); - if (c == cq->completed_tail) { - cq->completed_tail = prev; + if (c == cqd->completed_tail) { + cqd->completed_tail = prev; } - gpr_mu_unlock(cq->mu); + gpr_mu_unlock(cqd->mu); a->stolen_completion = c; return true; } prev = c; } - gpr_mu_unlock(cq->mu); + gpr_mu_unlock(cqd->mu); } return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; } -grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, - gpr_timespec deadline, void *reserved) { +static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; + cq_data *cqd = &cc->data; GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); - if (cc->completion_type != GRPC_CQ_PLUCK) { - gpr_log(GPR_ERROR, - "grpc_completion_queue_pluck() cannot be called on this completion " - "queue since its completion type is not GRPC_CQ_PLUCK"); - abort(); - } - if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) { GRPC_API_TRACE( "grpc_completion_queue_pluck(" @@ -699,10 +959,10 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(cc->mu); + gpr_mu_lock(cqd->mu); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cc->things_queued_ever), + gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cc, .deadline = deadline, .stolen_completion = NULL, @@ -712,7 +972,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -721,15 +981,15 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, c->done(&exec_ctx, c->done_arg, c); break; } - prev = &cc->completed_head; + prev = &cqd->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != - &cc->completed_head) { + &cqd->completed_head) { if (c->tag == tag) { prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); - if (c == cc->completed_tail) { - cc->completed_tail = prev; + if (c == cqd->completed_tail) { + cqd->completed_tail = prev; } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -738,8 +998,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, } prev = c; } - if (cc->shutdown) { - gpr_mu_unlock(cc->mu); + if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + gpr_mu_unlock(cqd->mu); memset(&ret, 0, 
sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; @@ -749,7 +1009,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; @@ -759,19 +1019,21 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cc); break; } + + cqd->num_polls++; grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, deadline); if (err != GRPC_ERROR_NONE) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); const char *msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg); GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); @@ -793,26 +1055,48 @@ done: return ret; } +grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved) { + return cc->vtable->pluck(cc, tag, deadline, reserved); +} + +/* Finishes the completion queue shutdown. This means that there are no more + completion events / tags expected from the completion queue + - Must be called under completion queue lock + - Must be called only once in completion queue's lifetime + - grpc_completion_queue_shutdown() MUST have been called before calling + this function */ +static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; + + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); + gpr_atm_no_barrier_store(&cqd->shutdown, 1); + + cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cqd->pollset_shutdown_done); +} + /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - gpr_mu_lock(cc->mu); - if (cc->shutdown_called) { - gpr_mu_unlock(cc->mu); + cq_data *cqd = &cc->data; + + gpr_mu_lock(cqd->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cqd->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } - cc->shutdown_called = 1; - if (gpr_unref(&cc->pending_events)) { - GPR_ASSERT(!cc->shutdown); - cc->shutdown = 1; - cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cqd->shutdown_called = 1; + if (gpr_unref(&cqd->pending_events)) { + cq_finish_shutdown(&exec_ctx, cc); } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -821,6 +1105,13 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cc); + + /* TODO (sreek): This should not 
ideally be here. Refactor it into the + * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ + if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) { + GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0); + } + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); grpc_exec_ctx_finish(&exec_ctx); @@ -835,22 +1126,12 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { return CQ_FROM_POLLSET(ps); } -void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { - /* TODO: sreek - use cc->polling_type field here and add a validation check - (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose - polling_type is set to GRPC_CQ_NON_LISTENING */ - cc->is_non_listening_server_cq = 1; +void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { + cc->data.is_server_cq = 1; } -bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { - /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */ - return (cc->is_non_listening_server_cq == 1); -} - -void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } - bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { - return cc->is_server_cq; + return cc->data.is_server_cq; } bool grpc_cq_can_listen(grpc_completion_queue *cc) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 8d9ce2ec02..7963ea75e7 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -49,7 +49,13 @@ extern grpc_tracer_flag grpc_trace_operation_failures; extern grpc_tracer_flag grpc_trace_pending_tags; #endif +#ifdef __cplusplus +extern "C" { +#endif + typedef struct grpc_cq_completion { + gpr_mpscq_node node; + /** user supplied tag */ void *tag; /** done callback - called when this queue element is no longer @@ -101,7 +107,13 @@ bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); +int grpc_get_cq_poll_num(grpc_completion_queue *cc); + grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 560229e892..7e4ae421a0 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -47,7 +47,8 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/stack_lockfree.h" +#include "src/core/lib/support/mpscq.h" +#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -76,6 +77,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); typedef struct requested_call { + gpr_mpscq_node request_link; /* must be first */ requested_call_type type; size_t cq_idx; void *tag; @@ -175,7 +177,7 @@ struct request_matcher { grpc_server *server; call_data *pending_head; call_data *pending_tail; - gpr_stack_lockfree **requests_per_cq; + gpr_locked_mpscq *requests_per_cq; }; struct registered_method { @@ -220,11 +222,6 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered 
methods */ request_matcher unregistered_request_matcher; - /** free list of available requested_calls_per_cq indices */ - gpr_stack_lockfree **request_freelist_per_cq; - /** requested call backing data */ - requested_call **requested_calls_per_cq; - int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -324,21 +321,20 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, * request_matcher */ -static void request_matcher_init(request_matcher *rm, size_t entries, - grpc_server *server) { +static void request_matcher_init(request_matcher *rm, grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; rm->requests_per_cq = gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); for (size_t i = 0; i < server->cq_count; i++) { - rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); + gpr_locked_mpscq_init(&rm->requests_per_cq[i]); } } static void request_matcher_destroy(request_matcher *rm) { for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); - gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); + GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL); + gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); } gpr_free(rm->requests_per_cq); } @@ -368,13 +364,17 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, request_matcher *rm, grpc_error *error) { - int request_id; + requested_call *rc; for (size_t i = 0; i < server->cq_count; i++) { - while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != - -1) { - fail_call(exec_ctx, server, i, - &server->requested_calls_per_cq[i][request_id], - GRPC_ERROR_REF(error)); + /* Here we know: + 1. no requests are being added (since the server is shut down) + 2. 
no other threads are pulling (since the shut down process is single + threaded) + So, we can ignore the queue lock and just pop, with the guarantee that a + NULL returned here truly means that the queue is empty */ + while ((rc = (requested_call *)gpr_mpscq_pop( + &rm->requests_per_cq[i].queue)) != NULL) { + fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); @@ -409,13 +409,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); - if (server->started) { - gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); - gpr_free(server->requested_calls_per_cq[i]); - } } - gpr_free(server->request_freelist_per_cq); - gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -473,21 +467,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, grpc_cq_completion *c) { - requested_call *rc = req; - grpc_server *server = rc->server; - - if (rc >= server->requested_calls_per_cq[rc->cq_idx] && - rc < server->requested_calls_per_cq[rc->cq_idx] + - server->max_requested_calls_per_cq) { - GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); - gpr_stack_lockfree_push( - server->request_freelist_per_cq[rc->cq_idx], - (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); - } else { - gpr_free(req); - } - - server_unref(exec_ctx, server); + gpr_free(req); } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -516,10 +496,6 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, GPR_UNREACHABLE_CODE(return ); } - grpc_call_element *elem = - grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - channel_data *chand = elem->channel_data; - server_ref(chand->server); grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, rc, &rc->completion); } @@ -547,15 +523,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, for (size_t i = 0; i < server->cq_count; i++) { size_t cq_idx = (chand->cq_idx + i) % server->cq_count; - int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); - if (request_id == -1) { + requested_call *rc = + (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); + if (rc == NULL) { continue; } else { gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls_per_cq[cq_idx][request_id]); + publish_call(exec_ctx, server, calld, cq_idx, rc); return; /* early out */ } } @@ -1029,8 +1005,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; - /* TODO(ctiller): expose a channel_arg for this */ - server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1103,29 +1077,15 @@ void grpc_server_start(grpc_server *server) { server->started = true; server->pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); - server->request_freelist_per_cq = - gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); - server->requested_calls_per_cq = - gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); 
for (i = 0; i < server->cq_count; i++) { if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } - server->request_freelist_per_cq[i] = - gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); - for (int j = 0; j < server->max_requested_calls_per_cq; j++) { - gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); - } - server->requested_calls_per_cq[i] = - gpr_malloc((size_t)server->max_requested_calls_per_cq * - sizeof(*server->requested_calls_per_cq[i])); } - request_matcher_init(&server->unregistered_request_matcher, - (size_t)server->max_requested_calls_per_cq, server); + request_matcher_init(&server->unregistered_request_matcher, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, - (size_t)server->max_requested_calls_per_cq, server); + request_matcher_init(&rm->request_matcher, server); } server_ref(server); @@ -1379,21 +1339,11 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; - int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } - request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); - if (request_id == -1) { - /* out of request ids: just fail this one */ - fail_call(exec_ctx, server, cq_idx, rc, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), - GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); - return GRPC_CALL_OK; - } switch (rc->type) { case BATCH_CALL: rm = &server->unregistered_request_matcher; @@ -1402,15 +1352,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - server->requested_calls_per_cq[cq_idx][request_id] = *rc; - gpr_free(rc); - if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { + if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); - if (request_id == -1) break; + rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); + if (rc == NULL) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); gpr_mu_lock(&calld->mu_state); @@ -1426,8 +1374,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls_per_cq[cq_idx][request_id]); + publish_call(exec_ctx, server, calld, cq_idx, rc); } gpr_mu_lock(&server->mu_call); } @@ -1534,7 +1481,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); - server_ref(server); grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, done_request_event, rc, &rc->completion); } |
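
Notes on the patterns in this change follow. The core of the completion_queue.c rework is grpc_cq_event_queue: a Vyukov-style intrusive MPSC (multi-producer, single-consumer) queue, made usable by multiple consumers via a trylock spinlock around pop(), plus a lazy item counter that is only eventually consistent. Below is a minimal sketch of that pattern using standard C11 atomics as stand-ins for the gpr_mpscq / gpr_spinlock / gpr_atm primitives — the names are illustrative, not the gRPC API:

```c
#include <stdatomic.h>
#include <stdio.h>

typedef struct mpscq_node {
  _Atomic(struct mpscq_node *) next;
} mpscq_node;

typedef struct {
  _Atomic(mpscq_node *) head; /* producers exchange themselves in here */
  mpscq_node *tail;           /* consumer-side cursor */
  mpscq_node stub;            /* sentinel: queue is never structurally empty */
  atomic_flag consumer_lock;  /* spinlock serializing pop() callers */
  atomic_long num_items;      /* lazy count; NOT updated atomically with
                                 push/pop, so only eventually consistent */
} cq_event_queue;

static void queue_init(cq_event_queue *q) {
  atomic_init(&q->stub.next, NULL);
  atomic_init(&q->head, &q->stub);
  q->tail = &q->stub;
  atomic_flag_clear(&q->consumer_lock);
  atomic_init(&q->num_items, 0);
}

/* Producer side: lock-free, one exchange plus one store. */
static void mpscq_push(cq_event_queue *q, mpscq_node *n) {
  atomic_store(&n->next, NULL);
  mpscq_node *prev = atomic_exchange(&q->head, n);
  atomic_store(&prev->next, n); /* a concurrent pop may briefly see NULL */
}

static void queue_push(cq_event_queue *q, mpscq_node *n) {
  mpscq_push(q, n);
  atomic_fetch_add_explicit(&q->num_items, 1, memory_order_relaxed);
}

/* Single-consumer pop. May return NULL transiently while a producer sits
   between its exchange and its ->next store, even though the queue is not
   empty -- exactly the case the new cq_next() loop has to handle. */
static mpscq_node *mpscq_pop(cq_event_queue *q) {
  mpscq_node *tail = q->tail;
  mpscq_node *next = atomic_load(&tail->next);
  if (tail == &q->stub) { /* hop over the sentinel */
    if (next == NULL) return NULL;
    q->tail = next;
    tail = next;
    next = atomic_load(&tail->next);
  }
  if (next != NULL) {
    q->tail = next;
    return tail;
  }
  if (tail != atomic_load(&q->head)) return NULL; /* push in flight */
  mpscq_push(q, &q->stub); /* park the sentinel behind the last node */
  next = atomic_load(&tail->next);
  if (next == NULL) return NULL;
  q->tail = next;
  return tail;
}

/* Multi-consumer wrapper: trylock, so a contended caller fails fast and
   returns to polling instead of blocking on another consumer. */
static mpscq_node *queue_pop(cq_event_queue *q) {
  mpscq_node *n = NULL;
  if (!atomic_flag_test_and_set(&q->consumer_lock)) {
    n = mpscq_pop(q);
    atomic_flag_clear(&q->consumer_lock);
  }
  if (n != NULL)
    atomic_fetch_sub_explicit(&q->num_items, 1, memory_order_relaxed);
  return n;
}

int main(void) {
  cq_event_queue q;
  mpscq_node items[3];
  queue_init(&q);
  for (int i = 0; i < 3; i++) queue_push(&q, &items[i]);
  int popped = 0;
  while (queue_pop(&q) != NULL) popped++;
  printf("popped %d items\n", popped);
  return 0;
}
```

Because num_items is bumped separately from the queue operations, cq_event_queue_num_items() in the patch is usable only as a hint — which is all its callers need.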
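The second structural change replaces runtime type checks with dispatch tables: instead of grpc_completion_queue_next()/pluck() branching on completion_type and abort()ing on a mismatch, each queue now carries a cq_vtable selected at creation time, with NULL slots for unsupported operations. A condensed, compilable illustration of the shape (simplified types, not the real gRPC signatures):

```c
#include <stdio.h>
#include <stdlib.h>

typedef enum { CQ_NEXT = 0, CQ_PLUCK = 1 } cq_completion_type;

typedef struct cq cq;

typedef struct {
  cq_completion_type type;
  int (*next)(cq *c);             /* NULL when unsupported for this type */
  int (*pluck)(cq *c, void *tag); /* NULL when unsupported for this type */
} cq_vtable;

struct cq {
  const cq_vtable *vtable;
  /* ... type-specific data would follow ... */
};

static int next_impl(cq *c) { (void)c; return 1; }
static int pluck_impl(cq *c, void *tag) { (void)c; (void)tag; return 2; }

/* One vtable per completion type, indexed by the enum -- the same shape
   as g_cq_vtable in completion_queue.c. */
static const cq_vtable g_vtable[] = {
    [CQ_NEXT] = {.type = CQ_NEXT, .next = next_impl, .pluck = NULL},
    [CQ_PLUCK] = {.type = CQ_PLUCK, .next = NULL, .pluck = pluck_impl},
};

static cq *cq_create(cq_completion_type type) {
  cq *c = calloc(1, sizeof(*c));
  c->vtable = &g_vtable[type]; /* behavior fixed at creation */
  return c;
}

int main(void) {
  cq *c = cq_create(CQ_NEXT);
  printf("next() -> %d\n", c->vtable->next(c));
  free(c);
  return 0;
}
```

One consequence worth noting: the old code logged a descriptive error and abort()ed when next() was called on a PLUCK queue; after this patch the same misuse appears to become a call through a NULL function pointer — a branch-free fast path traded for a less friendly failure.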
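Both before and after this change, the pollset lives in the same allocation as the completion queue — gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()) — and is recovered by pointer arithmetic via POLLSET_FROM_CQ(cq), i.e. ((grpc_pollset *)(cq + 1)); the new cq_size() simply reports that combined size. A stand-alone sketch of the header-plus-trailing-payload idiom, where poller_size() is an assumed stand-in for poller_vtable->size():

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
  int some_cq_state;
} cq_header;

/* Stand-in: the real size comes from the active poller's vtable. */
static size_t poller_size(void) { return 64; }

/* (cq + 1) points just past the header -- valid as long as the payload's
   alignment requirement does not exceed the header's. */
#define POLLSET_FROM_CQ(cq) ((void *)((cq) + 1))

static size_t cq_size(void) { return sizeof(cq_header) + poller_size(); }

int main(void) {
  cq_header *cq = calloc(1, cq_size()); /* one allocation for both */
  void *pollset = POLLSET_FROM_CQ(cq);
  memset(pollset, 0, poller_size()); /* poller_vtable->init would run here */
  printf("header at %p, pollset at %p\n", (void *)cq, pollset);
  free(cq);
  return 0;
}
```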
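The rewritten cq_next() loop has to cope with the transient-NULL pops sketched above: when pop() misses but the lazy counter says items exist, the iteration's poll deadline is clamped to zero so the thread returns from poll immediately and retries, instead of parking in poll() — potentially forever, if the caller's deadline is infinite. A toy single-threaded simulation of that control flow (try_pop, num_items, and poll_once are assumed stand-ins for the real primitives):

```c
#include <stdio.h>

/* Simulate a queue whose first pop misses even though one item exists. */
static int items = 1;
static int attempts = 0;

static int try_pop(void) { /* returns 1 if an item was popped */
  attempts++;
  if (attempts >= 2 && items > 0) {
    items--;
    return 1;
  }
  return 0; /* transient miss on the first attempt */
}
static int num_items(void) { return items; } /* the lazy counter */
static void poll_once(long timeout_ms) {
  printf("poll(timeout=%ld ms)\n", timeout_ms);
}

int main(void) {
  const long caller_deadline_ms = 5000;
  for (;;) {
    if (try_pop()) {
      printf("completion after %d pop attempt(s)\n", attempts);
      break;
    }
    long iteration_deadline_ms = caller_deadline_ms;
    if (num_items() > 0) iteration_deadline_ms = 0; /* retry immediately */
    poll_once(iteration_deadline_ms);
  }
  return 0;
}
```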
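On the server side, requested_call gains `gpr_mpscq_node request_link; /* must be first */` so that a node pointer popped from a gpr_locked_mpscq can be cast straight back to the enclosing request — replacing the old fixed-size freelist of request ids, its max_requested_calls_per_cq = 32768 cap, and the "Out of request ids" failure mode. The first-member constraint is what makes the cast legal (C guarantees a pointer to a struct and a pointer to its first member interconvert):

```c
#include <stddef.h>
#include <stdio.h>

typedef struct node {
  struct node *next;
} node;

typedef struct {
  node request_link; /* must be first: &rc->request_link == (node *)rc */
  int tag;
} requested_call;

int main(void) {
  requested_call rc = {.request_link = {NULL}, .tag = 42};
  node *n = &rc.request_link;                 /* what the queue stores */
  requested_call *back = (requested_call *)n; /* round-trip via first member */
  printf("tag = %d\n", back->tag);
  return 0;
}
```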