diff options
author | Sree Kuchibhotla <sreek@google.com> | 2017-04-25 12:10:04 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2017-04-25 12:10:04 -0700 |
commit | c2134c3804062b835e761a7954e93c7900e564c9 (patch) | |
tree | 0ed5660c8a764ced7ed2be77d4ea94e56c0c517b /src/core/lib/surface | |
parent | 399732faed91a93169c6cf18fea9885eb7ec72f9 (diff) |
Create cq vtable
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 479 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 16 |
2 files changed, 278 insertions, 217 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 714f1fd6ad..cab9b4fef0 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -201,56 +201,66 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { .destroy = non_polling_poller_destroy}, }; -/* Queue that holds the cq_completion_events. This internally uses gpr_mpscq - * queue (a lockfree multiproducer single consumer queue). However this queue - * supports multiple consumers too. As such, it uses the queue_mu to serialize - * consumer access (but no locks for producer access). - * - * Currently this is only used in completion queues whose completion_type is - * GRPC_CQ_NEXT */ +typedef struct cq_vtable { + grpc_cq_completion_type cq_completion_type; + size_t (*size)(); + void (*begin_op)(grpc_completion_queue *cc, void *tag); + void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline, + void *reserved); + grpc_event (*pluck)(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved); +} cq_vtable; + +/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue + * (a lockfree multiproducer single consumer queue). It uses a queue_lock + * to support multiple consumers. + * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */ typedef struct grpc_cq_event_queue { - /* spinlock to serialize consumers i.e pop() operations */ + /* Spinlock to serialize consumers i.e pop() operations */ gpr_spinlock queue_lock; gpr_mpscq queue; - /* A lazy counter indicating the number of items in the queue. This is NOT - atomically incremented/decrements along with push/pop operations and hence - only eventually consistent */ + /* A lazy counter of number of items in the queue. This is NOT atomically + incremented/decremented along with push/pop operations and hence is only + eventually consistent */ gpr_atm num_queue_items; } grpc_cq_event_queue; -/* Completion queue structure */ -struct grpc_completion_queue { - /** Owned by pollset */ +/* TODO: sreek Refactor this based on the completion_type. Put completion-type + * specific data in a different structure (and co-allocate memory for it along + * with completion queue + pollset )*/ +typedef struct cq_data { gpr_mu *mu; - grpc_cq_completion_type completion_type; - - const cq_poller_vtable *poller_vtable; - - /** completed events */ + /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; - /** Completed events for completion-queues of type GRPC_CQ_NEXT are stored in - * this queue */ + /** Completed events for completion-queues of type GRPC_CQ_NEXT */ grpc_cq_event_queue queue; /** Number of pending events (+1 if we're not shutdown) */ gpr_refcount pending_events; + /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; + /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; + /** 0 initially, 1 once we've begun shutting down */ gpr_atm shutdown; int shutdown_called; + int is_server_cq; - /** Can the server cq accept incoming channels */ - /* TODO: sreek - This will no longer be needed. Use polling_type set */ - int is_non_listening_server_cq; + int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; grpc_closure pollset_shutdown_done; @@ -260,8 +270,31 @@ struct grpc_completion_queue { size_t outstanding_tag_count; size_t outstanding_tag_capacity; #endif +} cq_data; - grpc_completion_queue *next_free; +/* Completion queue structure */ +struct grpc_completion_queue { + cq_data data; + const cq_vtable *vtable; + const cq_poller_vtable *poller_vtable; +}; + +/* Completion queue vtables based on the completion-type */ +static const cq_vtable g_cq_vtable[] = { + /* GRPC_CQ_NEXT */ + {.cq_completion_type = GRPC_CQ_NEXT, + .size = grpc_cq_size, + .begin_op = grpc_cq_begin_op, + .end_op = grpc_cq_end_op_for_next, + .next = grpc_completion_queue_next, + .pluck = NULL}, + /* GRPC_CQ_PLUCK */ + {.cq_completion_type = GRPC_CQ_PLUCK, + .size = grpc_cq_size, + .begin_op = grpc_cq_begin_op, + .end_op = grpc_cq_end_op_for_pluck, + .next = NULL, + .pluck = grpc_completion_queue_pluck}, }; #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) @@ -316,6 +349,12 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) { return (long)gpr_atm_no_barrier_load(&q->num_queue_items); } +size_t grpc_cq_size(grpc_completion_queue *cc) { + /* Size of the completion queue and the size of the pollset whose memory is + allocated right after that of completion queue */ + return sizeof(grpc_completion_queue) + cc->poller_vtable->size(); +} + grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type) { @@ -328,36 +367,39 @@ grpc_completion_queue *grpc_completion_queue_create_internal( "polling_type=%d)", 2, (completion_type, polling_type)); + const cq_vtable *vtable = &g_cq_vtable[completion_type]; const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); - poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu); -#ifndef NDEBUG - cc->outstanding_tags = NULL; - cc->outstanding_tag_capacity = 0; -#endif + cq_data *cqd = &cc->data; - cc->completion_type = completion_type; + cc->vtable = vtable; cc->poller_vtable = poller_vtable; + poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu); + +#ifndef NDEBUG + cqd->outstanding_tags = NULL; + cqd->outstanding_tag_capacity = 0; +#endif + /* Initial ref is dropped by grpc_completion_queue_shutdown */ - gpr_ref_init(&cc->pending_events, 1); + gpr_ref_init(&cqd->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ - gpr_ref_init(&cc->owning_refs, 2); - cc->completed_tail = &cc->completed_head; - cc->completed_head.next = (uintptr_t)cc->completed_tail; - gpr_atm_no_barrier_store(&cc->shutdown, 0); - cc->shutdown_called = 0; - cc->is_server_cq = 0; - cc->is_non_listening_server_cq = 0; - cc->num_pluckers = 0; - gpr_atm_no_barrier_store(&cc->things_queued_ever, 0); + gpr_ref_init(&cqd->owning_refs, 2); + cqd->completed_tail = &cqd->completed_head; + cqd->completed_head.next = (uintptr_t)cqd->completed_tail; + gpr_atm_no_barrier_store(&cqd->shutdown, 0); + cqd->shutdown_called = 0; + cqd->is_server_cq = 0; + cqd->num_pluckers = 0; + gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); #ifndef NDEBUG - cc->outstanding_tag_count = 0; + cqd->outstanding_tag_count = 0; #endif - cq_event_queue_init(&cc->queue); - grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc, + cq_event_queue_init(&cqd->queue); + grpc_closure_init(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc, grpc_schedule_on_exec_ctx); GPR_TIMER_END("grpc_completion_queue_create_internal", 0); @@ -366,18 +408,19 @@ grpc_completion_queue *grpc_completion_queue_create_internal( } grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { - return cc->completion_type; + return cc->vtable->cq_completion_type; } #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc, - (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason); + (int)cqd->owning_refs.count, (int)cqd->owning_refs.count + 1, reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; #endif - gpr_ref(&cc->owning_refs); + gpr_ref(&cqd->owning_refs); } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, @@ -389,57 +432,62 @@ static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { + cq_data *cqd = &cc->data; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc, - (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); + (int)cqd->owning_refs.count, (int)cqd->owning_refs.count - 1, reason); #else void grpc_cq_internal_unref(grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; #endif - if (gpr_unref(&cc->owning_refs)) { - GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); + if (gpr_unref(&cqd->owning_refs)) { + GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); cc->poller_vtable->destroy(POLLSET_FROM_CQ(cc)); - cq_event_queue_destroy(&cc->queue); + cq_event_queue_destroy(&cqd->queue); #ifndef NDEBUG - gpr_free(cc->outstanding_tags); + gpr_free(cqd->outstanding_tags); #endif gpr_free(cc); } } void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { + cq_data *cqd = &cc->data; #ifndef NDEBUG - gpr_mu_lock(cc->mu); - GPR_ASSERT(!cc->shutdown_called); - if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { - cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); - cc->outstanding_tags = - gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) * - cc->outstanding_tag_capacity); + gpr_mu_lock(cqd->mu); + GPR_ASSERT(!cqd->shutdown_called); + if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) { + cqd->outstanding_tag_capacity = + GPR_MAX(4, 2 * cqd->outstanding_tag_capacity); + cqd->outstanding_tags = + gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) * + cqd->outstanding_tag_capacity); } - cc->outstanding_tags[cc->outstanding_tag_count++] = tag; - gpr_mu_unlock(cc->mu); + cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag; + gpr_mu_unlock(cqd->mu); #endif - gpr_ref(&cc->pending_events); + gpr_ref(&cqd->pending_events); } #ifndef NDEBUG void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) { + cq_data *cqd = &cc->data; int found = 0; if (lock_cq) { - gpr_mu_lock(cc->mu); + gpr_mu_lock(cqd->mu); } - for (int i = 0; i < (int)cc->outstanding_tag_count; i++) { - if (cc->outstanding_tags[i] == tag) { - cc->outstanding_tag_count--; - GPR_SWAP(void *, cc->outstanding_tags[i], - cc->outstanding_tags[cc->outstanding_tag_count]); + for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) { + if (cqd->outstanding_tags[i] == tag) { + cqd->outstanding_tag_count--; + GPR_SWAP(void *, cqd->outstanding_tags[i], + cqd->outstanding_tags[cqd->outstanding_tag_count]); found = 1; break; } } if (lock_cq) { - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); } GPR_ASSERT(found); @@ -451,11 +499,28 @@ void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {} /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion * type of GRPC_CQ_NEXT) */ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, int is_success, + void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { + GPR_TIMER_BEGIN("grpc_cq_end_op_for_next", 0); + + if (grpc_api_trace || + (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { + const char *errmsg = grpc_error_string(error); + GRPC_API_TRACE( + "grpc_cq_end_op_for_mext(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "done=%p, done_arg=%p, storage=%p)", + 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + } + } + + cq_data *cqd = &cc->data; + int is_success = (error == GRPC_ERROR_NONE); + storage->tag = tag; storage->done = done; storage->done_arg = done_arg; @@ -464,15 +529,15 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, check_tag_in_cq(cc, tag, true); /* Used in debug builds only */ /* Add the completion to the queue */ - cq_event_queue_push(&cc->queue, storage); - gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1); + cq_event_queue_push(&cqd->queue, storage); + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - int shutdown = gpr_unref(&cc->pending_events); + int shutdown = gpr_unref(&cqd->pending_events); if (!shutdown) { - gpr_mu_lock(cc->mu); + gpr_mu_lock(cqd->mu); grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -481,47 +546,68 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GRPC_ERROR_UNREF(kick_error); } } else { - GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown)); - GPR_ASSERT(cc->shutdown_called); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); + GPR_ASSERT(cqd->shutdown_called); - gpr_atm_no_barrier_store(&cc->shutdown, 1); + gpr_atm_no_barrier_store(&cqd->shutdown, 1); - gpr_mu_lock(cc->mu); + gpr_mu_lock(cqd->mu); cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); - gpr_mu_unlock(cc->mu); + &cqd->pollset_shutdown_done); + gpr_mu_unlock(cqd->mu); } + + GPR_TIMER_END("grpc_cq_end_op_for_next", 0); + + GRPC_ERROR_UNREF(error); } /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion * type of GRPC_CQ_PLUCK) */ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, - int is_success, + grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { + cq_data *cqd = &cc->data; + int is_success = (error == GRPC_ERROR_NONE); + + GPR_TIMER_BEGIN("grpc_cq_end_op_for_pluck", 0); + + if (grpc_api_trace || + (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { + const char *errmsg = grpc_error_string(error); + GRPC_API_TRACE( + "grpc_cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "done=%p, done_arg=%p, storage=%p)", + 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + } + } + storage->tag = tag; storage->done = done; storage->done_arg = done_arg; - storage->next = ((uintptr_t)&cc->completed_head) | ((uintptr_t)(is_success)); + storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success)); - gpr_mu_lock(cc->mu); + gpr_mu_lock(cqd->mu); check_tag_in_cq(cc, tag, false); /* Used in debug builds only */ /* Add to the list of completions */ - gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1); - cc->completed_tail->next = - ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); - cc->completed_tail = storage; + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + cqd->completed_tail->next = + ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next); + cqd->completed_tail = storage; - int shutdown = gpr_unref(&cc->pending_events); + int shutdown = gpr_unref(&cqd->pending_events); if (!shutdown) { grpc_pollset_worker *pluck_worker = NULL; - for (int i = 0; i < cc->num_pluckers; i++) { - if (cc->pluckers[i].tag == tag) { - pluck_worker = *cc->pluckers[i].worker; + for (int i = 0; i < cqd->num_pluckers; i++) { + if (cqd->pluckers[i].tag == tag) { + pluck_worker = *cqd->pluckers[i].worker; break; } } @@ -529,7 +615,7 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -538,54 +624,25 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } } else { - GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown)); - GPR_ASSERT(cc->shutdown_called); - gpr_atm_no_barrier_store(&cc->shutdown, 1); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); + GPR_ASSERT(cqd->shutdown_called); + gpr_atm_no_barrier_store(&cqd->shutdown, 1); cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); - gpr_mu_unlock(cc->mu); + &cqd->pollset_shutdown_done); + gpr_mu_unlock(cqd->mu); } + + GPR_TIMER_END("grpc_cq_end_op_for_pluck", 0); + + GRPC_ERROR_UNREF(error); } -/* Signal the end of an operation - if this is the last waiting-to-be-queued - event, then enter shutdown mode */ -/* Queue a GRPC_OP_COMPLETED operation */ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - GPR_TIMER_BEGIN("grpc_cq_end_op", 0); - - if (grpc_api_trace || - (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { - const char *errmsg = grpc_error_string(error); - GRPC_API_TRACE( - "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, " - "done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); - if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); - } - } - - /* Call the appropriate function to queue the completion based on the - completion queue type */ - int is_success = (error == GRPC_ERROR_NONE); - if (cc->completion_type == GRPC_CQ_NEXT) { - grpc_cq_end_op_for_next(exec_ctx, cc, tag, is_success, done, done_arg, - storage); - } else if (cc->completion_type == GRPC_CQ_PLUCK) { - grpc_cq_end_op_for_pluck(exec_ctx, cc, tag, is_success, done, done_arg, - storage); - } else { - gpr_log(GPR_ERROR, "Unexpected completion type %d", cc->completion_type); - abort(); - } - - GPR_TIMER_END("grpc_cq_end_op", 0); - - GRPC_ERROR_UNREF(error); + cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage); } typedef struct { @@ -600,20 +657,21 @@ typedef struct { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; + cq_data *cqd = &cq->data; GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); /* Pop a cq_completion from the queue. Returns NULL if the queue is empty * might return NULL in some cases even if the queue is not empty; but that * is ok and doesn't affect correctness. Might effect the tail latencies a * bit) */ - a->stolen_completion = cq_event_queue_pop(&cq->queue); + a->stolen_completion = cq_event_queue_pop(&cqd->queue); if (a->stolen_completion != NULL) { return true; } @@ -626,16 +684,18 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { static void dump_pending_tags(grpc_completion_queue *cc) { if (!grpc_trace_pending_tags) return; + cq_data *cqd = &cc->data; + gpr_strvec v; gpr_strvec_init(&v); gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); - gpr_mu_lock(cc->mu); - for (size_t i = 0; i < cc->outstanding_tag_count; i++) { + gpr_mu_lock(cqd->mu); + for (size_t i = 0; i < cqd->outstanding_tag_count; i++) { char *s; - gpr_asprintf(&s, " %p", cc->outstanding_tags[i]); + gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]); gpr_strvec_add(&v, s); } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); char *out = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); gpr_log(GPR_DEBUG, "%s", out); @@ -649,13 +709,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; gpr_timespec now; - - if (cc->completion_type != GRPC_CQ_NEXT) { - gpr_log(GPR_ERROR, - "grpc_completion_queue_next() cannot be called on this completion " - "queue since its completion type is not GRPC_CQ_NEXT"); - abort(); - } + cq_data *cqd = &cc->data; GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -677,7 +731,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cc->things_queued_ever), + gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cc, .deadline = deadline, .stolen_completion = NULL, @@ -699,7 +753,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } - grpc_cq_completion *c = cq_event_queue_pop(&cc->queue); + grpc_cq_completion *c = cq_event_queue_pop(&cqd->queue); if (c != NULL) { ret.type = GRPC_OP_COMPLETE; @@ -713,16 +767,16 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, so that the thread comes back quickly from poll to make a second attempt at popping. Not doing this can potentially deadlock this thread forever (if the deadline is infinity) */ - if (cq_event_queue_num_items(&cc->queue) > 0) { + if (cq_event_queue_num_items(&cqd->queue) > 0) { iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); } } - if (gpr_atm_no_barrier_load(&cc->shutdown)) { + if (gpr_atm_no_barrier_load(&cqd->shutdown)) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ - if (cq_event_queue_num_items(&cc->queue) > 0) { + if (cq_event_queue_num_items(&cqd->queue) > 0) { /* Go to the beginning of the loop. No point doing a poll because (cc->shutdown == true) is only possible when there is no pending work (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion @@ -752,10 +806,10 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, } /* The main polling work happens in grpc_pollset_work */ - gpr_mu_lock(cc->mu); + gpr_mu_lock(cqd->mu); grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, now, iteration_deadline); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); if (err != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(err); @@ -782,22 +836,23 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, static int add_plucker(grpc_completion_queue *cc, void *tag, grpc_pollset_worker **worker) { - if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { + cq_data *cqd = &cc->data; + if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } - cc->pluckers[cc->num_pluckers].tag = tag; - cc->pluckers[cc->num_pluckers].worker = worker; - cc->num_pluckers++; + cqd->pluckers[cqd->num_pluckers].tag = tag; + cqd->pluckers[cqd->num_pluckers].worker = worker; + cqd->num_pluckers++; return 1; } static void del_plucker(grpc_completion_queue *cc, void *tag, grpc_pollset_worker **worker) { - int i; - for (i = 0; i < cc->num_pluckers; i++) { - if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) { - cc->num_pluckers--; - GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]); + cq_data *cqd = &cc->data; + for (int i = 0; i < cqd->num_pluckers; i++) { + if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { + cqd->num_pluckers--; + GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]); return; } } @@ -807,29 +862,31 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; + cq_data *cqd = &cq->data; + GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cq->mu); + gpr_mu_lock(cqd->mu); a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); grpc_cq_completion *c; - grpc_cq_completion *prev = &cq->completed_head; + grpc_cq_completion *prev = &cqd->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != - &cq->completed_head) { + &cqd->completed_head) { if (c->tag == a->tag) { prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); - if (c == cq->completed_tail) { - cq->completed_tail = prev; + if (c == cqd->completed_tail) { + cqd->completed_tail = prev; } - gpr_mu_unlock(cq->mu); + gpr_mu_unlock(cqd->mu); a->stolen_completion = c; return true; } prev = c; } - gpr_mu_unlock(cq->mu); + gpr_mu_unlock(cqd->mu); } return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; @@ -842,16 +899,10 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; + cq_data *cqd = &cc->data; GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); - if (cc->completion_type != GRPC_CQ_PLUCK) { - gpr_log(GPR_ERROR, - "grpc_completion_queue_pluck() cannot be called on this completion " - "queue since its completion type is not GRPC_CQ_PLUCK"); - abort(); - } - if (grpc_cq_pluck_trace) { GRPC_API_TRACE( "grpc_completion_queue_pluck(" @@ -869,10 +920,10 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(cc->mu); + gpr_mu_lock(cqd->mu); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cc->things_queued_ever), + gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cc, .deadline = deadline, .stolen_completion = NULL, @@ -882,7 +933,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -891,15 +942,15 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, c->done(&exec_ctx, c->done_arg, c); break; } - prev = &cc->completed_head; + prev = &cqd->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != - &cc->completed_head) { + &cqd->completed_head) { if (c->tag == tag) { prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); - if (c == cc->completed_tail) { - cc->completed_tail = prev; + if (c == cqd->completed_tail) { + cqd->completed_tail = prev; } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -908,8 +959,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, } prev = c; } - if (cc->shutdown) { - gpr_mu_unlock(cc->mu); + if (cqd->shutdown) { + gpr_mu_unlock(cqd->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; @@ -919,7 +970,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; @@ -929,7 +980,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cc); @@ -942,15 +993,15 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(cc->mu); + gpr_mu_lock(cqd->mu); } else { grpc_error *err = cc->poller_vtable->work( &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); if (err != GRPC_ERROR_NONE) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); @@ -981,20 +1032,22 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - gpr_mu_lock(cc->mu); - if (cc->shutdown_called) { - gpr_mu_unlock(cc->mu); + cq_data *cqd = &cc->data; + + gpr_mu_lock(cqd->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cqd->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } - cc->shutdown_called = 1; - if (gpr_unref(&cc->pending_events)) { - GPR_ASSERT(!cc->shutdown); - cc->shutdown = 1; + cqd->shutdown_called = 1; + if (gpr_unref(&cqd->pending_events)) { + GPR_ASSERT(!cqd->shutdown); + cqd->shutdown = 1; cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + &cqd->pollset_shutdown_done); } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -1004,8 +1057,10 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cc); - if (cc->completion_type == GRPC_CQ_NEXT) { - GPR_ASSERT(cq_event_queue_num_items(&cc->queue) == 0); + /* TODO (sreek): This should not ideally be here. Refactor it into the + * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ + if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) { + GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0); } GRPC_CQ_INTERNAL_UNREF(cc, "destroy"); @@ -1020,22 +1075,12 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { return CQ_FROM_POLLSET(ps); } -void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { - /* TODO: sreek - use cc->polling_type field here and add a validation check - (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose - polling_type is set to GRPC_CQ_NON_LISTENING */ - cc->is_non_listening_server_cq = 1; +void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { + cc->data.is_server_cq = 1; } -bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { - /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */ - return (cc->is_non_listening_server_cq == 1); -} - -void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } - bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { - return cc->is_server_cq; + return cc->data.is_server_cq; } bool grpc_cq_can_listen(grpc_completion_queue *cc) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index f7eb148982..4d6aa7fd91 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -80,6 +80,8 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc); #define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc) #endif +size_t grpc_cq_size(grpc_completion_queue *cc); + /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. \a tag is currently used only in debug builds. */ @@ -87,6 +89,20 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ +void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, + void *tag, grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); +void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, |