diff options
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r-- | src/core/surface/completion_queue.c | 132 |
1 files changed, 107 insertions, 25 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index aa90b3f7f5..848a33adc3 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -71,9 +71,38 @@ struct grpc_completion_queue { int is_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; - grpc_closure pollset_destroy_done; + grpc_closure pollset_shutdown_done; + +#ifndef NDEBUG + void **outstanding_tags; + size_t outstanding_tag_count; + size_t outstanding_tag_capacity; +#endif + + grpc_completion_queue *next_free; }; +static gpr_mu g_freelist_mu; +grpc_completion_queue *g_freelist; + +static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, + int success); + +void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); } + +void grpc_cq_global_shutdown(void) { + gpr_mu_destroy(&g_freelist_mu); + while (g_freelist) { + grpc_completion_queue *next = g_freelist->next_free; + grpc_pollset_destroy(&g_freelist->pollset); +#ifndef NDEBUG + gpr_free(g_freelist->outstanding_tags); +#endif + gpr_free(g_freelist); + g_freelist = next; + } +} + struct grpc_cq_alarm { grpc_timer alarm; grpc_cq_completion completion; @@ -83,22 +112,48 @@ struct grpc_cq_alarm { void *tag; }; -static void on_pollset_destroy_done(grpc_exec_ctx *exec_ctx, void *cc, - int success); - grpc_completion_queue *grpc_completion_queue_create(void *reserved) { - grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); - GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved)); + grpc_completion_queue *cc; GPR_ASSERT(!reserved); - memset(cc, 0, sizeof(*cc)); + + GPR_TIMER_BEGIN("grpc_completion_queue_create", 0); + + GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved)); + + gpr_mu_lock(&g_freelist_mu); + if (g_freelist == NULL) { + gpr_mu_unlock(&g_freelist_mu); + + cc = gpr_malloc(sizeof(grpc_completion_queue)); + grpc_pollset_init(&cc->pollset); +#ifndef NDEBUG + cc->outstanding_tags = NULL; + cc->outstanding_tag_capacity = 0; +#endif + } else { + cc = g_freelist; + g_freelist = g_freelist->next_free; + gpr_mu_unlock(&g_freelist_mu); + /* pollset already initialized */ + } + /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ gpr_ref_init(&cc->owning_refs, 2); - grpc_pollset_init(&cc->pollset); cc->completed_tail = &cc->completed_head; cc->completed_head.next = (gpr_uintptr)cc->completed_tail; - grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc); + cc->shutdown = 0; + cc->shutdown_called = 0; + cc->is_server_cq = 0; + cc->num_pluckers = 0; +#ifndef NDEBUG + cc->outstanding_tag_count = 0; +#endif + grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc); + + GPR_TIMER_END("grpc_completion_queue_create", 0); + return cc; } @@ -113,8 +168,8 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { gpr_ref(&cc->owning_refs); } -static void on_pollset_destroy_done(grpc_exec_ctx *exec_ctx, void *arg, - int success) { +static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, + int success) { grpc_completion_queue *cc = arg; GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } @@ -129,15 +184,25 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head); - grpc_pollset_destroy(&cc->pollset); - gpr_free(cc); + grpc_pollset_reset(&cc->pollset); + gpr_mu_lock(&g_freelist_mu); + cc->next_free = g_freelist; + g_freelist = cc; + gpr_mu_unlock(&g_freelist_mu); } } -void grpc_cq_begin_op(grpc_completion_queue *cc) { +void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { #ifndef NDEBUG gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown_called); + if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { + cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); + cc->outstanding_tags = + gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) * + cc->outstanding_tag_capacity); + } + cc->outstanding_tags[cc->outstanding_tag_count++] = tag; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); #endif gpr_ref(&cc->pending_events); @@ -154,6 +219,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, int shutdown; int i; grpc_pollset_worker *pluck_worker; +#ifndef NDEBUG + int found = 0; +#endif GPR_TIMER_BEGIN("grpc_cq_end_op", 0); @@ -164,6 +232,18 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); +#ifndef NDEBUG + for (i = 0; i < (int)cc->outstanding_tag_count; i++) { + if (cc->outstanding_tags[i] == tag) { + cc->outstanding_tag_count--; + GPR_SWAP(void *, cc->outstanding_tags[i], + cc->outstanding_tags[cc->outstanding_tag_count]); + found = 1; + break; + } + } + GPR_ASSERT(found); +#endif shutdown = gpr_unref(&cc->pending_events); if (!shutdown) { cc->completed_tail->next = @@ -185,8 +265,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; + grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_destroy_done); } GPR_TIMER_END("grpc_cq_end_op", 0); @@ -205,10 +285,10 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, GRPC_API_TRACE( "grpc_completion_queue_next(" "cc=%p, " - "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, " + "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 5, (cc, (long)deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, - reserved)); + 5, (cc, (long long)deadline.tv_sec, (int)deadline.tv_nsec, + (int)deadline.clock_type, reserved)); GPR_ASSERT(!reserved); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -293,9 +373,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, GRPC_API_TRACE( "grpc_completion_queue_pluck(" "cc=%p, tag=%p, " - "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, " + "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 6, (cc, tag, (long)deadline.tv_sec, deadline.tv_nsec, + 6, (cc, tag, (long long)deadline.tv_sec, (int)deadline.tv_nsec, (int)deadline.clock_type, reserved)); GPR_ASSERT(!reserved); @@ -365,29 +445,31 @@ done: to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); if (cc->shutdown_called) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } cc->shutdown_called = 1; - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - if (gpr_unref(&cc->pending_events)) { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_destroy_done); + grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); } + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_exec_ctx_finish(&exec_ctx); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } void grpc_completion_queue_destroy(grpc_completion_queue *cc) { GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); + GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cc); GRPC_CQ_INTERNAL_UNREF(cc, "destroy"); + GPR_TIMER_END("grpc_completion_queue_destroy", 0); } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { |