diff options
author | Han-Wen Nienhuys <hanwen@google.com> | 2015-02-25 16:45:20 +0100 |
---|---|---|
committer | Han-Wen Nienhuys <hanwen@google.com> | 2015-02-25 16:45:20 +0100 |
commit | d08b27fa9701fecfdb69e1b0d1ac2459efc2129b (patch) | |
tree | 5d50963026239ca5aebfb47ea5b8db7e814e57c8 /src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java |
Update from Google.
--
MOE_MIGRATED_REVID=85702957
Diffstat (limited to 'src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java')
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java | 493 |
1 files changed, 493 insertions, 0 deletions
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java new file mode 100644 index 0000000000..8a6485c12c --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java @@ -0,0 +1,493 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; +import com.google.devtools.build.lib.testutil.TestThread; +import com.google.devtools.build.lib.testutil.TestUtils; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Tests for AbstractQueueVisitor. + */ +@RunWith(JUnit4.class) +public class AbstractQueueVisitorTest { + + private static final RuntimeException THROWABLE = new RuntimeException(); + + @Test + public void simpleCounter() throws Exception { + CountingQueueVisitor counter = new CountingQueueVisitor(); + counter.enqueue(); + counter.work(false); + assertSame(10, counter.getCount()); + } + + @Test + public void callerOwnedPool() throws Exception { + ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + assertSame(0, executor.getActiveCount()); + + CountingQueueVisitor counter = new CountingQueueVisitor(executor); + counter.enqueue(); + counter.work(false); + assertSame(10, counter.getCount()); + + executor.shutdown(); + assertTrue(executor.awaitTermination(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + } + + @Test + public void doubleCounter() throws Exception { + CountingQueueVisitor counter = new CountingQueueVisitor(); + counter.enqueue(); + counter.enqueue(); + counter.work(false); + assertSame(10, counter.getCount()); + } + + @Test + public void exceptionFromWorkerThread() { + final RuntimeException myException = new IllegalStateException(); + ConcreteQueueVisitor visitor = new ConcreteQueueVisitor(); + visitor.enqueue(new Runnable() { + @Override + public void run() { + throw myException; + } + }); + + try { + // The exception from the worker thread should be + // re-thrown from the main thread. + visitor.work(false); + fail(); + } catch (Exception e) { + assertSame(myException, e); + } + } + + // Regression test for "AbstractQueueVisitor loses track of jobs if thread allocation fails". + @Test + public void threadPoolThrowsSometimes() throws Exception { + // In certain cases (for example, if the address space is almost entirely consumed by a huge + // JVM heap), thread allocation can fail with an OutOfMemoryError. If the queue visitor + // does not handle this gracefully, we lose track of tasks and hang the visitor indefinitely. + + ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()) { + private final AtomicLong count = new AtomicLong(); + + @Override + public void execute(Runnable command) { + long count = this.count.incrementAndGet(); + if (count == 6) { + throw new Error("Could not create thread (fakeout)"); + } + super.execute(command); + } + }; + + CountingQueueVisitor counter = new CountingQueueVisitor(executor); + counter.enqueue(); + try { + counter.work(false); + fail(); + } catch (Error expected) { + assertEquals("Could not create thread (fakeout)", expected.getMessage()); + } + assertSame(5, counter.getCount()); + + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + + // Regression test to make sure that AbstractQueueVisitor doesn't swallow unchecked exceptions if + // it is interrupted concurrently with the unchecked exception being thrown. + @Test + public void interruptAndThrownIsInterruptedAndThrown() throws Exception { + final ConcreteQueueVisitor visitor = new ConcreteQueueVisitor(); + // Use a latch to make sure the thread gets a chance to start. + final CountDownLatch threadStarted = new CountDownLatch(1); + visitor.enqueue(new Runnable() { + @Override + public void run() { + threadStarted.countDown(); + assertTrue(Uninterruptibles.awaitUninterruptibly( + visitor.getInterruptionLatchForTestingOnly(), 2, TimeUnit.SECONDS)); + throw THROWABLE; + } + }); + assertTrue(threadStarted.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + // Interrupt will not be processed until work starts. + Thread.currentThread().interrupt(); + try { + visitor.work(/*interruptWorkers=*/true); + fail(); + } catch (Exception e) { + assertEquals(THROWABLE, e); + assertTrue(Thread.interrupted()); + } + } + + @Test + public void interruptionWithoutInterruptingWorkers() throws Exception { + final Thread mainThread = Thread.currentThread(); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final boolean[] workerThreadCompleted = { false }; + final ConcreteQueueVisitor visitor = new ConcreteQueueVisitor(); + + visitor.enqueue(new Runnable() { + @Override + public void run() { + try { + latch1.countDown(); + latch2.await(); + workerThreadCompleted[0] = true; + } catch (InterruptedException e) { + // Do not set workerThreadCompleted to true + } + } + }); + + TestThread interrupterThread = new TestThread() { + @Override + public void runTest() throws Exception { + latch1.await(); + mainThread.interrupt(); + assertTrue(visitor.awaitInterruptionForTestingOnly(TestUtils.WAIT_TIMEOUT_MILLISECONDS, + TimeUnit.MILLISECONDS)); + latch2.countDown(); + } + }; + + interrupterThread.start(); + + try { + visitor.work(false); + fail(); + } catch (InterruptedException e) { + // Expected. + } + + interrupterThread.joinAndAssertState(400); + assertTrue(workerThreadCompleted[0]); + } + + @Test + public void interruptionWithInterruptingWorkers() throws Exception { + assertInterruptWorkers(null); + + ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + assertInterruptWorkers(executor); + executor.shutdown(); + executor.awaitTermination(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + private void assertInterruptWorkers(ThreadPoolExecutor executor) throws Exception { + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final boolean[] workerThreadInterrupted = { false }; + ConcreteQueueVisitor visitor = (executor == null) + ? new ConcreteQueueVisitor() + : new ConcreteQueueVisitor(executor, true); + + visitor.enqueue(new Runnable() { + @Override + public void run() { + try { + latch1.countDown(); + latch2.await(); + } catch (InterruptedException e) { + workerThreadInterrupted[0] = true; + } + } + }); + + latch1.await(); + Thread.currentThread().interrupt(); + + try { + visitor.work(true); + fail(); + } catch (InterruptedException e) { + // Expected. + } + + assertTrue(workerThreadInterrupted[0]); + } + + @Test + public void failFast() throws Exception { + // In failFast mode, we only run actions queued before the exception. + assertFailFast(null, true, false, false, "a", "b"); + + // In !failFast mode, we complete all queued actions. + assertFailFast(null, false, false, false, "a", "b", "1", "2"); + + // Now check fail-fast on interrupt: + assertFailFast(null, false, true, true, "a", "b"); + assertFailFast(null, false, false, true, "a", "b", "1", "2"); + } + + @Test + public void failFastNoShutdown() throws Exception { + ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + // In failFast mode, we only run actions queued before the exception. + assertFailFast(executor, true, false, false, "a", "b"); + + // In !failFast mode, we complete all queued actions. + assertFailFast(executor, false, false, false, "a", "b", "1", "2"); + + // Now check fail-fast on interrupt: + assertFailFast(executor, false, true, true, "a", "b"); + assertFailFast(executor, false, false, true, "a", "b", "1", "2"); + + executor.shutdown(); + assertTrue(executor.awaitTermination(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + } + + private void assertFailFast(ThreadPoolExecutor executor, + boolean failFastOnException, boolean failFastOnInterrupt, + boolean interrupt, String... expectedVisited) throws Exception { + assertTrue(executor == null || !executor.isShutdown()); + AbstractQueueVisitor visitor = (executor == null) + ? new ConcreteQueueVisitor(failFastOnException, failFastOnInterrupt) + : new ConcreteQueueVisitor(executor, failFastOnException, failFastOnInterrupt); + + List<String> visitedList = Collections.synchronizedList(Lists.<String>newArrayList()); + + // Runnable "ra" will await the uncaught exception from + // "throwingRunnable", then add "a" to the list and + // enqueue "r1". Runnable "r1" should be + // executed iff !failFast. + + CountDownLatch latchA = new CountDownLatch(1); + CountDownLatch latchB = new CountDownLatch(1); + + Runnable r1 = awaitAddAndEnqueueRunnable(interrupt, visitor, null, visitedList, "1", null); + Runnable r2 = awaitAddAndEnqueueRunnable(interrupt, visitor, null, visitedList, "2", null); + Runnable ra = awaitAddAndEnqueueRunnable(interrupt, visitor, latchA, visitedList, "a", r1); + Runnable rb = awaitAddAndEnqueueRunnable(interrupt, visitor, latchB, visitedList, "b", r2); + + visitor.enqueue(ra); + visitor.enqueue(rb); + latchA.await(); + latchB.await(); + visitor.enqueue(interrupt ? interruptingRunnable(Thread.currentThread()) : throwingRunnable()); + + try { + visitor.work(false); + fail(); + } catch (Exception e) { + if (interrupt) { + assertTrue(e instanceof InterruptedException); + } else { + assertSame(THROWABLE, e); + } + } + assertTrue( + "got: " + visitedList + "\nwant: " + Arrays.toString(expectedVisited), + Sets.newHashSet(visitedList).equals(Sets.newHashSet(expectedVisited))); + + if (executor != null) { + assertFalse(executor.isShutdown()); + assertEquals(0, visitor.getTaskCount()); + } + } + + @Test + public void jobIsInterruptedWhenOtherFails() throws Exception { + ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + + final QueueVisitorWithCriticalError visitor = new QueueVisitorWithCriticalError(executor); + final CountDownLatch latch1 = new CountDownLatch(1); + final AtomicBoolean wasInterrupted = new AtomicBoolean(false); + + Runnable r1 = new Runnable() { + + @Override + public void run() { + latch1.countDown(); + try { + // Interruption is expected during a sleep. There is no sense in fail or assert call + // because exception is going to be swallowed inside AbstractQueueVisitior. + // We are using wasInterrupted flag to assert in the end of test. + Thread.sleep(1000); + } catch (InterruptedException e) { + wasInterrupted.set(true); + } + } + }; + + visitor.enqueue(r1); + latch1.await(); + visitor.enqueue(throwingRunnable()); + + try { + visitor.work(true); + fail(); + } catch (Exception e) { + assertSame(THROWABLE, e); + } + + assertTrue(wasInterrupted.get()); + assertTrue(executor.isShutdown()); + } + + private Runnable throwingRunnable() { + return new Runnable() { + @Override + public void run() { + throw THROWABLE; + } + }; + } + + private Runnable interruptingRunnable(final Thread thread) { + return new Runnable() { + @Override + public void run() { + thread.interrupt(); + } + }; + } + + private static Runnable awaitAddAndEnqueueRunnable(final boolean interrupt, + final AbstractQueueVisitor visitor, + final CountDownLatch started, + final List<String> list, + final String toAdd, + final Runnable toEnqueue) { + return new Runnable() { + @Override + public void run() { + if (started != null) { + started.countDown(); + } + + try { + assertTrue(interrupt + ? visitor.awaitInterruptionForTestingOnly(1, TimeUnit.MINUTES) + : visitor.getExceptionLatchForTestingOnly().await(1, TimeUnit.MINUTES)); + } catch (InterruptedException e) { + // Unexpected. + throw new RuntimeException(e); + } + list.add(toAdd); + if (toEnqueue != null) { + visitor.enqueue(toEnqueue); + } + } + }; + } + + private static class CountingQueueVisitor extends AbstractQueueVisitor { + + private final static String THREAD_NAME = "BlazeTest CountingQueueVisitor"; + + private int theInt = 0; + private final Object lock = new Object(); + + public CountingQueueVisitor() { + super(5, 5, 3L, TimeUnit.SECONDS, THREAD_NAME); + } + + public CountingQueueVisitor(ThreadPoolExecutor executor) { + super(executor, false, true, true); + } + + public void enqueue() { + super.enqueue(new Runnable() { + @Override + public void run() { + synchronized (lock) { + if (theInt < 10) { + theInt++; + enqueue(); + } + } + } + }); + } + + public int getCount() { + return theInt; + } + } + + private static class ConcreteQueueVisitor extends AbstractQueueVisitor { + + private final static String THREAD_NAME = "BlazeTest ConcreteQueueVisitor"; + + public ConcreteQueueVisitor() { + super(5, 5, 3L, TimeUnit.SECONDS, THREAD_NAME); + } + + public ConcreteQueueVisitor(boolean failFast) { + super(true, 5, 5, 3L, TimeUnit.SECONDS, failFast, THREAD_NAME); + } + + public ConcreteQueueVisitor(boolean failFast, boolean failFastOnInterrupt) { + super(true, 5, 5, 3L, TimeUnit.SECONDS, failFast, failFastOnInterrupt, THREAD_NAME); + } + + public ConcreteQueueVisitor(ThreadPoolExecutor executor, boolean failFast, + boolean failFastOnInterrupt) { + super(executor, /*shutdownOnCompletion=*/false, failFast, failFastOnInterrupt); + } + + public ConcreteQueueVisitor(ThreadPoolExecutor executor, boolean failFast) { + super(executor, /*shutdownOnCompletion=*/false, failFast, true); + } + } + + private static class QueueVisitorWithCriticalError extends AbstractQueueVisitor { + + public QueueVisitorWithCriticalError(ThreadPoolExecutor executor) { + super(executor, false); + } + + @Override + protected boolean isCriticalError(Throwable e) { + return true; + } + } +} |