diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/skyframe')
6 files changed, 125 insertions, 82 deletions
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java index f8f21f254e..268db07014 100644 --- a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java +++ b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java @@ -249,7 +249,9 @@ public abstract class AbstractExceptionalParallelEvaluator<E extends Exception> // This must be equivalent to the code in enqueueChild above, in order to be thread-safe. switch (entry.addReverseDepAndCheckIfDone(null)) { case NEEDS_SCHEDULING: - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Low priority because this node is not needed by any other currently evaluating node. + // So keep it at the back of the queue as long as there's other useful work to be done. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MIN_VALUE); break; case DONE: informProgressReceiverThatValueIsDone(skyKey, entry); diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java index 04fcdeae91..1b1cd432c7 100644 --- a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java +++ b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -53,6 +54,15 @@ import javax.annotation.Nullable; * <p>This does not implement other parts of Skyframe evaluation setup and post-processing, such as * translating a set of requested top-level nodes into actions, or constructing an evaluation * result. Derived classes should do this. + * + * <p>Work is prioritized in a depth-first fashion when using the {@link + * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler, + * EmittedEventState, EventFilter, ErrorInfoManager, boolean, int, DirtyTrackingProgressReceiver, + * GraphInconsistencyReceiver, CycleDetector)} constructor, but not when using the {@link + * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler, + * EmittedEventState, EventFilter, ErrorInfoManager, boolean, DirtyTrackingProgressReceiver, + * GraphInconsistencyReceiver, ForkJoinPool, CycleDetector, EvaluationVersionBehavior)} (the + * constructor with a {@link ForkJoinPool}). */ public abstract class AbstractParallelEvaluator { private static final Logger logger = Logger.getLogger(AbstractParallelEvaluator.class.getName()); @@ -60,6 +70,7 @@ public abstract class AbstractParallelEvaluator { final ProcessableGraph graph; final ParallelEvaluatorContext evaluatorContext; protected final CycleDetector cycleDetector; + @Nullable private final AtomicInteger globalEnqueuedIndex; AbstractParallelEvaluator( ProcessableGraph graph, @@ -87,9 +98,10 @@ public abstract class AbstractParallelEvaluator { progressReceiver, storedEventFilter, errorInfoManager, - Evaluate::new, + (skyKey, evaluationPriority) -> new Evaluate(evaluationPriority, skyKey), graphInconsistencyReceiver, threadCount); + this.globalEnqueuedIndex = new AtomicInteger(0); } AbstractParallelEvaluator( @@ -119,10 +131,11 @@ public abstract class AbstractParallelEvaluator { progressReceiver, storedEventFilter, errorInfoManager, - Evaluate::new, + (skyKey, evaluationPriority) -> new Evaluate(0, skyKey), graphInconsistencyReceiver, Preconditions.checkNotNull(forkJoinPool), evaluationVersionBehavior); + this.globalEnqueuedIndex = null; } /** @@ -141,21 +154,40 @@ public abstract class AbstractParallelEvaluator { NEEDS_EVALUATION } - /** An action that evaluates a value. */ - private class Evaluate implements Runnable { + /** + * An action that evaluates a value. + * + * <p>{@link Comparable} for use in priority queues. Experimentally, grouping enqueued evaluations + * together by parent leads to fewer in-flight evaluations and thus lower peak memory usage. Thus + * we store the {@link #evaluationPriority} (coming from the {@link #globalEnqueuedIndex} and use + * it for comparisons: later enqueuings should be evaluated earlier, to do a depth-first search, + * except for re-enqueued nodes, which always get top priority. + * + * <p>This is not applicable when using a {@link ForkJoinPool}, since it does not allow for easy + * work prioritization. + */ + private class Evaluate implements ParallelEvaluatorContext.ComparableRunnable { + private final int evaluationPriority; /** The name of the value to be evaluated. */ private final SkyKey skyKey; - private Evaluate(SkyKey skyKey) { + private Evaluate(int evaluationPriority, SkyKey skyKey) { + this.evaluationPriority = evaluationPriority; this.skyKey = skyKey; } + @Override + public int compareTo(ParallelEvaluatorContext.ComparableRunnable o) { + return -1 * Integer.compare(this.evaluationPriority, ((Evaluate) o).evaluationPriority); + } + private void enqueueChild( SkyKey skyKey, NodeEntry entry, SkyKey child, NodeEntry childEntry, - boolean depAlreadyExists) + boolean depAlreadyExists, + int childEvaluationPriority) throws InterruptedException { Preconditions.checkState(!entry.isDone(), "%s %s", skyKey, entry); DependencyState dependencyState = @@ -166,13 +198,15 @@ public abstract class AbstractParallelEvaluator { case DONE: if (entry.signalDep(childEntry.getVersion())) { // This can only happen if there are no more children to be added. - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Maximum priority, since this node has already started evaluation before, and we want + // it off our plate. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE); } break; case ALREADY_EVALUATING: break; case NEEDS_SCHEDULING: - evaluatorContext.getVisitor().enqueueEvaluation(child); + evaluatorContext.getVisitor().enqueueEvaluation(child, childEvaluationPriority); break; } } @@ -288,7 +322,10 @@ public abstract class AbstractParallelEvaluator { unknownStatusDeps); continue; } - handleKnownChildrenForDirtyNode(unknownStatusDeps, state); + handleKnownChildrenForDirtyNode( + unknownStatusDeps, + state, + globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0); return DirtyOutcome.ALREADY_PROCESSED; } switch (state.getDirtyState()) { @@ -321,7 +358,8 @@ public abstract class AbstractParallelEvaluator { } } - private void handleKnownChildrenForDirtyNode(Collection<SkyKey> knownChildren, NodeEntry state) + private void handleKnownChildrenForDirtyNode( + Collection<SkyKey> knownChildren, NodeEntry state, int childEvaluationPriority) throws InterruptedException { Map<SkyKey, ? extends NodeEntry> oldChildren = graph.getBatch(skyKey, Reason.ENQUEUING_CHILD, knownChildren); @@ -342,7 +380,8 @@ public abstract class AbstractParallelEvaluator { state, recreatedEntry.getKey(), recreatedEntry.getValue(), - /*depAlreadyExists=*/ false); + /*depAlreadyExists=*/ false, + childEvaluationPriority); } } for (Map.Entry<SkyKey, ? extends NodeEntry> e : oldChildren.entrySet()) { @@ -350,7 +389,13 @@ public abstract class AbstractParallelEvaluator { NodeEntry directDepEntry = e.getValue(); // TODO(bazel-team): If this signals the current node and makes it ready, consider // evaluating it in this thread instead of scheduling a new evaluation. - enqueueChild(skyKey, state, directDep, directDepEntry, /*depAlreadyExists=*/ true); + enqueueChild( + skyKey, + state, + directDep, + directDepEntry, + /*depAlreadyExists=*/ true, + childEvaluationPriority); } } @@ -381,7 +426,8 @@ public abstract class AbstractParallelEvaluator { } catch (UndonePreviouslyRequestedDep undonePreviouslyRequestedDep) { // If a previously requested dep is no longer done, restart this node from scratch. restart(skyKey, state); - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Top priority since this node has already been evaluating, so get it off our plate. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE); return; } finally { evaluatorContext @@ -487,7 +533,8 @@ public abstract class AbstractParallelEvaluator { } if (maybeHandleRestart(skyKey, state, value)) { - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Top priority since this node has already been evaluating, so get it off our plate. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE); return; } @@ -618,17 +665,26 @@ public abstract class AbstractParallelEvaluator { Set<SkyKey> newDepsThatWereInTheLastEvaluation = Sets.difference(uniqueNewDeps, newDepsThatWerentInTheLastEvaluation); + int childEvaluationPriority = + globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0; InterruptibleSupplier<Map<SkyKey, ? extends NodeEntry>> newDepsThatWerentInTheLastEvaluationNodes = graph.createIfAbsentBatchAsync( skyKey, Reason.RDEP_ADDITION, newDepsThatWerentInTheLastEvaluation); - handleKnownChildrenForDirtyNode(newDepsThatWereInTheLastEvaluation, state); + handleKnownChildrenForDirtyNode( + newDepsThatWereInTheLastEvaluation, state, childEvaluationPriority); for (Map.Entry<SkyKey, ? extends NodeEntry> e : newDepsThatWerentInTheLastEvaluationNodes.get().entrySet()) { SkyKey newDirectDep = e.getKey(); NodeEntry newDirectDepEntry = e.getValue(); - enqueueChild(skyKey, state, newDirectDep, newDirectDepEntry, /*depAlreadyExists=*/ false); + enqueueChild( + skyKey, + state, + newDirectDep, + newDirectDepEntry, + /*depAlreadyExists=*/ false, + childEvaluationPriority); } // It is critical that there is no code below this point in the try block. } catch (InterruptedException ie) { @@ -824,7 +880,9 @@ public abstract class AbstractParallelEvaluator { .noteInconsistencyAndMaybeThrow( skyKey, depKey, Inconsistency.CHILD_UNDONE_FOR_BUILDING_NODE); if (triState == DependencyState.NEEDS_SCHEDULING) { - evaluatorContext.getVisitor().enqueueEvaluation(depKey); + // Top priority since this depKey was already evaluated before, and we want to finish it off + // again, reducing the chance that another node may observe this dep to be undone. + evaluatorContext.getVisitor().enqueueEvaluation(depKey, Integer.MAX_VALUE); } return true; } diff --git a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java index 3577230e3a..27e6b61ceb 100644 --- a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java +++ b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java @@ -13,14 +13,9 @@ // limitations under the License. package com.google.devtools.build.skyframe; -import com.google.common.base.Function; -import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; -import com.google.devtools.build.lib.concurrent.ErrorHandler; -import com.google.devtools.build.lib.concurrent.ExecutorParams; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingNodeVisitor; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingNodeVisitor; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.InvalidationState; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import javax.annotation.Nullable; @@ -73,11 +68,9 @@ public final class EagerInvalidator { QueryableGraph graph, Iterable<SkyKey> diff, DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state, - Function<ExecutorParams, ? extends ExecutorService> executorFactory) { + InvalidationState state) { state.update(diff); - return state.isEmpty() ? null - : new DirtyingNodeVisitor(graph, progressReceiver, state, executorFactory); + return state.isEmpty() ? null : new DirtyingNodeVisitor(graph, progressReceiver, state); } @Nullable @@ -87,8 +80,7 @@ public final class EagerInvalidator { DirtyTrackingProgressReceiver progressReceiver, InvalidationState state, ForkJoinPool forkJoinPool, - boolean supportInterruptions, - ErrorHandler errorHandler) { + boolean supportInterruptions) { state.update(diff); return state.isEmpty() ? null @@ -99,20 +91,15 @@ public final class EagerInvalidator { forkJoinPool, supportInterruptions); } - /** - * Invalidates given values and their upward transitive closure in the graph if necessary, using - * an executor constructed with the provided factory. - */ + /** Invalidates given values and their upward transitive closure in the graph if necessary. */ public static void invalidate( QueryableGraph graph, Iterable<SkyKey> diff, DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state, - Function<ExecutorParams, ? extends ExecutorService> executorFactory) + InvalidationState state) throws InterruptedException { DirtyingNodeVisitor visitor = - createInvalidatingVisitorIfNeeded( - graph, diff, progressReceiver, state, executorFactory); + createInvalidatingVisitorIfNeeded(graph, diff, progressReceiver, state); if (visitor != null) { visitor.run(); } @@ -132,26 +119,9 @@ public final class EagerInvalidator { throws InterruptedException { DirtyingNodeVisitor visitor = createInvalidatingVisitorIfNeeded( - graph, - diff, - progressReceiver, - state, - forkJoinPool, - supportInterruptions, - ErrorHandler.NullHandler.INSTANCE); + graph, diff, progressReceiver, state, forkJoinPool, supportInterruptions); if (visitor != null) { visitor.run(); } } - - /** Invalidates given values and their upward transitive closure in the graph. */ - public static void invalidate( - QueryableGraph graph, - Iterable<SkyKey> diff, - DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state) - throws InterruptedException { - invalidate(graph, diff, progressReceiver, state, AbstractQueueVisitor.EXECUTOR_FACTORY); - } - } diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java index 168ad23b95..ae9af7705e 100644 --- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java @@ -22,7 +22,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.concurrent.ErrorClassifier; -import com.google.devtools.build.lib.concurrent.ExecutorParams; import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor; import com.google.devtools.build.lib.concurrent.QuiescingExecutor; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; @@ -35,7 +34,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -88,15 +86,6 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> { TGraph graph, DirtyTrackingProgressReceiver progressReceiver, InvalidationState state) { - this( - graph, progressReceiver, state, AbstractQueueVisitor.EXECUTOR_FACTORY); - } - - protected InvalidatingNodeVisitor( - TGraph graph, - DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state, - Function<ExecutorParams, ? extends ExecutorService> executorFactory) { this.executor = new AbstractQueueVisitor( /*parallelism=*/ DEFAULT_THREAD_COUNT, @@ -104,7 +93,6 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> { /*units=*/ TimeUnit.SECONDS, /*failFastOnException=*/ true, "skyframe-invalidator", - executorFactory, errorClassifier); this.graph = Preconditions.checkNotNull(graph); this.progressReceiver = Preconditions.checkNotNull(progressReceiver); @@ -357,9 +345,8 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> { protected DirtyingNodeVisitor( QueryableGraph graph, DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state, - Function<ExecutorParams, ? extends ExecutorService> executorFactory) { - super(graph, progressReceiver, state, executorFactory); + InvalidationState state) { + super(graph, progressReceiver, state); this.supportInterruptions = true; } diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java index 5d1cd5f26b..46672db153 100644 --- a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java @@ -14,12 +14,12 @@ package com.google.devtools.build.skyframe; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.collect.Sets; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.concurrent.ErrorClassifier; import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor; import com.google.devtools.build.lib.concurrent.QuiescingExecutor; +import com.google.devtools.build.skyframe.ParallelEvaluatorContext.RunnableMaker; import java.util.Collection; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -58,12 +58,12 @@ class NodeEntryVisitor { * Function that allows this visitor to execute the appropriate {@link Runnable} when given a * {@link SkyKey} to evaluate. */ - private final Function<SkyKey, Runnable> runnableMaker; + private final RunnableMaker runnableMaker; NodeEntryVisitor( ForkJoinPool forkJoinPool, DirtyTrackingProgressReceiver progressReceiver, - Function<SkyKey, Runnable> runnableMaker) { + RunnableMaker runnableMaker) { this.quiescingExecutor = ForkJoinQuiescingExecutor.newBuilder() .withOwnershipOf(forkJoinPool) .setErrorClassifier(NODE_ENTRY_VISITOR_ERROR_CLASSIFIER) @@ -75,15 +75,14 @@ class NodeEntryVisitor { NodeEntryVisitor( int threadCount, DirtyTrackingProgressReceiver progressReceiver, - Function<SkyKey, Runnable> runnableMaker) { + RunnableMaker runnableMaker) { quiescingExecutor = - new AbstractQueueVisitor( + AbstractQueueVisitor.createWithPriorityQueue( threadCount, /*keepAliveTime=*/ 1, TimeUnit.SECONDS, /*failFastOnException*/ true, "skyframe-evaluator", - AbstractQueueVisitor.EXECUTOR_FACTORY, NODE_ENTRY_VISITOR_ERROR_CLASSIFIER); this.progressReceiver = progressReceiver; this.runnableMaker = runnableMaker; @@ -93,7 +92,23 @@ class NodeEntryVisitor { quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true); } - void enqueueEvaluation(SkyKey key) { + /** + * Enqueue {@code key} for evaluation, at {@code evaluationPriority} if this visitor is using a + * priority queue. + * + * <p>{@code evaluationPriority} is used to minimize evaluation "sprawl": inefficiencies coming + * from incompletely evaluating many nodes, versus focusing on finishing the evaluation of nodes + * that have already started evaluating. Sprawl can be expensive because an incompletely evaluated + * node keeps state in Skyframe, and often in external caches, that uses memory. + * + * <p>In general, {@code evaluationPriority} should be maximal ({@link Integer#MAX_VALUE}) when + * restarting a node that has already started evaluation, and minimal when enqueueing a node that + * no other tasks depend on. Setting {@code evaluationPriority} to the same value for all children + * of a parent has good results experimentally, since it prioritizes batches of work that can be + * used together. Similarly, prioritizing deeper nodes (depth-first search of the evaluation + * graph) also has good results experimentally, since it minimizes sprawl. + */ + void enqueueEvaluation(SkyKey key, int evaluationPriority) { if (preventNewEvaluations.get()) { // If an error happens in nokeep_going mode, we still want to mark these nodes as inflight, // otherwise cleanup will not happen properly. @@ -101,7 +116,7 @@ class NodeEntryVisitor { return; } progressReceiver.enqueueing(key); - quiescingExecutor.execute(runnableMaker.apply(key)); + quiescingExecutor.execute(runnableMaker.make(key, evaluationPriority)); } /** diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java index 60987ac7f2..db2afd10e5 100644 --- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java +++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java @@ -13,7 +13,6 @@ // limitations under the License. package com.google.devtools.build.skyframe; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; @@ -73,7 +72,7 @@ class ParallelEvaluatorContext { final DirtyTrackingProgressReceiver progressReceiver, EventFilter storedEventFilter, ErrorInfoManager errorInfoManager, - final Function<SkyKey, Runnable> runnableMaker, + RunnableMaker runnableMaker, GraphInconsistencyReceiver graphInconsistencyReceiver, final int threadCount) { this( @@ -101,7 +100,7 @@ class ParallelEvaluatorContext { final DirtyTrackingProgressReceiver progressReceiver, EventFilter storedEventFilter, ErrorInfoManager errorInfoManager, - final Function<SkyKey, Runnable> runnableMaker, + RunnableMaker runnableMaker, GraphInconsistencyReceiver graphInconsistencyReceiver, final ForkJoinPool forkJoinPool, EvaluationVersionBehavior evaluationVersionBehavior) { @@ -120,6 +119,18 @@ class ParallelEvaluatorContext { evaluationVersionBehavior); } + /** + * Returns a {@link Runnable} given a {@code key} to evaluate and an {@code evaluationPriority} + * indicating whether it should be scheduled for evaluation soon (higher is better). The returned + * {@link Runnable} is a {@link ComparableRunnable} so that it can be ordered by {@code + * evaluationPriority} in a priority queue if needed. + */ + interface RunnableMaker { + ComparableRunnable make(SkyKey key, int evaluationPriority); + } + + interface ComparableRunnable extends Runnable, Comparable<ComparableRunnable> {} + private ParallelEvaluatorContext( QueryableGraph graph, Version graphVersion, @@ -174,7 +185,7 @@ class ParallelEvaluatorContext { for (SkyKey key : keys) { NodeEntry entry = Preconditions.checkNotNull(batch.get(key), key); if (entry.signalDep(version)) { - getVisitor().enqueueEvaluation(key); + getVisitor().enqueueEvaluation(key, Integer.MAX_VALUE); } } return; |