diff options
author | 2015-09-22 10:42:19 -0700 | |
---|---|---|
committer | 2015-09-22 10:42:19 -0700 | |
commit | 45724b35e411fef7c5da66a74c78428c11d56843 (patch) | |
tree | 9264034aca675c89444e02f72ef58e67d7043604 /test/core/support/sync_test.c | |
parent | 298751c1195523ef6228595043b583c3a6270e08 (diff) |
indent pass to get logical source lines on one physical line
Diffstat (limited to 'test/core/support/sync_test.c')
-rw-r--r-- | test/core/support/sync_test.c | 529 |
1 files changed, 300 insertions, 229 deletions
diff --git a/test/core/support/sync_test.c b/test/core/support/sync_test.c index f729eb0b92..72788a75ba 100644 --- a/test/core/support/sync_test.c +++ b/test/core/support/sync_test.c @@ -49,37 +49,44 @@ #define N 4 -typedef struct queue { - gpr_cv non_empty; /* Signalled when length becomes non-zero. */ - gpr_cv non_full; /* Signalled when length becomes non-N. */ - gpr_mu mu; /* Protects all fields below. - (That is, except during initialization or - destruction, the fields below should be accessed - only by a thread that holds mu.) */ - int head; /* Index of head of queue 0..N-1. */ - int length; /* Number of valid elements in queue 0..N. */ - int elem[N]; /* elem[head .. head+length-1] are queue elements. */ +typedef struct queue +{ + gpr_cv non_empty; /* Signalled when length becomes non-zero. */ + gpr_cv non_full; /* Signalled when length becomes non-N. */ + gpr_mu mu; /* Protects all fields below. + (That is, except during initialization or + destruction, the fields below should be accessed + only by a thread that holds mu.) */ + int head; /* Index of head of queue 0..N-1. */ + int length; /* Number of valid elements in queue 0..N. */ + int elem[N]; /* elem[head .. head+length-1] are queue elements. */ } queue; /* Initialize *q. */ -void queue_init(queue *q) { - gpr_mu_init(&q->mu); - gpr_cv_init(&q->non_empty); - gpr_cv_init(&q->non_full); +void +queue_init (queue * q) +{ + gpr_mu_init (&q->mu); + gpr_cv_init (&q->non_empty); + gpr_cv_init (&q->non_full); q->head = 0; q->length = 0; } /* Free storage associated with *q. */ -void queue_destroy(queue *q) { - gpr_mu_destroy(&q->mu); - gpr_cv_destroy(&q->non_empty); - gpr_cv_destroy(&q->non_full); +void +queue_destroy (queue * q) +{ + gpr_mu_destroy (&q->mu); + gpr_cv_destroy (&q->non_empty); + gpr_cv_destroy (&q->non_full); } /* Wait until there is room in *q, then append x to *q. */ -void queue_append(queue *q, int x) { - gpr_mu_lock(&q->mu); +void +queue_append (queue * q, int x) +{ + gpr_mu_lock (&q->mu); /* To wait for a predicate without a deadline, loop on the negation of the predicate, and use gpr_cv_wait(..., gpr_inf_future(GPR_CLOCK_REALTIME)) inside the loop @@ -87,78 +94,91 @@ void queue_append(queue *q, int x) { makes the condition true should use gpr_cv_broadcast() on the corresponding condition variable. The predicate must be on state protected by the lock. */ - while (q->length == N) { - gpr_cv_wait(&q->non_full, &q->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - if (q->length == 0) { /* Wake threads blocked in queue_remove(). */ - /* It's normal to use gpr_cv_broadcast() or gpr_signal() while - holding the lock. */ - gpr_cv_broadcast(&q->non_empty); - } + while (q->length == N) + { + gpr_cv_wait (&q->non_full, &q->mu, gpr_inf_future (GPR_CLOCK_REALTIME)); + } + if (q->length == 0) + { /* Wake threads blocked in queue_remove(). */ + /* It's normal to use gpr_cv_broadcast() or gpr_signal() while + holding the lock. */ + gpr_cv_broadcast (&q->non_empty); + } q->elem[(q->head + q->length) % N] = x; q->length++; - gpr_mu_unlock(&q->mu); + gpr_mu_unlock (&q->mu); } /* If it can be done without blocking, append x to *q and return non-zero. Otherwise return 0. */ -int queue_try_append(queue *q, int x) { +int +queue_try_append (queue * q, int x) +{ int result = 0; - if (gpr_mu_trylock(&q->mu)) { - if (q->length != N) { - if (q->length == 0) { /* Wake threads blocked in queue_remove(). */ - gpr_cv_broadcast(&q->non_empty); - } - q->elem[(q->head + q->length) % N] = x; - q->length++; - result = 1; + if (gpr_mu_trylock (&q->mu)) + { + if (q->length != N) + { + if (q->length == 0) + { /* Wake threads blocked in queue_remove(). */ + gpr_cv_broadcast (&q->non_empty); + } + q->elem[(q->head + q->length) % N] = x; + q->length++; + result = 1; + } + gpr_mu_unlock (&q->mu); } - gpr_mu_unlock(&q->mu); - } return result; } /* Wait until the *q is non-empty or deadline abs_deadline passes. If the queue is non-empty, remove its head entry, place it in *head, and return non-zero. Otherwise return 0. */ -int queue_remove(queue *q, int *head, gpr_timespec abs_deadline) { +int +queue_remove (queue * q, int *head, gpr_timespec abs_deadline) +{ int result = 0; - gpr_mu_lock(&q->mu); + gpr_mu_lock (&q->mu); /* To wait for a predicate with a deadline, loop on the negation of the predicate or until gpr_cv_wait() returns true. Code that makes the condition true should use gpr_cv_broadcast() on the corresponding condition variable. The predicate must be on state protected by the lock. */ - while (q->length == 0 && !gpr_cv_wait(&q->non_empty, &q->mu, abs_deadline)) { - } - if (q->length != 0) { /* Queue is non-empty. */ - result = 1; - if (q->length == N) { /* Wake threads blocked in queue_append(). */ - gpr_cv_broadcast(&q->non_full); + while (q->length == 0 && !gpr_cv_wait (&q->non_empty, &q->mu, abs_deadline)) + { } - *head = q->elem[q->head]; - q->head = (q->head + 1) % N; - q->length--; - } /* else deadline exceeded */ - gpr_mu_unlock(&q->mu); + if (q->length != 0) + { /* Queue is non-empty. */ + result = 1; + if (q->length == N) + { /* Wake threads blocked in queue_append(). */ + gpr_cv_broadcast (&q->non_full); + } + *head = q->elem[q->head]; + q->head = (q->head + 1) % N; + q->length--; + } /* else deadline exceeded */ + gpr_mu_unlock (&q->mu); return result; } /* ------------------------------------------------- */ /* Tests for gpr_mu and gpr_cv, and the queue example. */ -struct test { - int threads; /* number of threads */ +struct test +{ + int threads; /* number of threads */ - gpr_int64 iterations; /* number of iterations per thread */ + gpr_int64 iterations; /* number of iterations per thread */ gpr_int64 counter; - int thread_count; /* used to allocate thread ids */ - int done; /* threads not yet completed */ + int thread_count; /* used to allocate thread ids */ + int done; /* threads not yet completed */ - gpr_mu mu; /* protects iterations, counter, thread_count, done */ + gpr_mu mu; /* protects iterations, counter, thread_count, done */ - gpr_cv cv; /* signalling depends on test */ + gpr_cv cv; /* signalling depends on test */ - gpr_cv done_cv; /* signalled when done == 0 */ + gpr_cv done_cv; /* signalled when done == 0 */ queue q; @@ -170,287 +190,338 @@ struct test { }; /* Return pointer to a new struct test. */ -static struct test *test_new(int threads, gpr_int64 iterations) { - struct test *m = gpr_malloc(sizeof(*m)); +static struct test * +test_new (int threads, gpr_int64 iterations) +{ + struct test *m = gpr_malloc (sizeof (*m)); m->threads = threads; m->iterations = iterations; m->counter = 0; m->thread_count = 0; m->done = threads; - gpr_mu_init(&m->mu); - gpr_cv_init(&m->cv); - gpr_cv_init(&m->done_cv); - queue_init(&m->q); - gpr_stats_init(&m->stats_counter, 0); - gpr_ref_init(&m->refcount, 0); - gpr_ref_init(&m->thread_refcount, threads); - gpr_event_init(&m->event); + gpr_mu_init (&m->mu); + gpr_cv_init (&m->cv); + gpr_cv_init (&m->done_cv); + queue_init (&m->q); + gpr_stats_init (&m->stats_counter, 0); + gpr_ref_init (&m->refcount, 0); + gpr_ref_init (&m->thread_refcount, threads); + gpr_event_init (&m->event); return m; } /* Return pointer to a new struct test. */ -static void test_destroy(struct test *m) { - gpr_mu_destroy(&m->mu); - gpr_cv_destroy(&m->cv); - gpr_cv_destroy(&m->done_cv); - queue_destroy(&m->q); - gpr_free(m); +static void +test_destroy (struct test *m) +{ + gpr_mu_destroy (&m->mu); + gpr_cv_destroy (&m->cv); + gpr_cv_destroy (&m->done_cv); + queue_destroy (&m->q); + gpr_free (m); } /* Create m->threads threads, each running (*body)(m) */ -static void test_create_threads(struct test *m, void (*body)(void *arg)) { +static void +test_create_threads (struct test *m, void (*body) (void *arg)) +{ gpr_thd_id id; int i; - for (i = 0; i != m->threads; i++) { - GPR_ASSERT(gpr_thd_new(&id, body, m, NULL)); - } + for (i = 0; i != m->threads; i++) + { + GPR_ASSERT (gpr_thd_new (&id, body, m, NULL)); + } } /* Wait until all threads report done. */ -static void test_wait(struct test *m) { - gpr_mu_lock(&m->mu); - while (m->done != 0) { - gpr_cv_wait(&m->done_cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&m->mu); +static void +test_wait (struct test *m) +{ + gpr_mu_lock (&m->mu); + while (m->done != 0) + { + gpr_cv_wait (&m->done_cv, &m->mu, gpr_inf_future (GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock (&m->mu); } /* Get an integer thread id in the raneg 0..threads-1 */ -static int thread_id(struct test *m) { +static int +thread_id (struct test *m) +{ int id; - gpr_mu_lock(&m->mu); + gpr_mu_lock (&m->mu); id = m->thread_count++; - gpr_mu_unlock(&m->mu); + gpr_mu_unlock (&m->mu); return id; } /* Indicate that a thread is done, by decrementing m->done and signalling done_cv if m->done==0. */ -static void mark_thread_done(struct test *m) { - gpr_mu_lock(&m->mu); - GPR_ASSERT(m->done != 0); +static void +mark_thread_done (struct test *m) +{ + gpr_mu_lock (&m->mu); + GPR_ASSERT (m->done != 0); m->done--; - if (m->done == 0) { - gpr_cv_signal(&m->done_cv); - } - gpr_mu_unlock(&m->mu); + if (m->done == 0) + { + gpr_cv_signal (&m->done_cv); + } + gpr_mu_unlock (&m->mu); } /* Test several threads running (*body)(struct test *m) for increasing settings of m->iterations, until about timeout_s to 2*timeout_s seconds have elapsed. If extra!=NULL, run (*extra)(m) in an additional thread. */ -static void test(const char *name, void (*body)(void *m), - void (*extra)(void *m), int timeout_s) { +static void +test (const char *name, void (*body) (void *m), void (*extra) (void *m), int timeout_s) +{ gpr_int64 iterations = 1024; struct test *m; - gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME); + gpr_timespec start = gpr_now (GPR_CLOCK_REALTIME); gpr_timespec time_taken; - gpr_timespec deadline = gpr_time_add( - start, gpr_time_from_micros(timeout_s * 1000000, GPR_TIMESPAN)); - fprintf(stderr, "%s:", name); - while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0) { - iterations <<= 1; - fprintf(stderr, " %ld", (long)iterations); - m = test_new(10, iterations); - if (extra != NULL) { - gpr_thd_id id; - GPR_ASSERT(gpr_thd_new(&id, extra, m, NULL)); - m->done++; /* one more thread to wait for */ - } - test_create_threads(m, body); - test_wait(m); - if (m->counter != m->threads * m->iterations) { - fprintf(stderr, "counter %ld threads %d iterations %ld\n", - (long)m->counter, m->threads, (long)m->iterations); - GPR_ASSERT(0); + gpr_timespec deadline = gpr_time_add (start, gpr_time_from_micros (timeout_s * 1000000, GPR_TIMESPAN)); + fprintf (stderr, "%s:", name); + while (gpr_time_cmp (gpr_now (GPR_CLOCK_REALTIME), deadline) < 0) + { + iterations <<= 1; + fprintf (stderr, " %ld", (long) iterations); + m = test_new (10, iterations); + if (extra != NULL) + { + gpr_thd_id id; + GPR_ASSERT (gpr_thd_new (&id, extra, m, NULL)); + m->done++; /* one more thread to wait for */ + } + test_create_threads (m, body); + test_wait (m); + if (m->counter != m->threads * m->iterations) + { + fprintf (stderr, "counter %ld threads %d iterations %ld\n", (long) m->counter, m->threads, (long) m->iterations); + GPR_ASSERT (0); + } + test_destroy (m); } - test_destroy(m); - } - time_taken = gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start); - fprintf(stderr, " done %ld.%09d s\n", (long)time_taken.tv_sec, - (int)time_taken.tv_nsec); + time_taken = gpr_time_sub (gpr_now (GPR_CLOCK_REALTIME), start); + fprintf (stderr, " done %ld.%09d s\n", (long) time_taken.tv_sec, (int) time_taken.tv_nsec); } /* Increment m->counter on each iteration; then mark thread as done. */ -static void inc(void *v /*=m*/) { +static void +inc (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 i; - for (i = 0; i != m->iterations; i++) { - gpr_mu_lock(&m->mu); - m->counter++; - gpr_mu_unlock(&m->mu); - } - mark_thread_done(m); + for (i = 0; i != m->iterations; i++) + { + gpr_mu_lock (&m->mu); + m->counter++; + gpr_mu_unlock (&m->mu); + } + mark_thread_done (m); } /* Increment m->counter under lock acquired with trylock, m->iterations times; then mark thread as done. */ -static void inctry(void *v /*=m*/) { +static void +inctry (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 i; - for (i = 0; i != m->iterations;) { - if (gpr_mu_trylock(&m->mu)) { - m->counter++; - gpr_mu_unlock(&m->mu); - i++; + for (i = 0; i != m->iterations;) + { + if (gpr_mu_trylock (&m->mu)) + { + m->counter++; + gpr_mu_unlock (&m->mu); + i++; + } } - } - mark_thread_done(m); + mark_thread_done (m); } /* Increment counter only when (m->counter%m->threads)==m->thread_id; then mark thread as done. */ -static void inc_by_turns(void *v /*=m*/) { +static void +inc_by_turns (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 i; - int id = thread_id(m); - for (i = 0; i != m->iterations; i++) { - gpr_mu_lock(&m->mu); - while ((m->counter % m->threads) != id) { - gpr_cv_wait(&m->cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + int id = thread_id (m); + for (i = 0; i != m->iterations; i++) + { + gpr_mu_lock (&m->mu); + while ((m->counter % m->threads) != id) + { + gpr_cv_wait (&m->cv, &m->mu, gpr_inf_future (GPR_CLOCK_REALTIME)); + } + m->counter++; + gpr_cv_broadcast (&m->cv); + gpr_mu_unlock (&m->mu); } - m->counter++; - gpr_cv_broadcast(&m->cv); - gpr_mu_unlock(&m->mu); - } - mark_thread_done(m); + mark_thread_done (m); } /* Wait a millisecond and increment counter on each iteration; then mark thread as done. */ -static void inc_with_1ms_delay(void *v /*=m*/) { +static void +inc_with_1ms_delay (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 i; - for (i = 0; i != m->iterations; i++) { - gpr_timespec deadline; - gpr_mu_lock(&m->mu); - deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000, GPR_TIMESPAN)); - while (!gpr_cv_wait(&m->cv, &m->mu, deadline)) { + for (i = 0; i != m->iterations; i++) + { + gpr_timespec deadline; + gpr_mu_lock (&m->mu); + deadline = gpr_time_add (gpr_now (GPR_CLOCK_REALTIME), gpr_time_from_micros (1000, GPR_TIMESPAN)); + while (!gpr_cv_wait (&m->cv, &m->mu, deadline)) + { + } + m->counter++; + gpr_mu_unlock (&m->mu); } - m->counter++; - gpr_mu_unlock(&m->mu); - } - mark_thread_done(m); + mark_thread_done (m); } /* Wait a millisecond and increment counter on each iteration, using an event for timing; then mark thread as done. */ -static void inc_with_1ms_delay_event(void *v /*=m*/) { +static void +inc_with_1ms_delay_event (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 i; - for (i = 0; i != m->iterations; i++) { - gpr_timespec deadline; - deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000, GPR_TIMESPAN)); - GPR_ASSERT(gpr_event_wait(&m->event, deadline) == NULL); - gpr_mu_lock(&m->mu); - m->counter++; - gpr_mu_unlock(&m->mu); - } - mark_thread_done(m); + for (i = 0; i != m->iterations; i++) + { + gpr_timespec deadline; + deadline = gpr_time_add (gpr_now (GPR_CLOCK_REALTIME), gpr_time_from_micros (1000, GPR_TIMESPAN)); + GPR_ASSERT (gpr_event_wait (&m->event, deadline) == NULL); + gpr_mu_lock (&m->mu); + m->counter++; + gpr_mu_unlock (&m->mu); + } + mark_thread_done (m); } /* Produce m->iterations elements on queue m->q, then mark thread as done. Even threads use queue_append(), and odd threads use queue_try_append() until it succeeds. */ -static void many_producers(void *v /*=m*/) { +static void +many_producers (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 i; - int x = thread_id(m); - if ((x & 1) == 0) { - for (i = 0; i != m->iterations; i++) { - queue_append(&m->q, 1); + int x = thread_id (m); + if ((x & 1) == 0) + { + for (i = 0; i != m->iterations; i++) + { + queue_append (&m->q, 1); + } } - } else { - for (i = 0; i != m->iterations; i++) { - while (!queue_try_append(&m->q, 1)) { - } + else + { + for (i = 0; i != m->iterations; i++) + { + while (!queue_try_append (&m->q, 1)) + { + } + } } - } - mark_thread_done(m); + mark_thread_done (m); } /* Consume elements from m->q until m->threads*m->iterations are seen, wait an extra second to confirm that no more elements are arriving, then mark thread as done. */ -static void consumer(void *v /*=m*/) { +static void +consumer (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 n = m->iterations * m->threads; gpr_int64 i; int value; - for (i = 0; i != n; i++) { - queue_remove(&m->q, &value, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_lock(&m->mu); + for (i = 0; i != n; i++) + { + queue_remove (&m->q, &value, gpr_inf_future (GPR_CLOCK_REALTIME)); + } + gpr_mu_lock (&m->mu); m->counter = n; - gpr_mu_unlock(&m->mu); - GPR_ASSERT( - !queue_remove(&m->q, &value, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000000, GPR_TIMESPAN)))); - mark_thread_done(m); + gpr_mu_unlock (&m->mu); + GPR_ASSERT (!queue_remove (&m->q, &value, gpr_time_add (gpr_now (GPR_CLOCK_REALTIME), gpr_time_from_micros (1000000, GPR_TIMESPAN)))); + mark_thread_done (m); } /* Increment m->stats_counter m->iterations times, transfer counter value to m->counter, then mark thread as done. */ -static void statsinc(void *v /*=m*/) { +static void +statsinc (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 i; - for (i = 0; i != m->iterations; i++) { - gpr_stats_inc(&m->stats_counter, 1); - } - gpr_mu_lock(&m->mu); - m->counter = gpr_stats_read(&m->stats_counter); - gpr_mu_unlock(&m->mu); - mark_thread_done(m); + for (i = 0; i != m->iterations; i++) + { + gpr_stats_inc (&m->stats_counter, 1); + } + gpr_mu_lock (&m->mu); + m->counter = gpr_stats_read (&m->stats_counter); + gpr_mu_unlock (&m->mu); + mark_thread_done (m); } /* Increment m->refcount m->iterations times, decrement m->thread_refcount once, and if it reaches zero, set m->event to (void*)1; then mark thread as done. */ -static void refinc(void *v /*=m*/) { +static void +refinc (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 i; - for (i = 0; i != m->iterations; i++) { - gpr_ref(&m->refcount); - } - if (gpr_unref(&m->thread_refcount)) { - gpr_event_set(&m->event, (void *)1); - } - mark_thread_done(m); + for (i = 0; i != m->iterations; i++) + { + gpr_ref (&m->refcount); + } + if (gpr_unref (&m->thread_refcount)) + { + gpr_event_set (&m->event, (void *) 1); + } + mark_thread_done (m); } /* Wait until m->event is set to (void *)1, then decrement m->refcount m->stats_counter m->iterations times, and ensure that the last decrement caused the counter to reach zero, then mark thread as done. */ -static void refcheck(void *v /*=m*/) { +static void +refcheck (void *v /*=m*/ ) +{ struct test *m = v; gpr_int64 n = m->iterations * m->threads; gpr_int64 i; - GPR_ASSERT(gpr_event_wait(&m->event, gpr_inf_future(GPR_CLOCK_REALTIME)) == - (void *)1); - GPR_ASSERT(gpr_event_get(&m->event) == (void *)1); - for (i = 1; i != n; i++) { - GPR_ASSERT(!gpr_unref(&m->refcount)); - m->counter++; - } - GPR_ASSERT(gpr_unref(&m->refcount)); + GPR_ASSERT (gpr_event_wait (&m->event, gpr_inf_future (GPR_CLOCK_REALTIME)) == (void *) 1); + GPR_ASSERT (gpr_event_get (&m->event) == (void *) 1); + for (i = 1; i != n; i++) + { + GPR_ASSERT (!gpr_unref (&m->refcount)); + m->counter++; + } + GPR_ASSERT (gpr_unref (&m->refcount)); m->counter++; - mark_thread_done(m); + mark_thread_done (m); } /* ------------------------------------------------- */ -int main(int argc, char *argv[]) { - grpc_test_init(argc, argv); - test("mutex", &inc, NULL, 1); - test("mutex try", &inctry, NULL, 1); - test("cv", &inc_by_turns, NULL, 1); - test("timedcv", &inc_with_1ms_delay, NULL, 1); - test("queue", &many_producers, &consumer, 10); - test("stats_counter", &statsinc, NULL, 1); - test("refcount", &refinc, &refcheck, 1); - test("timedevent", &inc_with_1ms_delay_event, NULL, 1); +int +main (int argc, char *argv[]) +{ + grpc_test_init (argc, argv); + test ("mutex", &inc, NULL, 1); + test ("mutex try", &inctry, NULL, 1); + test ("cv", &inc_by_turns, NULL, 1); + test ("timedcv", &inc_with_1ms_delay, NULL, 1); + test ("queue", &many_producers, &consumer, 10); + test ("stats_counter", &statsinc, NULL, 1); + test ("refcount", &refinc, &refcheck, 1); + test ("timedevent", &inc_with_1ms_delay_event, NULL, 1); return 0; } |