aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java')
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java92
1 files changed, 75 insertions, 17 deletions
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
index 04fcdeae91..1b1cd432c7 100644
--- a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -53,6 +54,15 @@ import javax.annotation.Nullable;
* <p>This does not implement other parts of Skyframe evaluation setup and post-processing, such as
* translating a set of requested top-level nodes into actions, or constructing an evaluation
* result. Derived classes should do this.
+ *
+ * <p>Work is prioritized in a depth-first fashion when using the {@link
+ * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler,
+ * EmittedEventState, EventFilter, ErrorInfoManager, boolean, int, DirtyTrackingProgressReceiver,
+ * GraphInconsistencyReceiver, CycleDetector)} constructor, but not when using the {@link
+ * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler,
+ * EmittedEventState, EventFilter, ErrorInfoManager, boolean, DirtyTrackingProgressReceiver,
+ * GraphInconsistencyReceiver, ForkJoinPool, CycleDetector, EvaluationVersionBehavior)} (the
+ * constructor with a {@link ForkJoinPool}).
*/
public abstract class AbstractParallelEvaluator {
private static final Logger logger = Logger.getLogger(AbstractParallelEvaluator.class.getName());
@@ -60,6 +70,7 @@ public abstract class AbstractParallelEvaluator {
final ProcessableGraph graph;
final ParallelEvaluatorContext evaluatorContext;
protected final CycleDetector cycleDetector;
+ @Nullable private final AtomicInteger globalEnqueuedIndex;
AbstractParallelEvaluator(
ProcessableGraph graph,
@@ -87,9 +98,10 @@ public abstract class AbstractParallelEvaluator {
progressReceiver,
storedEventFilter,
errorInfoManager,
- Evaluate::new,
+ (skyKey, evaluationPriority) -> new Evaluate(evaluationPriority, skyKey),
graphInconsistencyReceiver,
threadCount);
+ this.globalEnqueuedIndex = new AtomicInteger(0);
}
AbstractParallelEvaluator(
@@ -119,10 +131,11 @@ public abstract class AbstractParallelEvaluator {
progressReceiver,
storedEventFilter,
errorInfoManager,
- Evaluate::new,
+ (skyKey, evaluationPriority) -> new Evaluate(0, skyKey),
graphInconsistencyReceiver,
Preconditions.checkNotNull(forkJoinPool),
evaluationVersionBehavior);
+ this.globalEnqueuedIndex = null;
}
/**
@@ -141,21 +154,40 @@ public abstract class AbstractParallelEvaluator {
NEEDS_EVALUATION
}
- /** An action that evaluates a value. */
- private class Evaluate implements Runnable {
+ /**
+ * An action that evaluates a value.
+ *
+ * <p>{@link Comparable} for use in priority queues. Experimentally, grouping enqueued evaluations
+ * together by parent leads to fewer in-flight evaluations and thus lower peak memory usage. Thus
+ * we store the {@link #evaluationPriority} (coming from the {@link #globalEnqueuedIndex} and use
+ * it for comparisons: later enqueuings should be evaluated earlier, to do a depth-first search,
+ * except for re-enqueued nodes, which always get top priority.
+ *
+ * <p>This is not applicable when using a {@link ForkJoinPool}, since it does not allow for easy
+ * work prioritization.
+ */
+ private class Evaluate implements ParallelEvaluatorContext.ComparableRunnable {
+ private final int evaluationPriority;
/** The name of the value to be evaluated. */
private final SkyKey skyKey;
- private Evaluate(SkyKey skyKey) {
+ private Evaluate(int evaluationPriority, SkyKey skyKey) {
+ this.evaluationPriority = evaluationPriority;
this.skyKey = skyKey;
}
+ @Override
+ public int compareTo(ParallelEvaluatorContext.ComparableRunnable o) {
+ return -1 * Integer.compare(this.evaluationPriority, ((Evaluate) o).evaluationPriority);
+ }
+
private void enqueueChild(
SkyKey skyKey,
NodeEntry entry,
SkyKey child,
NodeEntry childEntry,
- boolean depAlreadyExists)
+ boolean depAlreadyExists,
+ int childEvaluationPriority)
throws InterruptedException {
Preconditions.checkState(!entry.isDone(), "%s %s", skyKey, entry);
DependencyState dependencyState =
@@ -166,13 +198,15 @@ public abstract class AbstractParallelEvaluator {
case DONE:
if (entry.signalDep(childEntry.getVersion())) {
// This can only happen if there are no more children to be added.
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Maximum priority, since this node has already started evaluation before, and we want
+ // it off our plate.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE);
}
break;
case ALREADY_EVALUATING:
break;
case NEEDS_SCHEDULING:
- evaluatorContext.getVisitor().enqueueEvaluation(child);
+ evaluatorContext.getVisitor().enqueueEvaluation(child, childEvaluationPriority);
break;
}
}
@@ -288,7 +322,10 @@ public abstract class AbstractParallelEvaluator {
unknownStatusDeps);
continue;
}
- handleKnownChildrenForDirtyNode(unknownStatusDeps, state);
+ handleKnownChildrenForDirtyNode(
+ unknownStatusDeps,
+ state,
+ globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0);
return DirtyOutcome.ALREADY_PROCESSED;
}
switch (state.getDirtyState()) {
@@ -321,7 +358,8 @@ public abstract class AbstractParallelEvaluator {
}
}
- private void handleKnownChildrenForDirtyNode(Collection<SkyKey> knownChildren, NodeEntry state)
+ private void handleKnownChildrenForDirtyNode(
+ Collection<SkyKey> knownChildren, NodeEntry state, int childEvaluationPriority)
throws InterruptedException {
Map<SkyKey, ? extends NodeEntry> oldChildren =
graph.getBatch(skyKey, Reason.ENQUEUING_CHILD, knownChildren);
@@ -342,7 +380,8 @@ public abstract class AbstractParallelEvaluator {
state,
recreatedEntry.getKey(),
recreatedEntry.getValue(),
- /*depAlreadyExists=*/ false);
+ /*depAlreadyExists=*/ false,
+ childEvaluationPriority);
}
}
for (Map.Entry<SkyKey, ? extends NodeEntry> e : oldChildren.entrySet()) {
@@ -350,7 +389,13 @@ public abstract class AbstractParallelEvaluator {
NodeEntry directDepEntry = e.getValue();
// TODO(bazel-team): If this signals the current node and makes it ready, consider
// evaluating it in this thread instead of scheduling a new evaluation.
- enqueueChild(skyKey, state, directDep, directDepEntry, /*depAlreadyExists=*/ true);
+ enqueueChild(
+ skyKey,
+ state,
+ directDep,
+ directDepEntry,
+ /*depAlreadyExists=*/ true,
+ childEvaluationPriority);
}
}
@@ -381,7 +426,8 @@ public abstract class AbstractParallelEvaluator {
} catch (UndonePreviouslyRequestedDep undonePreviouslyRequestedDep) {
// If a previously requested dep is no longer done, restart this node from scratch.
restart(skyKey, state);
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Top priority since this node has already been evaluating, so get it off our plate.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE);
return;
} finally {
evaluatorContext
@@ -487,7 +533,8 @@ public abstract class AbstractParallelEvaluator {
}
if (maybeHandleRestart(skyKey, state, value)) {
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Top priority since this node has already been evaluating, so get it off our plate.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE);
return;
}
@@ -618,17 +665,26 @@ public abstract class AbstractParallelEvaluator {
Set<SkyKey> newDepsThatWereInTheLastEvaluation =
Sets.difference(uniqueNewDeps, newDepsThatWerentInTheLastEvaluation);
+ int childEvaluationPriority =
+ globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0;
InterruptibleSupplier<Map<SkyKey, ? extends NodeEntry>>
newDepsThatWerentInTheLastEvaluationNodes =
graph.createIfAbsentBatchAsync(
skyKey, Reason.RDEP_ADDITION, newDepsThatWerentInTheLastEvaluation);
- handleKnownChildrenForDirtyNode(newDepsThatWereInTheLastEvaluation, state);
+ handleKnownChildrenForDirtyNode(
+ newDepsThatWereInTheLastEvaluation, state, childEvaluationPriority);
for (Map.Entry<SkyKey, ? extends NodeEntry> e :
newDepsThatWerentInTheLastEvaluationNodes.get().entrySet()) {
SkyKey newDirectDep = e.getKey();
NodeEntry newDirectDepEntry = e.getValue();
- enqueueChild(skyKey, state, newDirectDep, newDirectDepEntry, /*depAlreadyExists=*/ false);
+ enqueueChild(
+ skyKey,
+ state,
+ newDirectDep,
+ newDirectDepEntry,
+ /*depAlreadyExists=*/ false,
+ childEvaluationPriority);
}
// It is critical that there is no code below this point in the try block.
} catch (InterruptedException ie) {
@@ -824,7 +880,9 @@ public abstract class AbstractParallelEvaluator {
.noteInconsistencyAndMaybeThrow(
skyKey, depKey, Inconsistency.CHILD_UNDONE_FOR_BUILDING_NODE);
if (triState == DependencyState.NEEDS_SCHEDULING) {
- evaluatorContext.getVisitor().enqueueEvaluation(depKey);
+ // Top priority since this depKey was already evaluated before, and we want to finish it off
+ // again, reducing the chance that another node may observe this dep to be undone.
+ evaluatorContext.getVisitor().enqueueEvaluation(depKey, Integer.MAX_VALUE);
}
return true;
}