aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2015-11-02 17:07:29 +0000
committerGravatar David Chen <dzc@google.com>2015-11-02 23:18:53 +0000
commit96f789448481eb7a981ed76c344c42042b3c30cb (patch)
tree9e6582a8e677f3fcc8e2e2b31c12b85f4fed98fc /src
parent29375d405399a2d39b42c853849e39cb82048c19 (diff)
Cleanup ValueVisitor (and dirty QuiescingExecutor)
Raises the level of abstraction of ValueVisitor's dependence on AbstractQueueVisitor. Except for the "ForTestingOnly" methods now available on the QuiescingExecutor interface, ValueVisitor is agnostic to the implementation of its executor. This also cleans up the full spectrum of visibility modifiers on ValueVisitor methods, all of which ought to be private. -- MOS_MIGRATED_REVID=106847453
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java9
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java11
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java29
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java25
-rw-r--r--src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java72
5 files changed, 77 insertions, 69 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index d5b6f1a7e9..b082bf20a9 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -574,15 +574,6 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
return (failFastOnInterrupt && isInterrupted()) || (unhandled != null && failFastOnException);
}
- /**
- * Await interruption. Used only in tests.
- */
- @VisibleForTesting
- public boolean awaitInterruptionForTestingOnly(long timeout, TimeUnit units)
- throws InterruptedException {
- return interruptedLatch.await(timeout, units);
- }
-
/** Get latch that is released when exception is received by visitor. Used only in tests. */
@VisibleForTesting
public CountDownLatch getExceptionLatchForTestingOnly() {
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
index 721c47a714..65718c6205 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
@@ -13,6 +13,9 @@
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
/**
@@ -49,4 +52,12 @@ public interface QuiescingExecutor extends Executor {
* If false, just wait for them to terminate normally.
*/
void awaitQuiescence(boolean interruptWorkers) throws InterruptedException;
+
+ /** Get latch that is released if a task throws an exception. Used only in tests. */
+ @VisibleForTesting
+ CountDownLatch getExceptionLatchForTestingOnly();
+
+ /** Get latch that is released if a task is interrupted. Used only in tests. */
+ @VisibleForTesting
+ CountDownLatch getInterruptionLatchForTestingOnly();
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
index 3c64b616e0..dcc1e07bc3 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
@@ -32,6 +32,7 @@ import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
import com.google.devtools.build.lib.collect.nestedset.NestedSetVisitor;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
+import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
@@ -613,17 +614,17 @@ public final class ParallelEvaluator implements Evaluator {
private class ValueVisitor {
- private final AbstractQueueVisitor abstractQueueVisitor;
- private AtomicBoolean preventNewEvaluations = new AtomicBoolean(false);
+ private final QuiescingExecutor quiescingExecutor;
+ private final AtomicBoolean preventNewEvaluations = new AtomicBoolean(false);
private final Set<SkyKey> inflightNodes = Sets.newConcurrentHashSet();
private final Set<RuntimeException> crashes = Sets.newConcurrentHashSet();
private ValueVisitor(int threadCount) {
- abstractQueueVisitor =
+ quiescingExecutor =
new AbstractQueueVisitor(
/*concurrent*/ true,
threadCount,
- 1,
+ /*keepAliveTime=*/ 1,
TimeUnit.SECONDS,
/*failFastOnException*/ true,
/*failFastOnInterrupt*/ true,
@@ -631,11 +632,11 @@ public final class ParallelEvaluator implements Evaluator {
VALUE_VISITOR_ERROR_CLASSIFIER);
}
- protected void waitForCompletion() throws InterruptedException {
- abstractQueueVisitor.awaitQuiescence(/*interruptWorkers=*/ true);
+ private void waitForCompletion() throws InterruptedException {
+ quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true);
}
- public void enqueueEvaluation(final SkyKey key) {
+ private void enqueueEvaluation(SkyKey key) {
// We unconditionally add the key to the set of in-flight nodes because even if evaluation is
// never scheduled we still want to remove the previously created NodeEntry from the graph.
// Otherwise we would leave the graph in a weird state (wasteful garbage in the best case and
@@ -653,7 +654,7 @@ public final class ParallelEvaluator implements Evaluator {
if (newlyEnqueued && progressReceiver != null) {
progressReceiver.enqueueing(key);
}
- abstractQueueVisitor.execute(new Evaluate(this, key));
+ quiescingExecutor.execute(new Evaluate(this, key));
}
/**
@@ -662,19 +663,19 @@ public final class ParallelEvaluator implements Evaluator {
* thread already requested a halt and will throw an exception, and so this thread can simply
* end.
*/
- boolean preventNewEvaluations() {
+ private boolean preventNewEvaluations() {
return preventNewEvaluations.compareAndSet(false, true);
}
- void noteCrash(RuntimeException e) {
+ private void noteCrash(RuntimeException e) {
crashes.add(e);
}
- Collection<RuntimeException> getCrashes() {
+ private Collection<RuntimeException> getCrashes() {
return crashes;
}
- void notifyDone(SkyKey key) {
+ private void notifyDone(SkyKey key) {
inflightNodes.remove(key);
}
@@ -683,8 +684,8 @@ public final class ParallelEvaluator implements Evaluator {
}
@VisibleForTesting
- public CountDownLatch getExceptionLatchForTestingOnly() {
- return abstractQueueVisitor.getExceptionLatchForTestingOnly();
+ private CountDownLatch getExceptionLatchForTestingOnly() {
+ return quiescingExecutor.getExceptionLatchForTestingOnly();
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
index 7bc6912e95..f9cf3ac32d 100644
--- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
@@ -190,16 +190,19 @@ public class AbstractQueueVisitorTest {
}
});
- TestThread interrupterThread = new TestThread() {
- @Override
- public void runTest() throws Exception {
- latch1.await();
- mainThread.interrupt();
- assertTrue(visitor.awaitInterruptionForTestingOnly(TestUtils.WAIT_TIMEOUT_MILLISECONDS,
- TimeUnit.MILLISECONDS));
- latch2.countDown();
- }
- };
+ TestThread interrupterThread =
+ new TestThread() {
+ @Override
+ public void runTest() throws Exception {
+ latch1.await();
+ mainThread.interrupt();
+ assertTrue(
+ visitor
+ .getInterruptionLatchForTestingOnly()
+ .await(TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS));
+ latch2.countDown();
+ }
+ };
interrupterThread.start();
@@ -461,7 +464,7 @@ public class AbstractQueueVisitorTest {
try {
assertTrue(
interrupt
- ? visitor.awaitInterruptionForTestingOnly(1, TimeUnit.MINUTES)
+ ? visitor.getInterruptionLatchForTestingOnly().await(1, TimeUnit.MINUTES)
: visitor.getExceptionLatchForTestingOnly().await(1, TimeUnit.MINUTES));
} catch (InterruptedException e) {
// Unexpected.
diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
index 629c8f332d..0fcc250b30 100644
--- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
@@ -388,41 +388,43 @@ public class EagerInvalidatorTest {
eval(/*keepGoing=*/false, parent);
final Thread mainThread = Thread.currentThread();
final AtomicReference<SkyKey> badKey = new AtomicReference<>();
- EvaluationProgressReceiver receiver = new EvaluationProgressReceiver() {
- @Override
- public void invalidated(SkyKey skyKey, InvalidationState state) {
- if (skyKey.equals(child)) {
- // Interrupt on the very first invalidate
- mainThread.interrupt();
- } else if (!skyKey.functionName().equals(NODE_TYPE)) {
- // All other invalidations should have the GraphTester's key type.
- // Exceptions thrown here may be silently dropped, so keep track of errors ourselves.
- badKey.set(skyKey);
- }
- try {
- assertTrue(visitor.get().awaitInterruptionForTestingOnly(2, TimeUnit.HOURS));
- } catch (InterruptedException e) {
- // We may well have thrown here because by the time we try to await, the main thread is
- // already interrupted.
- }
- }
-
- @Override
- public void enqueueing(SkyKey skyKey) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void computed(SkyKey skyKey, long elapsedTimeNanos) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void evaluated(SkyKey skyKey, Supplier<SkyValue> skyValueSupplier,
- EvaluationState state) {
- throw new UnsupportedOperationException();
- }
- };
+ EvaluationProgressReceiver receiver =
+ new EvaluationProgressReceiver() {
+ @Override
+ public void invalidated(SkyKey skyKey, InvalidationState state) {
+ if (skyKey.equals(child)) {
+ // Interrupt on the very first invalidate
+ mainThread.interrupt();
+ } else if (!skyKey.functionName().equals(NODE_TYPE)) {
+ // All other invalidations should have the GraphTester's key type.
+ // Exceptions thrown here may be silently dropped, so keep track of errors ourselves.
+ badKey.set(skyKey);
+ }
+ try {
+ assertTrue(
+ visitor.get().getInterruptionLatchForTestingOnly().await(2, TimeUnit.HOURS));
+ } catch (InterruptedException e) {
+ // We may well have thrown here because by the time we try to await, the main
+ // thread is already interrupted.
+ }
+ }
+
+ @Override
+ public void enqueueing(SkyKey skyKey) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void computed(SkyKey skyKey, long elapsedTimeNanos) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void evaluated(
+ SkyKey skyKey, Supplier<SkyValue> skyValueSupplier, EvaluationState state) {
+ throw new UnsupportedOperationException();
+ }
+ };
try {
invalidateWithoutError(receiver, child);
fail();