diff options
author | Mark Schaller <mschaller@google.com> | 2015-10-30 22:15:34 +0000 |
---|---|---|
committer | Florian Weikert <fwe@google.com> | 2015-11-02 16:55:28 +0000 |
commit | 5350b3154665ebb37e10e6261686eb646ea23220 (patch) | |
tree | 8bce3e4c5e8e708577ad01d3e361f93a2a008548 | |
parent | e45a363cf7c2abfdf1dd67cfb7ed0a54d1ed52c2 (diff) |
Introduce QuiescingExecutor
This interface (mostly) encapsulates what the ValueVisitor expects
from the AbstractQueueVisitor class it currently inherits from. This
makes it easier for a future CL to change ValueVisitor's strategy of
code reuse from inheritance to composition.
RELNOTES:
--
MOS_MIGRATED_REVID=106728863
8 files changed, 178 insertions, 142 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index 51f054746c..9f49798bde 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -32,27 +32,10 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * AbstractQueueVisitor is a wrapper around {@link ExecutorService} which delays service shutdown - * until entire visitation is complete. This is useful for cases in which worker tasks may submit - * additional tasks. - * - * <p>Consider the following example: - * <pre> - * ThreadPoolExecutor executor = <...> - * executor.submit(myRunnableTask); - * executor.shutdown(); - * executor.awaitTermination(); - * </pre> - * - * <p>This won't work properly if {@code myRunnableTask} submits additional - * tasks to the executor, because it may already have shut down - * by that point. - * - * <p>AbstractQueueVisitor supports interruption. If the main thread is - * interrupted, tasks will no longer be added to the queue, and the - * {@link #work(boolean)} method will throw {@link InterruptedException}. + * AbstractQueueVisitor is a {@link QuiescingExecutor} implementation that wraps an {@link + * ExecutorService}. */ -public class AbstractQueueVisitor { +public class AbstractQueueVisitor implements QuiescingExecutor { /** * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link @@ -114,7 +97,7 @@ public class AbstractQueueVisitor { /** * If {@link #concurrent} is {@code true}, then this is a counter of the number of {@link - * Runnable}s {@link #enqueue}-d that have not finished evaluation. + * Runnable}s {@link #execute}-d that have not finished evaluation. */ private final AtomicLong remainingTasks = new AtomicLong(0); @@ -124,13 +107,13 @@ public class AbstractQueueVisitor { /** * The {@link ExecutorService}. If !{@code concurrent}, always {@code null}. Created lazily on - * first call to {@link #enqueue(Runnable)}, and removed after call to {@link #work(boolean)}. + * first call to {@link #execute(Runnable)}, and removed after call to {@link #awaitQuiescence}. */ private final ExecutorService pool; /** * Flag used to record when the main thread (the thread which called - * {@link #work(boolean)}) is interrupted. + * {@link #awaitQuiescence(boolean)}) is interrupted. * * When this is true, adding tasks to the thread pool will * fail quietly as a part of the process of shutting down the @@ -341,19 +324,9 @@ public class AbstractQueueVisitor { this(true, parallelism, keepAlive, units, false, true, poolName, EXECUTOR_FACTORY); } - /** - * Executes all tasks on the queue, and optionally shuts the pool down and deletes it. - * - * <p>Throws (the same) unchecked exception if any worker thread failed unexpectedly. If the pool - * is interrupted and a worker also throws an unchecked exception, the unchecked exception is - * rethrown, since it may indicate a programming bug. If callers handle the unchecked exception, - * they may check the interrupted bit to see if the pool was interrupted. - * - * @param interruptWorkers if true, interrupt worker threads if main thread gets an interrupt or - * if a worker throws a critical error (see {@link #classifyError(Throwable)}. If false, - * just wait for them to terminate normally. - */ - protected final void work(boolean interruptWorkers) throws InterruptedException { + + @Override + public final void awaitQuiescence(boolean interruptWorkers) throws InterruptedException { if (concurrent) { awaitTermination(interruptWorkers); } else { @@ -367,7 +340,8 @@ public class AbstractQueueVisitor { * Schedules a call. * Called in a worker thread if concurrent. */ - protected final void enqueue(Runnable runnable) { + @Override + public final void execute(Runnable runnable) { if (concurrent) { AtomicBoolean ranTask = new AtomicBoolean(false); try { diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java new file mode 100644 index 0000000000..721c47a714 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java @@ -0,0 +1,52 @@ +// Copyright 2015 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import java.util.concurrent.Executor; + +/** + * QuiescingExecutor is an {@link Executor} which supports waiting until all submitted tasks are + * complete. This is useful when tasks may submit additional tasks. + * + * <p>Consider the following example: + * <pre> + * ThreadPoolExecutor executor = <...> + * executor.submit(myRunnableTask); + * executor.shutdown(); + * executor.awaitTermination(); + * </pre> + * + * <p>This won't work properly if {@code myRunnableTask} submits additional tasks to the + * executor, because it may already have shut down by that point. + * + * <p>QuiescingExecutor supports interruption. If the main thread is interrupted, tasks will no + * longer be started, and the {@link #awaitQuiescence} method will throw {@link + * InterruptedException}. + */ +public interface QuiescingExecutor extends Executor { + + /** + * Waits for all tasks to complete. If the {@link QuiescingExecutor} owns its own {@link + * java.util.concurrent.ExecutorService}, the service will also be shutdown. + * + * <p>Throws (the same) unchecked exception if any worker thread failed unexpectedly. If the main + * thread is interrupted and a worker also throws an unchecked exception, the unchecked exception + * is rethrown, since it may indicate a programming bug. If callers handle the unchecked + * exception, they may check the interrupted bit to see if the pool was interrupted. + * + * @param interruptWorkers if true, interrupt worker threads if main thread gets an interrupt. + * If false, just wait for them to terminate normally. + */ + void awaitQuiescence(boolean interruptWorkers) throws InterruptedException; +} diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java index 35e84d3b15..e5f78c1fd2 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java @@ -306,7 +306,7 @@ final class LabelVisitor { @ThreadSafe public boolean finish() throws InterruptedException { - work(true); + awaitQuiescence(/*interruptWorkers=*/ true); return !errorObserver.hasErrors(); } @@ -338,7 +338,7 @@ final class LabelVisitor { !blockNewActions() && count < RECURSION_LIMIT) { newVisitRunnable(from, attr, label, depth, count + 1).run(); } else { - enqueue(newVisitRunnable(from, attr, label, depth, 0)); + execute(newVisitRunnable(from, attr, label, depth, 0)); } } diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java index cd6a986acd..2d18f910fa 100644 --- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java @@ -108,7 +108,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr // exist in the graph, so we must be tolerant of that case. visit(visitData.first, visitData.second, !MUST_EXIST); } - work(/*failFastOnInterrupt=*/true); + awaitQuiescence(/*interruptWorkers=*/ true); Preconditions.checkState(pendingVisitations.isEmpty(), "All dirty nodes should have been processed: %s", pendingVisitations); } @@ -226,7 +226,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr } final Pair<SkyKey, InvalidationType> invalidationPair = Pair.of(key, invalidationType); pendingVisitations.add(invalidationPair); - enqueue( + execute( new Runnable() { @Override public void run() { @@ -345,7 +345,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr return; } pendingVisitations.add(invalidationPair); - enqueue( + execute( new Runnable() { @Override public void run() { diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java index e46a30ba9b..36231f7241 100644 --- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java +++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java @@ -621,7 +621,7 @@ public final class ParallelEvaluator implements Evaluator { } protected void waitForCompletion() throws InterruptedException { - work(/*failFastOnInterrupt=*/true); + awaitQuiescence(/*interruptWorkers=*/ true); } public void enqueueEvaluation(final SkyKey key) { @@ -642,7 +642,7 @@ public final class ParallelEvaluator implements Evaluator { if (newlyEnqueued && progressReceiver != null) { progressReceiver.enqueueing(key); } - enqueue(new Evaluate(this, key)); + execute(new Evaluate(this, key)); } /** diff --git a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java index d9fb1a3080..832ee7242d 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java +++ b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java @@ -139,14 +139,15 @@ public class ConcurrentMultimapWithHeadElementTest { } private void addAndRemove(final Boolean key, final Integer add, final Integer remove) { - enqueue(new Runnable() { - @Override - public void run() { - assertNotNull(multimap.putAndGet(key, add)); - multimap.remove(key, remove); - doRandom(); - } - }); + execute( + new Runnable() { + @Override + public void run() { + assertNotNull(multimap.putAndGet(key, add)); + multimap.remove(key, remove); + doRandom(); + } + }); } private Integer getRandomInt() { @@ -162,7 +163,7 @@ public class ConcurrentMultimapWithHeadElementTest { } private void work() throws InterruptedException { - work(/*failFastOnInterrupt=*/true); + awaitQuiescence(/*interruptWorkers=*/ true); } } diff --git a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java index 6d5a23e665..2dd07ed868 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java +++ b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java @@ -91,27 +91,29 @@ public class MapBasedActionGraphTest { } private void registerAction(final Action action) { - enqueue(new Runnable() { - @Override - public void run() { - try { - graph.registerAction(action); - } catch (ActionConflictException e) { - throw new UncheckedActionConflictException(e); - } - doRandom(); - } - }); + execute( + new Runnable() { + @Override + public void run() { + try { + graph.registerAction(action); + } catch (ActionConflictException e) { + throw new UncheckedActionConflictException(e); + } + doRandom(); + } + }); } private void unregisterAction(final Action action) { - enqueue(new Runnable() { - @Override - public void run() { - graph.unregisterAction(action); - doRandom(); - } - }); + execute( + new Runnable() { + @Override + public void run() { + graph.unregisterAction(action); + doRandom(); + } + }); } private void doRandom() { @@ -134,7 +136,7 @@ public class MapBasedActionGraphTest { } private void work() throws InterruptedException { - work(/*failFastOnInterrupt=*/true); + awaitQuiescence(/*interruptWorkers=*/ true); } } diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java index e31cd94fa9..2c62bec0b0 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java @@ -52,7 +52,7 @@ public class AbstractQueueVisitorTest { public void simpleCounter() throws Exception { CountingQueueVisitor counter = new CountingQueueVisitor(); counter.enqueue(); - counter.work(false); + counter.awaitQuiescence(/*interruptWorkers=*/ false); assertSame(10, counter.getCount()); } @@ -64,7 +64,7 @@ public class AbstractQueueVisitorTest { CountingQueueVisitor counter = new CountingQueueVisitor(executor); counter.enqueue(); - counter.work(false); + counter.awaitQuiescence(/*interruptWorkers=*/ false); assertSame(10, counter.getCount()); executor.shutdown(); @@ -76,7 +76,7 @@ public class AbstractQueueVisitorTest { CountingQueueVisitor counter = new CountingQueueVisitor(); counter.enqueue(); counter.enqueue(); - counter.work(false); + counter.awaitQuiescence(/*interruptWorkers=*/ false); assertSame(10, counter.getCount()); } @@ -84,17 +84,18 @@ public class AbstractQueueVisitorTest { public void exceptionFromWorkerThread() { final RuntimeException myException = new IllegalStateException(); ConcreteQueueVisitor visitor = new ConcreteQueueVisitor(); - visitor.enqueue(new Runnable() { - @Override - public void run() { - throw myException; - } - }); + visitor.execute( + new Runnable() { + @Override + public void run() { + throw myException; + } + }); try { // The exception from the worker thread should be // re-thrown from the main thread. - visitor.work(false); + visitor.awaitQuiescence(/*interruptWorkers=*/ false); fail(); } catch (Exception e) { assertSame(myException, e); @@ -125,7 +126,7 @@ public class AbstractQueueVisitorTest { CountingQueueVisitor counter = new CountingQueueVisitor(executor); counter.enqueue(); try { - counter.work(false); + counter.awaitQuiescence(/*interruptWorkers=*/ false); fail(); } catch (Error expected) { assertThat(expected).hasMessage("Could not create thread (fakeout)"); @@ -143,20 +144,22 @@ public class AbstractQueueVisitorTest { final ConcreteQueueVisitor visitor = new ConcreteQueueVisitor(); // Use a latch to make sure the thread gets a chance to start. final CountDownLatch threadStarted = new CountDownLatch(1); - visitor.enqueue(new Runnable() { - @Override - public void run() { - threadStarted.countDown(); - assertTrue(Uninterruptibles.awaitUninterruptibly( - visitor.getInterruptionLatchForTestingOnly(), 2, TimeUnit.SECONDS)); - throw THROWABLE; - } - }); + visitor.execute( + new Runnable() { + @Override + public void run() { + threadStarted.countDown(); + assertTrue( + Uninterruptibles.awaitUninterruptibly( + visitor.getInterruptionLatchForTestingOnly(), 2, TimeUnit.SECONDS)); + throw THROWABLE; + } + }); assertTrue(threadStarted.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); // Interrupt will not be processed until work starts. Thread.currentThread().interrupt(); try { - visitor.work(/*interruptWorkers=*/true); + visitor.awaitQuiescence(/*interruptWorkers=*/ true); fail(); } catch (Exception e) { assertEquals(THROWABLE, e); @@ -172,18 +175,19 @@ public class AbstractQueueVisitorTest { final boolean[] workerThreadCompleted = { false }; final ConcreteQueueVisitor visitor = new ConcreteQueueVisitor(); - visitor.enqueue(new Runnable() { - @Override - public void run() { - try { - latch1.countDown(); - latch2.await(); - workerThreadCompleted[0] = true; - } catch (InterruptedException e) { - // Do not set workerThreadCompleted to true - } - } - }); + visitor.execute( + new Runnable() { + @Override + public void run() { + try { + latch1.countDown(); + latch2.await(); + workerThreadCompleted[0] = true; + } catch (InterruptedException e) { + // Do not set workerThreadCompleted to true + } + } + }); TestThread interrupterThread = new TestThread() { @Override @@ -199,7 +203,7 @@ public class AbstractQueueVisitorTest { interrupterThread.start(); try { - visitor.work(false); + visitor.awaitQuiescence(/*interruptWorkers=*/ false); fail(); } catch (InterruptedException e) { // Expected. @@ -228,23 +232,24 @@ public class AbstractQueueVisitorTest { ? new ConcreteQueueVisitor() : new ConcreteQueueVisitor(executor, true); - visitor.enqueue(new Runnable() { - @Override - public void run() { - try { - latch1.countDown(); - latch2.await(); - } catch (InterruptedException e) { - workerThreadInterrupted[0] = true; - } - } - }); + visitor.execute( + new Runnable() { + @Override + public void run() { + try { + latch1.countDown(); + latch2.await(); + } catch (InterruptedException e) { + workerThreadInterrupted[0] = true; + } + } + }); latch1.await(); Thread.currentThread().interrupt(); try { - visitor.work(true); + visitor.awaitQuiescence(/*interruptWorkers=*/ true); fail(); } catch (InterruptedException e) { // Expected. @@ -307,14 +312,14 @@ public class AbstractQueueVisitorTest { Runnable ra = awaitAddAndEnqueueRunnable(interrupt, visitor, latchA, visitedList, "a", r1); Runnable rb = awaitAddAndEnqueueRunnable(interrupt, visitor, latchB, visitedList, "b", r2); - visitor.enqueue(ra); - visitor.enqueue(rb); + visitor.execute(ra); + visitor.execute(rb); latchA.await(); latchB.await(); - visitor.enqueue(interrupt ? interruptingRunnable(Thread.currentThread()) : throwingRunnable()); + visitor.execute(interrupt ? interruptingRunnable(Thread.currentThread()) : throwingRunnable()); try { - visitor.work(false); + visitor.awaitQuiescence(/*interruptWorkers=*/ false); fail(); } catch (Exception e) { if (interrupt) { @@ -357,12 +362,12 @@ public class AbstractQueueVisitorTest { } }; - visitor.enqueue(r1); + visitor.execute(r1); latch1.await(); - visitor.enqueue(throwingRunnable()); + visitor.execute(throwingRunnable()); try { - visitor.work(true); + visitor.awaitQuiescence(/*interruptWorkers=*/ true); fail(); } catch (Exception e) { assertSame(THROWABLE, e); @@ -404,13 +409,13 @@ public class AbstractQueueVisitorTest { } } }; - visitor.enqueue(errorRunnable); - visitor.enqueue(sleepRunnable); + visitor.execute(errorRunnable); + visitor.execute(sleepRunnable); Error thrownError = null; // Interrupt workers on a critical error. That way we can test that visitor.work doesn't wait // for all workers to finish if one of them already had a critical error. try { - visitor.work(/*interruptWorkers=*/true); + visitor.awaitQuiescence(/*interruptWorkers=*/ true); } catch (Error e) { thrownError = e; } @@ -451,16 +456,17 @@ public class AbstractQueueVisitorTest { } try { - assertTrue(interrupt - ? visitor.awaitInterruptionForTestingOnly(1, TimeUnit.MINUTES) - : visitor.getExceptionLatchForTestingOnly().await(1, TimeUnit.MINUTES)); + assertTrue( + interrupt + ? visitor.awaitInterruptionForTestingOnly(1, TimeUnit.MINUTES) + : visitor.getExceptionLatchForTestingOnly().await(1, TimeUnit.MINUTES)); } catch (InterruptedException e) { // Unexpected. throw new RuntimeException(e); } list.add(toAdd); if (toEnqueue != null) { - visitor.enqueue(toEnqueue); + visitor.execute(toEnqueue); } } }; @@ -482,17 +488,18 @@ public class AbstractQueueVisitorTest { } public void enqueue() { - super.enqueue(new Runnable() { - @Override - public void run() { - synchronized (lock) { - if (theInt < 10) { - theInt++; - enqueue(); + super.execute( + new Runnable() { + @Override + public void run() { + synchronized (lock) { + if (theInt < 10) { + theInt++; + enqueue(); + } + } } - } - } - }); + }); } public int getCount() { |