From 6ae05b419922193c4c253e51c9a5e483e4f947fa Mon Sep 17 00:00:00 2001 From: janakr Date: Mon, 13 Aug 2018 16:13:42 -0700 Subject: Order Skyframe evaluations in a priority queue, with all children of a given node having the same priority, later enqueueings having higher priority, re-enqueued nodes having highest priority, and new root nodes having lowest priority. Experimentally, this can save significant RAM (1.4G in some builds!) while not affecting speed. Also do a semi-drive-by deleting ExecutorFactory parameter to AbstractQueueVisitor, since it was always AbstractQueueVisitor.EXECUTOR_FACTORY. PiperOrigin-RevId: 208560889 --- .../build/skyframe/AbstractParallelEvaluator.java | 92 ++++++++++++++++++---- 1 file changed, 75 insertions(+), 17 deletions(-) (limited to 'src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java') diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java index 04fcdeae91..1b1cd432c7 100644 --- a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java +++ b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -53,6 +54,15 @@ import javax.annotation.Nullable; *

This does not implement other parts of Skyframe evaluation setup and post-processing, such as * translating a set of requested top-level nodes into actions, or constructing an evaluation * result. Derived classes should do this. + * + *

Work is prioritized in a depth-first fashion when using the {@link + * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler, + * EmittedEventState, EventFilter, ErrorInfoManager, boolean, int, DirtyTrackingProgressReceiver, + * GraphInconsistencyReceiver, CycleDetector)} constructor, but not when using the {@link + * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler, + * EmittedEventState, EventFilter, ErrorInfoManager, boolean, DirtyTrackingProgressReceiver, + * GraphInconsistencyReceiver, ForkJoinPool, CycleDetector, EvaluationVersionBehavior)} (the + * constructor with a {@link ForkJoinPool}). */ public abstract class AbstractParallelEvaluator { private static final Logger logger = Logger.getLogger(AbstractParallelEvaluator.class.getName()); @@ -60,6 +70,7 @@ public abstract class AbstractParallelEvaluator { final ProcessableGraph graph; final ParallelEvaluatorContext evaluatorContext; protected final CycleDetector cycleDetector; + @Nullable private final AtomicInteger globalEnqueuedIndex; AbstractParallelEvaluator( ProcessableGraph graph, @@ -87,9 +98,10 @@ public abstract class AbstractParallelEvaluator { progressReceiver, storedEventFilter, errorInfoManager, - Evaluate::new, + (skyKey, evaluationPriority) -> new Evaluate(evaluationPriority, skyKey), graphInconsistencyReceiver, threadCount); + this.globalEnqueuedIndex = new AtomicInteger(0); } AbstractParallelEvaluator( @@ -119,10 +131,11 @@ public abstract class AbstractParallelEvaluator { progressReceiver, storedEventFilter, errorInfoManager, - Evaluate::new, + (skyKey, evaluationPriority) -> new Evaluate(0, skyKey), graphInconsistencyReceiver, Preconditions.checkNotNull(forkJoinPool), evaluationVersionBehavior); + this.globalEnqueuedIndex = null; } /** @@ -141,21 +154,40 @@ public abstract class AbstractParallelEvaluator { NEEDS_EVALUATION } - /** An action that evaluates a value. */ - private class Evaluate implements Runnable { + /** + * An action that evaluates a value. + * + *

{@link Comparable} for use in priority queues. Experimentally, grouping enqueued evaluations + * together by parent leads to fewer in-flight evaluations and thus lower peak memory usage. Thus + * we store the {@link #evaluationPriority} (coming from the {@link #globalEnqueuedIndex} and use + * it for comparisons: later enqueuings should be evaluated earlier, to do a depth-first search, + * except for re-enqueued nodes, which always get top priority. + * + *

This is not applicable when using a {@link ForkJoinPool}, since it does not allow for easy + * work prioritization. + */ + private class Evaluate implements ParallelEvaluatorContext.ComparableRunnable { + private final int evaluationPriority; /** The name of the value to be evaluated. */ private final SkyKey skyKey; - private Evaluate(SkyKey skyKey) { + private Evaluate(int evaluationPriority, SkyKey skyKey) { + this.evaluationPriority = evaluationPriority; this.skyKey = skyKey; } + @Override + public int compareTo(ParallelEvaluatorContext.ComparableRunnable o) { + return -1 * Integer.compare(this.evaluationPriority, ((Evaluate) o).evaluationPriority); + } + private void enqueueChild( SkyKey skyKey, NodeEntry entry, SkyKey child, NodeEntry childEntry, - boolean depAlreadyExists) + boolean depAlreadyExists, + int childEvaluationPriority) throws InterruptedException { Preconditions.checkState(!entry.isDone(), "%s %s", skyKey, entry); DependencyState dependencyState = @@ -166,13 +198,15 @@ public abstract class AbstractParallelEvaluator { case DONE: if (entry.signalDep(childEntry.getVersion())) { // This can only happen if there are no more children to be added. - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Maximum priority, since this node has already started evaluation before, and we want + // it off our plate. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE); } break; case ALREADY_EVALUATING: break; case NEEDS_SCHEDULING: - evaluatorContext.getVisitor().enqueueEvaluation(child); + evaluatorContext.getVisitor().enqueueEvaluation(child, childEvaluationPriority); break; } } @@ -288,7 +322,10 @@ public abstract class AbstractParallelEvaluator { unknownStatusDeps); continue; } - handleKnownChildrenForDirtyNode(unknownStatusDeps, state); + handleKnownChildrenForDirtyNode( + unknownStatusDeps, + state, + globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0); return DirtyOutcome.ALREADY_PROCESSED; } switch (state.getDirtyState()) { @@ -321,7 +358,8 @@ public abstract class AbstractParallelEvaluator { } } - private void handleKnownChildrenForDirtyNode(Collection knownChildren, NodeEntry state) + private void handleKnownChildrenForDirtyNode( + Collection knownChildren, NodeEntry state, int childEvaluationPriority) throws InterruptedException { Map oldChildren = graph.getBatch(skyKey, Reason.ENQUEUING_CHILD, knownChildren); @@ -342,7 +380,8 @@ public abstract class AbstractParallelEvaluator { state, recreatedEntry.getKey(), recreatedEntry.getValue(), - /*depAlreadyExists=*/ false); + /*depAlreadyExists=*/ false, + childEvaluationPriority); } } for (Map.Entry e : oldChildren.entrySet()) { @@ -350,7 +389,13 @@ public abstract class AbstractParallelEvaluator { NodeEntry directDepEntry = e.getValue(); // TODO(bazel-team): If this signals the current node and makes it ready, consider // evaluating it in this thread instead of scheduling a new evaluation. - enqueueChild(skyKey, state, directDep, directDepEntry, /*depAlreadyExists=*/ true); + enqueueChild( + skyKey, + state, + directDep, + directDepEntry, + /*depAlreadyExists=*/ true, + childEvaluationPriority); } } @@ -381,7 +426,8 @@ public abstract class AbstractParallelEvaluator { } catch (UndonePreviouslyRequestedDep undonePreviouslyRequestedDep) { // If a previously requested dep is no longer done, restart this node from scratch. restart(skyKey, state); - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Top priority since this node has already been evaluating, so get it off our plate. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE); return; } finally { evaluatorContext @@ -487,7 +533,8 @@ public abstract class AbstractParallelEvaluator { } if (maybeHandleRestart(skyKey, state, value)) { - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Top priority since this node has already been evaluating, so get it off our plate. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE); return; } @@ -618,17 +665,26 @@ public abstract class AbstractParallelEvaluator { Set newDepsThatWereInTheLastEvaluation = Sets.difference(uniqueNewDeps, newDepsThatWerentInTheLastEvaluation); + int childEvaluationPriority = + globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0; InterruptibleSupplier> newDepsThatWerentInTheLastEvaluationNodes = graph.createIfAbsentBatchAsync( skyKey, Reason.RDEP_ADDITION, newDepsThatWerentInTheLastEvaluation); - handleKnownChildrenForDirtyNode(newDepsThatWereInTheLastEvaluation, state); + handleKnownChildrenForDirtyNode( + newDepsThatWereInTheLastEvaluation, state, childEvaluationPriority); for (Map.Entry e : newDepsThatWerentInTheLastEvaluationNodes.get().entrySet()) { SkyKey newDirectDep = e.getKey(); NodeEntry newDirectDepEntry = e.getValue(); - enqueueChild(skyKey, state, newDirectDep, newDirectDepEntry, /*depAlreadyExists=*/ false); + enqueueChild( + skyKey, + state, + newDirectDep, + newDirectDepEntry, + /*depAlreadyExists=*/ false, + childEvaluationPriority); } // It is critical that there is no code below this point in the try block. } catch (InterruptedException ie) { @@ -824,7 +880,9 @@ public abstract class AbstractParallelEvaluator { .noteInconsistencyAndMaybeThrow( skyKey, depKey, Inconsistency.CHILD_UNDONE_FOR_BUILDING_NODE); if (triState == DependencyState.NEEDS_SCHEDULING) { - evaluatorContext.getVisitor().enqueueEvaluation(depKey); + // Top priority since this depKey was already evaluated before, and we want to finish it off + // again, reducing the chance that another node may observe this dep to be undone. + evaluatorContext.getVisitor().enqueueEvaluation(depKey, Integer.MAX_VALUE); } return true; } -- cgit v1.2.3