aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java')
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java65
1 files changed, 49 insertions, 16 deletions
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index 3c2c399bd5..2a9072ec85 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -23,12 +23,16 @@ import com.google.common.collect.Sets;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.ExecutorParams;
+import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
+import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.Pair;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -52,8 +56,7 @@ import javax.annotation.Nullable;
*
* <p>This is intended only for use in alternative {@code MemoizingEvaluator} implementations.
*/
-public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGraph>
- extends AbstractQueueVisitor {
+public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGraph> {
// Default thread count is equal to the number of cores to exploit
// that level of hardware parallelism, since invalidation should be CPU-bound.
@@ -77,13 +80,15 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
protected final DirtyKeyTracker dirtyKeyTracker;
// Aliased to InvalidationState.pendingVisitations.
protected final Set<Pair<SkyKey, InvalidationType>> pendingVisitations;
+ protected final QuiescingExecutor executor;
protected InvalidatingNodeVisitor(
TGraph graph,
@Nullable EvaluationProgressReceiver invalidationReceiver,
InvalidationState state,
DirtyKeyTracker dirtyKeyTracker) {
- this(graph, invalidationReceiver, state, dirtyKeyTracker, EXECUTOR_FACTORY);
+ this(
+ graph, invalidationReceiver, state, dirtyKeyTracker, AbstractQueueVisitor.EXECUTOR_FACTORY);
}
protected InvalidatingNodeVisitor(
@@ -92,16 +97,30 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
InvalidationState state,
DirtyKeyTracker dirtyKeyTracker,
Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
- super(
- /*concurrent=*/ true,
- /*parallelism=*/ DEFAULT_THREAD_COUNT,
- /*keepAliveTime=*/ 1,
- /*units=*/ TimeUnit.SECONDS,
- /*failFastOnException=*/ true,
- /*failFastOnInterrupt=*/ true,
- "skyframe-invalidator",
- executorFactory,
- errorClassifier);
+ this.executor =
+ new AbstractQueueVisitor(
+ /*concurrent=*/ true,
+ /*parallelism=*/ DEFAULT_THREAD_COUNT,
+ /*keepAliveTime=*/ 1,
+ /*units=*/ TimeUnit.SECONDS,
+ /*failFastOnException=*/ true,
+ /*failFastOnInterrupt=*/ true,
+ "skyframe-invalidator",
+ executorFactory,
+ errorClassifier);
+ this.graph = Preconditions.checkNotNull(graph);
+ this.invalidationReceiver = invalidationReceiver;
+ this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker);
+ this.pendingVisitations = state.pendingValues;
+ }
+
+ protected InvalidatingNodeVisitor(
+ TGraph graph,
+ @Nullable EvaluationProgressReceiver invalidationReceiver,
+ InvalidationState state,
+ DirtyKeyTracker dirtyKeyTracker,
+ ForkJoinPool forkJoinPool) {
+ this.executor = new ForkJoinQuiescingExecutor(forkJoinPool, errorClassifier);
this.graph = Preconditions.checkNotNull(graph);
this.invalidationReceiver = invalidationReceiver;
this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker);
@@ -121,11 +140,16 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
// exist in the graph, so we must be tolerant of that case.
visit(visitData.first, visitData.second, !MUST_EXIST);
}
- awaitQuiescence(/*interruptWorkers=*/ true);
+ executor.awaitQuiescence(/*interruptWorkers=*/ true);
Preconditions.checkState(pendingVisitations.isEmpty(),
"All dirty nodes should have been processed: %s", pendingVisitations);
}
+ @VisibleForTesting
+ public CountDownLatch getInterruptionLatchForTestingOnly() {
+ return executor.getInterruptionLatchForTestingOnly();
+ }
+
protected abstract long count();
protected void informInvalidationReceiver(SkyKey key,
@@ -232,7 +256,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
}
final Pair<SkyKey, InvalidationType> invalidationPair = Pair.of(key, invalidationType);
pendingVisitations.add(invalidationPair);
- execute(
+ executor.execute(
new Runnable() {
@Override
public void run() {
@@ -309,6 +333,15 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
super(graph, invalidationReceiver, state, dirtyKeyTracker, executorFactory);
}
+ protected DirtyingNodeVisitor(
+ ThinNodeQueryableGraph graph,
+ EvaluationProgressReceiver invalidationReceiver,
+ InvalidationState state,
+ DirtyKeyTracker dirtyKeyTracker,
+ ForkJoinPool forkJoinPool) {
+ super(graph, invalidationReceiver, state, dirtyKeyTracker, forkJoinPool);
+ }
+
@Override
protected long count() {
return visited.size();
@@ -351,7 +384,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
return;
}
pendingVisitations.add(invalidationPair);
- execute(
+ executor.execute(
new Runnable() {
@Override
public void run() {