diff options
author | Nathan Harmata <nharmata@google.com> | 2016-09-23 22:57:45 +0000 |
---|---|---|
committer | Laszlo Csomor <laszlocsomor@google.com> | 2016-09-26 07:47:39 +0000 |
commit | db64da3a62704c5e904cbdb081050549ebddab06 (patch) | |
tree | a7a2113258b90bbbea641bdb66e088d2746c5856 /src/test | |
parent | 65b4ff030e85d54b860d82f952a3bff4c8edeb50 (diff) |
Introduce MoreFutures#waitForAllInterruptiblyFailFast and use this in the places we wait for tasks (plural!) submitted to a ForkJoinPool to finish since we actually want to do so interruptibly.
As was to be expected, testing this was tricky :)
--
MOS_MIGRATED_REVID=134128019
Diffstat (limited to 'src/test')
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/concurrent/MoreFuturesTest.java | 104 |
1 files changed, 100 insertions, 4 deletions
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/MoreFuturesTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/MoreFuturesTest.java index b8fbe352a7..eea935df17 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/MoreFuturesTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/MoreFuturesTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Tests for MoreFutures @@ -104,6 +105,93 @@ public class MoreFuturesTest { } } + @Test + public void waitForAllInterruptiblyFailFast_AllSuccessful() throws Exception { + List<DelayedFuture> futureList = new ArrayList<>(); + for (int i = 1; i < 6; i++) { + DelayedFuture future = new DelayedFuture(i * 1000); + executorService.execute(future); + futureList.add(future); + } + MoreFutures.waitForAllInterruptiblyFailFast(futureList); + for (DelayedFuture delayedFuture : futureList) { + assertFalse(delayedFuture.wasCanceled); + assertFalse(delayedFuture.wasInterrupted); + assertNotNull(delayedFuture.get()); + } + } + + @Test + public void waitForAllInterruptiblyFailFast_Interrupt() throws Exception { + final List<DelayedFuture> futureList = new ArrayList<>(); + for (int i = 1; i < 6; i++) { + // When we have a bunch of futures that never complete. + DelayedFuture future = new DelayedFuture(Integer.MAX_VALUE); + // And submit them to an Executor. + executorService.execute(future); + futureList.add(future); + } + final Thread testThread = Thread.currentThread(); + // And have a thread that interrupts the current thread (the one running the test) once all the + // futures were polled at least once via Future#get(long, TimeUnit). + Thread interruptThread = new Thread() { + @Override + public void run() { + for (DelayedFuture delayedFuture : futureList) { + try { + delayedFuture.getLatch.await( + TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + throw new IllegalStateException(ie); + } + } + testThread.interrupt(); + } + }; + // And run this thread in the background. + interruptThread.start(); + try { + try { + // And then wait for all the futures to complete, interruptibly. + MoreFutures.waitForAllInterruptiblyFailFast(futureList); + fail(); + } catch (InterruptedException expected) { + // Then, as expected, waitForAllInterruptiblyFailFast propagates the interrupt sent to the + // main test thread by our background thread. + } + } finally { + // The @After-annotated shutdownExecutor method blocks on completion of all tasks. Since we + // submitted a bunch of tasks that never complete, we need to explictly cancel them. + for (DelayedFuture delayedFuture : futureList) { + delayedFuture.cancel(/*mayInterruptIfRunning=*/ true); + } + // If we're here and the test were to pass, then the background thread must have already + // completed. Interrupt it unconditionally - if the test were to pass, this is benign. But if + // the test were to fail then we'd have a rogue thread in the background which can be very + // evil (e.g. can interfere with the execution of other test cases). + interruptThread.interrupt(); + } + } + + @Test + public void waitForAllInterruptiblyFailFast_Failure() throws Exception { + List<DelayedFuture> futureList = new ArrayList<>(); + for (int i = 1; i < 6; i++) { + DelayedFuture future = new DelayedFuture(i * 1000); + executorService.execute(future); + futureList.add(future); + } + DelayedFuture toFail = new DelayedFuture(1000); + futureList.add(toFail); + toFail.makeItFail(); + try { + MoreFutures.waitForAllInterruptiblyFailFast(futureList); + fail(); + } catch (ExecutionException ee) { + assertThat(ee.getCause()).hasMessage("I like to fail!!"); + } + } + /** * A future that (if added to an executor) waits {@code delay} milliseconds before setting a * response. @@ -111,7 +199,8 @@ public class MoreFuturesTest { private static class DelayedFuture extends AbstractFuture<Object> implements Runnable { private final int delay; - private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch failOrInterruptLatch = new CountDownLatch(1); + private final CountDownLatch getLatch = new CountDownLatch(1); private boolean wasCanceled; private boolean wasInterrupted; @@ -122,7 +211,7 @@ public class MoreFuturesTest { @Override public void run() { try { - wasCanceled = latch.await(delay, TimeUnit.MILLISECONDS); + wasCanceled = failOrInterruptLatch.await(delay, TimeUnit.MILLISECONDS); // Not canceled and not done (makeItFail sets the value, so in that case is done). if (!wasCanceled && !isDone()) { set(new Object()); @@ -134,7 +223,7 @@ public class MoreFuturesTest { public void makeItFail() { setException(new RuntimeException("I like to fail!!")); - latch.countDown(); + failOrInterruptLatch.countDown(); } @Override @@ -144,7 +233,14 @@ public class MoreFuturesTest { @Override protected void interruptTask() { - latch.countDown(); + failOrInterruptLatch.countDown(); + } + + @Override + public Object get(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException, ExecutionException { + getLatch.countDown(); + return super.get(timeout, unit); } } |