From 6ae05b419922193c4c253e51c9a5e483e4f947fa Mon Sep 17 00:00:00 2001 From: janakr Date: Mon, 13 Aug 2018 16:13:42 -0700 Subject: Order Skyframe evaluations in a priority queue, with all children of a given node having the same priority, later enqueueings having higher priority, re-enqueued nodes having highest priority, and new root nodes having lowest priority. Experimentally, this can save significant RAM (1.4G in some builds!) while not affecting speed. Also do a semi-drive-by deleting ExecutorFactory parameter to AbstractQueueVisitor, since it was always AbstractQueueVisitor.EXECUTOR_FACTORY. PiperOrigin-RevId: 208560889 --- .../build/lib/concurrent/AbstractQueueVisitor.java | 111 ++++++++++++++------- .../build/lib/concurrent/ExecutorParams.java | 61 ----------- .../lib/concurrent/ForkJoinQuiescingExecutor.java | 2 +- .../devtools/build/lib/query2/LabelVisitor.java | 1 - .../AbstractExceptionalParallelEvaluator.java | 4 +- .../build/skyframe/AbstractParallelEvaluator.java | 92 +++++++++++++---- .../devtools/build/skyframe/EagerInvalidator.java | 44 ++------ .../build/skyframe/InvalidatingNodeVisitor.java | 17 +--- .../devtools/build/skyframe/NodeEntryVisitor.java | 31 ++++-- .../build/skyframe/ParallelEvaluatorContext.java | 19 +++- .../ConcurrentMultimapWithHeadElementTest.java | 1 - .../build/lib/actions/MapBasedActionGraphTest.java | 1 - .../lib/concurrent/AbstractQueueVisitorTest.java | 3 - .../build/skyframe/EagerInvalidatorTest.java | 11 +- 14 files changed, 203 insertions(+), 195 deletions(-) delete mode 100644 src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index c4e526ba97..1eb64979d2 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -14,15 +14,16 @@ package com.google.devtools.build.lib.concurrent; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -32,27 +33,6 @@ import java.util.logging.Logger; /** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */ public class AbstractQueueVisitor implements QuiescingExecutor { - - /** - * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link - * ThreadPoolExecutor}s this creates have the same value for {@code corePoolSize} and {@code - * maximumPoolSize} because that results in a fixed-size thread pool, and the current use cases - * for {@link AbstractQueueVisitor} don't require any more sophisticated thread pool size - * management. - * - *

If client use cases change, they may invoke one of the {@link - * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link - * ThreadPoolExecutor}. - */ - public static final Function EXECUTOR_FACTORY = - p -> - new ThreadPoolExecutor( - /*corePoolSize=*/ p.getParallelism(), - /*maximumPoolSize=*/ p.getParallelism(), - p.getKeepAliveTime(), - p.getUnits(), - p.getWorkQueue(), - new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build()); /** * The most severe unhandled exception thrown by a worker thread, according to {@link * #errorClassifier}. This exception gets propagated to the calling thread of {@link @@ -104,6 +84,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private final ExecutorService executorService; + private final boolean usingPriorityQueue; + /** * Flag used to record when the main thread (the thread which called {@link #awaitQuiescence}) is * interrupted. @@ -131,21 +113,55 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName()); + /** + * Default function for constructing {@link ThreadPoolExecutor}s. The {@link ThreadPoolExecutor}s + * this creates have the same value for {@code corePoolSize} and {@code maximumPoolSize} because + * that results in a fixed-size thread pool, and the current use cases for {@link + * AbstractQueueVisitor} don't require any more sophisticated thread pool size management. + * + *

If client use cases change, they may invoke one of the {@link + * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link + * ThreadPoolExecutor}. + */ private static ExecutorService createExecutorService( int parallelism, long keepAliveTime, TimeUnit units, + BlockingQueue workQueue, + String poolName) { + return new ThreadPoolExecutor( + /*corePoolSize=*/ parallelism, + /*maximumPoolSize=*/ parallelism, + keepAliveTime, + units, + workQueue, + new ThreadFactoryBuilder() + .setNameFormat(Preconditions.checkNotNull(poolName) + " %d") + .build()); + } + + /** + * Creates an {@link AbstractQueueVisitor}, similar to {@link #AbstractQueueVisitor(int, long, + * TimeUnit, boolean, String, ErrorClassifier)}, but whose work is ordered by a {@link + * PriorityBlockingQueue}. The {@link Runnable} objects submitted to {@link #execute(Runnable)} + * must implement {@link Comparable}. + */ + public static AbstractQueueVisitor createWithPriorityQueue( + int parallelism, + long keepAliveTime, + TimeUnit units, + boolean failFastOnException, String poolName, - Function executorFactory) { - return Preconditions.checkNotNull(executorFactory) - .apply( - new ExecutorParams( - parallelism, - keepAliveTime, - units, - Preconditions.checkNotNull(poolName), - new BlockingStack())); + ErrorClassifier errorClassifier) { + return new AbstractQueueVisitor( + createExecutorService( + parallelism, keepAliveTime, units, new PriorityBlockingQueue<>(), poolName), + true, + failFastOnException, + errorClassifier, + /*usingPriorityQueue=*/ true); } + /** * Create the {@link AbstractQueueVisitor}. * @@ -157,7 +173,6 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code * null}, default thread naming will be used. - * @param executorFactory the factory for constructing the executor service. * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. */ public AbstractQueueVisitor( @@ -166,10 +181,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor { TimeUnit units, boolean failFastOnException, String poolName, - Function executorFactory, ErrorClassifier errorClassifier) { this( - createExecutorService(parallelism, keepAliveTime, units, poolName, executorFactory), + createExecutorService(parallelism, keepAliveTime, units, new BlockingStack<>(), poolName), true, failFastOnException, errorClassifier); @@ -190,10 +204,25 @@ public class AbstractQueueVisitor implements QuiescingExecutor { boolean shutdownOnCompletion, boolean failFastOnException, ErrorClassifier errorClassifier) { + this( + executorService, + shutdownOnCompletion, + failFastOnException, + errorClassifier, + /*usingPriorityQueue=*/ false); + } + + private AbstractQueueVisitor( + ExecutorService executorService, + boolean shutdownOnCompletion, + boolean failFastOnException, + ErrorClassifier errorClassifier, + boolean usingPriorityQueue) { this.failFastOnException = failFastOnException; this.ownExecutorService = shutdownOnCompletion; this.executorService = Preconditions.checkNotNull(executorService); this.errorClassifier = Preconditions.checkNotNull(errorClassifier); + this.usingPriorityQueue = usingPriorityQueue; } @Override @@ -218,6 +247,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor { /** Schedules a call. Called in a worker thread. */ @Override public final void execute(Runnable runnable) { + if (usingPriorityQueue) { + Preconditions.checkState(runnable instanceof Comparable); + } WrappedRunnable wrappedRunnable = new WrappedRunnable(runnable); try { // It's impossible for this increment to result in remainingTasks.get <= 0 because @@ -239,7 +271,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } - protected void executeRunnable(Runnable runnable) { + protected void executeRunnable(WrappedRunnable runnable) { executorService.execute(runnable); } @@ -309,7 +341,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { *

  • And, lastly, calls {@link #decrementRemainingTasks}. * */ - private final class WrappedRunnable implements Runnable { + protected final class WrappedRunnable implements Runnable, Comparable { private final Runnable originalRunnable; private volatile boolean ran; @@ -345,6 +377,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } } + + @SuppressWarnings("unchecked") + @Override + public int compareTo(WrappedRunnable o) { + // This should only be called when the concrete class is submitting comparable runnables. + return ((Comparable) originalRunnable).compareTo(o.originalRunnable); + } } private void addJob(Thread thread) { diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java b/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java deleted file mode 100644 index c06144c8df..0000000000 --- a/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.concurrent; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** Configuration parameters for {@link ExecutorService} construction. */ -public class ExecutorParams { - private final int parallelism; - private final long keepAliveTime; - private final TimeUnit units; - private final String poolName; - private final BlockingQueue workQueue; - - public ExecutorParams( - int parallelism, - long keepAliveTime, - TimeUnit units, - String poolName, - BlockingQueue workQueue) { - this.parallelism = parallelism; - this.keepAliveTime = keepAliveTime; - this.units = units; - this.poolName = poolName; - this.workQueue = workQueue; - } - - public int getParallelism() { - return parallelism; - } - - public long getKeepAliveTime() { - return keepAliveTime; - } - - public TimeUnit getUnits() { - return units; - } - - public String getPoolName() { - return poolName; - } - - public BlockingQueue getWorkQueue() { - return workQueue; - } -} diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java index 005d4f1ac9..89a7454f0f 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java @@ -92,7 +92,7 @@ public class ForkJoinQuiescingExecutor extends AbstractQueueVisitor { } @Override - protected void executeRunnable(Runnable runnable) { + protected void executeRunnable(WrappedRunnable runnable) { if (ForkJoinTask.inForkJoinPool()) { @SuppressWarnings("unused") Future possiblyIgnoredError = ForkJoinTask.adapt(runnable).fork(); diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java index af06cde70e..b65538ccb9 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java @@ -249,7 +249,6 @@ final class LabelVisitor { TimeUnit.SECONDS, !keepGoing, THREAD_NAME, - AbstractQueueVisitor.EXECUTOR_FACTORY, ErrorClassifier.DEFAULT); this.eventHandler = eventHandler; this.maxDepth = maxDepth; diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java index f8f21f254e..268db07014 100644 --- a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java +++ b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java @@ -249,7 +249,9 @@ public abstract class AbstractExceptionalParallelEvaluator // This must be equivalent to the code in enqueueChild above, in order to be thread-safe. switch (entry.addReverseDepAndCheckIfDone(null)) { case NEEDS_SCHEDULING: - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Low priority because this node is not needed by any other currently evaluating node. + // So keep it at the back of the queue as long as there's other useful work to be done. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MIN_VALUE); break; case DONE: informProgressReceiverThatValueIsDone(skyKey, entry); diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java index 04fcdeae91..1b1cd432c7 100644 --- a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java +++ b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -53,6 +54,15 @@ import javax.annotation.Nullable; *

    This does not implement other parts of Skyframe evaluation setup and post-processing, such as * translating a set of requested top-level nodes into actions, or constructing an evaluation * result. Derived classes should do this. + * + *

    Work is prioritized in a depth-first fashion when using the {@link + * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler, + * EmittedEventState, EventFilter, ErrorInfoManager, boolean, int, DirtyTrackingProgressReceiver, + * GraphInconsistencyReceiver, CycleDetector)} constructor, but not when using the {@link + * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler, + * EmittedEventState, EventFilter, ErrorInfoManager, boolean, DirtyTrackingProgressReceiver, + * GraphInconsistencyReceiver, ForkJoinPool, CycleDetector, EvaluationVersionBehavior)} (the + * constructor with a {@link ForkJoinPool}). */ public abstract class AbstractParallelEvaluator { private static final Logger logger = Logger.getLogger(AbstractParallelEvaluator.class.getName()); @@ -60,6 +70,7 @@ public abstract class AbstractParallelEvaluator { final ProcessableGraph graph; final ParallelEvaluatorContext evaluatorContext; protected final CycleDetector cycleDetector; + @Nullable private final AtomicInteger globalEnqueuedIndex; AbstractParallelEvaluator( ProcessableGraph graph, @@ -87,9 +98,10 @@ public abstract class AbstractParallelEvaluator { progressReceiver, storedEventFilter, errorInfoManager, - Evaluate::new, + (skyKey, evaluationPriority) -> new Evaluate(evaluationPriority, skyKey), graphInconsistencyReceiver, threadCount); + this.globalEnqueuedIndex = new AtomicInteger(0); } AbstractParallelEvaluator( @@ -119,10 +131,11 @@ public abstract class AbstractParallelEvaluator { progressReceiver, storedEventFilter, errorInfoManager, - Evaluate::new, + (skyKey, evaluationPriority) -> new Evaluate(0, skyKey), graphInconsistencyReceiver, Preconditions.checkNotNull(forkJoinPool), evaluationVersionBehavior); + this.globalEnqueuedIndex = null; } /** @@ -141,21 +154,40 @@ public abstract class AbstractParallelEvaluator { NEEDS_EVALUATION } - /** An action that evaluates a value. */ - private class Evaluate implements Runnable { + /** + * An action that evaluates a value. + * + *

    {@link Comparable} for use in priority queues. Experimentally, grouping enqueued evaluations + * together by parent leads to fewer in-flight evaluations and thus lower peak memory usage. Thus + * we store the {@link #evaluationPriority} (coming from the {@link #globalEnqueuedIndex} and use + * it for comparisons: later enqueuings should be evaluated earlier, to do a depth-first search, + * except for re-enqueued nodes, which always get top priority. + * + *

    This is not applicable when using a {@link ForkJoinPool}, since it does not allow for easy + * work prioritization. + */ + private class Evaluate implements ParallelEvaluatorContext.ComparableRunnable { + private final int evaluationPriority; /** The name of the value to be evaluated. */ private final SkyKey skyKey; - private Evaluate(SkyKey skyKey) { + private Evaluate(int evaluationPriority, SkyKey skyKey) { + this.evaluationPriority = evaluationPriority; this.skyKey = skyKey; } + @Override + public int compareTo(ParallelEvaluatorContext.ComparableRunnable o) { + return -1 * Integer.compare(this.evaluationPriority, ((Evaluate) o).evaluationPriority); + } + private void enqueueChild( SkyKey skyKey, NodeEntry entry, SkyKey child, NodeEntry childEntry, - boolean depAlreadyExists) + boolean depAlreadyExists, + int childEvaluationPriority) throws InterruptedException { Preconditions.checkState(!entry.isDone(), "%s %s", skyKey, entry); DependencyState dependencyState = @@ -166,13 +198,15 @@ public abstract class AbstractParallelEvaluator { case DONE: if (entry.signalDep(childEntry.getVersion())) { // This can only happen if there are no more children to be added. - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Maximum priority, since this node has already started evaluation before, and we want + // it off our plate. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE); } break; case ALREADY_EVALUATING: break; case NEEDS_SCHEDULING: - evaluatorContext.getVisitor().enqueueEvaluation(child); + evaluatorContext.getVisitor().enqueueEvaluation(child, childEvaluationPriority); break; } } @@ -288,7 +322,10 @@ public abstract class AbstractParallelEvaluator { unknownStatusDeps); continue; } - handleKnownChildrenForDirtyNode(unknownStatusDeps, state); + handleKnownChildrenForDirtyNode( + unknownStatusDeps, + state, + globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0); return DirtyOutcome.ALREADY_PROCESSED; } switch (state.getDirtyState()) { @@ -321,7 +358,8 @@ public abstract class AbstractParallelEvaluator { } } - private void handleKnownChildrenForDirtyNode(Collection knownChildren, NodeEntry state) + private void handleKnownChildrenForDirtyNode( + Collection knownChildren, NodeEntry state, int childEvaluationPriority) throws InterruptedException { Map oldChildren = graph.getBatch(skyKey, Reason.ENQUEUING_CHILD, knownChildren); @@ -342,7 +380,8 @@ public abstract class AbstractParallelEvaluator { state, recreatedEntry.getKey(), recreatedEntry.getValue(), - /*depAlreadyExists=*/ false); + /*depAlreadyExists=*/ false, + childEvaluationPriority); } } for (Map.Entry e : oldChildren.entrySet()) { @@ -350,7 +389,13 @@ public abstract class AbstractParallelEvaluator { NodeEntry directDepEntry = e.getValue(); // TODO(bazel-team): If this signals the current node and makes it ready, consider // evaluating it in this thread instead of scheduling a new evaluation. - enqueueChild(skyKey, state, directDep, directDepEntry, /*depAlreadyExists=*/ true); + enqueueChild( + skyKey, + state, + directDep, + directDepEntry, + /*depAlreadyExists=*/ true, + childEvaluationPriority); } } @@ -381,7 +426,8 @@ public abstract class AbstractParallelEvaluator { } catch (UndonePreviouslyRequestedDep undonePreviouslyRequestedDep) { // If a previously requested dep is no longer done, restart this node from scratch. restart(skyKey, state); - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Top priority since this node has already been evaluating, so get it off our plate. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE); return; } finally { evaluatorContext @@ -487,7 +533,8 @@ public abstract class AbstractParallelEvaluator { } if (maybeHandleRestart(skyKey, state, value)) { - evaluatorContext.getVisitor().enqueueEvaluation(skyKey); + // Top priority since this node has already been evaluating, so get it off our plate. + evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE); return; } @@ -618,17 +665,26 @@ public abstract class AbstractParallelEvaluator { Set newDepsThatWereInTheLastEvaluation = Sets.difference(uniqueNewDeps, newDepsThatWerentInTheLastEvaluation); + int childEvaluationPriority = + globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0; InterruptibleSupplier> newDepsThatWerentInTheLastEvaluationNodes = graph.createIfAbsentBatchAsync( skyKey, Reason.RDEP_ADDITION, newDepsThatWerentInTheLastEvaluation); - handleKnownChildrenForDirtyNode(newDepsThatWereInTheLastEvaluation, state); + handleKnownChildrenForDirtyNode( + newDepsThatWereInTheLastEvaluation, state, childEvaluationPriority); for (Map.Entry e : newDepsThatWerentInTheLastEvaluationNodes.get().entrySet()) { SkyKey newDirectDep = e.getKey(); NodeEntry newDirectDepEntry = e.getValue(); - enqueueChild(skyKey, state, newDirectDep, newDirectDepEntry, /*depAlreadyExists=*/ false); + enqueueChild( + skyKey, + state, + newDirectDep, + newDirectDepEntry, + /*depAlreadyExists=*/ false, + childEvaluationPriority); } // It is critical that there is no code below this point in the try block. } catch (InterruptedException ie) { @@ -824,7 +880,9 @@ public abstract class AbstractParallelEvaluator { .noteInconsistencyAndMaybeThrow( skyKey, depKey, Inconsistency.CHILD_UNDONE_FOR_BUILDING_NODE); if (triState == DependencyState.NEEDS_SCHEDULING) { - evaluatorContext.getVisitor().enqueueEvaluation(depKey); + // Top priority since this depKey was already evaluated before, and we want to finish it off + // again, reducing the chance that another node may observe this dep to be undone. + evaluatorContext.getVisitor().enqueueEvaluation(depKey, Integer.MAX_VALUE); } return true; } diff --git a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java index 3577230e3a..27e6b61ceb 100644 --- a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java +++ b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java @@ -13,14 +13,9 @@ // limitations under the License. package com.google.devtools.build.skyframe; -import com.google.common.base.Function; -import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; -import com.google.devtools.build.lib.concurrent.ErrorHandler; -import com.google.devtools.build.lib.concurrent.ExecutorParams; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingNodeVisitor; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingNodeVisitor; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.InvalidationState; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import javax.annotation.Nullable; @@ -73,11 +68,9 @@ public final class EagerInvalidator { QueryableGraph graph, Iterable diff, DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state, - Function executorFactory) { + InvalidationState state) { state.update(diff); - return state.isEmpty() ? null - : new DirtyingNodeVisitor(graph, progressReceiver, state, executorFactory); + return state.isEmpty() ? null : new DirtyingNodeVisitor(graph, progressReceiver, state); } @Nullable @@ -87,8 +80,7 @@ public final class EagerInvalidator { DirtyTrackingProgressReceiver progressReceiver, InvalidationState state, ForkJoinPool forkJoinPool, - boolean supportInterruptions, - ErrorHandler errorHandler) { + boolean supportInterruptions) { state.update(diff); return state.isEmpty() ? null @@ -99,20 +91,15 @@ public final class EagerInvalidator { forkJoinPool, supportInterruptions); } - /** - * Invalidates given values and their upward transitive closure in the graph if necessary, using - * an executor constructed with the provided factory. - */ + /** Invalidates given values and their upward transitive closure in the graph if necessary. */ public static void invalidate( QueryableGraph graph, Iterable diff, DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state, - Function executorFactory) + InvalidationState state) throws InterruptedException { DirtyingNodeVisitor visitor = - createInvalidatingVisitorIfNeeded( - graph, diff, progressReceiver, state, executorFactory); + createInvalidatingVisitorIfNeeded(graph, diff, progressReceiver, state); if (visitor != null) { visitor.run(); } @@ -132,26 +119,9 @@ public final class EagerInvalidator { throws InterruptedException { DirtyingNodeVisitor visitor = createInvalidatingVisitorIfNeeded( - graph, - diff, - progressReceiver, - state, - forkJoinPool, - supportInterruptions, - ErrorHandler.NullHandler.INSTANCE); + graph, diff, progressReceiver, state, forkJoinPool, supportInterruptions); if (visitor != null) { visitor.run(); } } - - /** Invalidates given values and their upward transitive closure in the graph. */ - public static void invalidate( - QueryableGraph graph, - Iterable diff, - DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state) - throws InterruptedException { - invalidate(graph, diff, progressReceiver, state, AbstractQueueVisitor.EXECUTOR_FACTORY); - } - } diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java index 168ad23b95..ae9af7705e 100644 --- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java @@ -22,7 +22,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.concurrent.ErrorClassifier; -import com.google.devtools.build.lib.concurrent.ExecutorParams; import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor; import com.google.devtools.build.lib.concurrent.QuiescingExecutor; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; @@ -35,7 +34,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -88,15 +86,6 @@ public abstract class InvalidatingNodeVisitor { TGraph graph, DirtyTrackingProgressReceiver progressReceiver, InvalidationState state) { - this( - graph, progressReceiver, state, AbstractQueueVisitor.EXECUTOR_FACTORY); - } - - protected InvalidatingNodeVisitor( - TGraph graph, - DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state, - Function executorFactory) { this.executor = new AbstractQueueVisitor( /*parallelism=*/ DEFAULT_THREAD_COUNT, @@ -104,7 +93,6 @@ public abstract class InvalidatingNodeVisitor { /*units=*/ TimeUnit.SECONDS, /*failFastOnException=*/ true, "skyframe-invalidator", - executorFactory, errorClassifier); this.graph = Preconditions.checkNotNull(graph); this.progressReceiver = Preconditions.checkNotNull(progressReceiver); @@ -357,9 +345,8 @@ public abstract class InvalidatingNodeVisitor { protected DirtyingNodeVisitor( QueryableGraph graph, DirtyTrackingProgressReceiver progressReceiver, - InvalidationState state, - Function executorFactory) { - super(graph, progressReceiver, state, executorFactory); + InvalidationState state) { + super(graph, progressReceiver, state); this.supportInterruptions = true; } diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java index 5d1cd5f26b..46672db153 100644 --- a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java @@ -14,12 +14,12 @@ package com.google.devtools.build.skyframe; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.collect.Sets; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.concurrent.ErrorClassifier; import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor; import com.google.devtools.build.lib.concurrent.QuiescingExecutor; +import com.google.devtools.build.skyframe.ParallelEvaluatorContext.RunnableMaker; import java.util.Collection; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -58,12 +58,12 @@ class NodeEntryVisitor { * Function that allows this visitor to execute the appropriate {@link Runnable} when given a * {@link SkyKey} to evaluate. */ - private final Function runnableMaker; + private final RunnableMaker runnableMaker; NodeEntryVisitor( ForkJoinPool forkJoinPool, DirtyTrackingProgressReceiver progressReceiver, - Function runnableMaker) { + RunnableMaker runnableMaker) { this.quiescingExecutor = ForkJoinQuiescingExecutor.newBuilder() .withOwnershipOf(forkJoinPool) .setErrorClassifier(NODE_ENTRY_VISITOR_ERROR_CLASSIFIER) @@ -75,15 +75,14 @@ class NodeEntryVisitor { NodeEntryVisitor( int threadCount, DirtyTrackingProgressReceiver progressReceiver, - Function runnableMaker) { + RunnableMaker runnableMaker) { quiescingExecutor = - new AbstractQueueVisitor( + AbstractQueueVisitor.createWithPriorityQueue( threadCount, /*keepAliveTime=*/ 1, TimeUnit.SECONDS, /*failFastOnException*/ true, "skyframe-evaluator", - AbstractQueueVisitor.EXECUTOR_FACTORY, NODE_ENTRY_VISITOR_ERROR_CLASSIFIER); this.progressReceiver = progressReceiver; this.runnableMaker = runnableMaker; @@ -93,7 +92,23 @@ class NodeEntryVisitor { quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true); } - void enqueueEvaluation(SkyKey key) { + /** + * Enqueue {@code key} for evaluation, at {@code evaluationPriority} if this visitor is using a + * priority queue. + * + *

    {@code evaluationPriority} is used to minimize evaluation "sprawl": inefficiencies coming + * from incompletely evaluating many nodes, versus focusing on finishing the evaluation of nodes + * that have already started evaluating. Sprawl can be expensive because an incompletely evaluated + * node keeps state in Skyframe, and often in external caches, that uses memory. + * + *

    In general, {@code evaluationPriority} should be maximal ({@link Integer#MAX_VALUE}) when + * restarting a node that has already started evaluation, and minimal when enqueueing a node that + * no other tasks depend on. Setting {@code evaluationPriority} to the same value for all children + * of a parent has good results experimentally, since it prioritizes batches of work that can be + * used together. Similarly, prioritizing deeper nodes (depth-first search of the evaluation + * graph) also has good results experimentally, since it minimizes sprawl. + */ + void enqueueEvaluation(SkyKey key, int evaluationPriority) { if (preventNewEvaluations.get()) { // If an error happens in nokeep_going mode, we still want to mark these nodes as inflight, // otherwise cleanup will not happen properly. @@ -101,7 +116,7 @@ class NodeEntryVisitor { return; } progressReceiver.enqueueing(key); - quiescingExecutor.execute(runnableMaker.apply(key)); + quiescingExecutor.execute(runnableMaker.make(key, evaluationPriority)); } /** diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java index 60987ac7f2..db2afd10e5 100644 --- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java +++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java @@ -13,7 +13,6 @@ // limitations under the License. package com.google.devtools.build.skyframe; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; @@ -73,7 +72,7 @@ class ParallelEvaluatorContext { final DirtyTrackingProgressReceiver progressReceiver, EventFilter storedEventFilter, ErrorInfoManager errorInfoManager, - final Function runnableMaker, + RunnableMaker runnableMaker, GraphInconsistencyReceiver graphInconsistencyReceiver, final int threadCount) { this( @@ -101,7 +100,7 @@ class ParallelEvaluatorContext { final DirtyTrackingProgressReceiver progressReceiver, EventFilter storedEventFilter, ErrorInfoManager errorInfoManager, - final Function runnableMaker, + RunnableMaker runnableMaker, GraphInconsistencyReceiver graphInconsistencyReceiver, final ForkJoinPool forkJoinPool, EvaluationVersionBehavior evaluationVersionBehavior) { @@ -120,6 +119,18 @@ class ParallelEvaluatorContext { evaluationVersionBehavior); } + /** + * Returns a {@link Runnable} given a {@code key} to evaluate and an {@code evaluationPriority} + * indicating whether it should be scheduled for evaluation soon (higher is better). The returned + * {@link Runnable} is a {@link ComparableRunnable} so that it can be ordered by {@code + * evaluationPriority} in a priority queue if needed. + */ + interface RunnableMaker { + ComparableRunnable make(SkyKey key, int evaluationPriority); + } + + interface ComparableRunnable extends Runnable, Comparable {} + private ParallelEvaluatorContext( QueryableGraph graph, Version graphVersion, @@ -174,7 +185,7 @@ class ParallelEvaluatorContext { for (SkyKey key : keys) { NodeEntry entry = Preconditions.checkNotNull(batch.get(key), key); if (entry.signalDep(version)) { - getVisitor().enqueueEvaluation(key); + getVisitor().enqueueEvaluation(key, Integer.MAX_VALUE); } } return; diff --git a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java index d48cc5d2ec..7a3aa5f312 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java +++ b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java @@ -137,7 +137,6 @@ public class ConcurrentMultimapWithHeadElementTest { TimeUnit.SECONDS, /*failFastOnException=*/ true, "action-graph-test", - AbstractQueueVisitor.EXECUTOR_FACTORY, ErrorClassifier.DEFAULT); } diff --git a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java index 5de14d956c..5ec0d4a58f 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java +++ b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java @@ -89,7 +89,6 @@ public class MapBasedActionGraphTest { TimeUnit.SECONDS, /*failFastOnException=*/ true, "action-graph-test", - AbstractQueueVisitor.EXECUTOR_FACTORY, ErrorClassifier.DEFAULT); Path execRoot = fileSystem.getPath("/"); Path root = fileSystem.getPath("/root"); diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java index 070011b0d9..79ab720789 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java @@ -564,7 +564,6 @@ public class AbstractQueueVisitorTest { TimeUnit.SECONDS, /* failFastOnException= */ false, THREAD_NAME, - AbstractQueueVisitor.EXECUTOR_FACTORY, ErrorClassifier.DEFAULT); } @@ -603,7 +602,6 @@ public class AbstractQueueVisitorTest { TimeUnit.SECONDS, /* failFastOnException= */ false, THREAD_NAME, - AbstractQueueVisitor.EXECUTOR_FACTORY, ErrorClassifier.DEFAULT); } @@ -614,7 +612,6 @@ public class AbstractQueueVisitorTest { TimeUnit.SECONDS, failFast, THREAD_NAME, - AbstractQueueVisitor.EXECUTOR_FACTORY, ErrorClassifier.DEFAULT); } diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java index 223b14b37c..6239319247 100644 --- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java +++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.eventbus.EventBus; import com.google.common.testing.GcFinalization; -import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.testutil.TestUtils; import com.google.devtools.build.lib.util.Pair; @@ -562,8 +561,7 @@ public class EagerInvalidatorTest { ImmutableList diff = ImmutableList.of(GraphTester.nonHermeticKey("a")); InvalidationState state1 = new DirtyingInvalidationState(); Preconditions.checkNotNull( - EagerInvalidator.createInvalidatingVisitorIfNeeded( - graph, diff, receiver, state1, AbstractQueueVisitor.EXECUTOR_FACTORY)) + EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff, receiver, state1)) .run(); assertThat(receiver.getUnenqueuedDirtyKeys()).containsExactly(diff.get(0), skyKey("ab")); @@ -585,12 +583,7 @@ public class EagerInvalidatorTest { throws InterruptedException { Iterable diff = ImmutableList.copyOf(keys); DirtyingNodeVisitor dirtyingNodeVisitor = - EagerInvalidator.createInvalidatingVisitorIfNeeded( - graph, - diff, - progressReceiver, - state, - AbstractQueueVisitor.EXECUTOR_FACTORY); + EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff, progressReceiver, state); if (dirtyingNodeVisitor != null) { visitor.set(dirtyingNodeVisitor); dirtyingNodeVisitor.run(); -- cgit v1.2.3