aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2015-10-30 22:15:34 +0000
committerGravatar Florian Weikert <fwe@google.com>2015-11-02 16:55:28 +0000
commit5350b3154665ebb37e10e6261686eb646ea23220 (patch)
tree8bce3e4c5e8e708577ad01d3e361f93a2a008548
parente45a363cf7c2abfdf1dd67cfb7ed0a54d1ed52c2 (diff)
Introduce QuiescingExecutor
This interface (mostly) encapsulates what the ValueVisitor expects from the AbstractQueueVisitor class it currently inherits from. This makes it easier for a future CL to change ValueVisitor's strategy of code reuse from inheritance to composition. RELNOTES: -- MOS_MIGRATED_REVID=106728863
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java48
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java52
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java4
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java6
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java4
-rw-r--r--src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java19
-rw-r--r--src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java40
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java147
8 files changed, 178 insertions, 142 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 51f054746c..9f49798bde 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -32,27 +32,10 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * AbstractQueueVisitor is a wrapper around {@link ExecutorService} which delays service shutdown
- * until entire visitation is complete. This is useful for cases in which worker tasks may submit
- * additional tasks.
- *
- * <p>Consider the following example:
- * <pre>
- * ThreadPoolExecutor executor = <...>
- * executor.submit(myRunnableTask);
- * executor.shutdown();
- * executor.awaitTermination();
- * </pre>
- *
- * <p>This won't work properly if {@code myRunnableTask} submits additional
- * tasks to the executor, because it may already have shut down
- * by that point.
- *
- * <p>AbstractQueueVisitor supports interruption. If the main thread is
- * interrupted, tasks will no longer be added to the queue, and the
- * {@link #work(boolean)} method will throw {@link InterruptedException}.
+ * AbstractQueueVisitor is a {@link QuiescingExecutor} implementation that wraps an {@link
+ * ExecutorService}.
*/
-public class AbstractQueueVisitor {
+public class AbstractQueueVisitor implements QuiescingExecutor {
/**
* Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link
@@ -114,7 +97,7 @@ public class AbstractQueueVisitor {
/**
* If {@link #concurrent} is {@code true}, then this is a counter of the number of {@link
- * Runnable}s {@link #enqueue}-d that have not finished evaluation.
+ * Runnable}s {@link #execute}-d that have not finished evaluation.
*/
private final AtomicLong remainingTasks = new AtomicLong(0);
@@ -124,13 +107,13 @@ public class AbstractQueueVisitor {
/**
* The {@link ExecutorService}. If !{@code concurrent}, always {@code null}. Created lazily on
- * first call to {@link #enqueue(Runnable)}, and removed after call to {@link #work(boolean)}.
+ * first call to {@link #execute(Runnable)}, and removed after call to {@link #awaitQuiescence}.
*/
private final ExecutorService pool;
/**
* Flag used to record when the main thread (the thread which called
- * {@link #work(boolean)}) is interrupted.
+ * {@link #awaitQuiescence(boolean)}) is interrupted.
*
* When this is true, adding tasks to the thread pool will
* fail quietly as a part of the process of shutting down the
@@ -341,19 +324,9 @@ public class AbstractQueueVisitor {
this(true, parallelism, keepAlive, units, false, true, poolName, EXECUTOR_FACTORY);
}
- /**
- * Executes all tasks on the queue, and optionally shuts the pool down and deletes it.
- *
- * <p>Throws (the same) unchecked exception if any worker thread failed unexpectedly. If the pool
- * is interrupted and a worker also throws an unchecked exception, the unchecked exception is
- * rethrown, since it may indicate a programming bug. If callers handle the unchecked exception,
- * they may check the interrupted bit to see if the pool was interrupted.
- *
- * @param interruptWorkers if true, interrupt worker threads if main thread gets an interrupt or
- * if a worker throws a critical error (see {@link #classifyError(Throwable)}. If false,
- * just wait for them to terminate normally.
- */
- protected final void work(boolean interruptWorkers) throws InterruptedException {
+
+ @Override
+ public final void awaitQuiescence(boolean interruptWorkers) throws InterruptedException {
if (concurrent) {
awaitTermination(interruptWorkers);
} else {
@@ -367,7 +340,8 @@ public class AbstractQueueVisitor {
* Schedules a call.
* Called in a worker thread if concurrent.
*/
- protected final void enqueue(Runnable runnable) {
+ @Override
+ public final void execute(Runnable runnable) {
if (concurrent) {
AtomicBoolean ranTask = new AtomicBoolean(false);
try {
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
new file mode 100644
index 0000000000..721c47a714
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
@@ -0,0 +1,52 @@
+// Copyright 2015 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import java.util.concurrent.Executor;
+
+/**
+ * QuiescingExecutor is an {@link Executor} which supports waiting until all submitted tasks are
+ * complete. This is useful when tasks may submit additional tasks.
+ *
+ * <p>Consider the following example:
+ * <pre>
+ * ThreadPoolExecutor executor = <...>
+ * executor.submit(myRunnableTask);
+ * executor.shutdown();
+ * executor.awaitTermination();
+ * </pre>
+ *
+ * <p>This won't work properly if {@code myRunnableTask} submits additional tasks to the
+ * executor, because it may already have shut down by that point.
+ *
+ * <p>QuiescingExecutor supports interruption. If the main thread is interrupted, tasks will no
+ * longer be started, and the {@link #awaitQuiescence} method will throw {@link
+ * InterruptedException}.
+ */
+public interface QuiescingExecutor extends Executor {
+
+ /**
+ * Waits for all tasks to complete. If the {@link QuiescingExecutor} owns its own {@link
+ * java.util.concurrent.ExecutorService}, the service will also be shutdown.
+ *
+ * <p>Throws (the same) unchecked exception if any worker thread failed unexpectedly. If the main
+ * thread is interrupted and a worker also throws an unchecked exception, the unchecked exception
+ * is rethrown, since it may indicate a programming bug. If callers handle the unchecked
+ * exception, they may check the interrupted bit to see if the pool was interrupted.
+ *
+ * @param interruptWorkers if true, interrupt worker threads if main thread gets an interrupt.
+ * If false, just wait for them to terminate normally.
+ */
+ void awaitQuiescence(boolean interruptWorkers) throws InterruptedException;
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
index 35e84d3b15..e5f78c1fd2 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
@@ -306,7 +306,7 @@ final class LabelVisitor {
@ThreadSafe
public boolean finish() throws InterruptedException {
- work(true);
+ awaitQuiescence(/*interruptWorkers=*/ true);
return !errorObserver.hasErrors();
}
@@ -338,7 +338,7 @@ final class LabelVisitor {
!blockNewActions() && count < RECURSION_LIMIT) {
newVisitRunnable(from, attr, label, depth, count + 1).run();
} else {
- enqueue(newVisitRunnable(from, attr, label, depth, 0));
+ execute(newVisitRunnable(from, attr, label, depth, 0));
}
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index cd6a986acd..2d18f910fa 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -108,7 +108,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
// exist in the graph, so we must be tolerant of that case.
visit(visitData.first, visitData.second, !MUST_EXIST);
}
- work(/*failFastOnInterrupt=*/true);
+ awaitQuiescence(/*interruptWorkers=*/ true);
Preconditions.checkState(pendingVisitations.isEmpty(),
"All dirty nodes should have been processed: %s", pendingVisitations);
}
@@ -226,7 +226,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
}
final Pair<SkyKey, InvalidationType> invalidationPair = Pair.of(key, invalidationType);
pendingVisitations.add(invalidationPair);
- enqueue(
+ execute(
new Runnable() {
@Override
public void run() {
@@ -345,7 +345,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
return;
}
pendingVisitations.add(invalidationPair);
- enqueue(
+ execute(
new Runnable() {
@Override
public void run() {
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
index e46a30ba9b..36231f7241 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
@@ -621,7 +621,7 @@ public final class ParallelEvaluator implements Evaluator {
}
protected void waitForCompletion() throws InterruptedException {
- work(/*failFastOnInterrupt=*/true);
+ awaitQuiescence(/*interruptWorkers=*/ true);
}
public void enqueueEvaluation(final SkyKey key) {
@@ -642,7 +642,7 @@ public final class ParallelEvaluator implements Evaluator {
if (newlyEnqueued && progressReceiver != null) {
progressReceiver.enqueueing(key);
}
- enqueue(new Evaluate(this, key));
+ execute(new Evaluate(this, key));
}
/**
diff --git a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java
index d9fb1a3080..832ee7242d 100644
--- a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java
+++ b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java
@@ -139,14 +139,15 @@ public class ConcurrentMultimapWithHeadElementTest {
}
private void addAndRemove(final Boolean key, final Integer add, final Integer remove) {
- enqueue(new Runnable() {
- @Override
- public void run() {
- assertNotNull(multimap.putAndGet(key, add));
- multimap.remove(key, remove);
- doRandom();
- }
- });
+ execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ assertNotNull(multimap.putAndGet(key, add));
+ multimap.remove(key, remove);
+ doRandom();
+ }
+ });
}
private Integer getRandomInt() {
@@ -162,7 +163,7 @@ public class ConcurrentMultimapWithHeadElementTest {
}
private void work() throws InterruptedException {
- work(/*failFastOnInterrupt=*/true);
+ awaitQuiescence(/*interruptWorkers=*/ true);
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java
index 6d5a23e665..2dd07ed868 100644
--- a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java
+++ b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java
@@ -91,27 +91,29 @@ public class MapBasedActionGraphTest {
}
private void registerAction(final Action action) {
- enqueue(new Runnable() {
- @Override
- public void run() {
- try {
- graph.registerAction(action);
- } catch (ActionConflictException e) {
- throw new UncheckedActionConflictException(e);
- }
- doRandom();
- }
- });
+ execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ graph.registerAction(action);
+ } catch (ActionConflictException e) {
+ throw new UncheckedActionConflictException(e);
+ }
+ doRandom();
+ }
+ });
}
private void unregisterAction(final Action action) {
- enqueue(new Runnable() {
- @Override
- public void run() {
- graph.unregisterAction(action);
- doRandom();
- }
- });
+ execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ graph.unregisterAction(action);
+ doRandom();
+ }
+ });
}
private void doRandom() {
@@ -134,7 +136,7 @@ public class MapBasedActionGraphTest {
}
private void work() throws InterruptedException {
- work(/*failFastOnInterrupt=*/true);
+ awaitQuiescence(/*interruptWorkers=*/ true);
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
index e31cd94fa9..2c62bec0b0 100644
--- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
@@ -52,7 +52,7 @@ public class AbstractQueueVisitorTest {
public void simpleCounter() throws Exception {
CountingQueueVisitor counter = new CountingQueueVisitor();
counter.enqueue();
- counter.work(false);
+ counter.awaitQuiescence(/*interruptWorkers=*/ false);
assertSame(10, counter.getCount());
}
@@ -64,7 +64,7 @@ public class AbstractQueueVisitorTest {
CountingQueueVisitor counter = new CountingQueueVisitor(executor);
counter.enqueue();
- counter.work(false);
+ counter.awaitQuiescence(/*interruptWorkers=*/ false);
assertSame(10, counter.getCount());
executor.shutdown();
@@ -76,7 +76,7 @@ public class AbstractQueueVisitorTest {
CountingQueueVisitor counter = new CountingQueueVisitor();
counter.enqueue();
counter.enqueue();
- counter.work(false);
+ counter.awaitQuiescence(/*interruptWorkers=*/ false);
assertSame(10, counter.getCount());
}
@@ -84,17 +84,18 @@ public class AbstractQueueVisitorTest {
public void exceptionFromWorkerThread() {
final RuntimeException myException = new IllegalStateException();
ConcreteQueueVisitor visitor = new ConcreteQueueVisitor();
- visitor.enqueue(new Runnable() {
- @Override
- public void run() {
- throw myException;
- }
- });
+ visitor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ throw myException;
+ }
+ });
try {
// The exception from the worker thread should be
// re-thrown from the main thread.
- visitor.work(false);
+ visitor.awaitQuiescence(/*interruptWorkers=*/ false);
fail();
} catch (Exception e) {
assertSame(myException, e);
@@ -125,7 +126,7 @@ public class AbstractQueueVisitorTest {
CountingQueueVisitor counter = new CountingQueueVisitor(executor);
counter.enqueue();
try {
- counter.work(false);
+ counter.awaitQuiescence(/*interruptWorkers=*/ false);
fail();
} catch (Error expected) {
assertThat(expected).hasMessage("Could not create thread (fakeout)");
@@ -143,20 +144,22 @@ public class AbstractQueueVisitorTest {
final ConcreteQueueVisitor visitor = new ConcreteQueueVisitor();
// Use a latch to make sure the thread gets a chance to start.
final CountDownLatch threadStarted = new CountDownLatch(1);
- visitor.enqueue(new Runnable() {
- @Override
- public void run() {
- threadStarted.countDown();
- assertTrue(Uninterruptibles.awaitUninterruptibly(
- visitor.getInterruptionLatchForTestingOnly(), 2, TimeUnit.SECONDS));
- throw THROWABLE;
- }
- });
+ visitor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ threadStarted.countDown();
+ assertTrue(
+ Uninterruptibles.awaitUninterruptibly(
+ visitor.getInterruptionLatchForTestingOnly(), 2, TimeUnit.SECONDS));
+ throw THROWABLE;
+ }
+ });
assertTrue(threadStarted.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
// Interrupt will not be processed until work starts.
Thread.currentThread().interrupt();
try {
- visitor.work(/*interruptWorkers=*/true);
+ visitor.awaitQuiescence(/*interruptWorkers=*/ true);
fail();
} catch (Exception e) {
assertEquals(THROWABLE, e);
@@ -172,18 +175,19 @@ public class AbstractQueueVisitorTest {
final boolean[] workerThreadCompleted = { false };
final ConcreteQueueVisitor visitor = new ConcreteQueueVisitor();
- visitor.enqueue(new Runnable() {
- @Override
- public void run() {
- try {
- latch1.countDown();
- latch2.await();
- workerThreadCompleted[0] = true;
- } catch (InterruptedException e) {
- // Do not set workerThreadCompleted to true
- }
- }
- });
+ visitor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ latch1.countDown();
+ latch2.await();
+ workerThreadCompleted[0] = true;
+ } catch (InterruptedException e) {
+ // Do not set workerThreadCompleted to true
+ }
+ }
+ });
TestThread interrupterThread = new TestThread() {
@Override
@@ -199,7 +203,7 @@ public class AbstractQueueVisitorTest {
interrupterThread.start();
try {
- visitor.work(false);
+ visitor.awaitQuiescence(/*interruptWorkers=*/ false);
fail();
} catch (InterruptedException e) {
// Expected.
@@ -228,23 +232,24 @@ public class AbstractQueueVisitorTest {
? new ConcreteQueueVisitor()
: new ConcreteQueueVisitor(executor, true);
- visitor.enqueue(new Runnable() {
- @Override
- public void run() {
- try {
- latch1.countDown();
- latch2.await();
- } catch (InterruptedException e) {
- workerThreadInterrupted[0] = true;
- }
- }
- });
+ visitor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ latch1.countDown();
+ latch2.await();
+ } catch (InterruptedException e) {
+ workerThreadInterrupted[0] = true;
+ }
+ }
+ });
latch1.await();
Thread.currentThread().interrupt();
try {
- visitor.work(true);
+ visitor.awaitQuiescence(/*interruptWorkers=*/ true);
fail();
} catch (InterruptedException e) {
// Expected.
@@ -307,14 +312,14 @@ public class AbstractQueueVisitorTest {
Runnable ra = awaitAddAndEnqueueRunnable(interrupt, visitor, latchA, visitedList, "a", r1);
Runnable rb = awaitAddAndEnqueueRunnable(interrupt, visitor, latchB, visitedList, "b", r2);
- visitor.enqueue(ra);
- visitor.enqueue(rb);
+ visitor.execute(ra);
+ visitor.execute(rb);
latchA.await();
latchB.await();
- visitor.enqueue(interrupt ? interruptingRunnable(Thread.currentThread()) : throwingRunnable());
+ visitor.execute(interrupt ? interruptingRunnable(Thread.currentThread()) : throwingRunnable());
try {
- visitor.work(false);
+ visitor.awaitQuiescence(/*interruptWorkers=*/ false);
fail();
} catch (Exception e) {
if (interrupt) {
@@ -357,12 +362,12 @@ public class AbstractQueueVisitorTest {
}
};
- visitor.enqueue(r1);
+ visitor.execute(r1);
latch1.await();
- visitor.enqueue(throwingRunnable());
+ visitor.execute(throwingRunnable());
try {
- visitor.work(true);
+ visitor.awaitQuiescence(/*interruptWorkers=*/ true);
fail();
} catch (Exception e) {
assertSame(THROWABLE, e);
@@ -404,13 +409,13 @@ public class AbstractQueueVisitorTest {
}
}
};
- visitor.enqueue(errorRunnable);
- visitor.enqueue(sleepRunnable);
+ visitor.execute(errorRunnable);
+ visitor.execute(sleepRunnable);
Error thrownError = null;
// Interrupt workers on a critical error. That way we can test that visitor.work doesn't wait
// for all workers to finish if one of them already had a critical error.
try {
- visitor.work(/*interruptWorkers=*/true);
+ visitor.awaitQuiescence(/*interruptWorkers=*/ true);
} catch (Error e) {
thrownError = e;
}
@@ -451,16 +456,17 @@ public class AbstractQueueVisitorTest {
}
try {
- assertTrue(interrupt
- ? visitor.awaitInterruptionForTestingOnly(1, TimeUnit.MINUTES)
- : visitor.getExceptionLatchForTestingOnly().await(1, TimeUnit.MINUTES));
+ assertTrue(
+ interrupt
+ ? visitor.awaitInterruptionForTestingOnly(1, TimeUnit.MINUTES)
+ : visitor.getExceptionLatchForTestingOnly().await(1, TimeUnit.MINUTES));
} catch (InterruptedException e) {
// Unexpected.
throw new RuntimeException(e);
}
list.add(toAdd);
if (toEnqueue != null) {
- visitor.enqueue(toEnqueue);
+ visitor.execute(toEnqueue);
}
}
};
@@ -482,17 +488,18 @@ public class AbstractQueueVisitorTest {
}
public void enqueue() {
- super.enqueue(new Runnable() {
- @Override
- public void run() {
- synchronized (lock) {
- if (theInt < 10) {
- theInt++;
- enqueue();
+ super.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ synchronized (lock) {
+ if (theInt < 10) {
+ theInt++;
+ enqueue();
+ }
+ }
}
- }
- }
- });
+ });
}
public int getCount() {