diff options
-rw-r--r-- | build.yaml | 2 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 346 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 2 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 16 | ||||
-rw-r--r-- | tools/run_tests/generated/tests.json | 2 |
5 files changed, 275 insertions, 93 deletions
diff --git a/build.yaml b/build.yaml index 58a474aabf..a02b8dd38a 100644 --- a/build.yaml +++ b/build.yaml @@ -3307,7 +3307,7 @@ targets: - gpr_test_util - gpr args: - - --benchmark_min_time=0 + - --benchmark_min_time=4 defaults: benchmark platforms: - mac diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 35e9f7eb30..c4ee222043 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -60,26 +61,51 @@ typedef struct { void *tag; } plucker; +/* Queue that holds the cq_completion_events. This internally uses gpr_mpscq + * queue (a lockfree multiproducer single consumer queue). However this queue + * supports multiple consumers too. As such, it uses the queue_mu to serialize + * consumer access (but no locks for producer access). + * + * Currently this is only used in completion queues whose completion_type is + * GRPC_CQ_NEXT */ +typedef struct grpc_cq_event_queue { + /* spinlock to serialize consumers i.e pop() operations */ + gpr_spinlock queue_lock; + + gpr_mpscq queue; + + /* A lazy counter indicating the number of items in the queue. This is NOT + atomically incremented/decrements along with push/pop operations and hence + only eventually consistent */ + gpr_atm num_queue_items; +} grpc_cq_event_queue; + /* Completion queue structure */ struct grpc_completion_queue { - /** owned by pollset */ + /** Owned by pollset */ gpr_mu *mu; grpc_cq_completion_type completion_type; grpc_cq_polling_type polling_type; - /** completed events */ + /** Completed events (Only relevant if the completion_type is NOT + * GRPC_CQ_NEXT) */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; + + /** Completed events for completion-queues of type GRPC_CQ_NEXT are stored in + * this queue */ + grpc_cq_event_queue queue; + /** Number of pending events (+1 if we're not shutdown) */ gpr_refcount pending_events; /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; - /** counter of how many things have ever been queued on this completion queue + /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; /** 0 initially, 1 once we've begun shutting down */ - int shutdown; + gpr_atm shutdown; int shutdown_called; int is_server_cq; /** Can the server cq accept incoming channels */ @@ -115,6 +141,41 @@ int grpc_cq_event_timeout_trace; static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, grpc_error *error); +static void cq_event_queue_init(grpc_cq_event_queue *q) { + gpr_mpscq_init(&q->queue); + q->queue_lock = GPR_SPINLOCK_INITIALIZER; + gpr_atm_no_barrier_store(&q->num_queue_items, 0); +} + +static void cq_event_queue_destroy(grpc_cq_event_queue *q) { + gpr_mpscq_destroy(&q->queue); +} + +static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { + gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c); + gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1); +} + +static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { + grpc_cq_completion *c = NULL; + if (gpr_spinlock_trylock(&q->queue_lock)) { + c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); + gpr_spinlock_unlock(&q->queue_lock); + } + + if (c) { + gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); + } + + return c; +} + +/* Note: The counter is not incremented/decremented atomically with push/pop. + * The count is only eventually consistent */ +static long cq_event_queue_num_items(grpc_cq_event_queue *q) { + return (long)gpr_atm_no_barrier_load(&q->num_queue_items); +} + grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type) { @@ -143,7 +204,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal( gpr_ref_init(&cc->owning_refs, 2); cc->completed_tail = &cc->completed_head; cc->completed_head.next = (uintptr_t)cc->completed_tail; - cc->shutdown = 0; + gpr_atm_no_barrier_store(&cc->shutdown, 0); cc->shutdown_called = 0; cc->is_server_cq = 0; cc->is_non_listening_server_cq = 0; @@ -152,6 +213,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal( #ifndef NDEBUG cc->outstanding_tag_count = 0; #endif + cq_event_queue_init(&cc->queue); grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc, grpc_schedule_on_exec_ctx); @@ -196,6 +258,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); grpc_pollset_destroy(POLLSET_FROM_CQ(cc)); + cq_event_queue_destroy(&cc->queue); #ifndef NDEBUG gpr_free(cc->outstanding_tags); #endif @@ -219,43 +282,14 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { gpr_ref(&cc->pending_events); } -/* Signal the end of an operation - if this is the last waiting-to-be-queued - event, then enter shutdown mode */ -/* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, - grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage) { - int shutdown; - int i; - grpc_pollset_worker *pluck_worker; #ifndef NDEBUG +void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) { int found = 0; -#endif - - GPR_TIMER_BEGIN("grpc_cq_end_op", 0); - if (grpc_api_trace || - (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { - const char *errmsg = grpc_error_string(error); - GRPC_API_TRACE( - "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, " - "done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); - if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); - } + if (lock_cq) { + gpr_mu_lock(cc->mu); } - storage->tag = tag; - storage->done = done; - storage->done_arg = done_arg; - storage->next = ((uintptr_t)&cc->completed_head) | - ((uintptr_t)(error == GRPC_ERROR_NONE)); - - gpr_mu_lock(cc->mu); -#ifndef NDEBUG - for (i = 0; i < (int)cc->outstanding_tag_count; i++) { + for (int i = 0; i < (int)cc->outstanding_tag_count; i++) { if (cc->outstanding_tags[i] == tag) { cc->outstanding_tag_count--; GPR_SWAP(void *, cc->outstanding_tags[i], @@ -264,24 +298,98 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, break; } } + + if (lock_cq) { + gpr_mu_unlock(cc->mu); + } + GPR_ASSERT(found); +} +#else +void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {} #endif - shutdown = gpr_unref(&cc->pending_events); + +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion + * type of GRPC_CQ_NEXT) */ +void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, + void *tag, int is_success, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + storage->tag = tag; + storage->done = done; + storage->done_arg = done_arg; + storage->next = (uintptr_t)(is_success); + + check_tag_in_cq(cc, tag, true); /* Used in debug builds only */ + + /* Add the completion to the queue */ + cq_event_queue_push(&cc->queue, storage); + gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1); + + int shutdown = gpr_unref(&cc->pending_events); + if (!shutdown) { + gpr_mu_lock(cc->mu); + grpc_error *kick_error = grpc_pollset_kick(POLLSET_FROM_CQ(cc), NULL); + gpr_mu_unlock(cc->mu); + + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + + GRPC_ERROR_UNREF(kick_error); + } + } else { + GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown)); + GPR_ASSERT(cc->shutdown_called); + + gpr_atm_no_barrier_store(&cc->shutdown, 1); + + gpr_mu_lock(cc->mu); + grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); + gpr_mu_unlock(cc->mu); + } +} + +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion + * type of GRPC_CQ_PLUCK) */ +void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + int is_success, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + storage->tag = tag; + storage->done = done; + storage->done_arg = done_arg; + storage->next = ((uintptr_t)&cc->completed_head) | ((uintptr_t)(is_success)); + + gpr_mu_lock(cc->mu); + check_tag_in_cq(cc, tag, false); /* Used in debug builds only */ + + /* Add to the list of completions */ gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1); + cc->completed_tail->next = + ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); + cc->completed_tail = storage; + + int shutdown = gpr_unref(&cc->pending_events); if (!shutdown) { - cc->completed_tail->next = - ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); - cc->completed_tail = storage; - pluck_worker = NULL; - for (i = 0; i < cc->num_pluckers; i++) { + grpc_pollset_worker *pluck_worker = NULL; + for (int i = 0; i < cc->num_pluckers; i++) { if (cc->pluckers[i].tag == tag) { pluck_worker = *cc->pluckers[i].worker; break; } } + grpc_error *kick_error = grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); gpr_mu_unlock(cc->mu); + if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); gpr_log(GPR_ERROR, "Kick failed: %s", msg); @@ -289,16 +397,50 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GRPC_ERROR_UNREF(kick_error); } } else { - cc->completed_tail->next = - ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); - cc->completed_tail = storage; - GPR_ASSERT(!cc->shutdown); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown)); GPR_ASSERT(cc->shutdown_called); - cc->shutdown = 1; + gpr_atm_no_barrier_store(&cc->shutdown, 1); grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), &cc->pollset_shutdown_done); gpr_mu_unlock(cc->mu); } +} + +/* Signal the end of an operation - if this is the last waiting-to-be-queued + event, then enter shutdown mode */ +/* Queue a GRPC_OP_COMPLETED operation */ +void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, + void *tag, grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + GPR_TIMER_BEGIN("grpc_cq_end_op", 0); + + if (grpc_api_trace || + (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { + const char *errmsg = grpc_error_string(error); + GRPC_API_TRACE( + "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, " + "done_arg=%p, storage=%p)", + 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + } + } + + /* Call the appropriate function to queue the completion based on the + completion queue type */ + int is_success = (error == GRPC_ERROR_NONE); + if (cc->completion_type == GRPC_CQ_NEXT) { + grpc_cq_end_op_for_next(exec_ctx, cc, tag, is_success, done, done_arg, + storage); + } else if (cc->completion_type == GRPC_CQ_PLUCK) { + grpc_cq_end_op_for_pluck(exec_ctx, cc, tag, is_success, done, done_arg, + storage); + } else { + gpr_log(GPR_ERROR, "Unexpected completion type %d", cc->completion_type); + abort(); + } GPR_TIMER_END("grpc_cq_end_op", 0); @@ -318,23 +460,24 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; GPR_ASSERT(a->stolen_completion == NULL); + gpr_atm current_last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cq->things_queued_ever); + if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cq->mu); a->last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cq->things_queued_ever); - if (cq->completed_tail != &cq->completed_head) { - a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next; - cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1; - if (a->stolen_completion == cq->completed_tail) { - cq->completed_tail = &cq->completed_head; - } - gpr_mu_unlock(cq->mu); + + /* Pop a cq_completion from the queue. Returns NULL if the queue is empty + * might return NULL in some cases even if the queue is not empty; but that + * is ok and doesn't affect correctness. Might effect the tail latencies a + * bit) */ + a->stolen_completion = cq_event_queue_pop(&cq->queue); + if (a->stolen_completion != NULL) { return true; } - gpr_mu_unlock(cq->mu); } + return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; } @@ -391,7 +534,6 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); - gpr_mu_lock(cc->mu); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cc->things_queued_ever), @@ -402,9 +544,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, .first_loop = true}; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); + for (;;) { + gpr_timespec iteration_deadline = deadline; + if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cc->mu); grpc_cq_completion *c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -413,61 +557,78 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, c->done(&exec_ctx, c->done_arg, c); break; } - if (cc->completed_tail != &cc->completed_head) { - grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; - cc->completed_head.next = c->next & ~(uintptr_t)1; - if (c == cc->completed_tail) { - cc->completed_tail = &cc->completed_head; - } - gpr_mu_unlock(cc->mu); + + grpc_cq_completion *c = cq_event_queue_pop(&cc->queue); + + if (c != NULL) { ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; c->done(&exec_ctx, c->done_arg, c); break; + } else { + /* If c == NULL it means either the queue is empty OR in an transient + inconsistent state. If it is the latter, we shold do a 0-timeout poll + so that the thread comes back quickly from poll to make a second + attempt at popping. Not doing this can potentially deadlock this thread + forever (if the deadline is infinity) */ + if (cq_event_queue_num_items(&cc->queue) > 0) { + iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); + } } - if (cc->shutdown) { - gpr_mu_unlock(cc->mu); + + if (gpr_atm_no_barrier_load(&cc->shutdown)) { + /* Before returning, check if the queue has any items left over (since + gpr_mpscq_pop() can sometimes return NULL even if the queue is not + empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ + if (cq_event_queue_num_items(&cc->queue) > 0) { + /* Go to the beginning of the loop. No point doing a poll because + (cc->shutdown == true) is only possible when there is no pending work + (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion + events are already queued on this cq */ + continue; + } + memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } + now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { - gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cc); break; } - /* Check alarms - these are a global resource so we just ping - each time through on every pollset. - May update deadline to ensure timely wakeups. - TODO(ctiller): can this work be localized? */ - gpr_timespec iteration_deadline = deadline; + + /* Check alarms - these are a global resource so we just ping each time + through on every pollset. May update deadline to ensure timely wakeups.*/ if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(cc->mu); continue; - } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, - now, iteration_deadline); - if (err != GRPC_ERROR_NONE) { - gpr_mu_unlock(cc->mu); - const char *msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + } - GRPC_ERROR_UNREF(err); - memset(&ret, 0, sizeof(ret)); - ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); - break; - } + /* The main polling work happens in grpc_pollset_work */ + gpr_mu_lock(cc->mu); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, + now, iteration_deadline); + gpr_mu_unlock(cc->mu); + + if (err != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + dump_pending_tags(cc); + break; } is_finished_arg.first_loop = false; } + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); grpc_exec_ctx_finish(&exec_ctx); @@ -701,6 +862,11 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cc); + + if (cc->completion_type == GRPC_CQ_NEXT) { + GPR_ASSERT(cq_event_queue_num_items(&cc->queue) == 0); + } + GRPC_CQ_INTERNAL_UNREF(cc, "destroy"); GPR_TIMER_END("grpc_completion_queue_destroy", 0); } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 1ff3d64293..9c8bc3b53a 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -49,6 +49,8 @@ extern int grpc_trace_pending_tags; #endif typedef struct grpc_cq_completion { + gpr_mpscq_node node; + /** user supplied tag */ void *tag; /** done callback - called when this queue element is no longer diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index 9d7f65d292..3362510e5a 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -79,10 +79,16 @@ static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg, gpr_free(cq_completion); } -/* Queues a completion tag. ZERO polling overhead */ +/* Queues a completion tag if deadline is > 0. + * Does nothing if deadline is 0 (i.e gpr_time_0(GPR_CLOCK_MONOTONIC)) */ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, grpc_pollset_worker** worker, gpr_timespec now, gpr_timespec deadline) { + if (gpr_time_cmp(deadline, gpr_time_0(GPR_CLOCK_MONOTONIC)) == 0) { + gpr_log(GPR_ERROR, "no-op"); + return GRPC_ERROR_NONE; + } + gpr_mu_unlock(&ps->mu); grpc_cq_begin_op(g_cq, g_tag); grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL, @@ -113,6 +119,14 @@ static void setup() { static void teardown() { grpc_completion_queue_shutdown(g_cq); + + /* Drain any events */ + gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); + while (grpc_completion_queue_next(g_cq, deadline, NULL).type != + GRPC_QUEUE_SHUTDOWN) { + /* Do nothing */ + } + grpc_completion_queue_destroy(g_cq); } diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 120a84e8a4..6453b1de20 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2805,7 +2805,7 @@ }, { "args": [ - "--benchmark_min_time=0" + "--benchmark_min_time=4" ], "ci_platforms": [ "linux", |